diff options
author | rsiddharth <s@ricketyspace.net> | 2020-04-12 12:32:10 -0400 |
---|---|---|
committer | rsiddharth <s@ricketyspace.net> | 2020-04-12 12:32:10 -0400 |
commit | b30e94d971dfade1c3b5ffa231ab2a2b472700db (patch) | |
tree | 8b367d60c8341cce79b03b86a01bdd8ddb6294d6 /acmens.py | |
parent | 2127478b55a8b1cbd06829ebdd398dd999cc41b5 (diff) |
sign_csr.py -> acmens.py
Diffstat (limited to 'acmens.py')
-rw-r--r-- | acmens.py | 255 |
1 files changed, 255 insertions, 0 deletions
diff --git a/acmens.py b/acmens.py new file mode 100644 index 0000000..c2ea739 --- /dev/null +++ b/acmens.py @@ -0,0 +1,255 @@ +#!/usr/bin/env python3 +import argparse, subprocess, json, os, urllib.request, sys, base64, binascii, \ + time, hashlib, tempfile, re, copy, textwrap + +from urllib.request import urlopen +from urllib.error import HTTPError + +def sign_csr(account_key, csr, email=None): + """Use the ACME protocol to get an ssl certificate signed by a + certificate authority. + + :param string account_key: Path to the user account key. + :param string csr: Path to the certificate signing request. + :param string email: An optional user account contact email + (defaults to webmaster@<shortest_domain>) + + :returns: Signed Certificate (PEM format) + :rtype: string + + """ + #CA = "https://acme-staging-v02.api.letsencrypt.org" + CA = "https://acme-v02.api.letsencrypt.org" + DIRECTORY = json.loads(urlopen(CA + "/directory").read().decode('utf8')) + + def _b64(b): + "Shortcut function to go from bytes to jwt base64 string" + if type(b) is str: + b = b.encode() + + return base64.urlsafe_b64encode(b).decode().replace("=", "") + + # helper function - run external commands + def _cmd(cmd_list, stdin=None, cmd_input=None, err_msg="Command Line Error"): + proc = subprocess.Popen(cmd_list, stdin=stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + out, err = proc.communicate(cmd_input) + if proc.returncode != 0: + raise IOError("{0}\n{1}".format(err_msg, err)) + return out + + # helper function - make request and automatically parse json response + def _do_request(url, data=None, err_msg="Error", depth=0): + try: + resp = urllib.request.urlopen(urllib.request.Request(url, data=data, headers={"Content-Type": "application/jose+json", "User-Agent": "acme-nosudo"})) + resp_data, code, headers = resp.read().decode("utf8"), resp.getcode(), resp.headers + except IOError as e: + resp_data = e.read().decode("utf8") if hasattr(e, "read") else str(e) + code, headers = getattr(e, "code", None), {} + try: + resp_data = json.loads(resp_data) # try to parse json results + except ValueError: + pass # ignore json parsing errors + if depth < 100 and code == 400 and resp_data['type'] == "urn:ietf:params:acme:error:badNonce": + raise IndexError(resp_data) # allow 100 retrys for bad nonces + if code not in [200, 201, 204]: + raise ValueError("{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format(err_msg, url, data, code, resp_data)) + return resp_data, code, headers + + # helper function - make signed requests + def _send_signed_request(url, payload, err_msg, depth=0): + payload64 = "" if payload is None else _b64(json.dumps(payload).encode('utf8')) + new_nonce = _do_request(DIRECTORY['newNonce'])[2]['Replay-Nonce'] + protected = {"url": url, "alg": "RS256", "nonce": new_nonce} + protected.update({"jwk": jwk} if acct_headers is None else {"kid": acct_headers['Location']}) + protected64 = _b64(json.dumps(protected).encode('utf8')) + protected_input = "{0}.{1}".format(protected64, payload64).encode('utf8') + out = _cmd(["openssl", "dgst", "-sha256", "-sign", account_key], stdin=subprocess.PIPE, cmd_input=protected_input, err_msg="OpenSSL Error") + data = json.dumps({"protected": protected64, "payload": payload64, "signature": _b64(out)}) + try: + return _do_request(url, data=data.encode('utf8'), err_msg=err_msg, depth=depth) + except IndexError: # retry bad nonces (they raise IndexError) + return _send_signed_request(url, payload, err_msg, depth=(depth + 1)) + + # helper function - poll until complete + def _poll_until_not(url, pending_statuses, err_msg): + result, t0 = None, time.time() + while result is None or result['status'] in pending_statuses: + assert (time.time() - t0 < 3600), "Polling timeout" # 1 hour timeout + time.sleep(0 if result is None else 2) + result, _, _ = _send_signed_request(url, None, err_msg) + return result + + + # Step 1: Get account public key + sys.stderr.write("Reading pubkey file...\n") + out = _cmd(["openssl", "rsa", "-in", account_key, "-noout", "-text"], err_msg="Error reading account public key") + pub_hex, pub_exp = re.search( + r"modulus:[\s]+?00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)", + out.decode('utf8'), re.MULTILINE|re.DOTALL).groups() + pub_mod = binascii.unhexlify(re.sub("(\s|:)", "", pub_hex)) + pub_mod64 = _b64(pub_mod) + pub_exp = int(pub_exp) + pub_exp = "{0:x}".format(pub_exp) + pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp + pub_exp = binascii.unhexlify(pub_exp) + pub_exp64 = _b64(pub_exp) + jwk = { + "e": pub_exp64, + "kty": "RSA", + "n": pub_mod64, + } + accountkey_json = json.dumps(jwk, sort_keys=True, separators=(',', ':')) + thumbprint = _b64(hashlib.sha256(accountkey_json.encode()).digest()) + sys.stderr.write("Found public key!\n") + + # Step 2: Get the domain names to be certified + sys.stderr.write("Reading csr file...\n") + out = _cmd(["openssl", "req", "-in", csr, "-noout", "-text"], err_msg="Error loading {}".format(csr)) + domains = set([]) + common_name = re.search("Subject:.*? CN *= *([^\s,;/]+)", out.decode('utf8')) + if common_name is not None: + domains.add(common_name.group(1)) + subject_alt_names = re.search("X509v3 Subject Alternative Name: \n +([^\n]+)\n", out.decode('utf8'), re.MULTILINE|re.DOTALL) + if subject_alt_names is not None: + for san in subject_alt_names.group(1).split(", "): + if san.startswith("DNS:"): + domains.add(san[4:]) + sys.stderr.write("Found domains {0}\n".format(", ".join(domains))) + + # Step 3: Ask user for contact email + if not email: + default_email = "webmaster@{0}".format(min(domains, key=len)) + stdout = sys.stdout + sys.stdout = sys.stderr + input_email = input("STEP 1: What is your contact email? ({0}) ".format(default_email)) + email = input_email if input_email else default_email + sys.stdout = stdout + + + # Step 4: Generate the payload for registering user and initiate registration. + sys.stderr.write("Registering {0}...\n".format(email)) + reg = { + "termsOfServiceAgreed": True + } + acct_headers = None + result, code, acct_headers = _send_signed_request(DIRECTORY['newAccount'], reg, "Error registering") + if code == 201: + sys.stderr.write("Registered!\n") + else: + sys.stderr.write("Already registered!\n") + + + # Step 5: Request challenges for each domain + for domain in domains: + sys.stderr.write("Making new order for {0}...\n".format(domain)) + id = { + "identifiers": [{ + "type": "dns", + "value": domain, + }], + } + order, order_code, order_headers = _send_signed_request(DIRECTORY['newOrder'], id, "Error creating new order") + + # Request challenges + sys.stderr.write("Requesting challenges...\n") + chl_result, chl_code, chl_headers = _send_signed_request(order['authorizations'][0], None, "Error getting challenges") + + challenge = [c for c in chl_result['challenges'] if c['type'] == "http-01"][0] + token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge['token']) + keyauthorization = "{0}.{1}".format(challenge['token'], thumbprint) + + # build request for the server to test this challenge. + test_url = challenge['url'] + test_raw = "{}" + + # challenge response for server + response = { + "uri": ".well-known/acme-challenge/{0}".format(challenge['token']), + "data": keyauthorization, + } + + # Step 6: Ask the user to host the token on their server + sys.stderr.write("""\ +Please update your server to serve the following file at this URL: + +-------------- +URL: http://{0}/{1} +File contents: \"{2}\" +-------------- + +Notes: +- Do not include the quotes in the file. +- The file should be one line without any spaces. + +""".format(domain, response['uri'], response['data'])) + + stdout = sys.stdout + sys.stdout = sys.stderr + input("Press Enter when you've got the file hosted on your server...") + sys.stdout = stdout + + # Step 7: Let the CA know you're ready for the challenge + sys.stderr.write("Requesting verification for {0}...\n".format(domain)) + _send_signed_request(challenge['url'], {}, "Error requesting challenge verfication: {0}".format(domain)) + chl_verification = _poll_until_not(challenge['url'], ["pending"], "Error checking challenge verification") + if chl_verification['status'] != "valid": + raise ValueError("Challenge did not pass for {0}: {1}".format(domain, chl_verification)) + sys.stderr.write("{} verified!\n".format(domain)) + + # Step 8: Finalize + csr_der = _cmd(["openssl", "req", "-in", csr, "-outform", "DER"], err_msg="DER Export Error") + fnlz_resp, fnlz_code, fnlz_headers = _send_signed_request(order['finalize'], {"csr": _b64(csr_der)}, "Error finalizing order") + + # Step 9: Wait for CA to mark test as valid + sys.stderr.write("Waiting for {0} challenge to pass...\n".format(domain)) + order = _poll_until_not(order_headers['Location'], ["pending", "processing"], "Error checking order status") + + if order['status'] == "valid": + sys.stderr.write("Passed {0} challenge!\n".format(domain)) + else: + raise ValueError("'{0}' challenge did not pass: {1}".format(domain, order)) + + + # Step 10: Get the certificate. + sys.stderr.write("Getting certificate...\n") + signed_pem, _, _ = _send_signed_request(order['certificate'], None, "Error getting certificate") + + sys.stderr.write("Received certificate!\n") + sys.stderr.write("You can remove the acme-challenge file from your webserver now.\n") + + return signed_pem + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + formatter_class=argparse.RawDescriptionHelpFormatter, + description="""\ +Get a SSL certificate signed by a Let's Encrypt (ACME) certificate +authority and output that signed certificate. You do NOT need to run +this script on your server, it is meant to be run on your +computer. The script will request you to manually deploy the acme +challenge on your server. + +NOTE: YOUR ACCOUNT KEY NEEDS TO BE DIFFERENT FROM YOUR DOMAIN KEY. + +Prerequisites: +* openssl +* python version 3 + +Example: Generate an account keypair, a domain key and csr, and have the domain csr signed. +-------------- +$ openssl genrsa -aes256 4096 > user.key +$ openssl rsa -in user.key -pubout > user.pub +$ openssl genrsa -aes256 4096 > domain.key +$ openssl req -new -sha256 -key domain.key -subj "/CN=example.com" > domain.csr +$ python3 acmens.py --account-key user.key --email user@example.com domain.csr > signed.crt +-------------- + +""") + parser.add_argument("-k", "--account-key", required=True, help="path to your Let's Encrypt account private key") + parser.add_argument("-e", "--email", default=None, help="contact email, default is webmaster@<shortest_domain>") + parser.add_argument("csr_path", help="path to your certificate signing request") + + args = parser.parse_args() + signed_crt = sign_csr(args.account_key, args.csr_path, email=args.email) + sys.stdout.write(signed_crt) + |