211 lines
No EOL
7.9 KiB
Python
211 lines
No EOL
7.9 KiB
Python
#!/usr/bin/env python3
|
|
|
|
# |*******************************************************************|
|
|
# | Setup script |
|
|
# | |
|
|
# | Author: Alan Bridgeman |
|
|
# | Created: 2024-03-30 |
|
|
# | |
|
|
# | COPYRIGHT © 2024 Bridgeman Accessible/Alan Bridgeman. |
|
|
# | |
|
|
# | This work is presented AS IS, with no warranty of any kind. |
|
|
# | Any modification or use of this script is at the user's own risk. |
|
|
# |*******************************************************************|
|
|
|
|
import os, sys, subprocess, logging
|
|
|
|
from CommandRunner import CommandRunner
|
|
|
|
#def run_command(command):
|
|
# try:
|
|
# result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, check=True)
|
|
# return result.stdout.decode('utf-8').strip()
|
|
# except subprocess.CalledProcessError as e:
|
|
# # Log the error and re-raise or handle as appropriate
|
|
# raise RuntimeError(f"Command '{' '.join(command)}' failed with error: {e.stderr}") from e
|
|
|
|
def create_policy(policy_name: str, policy_capabilities: list[str], policy_path: str = 'secret/*'):
|
|
"""Create a policy in vault
|
|
|
|
Args:
|
|
policy_name (str): The name of the policy to create
|
|
policy_capabilities (list[str]): The capabilities of the policy (ex. read, write, etc...)
|
|
policy_path (str, optional): The path the policy should apply to. Defaults to all secrets (`secret/*`).
|
|
"""
|
|
|
|
policy_caps = '["' + '","'.join(policy_capabilities) + '"]'
|
|
policy_content = 'path "' + policy_path + '" {\n capabilities = ' + policy_caps + '\n}'
|
|
policy = '<<EOF\n' + policy_content + '\nEOF'
|
|
policy_return_code, policy_output, policy_err = CommandRunner.run_command('vault policy write ' + policy_name + ' - ' + policy, False)
|
|
if policy_return_code == 2:
|
|
logging.error('Failed to create the policy')
|
|
logging.error('Policy Output: ' + policy_output)
|
|
logging.error('Policy Error: ' + policy_err)
|
|
raise RuntimeError('Failed to create the policy')
|
|
|
|
def get_role_id(role_name: str) -> str:
|
|
"""Get the `role_id` for a given role
|
|
|
|
Args:
|
|
role_name (str): The name of the role to get the `role_id` for
|
|
|
|
Returns:
|
|
str: The `role_id` for the given role
|
|
"""
|
|
|
|
# Get the role_id from vault
|
|
role_read_path = '/'.join(['auth', 'approle', 'role', role_name, 'role-id'])
|
|
role_return_code, role_id, role_err = CommandRunner.run_command('vault read ' + role_read_path + ' | grep role_id | awk \'{print $2}\'')
|
|
|
|
logging.debug('Role ID: ' + role_id)
|
|
|
|
return role_id
|
|
|
|
def get_secret_id(role_name: str) -> str:
|
|
"""Get a/the `secret_id` for a given role
|
|
|
|
Args:
|
|
role_name (str): The name of the role to get the `secret_id` for
|
|
|
|
Returns:
|
|
str: The `secret_id` for the given role
|
|
"""
|
|
|
|
# Get the secret_id from vault
|
|
secret_write_path = '/'.join(['auth', 'approle', 'role', role_name, 'secret-id'])
|
|
secret_return_code, secret_id, secret_err = CommandRunner.run_command('vault write -f ' + secret_write_path + ' | grep "secret_id " | awk \'{print $2}\'')
|
|
|
|
logging.debug('Secret ID: ' + secret_id)
|
|
|
|
return secret_id
|
|
|
|
def create_app_role(role_name: str, policy_name: str) -> tuple[str, str]:
|
|
"""Create an approle role and return the role_id and secret_id
|
|
|
|
Args:
|
|
role_name (str): The name of the role to create
|
|
policy_name (str): The name of the policy to associate with the role
|
|
|
|
Returns:
|
|
tuple[str, str]: The `role_id` and `secret_id` of the newly created role
|
|
"""
|
|
|
|
# Create a role
|
|
role_write_path = '/'.join(['auth', 'approle', 'role', role_name])
|
|
role_write_return_code, role_write_output, role_write_err = CommandRunner.run_command('vault write ' + role_write_path + ' token_policies="' + policy_name + '"')
|
|
|
|
logging.debug(role_write_output)
|
|
|
|
# Get the role_id
|
|
role_id = get_role_id(role_name)
|
|
|
|
# Get the secret_id
|
|
secret_id = get_secret_id(role_name)
|
|
|
|
return role_id, secret_id
|
|
|
|
def save_role_vars_to_backup_file(role_name: str, role_id: str, secret_id: str):
|
|
"""Save the role_id and secret_id to a backup file (in the `/vault/creds` directory which is mounted to the host)
|
|
|
|
This is used as a sort of backup in rare case where the `/role_vars/.env` doesn't persist across restarts/instances.
|
|
|
|
Args:
|
|
role_name (str): The name of the role (used as the filename inside of the `/vault/creds` directory)
|
|
role_id (str): The `role_id` to save
|
|
secret_id (str): The `secret_id` to save
|
|
"""
|
|
|
|
file_name = f'/vault/creds/{role_name}'
|
|
|
|
logging.debug('File Name: ' + file_name)
|
|
|
|
# Create the file if it doesn't exist
|
|
if not os.path.isfile(file_name):
|
|
CommandRunner.run_command('touch "' + file_name + '"')
|
|
|
|
# Write the role_id and secret_id to the file
|
|
with open(file_name, 'w+') as f:
|
|
f.write('='.join([os.environ['ROLE_ID_SECRET_NAME'], role_id]) + '\n')
|
|
f.write('='.join([os.environ['SECRET_ID_SECRET_NAME'], secret_id]))
|
|
|
|
def save_role_vars_to_file(role_id: str, secret_id: str):
|
|
"""Save the role_id and secret_id to a file (`/role_vars/.env`)
|
|
|
|
This file can then be loaded/mounted/etc... into other containers or by other scripts to load the values so that the app can load it
|
|
|
|
Args:
|
|
role_id (str): The `role_id` to save
|
|
secret_id (str): The `secret_id` to save
|
|
"""
|
|
|
|
# Create the directory if it doesn't exist
|
|
if not os.path.isdir('/role_vars'):
|
|
os.mkdir('/role_vars')
|
|
|
|
file_name = '/role_vars/.env'
|
|
|
|
logging.debug('File Name: ' + file_name)
|
|
|
|
# Create the file if it doesn't exist
|
|
if not os.path.isfile(file_name):
|
|
CommandRunner.run_command('touch "' + file_name + '"')
|
|
|
|
# Write the role_id and secret_id to the file
|
|
with open(file_name, 'w+') as f:
|
|
f.write('='.join([os.environ['ROLE_ID_SECRET_NAME'], role_id]) + '\n')
|
|
f.write('='.join([os.environ['SECRET_ID_SECRET_NAME'], secret_id]))
|
|
|
|
def main(token: str):
|
|
#logging.basicConfig(
|
|
# filename='/var/log/entrypoint.log',
|
|
# level=logging.DEBUG,
|
|
# format='%(asctime)s - %(levelname)s - %(message)s'
|
|
#)
|
|
|
|
# Login to vault
|
|
CommandRunner.run_command(f'vault login {token}')
|
|
|
|
# Enable approle auth method
|
|
CommandRunner.run_command('vault auth enable approle')
|
|
|
|
# -- Create a policy for the app --
|
|
|
|
# The policy name can be set via the `POLICY_NAME` environment variable
|
|
# Or defaults to `node-app-policy` if not set
|
|
if 'POLICY_NAME' in os.environ:
|
|
policy_name = os.environ['POLICY_NAME']
|
|
else:
|
|
policy_name = 'node-app-policy'
|
|
|
|
logging.debug(f'Policy Name: {policy_name}')
|
|
|
|
# The policy capabilities can be set via the `POLICY_CAPABILITIES` environment variable
|
|
# Or defaults to `['read', 'create', 'update']` if not set
|
|
if 'POLICY_CAPABILITIES' in os.environ:
|
|
policy_capabilities = [cap.strip() for cap in os.environ['POLICY_CAPABILITIES'].split(',')]
|
|
else:
|
|
policy_capabilities = ['read', 'create', 'update']
|
|
|
|
logging.debug(f'Policy Capabilities: {', '.join(policy_capabilities)}')
|
|
|
|
create_policy(policy_name, policy_capabilities)
|
|
|
|
# -- create an approle role --
|
|
|
|
# Create a role
|
|
if 'ROLE_NAME' in os.environ:
|
|
role_name = os.environ['ROLE_NAME']
|
|
else:
|
|
role_name = 'node-app'
|
|
|
|
role_id, secret_id = create_app_role(role_name, policy_name)
|
|
|
|
# Save the role_id and secret_id to a backup file
|
|
save_role_vars_to_backup_file(role_name, role_id, secret_id)
|
|
|
|
# Save the role_id and secret_id to a file
|
|
save_role_vars_to_file(role_id, secret_id)
|
|
|
|
if __name__ == '__main__':
|
|
token = sys.argv[1]
|
|
main(token) |