import os, json from CommandRunner import CommandRunner class Initializer: def create_unseal_keys_file(self, file = '/vault/creds/unseal-keys'): """Write the vault's unseal keys to a file Args: file (str, optional): The path of the file to output. Defaults to '/vault/creds/unseal-keys'. """ with open(file, 'w+') as f: f.write('\n'.join(self.unseal_keys)) def create_root_token_file(self, file = '/vault/creds/root-token'): """Write the vault's root token to a file Args: file (str, optional): The path of the file to output. Defaults to '/vault/creds/root-token'. """ with open(file, 'w+') as f: f.write(self.root_token) def init_vault(self): """Initialize vault This includes creating the root token and unseal keys. Which we want to store in case we need them later """ print('*----------------------*') print('| Initialization Vault |') print('*----------------------*') # Initialize the vault return_code, init_output, init_err = CommandRunner.run_command('vault operator init -format=json') # Parse the unseal keys and root token from the initialization response self.unseal_keys = json.loads(init_output)['unseal_keys_b64'] self.root_token = json.loads(init_output)['root_token'] # UPDATE: Is mounted as a volume instead #CommandRunner.run_command('mkdir /vault/creds') self.create_unseal_keys_file() self.create_root_token_file() def is_vault_unsealed(self) -> bool: """Check if the vault is sealed or not Returns: bool: If the vault is unsealed or not """ # Get the status of the vault # Note, because it returns a non-zero exit code when the vault is sealed, we set check to False # Which is also why we need to check the return code manually seal_status_returncode, seal_status_raw, seal_status_err = CommandRunner.run_command('vault status -format=json', False) # Verify the return code is either 0 (unsealed) or 2 (sealed) if seal_status_returncode != 0 and seal_status_returncode != 2: raise RuntimeError('Failed to get the status of the vault') # Print the raw status print(seal_status_raw) # Parse the seal stat from the status seal_status = json.loads(seal_status_raw)['sealed'] print(f'Is Sealed: {seal_status}') return seal_status def unseal_vault(self): """Unseal the vault""" print('*-----------------*') print('| Unsealing Vault |') print('*-----------------*') # Use each key to unseal the vault for key in self.unseal_keys: return_code, unseal_output, unseal_err = CommandRunner.run_command(f'vault operator unseal {key}') print(unseal_output) # If the vault is now unsealed break/escape from the loop if not self.is_vault_unsealed(): print('Vault is unsealed') break def setup_secrets_engine(self): """Setup the secrets engine""" print('*---------------------------*') print('| Setting up secrets engine |') print('*---------------------------*') login_return_code, login_output, login_err = CommandRunner.run_command(f'vault login {self.root_token}') print(login_output) engin_enable_return_code, engine_enable_output, engine_enable_err = CommandRunner.run_command('vault secrets enable -path secret kv') print(engine_enable_output) def setup_audit_device(self): print('*---------------------------*') print('| Setting up Audit Device |') print('*---------------------------*') audit_return_code, audit_output, audit_err = CommandRunner.run_command('vault audit enable file file_path=/vault/logs/vault-audit.log') print(audit_output) def setup_app_role_access(self): """Run the app role creation script""" print('*----------------------------*') print('| Setting up App Role access |') print('*----------------------------*') print(f'Policy Capabilities: {os.getenv("POLICY_CAPABILITIES")}') # Run the custom entrypoint Python script CommandRunner.run_command_in_real_time(f'python3 /setup-scripts/app-role-access.py {self.root_token}') def main(): initializer = Initializer() # Check if the root-token file and unseal keys files exist #if os.path.exists('/vault/creds/root-token') and os.path.exists('/vault/creds/unseal-keys'): if not initializer.is_vault_unsealed(): print('Vault already setup. Skipping...') # QUESTION: Should there be code here to get the Role ID and Secret ID in case the originally created .env file doesn't exist for some reason else: initializer.init_vault() initializer.unseal_vault() initializer.setup_secrets_engine() initializer.setup_audit_device() initializer.setup_app_role_access() if __name__ == '__main__': main()