Source code for octavia_tempest_plugin.common.cert_utils

# Copyright 2018 Rackspace US Inc.  All rights reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import datetime

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.serialization import NoEncryption
from cryptography.hazmat.primitives.serialization import pkcs12
from cryptography import x509
from cryptography.x509.oid import NameOID


[docs] def generate_ca_cert_and_key(): """Creates a CA cert and key for testing. :returns: The cryptography CA cert and CA key objects. """ ca_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend()) subject = issuer = x509.Name([ x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"), x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Denial"), x509.NameAttribute(NameOID.LOCALITY_NAME, u"Corvallis"), x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"OpenStack"), x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u"Octavia"), x509.NameAttribute(NameOID.COMMON_NAME, u"ca_cert.example.com"), ]) ca_cert = x509.CertificateBuilder().subject_name( subject ).issuer_name( issuer ).public_key( ca_key.public_key() ).serial_number( x509.random_serial_number() ).not_valid_before( datetime.datetime.utcnow() ).not_valid_after( datetime.datetime.utcnow() + datetime.timedelta(days=10) ).add_extension( x509.SubjectAlternativeName([x509.DNSName(u"ca_cert.example.com")]), critical=False, ).add_extension( x509.BasicConstraints(ca=True, path_length=None), critical=True, ).add_extension( # KeyUsage(digital_signature, content_commitment, key_encipherment, # data_encipherment, key_agreement, key_cert_sign, crl_sign, # encipher_only, decipher_only) x509.KeyUsage(True, False, False, False, False, True, True, False, False), critical=True, ).sign(ca_key, hashes.SHA256(), default_backend()) return ca_cert, ca_key
[docs] def generate_server_cert_and_key(ca_cert, ca_key, server_uuid): """Creates a server cert and key for testing. :param ca_cert: A cryptography CA certificate (x509) object. :param ca_key: A cryptography CA key (x509) object. :param server_uuid: A UUID identifying the server. :returns: The cryptography server cert and key objects. """ server_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend()) subject = x509.Name([ x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"), x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Denial"), x509.NameAttribute(NameOID.LOCALITY_NAME, u"Corvallis"), x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"OpenStack"), x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u"Octavia"), x509.NameAttribute(NameOID.COMMON_NAME, u"{}.example.com".format( server_uuid)), ]) server_cert = x509.CertificateBuilder().subject_name( subject ).issuer_name( ca_cert.subject ).public_key( server_key.public_key() ).serial_number( x509.random_serial_number() ).not_valid_before( datetime.datetime.utcnow() ).not_valid_after( datetime.datetime.utcnow() + datetime.timedelta(days=10) ).add_extension( x509.SubjectAlternativeName( [x509.DNSName(u"{}.example.com".format(server_uuid))]), critical=False, ).add_extension( x509.BasicConstraints(ca=False, path_length=None), critical=True, ).add_extension( # KeyUsage(digital_signature, content_commitment, key_encipherment, # data_encipherment, key_agreement, key_cert_sign, crl_sign, # encipher_only, decipher_only) x509.KeyUsage(True, False, True, False, False, False, False, False, False), critical=True, ).sign(ca_key, hashes.SHA256(), default_backend()) return server_cert, server_key
[docs] def generate_client_cert_and_key(ca_cert, ca_key, client_uuid): """Creates a client cert and key for testing. :param ca_cert: A cryptography CA certificate (x509) object. :param ca_key: A cryptography CA key (x509) object. :param client_uuid: A UUID identifying the client. :returns: The cryptography server cert and key objects. """ client_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend()) subject = x509.Name([ x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"), x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Denial"), x509.NameAttribute(NameOID.LOCALITY_NAME, u"Corvallis"), x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"OpenStack"), x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u"Octavia"), x509.NameAttribute(NameOID.COMMON_NAME, u"{}".format(client_uuid)), ]) client_cert = x509.CertificateBuilder().subject_name( subject ).issuer_name( ca_cert.subject ).public_key( client_key.public_key() ).serial_number( x509.random_serial_number() ).not_valid_before( datetime.datetime.utcnow() ).not_valid_after( datetime.datetime.utcnow() + datetime.timedelta(days=10) ).add_extension( x509.BasicConstraints(ca=False, path_length=None), critical=True, ).add_extension( # KeyUsage(digital_signature, content_commitment, key_encipherment, # data_encipherment, key_agreement, key_cert_sign, crl_sign, # encipher_only, decipher_only) x509.KeyUsage(True, True, True, False, False, False, False, False, False), critical=True, ).sign(ca_key, hashes.SHA256(), default_backend()) return client_cert, client_key
[docs] def generate_pkcs12_bundle(server_cert, server_key): """Creates a pkcs12 formated bundle. :param server_cert: A cryptography certificate (x509) object. :param server_key: A cryptography key (x509) object. :returns: A pkcs12 bundle. """ p12 = pkcs12.serialize_key_and_certificates( b'', server_key, server_cert, cas=None, encryption_algorithm=NoEncryption()) return p12
[docs] def generate_certificate_revocation_list(ca_cert, ca_key, cert_to_revoke): """Create a certificate revocation list with a revoked certificate. :param ca_cert: A cryptography CA certificate (x509) object. :param ca_key: A cryptography CA key (x509) object. :param cert_to_revoke: A cryptography CA certificate (x509) object. :returns: A signed certificate revocation list. """ crl_builder = x509.CertificateRevocationListBuilder() crl_builder = crl_builder.issuer_name(ca_cert.subject) crl_builder = crl_builder.last_update(datetime.datetime.utcnow()) crl_builder = crl_builder.next_update(datetime.datetime.utcnow() + datetime.timedelta(1, 0, 0)) revoked_cert = x509.RevokedCertificateBuilder().serial_number( cert_to_revoke.serial_number ).revocation_date( datetime.datetime.utcnow() ).build(default_backend()) crl_builder = crl_builder.add_revoked_certificate(revoked_cert) return crl_builder.sign(private_key=ca_key, algorithm=hashes.SHA256(), backend=default_backend())