144 lines
No EOL
5.1 KiB
Python
144 lines
No EOL
5.1 KiB
Python
import os, json
|
|
from CommandRunner import CommandRunner
|
|
|
|
class Initializer:
|
|
def create_unseal_keys_file(self, file = '/vault/creds/unseal-keys'):
|
|
"""Write the vault's unseal keys to a file
|
|
|
|
Args:
|
|
file (str, optional): The path of the file to output. Defaults to '/vault/creds/unseal-keys'.
|
|
"""
|
|
|
|
with open(file, 'w+') as f:
|
|
f.write('\n'.join(self.unseal_keys))
|
|
|
|
def create_root_token_file(self, file = '/vault/creds/root-token'):
|
|
"""Write the vault's root token to a file
|
|
|
|
Args:
|
|
file (str, optional): The path of the file to output. Defaults to '/vault/creds/root-token'.
|
|
"""
|
|
|
|
with open(file, 'w+') as f:
|
|
f.write(self.root_token)
|
|
|
|
def init_vault(self):
|
|
"""Initialize vault
|
|
|
|
This includes creating the root token and unseal keys.
|
|
Which we want to store in case we need them later
|
|
"""
|
|
|
|
print('*----------------------*')
|
|
print('| Initialization Vault |')
|
|
print('*----------------------*')
|
|
|
|
# Initialize the vault
|
|
return_code, init_output, init_err = CommandRunner.run_command('vault operator init -format=json')
|
|
|
|
# Parse the unseal keys and root token from the initialization response
|
|
self.unseal_keys = json.loads(init_output)['unseal_keys_b64']
|
|
self.root_token = json.loads(init_output)['root_token']
|
|
|
|
# UPDATE: Is mounted as a volume instead
|
|
#CommandRunner.run_command('mkdir /vault/creds')
|
|
|
|
self.create_unseal_keys_file()
|
|
self.create_root_token_file()
|
|
|
|
def is_vault_unsealed(self) -> bool:
|
|
"""Check if the vault is sealed or not
|
|
|
|
Returns:
|
|
bool: If the vault is unsealed or not
|
|
"""
|
|
|
|
# Get the status of the vault
|
|
# Note, because it returns a non-zero exit code when the vault is sealed, we set check to False
|
|
# Which is also why we need to check the return code manually
|
|
seal_status_returncode, seal_status_raw, seal_status_err = CommandRunner.run_command('vault status -format=json', False)
|
|
|
|
# Verify the return code is either 0 (unsealed) or 2 (sealed)
|
|
if seal_status_returncode != 0 and seal_status_returncode != 2:
|
|
raise RuntimeError('Failed to get the status of the vault')
|
|
|
|
# Print the raw status
|
|
print(seal_status_raw)
|
|
|
|
# Parse the seal stat from the status
|
|
seal_status = json.loads(seal_status_raw)['sealed']
|
|
|
|
print(f'Is Sealed: {seal_status}')
|
|
|
|
return seal_status
|
|
|
|
def unseal_vault(self):
|
|
"""Unseal the vault"""
|
|
|
|
print('*-----------------*')
|
|
print('| Unsealing Vault |')
|
|
print('*-----------------*')
|
|
|
|
# Use each key to unseal the vault
|
|
for key in self.unseal_keys:
|
|
return_code, unseal_output, unseal_err = CommandRunner.run_command(f'vault operator unseal {key}')
|
|
|
|
print(unseal_output)
|
|
|
|
# If the vault is now unsealed break/escape from the loop
|
|
if not self.is_vault_unsealed():
|
|
print('Vault is unsealed')
|
|
break
|
|
|
|
def setup_secrets_engine(self):
|
|
"""Setup the secrets engine"""
|
|
|
|
print('*---------------------------*')
|
|
print('| Setting up secrets engine |')
|
|
print('*---------------------------*')
|
|
|
|
login_return_code, login_output, login_err = CommandRunner.run_command(f'vault login {self.root_token}')
|
|
|
|
print(login_output)
|
|
|
|
engin_enable_return_code, engine_enable_output, engine_enable_err = CommandRunner.run_command('vault secrets enable -path secret kv')
|
|
|
|
print(engine_enable_output)
|
|
|
|
def setup_audit_device(self):
|
|
print('*---------------------------*')
|
|
print('| Setting up Audit Device |')
|
|
print('*---------------------------*')
|
|
|
|
audit_return_code, audit_output, audit_err = CommandRunner.run_command('vault audit enable file file_path=/vault/logs/vault-audit.log')
|
|
|
|
print(audit_output)
|
|
|
|
def setup_app_role_access(self):
|
|
"""Run the app role creation script"""
|
|
|
|
print('*----------------------------*')
|
|
print('| Setting up App Role access |')
|
|
print('*----------------------------*')
|
|
|
|
print(f'Policy Capabilities: {os.getenv("POLICY_CAPABILITIES")}')
|
|
|
|
# Run the custom entrypoint Python script
|
|
CommandRunner.run_command_in_real_time(f'python3 /setup-scripts/app-role-access.py {self.root_token}')
|
|
|
|
def main():
|
|
initializer = Initializer()
|
|
# Check if the root-token file and unseal keys files exist
|
|
#if os.path.exists('/vault/creds/root-token') and os.path.exists('/vault/creds/unseal-keys'):
|
|
if not initializer.is_vault_unsealed():
|
|
print('Vault already setup. Skipping...')
|
|
# QUESTION: Should there be code here to get the Role ID and Secret ID in case the originally created .env file doesn't exist for some reason
|
|
else:
|
|
initializer.init_vault()
|
|
initializer.unseal_vault()
|
|
initializer.setup_secrets_engine()
|
|
initializer.setup_audit_device()
|
|
initializer.setup_app_role_access()
|
|
|
|
if __name__ == '__main__':
|
|
main() |