summaryrefslogtreecommitdiffstats
path: root/acmens.py
diff options
context:
space:
mode:
authorsiddharth <s@ricketyspace.net>2021-05-05 20:19:18 -0400
committersiddharth <s@ricketyspace.net>2021-05-05 20:19:18 -0400
commitf613c29c4ed0a0448450166c9c6fd36209c394c8 (patch)
treebddb5d2c97ff266e60496b5c822ccef8d595396a /acmens.py
parentb6cf3a6839e3480131d2c78d1a672e6839722ade (diff)
acmens.py: move `_do_request` out of `sign_csr` and `revoke_crt`
Diffstat (limited to 'acmens.py')
-rw-r--r--acmens.py117
1 files changed, 39 insertions, 78 deletions
diff --git a/acmens.py b/acmens.py
index cf7424a..b88da0e 100644
--- a/acmens.py
+++ b/acmens.py
@@ -36,6 +36,45 @@ def _cmd(cmd_list, stdin=None, cmd_input=None, err_msg="Command Line Error"):
return out
+def _do_request(url, data=None, err_msg="Error", depth=0):
+ try:
+ resp = urllib.request.urlopen(
+ urllib.request.Request(
+ url,
+ data=data,
+ headers={
+ "Content-Type": "application/jose+json",
+ "User-Agent": "acmens",
+ },
+ )
+ )
+ resp_data, code, headers = (
+ resp.read().decode("utf8"),
+ resp.getcode(),
+ resp.headers,
+ )
+ except IOError as e:
+ resp_data = e.read().decode("utf8") if hasattr(e, "read") else str(e)
+ code, headers = getattr(e, "code", None), {}
+ try:
+ resp_data = json.loads(resp_data) # try to parse json results
+ except ValueError:
+ pass # ignore json parsing errors
+ if (
+ depth < 100
+ and code == 400
+ and resp_data["type"] == "urn:ietf:params:acme:error:badNonce"
+ ):
+ raise IndexError(resp_data) # allow 100 retrys for bad nonces
+ if code not in [200, 201, 204]:
+ raise ValueError(
+ "{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format(
+ err_msg, url, data, code, resp_data
+ )
+ )
+ return resp_data, code, headers
+
+
def sign_csr(ca_url, account_key, csr, email=None, challenge_type="http"):
"""Use the ACME protocol to get an ssl certificate signed by a
certificate authority.
@@ -54,45 +93,6 @@ def sign_csr(ca_url, account_key, csr, email=None, challenge_type="http"):
"""
DIRECTORY = json.loads(urlopen(ca_url + "/directory").read().decode("utf8"))
- # helper function - make request and automatically parse json response
- def _do_request(url, data=None, err_msg="Error", depth=0):
- try:
- resp = urllib.request.urlopen(
- urllib.request.Request(
- url,
- data=data,
- headers={
- "Content-Type": "application/jose+json",
- "User-Agent": "acmens",
- },
- )
- )
- resp_data, code, headers = (
- resp.read().decode("utf8"),
- resp.getcode(),
- resp.headers,
- )
- except IOError as e:
- resp_data = e.read().decode("utf8") if hasattr(e, "read") else str(e)
- code, headers = getattr(e, "code", None), {}
- try:
- resp_data = json.loads(resp_data) # try to parse json results
- except ValueError:
- pass # ignore json parsing errors
- if (
- depth < 100
- and code == 400
- and resp_data["type"] == "urn:ietf:params:acme:error:badNonce"
- ):
- raise IndexError(resp_data) # allow 100 retrys for bad nonces
- if code not in [200, 201, 204]:
- raise ValueError(
- "{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format(
- err_msg, url, data, code, resp_data
- )
- )
- return resp_data, code, headers
-
# helper function - make signed requests
def _send_signed_request(url, payload, err_msg, depth=0):
payload64 = "" if payload is None else _b64(json.dumps(payload).encode("utf8"))
@@ -350,45 +350,6 @@ def revoke_crt(ca_url, account_key, crt):
"Shortcut function to go from jwt base64 string to bytes"
return base64.urlsafe_b64decode(str(a + ("=" * (len(a) % 4))))
- # helper function - make request and automatically parse json response
- def _do_request(url, data=None, err_msg="Error", depth=0):
- try:
- resp = urllib.request.urlopen(
- urllib.request.Request(
- url,
- data=data,
- headers={
- "Content-Type": "application/jose+json",
- "User-Agent": "acmens",
- },
- )
- )
- resp_data, code, headers = (
- resp.read().decode("utf8"),
- resp.getcode(),
- resp.headers,
- )
- except IOError as e:
- resp_data = e.read().decode("utf8") if hasattr(e, "read") else str(e)
- code, headers = getattr(e, "code", None), {}
- try:
- resp_data = json.loads(resp_data) # try to parse json results
- except ValueError:
- pass # ignore json parsing errors
- if (
- depth < 100
- and code == 400
- and resp_data["type"] == "urn:ietf:params:acme:error:badNonce"
- ):
- raise IndexError(resp_data) # allow 100 retrys for bad nonces
- if code not in [200, 201, 204]:
- raise ValueError(
- "{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format(
- err_msg, url, data, code, resp_data
- )
- )
- return resp_data, code, headers
-
# helper function - make signed requests
def _send_signed_request(url, payload, err_msg, depth=0):
payload64 = "" if payload is None else _b64(json.dumps(payload).encode("utf8"))