#!/usr/bin/env python3

# |*******************************************************************|
# |                           Setup script                            |
# |                                                                   |
# | Author: Alan Bridgeman                                            |
# | Created: 2024-03-30                                               |
# |                                                                   |
# | COPYRIGHT © 2024 Bridgeman Accessible/Alan Bridgeman.             |
# |                                                                   |
# | This work is presented AS IS, with no warranty of any kind.       |
# | Any modification or use of this script is at the user's own risk. |
# |*******************************************************************|

import logging
import os
import subprocess
import sys

from CommandRunner import CommandRunner

#def run_command(command):
#    try:
#        result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, check=True)
#        return result.stdout.decode('utf-8').strip()
#    except subprocess.CalledProcessError as e:
#        # Log the error and re-raise or handle as appropriate
#        raise RuntimeError(f"Command '{' '.join(command)}' failed with error: {e.stderr}") from e


def create_policy(policy_name: str, policy_capabilities: list[str], policy_path: str = 'secret/*'):
    """Create a policy in vault

    Args:
        policy_name (str): The name of the policy to create
        policy_capabilities (list[str]): The capabilities of the policy (ex. read, write, etc...)
        policy_path (str, optional): The path the policy should apply to. Defaults to all secrets (`secret/*`).
    """

    # Render the capabilities as an HCL list literal, e.g. ["read","create"]
    policy_caps = '["' + '","'.join(policy_capabilities) + '"]'
    policy_content = 'path "' + policy_path + '" {\n capabilities = ' + policy_caps + '\n}'

    # NOTE(review): the text between `policy = '<` and the `get_role_id`
    # signature was lost from the copy of this file that was reviewed. The
    # remainder of this function is reconstructed from the Vault CLI heredoc
    # idiom (`vault policy write <name> - <<EOF ... EOF`) and the naming used
    # by the sibling functions -- confirm against the upstream script.
    policy = '<<EOF\n' + policy_content + '\nEOF'
    _, policy_write_output, _ = CommandRunner.run_command('vault policy write ' + policy_name + ' - ' + policy)
    logging.debug(policy_write_output)


def get_role_id(role_name: str) -> str:
    """Get the `role_id` for a given role

    Args:
        role_name (str): The name of the role to get the `role_id` for

    Returns:
        str: The `role_id` for the given role
    """

    # Get the role_id from vault (second column of the `role_id` row in the CLI's output)
    role_read_path = '/'.join(['auth', 'approle', 'role', role_name, 'role-id'])
    _, role_id, _ = CommandRunner.run_command('vault read ' + role_read_path + ' | grep role_id | awk \'{print $2}\'')

    # An empty result means the pipeline matched nothing; surface that rather
    # than silently handing an empty credential back to the caller.
    if not role_id:
        logging.warning('No role_id returned for role: ' + role_name)

    logging.debug('Role ID: ' + role_id)

    return role_id


def get_secret_id(role_name: str) -> str:
    """Get a/the `secret_id` for a given role

    Each call writes to the role's `secret-id` path, so the value returned is
    whatever that write outputs on this invocation.

    Args:
        role_name (str): The name of the role to get the `secret_id` for

    Returns:
        str: The `secret_id` for the given role
    """

    # Get the secret_id from vault. The trailing space in the grep pattern
    # keeps rows such as `secret_id_accessor` from matching.
    secret_write_path = '/'.join(['auth', 'approle', 'role', role_name, 'secret-id'])
    _, secret_id, _ = CommandRunner.run_command('vault write -f ' + secret_write_path + ' | grep "secret_id " | awk \'{print $2}\'')

    if not secret_id:
        logging.warning('No secret_id returned for role: ' + role_name)

    # NOTE(review): this writes a credential to the debug log -- confirm that
    # is acceptable wherever debug logging is enabled.
    logging.debug('Secret ID: ' + secret_id)

    return secret_id


def create_app_role(role_name: str, policy_name: str) -> tuple[str, str]:
    """Create an approle role and return the role_id and secret_id

    Args:
        role_name (str): The name of the role to create
        policy_name (str): The name of the policy to associate with the role

    Returns:
        tuple[str, str]: The `role_id` and `secret_id` of the newly created role
    """

    # Create a role
    role_write_path = '/'.join(['auth', 'approle', 'role', role_name])
    _, role_write_output, _ = CommandRunner.run_command('vault write ' + role_write_path + ' token_policies="' + policy_name + '"')
    logging.debug(role_write_output)

    # Get the role_id
    role_id = get_role_id(role_name)

    # Get the secret_id
    secret_id = get_secret_id(role_name)

    return role_id, secret_id


def _write_role_vars(file_name: str, role_id: str, secret_id: str) -> None:
    """Write the role_id and secret_id to `file_name` as two `KEY=value` lines

    The key names are read from the `ROLE_ID_SECRET_NAME` and
    `SECRET_ID_SECRET_NAME` environment variables.

    Args:
        file_name (str): The path of the file to (over)write
        role_id (str): The `role_id` to save
        secret_id (str): The `secret_id` to save

    Raises:
        KeyError: If either of the two environment variables is not set
    """

    # Resolve the key names before opening the file so that a missing
    # environment variable does not leave a truncated file behind.
    role_id_key = os.environ['ROLE_ID_SECRET_NAME']
    secret_id_key = os.environ['SECRET_ID_SECRET_NAME']

    # Opening in write mode creates the file if needed, so no separate
    # `touch` step is required.
    with open(file_name, 'w') as f:
        f.write('='.join([role_id_key, role_id]) + '\n')
        f.write('='.join([secret_id_key, secret_id]))


def save_role_vars_to_backup_file(role_name: str, role_id: str, secret_id: str):
    """Save the role_id and secret_id to a backup file (in the `/vault/creds` directory which is mounted to the host)

    This is used as a sort of backup in the rare case where the
    `/role_vars/.env` file doesn't persist across restarts/instances.

    Args:
        role_name (str): The name of the role (used as the filename inside of the `/vault/creds` directory)
        role_id (str): The `role_id` to save
        secret_id (str): The `secret_id` to save

    Raises:
        KeyError: If `ROLE_ID_SECRET_NAME` or `SECRET_ID_SECRET_NAME` is not set
    """

    file_name = f'/vault/creds/{role_name}'
    logging.debug('File Name: ' + file_name)

    # Write the role_id and secret_id to the file
    _write_role_vars(file_name, role_id, secret_id)


def save_role_vars_to_file(role_id: str, secret_id: str):
    """Save the role_id and secret_id to a file (`/role_vars/.env`)

    This file can then be loaded/mounted/etc... into other containers, or read
    by other scripts, so that the app can load the values.

    Args:
        role_id (str): The `role_id` to save
        secret_id (str): The `secret_id` to save

    Raises:
        KeyError: If `ROLE_ID_SECRET_NAME` or `SECRET_ID_SECRET_NAME` is not set
    """

    # Create the directory if it doesn't exist
    os.makedirs('/role_vars', exist_ok=True)

    file_name = '/role_vars/.env'
    logging.debug('File Name: ' + file_name)

    # Write the role_id and secret_id to the file
    _write_role_vars(file_name, role_id, secret_id)


def main(token: str):
    """Log in to vault, then create the policy and approle role and save the role's credentials

    Configuration is read from the environment:
        - `POLICY_NAME`: name of the policy (defaults to `node-app-policy`)
        - `POLICY_CAPABILITIES`: comma-separated capabilities (defaults to `read,create,update`)
        - `ROLE_NAME`: name of the approle role (defaults to `node-app`)
        - `ROLE_ID_SECRET_NAME` / `SECRET_ID_SECRET_NAME`: key names used in the saved files (required)

    Args:
        token (str): The vault token to log in with
    """

    #logging.basicConfig(
    #    filename='/var/log/entrypoint.log',
    #    level=logging.DEBUG,
    #    format='%(asctime)s - %(levelname)s - %(message)s'
    #)

    # Login to vault
    # NOTE(review): the token is passed on the command line and so may be
    # visible in the process list -- consider an alternative if that matters here.
    CommandRunner.run_command(f'vault login {token}')

    # Enable approle auth method
    # The result is ignored here; presumably so that a re-run where approle is
    # already enabled carries on -- confirm against CommandRunner's behaviour.
    CommandRunner.run_command('vault auth enable approle')

    # -- Create a policy for the app --

    # The policy name can be set via the `POLICY_NAME` environment variable
    # Or defaults to `node-app-policy` if not set
    policy_name = os.environ.get('POLICY_NAME', 'node-app-policy')
    logging.debug(f'Policy Name: {policy_name}')

    # The policy capabilities can be set via the `POLICY_CAPABILITIES` environment variable
    # Or defaults to `['read', 'create', 'update']` if not set
    if 'POLICY_CAPABILITIES' in os.environ:
        policy_capabilities = [cap.strip() for cap in os.environ['POLICY_CAPABILITIES'].split(',')]
    else:
        policy_capabilities = ['read', 'create', 'update']

    # Built with %-style arguments rather than an f-string: reusing the same
    # quote character inside an f-string is a syntax error before Python 3.12.
    logging.debug('Policy Capabilities: %s', ', '.join(policy_capabilities))

    create_policy(policy_name, policy_capabilities)

    # -- create an approle role --

    # Create a role
    role_name = os.environ.get('ROLE_NAME', 'node-app')
    role_id, secret_id = create_app_role(role_name, policy_name)

    # Save the role_id and secret_id to a backup file
    save_role_vars_to_backup_file(role_name, role_id, secret_id)

    # Save the role_id and secret_id to a file
    save_role_vars_to_file(role_id, secret_id)


if __name__ == '__main__':
    # Fail with a usage message instead of an IndexError when the token is missing
    if len(sys.argv) < 2:
        sys.exit(f'Usage: {sys.argv[0]} <vault-token>')

    token = sys.argv[1]
    main(token)