From f613c29c4ed0a0448450166c9c6fd36209c394c8 Mon Sep 17 00:00:00 2001 From: siddharth Date: Wed, 5 May 2021 20:19:18 -0400 Subject: acmens.py: move `_do_request` out of `sign_csr` and `revoke_crt` --- acmens.py | 117 +++++++++++++++++++++----------------------------------------- 1 file changed, 39 insertions(+), 78 deletions(-) diff --git a/acmens.py b/acmens.py index cf7424a..b88da0e 100644 --- a/acmens.py +++ b/acmens.py @@ -36,6 +36,45 @@ def _cmd(cmd_list, stdin=None, cmd_input=None, err_msg="Command Line Error"): return out +def _do_request(url, data=None, err_msg="Error", depth=0): + try: + resp = urllib.request.urlopen( + urllib.request.Request( + url, + data=data, + headers={ + "Content-Type": "application/jose+json", + "User-Agent": "acmens", + }, + ) + ) + resp_data, code, headers = ( + resp.read().decode("utf8"), + resp.getcode(), + resp.headers, + ) + except IOError as e: + resp_data = e.read().decode("utf8") if hasattr(e, "read") else str(e) + code, headers = getattr(e, "code", None), {} + try: + resp_data = json.loads(resp_data) # try to parse json results + except ValueError: + pass # ignore json parsing errors + if ( + depth < 100 + and code == 400 + and resp_data["type"] == "urn:ietf:params:acme:error:badNonce" + ): + raise IndexError(resp_data) # allow 100 retrys for bad nonces + if code not in [200, 201, 204]: + raise ValueError( + "{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format( + err_msg, url, data, code, resp_data + ) + ) + return resp_data, code, headers + + def sign_csr(ca_url, account_key, csr, email=None, challenge_type="http"): """Use the ACME protocol to get an ssl certificate signed by a certificate authority. @@ -54,45 +93,6 @@ def sign_csr(ca_url, account_key, csr, email=None, challenge_type="http"): """ DIRECTORY = json.loads(urlopen(ca_url + "/directory").read().decode("utf8")) - # helper function - make request and automatically parse json response - def _do_request(url, data=None, err_msg="Error", depth=0): - try: - resp = urllib.request.urlopen( - urllib.request.Request( - url, - data=data, - headers={ - "Content-Type": "application/jose+json", - "User-Agent": "acmens", - }, - ) - ) - resp_data, code, headers = ( - resp.read().decode("utf8"), - resp.getcode(), - resp.headers, - ) - except IOError as e: - resp_data = e.read().decode("utf8") if hasattr(e, "read") else str(e) - code, headers = getattr(e, "code", None), {} - try: - resp_data = json.loads(resp_data) # try to parse json results - except ValueError: - pass # ignore json parsing errors - if ( - depth < 100 - and code == 400 - and resp_data["type"] == "urn:ietf:params:acme:error:badNonce" - ): - raise IndexError(resp_data) # allow 100 retrys for bad nonces - if code not in [200, 201, 204]: - raise ValueError( - "{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format( - err_msg, url, data, code, resp_data - ) - ) - return resp_data, code, headers - # helper function - make signed requests def _send_signed_request(url, payload, err_msg, depth=0): payload64 = "" if payload is None else _b64(json.dumps(payload).encode("utf8")) @@ -350,45 +350,6 @@ def revoke_crt(ca_url, account_key, crt): "Shortcut function to go from jwt base64 string to bytes" return base64.urlsafe_b64decode(str(a + ("=" * (len(a) % 4)))) - # helper function - make request and automatically parse json response - def _do_request(url, data=None, err_msg="Error", depth=0): - try: - resp = urllib.request.urlopen( - urllib.request.Request( - url, - data=data, - headers={ - "Content-Type": "application/jose+json", - "User-Agent": "acmens", - }, - ) - ) - resp_data, code, headers = ( - resp.read().decode("utf8"), - resp.getcode(), - resp.headers, - ) - except IOError as e: - resp_data = e.read().decode("utf8") if hasattr(e, "read") else str(e) - code, headers = getattr(e, "code", None), {} - try: - resp_data = json.loads(resp_data) # try to parse json results - except ValueError: - pass # ignore json parsing errors - if ( - depth < 100 - and code == 400 - and resp_data["type"] == "urn:ietf:params:acme:error:badNonce" - ): - raise IndexError(resp_data) # allow 100 retrys for bad nonces - if code not in [200, 201, 204]: - raise ValueError( - "{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format( - err_msg, url, data, code, resp_data - ) - ) - return resp_data, code, headers - # helper function - make signed requests def _send_signed_request(url, payload, err_msg, depth=0): payload64 = "" if payload is None else _b64(json.dumps(payload).encode("utf8")) -- cgit v1.2.3