All checks were successful
Build and deploy Bridgeman Accessible Hashicorp Vault Implementation / deploy (push) Successful in 2m48s
117 lines
No EOL
4.3 KiB
Python
117 lines
No EOL
4.3 KiB
Python
from kubernetes import client, config
|
|
from kubernetes.client.rest import ApiException
|
|
import os
|
|
|
|
class KubernetesSecretManager:
    """Publish Vault credentials as Kubernetes Secrets from inside a pod.

    All Secrets are written to the namespace held in ``self.namespace`` using
    the in-cluster service-account credentials of the running pod.
    """

    def __init__(self) -> None:
        """Load in-cluster credentials and resolve the target namespace."""
        # Reads the service-account token mounted at
        # /var/run/secrets/kubernetes.io/serviceaccount/token inside the pod.
        config.load_incluster_config()
        self.api_instance = client.CoreV1Api()

        # POD_NAMESPACE is not set by Kubernetes automatically; it has to be
        # injected through the pod spec (e.g. the Downward API field
        # metadata.namespace). When it is absent we fall back to 'default'.
        self.namespace = os.environ.get('POD_NAMESPACE', 'default')

    def _publish_secret(self, secret_name: str, string_data: dict,
                        *, update_existing: bool = False) -> None:
        """Create an Opaque Secret, optionally patching it if it already exists.

        Args:
            secret_name (str): The name of the Kubernetes Secret to create
            string_data (dict): Plain-text key/value pairs to store
            update_existing (bool): When True, a Secret that already exists
                is patched with ``string_data``; when False the conflict is
                only reported and the existing Secret is left untouched.

        Raises:
            ApiException: For any API failure other than a 409 conflict on
                create (a failure of the follow-up patch also propagates).
        """
        # string_data lets us pass plain text (K8s auto-base64 encodes it for us)
        secret_body = client.V1Secret(
            metadata=client.V1ObjectMeta(name=secret_name),
            type="Opaque",
            string_data=string_data,
        )

        try:
            # Attempt to create the secret
            self.api_instance.create_namespaced_secret(
                namespace=self.namespace,
                body=secret_body,
            )
        except ApiException as e:
            # Anything other than 409 (already exists) is a real failure.
            if e.status != 409:
                print(f"[ERROR] Failed to publish K8s Secret: {e}")
                # Bare raise keeps the original traceback intact.
                raise

            if not update_existing:
                print("[ERROR] Secret already exists.")
                return

            print("[INFO] Secret already exists. Patching with new credentials...")
            self.api_instance.patch_namespaced_secret(
                name=secret_name,
                namespace=self.namespace,
                body=secret_body,
            )
            print("[SUCCESS] Kubernetes Secret updated.")
        else:
            print("[SUCCESS] Kubernetes Secret created.")

    def publish_unseal_keys_credentials(self, secret_name: str, unseal_keys: list) -> None:
        """Publish the unseal keys as a Kubernetes Secret

        Each key is stored under ``unseal_key_<i>``, where ``i`` is the key's
        zero-based position in ``unseal_keys``. If the Secret already exists
        it is NOT overwritten; the conflict is only printed.

        Args:
            secret_name (str): The name of the Kubernetes Secret to create
            unseal_keys (list): The vault's unseal keys

        Raises:
            ApiException: If the API call fails for a reason other than the
                Secret already existing.
        """
        self._publish_secret(
            secret_name,
            {f'unseal_key_{i}': key for i, key in enumerate(unseal_keys)},
        )

    def publish_root_token_credentials(self, secret_name: str, root_token: str) -> None:
        """Publish the root token as a Kubernetes Secret

        The token is stored under the ``root_token`` key. If the Secret
        already exists it is NOT overwritten; the conflict is only printed.

        Args:
            secret_name (str): The name of the Kubernetes Secret to create
            root_token (str): The vault's root token

        Raises:
            ApiException: If the API call fails for a reason other than the
                Secret already existing.
        """
        self._publish_secret(secret_name, {'root_token': root_token})

    def publish_approle_credentials(self, secret_name: str, role_id: str, secret_id: str) -> None:
        """Create or update a Kubernetes Secret with the Vault AppRole credentials.

        Unlike the other publishers, an existing Secret is patched with the
        new ``role_id`` / ``secret_id`` values instead of being left alone.

        Args:
            secret_name (str): The name of the Kubernetes Secret to create or update
            role_id (str): The AppRole role ID
            secret_id (str): The AppRole secret ID

        Raises:
            ApiException: If creating the Secret fails for a reason other
                than it already existing, or if the follow-up patch fails.
        """
        print(f"Publishing AppRole credentials to Kubernetes Secret: {secret_name}")
        self._publish_secret(
            secret_name,
            {
                "role_id": role_id,
                "secret_id": secret_id,
            },
            update_existing=True,
        )