From 0bc00df1719826abbb8f87662e2c7fd5e81efe75 Mon Sep 17 00:00:00 2001 From: siddharth Date: Wed, 5 May 2021 23:12:39 -0400 Subject: acmens.py: move `_do_challenge` out of `sign_csr` --- acmens.py | 191 +++++++++++++++++++++++++++++++------------------------------- 1 file changed, 96 insertions(+), 95 deletions(-) (limited to 'acmens.py') diff --git a/acmens.py b/acmens.py index 3e2fd79..b3bab22 100644 --- a/acmens.py +++ b/acmens.py @@ -44,6 +44,101 @@ def _cmd(cmd_list, stdin=None, cmd_input=None, err_msg="Command Line Error"): return out +def _do_challenge(challenge_type, authz_url, nonce_url, auth, account_key, thumbprint): + """Do ACME challenge""" + # Request challenges + sys.stderr.write("Requesting challenges...\n") + chl_result, chl_code, chl_headers = _send_signed_request( + authz_url, None, nonce_url, auth, account_key, "Error getting challenges" + ) + domain = chl_result["identifier"]["value"] + + # Choose challenge. + preferred_type = "dns-01" if challenge_type == "dns" else "http-01" + challenge = None + http_challenge = None + for c in chl_result["challenges"]: + if c["type"] == preferred_type: + challenge = c + if c["type"] == "http-01": + http_challenge = c + if challenge is None: + if http_challenge is None: + sys.stderr.write("Error: Unable to find challenges!") + sys.exit(1) + challenge = http_challenge # Fallback to http challenge. + keyauthorization = "{0}.{1}".format(challenge["token"], thumbprint) + dns_payload = _b64(hashlib.sha256(keyauthorization.encode()).digest()) + + # Ask the user to host the token on their server + if challenge_type == "dns": + sys.stderr.write( + """\ +Please update your DNS for '{domain}' to have the following TXT record: + +-------------- +_acme-challenge IN TXT ( \"{keyauth}\" ) +-------------- + +""".format( + domain=domain, keyauth=dns_payload + ) + ) + else: + # Challenge response for http server. + response_uri = ".well-known/acme-challenge/{0}".format(challenge["token"]) + + sys.stderr.write( + """\ +Please update your server to serve the following file at this URL: + +-------------- +URL: http://{domain}/{uri} +File contents: \"{token}\" +-------------- + +Notes: +- Do not include the quotes in the file. +- The file should be one line without any spaces. + +""".format( + domain=domain, uri=response_uri, token=keyauthorization + ) + ) + + stdout = sys.stdout + sys.stdout = sys.stderr + if challenge_type == "dns": + input("Press Enter when the TXT record is updated on the DNS...") + else: + input("Press Enter when you've got the file hosted on your server...") + sys.stdout = stdout + + # Let the CA know you're ready for the challenge + sys.stderr.write("Requesting verification for {0}...\n".format(domain)) + _send_signed_request( + challenge["url"], + {}, + nonce_url, + auth, + account_key, + "Error requesting challenge verfication: {0}".format(domain), + ) + chl_verification = _poll_until_not( + challenge["url"], + ["pending"], + nonce_url, + auth, + account_key, + "Error checking challenge verification", + ) + if chl_verification["status"] != "valid": + raise ValueError( + "Challenge did not pass for {0}: {1}".format(domain, chl_verification) + ) + sys.stderr.write("{} verified!\n".format(domain)) + + def _do_request(url, data=None, err_msg="Error", depth=0): try: resp = urllib.request.urlopen( @@ -137,100 +232,6 @@ def sign_csr(ca_url, account_key, csr, email=None, challenge_type="http"): """ - # helper function - do challenge - def _do_challenge(authz_url, nonce_url, auth, account_key, thumbprint): - # Request challenges - sys.stderr.write("Requesting challenges...\n") - chl_result, chl_code, chl_headers = _send_signed_request( - authz_url, None, nonce_url, auth, account_key, "Error getting challenges" - ) - domain = chl_result["identifier"]["value"] - - # Choose challenge. - preferred_type = "dns-01" if challenge_type == "dns" else "http-01" - challenge = None - http_challenge = None - for c in chl_result["challenges"]: - if c["type"] == preferred_type: - challenge = c - if c["type"] == "http-01": - http_challenge = c - if challenge is None: - if http_challenge is None: - sys.stderr.write("Error: Unable to find challenges!") - sys.exit(1) - challenge = http_challenge # Fallback to http challenge. - keyauthorization = "{0}.{1}".format(challenge["token"], thumbprint) - dns_payload = _b64(hashlib.sha256(keyauthorization.encode()).digest()) - - # Ask the user to host the token on their server - if challenge_type == "dns": - sys.stderr.write( - """\ -Please update your DNS for '{domain}' to have the following TXT record: - --------------- -_acme-challenge IN TXT ( \"{keyauth}\" ) --------------- - -""".format( - domain=domain, keyauth=dns_payload - ) - ) - else: - # Challenge response for http server. - response_uri = ".well-known/acme-challenge/{0}".format(challenge["token"]) - - sys.stderr.write( - """\ -Please update your server to serve the following file at this URL: - --------------- -URL: http://{domain}/{uri} -File contents: \"{token}\" --------------- - -Notes: -- Do not include the quotes in the file. -- The file should be one line without any spaces. - -""".format( - domain=domain, uri=response_uri, token=keyauthorization - ) - ) - - stdout = sys.stdout - sys.stdout = sys.stderr - if challenge_type == "dns": - input("Press Enter when the TXT record is updated on the DNS...") - else: - input("Press Enter when you've got the file hosted on your server...") - sys.stdout = stdout - - # Let the CA know you're ready for the challenge - sys.stderr.write("Requesting verification for {0}...\n".format(domain)) - _send_signed_request( - challenge["url"], - {}, - nonce_url, - auth, - account_key, - "Error requesting challenge verfication: {0}".format(domain), - ) - chl_verification = _poll_until_not( - challenge["url"], - ["pending"], - nonce_url, - auth, - account_key, - "Error checking challenge verification", - ) - if chl_verification["status"] != "valid": - raise ValueError( - "Challenge did not pass for {0}: {1}".format(domain, chl_verification) - ) - sys.stderr.write("{} verified!\n".format(domain)) - # Step 1: Get account public key sys.stderr.write("Reading pubkey file...\n") out = _cmd( @@ -329,7 +330,7 @@ Notes: "Error creating new order", ) for authz in order["authorizations"]: - _do_challenge(authz, nonce_url, auth, account_key, thumbprint) + _do_challenge(challenge_type, authz, nonce_url, auth, account_key, thumbprint) # Step 8: Finalize csr_der = _cmd( -- cgit v1.2.3