diff options
| author | siddharth <s@ricketyspace.net> | 2021-05-05 23:13:42 -0400 | 
|---|---|---|
| committer | siddharth <s@ricketyspace.net> | 2021-05-05 23:13:42 -0400 | 
| commit | de395d368359d0f938e646bc7a32ba57388863f8 (patch) | |
| tree | 69ebd016aff5f90c3c44fe2f369bbe4b75948c92 | |
| parent | 0bc00df1719826abbb8f87662e2c7fd5e81efe75 (diff) | |
acmens.py: move around `_do_challenge`
| -rw-r--r-- | acmens.py | 152 | 
1 files changed, 76 insertions, 76 deletions
| @@ -44,6 +44,82 @@ def _cmd(cmd_list, stdin=None, cmd_input=None, err_msg="Command Line Error"):      return out +def _do_request(url, data=None, err_msg="Error", depth=0): +    try: +        resp = urllib.request.urlopen( +            urllib.request.Request( +                url, +                data=data, +                headers={ +                    "Content-Type": "application/jose+json", +                    "User-Agent": "acmens", +                }, +            ) +        ) +        resp_data, code, headers = ( +            resp.read().decode("utf8"), +            resp.getcode(), +            resp.headers, +        ) +    except IOError as e: +        resp_data = e.read().decode("utf8") if hasattr(e, "read") else str(e) +        code, headers = getattr(e, "code", None), {} +    try: +        resp_data = json.loads(resp_data)  # try to parse json results +    except ValueError: +        pass  # ignore json parsing errors +    if ( +        depth < 100 +        and code == 400 +        and resp_data["type"] == "urn:ietf:params:acme:error:badNonce" +    ): +        raise IndexError(resp_data)  # allow 100 retrys for bad nonces +    if code not in [200, 201, 204]: +        raise ValueError( +            "{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format( +                err_msg, url, data, code, resp_data +            ) +        ) +    return resp_data, code, headers + + +def _send_signed_request(url, payload, nonce_url, auth, account_key, err_msg, depth=0): +    """Make signed request to ACME endpoint""" +    payload64 = "" if payload is None else _b64(json.dumps(payload).encode("utf8")) +    new_nonce = _do_request(nonce_url)[2]["Replay-Nonce"] +    protected = {"url": url, "alg": "RS256", "nonce": new_nonce} +    protected.update(auth) +    protected64 = _b64(json.dumps(protected).encode("utf8")) +    protected_input = "{0}.{1}".format(protected64, payload64).encode("utf8") +    out = _cmd( +        ["openssl", "dgst", "-sha256", "-sign", account_key], +        stdin=subprocess.PIPE, +        cmd_input=protected_input, +        err_msg="OpenSSL Error", +    ) +    data = json.dumps( +        {"protected": protected64, "payload": payload64, "signature": _b64(out)} +    ) +    try: +        return _do_request(url, data=data.encode("utf8"), err_msg=err_msg, depth=depth) +    except IndexError:  # retry bad nonces (they raise IndexError) +        return _send_signed_request( +            url, payload, auth, account_key, err_msg, depth=(depth + 1) +        ) + + +def _poll_until_not(url, pending_statuses, nonce_url, auth, account_key, err_msg): +    """Poll until status is not in pending_statuses""" +    result, t0 = None, time.time() +    while result is None or result["status"] in pending_statuses: +        assert time.time() - t0 < 3600, "Polling timeout"  # 1 hour timeout +        time.sleep(0 if result is None else 2) +        result, _, _ = _send_signed_request( +            url, None, nonce_url, auth, account_key, err_msg +        ) +    return result + +  def _do_challenge(challenge_type, authz_url, nonce_url, auth, account_key, thumbprint):      """Do ACME challenge"""      # Request challenges @@ -139,82 +215,6 @@ Notes:      sys.stderr.write("{} verified!\n".format(domain)) -def _do_request(url, data=None, err_msg="Error", depth=0): -    try: -        resp = urllib.request.urlopen( -            urllib.request.Request( -                url, -                data=data, -                headers={ -                    "Content-Type": "application/jose+json", -                    "User-Agent": "acmens", -                }, -            ) -        ) -        resp_data, code, headers = ( -            resp.read().decode("utf8"), -            resp.getcode(), -            resp.headers, -        ) -    except IOError as e: -        resp_data = e.read().decode("utf8") if hasattr(e, "read") else str(e) -        code, headers = getattr(e, "code", None), {} -    try: -        resp_data = json.loads(resp_data)  # try to parse json results -    except ValueError: -        pass  # ignore json parsing errors -    if ( -        depth < 100 -        and code == 400 -        and resp_data["type"] == "urn:ietf:params:acme:error:badNonce" -    ): -        raise IndexError(resp_data)  # allow 100 retrys for bad nonces -    if code not in [200, 201, 204]: -        raise ValueError( -            "{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format( -                err_msg, url, data, code, resp_data -            ) -        ) -    return resp_data, code, headers - - -def _send_signed_request(url, payload, nonce_url, auth, account_key, err_msg, depth=0): -    """Make signed request to ACME endpoint""" -    payload64 = "" if payload is None else _b64(json.dumps(payload).encode("utf8")) -    new_nonce = _do_request(nonce_url)[2]["Replay-Nonce"] -    protected = {"url": url, "alg": "RS256", "nonce": new_nonce} -    protected.update(auth) -    protected64 = _b64(json.dumps(protected).encode("utf8")) -    protected_input = "{0}.{1}".format(protected64, payload64).encode("utf8") -    out = _cmd( -        ["openssl", "dgst", "-sha256", "-sign", account_key], -        stdin=subprocess.PIPE, -        cmd_input=protected_input, -        err_msg="OpenSSL Error", -    ) -    data = json.dumps( -        {"protected": protected64, "payload": payload64, "signature": _b64(out)} -    ) -    try: -        return _do_request(url, data=data.encode("utf8"), err_msg=err_msg, depth=depth) -    except IndexError:  # retry bad nonces (they raise IndexError) -        return _send_signed_request( -            url, payload, auth, account_key, err_msg, depth=(depth + 1) -        ) - - -def _poll_until_not(url, pending_statuses, nonce_url, auth, account_key, err_msg): -    """Poll until status is not in pending_statuses""" -    result, t0 = None, time.time() -    while result is None or result["status"] in pending_statuses: -        assert time.time() - t0 < 3600, "Polling timeout"  # 1 hour timeout -        time.sleep(0 if result is None else 2) -        result, _, _ = _send_signed_request( -            url, None, nonce_url, auth, account_key, err_msg -        ) -    return result - -  def sign_csr(ca_url, account_key, csr, email=None, challenge_type="http"):      """Use the ACME protocol to get an ssl certificate signed by a      certificate authority. | 
