diff options
| -rw-r--r-- | acmens.py | 117 | 
1 files changed, 39 insertions, 78 deletions
| @@ -36,6 +36,45 @@ def _cmd(cmd_list, stdin=None, cmd_input=None, err_msg="Command Line Error"):      return out +def _do_request(url, data=None, err_msg="Error", depth=0): +    try: +        resp = urllib.request.urlopen( +            urllib.request.Request( +                url, +                data=data, +                headers={ +                    "Content-Type": "application/jose+json", +                    "User-Agent": "acmens", +                }, +            ) +        ) +        resp_data, code, headers = ( +            resp.read().decode("utf8"), +            resp.getcode(), +            resp.headers, +        ) +    except IOError as e: +        resp_data = e.read().decode("utf8") if hasattr(e, "read") else str(e) +        code, headers = getattr(e, "code", None), {} +    try: +        resp_data = json.loads(resp_data)  # try to parse json results +    except ValueError: +        pass  # ignore json parsing errors +    if ( +        depth < 100 +        and code == 400 +        and resp_data["type"] == "urn:ietf:params:acme:error:badNonce" +    ): +        raise IndexError(resp_data)  # allow 100 retrys for bad nonces +    if code not in [200, 201, 204]: +        raise ValueError( +            "{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format( +                err_msg, url, data, code, resp_data +            ) +        ) +    return resp_data, code, headers + +  def sign_csr(ca_url, account_key, csr, email=None, challenge_type="http"):      """Use the ACME protocol to get an ssl certificate signed by a      certificate authority. @@ -54,45 +93,6 @@ def sign_csr(ca_url, account_key, csr, email=None, challenge_type="http"):      """      DIRECTORY = json.loads(urlopen(ca_url + "/directory").read().decode("utf8")) -    # helper function - make request and automatically parse json response -    def _do_request(url, data=None, err_msg="Error", depth=0): -        try: -            resp = urllib.request.urlopen( -                urllib.request.Request( -                    url, -                    data=data, -                    headers={ -                        "Content-Type": "application/jose+json", -                        "User-Agent": "acmens", -                    }, -                ) -            ) -            resp_data, code, headers = ( -                resp.read().decode("utf8"), -                resp.getcode(), -                resp.headers, -            ) -        except IOError as e: -            resp_data = e.read().decode("utf8") if hasattr(e, "read") else str(e) -            code, headers = getattr(e, "code", None), {} -        try: -            resp_data = json.loads(resp_data)  # try to parse json results -        except ValueError: -            pass  # ignore json parsing errors -        if ( -            depth < 100 -            and code == 400 -            and resp_data["type"] == "urn:ietf:params:acme:error:badNonce" -        ): -            raise IndexError(resp_data)  # allow 100 retrys for bad nonces -        if code not in [200, 201, 204]: -            raise ValueError( -                "{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format( -                    err_msg, url, data, code, resp_data -                ) -            ) -        return resp_data, code, headers -      # helper function - make signed requests      def _send_signed_request(url, payload, err_msg, depth=0):          payload64 = "" if payload is None else _b64(json.dumps(payload).encode("utf8")) @@ -350,45 +350,6 @@ def revoke_crt(ca_url, account_key, crt):          "Shortcut function to go from jwt base64 string to bytes"          return base64.urlsafe_b64decode(str(a + ("=" * (len(a) % 4)))) -    # helper function - make request and automatically parse json response -    def _do_request(url, data=None, err_msg="Error", depth=0): -        try: -            resp = urllib.request.urlopen( -                urllib.request.Request( -                    url, -                    data=data, -                    headers={ -                        "Content-Type": "application/jose+json", -                        "User-Agent": "acmens", -                    }, -                ) -            ) -            resp_data, code, headers = ( -                resp.read().decode("utf8"), -                resp.getcode(), -                resp.headers, -            ) -        except IOError as e: -            resp_data = e.read().decode("utf8") if hasattr(e, "read") else str(e) -            code, headers = getattr(e, "code", None), {} -        try: -            resp_data = json.loads(resp_data)  # try to parse json results -        except ValueError: -            pass  # ignore json parsing errors -        if ( -            depth < 100 -            and code == 400 -            and resp_data["type"] == "urn:ietf:params:acme:error:badNonce" -        ): -            raise IndexError(resp_data)  # allow 100 retrys for bad nonces -        if code not in [200, 201, 204]: -            raise ValueError( -                "{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format( -                    err_msg, url, data, code, resp_data -                ) -            ) -        return resp_data, code, headers -      # helper function - make signed requests      def _send_signed_request(url, payload, err_msg, depth=0):          payload64 = "" if payload is None else _b64(json.dumps(payload).encode("utf8")) | 
