diff options
| -rw-r--r-- | sign_csr.py | 163 | 
1 files changed, 73 insertions, 90 deletions
| diff --git a/sign_csr.py b/sign_csr.py index 68dbbfc..3f7d4ce 100644 --- a/sign_csr.py +++ b/sign_csr.py @@ -1,7 +1,7 @@  import argparse, subprocess, json, os, urllib2, sys, base64, binascii, ssl, \ -    hashlib, tempfile, re, time, copy, textwrap +    hashlib, tempfile, re, time, copy, textwrap, copy -def sign_csr(pubkey, csr): +def sign_csr(pubkey, csr, email=None):      """Use the ACME protocol to get an ssl certificate signed by a      certificate authority. @@ -11,9 +11,9 @@ def sign_csr(pubkey, csr):      :rtype: string      """ -    #CA = "http://localhost:4000/acme" -    CA = "https://www.letsencrypt-demo.org/acme" -    nonce_req = urllib2.Request("{}/new-reg".format(CA)) +    CA = "https://acme-staging.api.letsencrypt.org" +    TERMS = "https://letsencrypt.org/documents/LE-SA-v1.0.1-July-27-2015.pdf" +    nonce_req = urllib2.Request("{}/directory".format(CA))      nonce_req.get_method = lambda : 'HEAD'      def _b64(b): @@ -67,24 +67,20 @@ def sign_csr(pubkey, csr):                  domains.add(san[4:])      sys.stderr.write("Found domains {}\n".format(", ".join(domains))) -    #Step 2: Generate the payloads that need to be signed +    #Step 3: Generate the payloads that need to be signed      #registration -    reg_email = "webmaster@{}".format(min(domains, key=len)) +    reg_email = email if email else "webmaster@{}".format(min(domains, key=len))      reg_raw = json.dumps({ +        "resource": "new-reg",          "contact": ["mailto:{}".format(reg_email)], -        "agreement": "https://www.letsencrypt-demo.org/terms", -        #"agreement": "https://letsencrypt.org/be-good", +        "agreement": TERMS,      }, sort_keys=True, indent=4)      reg_b64 = _b64(reg_raw) -    try: -        urllib2.urlopen(nonce_req).info() -    except urllib2.HTTPError as e: -        reg_nonce = json.dumps({ -            "nonce": e.hdrs.get("replay-nonce", _b64(os.urandom(16))), -        }, sort_keys=True, indent=4) -        reg_nonce64 = _b64(reg_nonce) +    reg_protected = copy.deepcopy(header) +    reg_protected.update({"nonce": urllib2.urlopen(nonce_req).headers['Replay-Nonce']}) +    reg_protected64 = _b64(json.dumps(reg_protected, sort_keys=True, indent=4))      reg_file = tempfile.NamedTemporaryFile(dir=".", prefix="register_", suffix=".json") -    reg_file.write("{}.{}".format(reg_nonce64, reg_b64)) +    reg_file.write("{}.{}".format(reg_protected64, reg_b64))      reg_file.flush()      reg_file_name = os.path.basename(reg_file.name)      reg_file_sig = tempfile.NamedTemporaryFile(dir=".", prefix="register_", suffix=".sig") @@ -96,24 +92,26 @@ def sign_csr(pubkey, csr):      for domain in domains:          #identifier -        id_raw = json.dumps({"identifier": {"type": "dns", "value": domain}}, sort_keys=True) +        id_raw = json.dumps({ +            "resource": "new-authz", +            "identifier": { +                "type": "dns", +                "value": domain, +            }, +        }, sort_keys=True)          id_b64 = _b64(id_raw) -        try: -            urllib2.urlopen(nonce_req).info() -        except urllib2.HTTPError as e: -            id_nonce = json.dumps({ -                "nonce": e.hdrs.get("replay-nonce", _b64(os.urandom(16))), -            }, sort_keys=True, indent=4) -            id_nonce64 = _b64(id_nonce) +        id_protected = copy.deepcopy(header) +        id_protected.update({"nonce": urllib2.urlopen(nonce_req).headers['Replay-Nonce']}) +        id_protected64 = _b64(json.dumps(id_protected, sort_keys=True, indent=4))          id_file = tempfile.NamedTemporaryFile(dir=".", prefix="domain_", suffix=".json") -        id_file.write("{}.{}".format(id_nonce64, id_b64)) +        id_file.write("{}.{}".format(id_protected64, id_b64))          id_file.flush()          id_file_name = os.path.basename(id_file.name)          id_file_sig = tempfile.NamedTemporaryFile(dir=".", prefix="domain_", suffix=".sig")          id_file_sig_name = os.path.basename(id_file_sig.name)          ids.append({              "domain": domain, -            "nonce64": id_nonce64, +            "protected64": id_protected64,              "data64": id_b64,              "file": id_file,              "file_name": id_file_name, @@ -124,26 +122,23 @@ def sign_csr(pubkey, csr):          #challenge          test_path = _b64(os.urandom(16))          test_raw = json.dumps({ +            "resource": "challenge",              "type": "simpleHttp",              "path": test_path,              "tls": False,          }, sort_keys=True, indent=4)          test_b64 = _b64(test_raw) -        try: -            urllib2.urlopen(nonce_req).info() -        except urllib2.HTTPError as e: -            test_nonce = json.dumps({ -                "nonce": e.hdrs.get("replay-nonce", _b64(os.urandom(16))), -            }, sort_keys=True, indent=4) -            test_nonce64 = _b64(test_nonce) +        test_protected = copy.deepcopy(header) +        test_protected.update({"nonce": urllib2.urlopen(nonce_req).headers['Replay-Nonce']}) +        test_protected64 = _b64(json.dumps(test_protected, sort_keys=True, indent=4))          test_file = tempfile.NamedTemporaryFile(dir=".", prefix="challenge_", suffix=".json") -        test_file.write("{}.{}".format(test_nonce64, test_b64)) +        test_file.write("{}.{}".format(test_protected64, test_b64))          test_file.flush()          test_file_name = os.path.basename(test_file.name)          test_file_sig = tempfile.NamedTemporaryFile(dir=".", prefix="challenge_", suffix=".sig")          test_file_sig_name = os.path.basename(test_file_sig.name)          tests.append({ -            "nonce64": test_nonce64, +            "protected64": test_protected64,              "data64": test_b64,              "file": test_file,              "file_name": test_file_name, @@ -151,25 +146,47 @@ def sign_csr(pubkey, csr):              "sig_name": test_file_sig_name,          }) -    #Step 3: Ask the user to sign the payloads +    #need signature for the final certificate issuance +    proc = subprocess.Popen(["openssl", "req", "-in", csr, "-outform", "DER"], +        stdout=subprocess.PIPE, stderr=subprocess.PIPE) +    csr_der, err = proc.communicate() +    csr_der64 = _b64(csr_der) +    csr_raw = json.dumps({ +        "resource": "new-cert", +        "csr": csr_der64, +    }, sort_keys=True, indent=4) +    csr_b64 = _b64(csr_raw) +    csr_protected = copy.deepcopy(header) +    csr_protected.update({"nonce": urllib2.urlopen(nonce_req).headers['Replay-Nonce']}) +    csr_protected64 = _b64(json.dumps(csr_protected, sort_keys=True, indent=4)) +    csr_file = tempfile.NamedTemporaryFile(dir=".", prefix="cert_", suffix=".json") +    csr_file.write("{}.{}".format(csr_protected64, csr_b64)) +    csr_file.flush() +    csr_file_name = os.path.basename(csr_file.name) +    csr_file_sig = tempfile.NamedTemporaryFile(dir=".", prefix="cert_", suffix=".sig") +    csr_file_sig_name = os.path.basename(csr_file_sig.name) + +    #Step 4: Ask the user to sign the payloads      sys.stderr.write("""  STEP 1: You need to sign some files (replace 'user.key' with your user private key).  openssl dgst -sha256 -sign user.key -out {} {}  {}  {} +openssl dgst -sha256 -sign user.key -out {} {}  """.format(      reg_file_sig_name, reg_file_name,      "\n".join("openssl dgst -sha256 -sign user.key -out {} {}".format(i['sig_name'], i['file_name']) for i in ids), -    "\n".join("openssl dgst -sha256 -sign user.key -out {} {}".format(i['sig_name'], i['file_name']) for i in tests))) +    "\n".join("openssl dgst -sha256 -sign user.key -out {} {}".format(i['sig_name'], i['file_name']) for i in tests), +    csr_file_sig_name, csr_file_name))      stdout = sys.stdout      sys.stdout = sys.stderr      raw_input("Press Enter when you've run the above commands in a new terminal window...")      sys.stdout = stdout -    #Step 4: Load the signatures +    #Step 5: Load the signatures      reg_file_sig.seek(0)      reg_sig64 = _b64(reg_file_sig.read())      for n, i in enumerate(ids): @@ -178,16 +195,16 @@ openssl dgst -sha256 -sign user.key -out {} {}          tests[n]['sig'].seek(0)          tests[n]['sig64'] = _b64(tests[n]['sig'].read()) -    #Step 5: Register the user +    #Step 6: Register the user      sys.stderr.write("Registering {}...\n".format(reg_email))      reg_data = json.dumps({          "header": header, -        "protected": reg_nonce64, +        "protected": reg_protected64,          "payload": reg_b64,          "signature": reg_sig64,      }, sort_keys=True, indent=4)      try: -        resp = urllib2.urlopen("{}/new-reg".format(CA), reg_data) +        resp = urllib2.urlopen("{}/acme/new-reg".format(CA), reg_data)          result = json.loads(resp.read())      except urllib2.HTTPError as e:          err = e.read() @@ -206,17 +223,19 @@ openssl dgst -sha256 -sign user.key -out {} {}      csr_authz = []      for n, i in enumerate(ids): -        #Step 6: Get simpleHttps challenge token +        #Step 7: Get simpleHttps challenge token          sys.stderr.write("Requesting challenges for {}...\n".format(i['domain']))          id_data = json.dumps({              "header": header, -            "protected": i['nonce64'], +            "protected": i['protected64'],              "payload": i['data64'],              "signature": i['sig64'],          }, sort_keys=True, indent=4)          try: -            resp = urllib2.urlopen("{}/new-authz".format(CA), id_data) +            resp = urllib2.urlopen("{}/acme/new-authz".format(CA), id_data)              result = json.loads(resp.read()) +            from pprint import pprint +            pprint(result)          except urllib2.HTTPError as e:              sys.stderr.write("Error: id_data:\n")              sys.stderr.write(id_data) @@ -227,7 +246,7 @@ openssl dgst -sha256 -sign user.key -out {} {}          token, uri = [[c['token'], c['uri']] for c in result['challenges'] if c['type'] == "simpleHttp"][0]          csr_authz.append(re.search("^([^?]+)", uri).group(1)) -        #Step 7: Ask the user to host the token on their server +        #Step 8: Ask the user to host the token on their server          sys.stderr.write("""  STEP {}: You need to run this command on {} (don't stop the python command until the next step). @@ -244,11 +263,11 @@ sudo python -c "import BaseHTTPServer; \\          raw_input("Press Enter when you've got the python command running on your server...")          sys.stdout = stdout -        #Step 8: Let the CA know you're ready for the challenge +        #Step 9: Let the CA know you're ready for the challenge          sys.stderr.write("Requesting verification for {}...\n".format(i['domain']))          test_data = json.dumps({              "header": header, -            "protected": tests[n]['nonce64'], +            "protected": tests[n]['protected64'],              "payload": tests[n]['data64'],              "signature": tests[n]['sig64'],          }, sort_keys=True, indent=4) @@ -263,55 +282,18 @@ sudo python -c "import BaseHTTPServer; \\              sys.stderr.write("\n")              raise -    #Step 9: Build the certificate request payload -    proc = subprocess.Popen(["openssl", "req", "-in", csr, "-outform", "DER"], -        stdout=subprocess.PIPE, stderr=subprocess.PIPE) -    csr_der, err = proc.communicate() -    csr_der64 = _b64(csr_der) -    csr_raw = json.dumps({ -        "csr": csr_der64, -        "authorizations": csr_authz, -    }, sort_keys=True, indent=4) -    csr_b64 = _b64(csr_raw) -    try: -        urllib2.urlopen(nonce_req).info() -    except urllib2.HTTPError as e: -        csr_nonce = json.dumps({ -            "nonce": e.hdrs.get("replay-nonce", _b64(os.urandom(16))), -        }, sort_keys=True, indent=4) -        csr_nonce64 = _b64(csr_nonce) -    csr_file = tempfile.NamedTemporaryFile(dir=".", prefix="cert_", suffix=".json") -    csr_file.write("{}.{}".format(csr_nonce64, csr_b64)) -    csr_file.flush() -    csr_file_name = os.path.basename(csr_file.name) -    csr_file_sig = tempfile.NamedTemporaryFile(dir=".", prefix="cert_", suffix=".sig") -    csr_file_sig_name = os.path.basename(csr_file_sig.name) - -    #Step 10: Ask the user to sign the certificate request -    sys.stderr.write(""" -FINAL STEP: You need to sign one more file (replace 'user.key' with your user private key). - -openssl dgst -sha256 -sign user.key -out {} {} - -""".format(csr_file_sig_name, csr_file_name)) - -    stdout = sys.stdout -    sys.stdout = sys.stderr -    raw_input("Press Enter when you've run the above command in a new terminal window...") -    sys.stdout = stdout - -    #Step 11: Get the certificate signed +    #Step 10: Get the certificate signed      sys.stderr.write("Requesting signature...\n")      csr_file_sig.seek(0)      csr_sig64 = _b64(csr_file_sig.read())      csr_data = json.dumps({          "header": header, -        "protected": csr_nonce64, +        "protected": csr_protected64,          "payload": csr_b64,          "signature": csr_sig64,      }, sort_keys=True, indent=4)      try: -        resp = urllib2.urlopen("{}/new-cert".format(CA), csr_data) +        resp = urllib2.urlopen("{}/acme/new-cert".format(CA), csr_data)          signed_der = resp.read()      except urllib2.HTTPError as e:          sys.stderr.write("Error: csr_data:\n") @@ -321,7 +303,7 @@ openssl dgst -sha256 -sign user.key -out {} {}          sys.stderr.write("\n")          raise -    #Step 12: Convert the signed cert from DER to PEM +    #Step 11: Convert the signed cert from DER to PEM      sys.stderr.write("Certificate signed!\n")      sys.stderr.write("You can stop running the python command on your server (Ctrl+C works).\n")      signed_der64 = base64.b64encode(signed_der) @@ -361,8 +343,9 @@ $ python sign_csr.py user.pub domain.csr > signed.crt  """)      parser.add_argument("pubkey_path", help="path to your account public key")      parser.add_argument("csr_path", help="path to your certificate signing request") +    parser.add_argument("--email", default=None, help="contact email, default is webmaster@<shortest_domain>")      args = parser.parse_args() -    signed_crt = sign_csr(args.pubkey_path, args.csr_path) +    signed_crt = sign_csr(args.pubkey_path, args.csr_path, email=args.email)      sys.stdout.write(signed_crt) | 
