cryptography 学习笔记

这篇博客介绍了如何利用Python的cryptography库生成RSA私钥和CSR,然后对CSR进行签名。接着展示了如何通过ZeroSSL API自动化请求并下载SSL证书,包括设置私钥、创建CSR、验证域名以及保存证书到Apache配置路径。博客内容涵盖了Python编程、加密技术以及自动化证书管理。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

cryptography 学习笔记,通过简单的学习喜欢上了这个库。

使用 cryptography 生成私钥、 CSR,并通过私钥对CSR进行签名。

from os import path

from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography import x509
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes

# 私钥文件地址
private_key_file = './key.pem'
# 私钥密码
password = b"passphrase"


def generate_private_key() -> RSAPrivateKey:
    """
    生成私钥
    :return: 私钥
    """
    # Generate our key
    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
    )
    # Write our key to disk for safe keeping
    with open(private_key_file, "wb") as f:
        f.write(key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.BestAvailableEncryption(b"passphrase"),
        ))
    return key


def sign_csr(key: RSAPrivateKey) -> str:
    """
    对csr进行签名
    :param key: 私钥
    :return: 签名后的CSR
    """
    # Generate a CSR
    csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name([
        # Provide various details about who we are.
        x509.NameAttribute(NameOID.COUNTRY_NAME, u"CN"),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Hubei"),
        x509.NameAttribute(NameOID.LOCALITY_NAME, u"Wuhan"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Wuhan Test Cloud Information Technology Co., Ltd. "),
        x509.NameAttribute(NameOID.COMMON_NAME, u"*.test.com"),
    ])).add_extension(
        x509.SubjectAlternativeName([
            # Describe what sites we want this certificate for.
            x509.DNSName(u"test.com"),
            x509.DNSName(u"*.test.com"),
        ]),
        critical=False,
        # Sign the CSR with our private key.
    ).sign(key, hashes.SHA256())
    # Write our CSR out to disk.
    cert_csr: bytes = csr.public_bytes(serialization.Encoding.PEM)
    # with open("./csr.pem", "wb") as f:
    #     f.write(certificate_csr)
    return cert_csr.decode('utf-8')


# 私钥数据
if path.exists(private_key_file):
    private_key_file = open(private_key_file, 'r')
    private_key = load_pem_private_key(private_key_file.read().encode(), password)
else:
    private_key = our_generate_private_key()
certificate_csr = sign_csr(private_key).replace('\n', '')
print(certificate_csr)

结合前面的代码,自动生气ZeroSSL证书:

import os

import requests
import json
from urllib3.exceptions import InsecureRequestWarning
from pathlib import Path
import time
import shutil
import glob
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography import x509
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes

# Suppress https warning (Burp)
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)


class SSLCertReNew(object):

    def __init__(self):
        self.url = 'https://api.zerossl.com'
        # for testing purposes with Burp
        # self.proxies = { 'http' : 'http://127.0.0.1:8080', 'https' : 'http://127.0.0.1:8080' }
        self.proxies = None
        self.apiKey = 'API_KEY_HERE'  # https://app.zerossl.com/developer
        self.certificateDomain = 'example.me'
        self.private_key = self.our_generate_private_key()
        self.csr = self.create_csr().replace('\n', '')
        self.certHash = None
        self.HttpsUrl = None
        self.HttpsContent = None
        # run steps
        self.initial_request()
        self.verification_methods()
        if self.status == 0:
            time.sleep(10)
        else:
            self.download_and_save()

    def our_generate_private_key(self) -> RSAPrivateKey:
        """
        生成私钥
        :return: 私钥
        """
        # Generate our key
        key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
        )
        # Write our key to disk for safe keeping
        # with open(private_key_file, "wb") as f:
        #     f.write(key.private_bytes(
        #         encoding=serialization.Encoding.PEM,
        #         format=serialization.PrivateFormat.TraditionalOpenSSL,
        #         encryption_algorithm=serialization.BestAvailableEncryption(b"passphrase"),
        #     ))

        return key

    def create_csr(self) -> str:
        """
        对csr进行签名
        :return: 签名后的CSR
        """
        # Generate a CSR
        csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name([
            # Provide various details about who we are.
            x509.NameAttribute(NameOID.COUNTRY_NAME, u"CN"),
            x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Hubei"),
            x509.NameAttribute(NameOID.LOCALITY_NAME, u"wuhan"),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Test Cloud Information Technology Co., Ltd. "),
            x509.NameAttribute(NameOID.COMMON_NAME, u"example.me"),
        ])).add_extension(
            x509.SubjectAlternativeName([
                # Describe what sites we want this certificate for.
                x509.DNSName(u"example.me"),
                x509.DNSName(u"test.example.me"),
            ]),
            critical=False,
            # Sign the CSR with our private key.
        ).sign(self.private_key, hashes.SHA256())
        # Write our CSR out to disk.
        cert_csr: bytes = csr.public_bytes(serialization.Encoding.PEM)
        # with open("./csr.pem", "wb") as f:
        #     f.write(certificate_csr)
        return cert_csr.decode('utf-8')

    def initial_request(self):
        response = requests.post(self.url + f'/certificates?access_key={self.apiKey}',
                                 proxies=self.proxies,
                                 data={'certificate_domains': self.certificateDomain,
                                       'certificate_validity_days': 90,  # 证书有效期 90 天
                                       'certificate_csr': self.csr}
                                 )
        result = json.loads(response.text)
        self.certHash = result['id']
        # url from json
        self.HttpsUrl = result['validation']['other_methods'][f'{self.certificateDomain}']['file_validation_url_https']
        self.HttpsContent = result['validation']['other_methods'][f'{self.certificateDomain}'][
            'file_validation_content']
        self.dirOne = self.HttpsUrl.split('/')[-3]
        self.dirTwo = self.HttpsUrl.split('/')[-2]
        self.fileName = self.HttpsUrl.split('/')[-1]
        # create directories
        Path(f'/var/www/{self.certificateDomain}/{self.dirOne}/{self.dirTwo}').mkdir(parents=True, exist_ok=True)
        # save file
        # convert array into string with newline
        string = '\n'.join(
            result['validation']['other_methods'][f'{self.certificateDomain}']['file_validation_content'])
        f = open(f'/var/www/{self.certificateDomain}/{self.dirOne}/{self.dirTwo}/{self.fileName}', 'w')
        f.write(string)
        f.close()

    def verification_methods(self):
        response = requests.post(self.url + f'/certificates/{self.certHash}/challenges?access_key={self.apiKey}',
                                 proxies=self.proxies, data={'validation_method': 'HTTPS_CSR_HASH'})

    def verification_status(self):
        response = requests.post(self.url + f'/certificates/{self.certHash}/status?access_key={self.apiKey}',
                                 proxies=self.proxies)
        result = json.loads(response.text)
        self.status = result['validation_completed']

    def download_and_save(self):
        response = requests.get(self.url + f'/certificates/{self.certHash}/download/return?access_key={self.apiKey}',
                                verify=False)
        result = json.loads(response.text)

        ca_bundle = result['ca_bundle.crt']
        cert = result['certificate.crt']

        f = open(f'/etc/apache2/ssl/{self.certificateDomain}_cert.pem', 'w+')
        f.write(cert)
        f.close()

        f = open(f'/etc/apache2/ssl/{self.certificateDomain}_ca.pem', 'w+')
        f.write(ca_bundle)
        f.close()

        # move private key
        shutil.move(f'{self.certificateDomain}_key.pem', f'/etc/apache2/ssl/{self.certificateDomain}_key.pem')

        # delete files in /var/www/site/.wellknown/pki-verification
        files = glob.glob(f'/var/www/{self.certificateDomain}/{self.dirOne}/{self.dirTwo}/*')
        for f in files:
            os.remove(f)


obj = SSLCertReNew()

官方网:https://github.com/pyca/cryptography

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值