diff options
| author | Daniel Roesler <diafygi@gmail.com> | 2015-11-14 15:40:23 -0800 | 
|---|---|---|
| committer | Daniel Roesler <diafygi@gmail.com> | 2015-11-14 15:40:23 -0800 | 
| commit | 2c635c0df0f2e174b67cfcfddac425863e92e98a (patch) | |
| tree | c7eb2dea9fc7b027f9b7957e2aa10d63e1884572 | |
| parent | 63c6caf7cb3e0c271fce6dbbf2e7baef55426151 (diff) | |
switched to using http-01 challenge since SimpleHTTP is being removed
| -rw-r--r-- | sign_csr.py | 144 | 
1 files changed, 64 insertions, 80 deletions
| diff --git a/sign_csr.py b/sign_csr.py index 28b5257..815c4ea 100644 --- a/sign_csr.py +++ b/sign_csr.py @@ -1,6 +1,6 @@  #!/usr/bin/env python -import argparse, subprocess, json, os, urllib2, sys, base64, binascii, ssl, \ -    hashlib, tempfile, re, time, copy, textwrap, copy +import argparse, subprocess, json, os, urllib2, sys, base64, binascii, time, \ +    hashlib, tempfile, re, copy, textwrap  def sign_csr(pubkey, csr, email=None):      """Use the ACME protocol to get an ssl certificate signed by a @@ -25,10 +25,6 @@ def sign_csr(pubkey, csr, email=None):          "Shortcut function to go from bytes to jwt base64 string"          return base64.urlsafe_b64encode(b).replace("=", "") -    def _a64(a): -        "Shortcut function to go from jwt base64 string to bytes" -        return base64.urlsafe_b64decode(str(a + ("=" * (len(a) % 4)))) -      # Step 1: Get account public key      sys.stderr.write("Reading pubkey file...\n")      proc = subprocess.Popen(["openssl", "rsa", "-pubin", "-in", pubkey, "-noout", "-text"], @@ -54,7 +50,9 @@ def sign_csr(pubkey, csr, email=None):              "n": pub_mod64,          },      } -    sys.stderr.write("Found public key!\n".format(header)) +    accountkey_json = json.dumps(header['jwk'], sort_keys=True, separators=(',', ':')) +    thumbprint = _b64(hashlib.sha256(accountkey_json).digest()) +    sys.stderr.write("Found public key!\n")      # Step 2: Get the domain names to be certified      sys.stderr.write("Reading csr file...\n") @@ -86,6 +84,7 @@ def sign_csr(pubkey, csr, email=None):      # Step 4: Generate the payloads that need to be signed      # registration      sys.stderr.write("Building request payloads...\n") +    reg_nonce = urllib2.urlopen(nonce_req).headers['Replay-Nonce']      reg_raw = json.dumps({          "resource": "new-reg",          "contact": ["mailto:{}".format(email)], @@ -93,7 +92,7 @@ def sign_csr(pubkey, csr, email=None):      }, sort_keys=True, indent=4)      reg_b64 = _b64(reg_raw)      reg_protected = copy.deepcopy(header) -    reg_protected.update({"nonce": urllib2.urlopen(nonce_req).headers['Replay-Nonce']}) +    reg_protected.update({"nonce": reg_nonce})      reg_protected64 = _b64(json.dumps(reg_protected, sort_keys=True, indent=4))      reg_file = tempfile.NamedTemporaryFile(dir=".", prefix="register_", suffix=".json")      reg_file.write("{}.{}".format(reg_protected64, reg_b64)) @@ -102,12 +101,11 @@ def sign_csr(pubkey, csr, email=None):      reg_file_sig = tempfile.NamedTemporaryFile(dir=".", prefix="register_", suffix=".sig")      reg_file_sig_name = os.path.basename(reg_file_sig.name) -    # need signature for each domain identifier and challenge +    # need signature for each domain identifiers      ids = [] -    tests = []      for domain in domains: - -        # identifier +        sys.stderr.write("Building request for {}...\n".format(domain)) +        id_nonce = urllib2.urlopen(nonce_req).headers['Replay-Nonce']          id_raw = json.dumps({              "resource": "new-authz",              "identifier": { @@ -117,7 +115,7 @@ def sign_csr(pubkey, csr, email=None):          }, sort_keys=True)          id_b64 = _b64(id_raw)          id_protected = copy.deepcopy(header) -        id_protected.update({"nonce": urllib2.urlopen(nonce_req).headers['Replay-Nonce']}) +        id_protected.update({"nonce": id_nonce})          id_protected64 = _b64(json.dumps(id_protected, sort_keys=True, indent=4))          id_file = tempfile.NamedTemporaryFile(dir=".", prefix="domain_", suffix=".json")          id_file.write("{}.{}".format(id_protected64, id_b64)) @@ -135,44 +133,20 @@ def sign_csr(pubkey, csr, email=None):              "sig_name": id_file_sig_name,          }) -        # challenge request -        test_path = _b64(os.urandom(16)) -        test_raw = json.dumps({ -            "resource": "challenge", -            "type": "simpleHttp", -            "tls": False, -        }, sort_keys=True, indent=4) -        test_b64 = _b64(test_raw) -        test_protected = copy.deepcopy(header) -        test_protected.update({"nonce": urllib2.urlopen(nonce_req).headers['Replay-Nonce']}) -        test_protected64 = _b64(json.dumps(test_protected, sort_keys=True, indent=4)) -        test_file = tempfile.NamedTemporaryFile(dir=".", prefix="challenge_", suffix=".json") -        test_file.write("{}.{}".format(test_protected64, test_b64)) -        test_file.flush() -        test_file_name = os.path.basename(test_file.name) -        test_file_sig = tempfile.NamedTemporaryFile(dir=".", prefix="challenge_", suffix=".sig") -        test_file_sig_name = os.path.basename(test_file_sig.name) -        tests.append({ -            "protected64": test_protected64, -            "data64": test_b64, -            "file": test_file, -            "file_name": test_file_name, -            "sig": test_file_sig, -            "sig_name": test_file_sig_name, -        }) -      # need signature for the final certificate issuance +    sys.stderr.write("Building request for CSR...\n")      proc = subprocess.Popen(["openssl", "req", "-in", csr, "-outform", "DER"],          stdout=subprocess.PIPE, stderr=subprocess.PIPE)      csr_der, err = proc.communicate()      csr_der64 = _b64(csr_der) +    csr_nonce = urllib2.urlopen(nonce_req).headers['Replay-Nonce']      csr_raw = json.dumps({          "resource": "new-cert",          "csr": csr_der64,      }, sort_keys=True, indent=4)      csr_b64 = _b64(csr_raw)      csr_protected = copy.deepcopy(header) -    csr_protected.update({"nonce": urllib2.urlopen(nonce_req).headers['Replay-Nonce']}) +    csr_protected.update({"nonce": csr_nonce})      csr_protected64 = _b64(json.dumps(csr_protected, sort_keys=True, indent=4))      csr_file = tempfile.NamedTemporaryFile(dir=".", prefix="cert_", suffix=".json")      csr_file.write("{}.{}".format(csr_protected64, csr_b64)) @@ -187,13 +161,11 @@ STEP 2: You need to sign some files (replace 'user.key' with your user private k  openssl dgst -sha256 -sign user.key -out {} {}  {} -{}  openssl dgst -sha256 -sign user.key -out {} {}  """.format(      reg_file_sig_name, reg_file_name,      "\n".join("openssl dgst -sha256 -sign user.key -out {} {}".format(i['sig_name'], i['file_name']) for i in ids), -    "\n".join("openssl dgst -sha256 -sign user.key -out {} {}".format(i['sig_name'], i['file_name']) for i in tests),      csr_file_sig_name, csr_file_name))      stdout = sys.stdout @@ -207,8 +179,6 @@ openssl dgst -sha256 -sign user.key -out {} {}      for n, i in enumerate(ids):          i['sig'].seek(0)          i['sig64'] = _b64(i['sig'].read()) -        tests[n]['sig'].seek(0) -        tests[n]['sig64'] = _b64(tests[n]['sig'].read())      # Step 7: Register the user      sys.stderr.write("Registering {}...\n".format(email)) @@ -218,8 +188,9 @@ openssl dgst -sha256 -sign user.key -out {} {}          "payload": reg_b64,          "signature": reg_sig64,      }, sort_keys=True, indent=4) +    reg_url = "{}/acme/new-reg".format(CA)      try: -        resp = urllib2.urlopen("{}/acme/new-reg".format(CA), reg_data) +        resp = urllib2.urlopen(reg_url, reg_data)          result = json.loads(resp.read())      except urllib2.HTTPError as e:          err = e.read() @@ -228,6 +199,7 @@ openssl dgst -sha256 -sign user.key -out {} {}              sys.stderr.write("Already registered. Skipping...\n")          else:              sys.stderr.write("Error: reg_data:\n") +            sys.stderr.write("POST {}\n".format(reg_url))              sys.stderr.write(reg_data)              sys.stderr.write("\n")              sys.stderr.write(err) @@ -236,6 +208,7 @@ openssl dgst -sha256 -sign user.key -out {} {}      # Step 8: Request challenges for each domain      responses = [] +    tests = []      for n, i in enumerate(ids):          sys.stderr.write("Requesting challenges for {}...\n".format(i['domain']))          id_data = json.dumps({ @@ -244,40 +217,52 @@ openssl dgst -sha256 -sign user.key -out {} {}              "payload": i['data64'],              "signature": i['sig64'],          }, sort_keys=True, indent=4) +        id_url = "{}/acme/new-authz".format(CA)          try: -            resp = urllib2.urlopen("{}/acme/new-authz".format(CA), id_data) +            resp = urllib2.urlopen(id_url, id_data)              result = json.loads(resp.read())          except urllib2.HTTPError as e:              sys.stderr.write("Error: id_data:\n") +            sys.stderr.write("POST {}\n".format(id_url))              sys.stderr.write(id_data)              sys.stderr.write("\n")              sys.stderr.write(e.read())              sys.stderr.write("\n")              raise -        challenge = [c for c in result['challenges'] if c['type'] == "simpleHttp"][0] +        challenge = [c for c in result['challenges'] if c['type'] == "http-01"][0] +        keyauthorization = "{}.{}".format(challenge['token'], thumbprint) -        # challenge response payload -        response_raw = json.dumps({ -            "type": "simpleHttp", -            "token": challenge['token'], -            "tls": False, +        # challenge request +        sys.stderr.write("Building challenge responses for {}...\n".format(i['domain'])) +        test_nonce = urllib2.urlopen(nonce_req).headers['Replay-Nonce'] +        test_raw = json.dumps({ +            "resource": "challenge", +            "keyAuthorization": keyauthorization,          }, sort_keys=True, indent=4) -        response_b64 = _b64(response_raw) -        response_protected64 = _b64(json.dumps({"alg": "RS256"})) -        response_file = tempfile.NamedTemporaryFile(dir=".", prefix="response_", suffix=".json") -        response_file.write("{}.{}".format(response_protected64, response_b64)) -        response_file.flush() -        response_file_name = os.path.basename(response_file.name) -        response_file_sig = tempfile.NamedTemporaryFile(dir=".", prefix="response_", suffix=".sig") -        response_file_sig_name = os.path.basename(response_file_sig.name) -        responses.append({ +        test_b64 = _b64(test_raw) +        test_protected = copy.deepcopy(header) +        test_protected.update({"nonce": test_nonce}) +        test_protected64 = _b64(json.dumps(test_protected, sort_keys=True, indent=4)) +        test_file = tempfile.NamedTemporaryFile(dir=".", prefix="challenge_", suffix=".json") +        test_file.write("{}.{}".format(test_protected64, test_b64)) +        test_file.flush() +        test_file_name = os.path.basename(test_file.name) +        test_file_sig = tempfile.NamedTemporaryFile(dir=".", prefix="challenge_", suffix=".sig") +        test_file_sig_name = os.path.basename(test_file_sig.name) +        tests.append({              "uri": challenge['uri'], -            "protected64": response_protected64, -            "data64": response_b64, -            "file": response_file, -            "file_name": response_file_name, -            "sig": response_file_sig, -            "sig_name": response_file_sig_name, +            "protected64": test_protected64, +            "data64": test_b64, +            "file": test_file, +            "file_name": test_file_name, +            "sig": test_file_sig, +            "sig_name": test_file_sig_name, +        }) + +        # challenge response for server +        responses.append({ +            "uri": ".well-known/acme-challenge/{}".format(challenge['token']), +            "data": keyauthorization,          })      # Step 9: Ask the user to sign the challenge responses @@ -288,7 +273,7 @@ STEP 3: You need to sign some more files (replace 'user.key' with your user priv  """.format(      "\n".join("openssl dgst -sha256 -sign user.key -out {} {}".format( -        i['sig_name'], i['file_name']) for i in responses))) +        i['sig_name'], i['file_name']) for i in tests)))      stdout = sys.stdout      sys.stdout = sys.stderr @@ -297,17 +282,11 @@ STEP 3: You need to sign some more files (replace 'user.key' with your user priv      # Step 10: Load the response signatures      for n, i in enumerate(ids): -        responses[n]['sig'].seek(0) -        responses[n]['sig64'] = _b64(responses[n]['sig'].read()) +        tests[n]['sig'].seek(0) +        tests[n]['sig64'] = _b64(tests[n]['sig'].read())      # Step 11: Ask the user to host the token on their server      for n, i in enumerate(ids): -        response_payload = json.dumps({ -            "header": {"alg": "RS256"}, -            "protected": responses[n]['protected64'], -            "payload": responses[n]['data64'], -            "signature": responses[n]['sig64'], -        }).replace('"', '\\"')          sys.stderr.write("""\  STEP {}: You need to run this command on {} (don't stop the python command until the next step). @@ -317,7 +296,7 @@ sudo python -c "import BaseHTTPServer; \\      s = BaseHTTPServer.HTTPServer(('0.0.0.0', 80), h); \\      s.serve_forever()" -""".format(n+4, i['domain'], response_payload)) +""".format(n+4, i['domain'], responses[n]['data']))          stdout = sys.stdout          sys.stdout = sys.stderr @@ -332,11 +311,13 @@ sudo python -c "import BaseHTTPServer; \\              "payload": tests[n]['data64'],              "signature": tests[n]['sig64'],          }, sort_keys=True, indent=4) +        test_url = tests[n]['uri']          try: -            resp = urllib2.urlopen(responses[n]['uri'], test_data) +            resp = urllib2.urlopen(test_url, test_data)              test_result = json.loads(resp.read())          except urllib2.HTTPError as e:              sys.stderr.write("Error: test_data:\n") +            sys.stderr.write("POST {}\n".format(test_url))              sys.stderr.write(test_data)              sys.stderr.write("\n")              sys.stderr.write(e.read()) @@ -347,10 +328,11 @@ sudo python -c "import BaseHTTPServer; \\          sys.stderr.write("Waiting for {} challenge to pass...\n".format(i['domain']))          while True:              try: -                resp = urllib2.urlopen(responses[n]['uri']) +                resp = urllib2.urlopen(test_url)                  challenge_status = json.loads(resp.read())              except urllib2.HTTPError as e:                  sys.stderr.write("Error: test_data:\n") +                sys.stderr.write("GET {}\n".format(test_url))                  sys.stderr.write(test_data)                  sys.stderr.write("\n")                  sys.stderr.write(e.read()) @@ -374,11 +356,13 @@ sudo python -c "import BaseHTTPServer; \\          "payload": csr_b64,          "signature": csr_sig64,      }, sort_keys=True, indent=4) +    csr_url = "{}/acme/new-cert".format(CA)      try: -        resp = urllib2.urlopen("{}/acme/new-cert".format(CA), csr_data) +        resp = urllib2.urlopen(csr_url, csr_data)          signed_der = resp.read()      except urllib2.HTTPError as e:          sys.stderr.write("Error: csr_data:\n") +        sys.stderr.write("POST {}\n".format(csr_url))          sys.stderr.write(csr_data)          sys.stderr.write("\n")          sys.stderr.write(e.read()) | 
