# -*- coding: utf-8 -*-
#
# SPDX-License-Identifier: AGPL-3.0-only
#
# Copyright © 2015-2018 Daniel Roesler <diafygi@gmail.com>
# Copyright © 2021-2022 siddharth ravikumar <s@ricketyspace.net>
#
import argparse
import subprocess
import json
import urllib.request
import sys
import base64
import binascii
import time
import hashlib
import re
from urllib.request import urlopen
from urllib.error import URLError
# Version string printed by the --version command line flag.
__version__ = "0.3.0"
# Base URL of the Let's Encrypt production ACME v2 endpoint (default CA).
CA_PRD = "https://acme-v02.api.letsencrypt.org"
# Base URL of the Let's Encrypt staging ACME v2 endpoint (used with --stage).
CA_STG = "https://acme-staging-v02.api.letsencrypt.org"
# Cache for the parsed ACME directory; filled on first use by _directory().
CA_DIR = None
def _directory(ca_url):
    """Return the CA's ACME directory, downloading it on first use.

    The parsed directory is cached in the module-level ``CA_DIR``.  Once it
    has been fetched, later calls return the cached object regardless of the
    ``ca_url`` they pass.

    :param string ca_url: Base URL of the ACME server.
    :returns: The decoded JSON directory object.
    """
    global CA_DIR
    if CA_DIR is None:
        # Use the response as a context manager so the connection is closed
        # as soon as the body has been read.
        with urlopen(ca_url + "/directory") as resp:
            CA_DIR = json.loads(resp.read().decode("utf8"))
    return CA_DIR
def _b64(b):
"Convert bytes to JWT base64 string"
if type(b) is str:
b = b.encode()
return base64.urlsafe_b64encode(b).decode().replace("=", "")
def _cmd(cmd_list, stdin=None, cmd_input=None, err_msg="Command Line Error"):
"Runs external commands"
proc = subprocess.Popen(
cmd_list, stdin=stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
out, err = proc.communicate(cmd_input)
if proc.returncode != 0:
sys.stderr.write("{0}: {1}\n".format(err_msg, err.decode()))
sys.exit(1)
return out
def _do_request(url, data=None, err_msg="Error"):
try:
resp = urllib.request.urlopen(
urllib.request.Request(
url,
data=data,
headers={
"Content-Type": "application/jose+json",
"User-Agent": "acmens",
},
)
)
resp_data, code, headers = (
resp.read().decode("utf8"),
resp.getcode(),
resp.headers,
)
except URLError as e:
resp_data = e.read().decode("utf8") if hasattr(e, "read") else str(e)
code, headers = getattr(e, "code", None), {}
try:
resp_data = json.loads(resp_data) # try to parse json results
except ValueError:
pass # resp_data is not a JSON string; that's fine
return resp_data, code, headers
def _mk_signed_req_body(url, payload, nonce, auth, account_key):
    """Build the signed JSON body for an ACME request.

    The protected header (url, RS256 algorithm, nonce plus the entries of
    ``auth``) and the payload are base64-encoded, joined with a dot and
    signed with ``openssl dgst -sha256 -sign``.  Exits with status 1 when
    ``nonce`` is empty.

    :param string url: Target URL, recorded in the protected header.
    :param payload: JSON-serialisable payload, or ``None`` for an empty one.
    :param string nonce: Anti-replay nonce.
    :param dict auth: Extra protected header entries (``jwk`` or ``kid``).
    :param string account_key: Path to the account private key.
    :returns: JSON text with ``protected``, ``payload`` and ``signature``.
    :rtype: string
    """
    if len(nonce) == 0:
        sys.stderr.write("_mk_signed_req_body: nonce invalid: {}".format(nonce))
        sys.exit(1)

    if payload is None:
        body_b64 = ""
    else:
        body_b64 = _b64(json.dumps(payload).encode("utf8"))

    header = {"url": url, "alg": "RS256", "nonce": nonce, **auth}
    header_b64 = _b64(json.dumps(header).encode("utf8"))

    signing_input = "{0}.{1}".format(header_b64, body_b64).encode("utf8")
    signature = _cmd(
        ["openssl", "dgst", "-sha256", "-sign", account_key],
        stdin=subprocess.PIPE,
        cmd_input=signing_input,
        err_msg="OpenSSL Error",
    )

    envelope = {
        "protected": header_b64,
        "payload": body_b64,
        "signature": _b64(signature),
    }
    return json.dumps(envelope)
def _send_signed_request(url, payload, nonce_url, auth, account_key, err_msg):
    """Make signed request to ACME endpoint.

    A nonce is fetched from ``nonce_url``, the request is signed and sent.
    When the server answers with a ``badNonce`` error the request is signed
    again with a new nonce and resent (at most 100 retries).  Any other
    non-success response is written to stderr and the program exits with
    status 1.

    :param string url: ACME endpoint to call.
    :param payload: JSON-serialisable payload, or ``None``.
    :param string nonce_url: URL used to obtain a nonce.
    :param dict auth: Protected header entries identifying the account.
    :param string account_key: Path to the account private key.
    :param string err_msg: Prefix for the error report on failure.
    :returns: ``(response body, status code, headers)`` for a 200, 201 or
        204 response.
    """
    tried = 0
    nonce = _do_request(nonce_url)[2]["Replay-Nonce"]
    while True:
        data = _mk_signed_req_body(url, payload, nonce, auth, account_key)
        resp_data, resp_code, headers = _do_request(
            url, data=data.encode("utf8"), err_msg=err_msg
        )
        if resp_code in (200, 201, 204):
            return resp_data, resp_code, headers

        # The body is only a dict when the server sent JSON; a plain-text
        # error body must not crash the badNonce check.
        bad_nonce = (
            resp_code == 400
            and isinstance(resp_data, dict)
            and resp_data.get("type", "") == "urn:ietf:params:acme:error:badNonce"
        )
        if bad_nonce and tried < 100:
            # Prefer the nonce delivered with the error response; if it is
            # not available, ask the server for a new one rather than
            # retrying with an empty nonce.
            nonce = headers.get("Replay-Nonce", "")
            if not nonce:
                nonce = _do_request(nonce_url)[2]["Replay-Nonce"]
            tried += 1
            continue

        sys.stderr.write(
            "{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format(
                err_msg, url, data, resp_code, resp_data
            )
        )
        sys.exit(1)
def _poll_until_not(url, pending_statuses, nonce_url, auth, account_key, err_msg):
    """Poll until status is not in pending_statuses.

    The resource at ``url`` is requested immediately and then every two
    seconds until its ``status`` leaves ``pending_statuses``.

    :param string url: Resource to poll.
    :param list pending_statuses: Status values that mean "keep waiting".
    :param string nonce_url: URL used to obtain nonces.
    :param dict auth: Protected header entries identifying the account.
    :param string account_key: Path to the account private key.
    :param string err_msg: Error prefix passed to the signed request.
    :returns: The first response body whose status is not pending.
    :raises AssertionError: If polling has been going on for an hour.
    """
    result, t0 = None, time.time()
    while result is None or result["status"] in pending_statuses:
        # Raised explicitly (not via ``assert``) so the one hour limit is
        # still enforced when Python runs with -O; the exception type is
        # kept for compatibility.
        if time.time() - t0 >= 3600:
            raise AssertionError("Polling timeout")
        if result is not None:
            time.sleep(2)  # no delay before the very first request
        result, _, _ = _send_signed_request(
            url, None, nonce_url, auth, account_key, err_msg
        )
    return result
def _do_challenge(challenge_type, authz_url, nonce_url, auth, account_key, thumbprint):
    """Do ACME challenge.

    Fetches the authorization at ``authz_url``, picks a challenge, tells the
    user what to publish (DNS TXT record or HTTP file), waits for the user
    to press Enter, asks the CA to verify and polls until the challenge
    reaches a final status.

    :param string challenge_type: ``"dns"`` to prefer dns-01; any other
        value prefers http-01.  If the preferred type is not offered, the
        other one is used (http-01 first).
    :param string authz_url: URL of the authorization object.
    :param string nonce_url: URL used to obtain nonces.
    :param dict auth: Protected header entries identifying the account.
    :param string account_key: Path to the account private key.
    :param string thumbprint: Base64 thumbprint of the account key.
    :raises ValueError: If the challenge does not end up ``valid``.
    """
    # Request challenges
    sys.stderr.write("Requesting challenges...\n")
    chl_result, _, _ = _send_signed_request(
        authz_url, None, nonce_url, auth, account_key, "Error getting challenges"
    )
    domain = chl_result["identifier"]["value"]

    # Choose challenge.  When a type is offered more than once the last
    # entry wins.
    offered = {c["type"]: c for c in chl_result["challenges"]}
    preferred_type = "dns-01" if challenge_type == "dns" else "http-01"
    challenge = offered.get(preferred_type)
    if challenge is None:
        if offered.get("http-01"):
            # Fallback to http challenge.
            challenge = offered["http-01"]
            challenge_type = "http"
        elif offered.get("dns-01"):
            # Fallback to dns challenge.
            challenge = offered["dns-01"]
            challenge_type = "dns"
        else:
            sys.stderr.write("Error: Unable to find challenges!")
            sys.exit(1)

    keyauthorization = "{0}.{1}".format(challenge["token"], thumbprint)

    # Ask the user to host the token on their server
    if challenge_type == "dns":
        dns_payload = _b64(hashlib.sha256(keyauthorization.encode()).digest())
        sys.stderr.write(
            """\
Please update your DNS for '{domain}' to have the following TXT record:
--------------
_acme-challenge IN TXT ( \"{keyauth}\" )
--------------
""".format(
                domain=domain, keyauth=dns_payload
            )
        )
        prompt = "Press Enter when the TXT record is updated on the DNS..."
        final_msg = "You can remove the _acme-challenge DNS TXT record now."
    else:
        # Challenge response for http server.
        response_uri = ".well-known/acme-challenge/{0}".format(challenge["token"])
        sys.stderr.write(
            """\
Please update your server to serve the following file at this URL:
--------------
URL: http://{domain}/{uri}
File contents: \"{token}\"
--------------
Notes:
- Do not include the quotes in the file.
- The file should be one line without any spaces.
""".format(
                domain=domain, uri=response_uri, token=keyauthorization
            )
        )
        prompt = "Press Enter when you've got the file hosted on your server..."
        final_msg = "You can remove the acme-challenge file from your webserver now."

    # input() prints its prompt on sys.stdout; point it at stderr for the
    # duration of the prompt and always restore it, even if input() raises.
    saved_stdout = sys.stdout
    sys.stdout = sys.stderr
    try:
        input(prompt)
    finally:
        sys.stdout = saved_stdout

    # Let the CA know you're ready for the challenge
    sys.stderr.write("Requesting verification for {0}...\n".format(domain))
    _send_signed_request(
        challenge["url"],
        {},
        nonce_url,
        auth,
        account_key,
        "Error requesting challenge verification: {0}".format(domain),
    )
    # "processing" is a non-final challenge status as well (RFC 8555,
    # section 7.1.6); keep polling instead of reporting it as a failure.
    chl_verification = _poll_until_not(
        challenge["url"],
        ["pending", "processing"],
        nonce_url,
        auth,
        account_key,
        "Error checking challenge verification",
    )
    if chl_verification["status"] != "valid":
        raise ValueError(
            "Challenge did not pass for {0}: {1}".format(domain, chl_verification)
        )
    sys.stderr.write("{} verified!\n".format(domain))
    sys.stderr.write("{}\n".format(final_msg))
def _agree_to(terms):
"""Asks user whether they agree to the Let's Encrypt Subscriber
Agreement. It will immediately exit if user does not agree."""
sys.stderr.write(
"\nDo you agree to the Let's Encrypt Subscriber Agreement\n({})? ".format(terms)
)
ans = input()
if re.search(r"^[Yy]", ans) is None:
sys.stderr.write("Error: Cannot continue. Exiting.\n")
sys.exit(1)
def sign_csr(ca_url, account_key, csr, email=None, challenge_type="http"):
    """Use the ACME protocol to get an ssl certificate signed by a
    certificate authority.

    Progress messages and prompts are written to stderr.  Errors reported
    by the helper functions terminate the program with exit status 1.

    :param string ca_url: Let's Encrypt endpoint.
    :param string account_key: Path to the user account key.
    :param string csr: Path to the certificate signing request.
    :param string email: An optional user account contact email.  When
                         omitted the user is asked for one; the suggested
                         default is webmaster@<name>, where <name> is the
                         CSR's common name or, without one, its first
                         non-wildcard alternative name.
    :param string challenge_type: The challenge type to use.
                                  (defaults to http)
    :returns: Signed Certificate (PEM format)
    :rtype: string
    :raises ValueError: If a challenge or the order does not become valid.
    """
    # Step 1: Get account public key
    sys.stderr.write("Reading pubkey file...\n")
    key_text = _cmd(
        ["openssl", "rsa", "-in", account_key, "-noout", "-text"],
        err_msg="Error reading account public key",
    ).decode("utf8")
    key_match = re.search(
        r"modulus:[\s]+?00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)",
        key_text,
        re.MULTILINE | re.DOTALL,
    )
    if key_match is None:
        # Without this guard an unexpected openssl output ends in an opaque
        # AttributeError on None.
        sys.stderr.write(
            "Error reading account public key: unexpected openssl output\n"
        )
        sys.exit(1)
    pub_hex, pub_exp = key_match.groups()
    pub_mod64 = _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex)))
    # Hex-encode the exponent with an even number of digits so that it can
    # be converted to bytes.
    exp_hex = "{0:x}".format(int(pub_exp))
    if len(exp_hex) % 2:
        exp_hex = "0{0}".format(exp_hex)
    pub_exp64 = _b64(binascii.unhexlify(exp_hex))
    jwk = {
        "e": pub_exp64,
        "kty": "RSA",
        "n": pub_mod64,
    }
    accountkey_json = json.dumps(jwk, sort_keys=True, separators=(",", ":"))
    thumbprint = _b64(hashlib.sha256(accountkey_json.encode()).digest())
    sys.stderr.write("Found public key!\n")

    # Step 2: Get the domain names to be certified
    sys.stderr.write("Reading csr file...\n")
    csr_text = _cmd(
        ["openssl", "req", "-in", csr, "-noout", "-text"],
        err_msg="Error loading {}".format(csr),
    ).decode("utf8")
    domains = set()
    cn = None
    common_name = re.search(r"Subject:.*? CN *= *([^\s,;/]+)", csr_text)
    if common_name is not None:
        cn = common_name.group(1)
        domains.add(cn)
    # The extension header may carry a "critical" marker before the newline.
    subj_alt_names = re.search(
        r"X509v3 Subject Alternative Name: (?:critical)?\n +([^\n]+)\n",
        csr_text,
        re.MULTILINE | re.DOTALL,
    )
    if subj_alt_names is not None:
        for san in subj_alt_names.group(1).split(", "):
            if san.startswith("DNS:"):
                dm = san[4:]
                if cn is None and "*" not in dm:
                    cn = dm
                domains.add(dm)
    if not domains:
        sys.stderr.write("Error: No domains found in {}\n".format(csr))
        sys.exit(1)
    # Sorted so that messages and the order request are deterministic.
    domain_list = sorted(domains)
    if cn is None:
        # Only wildcard names were found; use the first one without its
        # "*." prefix so that messages and the default email stay sensible.
        first = domain_list[0]
        cn = first[2:] if first.startswith("*.") else first
    sys.stderr.write("Found domains {}\n".format(", ".join(domain_list)))

    # Step 3: Ask user for contact email
    if not email:
        default_email = "webmaster@{0}".format(cn)
        # input() prints its prompt on sys.stdout; point it at stderr for
        # the prompt and always restore it, even if input() raises.
        saved_stdout = sys.stdout
        sys.stdout = sys.stderr
        try:
            input_email = input(
                "STEP 1: What is your contact email? ({0}) ".format(default_email)
            )
        finally:
            sys.stdout = saved_stdout
        email = input_email if input_email else default_email

    # Step 4: Generate the payload for registering user and initiate registration.
    sys.stderr.write("Registering {0}...\n".format(email))
    _agree_to(_directory(ca_url)["meta"]["termsOfService"])
    reg = {"termsOfServiceAgreed": True}
    nonce_url = _directory(ca_url)["newNonce"]
    # The account is identified by its public key until its URL is known.
    auth = {"jwk": jwk}
    _, code, acct_headers = _send_signed_request(
        _directory(ca_url)["newAccount"],
        reg,
        nonce_url,
        auth,
        account_key,
        "Error registering",
    )
    if code == 201:
        sys.stderr.write("Registered!\n")
    else:
        sys.stderr.write("Already registered!\n")
    # From here on requests are authenticated with the account URL.
    auth = {"kid": acct_headers["Location"]}
    sys.stderr.write("Updating account...")
    _send_signed_request(
        acct_headers["Location"],
        {"contact": ["mailto:{}".format(email)]},
        nonce_url,
        auth,
        account_key,
        "Error updating account",
    )
    sys.stderr.write("Done\n")

    # Step 5: Create the order for all domains
    sys.stderr.write("Making new order for {0}...\n".format(", ".join(domain_list)))
    order_payload = {
        "identifiers": [{"type": "dns", "value": domain} for domain in domain_list]
    }
    order, _, order_headers = _send_signed_request(
        _directory(ca_url)["newOrder"],
        order_payload,
        nonce_url,
        auth,
        account_key,
        "Error creating new order",
    )

    # Step 6: Complete a challenge for every authorization of the order
    for authz in order["authorizations"]:
        _do_challenge(challenge_type, authz, nonce_url, auth, account_key, thumbprint)

    # Step 7: Finalize
    csr_der = _cmd(
        ["openssl", "req", "-in", csr, "-outform", "DER"], err_msg="DER Export Error"
    )
    _send_signed_request(
        order["finalize"],
        {"csr": _b64(csr_der)},
        nonce_url,
        auth,
        account_key,
        "Error finalizing order",
    )

    # Step 8: Wait for CA to mark test as valid
    sys.stderr.write("Waiting for {0} challenge to pass...\n".format(cn))
    order = _poll_until_not(
        order_headers["Location"],
        ["pending", "processing"],
        nonce_url,
        auth,
        account_key,
        "Error checking order status",
    )
    if order["status"] != "valid":
        raise ValueError("'{0}' challenge did not pass: {1}".format(cn, order))
    sys.stderr.write("Passed {0} challenge!\n".format(cn))

    # Step 9: Get the certificate.
    sys.stderr.write("Getting certificate...\n")
    signed_pem, _, _ = _send_signed_request(
        order["certificate"],
        None,
        nonce_url,
        auth,
        account_key,
        "Error getting certificate",
    )
    sys.stderr.write("Received certificate!\n")
    return signed_pem
def revoke_crt(ca_url, account_key, crt):
    """Use the ACME protocol to revoke an ssl certificate signed by a
    certificate authority.

    Progress messages are written to stderr.  Errors reported by the helper
    functions terminate the program with exit status 1.

    :param string ca_url: Let's Encrypt endpoint.
    :param string account_key: Path to your Let's Encrypt account private key.
    :param string crt: Path to the signed certificate.
    """
    # Step 1: Get account public key
    sys.stderr.write("Reading pubkey file...\n")
    key_text = _cmd(
        ["openssl", "rsa", "-in", account_key, "-noout", "-text"],
        err_msg="Error reading account public key",
    ).decode("utf8")
    key_match = re.search(
        r"modulus:[\s]+?00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)",
        key_text,
        re.MULTILINE | re.DOTALL,
    )
    if key_match is None:
        # Without this guard an unexpected openssl output ends in an opaque
        # AttributeError on None.
        sys.stderr.write(
            "Error reading account public key: unexpected openssl output\n"
        )
        sys.exit(1)
    pub_hex, pub_exp = key_match.groups()
    pub_mod64 = _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex)))
    # Hex-encode the exponent with an even number of digits so that it can
    # be converted to bytes.
    exp_hex = "{0:x}".format(int(pub_exp))
    if len(exp_hex) % 2:
        exp_hex = "0{0}".format(exp_hex)
    pub_exp64 = _b64(binascii.unhexlify(exp_hex))
    jwk = {
        "e": pub_exp64,
        "kty": "RSA",
        "n": pub_mod64,
    }
    sys.stderr.write("Found public key!\n")

    # Step 2: Get account info.
    sys.stderr.write("Getting account info...\n")
    # Look the account up by key without creating a new one.
    reg = {"onlyReturnExisting": True}
    nonce_url = _directory(ca_url)["newNonce"]
    auth = {"jwk": jwk}
    _, _, acct_headers = _send_signed_request(
        _directory(ca_url)["newAccount"],
        reg,
        nonce_url,
        auth,
        account_key,
        "Error getting account info",
    )
    # The revocation request is authenticated with the account URL.
    auth = {"kid": acct_headers["Location"]}

    # Step 3: Generate the payload.
    crt_der = _cmd(
        ["openssl", "x509", "-in", crt, "-outform", "DER"], err_msg="DER export error"
    )
    rvk_payload = {
        "certificate": _b64(crt_der),
    }
    _send_signed_request(
        _directory(ca_url)["revokeCert"],
        rvk_payload,
        nonce_url,
        auth,
        account_key,
        "Error revoking certificate",
    )
    sys.stderr.write("Certificate revoked!\n")
def main():
    """Command line entry point.

    Parses the command line, validates that the files needed for the
    requested action were given, and then either revokes a certificate
    (``--revoke``) or signs a CSR and writes the certificate to stdout.
    Validation failures are reported on stderr with exit status 1.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="""acmens may be used for getting a new SSL certificate, renewing a
SSL certificate for a domain, and revoking a certificate for a domain.
It's meant to be run locally from your computer.""",
    )
    parser.add_argument("--version", action="store_true", help="Show version and exit")
    parser.add_argument(
        "--revoke", action="store_true", help="Revoke a signed certificate"
    )
    parser.add_argument(
        "--stage", action="store_true", help="Use Let's Encrypt's staging endpoint"
    )
    parser.add_argument(
        "-k",
        "--account-key",
        help="path to your Let's Encrypt account private key",
    )
    parser.add_argument(
        "-e",
        "--email",
        default=None,
        help="contact email, default is webmaster@<common_name>",
    )
    parser.add_argument(
        "-c",
        "--challenge",
        default="http",
        # Reject typos up front; any unknown value used to be silently
        # treated as "http".
        choices=("http", "dns"),
        help="Challenge type (http or dns), default is http",
    )
    parser.add_argument("--csr", help="path to your certificate signing request")
    parser.add_argument("--crt", help="path to your signed certificate")
    args = parser.parse_args()

    if args.version:
        print("acmens v{}".format(__version__))
        sys.exit(0)

    # Validate the arguments required by the selected action.
    if args.account_key is None:
        sys.stderr.write("Error: Path to account key is required\n")
        sys.exit(1)
    if (not args.revoke) and (args.csr is None):
        sys.stderr.write("Error: Path to CSR required\n")
        sys.exit(1)
    if args.revoke and args.crt is None:
        sys.stderr.write("Error: Path to signed cert required\n")
        sys.exit(1)

    ca_url = CA_STG if args.stage else CA_PRD
    if args.revoke:
        revoke_crt(ca_url, args.account_key, args.crt)
    else:
        signed_crt = sign_csr(
            ca_url,
            args.account_key,
            args.csr,
            email=args.email,
            challenge_type=args.challenge,
        )
        sys.stdout.write(signed_crt)