diff --git a/pymdoccbor/__init__.py b/pymdoccbor/__init__.py index 3e2f46a..5becc17 100644 --- a/pymdoccbor/__init__.py +++ b/pymdoccbor/__init__.py @@ -1 +1 @@ -__version__ = "0.9.0" +__version__ = "1.0.0" diff --git a/pymdoccbor/mdoc/issuer.py b/pymdoccbor/mdoc/issuer.py index f56ad01..2295998 100644 --- a/pymdoccbor/mdoc/issuer.py +++ b/pymdoccbor/mdoc/issuer.py @@ -5,6 +5,8 @@ from datetime import datetime, timezone from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey +from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey from pycose.keys import CoseKey, EC2Key from typing import Union @@ -23,14 +25,15 @@ class MdocCborIssuer: """ def __init__( self, - key_label: str = None, - user_pin: str = None, - lib_path: str = None, - slot_id: int = None, + key_label: str | None = None, + user_pin: str | None = None, + lib_path: str | None = None, + slot_id: int | None = None, hsm: bool = False, - alg: str = None, - kid: str = None, + alg: str | None = None, + kid: str | None = None, private_key: Union[dict, CoseKey] = {}, + cert_info: dict | None = None, ): """ Initialize a new MdocCborIssuer @@ -67,16 +70,17 @@ def __init__( self.hsm = hsm self.alg = alg self.kid = kid + self.cert_info = cert_info def new( self, data: dict, doctype: str, - validity: dict = None, - devicekeyinfo: Union[dict, CoseKey, str] = None, - cert_path: str = None, - revocation: dict = None, - status: dict = None + validity: dict | None = None, + devicekeyinfo: dict | CoseKey | str | None = None, + cert_path: str | None = None, + revocation: dict | None = None, + status: dict | None = None ) -> dict: """ create a new mdoc with signed mso @@ -93,49 +97,86 @@ def new( """ if isinstance(devicekeyinfo, dict): devicekeyinfoCoseKeyObject = CoseKey.from_dict(devicekeyinfo) - devicekeyinfo = { - 1: devicekeyinfoCoseKeyObject.kty.identifier, - -1: devicekeyinfoCoseKeyObject.crv.identifier, - -2: devicekeyinfoCoseKeyObject.x, - -3: devicekeyinfoCoseKeyObject.y, - } + if devicekeyinfoCoseKeyObject.kty.identifier == 2: # EC2Key + devicekeyinfo = { + 1: devicekeyinfoCoseKeyObject.kty.identifier, + -1: devicekeyinfoCoseKeyObject.crv.identifier, + -2: devicekeyinfoCoseKeyObject.x, + -3: devicekeyinfoCoseKeyObject.y, + } + elif devicekeyinfoCoseKeyObject.kty.identifier == 1: # OKPKey + devicekeyinfo = { + 1: devicekeyinfoCoseKeyObject.kty.identifier, + -1: devicekeyinfoCoseKeyObject.crv.identifier, + -2: devicekeyinfoCoseKeyObject.x, + } + elif devicekeyinfoCoseKeyObject.kty.identifier == 3: # RSAKey + devicekeyinfo = { + 1: devicekeyinfoCoseKeyObject.kty.identifier, + -1: devicekeyinfoCoseKeyObject.n, + -2: devicekeyinfoCoseKeyObject.e, + } + else: + raise TypeError("Unsupported key type in devicekeyinfo") if isinstance(devicekeyinfo, str): device_key_bytes = base64.urlsafe_b64decode(devicekeyinfo.encode("utf-8")) - public_key:EllipticCurvePublicKey = serialization.load_pem_public_key(device_key_bytes) - curve_name = public_key.curve.name - curve_map = { - "secp256r1": 1, # NIST P-256 - "secp384r1": 2, # NIST P-384 - "secp521r1": 3, # NIST P-521 - "brainpoolP256r1": 8, # Brainpool P-256 - "brainpoolP384r1": 9, # Brainpool P-384 - "brainpoolP512r1": 10, # Brainpool P-512 - # Add more curve mappings as needed - } - curve_identifier = curve_map.get(curve_name) - - # Extract the x and y coordinates from the public key - x = public_key.public_numbers().x.to_bytes( - (public_key.public_numbers().x.bit_length() + 7) - // 8, # Number of bytes needed - "big", # Byte order - ) + public_key = serialization.load_pem_public_key(device_key_bytes) + + if isinstance(public_key, EllipticCurvePublicKey): + curve_name = public_key.curve.name + curve_map = { + "secp256r1": 1, # NIST P-256 + "secp384r1": 2, # NIST P-384 + "secp521r1": 3, # NIST P-521 + "brainpoolP256r1": 8, # Brainpool P-256 + "brainpoolP384r1": 9, # Brainpool P-384 + "brainpoolP512r1": 10, # Brainpool P-512 + # Add more curve mappings as needed + } + curve_identifier = curve_map.get(curve_name) - y = public_key.public_numbers().y.to_bytes( - (public_key.public_numbers().y.bit_length() + 7) - // 8, # Number of bytes needed - "big", # Byte order - ) + # Extract the x and y coordinates from the public key + x = public_key.public_numbers().x.to_bytes( + (public_key.public_numbers().x.bit_length() + 7) + // 8, # Number of bytes needed + "big", # Byte order + ) - devicekeyinfo = { - 1: 2, - -1: curve_identifier, - -2: x, - -3: y, - } + y = public_key.public_numbers().y.to_bytes( + (public_key.public_numbers().y.bit_length() + 7) + // 8, # Number of bytes needed + "big", # Byte order + ) - else: - devicekeyinfo: CoseKey = devicekeyinfo + devicekeyinfo = { + 1: 2, + -1: curve_identifier, + -2: x, + -3: y, + } + elif isinstance(public_key, Ed25519PublicKey): + devicekeyinfo = { + 1: 1, # OKPKey + -1: "Ed25519", # Curve identifier for Ed25519 + -2: public_key.public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw + ) + } + elif isinstance(public_key, RSAPublicKey): + devicekeyinfo = { + 1: 3, # RSAKey + -1: public_key.public_numbers().n.to_bytes( + (public_key.public_numbers().n.bit_length() + 7) // 8, + "big" + ), + -2: public_key.public_numbers().e.to_bytes( + (public_key.public_numbers().e.bit_length() + 7) // 8, + "big" + ) + } + else: + raise TypeError("Loaded public key is not an EllipticCurvePublicKey") if self.hsm: msoi = MsoIssuer( @@ -149,7 +190,8 @@ def new( alg=self.alg, kid=self.kid, validity=validity, - revocation=revocation + revocation=revocation, + cert_info=self.cert_info ) else: @@ -159,10 +201,11 @@ def new( alg=self.alg, cert_path=cert_path, validity=validity, - revocation=revocation + revocation=revocation, + cert_info=self.cert_info ) - mso = msoi.sign(doctype=doctype, device_key=devicekeyinfo,valid_from=datetime.now(timezone.utc)) + mso = msoi.sign(doctype=doctype, device_key=devicekeyinfo, valid_from=datetime.now(timezone.utc)) mso_cbor = mso.encode( tag=False, diff --git a/pymdoccbor/mso/issuer.py b/pymdoccbor/mso/issuer.py index 43acf90..421bf3b 100644 --- a/pymdoccbor/mso/issuer.py +++ b/pymdoccbor/mso/issuer.py @@ -5,27 +5,24 @@ import uuid import logging -logger = logging.getLogger("pymdoccbor") - -from pycose.headers import Algorithm #, KID -from pycose.keys import CoseKey, EC2Key +from pycose.keys import CoseKey +from pycose.headers import Algorithm from pycose.messages import Sign1Message from typing import Union from pymdoccbor.exceptions import MsoPrivateKeyRequired from pymdoccbor import settings -from pymdoccbor.x509 import MsoX509Fabric +from pymdoccbor.x509 import selfsigned_x509cert from pymdoccbor.tools import shuffle_dict from cryptography import x509 from cryptography.hazmat.primitives import serialization from cryptography.x509 import Certificate -from cbor_diag import * - +logger = logging.getLogger("pymdoccbor") -class MsoIssuer(MsoX509Fabric): +class MsoIssuer: """ MsoIssuer helper class to create a new mso """ @@ -34,17 +31,18 @@ def __init__( self, data: dict, validity: dict, - cert_path: str = None, - key_label: str = None, - user_pin: str = None, - lib_path: str = None, - slot_id: int = None, - kid: str = None, - alg: str = None, - hsm: bool = False, - private_key: Union[dict, CoseKey] = None, - digest_alg: str = settings.PYMDOC_HASHALG, - revocation: dict = None + cert_path: str | None = None, + key_label: str | None = None, + user_pin: str | None = None, + lib_path: str | None = None, + slot_id: int | None = None, + kid: str | None = None, + alg: str | None = None, + hsm: bool | None = False, + private_key: dict | CoseKey | None = None, + digest_alg: str | None = settings.PYMDOC_HASHALG, + revocation: dict | None = None, + cert_info: dict | None = None, ) -> None: """ Initialize a new MsoIssuer @@ -64,17 +62,17 @@ def __init__( :param revocation: dict: revocation status dict to include in the mso, it may include status_list and identifier_list keys """ - if not hsm: - if private_key: - if isinstance(private_key, dict): - self.private_key = CoseKey.from_dict(private_key) - if not self.private_key.kid: - self.private_key.kid = str(uuid.uuid4()) - elif isinstance(private_key, CoseKey): - self.private_key = private_key - else: - raise ValueError("private_key must be a dict or CoseKey object") + if private_key: + if isinstance(private_key, dict): + self.private_key = CoseKey.from_dict(private_key) + if not self.private_key.kid: + self.private_key.kid = str(uuid.uuid4()) + elif isinstance(private_key, CoseKey): + self.private_key = private_key else: + raise ValueError("private_key must be a dict or CoseKey object") + else: + if not hsm: raise MsoPrivateKeyRequired("MSO Writer requires a valid private key") if not validity: @@ -85,9 +83,8 @@ def __init__( self.data: dict = data self.hash_map: dict = {} - self.cert_path = cert_path self.disclosure_map: dict = {} - self.digest_alg: str = digest_alg + self.digest_alg = digest_alg self.key_label = key_label self.user_pin = user_pin self.lib_path = lib_path @@ -98,9 +95,20 @@ def __init__( self.validity = validity self.revocation = revocation + self.cert_path = cert_path + self.cert_info = cert_info + + if not self.cert_path and (not self.cert_info or not self.private_key): + raise ValueError( + "cert_path or cert_info with a private key must be provided to properly insert a certificate" + ) + alg_map = {"ES256": "sha256", "ES384": "sha384", "ES512": "sha512"} - hashfunc = getattr(hashlib, alg_map.get(self.alg)) + if self.alg not in alg_map: + raise ValueError(f"Unsupported algorithm: {self.alg}") + + hashfunc = getattr(hashlib, alg_map[self.alg]) digest_cnt = 0 for ns, values in data.items(): @@ -157,9 +165,9 @@ def format_datetime_repr(self, dt: datetime.datetime) -> str: def sign( self, - device_key: Union[dict, None] = None, - valid_from: Union[None, datetime.datetime] = None, - doctype: str = None, + device_key: dict | None = None, + valid_from: datetime.datetime | None = None, + doctype: str | None = None, ) -> Sign1Message: """ Sign a mso and returns it @@ -230,7 +238,14 @@ def sign( raise Exception(f"Certificate at {self.cert_path} failed parse") _cert = cert.public_bytes(getattr(serialization.Encoding, "DER")) else: - _cert = self.selfsigned_x509cert() + if not self.cert_info: + raise ValueError("cert_info must be provided if cert_path is not set") + + logger.warning( + "A self-signed certificate will be created using the provided cert_info but this is not recommended for production use." + ) + + _cert = selfsigned_x509cert(self.cert_info, self.private_key) if self.hsm: # print("payload diganostic notation: \n",cbor2diag(cbor2.dumps(cbor2.CBORTag(24, cbor2.dumps(payload))))) diff --git a/pymdoccbor/settings.py b/pymdoccbor/settings.py index 054e1d4..1f43dcc 100644 --- a/pymdoccbor/settings.py +++ b/pymdoccbor/settings.py @@ -1,8 +1,5 @@ -import datetime import os -from datetime import timezone - COSEKEY_HAZMAT_CRV_MAP = { "secp256r1": "P_256", "secp384r1": "P_384", @@ -23,30 +20,8 @@ DIGEST_SALT_LENGTH = 32 - X509_DER_CERT = os.getenv("X509_DER_CERT", None) -# OR - -X509_COUNTRY_NAME = os.getenv('X509_COUNTRY_NAME', "US") -X509_STATE_OR_PROVINCE_NAME = os.getenv('X509_STATE_OR_PROVINCE_NAME', "California") -X509_LOCALITY_NAME = os.getenv('X509_LOCALITY_NAME', "San Francisco") -X509_ORGANIZATION_NAME = os.getenv('X509_ORGANIZATION_NAME', "My Company") -X509_COMMON_NAME = os.getenv('X509_COMMON_NAME', "mysite.com") - -X509_NOT_VALID_BEFORE = os.getenv('X509_NOT_VALID_BEFORE', datetime.datetime.now(timezone.utc)) -X509_NOT_VALID_AFTER_DAYS = os.getenv('X509_NOT_VALID_AFTER_DAYS', 10) -X509_NOT_VALID_AFTER = os.getenv( - 'X509_NOT_VALID_AFTER', - datetime.datetime.now(timezone.utc) + datetime.timedelta( - days=X509_NOT_VALID_AFTER_DAYS - ) -) - -X509_SAN_URL = os.getenv( - "X509_SAN_URL", "https://credential-issuer.example.org" -) - CBORTAGS_ATTR_MAP = { "birth_date": 1004, "expiry_date": 1004, diff --git a/pymdoccbor/tests/cert_data.py b/pymdoccbor/tests/cert_data.py new file mode 100644 index 0000000..624c3e4 --- /dev/null +++ b/pymdoccbor/tests/cert_data.py @@ -0,0 +1,13 @@ +from datetime import datetime, timezone, timedelta + +CERT_DATA ={ + "country_name": "US", + "state_or_province_name": "California", + "locality_name": "San Francisco", + "organization_name": "Micov", + "common_name": "My Company", + "san_url": "mysite.com", + "not_valid_before": datetime.now(timezone.utc) - timedelta(days=1), + "not_valid_after": datetime.now(timezone.utc) + timedelta(days=10), + "san_url": "https://credential-issuer.example.org" +} \ No newline at end of file diff --git a/pymdoccbor/tests/pkey.py b/pymdoccbor/tests/pkey.py index 97e3210..56eb0fc 100644 --- a/pymdoccbor/tests/pkey.py +++ b/pymdoccbor/tests/pkey.py @@ -1,4 +1,7 @@ import os +import base64 + +from pycose.keys import RSAKey PKEY = { 'KTY': 'EC2', @@ -6,4 +9,33 @@ 'ALG': 'ES256', 'D': b"<\xe5\xbc;\x08\xadF\x1d\xc5\x0czR'T&\xbb\x91\xac\x84\xdc\x9ce\xbf\x0b,\x00\xcb\xdd\xbf\xec\xa2\xa5", 'KID': b"demo-kid" +} + +def base64_urldecode(v: str) -> bytes: + """Urlsafe base64 decoding. This function will handle missing + padding symbols. + + :returns: the decoded data in bytes, format, convert to str use method '.decode("utf-8")' on result + :rtype: bytes + """ + padded = f"{v}{'=' * divmod(len(v), 4)[1]}" + return base64.urlsafe_b64decode(padded) + +decoded_x = base64_urldecode("dGLQBwQIPWjc2aA6zRc06wlNVxiw72PMwJlEXHEvP-E") +decoded_d = base64_urldecode("NOHGihpyjNa_xBSd17Wr4ynkSM-afunMgpoPoFkelhI") + +PKEY_ED25519 = { + 'KTY': 'OKP', + 'CURVE': 'Ed25519', + 'ALG': 'EdDSA', + 'D': decoded_d, + 'X': decoded_x, + 'KID': b"demo-kid-ed25519" +} + +PKEY_RSA = { + 'KTY': 'RSA', + 'E': b'\xd4\xf1\xf2o', + 'N': b'[_\x81\\6y3\xbf\x01\xad\xba\xe26&\xcb\xa2g\xff\x97\xa1rv\xa7\x9a{\xfb\x01r^S\xfb\xefY\xb4\x14\xcesz\x99H\x02\xaf\xf5\xab\x18_\xac\xaaR\x13Q\xe6\xa0\x9a\x8c\x8a\x1f\x13\x0b9\xf3\xbb\xe1\x0b\xb9<\xe7\xc0\xffU\xa0\xcb\x1aw\xf2/\x11\x0e\xea^\x98:cp\x1f3\xc9\x81\x93e\x81\xb4\xb20s\xa6\xaaV\xf3\x03y\xb3\xd9\x93i\x14\xa7\xafi.\x08\xdey\x15s-V\x10\xf0\x0f+:E\x10\xec\xca\x93\x17\xecg\xaf!\x11\xe7\x91\xcdG7)\x83\xc3\xdd\xc2xp\xb2v_\xf2l\xc9\xc7\x15r\xf9\xa1U\xe9`\xde\xf1\xa9\xc2\xb6\xde\xebc|\xef\xb0s,\x10\xa1l\x81&\xcc\xb9\xfa\xb6\xffs\x1a9\x0c[7\xafJ\x1c\xd5\xb6\xc7?\x1c\x8fN\x1a\xde\x7f\xa4\x8f\xf6,\xed\x89b\x87\xcaXL\x8e}\xa5K\x0b\x9a\x8c\xb2\xd2\x91\x0f\xedI\x8e\x8fYq#\x8c\xd1\x02\xe2B\xff\xf1\x1dT\x15\xb1I\xe8\xd8\xfc[\xd5Y\x9ab\xcc\xe3\xff\xac\xfa\x85', + 'KEY_OPS': ['SIGN'] } \ No newline at end of file diff --git a/pymdoccbor/tests/test_02_mdoc_issuer.py b/pymdoccbor/tests/test_02_mdoc_issuer.py index 41e8879..1e1e407 100644 --- a/pymdoccbor/tests/test_02_mdoc_issuer.py +++ b/pymdoccbor/tests/test_02_mdoc_issuer.py @@ -11,15 +11,8 @@ from pymdoccbor.mdoc.verifier import MdocCbor from pymdoccbor.mso.issuer import MsoIssuer from pymdoccbor.tests.pid_data import PID_DATA - - -PKEY = { - 'KTY': 'EC2', - 'CURVE': 'P_256', - 'ALG': 'ES256', - 'D': os.urandom(32), - 'KID': b"demo-kid" -} +from pymdoccbor.tests.cert_data import CERT_DATA +from pymdoccbor.tests.pkey import PKEY, PKEY_ED25519, PKEY_RSA def extract_mso(mdoc:dict): @@ -35,7 +28,8 @@ def test_mso_writer(): data=PID_DATA, private_key=PKEY, validity=validity, - alg = "ES256" + alg = "ES256", + cert_info=CERT_DATA ) assert "eu.europa.ec.eudiw.pid.1" in msoi.hash_map @@ -49,15 +43,13 @@ def test_mso_writer(): Sign1Message.decode(mso.encode()) - # TODO: assertion about the content - # breakpoint() - def test_mdoc_issuer(): validity = {"issuance_date": "2025-01-17", "expiry_date": "2025-11-13" } mdoci = MdocCborIssuer( private_key=PKEY, - alg = "ES256" + alg = "ES256", + cert_info=CERT_DATA ) with open("pymdoccbor/tests/certs/fake-cert.pem", "rb") as file: fake_cert_file = file.read() @@ -86,6 +78,92 @@ def test_mdoc_issuer(): mdoci.dump() mdoci.dumps() + # check mso content for status list + mso = extract_mso(mdoc) + status_list = mso["status"]["status_list"] + assert status_list["idx"] == 0 + assert status_list["uri"] == "https://issuer.com/statuslists" + cert_bytes = status_list["certificate"] + cert:Certificate = load_der_x509_certificate(cert_bytes) + assert "Test ASL Issuer" in cert.subject.rfc4514_string(), "ASL is not signed with the expected certificate" + +def test_mdoc_issuer_EdDSA(): + validity = {"issuance_date": "2025-01-17", "expiry_date": "2025-11-13" } + mdoci = MdocCborIssuer( + private_key=PKEY, + alg = "ES256", + cert_info=CERT_DATA + ) + with open("pymdoccbor/tests/certs/fake-cert.pem", "rb") as file: + fake_cert_file = file.read() + asl_signing_cert = x509.load_pem_x509_certificate(fake_cert_file) + _asl_signing_cert = asl_signing_cert.public_bytes(getattr(serialization.Encoding, "DER")) + status_list = { + "status_list": { + "idx": 0, + "uri": "https://issuer.com/statuslists", + "certificate": _asl_signing_cert, + } + } + mdoc = mdoci.new( + doctype="eu.europa.ec.eudiw.pid.1", + data=PID_DATA, + devicekeyinfo=PKEY_ED25519, + validity=validity, + revocation=status_list + ) + + mdocp = MdocCbor() + aa = cbor2.dumps(mdoc) + mdocp.loads(aa) + assert mdocp.verify() is True + + mdoci.dump() + mdoci.dumps() + + # check mso content for status list + mso = extract_mso(mdoc) + status_list = mso["status"]["status_list"] + assert status_list["idx"] == 0 + assert status_list["uri"] == "https://issuer.com/statuslists" + cert_bytes = status_list["certificate"] + cert:Certificate = load_der_x509_certificate(cert_bytes) + assert "Test ASL Issuer" in cert.subject.rfc4514_string(), "ASL is not signed with the expected certificate" + +def test_mdoc_issuer_RSA(): + validity = {"issuance_date": "2025-01-17", "expiry_date": "2025-11-13" } + mdoci = MdocCborIssuer( + private_key=PKEY, + alg = "ES256", + cert_info=CERT_DATA + ) + with open("pymdoccbor/tests/certs/fake-cert.pem", "rb") as file: + fake_cert_file = file.read() + asl_signing_cert = x509.load_pem_x509_certificate(fake_cert_file) + _asl_signing_cert = asl_signing_cert.public_bytes(getattr(serialization.Encoding, "DER")) + status_list = { + "status_list": { + "idx": 0, + "uri": "https://issuer.com/statuslists", + "certificate": _asl_signing_cert, + } + } + mdoc = mdoci.new( + doctype="eu.europa.ec.eudiw.pid.1", + data=PID_DATA, + devicekeyinfo=PKEY_RSA, + validity=validity, + revocation=status_list + ) + + mdocp = MdocCbor() + aa = cbor2.dumps(mdoc) + mdocp.loads(aa) + assert mdocp.verify() is True + + mdoci.dump() + mdoci.dumps() + # check mso content for status list mso = extract_mso(mdoc) status_list = mso["status"]["status_list"] diff --git a/pymdoccbor/tests/test_03_mdoc_issuer.py b/pymdoccbor/tests/test_03_mdoc_issuer.py index 34deb89..da06e03 100644 --- a/pymdoccbor/tests/test_03_mdoc_issuer.py +++ b/pymdoccbor/tests/test_03_mdoc_issuer.py @@ -3,10 +3,12 @@ from pymdoccbor.tests.micov_data import MICOV_DATA from pymdoccbor.tests.pid_data import PID_DATA from pymdoccbor.tests.pkey import PKEY +from pymdoccbor.tests.cert_data import CERT_DATA mdoc = MdocCborIssuer( private_key=PKEY, alg="ES256", + cert_info=CERT_DATA ) def test_MdocCborIssuer_creation(): diff --git a/pymdoccbor/tests/test_04_issuer_signed.py b/pymdoccbor/tests/test_04_issuer_signed.py index efd6969..c137a0c 100644 --- a/pymdoccbor/tests/test_04_issuer_signed.py +++ b/pymdoccbor/tests/test_04_issuer_signed.py @@ -3,11 +3,13 @@ from pymdoccbor.mdoc.issuer import MdocCborIssuer from pymdoccbor.tests.micov_data import MICOV_DATA from pymdoccbor.tests.pkey import PKEY +from pymdoccbor.tests.cert_data import CERT_DATA mdoc = MdocCborIssuer( private_key=PKEY, alg="ES256", + cert_info=CERT_DATA ) mdoc.new( data=MICOV_DATA, diff --git a/pymdoccbor/tests/test_05_mdoc_verifier.py b/pymdoccbor/tests/test_05_mdoc_verifier.py index 58fe29b..c1078df 100644 --- a/pymdoccbor/tests/test_05_mdoc_verifier.py +++ b/pymdoccbor/tests/test_05_mdoc_verifier.py @@ -3,6 +3,7 @@ from pymdoccbor.mdoc.issuer import MdocCborIssuer from pymdoccbor.tests.micov_data import MICOV_DATA from pymdoccbor.tests.pkey import PKEY +from pymdoccbor.tests.cert_data import CERT_DATA def test_verifier_must_fail_document_type(): try: @@ -20,6 +21,7 @@ def test_mobile_document(): mdoc = MdocCborIssuer( private_key=PKEY, alg="ES256", + cert_info=CERT_DATA ) mdoc.new( data=MICOV_DATA, @@ -41,7 +43,8 @@ def test_mobile_document(): def test_mobile_document_dump(): mdoc = MdocCborIssuer( private_key=PKEY, - alg="ES256" + alg="ES256", + cert_info=CERT_DATA ) mdoc.new( data=MICOV_DATA, @@ -65,7 +68,8 @@ def test_mobile_document_dump(): def test_mobile_document_dumps(): mdoc = MdocCborIssuer( private_key=PKEY, - alg="ES256" + alg="ES256", + cert_info=CERT_DATA ) mdoc.new( data=MICOV_DATA, @@ -89,7 +93,8 @@ def test_mobile_document_dumps(): def test_mobile_document_verify(): mdoc = MdocCborIssuer( private_key=PKEY, - alg="ES256" + alg="ES256", + cert_info=CERT_DATA ) mdoc.new( data=MICOV_DATA, diff --git a/pymdoccbor/tests/test_06_mso_issuer.py b/pymdoccbor/tests/test_06_mso_issuer.py index ec30cce..0bd31aa 100644 --- a/pymdoccbor/tests/test_06_mso_issuer.py +++ b/pymdoccbor/tests/test_06_mso_issuer.py @@ -3,6 +3,7 @@ from pymdoccbor.mso.issuer import MsoIssuer from pymdoccbor.tests.micov_data import MICOV_DATA from pymdoccbor.tests.pkey import PKEY +from pymdoccbor.tests.cert_data import CERT_DATA def test_mso_issuer_fail(): @@ -19,7 +20,8 @@ def test_mso_issuer_creation(): "issuance_date": "2024-12-31", "expiry_date": "2050-12-31" }, - alg="ES256" + alg="ES256", + cert_info=CERT_DATA ) assert msoi.private_key @@ -36,7 +38,8 @@ def test_mso_issuer_sign(): "issuance_date": "2024-12-31", "expiry_date": "2050-12-31" }, - alg="ES256" + alg="ES256", + cert_info=CERT_DATA ) mso = msoi.sign() diff --git a/pymdoccbor/tests/test_07_mso_verifier.py b/pymdoccbor/tests/test_07_mso_verifier.py index a385bc3..1bab826 100644 --- a/pymdoccbor/tests/test_07_mso_verifier.py +++ b/pymdoccbor/tests/test_07_mso_verifier.py @@ -1,16 +1,16 @@ -import os -from pycose.keys import CoseKey, EC2Key from pymdoccbor.mso.verifier import MsoVerifier from pymdoccbor.mdoc.issuer import MdocCborIssuer from pymdoccbor.tests.micov_data import MICOV_DATA from pycose.messages import CoseMessage from pymdoccbor.tests.pkey import PKEY +from pymdoccbor.tests.cert_data import CERT_DATA mdoc = MdocCborIssuer( private_key=PKEY, alg="ES256", + cert_info=CERT_DATA ) mdoc.new( diff --git a/pymdoccbor/tests/test_08_mdoc_cbor.py b/pymdoccbor/tests/test_08_mdoc_cbor.py index 390c007..e0cc121 100644 --- a/pymdoccbor/tests/test_08_mdoc_cbor.py +++ b/pymdoccbor/tests/test_08_mdoc_cbor.py @@ -4,11 +4,13 @@ from pymdoccbor.tests.micov_data import MICOV_DATA from pymdoccbor.mdoc.verifier import MdocCbor from pymdoccbor.tests.pkey import PKEY +from pymdoccbor.tests.cert_data import CERT_DATA def test_mdoc_cbor_creation(): mdoci = MdocCborIssuer( private_key=PKEY, alg="ES256", + cert_info=CERT_DATA ) mdoc = mdoci.new( data=MICOV_DATA, @@ -46,6 +48,7 @@ def test_mdoc_cbor_invalid_status(): mdoci = MdocCborIssuer( private_key=PKEY, alg="ES256", + cert_info=CERT_DATA ) try: diff --git a/pymdoccbor/x509.py b/pymdoccbor/x509.py index b939435..37f9919 100644 --- a/pymdoccbor/x509.py +++ b/pymdoccbor/x509.py @@ -1,62 +1,108 @@ -from cwt import COSEKey from typing import Union +from pycose.keys import CoseKey + +from typing import Any, Union from cryptography import x509 from cryptography.x509.oid import NameOID from cryptography.x509 import Certificate from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import ec, ed25519 -from pymdoccbor import settings - -class MsoX509Fabric: +def selfsigned_x509cert(cert_info: dict[str, Any], private_key: CoseKey, encoding: str = "DER") -> Union[Certificate, bytes]: """ - MsoX509Fabric helper class to create a new mso + Returns an X.509 certificate derived from the private key of the MSO Issuer + + :param cert_info: dict[str, Any]: the certificate information, should contain at least one of the following: + - country_name + - state_or_province_name + - locality_name + - organization_name + - common_name + - not_valid_before + - not_valid_after + - san_url + + :param private_key: CoseKey: the private key to use for signing the certificate + :param encoding: str: the encoding to use, default is DER + + :return: Union[Certificate, bytes]: the X.509 certificate """ - def selfsigned_x509cert(self, encoding: str = "DER") -> Union[Certificate, bytes]: - """ - Returns an X.509 certificate derived from the private key of the MSO Issuer - - :param encoding: str: the encoding to use, default is DER - - :return: Union[Certificate, bytes]: the X.509 certificate - """ - ckey = COSEKey.from_bytes(self.private_key.encode()) - - subject = issuer = x509.Name([ - x509.NameAttribute(NameOID.COUNTRY_NAME, settings.X509_COUNTRY_NAME), - x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, settings.X509_STATE_OR_PROVINCE_NAME), - x509.NameAttribute(NameOID.LOCALITY_NAME, settings.X509_LOCALITY_NAME), - x509.NameAttribute(NameOID.ORGANIZATION_NAME, settings.X509_ORGANIZATION_NAME), - x509.NameAttribute(NameOID.COMMON_NAME, settings.X509_COMMON_NAME), - ]) - cert = x509.CertificateBuilder().subject_name( - subject - ).issuer_name( - issuer - ).public_key( - ckey.key.public_key() - ).serial_number( - x509.random_serial_number() - ).not_valid_before( - settings.X509_NOT_VALID_BEFORE - ).not_valid_after( - settings.X509_NOT_VALID_AFTER - ).add_extension( + if not private_key: + raise ValueError("private_key must be set") + + # convert the private key to a cryptography private key instance + if hasattr(private_key, "kty") and private_key.kty is not None and hasattr(private_key.kty, "identifier"): + if private_key.kty.identifier == 2: # EC2Key + private_key_inst = ec.derive_private_key( + int.from_bytes(private_key['d'], byteorder="big"), ec.SECP256R1() + ) + elif private_key.kty.identifier == 1: # OKPKey + private_key_inst = ed25519.Ed25519PrivateKey.from_private_bytes( + private_key['d'] + ) + else: + raise ValueError(f"Unsupported key type: {private_key.kty}") + else: + raise ValueError("private_key.kty or private_key.kty.identifier is not set or unknown") + + + public_key_inst = private_key_inst.public_key() + + name_attributes = [] + if "country_name" in cert_info: + name_attributes.append(x509.NameAttribute(NameOID.COUNTRY_NAME, cert_info["country_name"])) + if "state_or_province_name" in cert_info: + name_attributes.append(x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, cert_info["state_or_province_name"])) + if "locality_name" in cert_info: + name_attributes.append(x509.NameAttribute(NameOID.LOCALITY_NAME, cert_info["locality_name"])) + if "organization_name" in cert_info: + name_attributes.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, cert_info["organization_name"])) + if "common_name" in cert_info: + name_attributes.append(x509.NameAttribute(NameOID.COMMON_NAME, cert_info["common_name"])) + + subject = issuer = x509.Name(name_attributes) + + cert_builder = x509.CertificateBuilder().subject_name( + subject + ).issuer_name( + issuer + ).public_key( + public_key_inst + ).serial_number( + x509.random_serial_number() + ) + + if "not_valid_before" in cert_info: + cert_builder = cert_builder.not_valid_before( + cert_info["not_valid_before"] + ) + + if "not_valid_after" in cert_info: + cert_builder = cert_builder.not_valid_after( + cert_info["not_valid_after"] + ) + + + if "san_url" in cert_info: + cert_builder = cert_builder.add_extension( x509.SubjectAlternativeName( [ x509.UniformResourceIdentifier( - settings.X509_SAN_URL + cert_info["san_url"] ) ] ), critical=False, # Sign our certificate with our private key - ).sign(ckey.key, hashes.SHA256()) + ) + + cert = cert_builder.sign(private_key_inst, hashes.SHA256()) - if not encoding: - return cert - else: - return cert.public_bytes( - getattr(serialization.Encoding, encoding) - ) + if not encoding: + return cert + else: + return cert.public_bytes( + getattr(serialization.Encoding, encoding) + ) diff --git a/setup.py b/setup.py index 0c3703e..b4c7076 100644 --- a/setup.py +++ b/setup.py @@ -37,7 +37,6 @@ def readme(): include_package_data=True, install_requires=[ "cbor2>=5.4.0,<5.5.0", - "cwt>=2.3.0,<2.4", "cbor-diag>=1.1.0,<1.2", "pycose>=1.0.1" ],