diff options
| author | siddharth <s@ricketyspace.net> | 2021-04-16 20:33:13 -0400 | 
|---|---|---|
| committer | siddharth <s@ricketyspace.net> | 2021-04-16 20:33:13 -0400 | 
| commit | 89d5ecba4d10a6214760a127eefed3394a4bf081 (patch) | |
| tree | a538cc4e7c832439735f89a3b44f70136a2a0ec4 | |
| parent | d63be36ff390a2d1d381b9ef50ae98f4259cf4a3 (diff) | |
acmens.py: update sign_csr
* acmens.py (sign_csr): Add support for multiple domains
| -rw-r--r-- | acmens.py | 192 | 
1 files changed, 103 insertions, 89 deletions
| @@ -12,7 +12,7 @@ from urllib.request import urlopen  from urllib.error import HTTPError -__version__ = "0.1.3" +__version__ = "0.1.4-dev"  def sign_csr(account_key, csr, email=None, challenge_type="http"): @@ -125,6 +125,80 @@ def sign_csr(account_key, csr, email=None, challenge_type="http"):              result, _, _ = _send_signed_request(url, None, err_msg)          return result +    # helper function - do challenge +    def _do_challenge(authz_url, thumbprint): +        # Request challenges +        sys.stderr.write("Requesting challenges...\n") +        chl_result, chl_code, chl_headers = _send_signed_request( +            authz_url, None, "Error getting challenges" +        ) +        domain = chl_result["identifier"]["value"] + +        type_id = "dns-01" if challenge_type == "dns" else "http-01" +        challenge = [c for c in chl_result["challenges"] if c["type"] == type_id][0] +        keyauthorization = "{0}.{1}".format(challenge["token"], thumbprint) +        dns_payload = _b64(hashlib.sha256(keyauthorization.encode()).digest()) + +        # Ask the user to host the token on their server +        if challenge_type == "dns": +            sys.stderr.write( +                """\ +Please update your DNS for '{domain}' to have the following TXT record: + +-------------- +_acme-challenge    IN    TXT ( \"{keyauth}\" ) +-------------- + +""".format( +                    domain=domain, keyauth=dns_payload +                ) +            ) +        else: +            # Challenge response for http server. +            response_uri = ".well-known/acme-challenge/{0}".format(challenge["token"]) + +            sys.stderr.write( +                """\ +Please update your server to serve the following file at this URL: + +-------------- +URL: http://{domain}/{uri} +File contents: \"{token}\" +-------------- + +Notes: +- Do not include the quotes in the file. +- The file should be one line without any spaces. + +""".format( +                    domain=domain, uri=response_uri, token=keyauthorization +                ) +            ) + +        stdout = sys.stdout +        sys.stdout = sys.stderr +        if challenge_type == "dns": +            input("Press Enter when the TXT record is updated on the DNS...") +        else: +            input("Press Enter when you've got the file hosted on your server...") +        sys.stdout = stdout + +        # Let the CA know you're ready for the challenge +        sys.stderr.write("Requesting verification for {0}...\n".format(domain)) +        _send_signed_request( +            challenge["url"], +            {}, +            "Error requesting challenge verfication: {0}".format(domain), +        ) +        chl_verification = _poll_until_not( +            challenge["url"], ["pending"], "Error checking challenge verification" +        ) +        if chl_verification["status"] != "valid": +            raise ValueError( +                "Challenge did not pass for {0}: {1}".format(domain, chl_verification) +            ) +        sys.stderr.write("{} verified!\n".format(domain)) +      # Step 1: Get account public key      sys.stderr.write("Reading pubkey file...\n")      out = _cmd( @@ -158,15 +232,29 @@ def sign_csr(account_key, csr, email=None, challenge_type="http"):          ["openssl", "req", "-in", csr, "-noout", "-text"],          err_msg="Error loading {}".format(csr),      ) -    domain = None +    domains = set([]) +    cn = None      common_name = re.search("Subject:.*? CN *= *([^\s,;/]+)", out.decode("utf8"))      if common_name is not None: -        domain = common_name.group(1) -    sys.stderr.write("Found domain {0}\n".format(domain)) +        domains.add(common_name.group(1)) +        cn = common_name.group(1) +    subj_alt_names = re.search( +        "X509v3 Subject Alternative Name: \n +([^\n]+)\n", +        out.decode("utf8"), +        re.MULTILINE | re.DOTALL, +    ) +    if subj_alt_names is not None: +        for san in subj_alt_names.group(1).split(", "): +            if san.startswith("DNS:"): +                dm = san[4:] +                if cn is None and dm.find("*") == -1: +                    cn = dm +                domains.add(dm) +    sys.stderr.write("Found domains {}\n".format(", ".join(domains)))      # Step 3: Ask user for contact email      if not email: -        default_email = "webmaster@{0}".format(domain) +        default_email = "webmaster@{0}".format(cn)          stdout = sys.stdout          sys.stdout = sys.stderr          input_email = input( @@ -187,90 +275,16 @@ def sign_csr(account_key, csr, email=None, challenge_type="http"):      else:          sys.stderr.write("Already registered!\n") -    # Step 5: Request challenges for domain -    sys.stderr.write("Making new order for {0}...\n".format(domain)) -    id = { -        "identifiers": [{"type": "dns", "value": domain,}], -    } +    # Step 5: Request challenges for domains +    sys.stderr.write("Making new order for {0}...\n".format(", ".join(domains))) +    id = {"identifiers": []} +    for domain in domains: +        id["identifiers"].append({"type": "dns", "value": domain})      order, order_code, order_headers = _send_signed_request(          DIRECTORY["newOrder"], id, "Error creating new order"      ) - -    # Request challenges -    sys.stderr.write("Requesting challenges...\n") -    chl_result, chl_code, chl_headers = _send_signed_request( -        order["authorizations"][0], None, "Error getting challenges" -    ) - -    type_id = "dns-01" if challenge_type == "dns" else "http-01" -    challenge = [c for c in chl_result["challenges"] if c["type"] == type_id][0] -    token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge["token"]) -    keyauthorization = "{0}.{1}".format(challenge["token"], thumbprint) -    dns_payload = _b64(hashlib.sha256(keyauthorization.encode()).digest()) - -    # build request for the server to test this challenge. -    test_url = challenge["url"] -    test_raw = "{}" - -    # Step 6: Ask the user to host the token on their server -    if challenge_type == "dns": -        sys.stderr.write( -            """\ -Please update your DNS for {domain} to have the following TXT record: - --------------- -_acme-challenge    IN    TXT ( \"{keyauth}\" ) --------------- - -""".format( -                domain=domain.replace("*.", ""), keyauth=dns_payload -            ) -        ) -    else: -        # Challenge response for http server. -        response_uri = ".well-known/acme-challenge/{0}".format(challenge["token"]) - -        sys.stderr.write( -            """\ -Please update your server to serve the following file at this URL: - --------------- -URL: http://{domain}/{uri} -File contents: \"{token}\" --------------- - -Notes: -- Do not include the quotes in the file. -- The file should be one line without any spaces. - -""".format( -                domain=domain, uri=response_uri, token=keyauthorization -            ) -        ) - -    stdout = sys.stdout -    sys.stdout = sys.stderr -    if challenge_type == "dns": -        input("Press Enter when the TXT record is updated on the DNS...") -    else: -        input("Press Enter when you've got the file hosted on your server...") -    sys.stdout = stdout - -    # Step 7: Let the CA know you're ready for the challenge -    sys.stderr.write("Requesting verification for {0}...\n".format(domain)) -    _send_signed_request( -        challenge["url"], -        {}, -        "Error requesting challenge verfication: {0}".format(domain), -    ) -    chl_verification = _poll_until_not( -        challenge["url"], ["pending"], "Error checking challenge verification" -    ) -    if chl_verification["status"] != "valid": -        raise ValueError( -            "Challenge did not pass for {0}: {1}".format(domain, chl_verification) -        ) -    sys.stderr.write("{} verified!\n".format(domain)) +    for authz in order["authorizations"]: +        _do_challenge(authz, thumbprint)      # Step 8: Finalize      csr_der = _cmd( @@ -281,7 +295,7 @@ Notes:      )      # Step 9: Wait for CA to mark test as valid -    sys.stderr.write("Waiting for {0} challenge to pass...\n".format(domain)) +    sys.stderr.write("Waiting for {0} challenge to pass...\n".format(cn))      order = _poll_until_not(          order_headers["Location"],          ["pending", "processing"], @@ -289,9 +303,9 @@ Notes:      )      if order["status"] == "valid": -        sys.stderr.write("Passed {0} challenge!\n".format(domain)) +        sys.stderr.write("Passed {0} challenge!\n".format(cn))      else: -        raise ValueError("'{0}' challenge did not pass: {1}".format(domain, order)) +        raise ValueError("'{0}' challenge did not pass: {1}".format(cn, order))      # Step 10: Get the certificate.      sys.stderr.write("Getting certificate...\n") | 
