From de395d368359d0f938e646bc7a32ba57388863f8 Mon Sep 17 00:00:00 2001 From: siddharth Date: Wed, 5 May 2021 23:13:42 -0400 Subject: acmens.py: move around `_do_challenge` --- acmens.py | 152 +++++++++++++++++++++++++++++++------------------------------- 1 file changed, 76 insertions(+), 76 deletions(-) diff --git a/acmens.py b/acmens.py index b3bab22..b7971ed 100644 --- a/acmens.py +++ b/acmens.py @@ -44,6 +44,82 @@ def _cmd(cmd_list, stdin=None, cmd_input=None, err_msg="Command Line Error"): return out +def _do_request(url, data=None, err_msg="Error", depth=0): + try: + resp = urllib.request.urlopen( + urllib.request.Request( + url, + data=data, + headers={ + "Content-Type": "application/jose+json", + "User-Agent": "acmens", + }, + ) + ) + resp_data, code, headers = ( + resp.read().decode("utf8"), + resp.getcode(), + resp.headers, + ) + except IOError as e: + resp_data = e.read().decode("utf8") if hasattr(e, "read") else str(e) + code, headers = getattr(e, "code", None), {} + try: + resp_data = json.loads(resp_data) # try to parse json results + except ValueError: + pass # ignore json parsing errors + if ( + depth < 100 + and code == 400 + and resp_data["type"] == "urn:ietf:params:acme:error:badNonce" + ): + raise IndexError(resp_data) # allow 100 retrys for bad nonces + if code not in [200, 201, 204]: + raise ValueError( + "{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format( + err_msg, url, data, code, resp_data + ) + ) + return resp_data, code, headers + + +def _send_signed_request(url, payload, nonce_url, auth, account_key, err_msg, depth=0): + """Make signed request to ACME endpoint""" + payload64 = "" if payload is None else _b64(json.dumps(payload).encode("utf8")) + new_nonce = _do_request(nonce_url)[2]["Replay-Nonce"] + protected = {"url": url, "alg": "RS256", "nonce": new_nonce} + protected.update(auth) + protected64 = _b64(json.dumps(protected).encode("utf8")) + protected_input = "{0}.{1}".format(protected64, payload64).encode("utf8") + out = _cmd( + ["openssl", "dgst", "-sha256", "-sign", account_key], + stdin=subprocess.PIPE, + cmd_input=protected_input, + err_msg="OpenSSL Error", + ) + data = json.dumps( + {"protected": protected64, "payload": payload64, "signature": _b64(out)} + ) + try: + return _do_request(url, data=data.encode("utf8"), err_msg=err_msg, depth=depth) + except IndexError: # retry bad nonces (they raise IndexError) + return _send_signed_request( + url, payload, auth, account_key, err_msg, depth=(depth + 1) + ) + + +def _poll_until_not(url, pending_statuses, nonce_url, auth, account_key, err_msg): + """Poll until status is not in pending_statuses""" + result, t0 = None, time.time() + while result is None or result["status"] in pending_statuses: + assert time.time() - t0 < 3600, "Polling timeout" # 1 hour timeout + time.sleep(0 if result is None else 2) + result, _, _ = _send_signed_request( + url, None, nonce_url, auth, account_key, err_msg + ) + return result + + def _do_challenge(challenge_type, authz_url, nonce_url, auth, account_key, thumbprint): """Do ACME challenge""" # Request challenges @@ -139,82 +215,6 @@ Notes: sys.stderr.write("{} verified!\n".format(domain)) -def _do_request(url, data=None, err_msg="Error", depth=0): - try: - resp = urllib.request.urlopen( - urllib.request.Request( - url, - data=data, - headers={ - "Content-Type": "application/jose+json", - "User-Agent": "acmens", - }, - ) - ) - resp_data, code, headers = ( - resp.read().decode("utf8"), - resp.getcode(), - resp.headers, - ) - except IOError as e: - resp_data = e.read().decode("utf8") if hasattr(e, "read") else str(e) - code, headers = getattr(e, "code", None), {} - try: - resp_data = json.loads(resp_data) # try to parse json results - except ValueError: - pass # ignore json parsing errors - if ( - depth < 100 - and code == 400 - and resp_data["type"] == "urn:ietf:params:acme:error:badNonce" - ): - raise IndexError(resp_data) # allow 100 retrys for bad nonces - if code not in [200, 201, 204]: - raise ValueError( - "{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format( - err_msg, url, data, code, resp_data - ) - ) - return resp_data, code, headers - - -def _send_signed_request(url, payload, nonce_url, auth, account_key, err_msg, depth=0): - """Make signed request to ACME endpoint""" - payload64 = "" if payload is None else _b64(json.dumps(payload).encode("utf8")) - new_nonce = _do_request(nonce_url)[2]["Replay-Nonce"] - protected = {"url": url, "alg": "RS256", "nonce": new_nonce} - protected.update(auth) - protected64 = _b64(json.dumps(protected).encode("utf8")) - protected_input = "{0}.{1}".format(protected64, payload64).encode("utf8") - out = _cmd( - ["openssl", "dgst", "-sha256", "-sign", account_key], - stdin=subprocess.PIPE, - cmd_input=protected_input, - err_msg="OpenSSL Error", - ) - data = json.dumps( - {"protected": protected64, "payload": payload64, "signature": _b64(out)} - ) - try: - return _do_request(url, data=data.encode("utf8"), err_msg=err_msg, depth=depth) - except IndexError: # retry bad nonces (they raise IndexError) - return _send_signed_request( - url, payload, auth, account_key, err_msg, depth=(depth + 1) - ) - - -def _poll_until_not(url, pending_statuses, nonce_url, auth, account_key, err_msg): - """Poll until status is not in pending_statuses""" - result, t0 = None, time.time() - while result is None or result["status"] in pending_statuses: - assert time.time() - t0 < 3600, "Polling timeout" # 1 hour timeout - time.sleep(0 if result is None else 2) - result, _, _ = _send_signed_request( - url, None, nonce_url, auth, account_key, err_msg - ) - return result - - def sign_csr(ca_url, account_key, csr, email=None, challenge_type="http"): """Use the ACME protocol to get an ssl certificate signed by a certificate authority. -- cgit v1.2.3