from kubernetes import client, config
from kubernetes.client.rest import ApiException
import os


class KubernetesSecretsManager:
    """Publish Vault bootstrap credentials as Kubernetes Secrets.

    The manager authenticates with the in-cluster service account and writes
    ``Opaque`` Secrets into a single namespace chosen at construction time.
    """

    def __init__(self):
        """Load in-cluster credentials and resolve the target namespace."""
        # Uses the service account credentials mounted into the pod under
        # /var/run/secrets/kubernetes.io/serviceaccount/.
        config.load_incluster_config()
        self.api_instance = client.CoreV1Api()

        # NOTE: POD_NAMESPACE is not set by Kubernetes on its own; it has to
        # be injected into the pod spec (e.g. via the Downward API). When it
        # is absent, Secrets are written to the 'default' namespace.
        self.namespace = os.environ.get('POD_NAMESPACE', 'default')

    def _publish_secret(self, secret_name: str, string_data: dict,
                        update_if_exists: bool = False) -> None:
        """Create an Opaque Secret, optionally patching it when it exists.

        Args:
            secret_name (str): The name of the Kubernetes Secret to create
            string_data (dict): Plain-text key/value pairs for the Secret
            update_if_exists (bool): When True, an already existing Secret is
                patched with ``string_data``. When False, the conflict is
                only reported and the existing Secret is left untouched.

        Raises:
            ApiException: If the create call fails with any status other than
                409, or if the follow-up patch call fails.
        """
        # string_data takes plain strings; the API server stores them
        # base64-encoded, so no manual encoding is needed here.
        secret_body = client.V1Secret(
            metadata=client.V1ObjectMeta(name=secret_name),
            type="Opaque",
            string_data=string_data,
        )

        try:
            self.api_instance.create_namespaced_secret(
                namespace=self.namespace,
                body=secret_body,
            )
        except ApiException as e:
            # Status 409 means the Secret already exists
            if e.status != 409:
                print(f"[ERROR] Failed to publish K8s Secret: {e}")
                raise
            if not update_if_exists:
                print("[ERROR] Secret already exists.")
                return
            print("[INFO] Secret already exists. Patching with new credentials...")
        else:
            print("[SUCCESS] Kubernetes Secret created.")
            return

        # Only reached when the Secret exists and the caller asked for an
        # update. A failed patch is logged like any other publish failure
        # instead of escaping silently.
        try:
            self.api_instance.patch_namespaced_secret(
                name=secret_name,
                namespace=self.namespace,
                body=secret_body,
            )
        except ApiException as e:
            print(f"[ERROR] Failed to publish K8s Secret: {e}")
            raise
        print("[SUCCESS] Kubernetes Secret updated.")

    def publish_unseal_keys_credentials(self, secret_name: str, unseal_keys: list):
        """Publish the unseal keys as a Kubernetes Secret

        Each key is stored under ``unseal_key_<index>``, indexed from 0 in the
        order given. An existing Secret is never overwritten: the conflict is
        printed and the method returns normally.

        Args:
            secret_name (str): The name of the Kubernetes Secret to create
            unseal_keys (list): The vault's unseal keys

        Raises:
            ApiException: If the API call fails with a status other than 409.
        """
        self._publish_secret(
            secret_name,
            {f'unseal_key_{i}': key for i, key in enumerate(unseal_keys)},
        )

    def publish_root_token_credentials(self, secret_name: str, root_token: str):
        """Publish the root token as a Kubernetes Secret

        The token is stored under the ``root_token`` key. An existing Secret
        is never overwritten: the conflict is printed and the method returns
        normally.

        Args:
            secret_name (str): The name of the Kubernetes Secret to create
            root_token (str): The vault's root token

        Raises:
            ApiException: If the API call fails with a status other than 409.
        """
        self._publish_secret(secret_name, {'root_token': root_token})

    def publish_approle_credentials(self, secret_name: str, role_id: str, secret_id: str):
        """Create or update a Kubernetes Secret with the Vault AppRole credentials.

        Unlike the unseal-key and root-token publishers, an existing Secret
        is patched with the new ``role_id`` / ``secret_id`` values.

        Args:
            secret_name (str): The name of the Kubernetes Secret to create or update
            role_id (str): The AppRole role ID
            secret_id (str): The AppRole secret ID

        Raises:
            ApiException: If the create call fails with a status other than
                409, or if patching the existing Secret fails.
        """
        print(f"Publishing AppRole credentials to Kubernetes Secret: {secret_name}")
        self._publish_secret(
            secret_name,
            {"role_id": role_id, "secret_id": secret_id},
            update_if_exists=True,
        )