Attempted to fix issue where the vault tries to re-initialized even though it's already initialized (because of PV backed storage) + uncommented some of the CI/CD automation (as become more comforatable with it
All checks were successful
Build and deploy Bridgeman Accessible Hashicorp Vault Implementation / deploy (push) Successful in 2m13s
All checks were successful
Build and deploy Bridgeman Accessible Hashicorp Vault Implementation / deploy (push) Successful in 2m13s
This commit is contained in:
parent
42343bbad7
commit
b2054f85ec
5 changed files with 313 additions and 118 deletions
|
|
@ -12,7 +12,7 @@
|
|||
# | Any modification or use of this script is at the user's own risk. |
|
||||
# |*******************************************************************|
|
||||
|
||||
import os, sys, subprocess, logging
|
||||
import os, sys, subprocess, json, logging
|
||||
|
||||
from CommandRunner import CommandRunner
|
||||
|
||||
|
|
@ -24,6 +24,54 @@ from CommandRunner import CommandRunner
|
|||
# # Log the error and re-raise or handle as appropriate
|
||||
# raise RuntimeError(f"Command '{' '.join(command)}' failed with error: {e.stderr}") from e
|
||||
|
||||
def check_app_role_enabled():
|
||||
"""Check if the AppRole auth method is enabled in vault
|
||||
|
||||
Returns:
|
||||
bool: True if AppRole is enabled, False otherwise
|
||||
"""
|
||||
|
||||
# List the enabled auth methods
|
||||
auth_return_code, auth_output, auth_err = CommandRunner.run_command('vault auth list -format=json')
|
||||
|
||||
# If non-zero return code, raise an error
|
||||
if auth_return_code != 0:
|
||||
logging.error('Failed to list auth methods')
|
||||
logging.error('Auth Output: ' + auth_output)
|
||||
logging.error('Auth Error: ' + auth_err)
|
||||
raise RuntimeError('Failed to list auth methods')
|
||||
|
||||
# Parse the output
|
||||
auth_methods = json.loads(auth_output)
|
||||
|
||||
# Use the existence of the 'approle/' key to determine if AppRole is enabled
|
||||
return 'approle/' in auth_methods
|
||||
|
||||
def check_policy_exists(policy_name: str) -> bool:
|
||||
"""Check if a policy exists in vault
|
||||
|
||||
Args:
|
||||
policy_name (str): The name of the policy to check
|
||||
Returns:
|
||||
bool: True if the policy exists, False otherwise
|
||||
"""
|
||||
|
||||
# List the policies
|
||||
policy_return_code, policy_output, policy_err = CommandRunner.run_command('vault policy list -format=json')
|
||||
|
||||
# If non-zero return code, raise an error
|
||||
if policy_return_code != 0:
|
||||
logging.error('Failed to list policies')
|
||||
logging.error('Policy Output: ' + policy_output)
|
||||
logging.error('Policy Error: ' + policy_err)
|
||||
raise RuntimeError('Failed to list policies')
|
||||
|
||||
# Parse the output
|
||||
policies = json.loads(policy_output)
|
||||
|
||||
# Check if the policy name exists in the returned list
|
||||
return policy_name in policies
|
||||
|
||||
def create_policy(policy_name: str, policy_capabilities: list[str], policy_path: str = 'secret/*'):
|
||||
"""Create a policy in vault
|
||||
|
||||
|
|
@ -33,16 +81,49 @@ def create_policy(policy_name: str, policy_capabilities: list[str], policy_path:
|
|||
policy_path (str, optional): The path the policy should apply to. Defaults to all secrets (`secret/*`).
|
||||
"""
|
||||
|
||||
# Create the "policy file" that is specified on the command line
|
||||
policy_caps = '["' + '","'.join(policy_capabilities) + '"]'
|
||||
policy_content = 'path "' + policy_path + '" {\n capabilities = ' + policy_caps + '\n}'
|
||||
policy = '<<EOF\n' + policy_content + '\nEOF'
|
||||
|
||||
# Write the policy to vault
|
||||
policy_return_code, policy_output, policy_err = CommandRunner.run_command('vault policy write ' + policy_name + ' - ' + policy, False)
|
||||
if policy_return_code == 2:
|
||||
|
||||
# If non-zero return code, raise an error
|
||||
if policy_return_code != 0:
|
||||
logging.error('Failed to create the policy')
|
||||
logging.error('Policy Output: ' + policy_output)
|
||||
logging.error('Policy Error: ' + policy_err)
|
||||
raise RuntimeError('Failed to create the policy')
|
||||
|
||||
def check_app_role_exists(role_name: str) -> bool:
|
||||
"""Check if an AppRole role exists in vault
|
||||
|
||||
Args:
|
||||
role_name (str): The name of the role to check
|
||||
Returns:
|
||||
bool: True if the role exists, False otherwise
|
||||
"""
|
||||
|
||||
# To use `vault list`, we need to specify the "path" (which is a slash separated string dictating the hierarchy to the thing we want to list)
|
||||
role_list_path = '/'.join(['auth', 'approle', 'role'])
|
||||
|
||||
# List the roles
|
||||
role_return_code, role_output, role_err = CommandRunner.run_command(f'vault list --format=json {role_list_path}')
|
||||
|
||||
# If non-zero return code, raise an error
|
||||
if role_return_code != 0:
|
||||
logging.error('Failed to list AppRole roles')
|
||||
logging.error('Role Output: ' + role_output)
|
||||
logging.error('Role Error: ' + role_err)
|
||||
raise RuntimeError('Failed to list AppRole roles')
|
||||
|
||||
# Parse the output
|
||||
roles = json.loads(role_output)
|
||||
|
||||
# Check if the role name exists in the returned list
|
||||
return role_name in roles
|
||||
|
||||
def get_role_id(role_name: str) -> str:
|
||||
"""Get the `role_id` for a given role
|
||||
|
||||
|
|
@ -53,10 +134,23 @@ def get_role_id(role_name: str) -> str:
|
|||
str: The `role_id` for the given role
|
||||
"""
|
||||
|
||||
# Get the role_id from vault
|
||||
# The read path for the role_id of the given role (path being a slash separated string dictating the hierarchy to the thing we want to read)
|
||||
role_read_path = '/'.join(['auth', 'approle', 'role', role_name, 'role-id'])
|
||||
role_return_code, role_id, role_err = CommandRunner.run_command('vault read ' + role_read_path + ' | grep role_id | awk \'{print $2}\'')
|
||||
|
||||
|
||||
# Get the role_id from vault
|
||||
role_return_code, role_id_output, role_id_err = CommandRunner.run_command('vault read --format=json ' + role_read_path)
|
||||
|
||||
# If non-zero return code, raise an error
|
||||
if role_return_code != 0:
|
||||
logging.error('Failed to get the role_id for role: ' + role_name)
|
||||
logging.error('Role ID Output: ' + role_id_output)
|
||||
logging.error('Role ID Error: ' + role_id_err)
|
||||
raise RuntimeError('Failed to get the role_id for role: ' + role_name)
|
||||
|
||||
# Parse the role_id from the output
|
||||
role_id_json = json.loads(role_id_output)
|
||||
role_id = role_id_json['data']['role_id']
|
||||
|
||||
logging.debug('Role ID: ' + role_id)
|
||||
|
||||
return role_id
|
||||
|
|
@ -71,10 +165,23 @@ def get_secret_id(role_name: str) -> str:
|
|||
str: The `secret_id` for the given role
|
||||
"""
|
||||
|
||||
# Get the secret_id from vault
|
||||
# The write path for the secret_id of the given role (path being a slash separated string dictating the hierarchy to the thing we want to write)
|
||||
secret_write_path = '/'.join(['auth', 'approle', 'role', role_name, 'secret-id'])
|
||||
secret_return_code, secret_id, secret_err = CommandRunner.run_command('vault write -f ' + secret_write_path + ' | grep "secret_id " | awk \'{print $2}\'')
|
||||
|
||||
# Get the secret_id from vault (by writing to the secret-id endpoint)
|
||||
secret_return_code, secret_id_output, secret_id_err = CommandRunner.run_command('vault write --format=json -f ' + secret_write_path)
|
||||
|
||||
# If non-zero return code, raise an error
|
||||
if secret_return_code != 0:
|
||||
logging.error('Failed to get the secret_id for role: ' + role_name)
|
||||
logging.error('Secret ID Output: ' + secret_id_output)
|
||||
logging.error('Secret ID Error: ' + secret_id_err)
|
||||
raise RuntimeError('Failed to get the secret_id for role: ' + role_name)
|
||||
|
||||
# Parse the secret_id from the output
|
||||
secret_id_json = json.loads(secret_id_output)
|
||||
secret_id = secret_id_json['data']['secret_id']
|
||||
|
||||
logging.debug('Secret ID: ' + secret_id)
|
||||
|
||||
return secret_id
|
||||
|
|
@ -90,10 +197,19 @@ def create_app_role(role_name: str, policy_name: str) -> tuple[str, str]:
|
|||
tuple[str, str]: The `role_id` and `secret_id` of the newly created role
|
||||
"""
|
||||
|
||||
# Create a role
|
||||
# The write path for creating the role (path being a slash separated string dictating the hierarchy to the thing we want to write)
|
||||
role_write_path = '/'.join(['auth', 'approle', 'role', role_name])
|
||||
|
||||
# Create a role
|
||||
role_write_return_code, role_write_output, role_write_err = CommandRunner.run_command('vault write ' + role_write_path + ' token_policies="' + policy_name + '"')
|
||||
|
||||
# If non-zero return code, raise an error
|
||||
if role_write_return_code != 0:
|
||||
logging.error('Failed to create AppRole role: ' + role_name)
|
||||
logging.error('Role Write Output: ' + role_write_output)
|
||||
logging.error('Role Write Error: ' + role_write_err)
|
||||
raise RuntimeError('Failed to create AppRole role: ' + role_name)
|
||||
|
||||
logging.debug(role_write_output)
|
||||
|
||||
# Get the role_id
|
||||
|
|
@ -165,8 +281,9 @@ def main(token: str):
|
|||
# Login to vault
|
||||
CommandRunner.run_command(f'vault login {token}')
|
||||
|
||||
# Enable approle auth method
|
||||
CommandRunner.run_command('vault auth enable approle')
|
||||
if not check_app_role_enabled():
|
||||
# Enable approle auth method
|
||||
CommandRunner.run_command('vault auth enable approle')
|
||||
|
||||
# -- Create a policy for the app --
|
||||
|
||||
|
|
@ -179,33 +296,50 @@ def main(token: str):
|
|||
|
||||
logging.debug(f'Policy Name: {policy_name}')
|
||||
|
||||
# The policy capabilities can be set via the `POLICY_CAPABILITIES` environment variable
|
||||
# Or defaults to `['read', 'create', 'update']` if not set
|
||||
if 'POLICY_CAPABILITIES' in os.environ:
|
||||
policy_capabilities = [cap.strip() for cap in os.environ['POLICY_CAPABILITIES'].split(',')]
|
||||
else:
|
||||
policy_capabilities = ['read', 'create', 'update']
|
||||
|
||||
logging.debug(f'Policy Capabilities: {', '.join(policy_capabilities)}')
|
||||
if not check_policy_exists(policy_name):
|
||||
# The policy capabilities can be set via the `POLICY_CAPABILITIES` environment variable
|
||||
# Or defaults to `['read', 'create', 'update']` if not set
|
||||
if 'POLICY_CAPABILITIES' in os.environ:
|
||||
policy_capabilities = [cap.strip() for cap in os.environ['POLICY_CAPABILITIES'].split(',')]
|
||||
else:
|
||||
policy_capabilities = ['read', 'create', 'update']
|
||||
|
||||
create_policy(policy_name, policy_capabilities)
|
||||
logging.debug(f'Policy Capabilities: {', '.join(policy_capabilities)}')
|
||||
|
||||
create_policy(policy_name, policy_capabilities)
|
||||
|
||||
# -- create an approle role --
|
||||
|
||||
# Create a role
|
||||
# The role name can be set via the `ROLE_NAME` environment variable
|
||||
# Or defaults to `node-app` if not set
|
||||
if 'ROLE_NAME' in os.environ:
|
||||
role_name = os.environ['ROLE_NAME']
|
||||
else:
|
||||
role_name = 'node-app'
|
||||
|
||||
role_id, secret_id = create_app_role(role_name, policy_name)
|
||||
|
||||
# Save the role_id and secret_id to a backup file
|
||||
save_role_vars_to_backup_file(role_name, role_id, secret_id)
|
||||
if not check_app_role_exists(role_name):
|
||||
# Create a role
|
||||
role_id, secret_id = create_app_role(role_name, policy_name)
|
||||
|
||||
# Save the role_id and secret_id to a file
|
||||
save_role_vars_to_file(role_id, secret_id)
|
||||
# Save the role_id and secret_id to a backup file
|
||||
save_role_vars_to_backup_file(role_name, role_id, secret_id)
|
||||
|
||||
# Save the role_id and secret_id to a file
|
||||
save_role_vars_to_file(role_id, secret_id)
|
||||
else:
|
||||
# Get the existing role_id and secret_id
|
||||
role_id = get_role_id(role_name)
|
||||
secret_id = get_secret_id(role_name)
|
||||
|
||||
# Save the role_id and secret_id to a file
|
||||
# QUESTION: Should this be conditional on if the file already exists or not?
|
||||
save_role_vars_to_file(role_id, secret_id)
|
||||
|
||||
logging.info('AppRole setup complete')
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Parse the root token (used to login) from the command line arguments
|
||||
token = sys.argv[1]
|
||||
|
||||
# Run the script's main function
|
||||
main(token)
|
||||
Loading…
Add table
Add a link
Reference in a new issue