From 89d5ecba4d10a6214760a127eefed3394a4bf081 Mon Sep 17 00:00:00 2001 From: siddharth Date: Fri, 16 Apr 2021 20:33:13 -0400 Subject: acmens.py: update sign_csr * acmens.py (sign_csr): Add support for multiple domains --- acmens.py | 192 +++++++++++++++++++++++++++++++++----------------------------- 1 file changed, 103 insertions(+), 89 deletions(-) (limited to 'acmens.py') diff --git a/acmens.py b/acmens.py index da1f3c6..9946ba1 100644 --- a/acmens.py +++ b/acmens.py @@ -12,7 +12,7 @@ from urllib.request import urlopen from urllib.error import HTTPError -__version__ = "0.1.3" +__version__ = "0.1.4-dev" def sign_csr(account_key, csr, email=None, challenge_type="http"): @@ -125,6 +125,80 @@ def sign_csr(account_key, csr, email=None, challenge_type="http"): result, _, _ = _send_signed_request(url, None, err_msg) return result + # helper function - do challenge + def _do_challenge(authz_url, thumbprint): + # Request challenges + sys.stderr.write("Requesting challenges...\n") + chl_result, chl_code, chl_headers = _send_signed_request( + authz_url, None, "Error getting challenges" + ) + domain = chl_result["identifier"]["value"] + + type_id = "dns-01" if challenge_type == "dns" else "http-01" + challenge = [c for c in chl_result["challenges"] if c["type"] == type_id][0] + keyauthorization = "{0}.{1}".format(challenge["token"], thumbprint) + dns_payload = _b64(hashlib.sha256(keyauthorization.encode()).digest()) + + # Ask the user to host the token on their server + if challenge_type == "dns": + sys.stderr.write( + """\ +Please update your DNS for '{domain}' to have the following TXT record: + +-------------- +_acme-challenge IN TXT ( \"{keyauth}\" ) +-------------- + +""".format( + domain=domain, keyauth=dns_payload + ) + ) + else: + # Challenge response for http server. + response_uri = ".well-known/acme-challenge/{0}".format(challenge["token"]) + + sys.stderr.write( + """\ +Please update your server to serve the following file at this URL: + +-------------- +URL: http://{domain}/{uri} +File contents: \"{token}\" +-------------- + +Notes: +- Do not include the quotes in the file. +- The file should be one line without any spaces. + +""".format( + domain=domain, uri=response_uri, token=keyauthorization + ) + ) + + stdout = sys.stdout + sys.stdout = sys.stderr + if challenge_type == "dns": + input("Press Enter when the TXT record is updated on the DNS...") + else: + input("Press Enter when you've got the file hosted on your server...") + sys.stdout = stdout + + # Let the CA know you're ready for the challenge + sys.stderr.write("Requesting verification for {0}...\n".format(domain)) + _send_signed_request( + challenge["url"], + {}, + "Error requesting challenge verfication: {0}".format(domain), + ) + chl_verification = _poll_until_not( + challenge["url"], ["pending"], "Error checking challenge verification" + ) + if chl_verification["status"] != "valid": + raise ValueError( + "Challenge did not pass for {0}: {1}".format(domain, chl_verification) + ) + sys.stderr.write("{} verified!\n".format(domain)) + # Step 1: Get account public key sys.stderr.write("Reading pubkey file...\n") out = _cmd( @@ -158,15 +232,29 @@ def sign_csr(account_key, csr, email=None, challenge_type="http"): ["openssl", "req", "-in", csr, "-noout", "-text"], err_msg="Error loading {}".format(csr), ) - domain = None + domains = set([]) + cn = None common_name = re.search("Subject:.*? CN *= *([^\s,;/]+)", out.decode("utf8")) if common_name is not None: - domain = common_name.group(1) - sys.stderr.write("Found domain {0}\n".format(domain)) + domains.add(common_name.group(1)) + cn = common_name.group(1) + subj_alt_names = re.search( + "X509v3 Subject Alternative Name: \n +([^\n]+)\n", + out.decode("utf8"), + re.MULTILINE | re.DOTALL, + ) + if subj_alt_names is not None: + for san in subj_alt_names.group(1).split(", "): + if san.startswith("DNS:"): + dm = san[4:] + if cn is None and dm.find("*") == -1: + cn = dm + domains.add(dm) + sys.stderr.write("Found domains {}\n".format(", ".join(domains))) # Step 3: Ask user for contact email if not email: - default_email = "webmaster@{0}".format(domain) + default_email = "webmaster@{0}".format(cn) stdout = sys.stdout sys.stdout = sys.stderr input_email = input( @@ -187,90 +275,16 @@ def sign_csr(account_key, csr, email=None, challenge_type="http"): else: sys.stderr.write("Already registered!\n") - # Step 5: Request challenges for domain - sys.stderr.write("Making new order for {0}...\n".format(domain)) - id = { - "identifiers": [{"type": "dns", "value": domain,}], - } + # Step 5: Request challenges for domains + sys.stderr.write("Making new order for {0}...\n".format(", ".join(domains))) + id = {"identifiers": []} + for domain in domains: + id["identifiers"].append({"type": "dns", "value": domain}) order, order_code, order_headers = _send_signed_request( DIRECTORY["newOrder"], id, "Error creating new order" ) - - # Request challenges - sys.stderr.write("Requesting challenges...\n") - chl_result, chl_code, chl_headers = _send_signed_request( - order["authorizations"][0], None, "Error getting challenges" - ) - - type_id = "dns-01" if challenge_type == "dns" else "http-01" - challenge = [c for c in chl_result["challenges"] if c["type"] == type_id][0] - token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge["token"]) - keyauthorization = "{0}.{1}".format(challenge["token"], thumbprint) - dns_payload = _b64(hashlib.sha256(keyauthorization.encode()).digest()) - - # build request for the server to test this challenge. - test_url = challenge["url"] - test_raw = "{}" - - # Step 6: Ask the user to host the token on their server - if challenge_type == "dns": - sys.stderr.write( - """\ -Please update your DNS for {domain} to have the following TXT record: - --------------- -_acme-challenge IN TXT ( \"{keyauth}\" ) --------------- - -""".format( - domain=domain.replace("*.", ""), keyauth=dns_payload - ) - ) - else: - # Challenge response for http server. - response_uri = ".well-known/acme-challenge/{0}".format(challenge["token"]) - - sys.stderr.write( - """\ -Please update your server to serve the following file at this URL: - --------------- -URL: http://{domain}/{uri} -File contents: \"{token}\" --------------- - -Notes: -- Do not include the quotes in the file. -- The file should be one line without any spaces. - -""".format( - domain=domain, uri=response_uri, token=keyauthorization - ) - ) - - stdout = sys.stdout - sys.stdout = sys.stderr - if challenge_type == "dns": - input("Press Enter when the TXT record is updated on the DNS...") - else: - input("Press Enter when you've got the file hosted on your server...") - sys.stdout = stdout - - # Step 7: Let the CA know you're ready for the challenge - sys.stderr.write("Requesting verification for {0}...\n".format(domain)) - _send_signed_request( - challenge["url"], - {}, - "Error requesting challenge verfication: {0}".format(domain), - ) - chl_verification = _poll_until_not( - challenge["url"], ["pending"], "Error checking challenge verification" - ) - if chl_verification["status"] != "valid": - raise ValueError( - "Challenge did not pass for {0}: {1}".format(domain, chl_verification) - ) - sys.stderr.write("{} verified!\n".format(domain)) + for authz in order["authorizations"]: + _do_challenge(authz, thumbprint) # Step 8: Finalize csr_der = _cmd( @@ -281,7 +295,7 @@ Notes: ) # Step 9: Wait for CA to mark test as valid - sys.stderr.write("Waiting for {0} challenge to pass...\n".format(domain)) + sys.stderr.write("Waiting for {0} challenge to pass...\n".format(cn)) order = _poll_until_not( order_headers["Location"], ["pending", "processing"], @@ -289,9 +303,9 @@ Notes: ) if order["status"] == "valid": - sys.stderr.write("Passed {0} challenge!\n".format(domain)) + sys.stderr.write("Passed {0} challenge!\n".format(cn)) else: - raise ValueError("'{0}' challenge did not pass: {1}".format(domain, order)) + raise ValueError("'{0}' challenge did not pass: {1}".format(cn, order)) # Step 10: Get the certificate. sys.stderr.write("Getting certificate...\n") -- cgit v1.2.3