summaryrefslogtreecommitdiffstats
path: root/sign_csr.py
diff options
context:
space:
mode:
authorDaniel Roesler <diafygi@gmail.com>2015-11-14 15:40:23 -0800
committerDaniel Roesler <diafygi@gmail.com>2015-11-14 15:40:23 -0800
commit2c635c0df0f2e174b67cfcfddac425863e92e98a (patch)
treec7eb2dea9fc7b027f9b7957e2aa10d63e1884572 /sign_csr.py
parent63c6caf7cb3e0c271fce6dbbf2e7baef55426151 (diff)
switched to using http-01 challenge since SimpleHTTP is being removed
Diffstat (limited to 'sign_csr.py')
-rw-r--r--sign_csr.py144
1 files changed, 64 insertions, 80 deletions
diff --git a/sign_csr.py b/sign_csr.py
index 28b5257..815c4ea 100644
--- a/sign_csr.py
+++ b/sign_csr.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
-import argparse, subprocess, json, os, urllib2, sys, base64, binascii, ssl, \
- hashlib, tempfile, re, time, copy, textwrap, copy
+import argparse, subprocess, json, os, urllib2, sys, base64, binascii, time, \
+ hashlib, tempfile, re, copy, textwrap
def sign_csr(pubkey, csr, email=None):
"""Use the ACME protocol to get an ssl certificate signed by a
@@ -25,10 +25,6 @@ def sign_csr(pubkey, csr, email=None):
"Shortcut function to go from bytes to jwt base64 string"
return base64.urlsafe_b64encode(b).replace("=", "")
- def _a64(a):
- "Shortcut function to go from jwt base64 string to bytes"
- return base64.urlsafe_b64decode(str(a + ("=" * (len(a) % 4))))
-
# Step 1: Get account public key
sys.stderr.write("Reading pubkey file...\n")
proc = subprocess.Popen(["openssl", "rsa", "-pubin", "-in", pubkey, "-noout", "-text"],
@@ -54,7 +50,9 @@ def sign_csr(pubkey, csr, email=None):
"n": pub_mod64,
},
}
- sys.stderr.write("Found public key!\n".format(header))
+ accountkey_json = json.dumps(header['jwk'], sort_keys=True, separators=(',', ':'))
+ thumbprint = _b64(hashlib.sha256(accountkey_json).digest())
+ sys.stderr.write("Found public key!\n")
# Step 2: Get the domain names to be certified
sys.stderr.write("Reading csr file...\n")
@@ -86,6 +84,7 @@ def sign_csr(pubkey, csr, email=None):
# Step 4: Generate the payloads that need to be signed
# registration
sys.stderr.write("Building request payloads...\n")
+ reg_nonce = urllib2.urlopen(nonce_req).headers['Replay-Nonce']
reg_raw = json.dumps({
"resource": "new-reg",
"contact": ["mailto:{}".format(email)],
@@ -93,7 +92,7 @@ def sign_csr(pubkey, csr, email=None):
}, sort_keys=True, indent=4)
reg_b64 = _b64(reg_raw)
reg_protected = copy.deepcopy(header)
- reg_protected.update({"nonce": urllib2.urlopen(nonce_req).headers['Replay-Nonce']})
+ reg_protected.update({"nonce": reg_nonce})
reg_protected64 = _b64(json.dumps(reg_protected, sort_keys=True, indent=4))
reg_file = tempfile.NamedTemporaryFile(dir=".", prefix="register_", suffix=".json")
reg_file.write("{}.{}".format(reg_protected64, reg_b64))
@@ -102,12 +101,11 @@ def sign_csr(pubkey, csr, email=None):
reg_file_sig = tempfile.NamedTemporaryFile(dir=".", prefix="register_", suffix=".sig")
reg_file_sig_name = os.path.basename(reg_file_sig.name)
- # need signature for each domain identifier and challenge
+ # need signature for each domain identifiers
ids = []
- tests = []
for domain in domains:
-
- # identifier
+ sys.stderr.write("Building request for {}...\n".format(domain))
+ id_nonce = urllib2.urlopen(nonce_req).headers['Replay-Nonce']
id_raw = json.dumps({
"resource": "new-authz",
"identifier": {
@@ -117,7 +115,7 @@ def sign_csr(pubkey, csr, email=None):
}, sort_keys=True)
id_b64 = _b64(id_raw)
id_protected = copy.deepcopy(header)
- id_protected.update({"nonce": urllib2.urlopen(nonce_req).headers['Replay-Nonce']})
+ id_protected.update({"nonce": id_nonce})
id_protected64 = _b64(json.dumps(id_protected, sort_keys=True, indent=4))
id_file = tempfile.NamedTemporaryFile(dir=".", prefix="domain_", suffix=".json")
id_file.write("{}.{}".format(id_protected64, id_b64))
@@ -135,44 +133,20 @@ def sign_csr(pubkey, csr, email=None):
"sig_name": id_file_sig_name,
})
- # challenge request
- test_path = _b64(os.urandom(16))
- test_raw = json.dumps({
- "resource": "challenge",
- "type": "simpleHttp",
- "tls": False,
- }, sort_keys=True, indent=4)
- test_b64 = _b64(test_raw)
- test_protected = copy.deepcopy(header)
- test_protected.update({"nonce": urllib2.urlopen(nonce_req).headers['Replay-Nonce']})
- test_protected64 = _b64(json.dumps(test_protected, sort_keys=True, indent=4))
- test_file = tempfile.NamedTemporaryFile(dir=".", prefix="challenge_", suffix=".json")
- test_file.write("{}.{}".format(test_protected64, test_b64))
- test_file.flush()
- test_file_name = os.path.basename(test_file.name)
- test_file_sig = tempfile.NamedTemporaryFile(dir=".", prefix="challenge_", suffix=".sig")
- test_file_sig_name = os.path.basename(test_file_sig.name)
- tests.append({
- "protected64": test_protected64,
- "data64": test_b64,
- "file": test_file,
- "file_name": test_file_name,
- "sig": test_file_sig,
- "sig_name": test_file_sig_name,
- })
-
# need signature for the final certificate issuance
+ sys.stderr.write("Building request for CSR...\n")
proc = subprocess.Popen(["openssl", "req", "-in", csr, "-outform", "DER"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
csr_der, err = proc.communicate()
csr_der64 = _b64(csr_der)
+ csr_nonce = urllib2.urlopen(nonce_req).headers['Replay-Nonce']
csr_raw = json.dumps({
"resource": "new-cert",
"csr": csr_der64,
}, sort_keys=True, indent=4)
csr_b64 = _b64(csr_raw)
csr_protected = copy.deepcopy(header)
- csr_protected.update({"nonce": urllib2.urlopen(nonce_req).headers['Replay-Nonce']})
+ csr_protected.update({"nonce": csr_nonce})
csr_protected64 = _b64(json.dumps(csr_protected, sort_keys=True, indent=4))
csr_file = tempfile.NamedTemporaryFile(dir=".", prefix="cert_", suffix=".json")
csr_file.write("{}.{}".format(csr_protected64, csr_b64))
@@ -187,13 +161,11 @@ STEP 2: You need to sign some files (replace 'user.key' with your user private k
openssl dgst -sha256 -sign user.key -out {} {}
{}
-{}
openssl dgst -sha256 -sign user.key -out {} {}
""".format(
reg_file_sig_name, reg_file_name,
"\n".join("openssl dgst -sha256 -sign user.key -out {} {}".format(i['sig_name'], i['file_name']) for i in ids),
- "\n".join("openssl dgst -sha256 -sign user.key -out {} {}".format(i['sig_name'], i['file_name']) for i in tests),
csr_file_sig_name, csr_file_name))
stdout = sys.stdout
@@ -207,8 +179,6 @@ openssl dgst -sha256 -sign user.key -out {} {}
for n, i in enumerate(ids):
i['sig'].seek(0)
i['sig64'] = _b64(i['sig'].read())
- tests[n]['sig'].seek(0)
- tests[n]['sig64'] = _b64(tests[n]['sig'].read())
# Step 7: Register the user
sys.stderr.write("Registering {}...\n".format(email))
@@ -218,8 +188,9 @@ openssl dgst -sha256 -sign user.key -out {} {}
"payload": reg_b64,
"signature": reg_sig64,
}, sort_keys=True, indent=4)
+ reg_url = "{}/acme/new-reg".format(CA)
try:
- resp = urllib2.urlopen("{}/acme/new-reg".format(CA), reg_data)
+ resp = urllib2.urlopen(reg_url, reg_data)
result = json.loads(resp.read())
except urllib2.HTTPError as e:
err = e.read()
@@ -228,6 +199,7 @@ openssl dgst -sha256 -sign user.key -out {} {}
sys.stderr.write("Already registered. Skipping...\n")
else:
sys.stderr.write("Error: reg_data:\n")
+ sys.stderr.write("POST {}\n".format(reg_url))
sys.stderr.write(reg_data)
sys.stderr.write("\n")
sys.stderr.write(err)
@@ -236,6 +208,7 @@ openssl dgst -sha256 -sign user.key -out {} {}
# Step 8: Request challenges for each domain
responses = []
+ tests = []
for n, i in enumerate(ids):
sys.stderr.write("Requesting challenges for {}...\n".format(i['domain']))
id_data = json.dumps({
@@ -244,40 +217,52 @@ openssl dgst -sha256 -sign user.key -out {} {}
"payload": i['data64'],
"signature": i['sig64'],
}, sort_keys=True, indent=4)
+ id_url = "{}/acme/new-authz".format(CA)
try:
- resp = urllib2.urlopen("{}/acme/new-authz".format(CA), id_data)
+ resp = urllib2.urlopen(id_url, id_data)
result = json.loads(resp.read())
except urllib2.HTTPError as e:
sys.stderr.write("Error: id_data:\n")
+ sys.stderr.write("POST {}\n".format(id_url))
sys.stderr.write(id_data)
sys.stderr.write("\n")
sys.stderr.write(e.read())
sys.stderr.write("\n")
raise
- challenge = [c for c in result['challenges'] if c['type'] == "simpleHttp"][0]
+ challenge = [c for c in result['challenges'] if c['type'] == "http-01"][0]
+ keyauthorization = "{}.{}".format(challenge['token'], thumbprint)
- # challenge response payload
- response_raw = json.dumps({
- "type": "simpleHttp",
- "token": challenge['token'],
- "tls": False,
+ # challenge request
+ sys.stderr.write("Building challenge responses for {}...\n".format(i['domain']))
+ test_nonce = urllib2.urlopen(nonce_req).headers['Replay-Nonce']
+ test_raw = json.dumps({
+ "resource": "challenge",
+ "keyAuthorization": keyauthorization,
}, sort_keys=True, indent=4)
- response_b64 = _b64(response_raw)
- response_protected64 = _b64(json.dumps({"alg": "RS256"}))
- response_file = tempfile.NamedTemporaryFile(dir=".", prefix="response_", suffix=".json")
- response_file.write("{}.{}".format(response_protected64, response_b64))
- response_file.flush()
- response_file_name = os.path.basename(response_file.name)
- response_file_sig = tempfile.NamedTemporaryFile(dir=".", prefix="response_", suffix=".sig")
- response_file_sig_name = os.path.basename(response_file_sig.name)
- responses.append({
+ test_b64 = _b64(test_raw)
+ test_protected = copy.deepcopy(header)
+ test_protected.update({"nonce": test_nonce})
+ test_protected64 = _b64(json.dumps(test_protected, sort_keys=True, indent=4))
+ test_file = tempfile.NamedTemporaryFile(dir=".", prefix="challenge_", suffix=".json")
+ test_file.write("{}.{}".format(test_protected64, test_b64))
+ test_file.flush()
+ test_file_name = os.path.basename(test_file.name)
+ test_file_sig = tempfile.NamedTemporaryFile(dir=".", prefix="challenge_", suffix=".sig")
+ test_file_sig_name = os.path.basename(test_file_sig.name)
+ tests.append({
"uri": challenge['uri'],
- "protected64": response_protected64,
- "data64": response_b64,
- "file": response_file,
- "file_name": response_file_name,
- "sig": response_file_sig,
- "sig_name": response_file_sig_name,
+ "protected64": test_protected64,
+ "data64": test_b64,
+ "file": test_file,
+ "file_name": test_file_name,
+ "sig": test_file_sig,
+ "sig_name": test_file_sig_name,
+ })
+
+ # challenge response for server
+ responses.append({
+ "uri": ".well-known/acme-challenge/{}".format(challenge['token']),
+ "data": keyauthorization,
})
# Step 9: Ask the user to sign the challenge responses
@@ -288,7 +273,7 @@ STEP 3: You need to sign some more files (replace 'user.key' with your user priv
""".format(
"\n".join("openssl dgst -sha256 -sign user.key -out {} {}".format(
- i['sig_name'], i['file_name']) for i in responses)))
+ i['sig_name'], i['file_name']) for i in tests)))
stdout = sys.stdout
sys.stdout = sys.stderr
@@ -297,17 +282,11 @@ STEP 3: You need to sign some more files (replace 'user.key' with your user priv
# Step 10: Load the response signatures
for n, i in enumerate(ids):
- responses[n]['sig'].seek(0)
- responses[n]['sig64'] = _b64(responses[n]['sig'].read())
+ tests[n]['sig'].seek(0)
+ tests[n]['sig64'] = _b64(tests[n]['sig'].read())
# Step 11: Ask the user to host the token on their server
for n, i in enumerate(ids):
- response_payload = json.dumps({
- "header": {"alg": "RS256"},
- "protected": responses[n]['protected64'],
- "payload": responses[n]['data64'],
- "signature": responses[n]['sig64'],
- }).replace('"', '\\"')
sys.stderr.write("""\
STEP {}: You need to run this command on {} (don't stop the python command until the next step).
@@ -317,7 +296,7 @@ sudo python -c "import BaseHTTPServer; \\
s = BaseHTTPServer.HTTPServer(('0.0.0.0', 80), h); \\
s.serve_forever()"
-""".format(n+4, i['domain'], response_payload))
+""".format(n+4, i['domain'], responses[n]['data']))
stdout = sys.stdout
sys.stdout = sys.stderr
@@ -332,11 +311,13 @@ sudo python -c "import BaseHTTPServer; \\
"payload": tests[n]['data64'],
"signature": tests[n]['sig64'],
}, sort_keys=True, indent=4)
+ test_url = tests[n]['uri']
try:
- resp = urllib2.urlopen(responses[n]['uri'], test_data)
+ resp = urllib2.urlopen(test_url, test_data)
test_result = json.loads(resp.read())
except urllib2.HTTPError as e:
sys.stderr.write("Error: test_data:\n")
+ sys.stderr.write("POST {}\n".format(test_url))
sys.stderr.write(test_data)
sys.stderr.write("\n")
sys.stderr.write(e.read())
@@ -347,10 +328,11 @@ sudo python -c "import BaseHTTPServer; \\
sys.stderr.write("Waiting for {} challenge to pass...\n".format(i['domain']))
while True:
try:
- resp = urllib2.urlopen(responses[n]['uri'])
+ resp = urllib2.urlopen(test_url)
challenge_status = json.loads(resp.read())
except urllib2.HTTPError as e:
sys.stderr.write("Error: test_data:\n")
+ sys.stderr.write("GET {}\n".format(test_url))
sys.stderr.write(test_data)
sys.stderr.write("\n")
sys.stderr.write(e.read())
@@ -374,11 +356,13 @@ sudo python -c "import BaseHTTPServer; \\
"payload": csr_b64,
"signature": csr_sig64,
}, sort_keys=True, indent=4)
+ csr_url = "{}/acme/new-cert".format(CA)
try:
- resp = urllib2.urlopen("{}/acme/new-cert".format(CA), csr_data)
+ resp = urllib2.urlopen(csr_url, csr_data)
signed_der = resp.read()
except urllib2.HTTPError as e:
sys.stderr.write("Error: csr_data:\n")
+ sys.stderr.write("POST {}\n".format(csr_url))
sys.stderr.write(csr_data)
sys.stderr.write("\n")
sys.stderr.write(e.read())