From 1d9bd0929e326c794ffddd95356c9652fce6f205 Mon Sep 17 00:00:00 2001 From: siddharth Date: Wed, 5 May 2021 22:54:43 -0400 Subject: acmens.py: move `_send_signed_request` out of `sign_csr` and `revoke_crt` --- acmens.py | 160 ++++++++++++++++++++++++++++++++++++-------------------------- 1 file changed, 94 insertions(+), 66 deletions(-) diff --git a/acmens.py b/acmens.py index b88da0e..3e82729 100644 --- a/acmens.py +++ b/acmens.py @@ -16,6 +16,14 @@ __version__ = "0.1.4-dev0" CA_PRD = "https://acme-v02.api.letsencrypt.org" CA_STG = "https://acme-staging-v02.api.letsencrypt.org" +CA_DIR = None + + +def _directory(ca_url): + global CA_DIR + if CA_DIR is None: + CA_DIR = json.loads(urlopen(ca_url + "/directory").read().decode("utf8")) + return CA_DIR def _b64(b): @@ -75,6 +83,31 @@ def _do_request(url, data=None, err_msg="Error", depth=0): return resp_data, code, headers +def _send_signed_request(url, payload, nonce_url, auth, account_key, err_msg, depth=0): + """Make signed request to ACME endpoint""" + payload64 = "" if payload is None else _b64(json.dumps(payload).encode("utf8")) + new_nonce = _do_request(nonce_url)[2]["Replay-Nonce"] + protected = {"url": url, "alg": "RS256", "nonce": new_nonce} + protected.update(auth) + protected64 = _b64(json.dumps(protected).encode("utf8")) + protected_input = "{0}.{1}".format(protected64, payload64).encode("utf8") + out = _cmd( + ["openssl", "dgst", "-sha256", "-sign", account_key], + stdin=subprocess.PIPE, + cmd_input=protected_input, + err_msg="OpenSSL Error", + ) + data = json.dumps( + {"protected": protected64, "payload": payload64, "signature": _b64(out)} + ) + try: + return _do_request(url, data=data.encode("utf8"), err_msg=err_msg, depth=depth) + except IndexError: # retry bad nonces (they raise IndexError) + return _send_signed_request( + url, payload, auth, account_key, err_msg, depth=(depth + 1) + ) + + def sign_csr(ca_url, account_key, csr, email=None, challenge_type="http"): """Use the ACME protocol to get an ssl certificate signed by a certificate authority. @@ -91,49 +124,24 @@ def sign_csr(ca_url, account_key, csr, email=None, challenge_type="http"): :rtype: string """ - DIRECTORY = json.loads(urlopen(ca_url + "/directory").read().decode("utf8")) - - # helper function - make signed requests - def _send_signed_request(url, payload, err_msg, depth=0): - payload64 = "" if payload is None else _b64(json.dumps(payload).encode("utf8")) - new_nonce = _do_request(DIRECTORY["newNonce"])[2]["Replay-Nonce"] - protected = {"url": url, "alg": "RS256", "nonce": new_nonce} - protected.update( - {"jwk": jwk} if acct_headers is None else {"kid": acct_headers["Location"]} - ) - protected64 = _b64(json.dumps(protected).encode("utf8")) - protected_input = "{0}.{1}".format(protected64, payload64).encode("utf8") - out = _cmd( - ["openssl", "dgst", "-sha256", "-sign", account_key], - stdin=subprocess.PIPE, - cmd_input=protected_input, - err_msg="OpenSSL Error", - ) - data = json.dumps( - {"protected": protected64, "payload": payload64, "signature": _b64(out)} - ) - try: - return _do_request( - url, data=data.encode("utf8"), err_msg=err_msg, depth=depth - ) - except IndexError: # retry bad nonces (they raise IndexError) - return _send_signed_request(url, payload, err_msg, depth=(depth + 1)) # helper function - poll until complete - def _poll_until_not(url, pending_statuses, err_msg): + def _poll_until_not(url, pending_statuses, nonce_url, auth, account_key, err_msg): result, t0 = None, time.time() while result is None or result["status"] in pending_statuses: assert time.time() - t0 < 3600, "Polling timeout" # 1 hour timeout time.sleep(0 if result is None else 2) - result, _, _ = _send_signed_request(url, None, err_msg) + result, _, _ = _send_signed_request( + url, None, nonce_url, auth, account_key, err_msg + ) return result # helper function - do challenge - def _do_challenge(authz_url, thumbprint): + def _do_challenge(authz_url, nonce_url, auth, account_key, thumbprint): # Request challenges sys.stderr.write("Requesting challenges...\n") chl_result, chl_code, chl_headers = _send_signed_request( - authz_url, None, "Error getting challenges" + authz_url, None, nonce_url, auth, account_key, "Error getting challenges" ) domain = chl_result["identifier"]["value"] @@ -203,10 +211,18 @@ Notes: _send_signed_request( challenge["url"], {}, + nonce_url, + auth, + account_key, "Error requesting challenge verfication: {0}".format(domain), ) chl_verification = _poll_until_not( - challenge["url"], ["pending"], "Error checking challenge verification" + challenge["url"], + ["pending"], + nonce_url, + auth, + account_key, + "Error checking challenge verification", ) if chl_verification["status"] != "valid": raise ValueError( @@ -281,14 +297,22 @@ Notes: # Step 4: Generate the payload for registering user and initiate registration. sys.stderr.write("Registering {0}...\n".format(email)) reg = {"termsOfServiceAgreed": True} + nonce_url = _directory(ca_url)["newNonce"] + auth = {"jwk": jwk} acct_headers = None result, code, acct_headers = _send_signed_request( - DIRECTORY["newAccount"], reg, "Error registering" + _directory(ca_url)["newAccount"], + reg, + nonce_url, + auth, + account_key, + "Error registering", ) if code == 201: sys.stderr.write("Registered!\n") else: sys.stderr.write("Already registered!\n") + auth = {"kid": acct_headers["Location"]} # Step 5: Request challenges for domains sys.stderr.write("Making new order for {0}...\n".format(", ".join(domains))) @@ -296,17 +320,27 @@ Notes: for domain in domains: id["identifiers"].append({"type": "dns", "value": domain}) order, order_code, order_headers = _send_signed_request( - DIRECTORY["newOrder"], id, "Error creating new order" + _directory(ca_url)["newOrder"], + id, + nonce_url, + auth, + account_key, + "Error creating new order", ) for authz in order["authorizations"]: - _do_challenge(authz, thumbprint) + _do_challenge(authz, nonce_url, auth, account_key, thumbprint) # Step 8: Finalize csr_der = _cmd( ["openssl", "req", "-in", csr, "-outform", "DER"], err_msg="DER Export Error" ) fnlz_resp, fnlz_code, fnlz_headers = _send_signed_request( - order["finalize"], {"csr": _b64(csr_der)}, "Error finalizing order" + order["finalize"], + {"csr": _b64(csr_der)}, + nonce_url, + auth, + account_key, + "Error finalizing order", ) # Step 9: Wait for CA to mark test as valid @@ -314,6 +348,9 @@ Notes: order = _poll_until_not( order_headers["Location"], ["pending", "processing"], + nonce_url, + auth, + account_key, "Error checking order status", ) @@ -325,7 +362,12 @@ Notes: # Step 10: Get the certificate. sys.stderr.write("Getting certificate...\n") signed_pem, _, _ = _send_signed_request( - order["certificate"], None, "Error getting certificate" + order["certificate"], + None, + nonce_url, + auth, + account_key, + "Error getting certificate", ) sys.stderr.write("Received certificate!\n") @@ -344,38 +386,11 @@ def revoke_crt(ca_url, account_key, crt): :param string account_key: Path to your Let's Encrypt account private key. :param string crt: Path to the signed certificate. """ - DIRECTORY = json.loads(urlopen(ca_url + "/directory").read().decode("utf8")) def _a64(a): "Shortcut function to go from jwt base64 string to bytes" return base64.urlsafe_b64decode(str(a + ("=" * (len(a) % 4)))) - # helper function - make signed requests - def _send_signed_request(url, payload, err_msg, depth=0): - payload64 = "" if payload is None else _b64(json.dumps(payload).encode("utf8")) - new_nonce = _do_request(DIRECTORY["newNonce"])[2]["Replay-Nonce"] - protected = {"url": url, "alg": "RS256", "nonce": new_nonce} - protected.update( - {"jwk": jwk} if acct_headers is None else {"kid": acct_headers["Location"]} - ) - protected64 = _b64(json.dumps(protected).encode("utf8")) - protected_input = "{0}.{1}".format(protected64, payload64).encode("utf8") - out = _cmd( - ["openssl", "dgst", "-sha256", "-sign", account_key], - stdin=subprocess.PIPE, - cmd_input=protected_input, - err_msg="OpenSSL Error", - ) - data = json.dumps( - {"protected": protected64, "payload": payload64, "signature": _b64(out)} - ) - try: - return _do_request( - url, data=data.encode("utf8"), err_msg=err_msg, depth=depth - ) - except IndexError: # retry bad nonces (they raise IndexError) - return _send_signed_request(url, payload, err_msg, depth=(depth + 1)) - # Step 1: Get account public key sys.stderr.write("Reading pubkey file...\n") out = _cmd( @@ -405,10 +420,18 @@ def revoke_crt(ca_url, account_key, crt): # Step 2: Get account info. sys.stderr.write("Getting account info...\n") reg = {"onlyReturnExistiing": True} + nonce_url = _directory(ca_url)["newNonce"] + auth = {"jwk": jwk} acct_headers = None result, code, acct_headers = _send_signed_request( - DIRECTORY["newAccount"], reg, "Error getting account info" + _directory(ca_url)["newAccount"], + reg, + nonce_url, + auth, + account_key, + "Error getting account info", ) + auth = {"kid": acct_headers["Location"]} # Step 3: Generate the payload. crt_der = _cmd( @@ -419,7 +442,12 @@ def revoke_crt(ca_url, account_key, crt): "certificate": crt_der64, } _send_signed_request( - DIRECTORY["revokeCert"], rvk_payload, "Error revoking certificate" + _directory(ca_url)["revokeCert"], + rvk_payload, + nonce_url, + auth, + account_key, + "Error revoking certificate", ) sys.stderr.write("Certificate revoked!\n") -- cgit v1.2.3