summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsiddharth <s@ricketyspace.net>2021-05-05 22:54:43 -0400
committersiddharth <s@ricketyspace.net>2021-05-05 22:54:43 -0400
commit1d9bd0929e326c794ffddd95356c9652fce6f205 (patch)
treef6152beedf78b0f0ed6b230699ab83e152f09191
parentf613c29c4ed0a0448450166c9c6fd36209c394c8 (diff)
acmens.py: move `_send_signed_request` out of `sign_csr` and `revoke_crt`
-rw-r--r--acmens.py160
1 files changed, 94 insertions, 66 deletions
diff --git a/acmens.py b/acmens.py
index b88da0e..3e82729 100644
--- a/acmens.py
+++ b/acmens.py
@@ -16,6 +16,14 @@ __version__ = "0.1.4-dev0"
CA_PRD = "https://acme-v02.api.letsencrypt.org"
CA_STG = "https://acme-staging-v02.api.letsencrypt.org"
+CA_DIR = None
+
+
+def _directory(ca_url):
+ global CA_DIR
+ if CA_DIR is None:
+ CA_DIR = json.loads(urlopen(ca_url + "/directory").read().decode("utf8"))
+ return CA_DIR
def _b64(b):
@@ -75,6 +83,31 @@ def _do_request(url, data=None, err_msg="Error", depth=0):
return resp_data, code, headers
+def _send_signed_request(url, payload, nonce_url, auth, account_key, err_msg, depth=0):
+ """Make signed request to ACME endpoint"""
+ payload64 = "" if payload is None else _b64(json.dumps(payload).encode("utf8"))
+ new_nonce = _do_request(nonce_url)[2]["Replay-Nonce"]
+ protected = {"url": url, "alg": "RS256", "nonce": new_nonce}
+ protected.update(auth)
+ protected64 = _b64(json.dumps(protected).encode("utf8"))
+ protected_input = "{0}.{1}".format(protected64, payload64).encode("utf8")
+ out = _cmd(
+ ["openssl", "dgst", "-sha256", "-sign", account_key],
+ stdin=subprocess.PIPE,
+ cmd_input=protected_input,
+ err_msg="OpenSSL Error",
+ )
+ data = json.dumps(
+ {"protected": protected64, "payload": payload64, "signature": _b64(out)}
+ )
+ try:
+ return _do_request(url, data=data.encode("utf8"), err_msg=err_msg, depth=depth)
+ except IndexError: # retry bad nonces (they raise IndexError)
+ return _send_signed_request(
+ url, payload, auth, account_key, err_msg, depth=(depth + 1)
+ )
+
+
def sign_csr(ca_url, account_key, csr, email=None, challenge_type="http"):
"""Use the ACME protocol to get an ssl certificate signed by a
certificate authority.
@@ -91,49 +124,24 @@ def sign_csr(ca_url, account_key, csr, email=None, challenge_type="http"):
:rtype: string
"""
- DIRECTORY = json.loads(urlopen(ca_url + "/directory").read().decode("utf8"))
-
- # helper function - make signed requests
- def _send_signed_request(url, payload, err_msg, depth=0):
- payload64 = "" if payload is None else _b64(json.dumps(payload).encode("utf8"))
- new_nonce = _do_request(DIRECTORY["newNonce"])[2]["Replay-Nonce"]
- protected = {"url": url, "alg": "RS256", "nonce": new_nonce}
- protected.update(
- {"jwk": jwk} if acct_headers is None else {"kid": acct_headers["Location"]}
- )
- protected64 = _b64(json.dumps(protected).encode("utf8"))
- protected_input = "{0}.{1}".format(protected64, payload64).encode("utf8")
- out = _cmd(
- ["openssl", "dgst", "-sha256", "-sign", account_key],
- stdin=subprocess.PIPE,
- cmd_input=protected_input,
- err_msg="OpenSSL Error",
- )
- data = json.dumps(
- {"protected": protected64, "payload": payload64, "signature": _b64(out)}
- )
- try:
- return _do_request(
- url, data=data.encode("utf8"), err_msg=err_msg, depth=depth
- )
- except IndexError: # retry bad nonces (they raise IndexError)
- return _send_signed_request(url, payload, err_msg, depth=(depth + 1))
# helper function - poll until complete
- def _poll_until_not(url, pending_statuses, err_msg):
+ def _poll_until_not(url, pending_statuses, nonce_url, auth, account_key, err_msg):
result, t0 = None, time.time()
while result is None or result["status"] in pending_statuses:
assert time.time() - t0 < 3600, "Polling timeout" # 1 hour timeout
time.sleep(0 if result is None else 2)
- result, _, _ = _send_signed_request(url, None, err_msg)
+ result, _, _ = _send_signed_request(
+ url, None, nonce_url, auth, account_key, err_msg
+ )
return result
# helper function - do challenge
- def _do_challenge(authz_url, thumbprint):
+ def _do_challenge(authz_url, nonce_url, auth, account_key, thumbprint):
# Request challenges
sys.stderr.write("Requesting challenges...\n")
chl_result, chl_code, chl_headers = _send_signed_request(
- authz_url, None, "Error getting challenges"
+ authz_url, None, nonce_url, auth, account_key, "Error getting challenges"
)
domain = chl_result["identifier"]["value"]
@@ -203,10 +211,18 @@ Notes:
_send_signed_request(
challenge["url"],
{},
+ nonce_url,
+ auth,
+ account_key,
"Error requesting challenge verfication: {0}".format(domain),
)
chl_verification = _poll_until_not(
- challenge["url"], ["pending"], "Error checking challenge verification"
+ challenge["url"],
+ ["pending"],
+ nonce_url,
+ auth,
+ account_key,
+ "Error checking challenge verification",
)
if chl_verification["status"] != "valid":
raise ValueError(
@@ -281,14 +297,22 @@ Notes:
# Step 4: Generate the payload for registering user and initiate registration.
sys.stderr.write("Registering {0}...\n".format(email))
reg = {"termsOfServiceAgreed": True}
+ nonce_url = _directory(ca_url)["newNonce"]
+ auth = {"jwk": jwk}
acct_headers = None
result, code, acct_headers = _send_signed_request(
- DIRECTORY["newAccount"], reg, "Error registering"
+ _directory(ca_url)["newAccount"],
+ reg,
+ nonce_url,
+ auth,
+ account_key,
+ "Error registering",
)
if code == 201:
sys.stderr.write("Registered!\n")
else:
sys.stderr.write("Already registered!\n")
+ auth = {"kid": acct_headers["Location"]}
# Step 5: Request challenges for domains
sys.stderr.write("Making new order for {0}...\n".format(", ".join(domains)))
@@ -296,17 +320,27 @@ Notes:
for domain in domains:
id["identifiers"].append({"type": "dns", "value": domain})
order, order_code, order_headers = _send_signed_request(
- DIRECTORY["newOrder"], id, "Error creating new order"
+ _directory(ca_url)["newOrder"],
+ id,
+ nonce_url,
+ auth,
+ account_key,
+ "Error creating new order",
)
for authz in order["authorizations"]:
- _do_challenge(authz, thumbprint)
+ _do_challenge(authz, nonce_url, auth, account_key, thumbprint)
# Step 8: Finalize
csr_der = _cmd(
["openssl", "req", "-in", csr, "-outform", "DER"], err_msg="DER Export Error"
)
fnlz_resp, fnlz_code, fnlz_headers = _send_signed_request(
- order["finalize"], {"csr": _b64(csr_der)}, "Error finalizing order"
+ order["finalize"],
+ {"csr": _b64(csr_der)},
+ nonce_url,
+ auth,
+ account_key,
+ "Error finalizing order",
)
# Step 9: Wait for CA to mark test as valid
@@ -314,6 +348,9 @@ Notes:
order = _poll_until_not(
order_headers["Location"],
["pending", "processing"],
+ nonce_url,
+ auth,
+ account_key,
"Error checking order status",
)
@@ -325,7 +362,12 @@ Notes:
# Step 10: Get the certificate.
sys.stderr.write("Getting certificate...\n")
signed_pem, _, _ = _send_signed_request(
- order["certificate"], None, "Error getting certificate"
+ order["certificate"],
+ None,
+ nonce_url,
+ auth,
+ account_key,
+ "Error getting certificate",
)
sys.stderr.write("Received certificate!\n")
@@ -344,38 +386,11 @@ def revoke_crt(ca_url, account_key, crt):
:param string account_key: Path to your Let's Encrypt account private key.
:param string crt: Path to the signed certificate.
"""
- DIRECTORY = json.loads(urlopen(ca_url + "/directory").read().decode("utf8"))
def _a64(a):
"Shortcut function to go from jwt base64 string to bytes"
return base64.urlsafe_b64decode(str(a + ("=" * (len(a) % 4))))
- # helper function - make signed requests
- def _send_signed_request(url, payload, err_msg, depth=0):
- payload64 = "" if payload is None else _b64(json.dumps(payload).encode("utf8"))
- new_nonce = _do_request(DIRECTORY["newNonce"])[2]["Replay-Nonce"]
- protected = {"url": url, "alg": "RS256", "nonce": new_nonce}
- protected.update(
- {"jwk": jwk} if acct_headers is None else {"kid": acct_headers["Location"]}
- )
- protected64 = _b64(json.dumps(protected).encode("utf8"))
- protected_input = "{0}.{1}".format(protected64, payload64).encode("utf8")
- out = _cmd(
- ["openssl", "dgst", "-sha256", "-sign", account_key],
- stdin=subprocess.PIPE,
- cmd_input=protected_input,
- err_msg="OpenSSL Error",
- )
- data = json.dumps(
- {"protected": protected64, "payload": payload64, "signature": _b64(out)}
- )
- try:
- return _do_request(
- url, data=data.encode("utf8"), err_msg=err_msg, depth=depth
- )
- except IndexError: # retry bad nonces (they raise IndexError)
- return _send_signed_request(url, payload, err_msg, depth=(depth + 1))
-
# Step 1: Get account public key
sys.stderr.write("Reading pubkey file...\n")
out = _cmd(
@@ -405,10 +420,18 @@ def revoke_crt(ca_url, account_key, crt):
# Step 2: Get account info.
sys.stderr.write("Getting account info...\n")
reg = {"onlyReturnExistiing": True}
+ nonce_url = _directory(ca_url)["newNonce"]
+ auth = {"jwk": jwk}
acct_headers = None
result, code, acct_headers = _send_signed_request(
- DIRECTORY["newAccount"], reg, "Error getting account info"
+ _directory(ca_url)["newAccount"],
+ reg,
+ nonce_url,
+ auth,
+ account_key,
+ "Error getting account info",
)
+ auth = {"kid": acct_headers["Location"]}
# Step 3: Generate the payload.
crt_der = _cmd(
@@ -419,7 +442,12 @@ def revoke_crt(ca_url, account_key, crt):
"certificate": crt_der64,
}
_send_signed_request(
- DIRECTORY["revokeCert"], rvk_payload, "Error revoking certificate"
+ _directory(ca_url)["revokeCert"],
+ rvk_payload,
+ nonce_url,
+ auth,
+ account_key,
+ "Error revoking certificate",
)
sys.stderr.write("Certificate revoked!\n")