custom-hashicorp-vault/setup-scripts/prod-setup.py
2025-12-28 15:43:17 -06:00

202 lines
No EOL
7.8 KiB
Python

import os, json
from CommandRunner import CommandRunner
class Initializer:
    """Initialize, unseal and configure a HashiCorp Vault instance.

    Every vault operation is performed by shelling out to the `vault` CLI
    through `CommandRunner`. The unseal keys and root token produced by
    `vault operator init` are persisted to files so that later runs
    (ex. container restarts with an already initialized vault) can reuse them.
    """

    # Default locations of the persisted credentials
    UNSEAL_KEYS_FILE = '/vault/creds/unseal-keys'
    ROOT_TOKEN_FILE = '/vault/creds/root-token'

    # Credentials are unknown until `init_vault` or `load_credentials` runs.
    # Class-level defaults let the other methods detect the "not loaded yet"
    # case instead of failing with an AttributeError.
    unseal_keys = None
    root_token = None

    @staticmethod
    def _write_secret_file(file, content):
        """Write `content` to `file`, creating it readable/writable by the owner only

        Args:
            file (str): The path of the file to write (truncated if it exists).
            content (str): The text to write.
        """
        # The mode only applies when the file is created; the permissions of
        # a pre-existing file are left as they are.
        fd = os.open(file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'w') as f:
            f.write(content)

    def _ensure_credentials(self):
        """Load the persisted credentials if they are not in memory yet"""
        if self.unseal_keys is None or self.root_token is None:
            self.load_credentials()

    def load_credentials(self, unseal_keys_file = UNSEAL_KEYS_FILE, root_token_file = ROOT_TOKEN_FILE):
        """Read the unseal keys and root token back from their files

        This is needed when the vault was initialized by a previous run, as
        `init_vault` (which normally sets these attributes) is skipped then.

        Args:
            unseal_keys_file (str, optional): File with one unseal key per line. Defaults to '/vault/creds/unseal-keys'.
            root_token_file (str, optional): File containing the root token. Defaults to '/vault/creds/root-token'.

        Raises:
            OSError: If either file cannot be read
        """
        with open(unseal_keys_file) as f:
            # Skip blank lines so a trailing newline doesn't produce an empty key
            self.unseal_keys = [line.strip() for line in f if line.strip()]
        with open(root_token_file) as f:
            self.root_token = f.read().strip()

    def check_if_initialized(self) -> bool:
        """Check if the vault is already initialized

        Returns:
            bool: If the vault is initialized or not
        """
        # Get the status of the vault
        # Note, because it returns a non-zero exit code when the vault is uninitialized, we set check to False
        # Which is also why we need to check the return code manually
        init_status_returncode, init_status_raw, _ = CommandRunner.run_command('vault status -format=json', False)

        # Verify the return code
        # Note, because there can be other meanings for return (ex. 2 for Sealed), we don't use a hard check here.
        if init_status_returncode not in (0, 2):
            print(f'[WARNING] Status Return Code (for Init Check): {init_status_returncode}')

        # Parse the initialized stat from the status
        init_status = json.loads(init_status_raw)['initialized']
        print(f'Is Initialized: {init_status}')
        return init_status

    def check_root_token_and_unseal_keys_files_exist(self) -> bool:
        """Check if the root token and unseal keys files exist

        Returns:
            bool: If both files exist
        """
        root_token_exists = os.path.exists(self.ROOT_TOKEN_FILE)
        unseal_keys_exists = os.path.exists(self.UNSEAL_KEYS_FILE)
        return root_token_exists and unseal_keys_exists

    def create_unseal_keys_file(self, file = UNSEAL_KEYS_FILE):
        """Write the vault's unseal keys to a file (one key per line)

        Args:
            file (str, optional): The path of the file to output. Defaults to '/vault/creds/unseal-keys'.
        """
        self._write_secret_file(file, '\n'.join(self.unseal_keys))

    def create_root_token_file(self, file = ROOT_TOKEN_FILE):
        """Write the vault's root token to a file

        Args:
            file (str, optional): The path of the file to output. Defaults to '/vault/creds/root-token'.
        """
        self._write_secret_file(file, self.root_token)

    def init_vault(self):
        """Initialize vault

        This includes creating the root token and unseal keys.
        Which we want to store in case we need them later
        """
        print('*----------------------*')
        print('| Initialization Vault |')
        print('*----------------------*')

        # Initialize the vault
        _, init_output, _ = CommandRunner.run_command('vault operator init -format=json')

        # Parse the unseal keys and root token from the initialization response
        init_response = json.loads(init_output)
        self.unseal_keys = init_response['unseal_keys_b64']
        self.root_token = init_response['root_token']

        # The credentials directory is mounted as a volume, so it isn't created here
        self.create_unseal_keys_file()
        self.create_root_token_file()

    def is_vault_sealed(self) -> bool:
        """Check if the vault is sealed or not

        Returns:
            bool: If the vault is sealed or not

        Raises:
            RuntimeError: If the status command exits with a code other than 0 or 2
        """
        # Get the status of the vault
        # Note, because it returns a non-zero exit code when the vault is sealed, we set check to False
        # Which is also why we need to check the return code manually
        seal_status_returncode, seal_status_raw, _ = CommandRunner.run_command('vault status -format=json', False)

        # Verify the return code is either 0 (unsealed) or 2 (sealed)
        if seal_status_returncode not in (0, 2):
            raise RuntimeError('Failed to get the status of the vault')

        # Parse the seal stat from the status
        seal_status = json.loads(seal_status_raw)['sealed']
        print(f'Is Sealed: {seal_status}')
        return seal_status

    def unseal_vault(self):
        """Unseal the vault

        The unseal keys are loaded from their file first if they are not in
        memory yet (ex. the vault was initialized by a previous run).

        Raises:
            RuntimeError: If the vault is still sealed after every key was used
        """
        print('*-----------------*')
        print('| Unsealing Vault |')
        print('*-----------------*')

        self._ensure_credentials()

        # Use each key to unseal the vault
        # NOTE(review): the key is passed as a command line argument, which may be visible to other processes - confirm this is acceptable
        for key in self.unseal_keys:
            _, unseal_output, _ = CommandRunner.run_command(f'vault operator unseal {key}')
            print(unseal_output)

            # If the vault is now unsealed break/escape from the loop
            if not self.is_vault_sealed():
                print('Vault is unsealed')
                break
        else:
            # Only reached when the loop ran out of keys without unsealing
            raise RuntimeError('Vault is still sealed after using every available unseal key')

    def setup_secrets_engine(self):
        """Log in with the root token and enable the kv secrets engine at the `secret` path"""
        print('*---------------------------*')
        print('| Setting up secrets engine |')
        print('*---------------------------*')

        self._ensure_credentials()

        # NOTE(review): the output of `vault login` may include the token itself - consider `-no-print` to keep it out of the logs
        _, login_output, _ = CommandRunner.run_command(f'vault login {self.root_token}')
        print(login_output)

        _, engine_enable_output, _ = CommandRunner.run_command('vault secrets enable -path secret kv')
        print(engine_enable_output)

    def setup_audit_device(self):
        """Enable a file audit device that logs to /vault/logs/vault-audit.log"""
        print('*---------------------------*')
        print('| Setting up Audit Device |')
        print('*---------------------------*')

        _, audit_output, _ = CommandRunner.run_command('vault audit enable file file_path=/vault/logs/vault-audit.log')
        print(audit_output)

    def setup_app_role_access(self):
        """Run the app role creation script"""
        print('*----------------------------*')
        print('| Setting up App Role access |')
        print('*----------------------------*')
        print(f'Policy Capabilities: {os.getenv("POLICY_CAPABILITIES")}')

        self._ensure_credentials()

        # Run the app role access script, handing it the root token as its argument
        CommandRunner.run_command_in_real_time(f'python3 /setup-scripts/app-role-access.py {self.root_token}')
def main():
    """Bring the vault to an initialized and unsealed state.

    A fresh vault is initialized, unsealed and fully set up (secrets engine,
    audit device, app role access). A vault that is already initialized
    (ex. its data lives on a Persistent Volume and the container restarted)
    is only unsealed when needed.
    """
    vault = Initializer()

    # Restart case: the vault data survived, so only make sure it is unsealed
    # (we assume it was already set up properly by the run that initialized it)
    if vault.check_if_initialized():
        print('Vault is already initialized. Skipping initialization and setup...')
        if vault.is_vault_sealed():
            vault.unseal_vault()
        # TODO: get the Role ID and Secret ID (app role access) here in case the originally created .env file doesn't exist anymore
        return

    # Fresh vault: initialize it first
    vault.init_vault()

    # Safety check so the rest of the script can rely on the credential files
    # Likely only triggered by some kind of PV desynchronization, which shouldn't really happen
    credentials_present = vault.check_root_token_and_unseal_keys_files_exist()
    if not credentials_present:
        raise RuntimeError('Vault is in an inconsistent state for this script to continue. Please ensure the vault can be initialized OR both the root token and unseal keys files exist alongside and initialized vault.')

    # The vault has to be unsealed before it can be set up
    if not vault.is_vault_sealed():
        print('Vault is already unsealed. Proceeding to setup...')
    else:
        vault.unseal_vault()

    # QUESTION: If the vault data is already setup (PV on restart) do we need to re-setup this stuff?
    vault.setup_secrets_engine()
    vault.setup_audit_device()
    vault.setup_app_role_access()
# Only run the setup when this file is executed as a script (not when it is imported)
if __name__ == '__main__':
    main()