|
| 1 | +# ------------------------------------ |
| 2 | +# Copyright (c) Microsoft Corporation. |
| 3 | +# Licensed under the MIT License. |
| 4 | +# ------------------------------------ |
| 5 | +import asyncio |
| 6 | +import base64 |
| 7 | +import os |
| 8 | +from azure.identity.aio import DefaultAzureCredential |
| 9 | +from azure.keyvault.certificates.aio import CertificateClient |
| 10 | +from azure.keyvault.certificates import CertificatePolicy |
| 11 | +from azure.keyvault.secrets.aio import SecretClient |
| 12 | +from azure.core.exceptions import HttpResponseError |
| 13 | +from cryptography.hazmat.primitives.serialization import pkcs12 |
| 14 | + |
| 15 | +# ---------------------------------------------------------------------------------------------------------- |
| 16 | +# Prerequisites: |
| 17 | +# 1. An Azure Key Vault. (https://docs.microsoft.com/en-us/azure/key-vault/quick-create-cli) |
| 18 | +# |
| 19 | +# 2. A service principal with certificate get, delete, and purge permissions, as well as secret get |
| 20 | +# permissions. |
| 21 | +# |
| 22 | +# 3. azure-keyvault-certificates, azure-keyvault-secrets, azure-identity, and cryptography (v3.3+) packages |
| 23 | +# (pip install these). |
| 24 | +# |
| 25 | +# 4. Set Environment variables AZURE_CLIENT_ID, AZURE_TENANT_ID, AZURE_CLIENT_SECRET, VAULT_URL. (See |
| 26 | +# https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/keyvault/azure-keyvault-certificates#authenticate-the-client) |
| 27 | +# |
| 28 | +# ---------------------------------------------------------------------------------------------------------- |
| 29 | +# Sample - demonstrates how to get the private key of an existing Key Vault certificate |
| 30 | +# |
| 31 | +# 1. Create a new certificate (CertificateClient.create_certificate) |
| 32 | +# |
| 33 | +# 2. Get a certificate secret (SecretClient.get_secret) |
| 34 | +# |
| 35 | +# 3. Delete a certificate (CertificateClient.delete_certificate) |
| 36 | +# |
| 37 | +# 4. Purge a certificate (CertificateClient.purge_deleted_secret) |
| 38 | +# |
| 39 | +# ---------------------------------------------------------------------------------------------------------- |
| 40 | + |
| 41 | +async def run_sample(): |
| 42 | + # Instantiate a certificate client that will be used to call the service. |
| 43 | + # Notice that the client is using default Azure credentials. |
| 44 | + # To make default credentials work, ensure that environment variables 'AZURE_CLIENT_ID', |
| 45 | + # 'AZURE_CLIENT_SECRET' and 'AZURE_TENANT_ID' are set with the service principal credentials. |
| 46 | + VAULT_URL = os.environ["VAULT_URL"] |
| 47 | + credential = DefaultAzureCredential() |
| 48 | + certificate_client = CertificateClient(vault_url=VAULT_URL, credential=credential) |
| 49 | + |
| 50 | + # Instantiate a secret client that will be used to call the service. |
| 51 | + # Notice that this client can reuse the credential object created above. |
| 52 | + secret_client = SecretClient(vault_url=VAULT_URL, credential=credential) |
| 53 | + try: |
| 54 | + # Let's create a certificate in the vault. |
| 55 | + # If the certificate already exists in the Key Vault, then a new version of the certificate is created. |
| 56 | + print("\n.. Create certificate") |
| 57 | + |
| 58 | + # Before creating your certificate, let's create the management policy for your certificate. |
| 59 | + # Here we use the default policy. |
| 60 | + cert_name = "PrivateKeyCertificate" |
| 61 | + cert_policy = CertificatePolicy.get_default() |
| 62 | + |
| 63 | + # Awaiting create_certificate will return the certificate as a KeyVaultCertificate |
| 64 | + # if creation is successful, and the CertificateOperation if not. |
| 65 | + created_certificate = await certificate_client.create_certificate( |
| 66 | + certificate_name=cert_name, policy=cert_policy |
| 67 | + ) |
| 68 | + print("Certificate with name '{}' was created".format(created_certificate.name)) |
| 69 | + |
| 70 | + # Key Vault also creates a secret with the same name as the created certificate. |
| 71 | + # This secret contains protected information about the certificate, such as its private key. |
| 72 | + print("\n.. Get a secret by name") |
| 73 | + certificate_secret = await secret_client.get_secret(name=cert_name) |
| 74 | + print("Certificate secret with name '{}' was found.".format(certificate_secret.name)) |
| 75 | + |
| 76 | + # Now we can extract the private key and public certificate from the secret using the cryptography |
| 77 | + # package. `additional_certificates` will be empty since the secret only contains one certificate. |
| 78 | + # This example shows how to parse a certificate in PKCS12 format since it's the default in Key Vault, |
| 79 | + # but PEM certificates are supported as well. With a PEM certificate, you could use load_pem_private_key |
| 80 | + # in place of load_key_and_certificates. |
| 81 | + cert_bytes = base64.b64decode(certificate_secret.value) |
| 82 | + private_key, public_certificate, additional_certificates = pkcs12.load_key_and_certificates( |
| 83 | + data=cert_bytes, |
| 84 | + password=None |
| 85 | + ) |
| 86 | + print("Certificate with name '{}' was parsed.".format(certificate_secret.name)) |
| 87 | + |
| 88 | + # Now we can clean up the vault by deleting, then purging, the certificate. |
| 89 | + print("\n.. Delete certificate") |
| 90 | + deleted_certificate = await certificate_client.delete_certificate(certificate_name=cert_name) |
| 91 | + print("Certificate with name '{}' was deleted.".format(deleted_certificate.name)) |
| 92 | + |
| 93 | + await certificate_client.purge_deleted_certificate(certificate_name=deleted_certificate.name) |
| 94 | + print("Certificate with name '{}' is being purged.".format(deleted_certificate.name)) |
| 95 | + |
| 96 | + except HttpResponseError as e: |
| 97 | + print("\nrun_sample has caught an error. {}".format(e.message)) |
| 98 | + |
| 99 | + finally: |
| 100 | + print("\nrun_sample done") |
| 101 | + await credential.close() |
| 102 | + await certificate_client.close() |
| 103 | + await secret_client.close() |
| 104 | + |
| 105 | + |
| 106 | +if __name__ == "__main__": |
| 107 | + try: |
| 108 | + loop = asyncio.get_event_loop() |
| 109 | + loop.run_until_complete(run_sample()) |
| 110 | + loop.close() |
| 111 | + |
| 112 | + except Exception as e: |
| 113 | + print("Top level error: {}".format(str(e))) |
0 commit comments