#!/usr/bin/env python3 # |*******************************************************************| # | Setup script | # | | # | Author: Alan Bridgeman | # | Created: 2024-03-30 | # | | # | COPYRIGHT © 2024 Bridgeman Accessible/Alan Bridgeman. | # | | # | This work is presented AS IS, with no warranty of any kind. | # | Any modification or use of this script is at the user's own risk. | # |*******************************************************************| import os, sys, subprocess, json, logging from CommandRunner import CommandRunner #def run_command(command): # try: # result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, check=True) # return result.stdout.decode('utf-8').strip() # except subprocess.CalledProcessError as e: # # Log the error and re-raise or handle as appropriate # raise RuntimeError(f"Command '{' '.join(command)}' failed with error: {e.stderr}") from e def check_app_role_enabled(): """Check if the AppRole auth method is enabled in vault Returns: bool: True if AppRole is enabled, False otherwise """ # List the enabled auth methods auth_return_code, auth_output, auth_err = CommandRunner.run_command('vault auth list -format=json') # If non-zero return code, raise an error if auth_return_code != 0: logging.error('Failed to list auth methods') logging.error('Auth Output: ' + auth_output) logging.error('Auth Error: ' + auth_err) raise RuntimeError('Failed to list auth methods') # Parse the output auth_methods = json.loads(auth_output) # Use the existence of the 'approle/' key to determine if AppRole is enabled return 'approle/' in auth_methods def check_policy_exists(policy_name: str) -> bool: """Check if a policy exists in vault Args: policy_name (str): The name of the policy to check Returns: bool: True if the policy exists, False otherwise """ # List the policies policy_return_code, policy_output, policy_err = CommandRunner.run_command('vault policy list -format=json') # If non-zero return code, raise an error if policy_return_code != 0: logging.error('Failed to list policies') logging.error('Policy Output: ' + policy_output) logging.error('Policy Error: ' + policy_err) raise RuntimeError('Failed to list policies') # Parse the output policies = json.loads(policy_output) # Check if the policy name exists in the returned list return policy_name in policies def create_policy(policy_name: str, policy_capabilities: list[str], policy_path: str = 'secret/*'): """Create a policy in vault Args: policy_name (str): The name of the policy to create policy_capabilities (list[str]): The capabilities of the policy (ex. read, write, etc...) policy_path (str, optional): The path the policy should apply to. Defaults to all secrets (`secret/*`). """ # Create the "policy file" that is specified on the command line policy_caps = '["' + '","'.join(policy_capabilities) + '"]' policy_content = 'path "' + policy_path + '" {\n capabilities = ' + policy_caps + '\n}' policy = '< bool: """Check if an AppRole role exists in vault Args: role_name (str): The name of the role to check Returns: bool: True if the role exists, False otherwise """ # To use `vault list`, we need to specify the "path" (which is a slash separated string dictating the hierarchy to the thing we want to list) role_list_path = '/'.join(['auth', 'approle', 'role']) # List the roles role_return_code, role_output, role_err = CommandRunner.run_command(f'vault list --format=json {role_list_path}') # If non-zero return code, raise an error if role_return_code != 0: logging.error('Failed to list AppRole roles') logging.error('Role Output: ' + role_output) logging.error('Role Error: ' + role_err) raise RuntimeError('Failed to list AppRole roles') # Parse the output roles = json.loads(role_output) # Check if the role name exists in the returned list return role_name in roles def get_role_id(role_name: str) -> str: """Get the `role_id` for a given role Args: role_name (str): The name of the role to get the `role_id` for Returns: str: The `role_id` for the given role """ # The read path for the role_id of the given role (path being a slash separated string dictating the hierarchy to the thing we want to read) role_read_path = '/'.join(['auth', 'approle', 'role', role_name, 'role-id']) # Get the role_id from vault role_return_code, role_id_output, role_id_err = CommandRunner.run_command('vault read --format=json ' + role_read_path) # If non-zero return code, raise an error if role_return_code != 0: logging.error('Failed to get the role_id for role: ' + role_name) logging.error('Role ID Output: ' + role_id_output) logging.error('Role ID Error: ' + role_id_err) raise RuntimeError('Failed to get the role_id for role: ' + role_name) # Parse the role_id from the output role_id_json = json.loads(role_id_output) role_id = role_id_json['data']['role_id'] logging.debug('Role ID: ' + role_id) return role_id def get_secret_id(role_name: str) -> str: """Get a/the `secret_id` for a given role Args: role_name (str): The name of the role to get the `secret_id` for Returns: str: The `secret_id` for the given role """ # The write path for the secret_id of the given role (path being a slash separated string dictating the hierarchy to the thing we want to write) secret_write_path = '/'.join(['auth', 'approle', 'role', role_name, 'secret-id']) # Get the secret_id from vault (by writing to the secret-id endpoint) secret_return_code, secret_id_output, secret_id_err = CommandRunner.run_command('vault write --format=json -f ' + secret_write_path) # If non-zero return code, raise an error if secret_return_code != 0: logging.error('Failed to get the secret_id for role: ' + role_name) logging.error('Secret ID Output: ' + secret_id_output) logging.error('Secret ID Error: ' + secret_id_err) raise RuntimeError('Failed to get the secret_id for role: ' + role_name) # Parse the secret_id from the output secret_id_json = json.loads(secret_id_output) secret_id = secret_id_json['data']['secret_id'] logging.debug('Secret ID: ' + secret_id) return secret_id def create_app_role(role_name: str, policy_name: str) -> tuple[str, str]: """Create an approle role and return the role_id and secret_id Args: role_name (str): The name of the role to create policy_name (str): The name of the policy to associate with the role Returns: tuple[str, str]: The `role_id` and `secret_id` of the newly created role """ # The write path for creating the role (path being a slash separated string dictating the hierarchy to the thing we want to write) role_write_path = '/'.join(['auth', 'approle', 'role', role_name]) # Create a role role_write_return_code, role_write_output, role_write_err = CommandRunner.run_command('vault write ' + role_write_path + ' token_policies="' + policy_name + '"') # If non-zero return code, raise an error if role_write_return_code != 0: logging.error('Failed to create AppRole role: ' + role_name) logging.error('Role Write Output: ' + role_write_output) logging.error('Role Write Error: ' + role_write_err) raise RuntimeError('Failed to create AppRole role: ' + role_name) logging.debug(role_write_output) # Get the role_id role_id = get_role_id(role_name) # Get the secret_id secret_id = get_secret_id(role_name) return role_id, secret_id def save_role_vars_to_backup_file(role_name: str, role_id: str, secret_id: str): """Save the role_id and secret_id to a backup file (in the `/vault/creds` directory which is mounted to the host) This is used as a sort of backup in rare case where the `/role_vars/.env` doesn't persist across restarts/instances. Args: role_name (str): The name of the role (used as the filename inside of the `/vault/creds` directory) role_id (str): The `role_id` to save secret_id (str): The `secret_id` to save """ file_name = f'/vault/creds/{role_name}' logging.debug('File Name: ' + file_name) # Create the file if it doesn't exist if not os.path.isfile(file_name): CommandRunner.run_command('touch "' + file_name + '"') # Write the role_id and secret_id to the file with open(file_name, 'w+') as f: f.write('='.join([os.environ['ROLE_ID_SECRET_NAME'], role_id]) + '\n') f.write('='.join([os.environ['SECRET_ID_SECRET_NAME'], secret_id])) def save_role_vars_to_file(role_id: str, secret_id: str): """Save the role_id and secret_id to a file (`/role_vars/.env`) This file can then be loaded/mounted/etc... into other containers or by other scripts to load the values so that the app can load it Args: role_id (str): The `role_id` to save secret_id (str): The `secret_id` to save """ # Create the directory if it doesn't exist if not os.path.isdir('/role_vars'): os.mkdir('/role_vars') file_name = '/role_vars/.env' logging.debug('File Name: ' + file_name) # Create the file if it doesn't exist if not os.path.isfile(file_name): CommandRunner.run_command('touch "' + file_name + '"') # Write the role_id and secret_id to the file with open(file_name, 'w+') as f: f.write('='.join([os.environ['ROLE_ID_SECRET_NAME'], role_id]) + '\n') f.write('='.join([os.environ['SECRET_ID_SECRET_NAME'], secret_id])) def main(token: str): #logging.basicConfig( # filename='/var/log/entrypoint.log', # level=logging.DEBUG, # format='%(asctime)s - %(levelname)s - %(message)s' #) # Login to vault CommandRunner.run_command(f'vault login {token}') if not check_app_role_enabled(): # Enable approle auth method CommandRunner.run_command('vault auth enable approle') # -- Create a policy for the app -- # The policy name can be set via the `POLICY_NAME` environment variable # Or defaults to `node-app-policy` if not set if 'POLICY_NAME' in os.environ: policy_name = os.environ['POLICY_NAME'] else: policy_name = 'node-app-policy' logging.debug(f'Policy Name: {policy_name}') if not check_policy_exists(policy_name): # The policy capabilities can be set via the `POLICY_CAPABILITIES` environment variable # Or defaults to `['read', 'create', 'update']` if not set if 'POLICY_CAPABILITIES' in os.environ: policy_capabilities = [cap.strip() for cap in os.environ['POLICY_CAPABILITIES'].split(',')] else: policy_capabilities = ['read', 'create', 'update'] logging.debug(f'Policy Capabilities: {', '.join(policy_capabilities)}') create_policy(policy_name, policy_capabilities) # -- create an approle role -- # The role name can be set via the `ROLE_NAME` environment variable # Or defaults to `node-app` if not set if 'ROLE_NAME' in os.environ: role_name = os.environ['ROLE_NAME'] else: role_name = 'node-app' if not check_app_role_exists(role_name): # Create a role role_id, secret_id = create_app_role(role_name, policy_name) # Save the role_id and secret_id to a backup file save_role_vars_to_backup_file(role_name, role_id, secret_id) # Save the role_id and secret_id to a file save_role_vars_to_file(role_id, secret_id) else: # Get the existing role_id and secret_id role_id = get_role_id(role_name) secret_id = get_secret_id(role_name) # Save the role_id and secret_id to a file # QUESTION: Should this be conditional on if the file already exists or not? save_role_vars_to_file(role_id, secret_id) logging.info('AppRole setup complete') if __name__ == '__main__': # Parse the root token (used to login) from the command line arguments token = sys.argv[1] # Run the script's main function main(token)