diff --git a/dbclient/HiveClient.py b/dbclient/HiveClient.py
index 3980b7f..69673b1 100644
--- a/dbclient/HiveClient.py
+++ b/dbclient/HiveClient.py
@@ -561,9 +561,20 @@ def log_table_ddl(self, cid, ec_id, db_name, table_name, metastore_dir, error_lo
                 return False
             # read that data using the dbfs rest endpoint which can handle 2MB of text easily
             read_args = {'path': '/tmp/migration/tmp_export_ddl.txt'}
-            read_resp = self.get('/dbfs/read', read_args)
+            # page through the file since /dbfs/read caps each response at 1MB; the chunk
+            # length is a multiple of 3 so the base64 chunks concatenate and decode cleanly
+            offset = 0
+            length = 999999
+            data_res = ''
+            while True:
+                read_resp = self.get(f'/dbfs/read?length={length}&offset={offset}', read_args)
+                data_res += read_resp.get('data', '')
+                if int(read_resp.get('bytes_read')) >= length:
+                    offset += length
+                else:
+                    break
             with open(table_ddl_path, "w", encoding="utf-8") as fp:
-                fp.write(base64.b64decode(read_resp.get('data')).decode('utf-8'))
+                fp.write(base64.b64decode(data_res).decode('utf-8'))
             return True
         else:
             export_ddl_cmd = 'print(ddl_str)'
diff --git a/dbclient/JobsClient.py b/dbclient/JobsClient.py
index 0d4dace..061c58a 100644
--- a/dbclient/JobsClient.py
+++ b/dbclient/JobsClient.py
@@ -31,8 +31,11 @@ def get_jobs_list(self, print_json=False):
         # 'tasks' field) on API 2.0.
         res = self.get("/jobs/list", print_json, version='2.0')
         for job in res.get('jobs', []):
-            jobsById[job.get('job_id')] = job
-
+            # keep only jobs whose schedule or continuous trigger is UNPAUSED
+            settings = job.get('settings', {})
+            if (settings.get('schedule', {}).get('pause_status', '') == 'UNPAUSED'
+                    or settings.get('continuous', {}).get('pause_status', '') == 'UNPAUSED'):
+                jobsById[job.get('job_id')] = job
         limit = 25  # max limit supported by the API
         offset = 0
         has_more = True
@@ -46,7 +46,8 @@ def get_jobs_list(self, print_json=False):
             for job in res.get('jobs', []):
                 jobId = job.get('job_id')
                 # only replaces "real" MULTI_TASK jobs, as they contain the task definitions.
-                if jobsById[jobId]['settings'].get('format') == 'MULTI_TASK':
+                # the job may have been filtered out above (paused), so guard the lookup
+                if jobId in jobsById and jobsById[jobId]['settings'].get('format') == 'MULTI_TASK':
                     jobsById[jobId] = job
 
         return jobsById.values()
diff --git a/dbclient/parser.py b/dbclient/parser.py
index e6ad312..f19735d 100644
--- a/dbclient/parser.py
+++ b/dbclient/parser.py
@@ -586,4 +586,10 @@ def get_pipeline_parser() -> argparse.ArgumentParser:
     parser.add_argument('--bypass-secret-acl', action='store_true', default=False,
                         help='Use to set the initial principal for secrets in standard-tier workspaces')
 
+    parser.add_argument('--database', action='store', default=None,
+                        help='Single database name to export from the metastore and table ACLs')
+
+    parser.add_argument('--iam', action='store',
+                        help='IAM instance profile to use when exporting metastore entries')
+
     return parser
diff --git a/tasks/tasks.py b/tasks/tasks.py
index 593b10e..6a251a3 100644
--- a/tasks/tasks.py
+++ b/tasks/tasks.py
@@ -293,8 +293,15 @@ def __init__(self, client_config, checkpoint_service, args, skip=False):
         self.args = args
 
     def run(self):
+        print(f"Arguments: {self.args}")
         hive_c = HiveClient(self.client_config, self.checkpoint_service)
-        hive_c.export_hive_metastore(cluster_name=self.args.cluster_name,
+        if self.args.database is not None:
+            # export only a single database, using the given IAM role
+            database_name = self.args.database
+            hive_c.export_database(database_name, self.args.cluster_name, self.args.iam,
+                                   has_unicode=self.args.metastore_unicode)
+        else:
+            hive_c.export_hive_metastore(cluster_name=self.args.cluster_name,
                                      has_unicode=self.args.metastore_unicode)
 
 