diff options
| -rw-r--r-- | acmens.py | 160 | 
1 files changed, 94 insertions, 66 deletions
| @@ -16,6 +16,14 @@ __version__ = "0.1.4-dev0"  CA_PRD = "https://acme-v02.api.letsencrypt.org"  CA_STG = "https://acme-staging-v02.api.letsencrypt.org" +CA_DIR = None + + +def _directory(ca_url): +    global CA_DIR +    if CA_DIR is None: +        CA_DIR = json.loads(urlopen(ca_url + "/directory").read().decode("utf8")) +    return CA_DIR  def _b64(b): @@ -75,6 +83,31 @@ def _do_request(url, data=None, err_msg="Error", depth=0):      return resp_data, code, headers +def _send_signed_request(url, payload, nonce_url, auth, account_key, err_msg, depth=0): +    """Make signed request to ACME endpoint""" +    payload64 = "" if payload is None else _b64(json.dumps(payload).encode("utf8")) +    new_nonce = _do_request(nonce_url)[2]["Replay-Nonce"] +    protected = {"url": url, "alg": "RS256", "nonce": new_nonce} +    protected.update(auth) +    protected64 = _b64(json.dumps(protected).encode("utf8")) +    protected_input = "{0}.{1}".format(protected64, payload64).encode("utf8") +    out = _cmd( +        ["openssl", "dgst", "-sha256", "-sign", account_key], +        stdin=subprocess.PIPE, +        cmd_input=protected_input, +        err_msg="OpenSSL Error", +    ) +    data = json.dumps( +        {"protected": protected64, "payload": payload64, "signature": _b64(out)} +    ) +    try: +        return _do_request(url, data=data.encode("utf8"), err_msg=err_msg, depth=depth) +    except IndexError:  # retry bad nonces (they raise IndexError) +        return _send_signed_request( +            url, payload, auth, account_key, err_msg, depth=(depth + 1) +        ) + +  def sign_csr(ca_url, account_key, csr, email=None, challenge_type="http"):      """Use the ACME protocol to get an ssl certificate signed by a      certificate authority. @@ -91,49 +124,24 @@ def sign_csr(ca_url, account_key, csr, email=None, challenge_type="http"):      :rtype: string      """ -    DIRECTORY = json.loads(urlopen(ca_url + "/directory").read().decode("utf8")) - -    # helper function - make signed requests -    def _send_signed_request(url, payload, err_msg, depth=0): -        payload64 = "" if payload is None else _b64(json.dumps(payload).encode("utf8")) -        new_nonce = _do_request(DIRECTORY["newNonce"])[2]["Replay-Nonce"] -        protected = {"url": url, "alg": "RS256", "nonce": new_nonce} -        protected.update( -            {"jwk": jwk} if acct_headers is None else {"kid": acct_headers["Location"]} -        ) -        protected64 = _b64(json.dumps(protected).encode("utf8")) -        protected_input = "{0}.{1}".format(protected64, payload64).encode("utf8") -        out = _cmd( -            ["openssl", "dgst", "-sha256", "-sign", account_key], -            stdin=subprocess.PIPE, -            cmd_input=protected_input, -            err_msg="OpenSSL Error", -        ) -        data = json.dumps( -            {"protected": protected64, "payload": payload64, "signature": _b64(out)} -        ) -        try: -            return _do_request( -                url, data=data.encode("utf8"), err_msg=err_msg, depth=depth -            ) -        except IndexError:  # retry bad nonces (they raise IndexError) -            return _send_signed_request(url, payload, err_msg, depth=(depth + 1))      # helper function - poll until complete -    def _poll_until_not(url, pending_statuses, err_msg): +    def _poll_until_not(url, pending_statuses, nonce_url, auth, account_key, err_msg):          result, t0 = None, time.time()          while result is None or result["status"] in pending_statuses:              assert time.time() - t0 < 3600, "Polling timeout"  # 1 hour timeout              time.sleep(0 if result is None else 2) -            result, _, _ = _send_signed_request(url, None, err_msg) +            result, _, _ = _send_signed_request( +                url, None, nonce_url, auth, account_key, err_msg +            )          return result      # helper function - do challenge -    def _do_challenge(authz_url, thumbprint): +    def _do_challenge(authz_url, nonce_url, auth, account_key, thumbprint):          # Request challenges          sys.stderr.write("Requesting challenges...\n")          chl_result, chl_code, chl_headers = _send_signed_request( -            authz_url, None, "Error getting challenges" +            authz_url, None, nonce_url, auth, account_key, "Error getting challenges"          )          domain = chl_result["identifier"]["value"] @@ -203,10 +211,18 @@ Notes:          _send_signed_request(              challenge["url"],              {}, +            nonce_url, +            auth, +            account_key,              "Error requesting challenge verfication: {0}".format(domain),          )          chl_verification = _poll_until_not( -            challenge["url"], ["pending"], "Error checking challenge verification" +            challenge["url"], +            ["pending"], +            nonce_url, +            auth, +            account_key, +            "Error checking challenge verification",          )          if chl_verification["status"] != "valid":              raise ValueError( @@ -281,14 +297,22 @@ Notes:      # Step 4: Generate the payload for registering user and initiate registration.      sys.stderr.write("Registering {0}...\n".format(email))      reg = {"termsOfServiceAgreed": True} +    nonce_url = _directory(ca_url)["newNonce"] +    auth = {"jwk": jwk}      acct_headers = None      result, code, acct_headers = _send_signed_request( -        DIRECTORY["newAccount"], reg, "Error registering" +        _directory(ca_url)["newAccount"], +        reg, +        nonce_url, +        auth, +        account_key, +        "Error registering",      )      if code == 201:          sys.stderr.write("Registered!\n")      else:          sys.stderr.write("Already registered!\n") +    auth = {"kid": acct_headers["Location"]}      # Step 5: Request challenges for domains      sys.stderr.write("Making new order for {0}...\n".format(", ".join(domains))) @@ -296,17 +320,27 @@ Notes:      for domain in domains:          id["identifiers"].append({"type": "dns", "value": domain})      order, order_code, order_headers = _send_signed_request( -        DIRECTORY["newOrder"], id, "Error creating new order" +        _directory(ca_url)["newOrder"], +        id, +        nonce_url, +        auth, +        account_key, +        "Error creating new order",      )      for authz in order["authorizations"]: -        _do_challenge(authz, thumbprint) +        _do_challenge(authz, nonce_url, auth, account_key, thumbprint)      # Step 8: Finalize      csr_der = _cmd(          ["openssl", "req", "-in", csr, "-outform", "DER"], err_msg="DER Export Error"      )      fnlz_resp, fnlz_code, fnlz_headers = _send_signed_request( -        order["finalize"], {"csr": _b64(csr_der)}, "Error finalizing order" +        order["finalize"], +        {"csr": _b64(csr_der)}, +        nonce_url, +        auth, +        account_key, +        "Error finalizing order",      )      # Step 9: Wait for CA to mark test as valid @@ -314,6 +348,9 @@ Notes:      order = _poll_until_not(          order_headers["Location"],          ["pending", "processing"], +        nonce_url, +        auth, +        account_key,          "Error checking order status",      ) @@ -325,7 +362,12 @@ Notes:      # Step 10: Get the certificate.      sys.stderr.write("Getting certificate...\n")      signed_pem, _, _ = _send_signed_request( -        order["certificate"], None, "Error getting certificate" +        order["certificate"], +        None, +        nonce_url, +        auth, +        account_key, +        "Error getting certificate",      )      sys.stderr.write("Received certificate!\n") @@ -344,38 +386,11 @@ def revoke_crt(ca_url, account_key, crt):      :param string account_key: Path to your Let's Encrypt account private key.      :param string crt: Path to the signed certificate.      """ -    DIRECTORY = json.loads(urlopen(ca_url + "/directory").read().decode("utf8"))      def _a64(a):          "Shortcut function to go from jwt base64 string to bytes"          return base64.urlsafe_b64decode(str(a + ("=" * (len(a) % 4)))) -    # helper function - make signed requests -    def _send_signed_request(url, payload, err_msg, depth=0): -        payload64 = "" if payload is None else _b64(json.dumps(payload).encode("utf8")) -        new_nonce = _do_request(DIRECTORY["newNonce"])[2]["Replay-Nonce"] -        protected = {"url": url, "alg": "RS256", "nonce": new_nonce} -        protected.update( -            {"jwk": jwk} if acct_headers is None else {"kid": acct_headers["Location"]} -        ) -        protected64 = _b64(json.dumps(protected).encode("utf8")) -        protected_input = "{0}.{1}".format(protected64, payload64).encode("utf8") -        out = _cmd( -            ["openssl", "dgst", "-sha256", "-sign", account_key], -            stdin=subprocess.PIPE, -            cmd_input=protected_input, -            err_msg="OpenSSL Error", -        ) -        data = json.dumps( -            {"protected": protected64, "payload": payload64, "signature": _b64(out)} -        ) -        try: -            return _do_request( -                url, data=data.encode("utf8"), err_msg=err_msg, depth=depth -            ) -        except IndexError:  # retry bad nonces (they raise IndexError) -            return _send_signed_request(url, payload, err_msg, depth=(depth + 1)) -      # Step 1: Get account public key      sys.stderr.write("Reading pubkey file...\n")      out = _cmd( @@ -405,10 +420,18 @@ def revoke_crt(ca_url, account_key, crt):      # Step 2: Get account info.      sys.stderr.write("Getting account info...\n")      reg = {"onlyReturnExistiing": True} +    nonce_url = _directory(ca_url)["newNonce"] +    auth = {"jwk": jwk}      acct_headers = None      result, code, acct_headers = _send_signed_request( -        DIRECTORY["newAccount"], reg, "Error getting account info" +        _directory(ca_url)["newAccount"], +        reg, +        nonce_url, +        auth, +        account_key, +        "Error getting account info",      ) +    auth = {"kid": acct_headers["Location"]}      # Step 3: Generate the payload.      crt_der = _cmd( @@ -419,7 +442,12 @@ def revoke_crt(ca_url, account_key, crt):          "certificate": crt_der64,      }      _send_signed_request( -        DIRECTORY["revokeCert"], rvk_payload, "Error revoking certificate" +        _directory(ca_url)["revokeCert"], +        rvk_payload, +        nonce_url, +        auth, +        account_key, +        "Error revoking certificate",      )      sys.stderr.write("Certificate revoked!\n") | 
