custom-hashicorp-vault/setup-scripts/prod-setup.py
2025-05-07 06:30:09 -05:00

144 lines
No EOL
5.1 KiB
Python

import os, json
from CommandRunner import CommandRunner
class Initializer:
def create_unseal_keys_file(self, file = '/vault/creds/unseal-keys'):
"""Write the vault's unseal keys to a file
Args:
file (str, optional): The path of the file to output. Defaults to '/vault/creds/unseal-keys'.
"""
with open(file, 'w+') as f:
f.write('\n'.join(self.unseal_keys))
def create_root_token_file(self, file = '/vault/creds/root-token'):
"""Write the vault's root token to a file
Args:
file (str, optional): The path of the file to output. Defaults to '/vault/creds/root-token'.
"""
with open(file, 'w+') as f:
f.write(self.root_token)
def init_vault(self):
"""Initialize vault
This includes creating the root token and unseal keys.
Which we want to store in case we need them later
"""
print('*----------------------*')
print('| Initialization Vault |')
print('*----------------------*')
# Initialize the vault
return_code, init_output, init_err = CommandRunner.run_command('vault operator init -format=json')
# Parse the unseal keys and root token from the initialization response
self.unseal_keys = json.loads(init_output)['unseal_keys_b64']
self.root_token = json.loads(init_output)['root_token']
# UPDATE: Is mounted as a volume instead
#CommandRunner.run_command('mkdir /vault/creds')
self.create_unseal_keys_file()
self.create_root_token_file()
def is_vault_unsealed(self) -> bool:
"""Check if the vault is sealed or not
Returns:
bool: If the vault is unsealed or not
"""
# Get the status of the vault
# Note, because it returns a non-zero exit code when the vault is sealed, we set check to False
# Which is also why we need to check the return code manually
seal_status_returncode, seal_status_raw, seal_status_err = CommandRunner.run_command('vault status -format=json', False)
# Verify the return code is either 0 (unsealed) or 2 (sealed)
if seal_status_returncode != 0 and seal_status_returncode != 2:
raise RuntimeError('Failed to get the status of the vault')
# Print the raw status
print(seal_status_raw)
# Parse the seal stat from the status
seal_status = json.loads(seal_status_raw)['sealed']
print(f'Is Sealed: {seal_status}')
return seal_status
def unseal_vault(self):
"""Unseal the vault"""
print('*-----------------*')
print('| Unsealing Vault |')
print('*-----------------*')
# Use each key to unseal the vault
for key in self.unseal_keys:
return_code, unseal_output, unseal_err = CommandRunner.run_command(f'vault operator unseal {key}')
print(unseal_output)
# If the vault is now unsealed break/escape from the loop
if not self.is_vault_unsealed():
print('Vault is unsealed')
break
def setup_secrets_engine(self):
"""Setup the secrets engine"""
print('*---------------------------*')
print('| Setting up secrets engine |')
print('*---------------------------*')
login_return_code, login_output, login_err = CommandRunner.run_command(f'vault login {self.root_token}')
print(login_output)
engin_enable_return_code, engine_enable_output, engine_enable_err = CommandRunner.run_command('vault secrets enable -path secret kv')
print(engine_enable_output)
def setup_audit_device(self):
print('*---------------------------*')
print('| Setting up Audit Device |')
print('*---------------------------*')
audit_return_code, audit_output, audit_err = CommandRunner.run_command('vault audit enable file file_path=/vault/logs/vault-audit.log')
print(audit_output)
def setup_app_role_access(self):
"""Run the app role creation script"""
print('*----------------------------*')
print('| Setting up App Role access |')
print('*----------------------------*')
print(f'Policy Capabilities: {os.getenv("POLICY_CAPABILITIES")}')
# Run the custom entrypoint Python script
CommandRunner.run_command_in_real_time(f'python3 /setup-scripts/app-role-access.py {self.root_token}')
def main():
initializer = Initializer()
# Check if the root-token file and unseal keys files exist
#if os.path.exists('/vault/creds/root-token') and os.path.exists('/vault/creds/unseal-keys'):
if not initializer.is_vault_unsealed():
print('Vault already setup. Skipping...')
# QUESTION: Should there be code here to get the Role ID and Secret ID in case the originally created .env file doesn't exist for some reason
else:
initializer.init_vault()
initializer.unseal_vault()
initializer.setup_secrets_engine()
initializer.setup_audit_device()
initializer.setup_app_role_access()
if __name__ == '__main__':
main()