From 705e113508f87ec1f8bb432fc779a513ba09b913 Mon Sep 17 00:00:00 2001
From: daniloffantinato <57760081+daniloffantinato@users.noreply.github.com>
Date: Tue, 5 May 2020 17:13:18 -0400
Subject: [PATCH 1/2] Add option to set "database_prefix" when import mode is "from_s3".

Using this approach it is possible to import between multiple accounts and add a database prefix.
---
 .../src/import_into_datacatalog.py | 68 +++++++++++++++----
 1 file changed, 53 insertions(+), 15 deletions(-)

diff --git a/utilities/Hive_metastore_migration/src/import_into_datacatalog.py b/utilities/Hive_metastore_migration/src/import_into_datacatalog.py
index 5ba7aa71..b9d7214f 100644
--- a/utilities/Hive_metastore_migration/src/import_into_datacatalog.py
+++ b/utilities/Hive_metastore_migration/src/import_into_datacatalog.py
@@ -1,14 +1,34 @@
 # Copyright 2016-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
-# SPDX-License-Identifier: MIT-0
+# Licensed under the Amazon Software License (the "License"). You may not use
+# this file except in compliance with the License. A copy of the License is
+# located at
+#
+# http://aws.amazon.com/asl/
+#
+# and in the "LICENSE" file accompanying this file. This file is distributed
+# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express
+# or implied. See the License for the specific language governing
+# permissions and limitations under the License.
+
 
 from __future__ import print_function
+import logging
+import os
+
+from pyspark.sql.functions import lit, struct, array, col, concat
+
 from awsglue.context import GlueContext
 from awsglue.dynamicframe import DynamicFrame
 
 from hive_metastore_migration import *
 
+logging.basicConfig()
+logger = logging.getLogger(__name__)
+logger.setLevel(getattr(logging, os.getenv('LOG_LEVEL', 'INFO')))
+
+
 def transform_df_to_catalog_import_schema(sql_context, glue_context, df_databases, df_tables, df_partitions):
     df_databases_array = df_databases.select(df_databases['type'], array(df_databases['item']).alias('items'))
     df_tables_array = df_tables.select(df_tables['type'], df_tables['database'],
@@ -40,8 +60,8 @@ def import_datacatalog(sql_context, glue_context, datacatalog_name, databases, t
         connection_options={'catalog.name': datacatalog_name, 'catalog.region': region})
 
 
-def metastore_full_migration(sc, sql_context, glue_context, connection, datacatalog_name, db_prefix, table_prefix
-                             , region):
+def metastore_full_migration(sc, sql_context, glue_context, connection, datacatalog_name, db_prefix, table_prefix,
+                             region):
     # extract
     hive_metastore = HiveMetastore(connection, sql_context)
     hive_metastore.extract_metastore()
@@ -50,17 +70,25 @@ def metastore_full_migration(sc, sql_context, glue_context, connection, datacata
     (databases, tables, partitions) = HiveMetastoreTransformer(
         sc, sql_context, db_prefix, table_prefix).transform(hive_metastore)
 
-    #load
+    # load
     import_datacatalog(sql_context, glue_context, datacatalog_name, databases, tables, partitions, region)
 
 
-def metastore_import_from_s3(sql_context, glue_context,db_input_dir, tbl_input_dir, parts_input_dir,
-                             datacatalog_name, region):
+def metastore_import_from_s3(sql_context, glue_context, db_input_dir, tbl_input_dir, parts_input_dir, db_prefix, datacatalog_name,
+                             region):
     # extract
     databases = sql_context.read.json(path=db_input_dir, schema=METASTORE_DATABASE_SCHEMA)
     tables = sql_context.read.json(path=tbl_input_dir, schema=METASTORE_TABLE_SCHEMA)
     partitions = sql_context.read.json(path=parts_input_dir, schema=METASTORE_PARTITION_SCHEMA)
+    
+    # Changes to Prefix on database 
+    if db_prefix:
+        databases = databases.withColumn('item', struct(col('item.description'), col('item.locationUri'), concat(lit(db_prefix),col('item.name')).alias('name'), col('item.parameters')))
+        tables = tables.withColumn("database",concat(lit(db_prefix),col('database')).alias('database'))
+        partitions = partitions.withColumn("database",concat(lit(db_prefix),col('database')).alias('database'))
+        partitions = partitions.withColumn('item', struct(col('item.creationTime'), col('item.creationTime'), concat(lit(db_prefix),col('item.namespaceName')).alias('namespaceName'), col('item.parameters'), col('item.storageDescriptor'), col('item.values')))
+    
     # load
     import_datacatalog(sql_context, glue_context, datacatalog_name, databases, tables, partitions, region)
 
 
@@ -71,21 +99,29 @@ def main():
     from_s3 = 'from-s3'
     from_jdbc = 'from-jdbc'
     parser = argparse.ArgumentParser(prog=sys.argv[0])
-    parser.add_argument('-m', '--mode', required=True, choices=[from_s3, from_jdbc], help='Choose to migrate metastore either from JDBC or from S3')
-    parser.add_argument('-c', '--connection-name', required=False, help='Glue Connection name for Hive metastore JDBC connection')
-    parser.add_argument('-R', '--region', required=False, help='AWS region of target Glue DataCatalog, default to "us-east-1"')
-    parser.add_argument('-d', '--database-prefix', required=False, help='Optional prefix for database names in Glue DataCatalog')
-    parser.add_argument('-t', '--table-prefix', required=False, help='Optional prefix for table name in Glue DataCatalog')
-    parser.add_argument('-D', '--database-input-path', required=False, help='An S3 path containing json files of metastore database entities')
-    parser.add_argument('-T', '--table-input-path', required=False, help='An S3 path containing json files of metastore table entities')
-    parser.add_argument('-P', '--partition-input-path', required=False, help='An S3 path containing json files of metastore partition entities')
+    parser.add_argument('-m', '--mode', required=True, choices=[from_s3, from_jdbc],
+                        help='Choose to migrate metastore either from JDBC or from S3')
+    parser.add_argument('-c', '--connection-name', required=False,
+                        help='Glue Connection name for Hive metastore JDBC connection')
+    parser.add_argument('-R', '--region', required=False,
+                        help='AWS region of target Glue DataCatalog, default to "us-east-1"')
+    parser.add_argument('-d', '--database-prefix', required=False,
+                        help='Optional prefix for database names in Glue DataCatalog')
+    parser.add_argument('-t', '--table-prefix', required=False,
+                        help='Optional prefix for table name in Glue DataCatalog')
+    parser.add_argument('-D', '--database-input-path', required=False,
+                        help='An S3 path containing json files of metastore database entities')
+    parser.add_argument('-T', '--table-input-path', required=False,
+                        help='An S3 path containing json files of metastore table entities')
+    parser.add_argument('-P', '--partition-input-path', required=False,
+                        help='An S3 path containing json files of metastore partition entities')
     options = get_options(parser, sys.argv)
 
     if options['mode'] == from_s3:
         validate_options_in_mode(
             options=options, mode=from_s3,
             required_options=['database_input_path', 'table_input_path', 'partition_input_path'],
-            not_allowed_options=['database_prefix', 'table_prefix']
+            not_allowed_options=['table_prefix']
         )
     elif options['mode'] == from_jdbc:
         validate_options_in_mode(
@@ -110,6 +146,7 @@ def main():
             db_input_dir=options['database_input_path'],
             tbl_input_dir=options['table_input_path'],
             parts_input_dir=options['partition_input_path'],
+            db_prefix=options.get('database_prefix') or '', 
             datacatalog_name='datacatalog',
             region=options.get('region') or 'us-east-1'
         )
@@ -126,5 +163,6 @@ def main():
             region=options.get('region') or 'us-east-1'
         )
 
+
 if __name__ == '__main__':
     main()

From 9d8356d21be70c338947a10ead109d2f7cfe9ea9 Mon Sep 17 00:00:00 2001
From: Ryo Manabe
Date: Fri, 23 May 2025 09:42:53 +0900
Subject: [PATCH 2/2] Modifications to merge #66

* Reverted license header
* Removed unused code and fixed indentation
* Added --database-prefix parameter for from-s3 mode in README
---
 utilities/Hive_metastore_migration/README.md |  1 +
 .../src/import_into_datacatalog.py           | 28 ++++----------
 2 files changed, 6 insertions(+), 23 deletions(-)

diff --git a/utilities/Hive_metastore_migration/README.md b/utilities/Hive_metastore_migration/README.md
index b228a59a..ab881486 100644
--- a/utilities/Hive_metastore_migration/README.md
+++ b/utilities/Hive_metastore_migration/README.md
@@ -243,6 +243,7 @@ as an Glue ETL job, if AWS Glue can directly connect to your Hive metastore.
 - `--database-input-path` set to the S3 path containing only databases. For example: `s3://someBucket/output_path_from_previous_job/databases`
 - `--table-input-path` set to the S3 path containing only tables. For example: `s3://someBucket/output_path_from_previous_job/tables`
 - `--partition-input-path` set to the S3 path containing only partitions. For example: `s3://someBucket/output_path_from_previous_job/partitions`
+- `--database-prefix` (optional) set to a string prefix that is applied to the database names created in the AWS Glue Data Catalog. You can use it to track the origin of the metadata and to avoid naming conflicts. The default is an empty string.
 
 Also, because there is no need to connect to any JDBC source, the job doesn't require any connections.
 
diff --git a/utilities/Hive_metastore_migration/src/import_into_datacatalog.py b/utilities/Hive_metastore_migration/src/import_into_datacatalog.py
index b9d7214f..d4244930 100644
--- a/utilities/Hive_metastore_migration/src/import_into_datacatalog.py
+++ b/utilities/Hive_metastore_migration/src/import_into_datacatalog.py
@@ -1,21 +1,8 @@
 # Copyright 2016-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
-# Licensed under the Amazon Software License (the "License"). You may not use
-# this file except in compliance with the License. A copy of the License is
-# located at
-#
-# http://aws.amazon.com/asl/
-#
-# and in the "LICENSE" file accompanying this file. This file is distributed
-# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express
-# or implied. See the License for the specific language governing
-# permissions and limitations under the License.
-
+# SPDX-License-Identifier: MIT-0
 
 from __future__ import print_function
-import logging
-import os
-
 from pyspark.sql.functions import lit, struct, array, col, concat
 
 from awsglue.context import GlueContext
 from awsglue.dynamicframe import DynamicFrame
@@ -24,11 +11,6 @@ from hive_metastore_migration import *
 
-logging.basicConfig()
-logger = logging.getLogger(__name__)
-logger.setLevel(getattr(logging, os.getenv('LOG_LEVEL', 'INFO')))
-
-
 def transform_df_to_catalog_import_schema(sql_context, glue_context, df_databases, df_tables, df_partitions):
     df_databases_array = df_databases.select(df_databases['type'], array(df_databases['item']).alias('items'))
     df_tables_array = df_tables.select(df_tables['type'], df_tables['database'],
@@ -81,14 +63,14 @@ def metastore_import_from_s3(sql_context, glue_context, db_input_dir, tbl_input_
     databases = sql_context.read.json(path=db_input_dir, schema=METASTORE_DATABASE_SCHEMA)
     tables = sql_context.read.json(path=tbl_input_dir, schema=METASTORE_TABLE_SCHEMA)
     partitions = sql_context.read.json(path=parts_input_dir, schema=METASTORE_PARTITION_SCHEMA)
-    
-    # Changes to Prefix on database 
+
+    # Changes to Prefix on database
     if db_prefix:
         databases = databases.withColumn('item', struct(col('item.description'), col('item.locationUri'), concat(lit(db_prefix),col('item.name')).alias('name'), col('item.parameters')))
         tables = tables.withColumn("database",concat(lit(db_prefix),col('database')).alias('database'))
         partitions = partitions.withColumn("database",concat(lit(db_prefix),col('database')).alias('database'))
         partitions = partitions.withColumn('item', struct(col('item.creationTime'), col('item.creationTime'), concat(lit(db_prefix),col('item.namespaceName')).alias('namespaceName'), col('item.parameters'), col('item.storageDescriptor'), col('item.values')))
-    
+
     # load
     import_datacatalog(sql_context, glue_context, datacatalog_name, databases, tables, partitions, region)
@@ -146,7 +128,7 @@ def main():
             db_input_dir=options['database_input_path'],
             tbl_input_dir=options['table_input_path'],
             parts_input_dir=options['partition_input_path'],
-            db_prefix=options.get('database_prefix') or '', 
+            db_prefix=options.get('database_prefix') or '',
             datacatalog_name='datacatalog',
             region=options.get('region') or 'us-east-1'
         )
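
As a minimal, standalone sketch of the technique these patches rely on (not part of either patch), the PySpark snippet below rebuilds the nested item struct with concat(lit(db_prefix), col('item.name')) so that only the name field gains the prefix while the other fields are carried over unchanged. The local SparkSession, the toy database entity, the bucket path, and the legacy_ prefix value are assumptions made for the example; the field names mirror the subset of METASTORE_DATABASE_SCHEMA fields used in the hunks above.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, lit, struct

spark = SparkSession.builder.master('local[1]').appName('db-prefix-sketch').getOrCreate()

db_prefix = 'legacy_'  # illustrative prefix value, not mandated by the patch

# One toy "database" entity shaped like the fields the patch touches
# (description, locationUri, name, parameters).
databases = spark.createDataFrame(
    [('database', ('sales db', 's3://example-bucket/warehouse/sales.db', 'sales', {}))],
    'type STRING, item STRUCT<description: STRING, locationUri: STRING, name: STRING, parameters: MAP<STRING, STRING>>'
)

# Rebuild the nested `item` struct, replacing only the `name` field with the
# prefixed value; the remaining fields are copied through unchanged.
prefixed = databases.withColumn(
    'item',
    struct(
        col('item.description'),
        col('item.locationUri'),
        concat(lit(db_prefix), col('item.name')).alias('name'),
        col('item.parameters'),
    )
)

print(prefixed.select('item.name').first()[0])  # -> legacy_sales

spark.stop()

With both patches applied, the from-s3 import job can then be invoked with, for example, --mode from-s3 --database-input-path s3://someBucket/output_path_from_previous_job/databases --table-input-path s3://someBucket/output_path_from_previous_job/tables --partition-input-path s3://someBucket/output_path_from_previous_job/partitions --database-prefix legacy_ (the prefix value here is illustrative).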