Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ $ python export_db.py --help
usage: export_db.py [-h] [--users] [--workspace]
[--notebook-format {DBC,SOURCE,HTML}] [--download]
[--libs] [--clusters] [--jobs] [--metastore] [--secrets]
[--scope-names SCOPE_NAMES [SCOPE_NAMES ...]] [--skip-scope-acl]
[--metastore-unicode] [--cluster-name CLUSTER_NAME]
[--database DATABASE] [--iam IAM] [--skip-failed]
[--mounts] [--azure] [--profile PROFILE]
Expand Down Expand Up @@ -286,6 +287,10 @@ optional arguments:
--metastore log all the metastore table definitions
--metastore-unicode log all the metastore table definitions including
unicode characters
--secrets log all the secret scopes
--scope-names SCOPE_NAMES [SCOPE_NAMES ...]
                        log only the specified secret scope(s)
--skip-scope-acl Skip logging the secret ACLs during export
--table-acls log all table ACL grant and deny statements
--cluster-name CLUSTER_NAME
Cluster name to export the metastore to a specific
Expand Down Expand Up @@ -335,6 +340,7 @@ usage: import_db.py [-h] [--users] [--workspace] [--workspace-top-level]
[--archive-missing] [--libs] [--clusters] [--jobs]
[--metastore] [--metastore-unicode] [--get-repair-log]
[--cluster-name CLUSTER_NAME] [--skip-failed] [--azure]
[--secrets] [--scope-names SCOPE_NAMES [SCOPE_NAMES ...]] [--skip-scope-acl]
[--profile PROFILE] [--single-user SINGLE_USER]
[--no-ssl-verification] [--silent] [--debug]
[--set-export-dir SET_EXPORT_DIR] [--pause-all-jobs]
Expand Down Expand Up @@ -376,6 +382,10 @@ optional arguments:
cluster. Cluster will be started.
--skip-failed Skip missing users that do not exist when importing
user notebooks
--secrets Import all secret scopes
--scope-names SCOPE_NAMES [SCOPE_NAMES ...]
                        import only the specified secret scope(s)
  --skip-scope-acl      Skip importing the secret ACLs during import
--azure Run on Azure. (Default is AWS)
--profile PROFILE Profile to parse the credentials
--no-ssl-verification
Expand Down
2 changes: 1 addition & 1 deletion dbclient/ClustersClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def __init__(self, configs, checkpoint_service):
super().__init__(configs)
self._checkpoint_service = checkpoint_service
self.groups_to_keep = configs.get("groups_to_keep", False)
self.skip_missing_users = configs['skip_missing_users']
self.skip_missing_users = configs.get('skip_missing_users', False)

create_configs = {'num_workers',
'autoscale',
Expand Down
112 changes: 62 additions & 50 deletions dbclient/SecretsClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@ def get_secret_value(self, scope_name, secret_key, cid, ec_id, error_logger):
else:
return results_get.get('data')

def log_all_secrets(self, cluster_name=None, log_dir='secret_scopes/'):
def secret_scope_map(self, scope_name: str):
    """Wrap a bare scope name in the dict shape returned by the scopes-list API.

    Lets a user-supplied list of names be fed through the same code path as
    `get_secret_scopes_list()`, which yields dicts keyed by 'name'.
    """
    return dict(name=scope_name)

def log_all_secrets(self, cluster_name=None, log_dir='secret_scopes/', secret_scopes: list = None):
scopes_dir = self.get_export_dir() + log_dir
scopes_list = self.get_secret_scopes_list()
scopes_list = map(self.secret_scope_map, secret_scopes) if secret_scopes else self.get_secret_scopes_list()
error_logger = logging_utils.get_error_logger(
wmconstants.WM_EXPORT, wmconstants.SECRET_OBJECT, self.get_export_dir())
os.makedirs(scopes_dir, exist_ok=True)
Expand Down Expand Up @@ -130,58 +133,67 @@ def get_all_other_permissions(scope_name, acl_dict):
scope_perms.pop('MANAGE')
return scope_perms

def import_all_secrets(self, log_dir='secret_scopes/'):
def import_all_secrets(self, log_dir='secret_scopes/', secret_scopes: list = [], skip_acl_updates: bool = False):
scopes_dir = self.get_export_dir() + log_dir
error_logger = logging_utils.get_error_logger(
wmconstants.WM_IMPORT, wmconstants.SECRET_OBJECT, self.get_export_dir())
scopes_acl_dict = self.load_acl_dict()
scopes_acl_dict = self.load_acl_dict() if not skip_acl_updates else {}
contains_elements = any(secret_scopes)
for root, subdirs, files in self.walk(scopes_dir):
for scope_name in files:
file_path = root + scope_name
# print('Log file: ', file_path)
# check if scopes acls are empty, then skip
if scopes_acl_dict.get(scope_name, None) is None:
print("Scope is empty with no manage permissions. Skipping...")
continue
# check if users has can manage perms then we can add during creation time
has_user_manage = self.has_users_can_manage_permission(scope_name, scopes_acl_dict)
create_scope_args = {'scope': scope_name}
if has_user_manage:
create_scope_args['initial_manage_principal'] = 'users'
other_permissions = self.get_all_other_permissions(scope_name, scopes_acl_dict)
create_resp = self.post('/secrets/scopes/create', create_scope_args)
logging_utils.log_response_error(
error_logger, create_resp, ignore_error_list=['RESOURCE_ALREADY_EXISTS'])
if other_permissions:
# use this dict minus the `users:MANAGE` permissions and apply the other permissions to the scope
for perm, principal_list in other_permissions.items():
put_acl_args = {"scope": scope_name,
"permission": perm}
for x in principal_list:
put_acl_args["principal"] = x
logging.info(put_acl_args)
put_resp = self.post('/secrets/acls/put', put_acl_args)
logging_utils.log_response_error(error_logger, put_resp)
# loop through the scope and create the k/v pairs
with open(file_path, 'r', encoding="utf-8") as fp:
for s in fp:
s_dict = json.loads(s)
k = s_dict.get('name')
v = s_dict.get('value')
if 'WARNING: skipped' in v:
error_logger.error(f"Skipping scope {scope_name} as value is corrupted due to being too large \n")
if (contains_elements and scope_name in secret_scopes) or (not contains_elements):
file_path = root + scope_name
# print('Log file: ', file_path)
if not skip_acl_updates:
# check if scopes acls are empty, then skip
if scopes_acl_dict.get(scope_name, None) is None:
print("Scope is empty with no manage permissions. Skipping...")
continue
try:
put_secret_args = {'scope': scope_name,
'key': k,
'string_value': base64.b64decode(v.encode('ascii')).decode('ascii')}
put_resp = self.post('/secrets/put', put_secret_args)
logging_utils.log_response_error(error_logger, put_resp)
except Exception as error:
if "Invalid base64-encoded string" in str(error) or 'decode' in str(error) or "padding" in str(error):
error_msg = f"secret_scope: {scope_name} has invalid invalid data characters: {str(error)} skipping.. and logging to error file."
logging.error(error_msg)
error_logger.error(error_msg)
# check if users has can manage perms then we can add during creation time
has_user_manage = self.has_users_can_manage_permission(scope_name, scopes_acl_dict)
create_scope_args = {'scope': scope_name}
if has_user_manage:
create_scope_args['initial_manage_principal'] = 'users'
other_permissions = self.get_all_other_permissions(scope_name, scopes_acl_dict)
create_resp = self.post('/secrets/scopes/create', create_scope_args)
logging_utils.log_response_error(
error_logger, create_resp, ignore_error_list=['RESOURCE_ALREADY_EXISTS'])
if other_permissions:
# use this dict minus the `users:MANAGE` permissions and apply the other permissions to the scope
for perm, principal_list in other_permissions.items():
put_acl_args = {"scope": scope_name,
"permission": perm}
for x in principal_list:
put_acl_args["principal"] = x
logging.info(put_acl_args)
put_resp = self.post('/secrets/acls/put', put_acl_args)
logging_utils.log_response_error(error_logger, put_resp)
else:
logging.info("Skipping ACL Updates for {}".format(scope_name))
# loop through the scope and create the k/v pairs
with open(file_path, 'r', encoding="utf-8") as fp:
for s in fp:
s_dict = json.loads(s)
k = s_dict.get('name')
v = s_dict.get('value')
if 'WARNING: skipped' in v:
error_logger.error(
f"Skipping scope {scope_name} as value is corrupted due to being too large \n")
continue
try:
put_secret_args = {'scope': scope_name,
'key': k,
'string_value': base64.b64decode(v.encode('ascii')).decode('ascii')}
put_resp = self.post('/secrets/put', put_secret_args)
logging_utils.log_response_error(error_logger, put_resp)
except Exception as error:
if "Invalid base64-encoded string" in str(error) or 'decode' in str(
error) or "padding" in str(error):
error_msg = f"secret_scope: {scope_name} has invalid invalid data characters: {str(error)} skipping.. and logging to error file."
logging.error(error_msg)
error_logger.error(error_msg)

else:
raise error
else:
raise error
else:
logging.info("Skipping import of {}".format(scope_name))
21 changes: 19 additions & 2 deletions dbclient/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ def get_export_parser():
parser.add_argument('--secrets', action='store_true',
help='log all the secret scopes')

# get specific secret scope
parser.add_argument('--scope-names', type=str, action='store', nargs="+",
help='log only the specified secret scope')

# Export Secret ACLs
parser.add_argument('--skip-scope-acl', action='store_true', default= False,
help='Skip logging the secret ACLs during export')

# get all mlflow experiments
parser.add_argument('--mlflow-experiments', action='store_true',
help='log all the mlflow experiments')
Expand Down Expand Up @@ -326,6 +334,14 @@ def get_import_parser():
parser.add_argument('--secrets', action='store_true',
help='Import all secret scopes')

# import only specific secret scope
parser.add_argument('--scope-names', type=str, action='store', nargs="+",
help='import only the specified secret scope')

# Skip Importing Secret ACLs
parser.add_argument('--skip-scope-acl', action='store_true', default= False,
help='Skip importing the secret ACLs during export')

# import all mlflow experiments
parser.add_argument('--mlflow-experiments', action='store_true',
help='Import all the mlflow experiments')
Expand Down Expand Up @@ -426,8 +442,8 @@ def build_client_config(profile, url, token, args):
'skip_failed': args.skip_failed,
'debug': args.debug,
'file_format': str(args.notebook_format),
'timeout':args.timeout,
'skip_missing_users':args.skip_missing_users
'timeout':args.timeout if 'timeout' in args else 86400,
'skip_missing_users':args.skip_missing_users if 'skip_missing_users' in args else True
}
# this option only exists during imports so we check for existence
if 'overwrite_notebooks' in args:
Expand All @@ -450,6 +466,7 @@ def build_client_config(profile, url, token, args):
config['num_parallel'] = args.num_parallel
config['retry_total'] = args.retry_total
config['retry_backoff'] = args.retry_backoff
config['timeout'] = args.timeout if 'timeout' in args else 86400
return config


Expand Down
10 changes: 7 additions & 3 deletions export_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,16 @@ def main():
if not args.cluster_name:
print("Please provide an existing cluster name w/ --cluster-name option\n")
return
print("Export the secret scopes configs at {0}".format(now))
scope_names = args.scope_names if args.scope_names else None
print("Export the secret scopes configs at {0} for scope: {1}".format(now, scope_names))
start = timer()
sc = SecretsClient(client_config, checkpoint_service)
# log job configs
sc.log_all_secrets(args.cluster_name)
sc.log_all_secrets_acls()
sc.log_all_secrets(args.cluster_name, secret_scopes=scope_names)
if not args.skip_scope_acl:
sc.log_all_secrets_acls()
else:
print("Skipping ACL Export")
end = timer()
print("Complete Secrets Export Time: " + str(timedelta(seconds=end - start)))

Expand Down
26 changes: 16 additions & 10 deletions import_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging_utils
import os


# python 3.6
def main():
# define a parser to identify what component to import / export
Expand Down Expand Up @@ -130,7 +131,7 @@ def main():
hive_c = HiveClient(client_config, checkpoint_service)
# log job configs
hive_c.import_hive_metastore(cluster_name=args.cluster_name, has_unicode=args.metastore_unicode,
should_repair_table=args.repair_metastore_tables)
should_repair_table=args.repair_metastore_tables)
end = timer()
print("Complete Metastore Import Time: " + str(timedelta(seconds=end - start)))

Expand All @@ -149,13 +150,15 @@ def main():
# log table ACLS configs
notebook_exit_value = table_acls_c.import_table_acls()
end = timer()
print(f'Complete Table ACLs with exit value: {json.dumps(notebook_exit_value)}, Import Time: {timedelta(seconds=end - start)}')
print(
f'Complete Table ACLs with exit value: {json.dumps(notebook_exit_value)}, Import Time: {timedelta(seconds=end - start)}')

if args.secrets:
print("Import secret scopes configs at {0}".format(now))
start = timer()
sc = SecretsClient(client_config, checkpoint_service)
sc.import_all_secrets()
scopes = args.scope_names if args.scope_names else []
sc.import_all_secrets(secret_scopes=scopes, skip_acl_updates=args.skip_scope_acl)
end = timer()
print("Complete Secrets Import Time: " + str(timedelta(seconds=end - start)))

Expand All @@ -176,7 +179,7 @@ def main():
jobs_c.pause_all_jobs(False)
end = timer()
print("Unpaused all jobs time: " + str(timedelta(seconds=end - start)))

if args.import_pause_status:
print("Importing pause status for migrated jobs {0}".format(now))
start = timer()
Expand All @@ -186,7 +189,6 @@ def main():
end = timer()
print("Import pause jobs time: " + str(timedelta(seconds=end - start)))


if args.delete_all_jobs:
print("Delete all current jobs {0}".format(now))
start = timer()
Expand Down Expand Up @@ -254,27 +256,31 @@ def main():
print("Importing MLflow experiments.")
mlflow_c = MLFlowClient(client_config, checkpoint_service)
mlflow_c.import_mlflow_experiments(num_parallel=args.num_parallel)
failed_task_log = logging_utils.get_error_log_file(wmconstants.WM_IMPORT, wmconstants.MLFLOW_EXPERIMENT_OBJECT, client_config['export_dir'])
failed_task_log = logging_utils.get_error_log_file(wmconstants.WM_IMPORT, wmconstants.MLFLOW_EXPERIMENT_OBJECT,
client_config['export_dir'])
logging_utils.raise_if_failed_task_file_exists(failed_task_log, "MLflow Runs Import.")

if args.mlflow_experiments_permissions:
print("Importing MLflow experiment permissions.")
mlflow_c = MLFlowClient(client_config, checkpoint_service)
mlflow_c.import_mlflow_experiments_acls(num_parallel=args.num_parallel)
failed_task_log = logging_utils.get_error_log_file(wmconstants.WM_IMPORT, wmconstants.MLFLOW_EXPERIMENT_PERMISSION_OBJECT, client_config['export_dir'])
failed_task_log = logging_utils.get_error_log_file(wmconstants.WM_IMPORT,
wmconstants.MLFLOW_EXPERIMENT_PERMISSION_OBJECT,
client_config['export_dir'])
logging_utils.raise_if_failed_task_file_exists(failed_task_log, "MLflow Experiments Permissions Import.")

if args.mlflow_runs:
print("Importing MLflow runs.")
mlflow_c = MLFlowClient(client_config, checkpoint_service)
assert args.src_profile is not None, "Import MLflow runs requires --src-profile flag."
src_login_args = get_login_credentials(profile=args.src_profile)
src_client_config = build_client_config(args.src_profile, src_login_args['host'], src_login_args.get('token', login_args.get('password')), args)
src_client_config = build_client_config(args.src_profile, src_login_args['host'],
src_login_args.get('token', login_args.get('password')), args)
mlflow_c.import_mlflow_runs(src_client_config, num_parallel=args.num_parallel)
failed_task_log = logging_utils.get_error_log_file(wmconstants.WM_IMPORT, wmconstants.MLFLOW_RUN_OBJECT, client_config['export_dir'])
failed_task_log = logging_utils.get_error_log_file(wmconstants.WM_IMPORT, wmconstants.MLFLOW_RUN_OBJECT,
client_config['export_dir'])
logging_utils.raise_if_failed_task_file_exists(failed_task_log, "MLflow Runs Import.")


if args.get_repair_log:
print("Finding partitioned tables to repair at {0}".format(now))
start = timer()
Expand Down