
Commit fd8cab8
Author: dichenli

1. Fix bug where partitions were not migrated. 2. Fix OutOfMemoryException caused by coalesce.

1 parent 8c39aa2

File tree: 3 files changed, +7 −24 lines


utilities/Hive_metastore_migration/src/export_from_datacatalog.py
3 additions & 3 deletions

@@ -27,9 +27,9 @@ def transform_catalog_to_df(dyf):
 def datacatalog_migrate_to_s3(databases, tables, partitions, output_path):
 
     # load
-    coalesce_by_row_count(databases).write.format('json').mode('overwrite').save(output_path + 'databases')
-    coalesce_by_row_count(tables).write.format('json').mode('overwrite').save(output_path + 'tables')
-    coalesce_by_row_count(partitions).write.format('json').mode('overwrite').save(output_path + 'partitions')
+    databases.write.format('json').mode('overwrite').save(output_path + 'databases')
+    tables.write.format('json').mode('overwrite').save(output_path + 'tables')
+    partitions.write.format('json').mode('overwrite').save(output_path + 'partitions')
 
 
 # apply hard-coded schema on dataframes, ensure schema is consistent for transformations
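
Why dropping coalesce_by_row_count helps here: coalesce() reduces the partition count without a shuffle, so Spark also collapses the parallelism of the upstream stages that produce the data. For a small catalog DataFrame the helper asked for roughly count/10 + 1 partitions, often just one, which funneled the whole export pipeline through a single task and could exhaust executor memory. A minimal PySpark sketch with synthetic data (names and paths are illustrative, not from this repo) contrasting the two write styles:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName('coalesce-oom-sketch').getOrCreate()
    df = spark.range(1000000).toDF('id')  # stand-in for an exported catalog DataFrame

    # Removed pattern: a no-shuffle coalesce() lets the low-partition constraint
    # propagate upstream, so heavy transformations run in one task and can OOM.
    # df.coalesce(1).write.format('json').mode('overwrite').save('/tmp/export_risky')

    # Post-commit pattern: keep Spark's existing partitioning; each task writes its
    # own shard, bounding per-task memory at the cost of more (smaller) output files.
    df.write.format('json').mode('overwrite').save('/tmp/export_sketch')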

utilities/Hive_metastore_migration/src/hive_metastore_migration.py
3 additions & 17 deletions

@@ -225,20 +225,6 @@ def join_other_to_single_column(df, other, on, how, new_column_name):
     return df.join(other=other_combined, on=on, how=how)
 
 
-def coalesce_by_row_count(df, desired_rows_per_partition=10):
-    """
-    Coalesce dataframe to reduce number of partitions, to avoid fragmentation of data
-    :param df: dataframe
-    :param desired_rows_per_partition: desired number of rows per partition, there is no guarantee the actual rows count
-    is larger or smaller
-    :type df: DataFrame
-    :return: dataframe coalesced
-    """
-    count = df.count()
-    partitions = count / desired_rows_per_partition + 1
-    return df.coalesce(partitions)
-
-
 def batch_items_within_partition(sql_context, df, key_col, value_col, values_col):
     """
     Group a DataFrame of key, value pairs, create a list of values for the same key in each spark partition, but there
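
Beyond the parallelism hazard, the removed helper carried a latent portability bug: under Python 3, count / desired_rows_per_partition + 1 evaluates to a float, which DataFrame.coalesce() would reject. If bounding output fragmentation were ever wanted again, a hedged sketch (the function name and the larger row target are my own, not repo code) that sidesteps both problems:

    def bounded_write(df, path, desired_rows_per_partition=10000):
        # // keeps the result an int on both Python 2 and 3; the removed helper's
        # `count / n + 1` becomes a float under Python 3. Note df.count() still
        # costs a full pass over the data, so this suits metadata-sized frames.
        num_partitions = max(1, df.count() // desired_rows_per_partition)
        # repartition() inserts a shuffle, so upstream stages keep their full
        # parallelism and only the final write runs on the reduced partition count.
        df.repartition(num_partitions).write.format('json').mode('overwrite').save(path)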
@@ -1432,9 +1418,9 @@ def etl_from_metastore(sc, sql_context, db_prefix, table_prefix, hive_metastore,
     # load
     output_path = get_output_dir(options['output_path'])
 
-    coalesce_by_row_count(databases).write.format('json').mode('overwrite').save(output_path + 'databases')
-    coalesce_by_row_count(tables).write.format('json').mode('overwrite').save(output_path + 'tables')
-    coalesce_by_row_count(partitions).write.format('json').mode('overwrite').save(output_path + 'partitions')
+    databases.write.format('json').mode('overwrite').save(output_path + 'databases')
+    tables.write.format('json').mode('overwrite').save(output_path + 'tables')
+    partitions.write.format('json').mode('overwrite').save(output_path + 'partitions')
 
 
 def etl_to_metastore(sc, sql_context, hive_metastore, options):

utilities/Hive_metastore_migration/src/import_into_datacatalog.py
1 addition & 4 deletions

@@ -35,11 +35,8 @@ def transform_df_to_catalog_import_schema(sql_context, glue_context, df_database
 
 def import_datacatalog(sql_context, glue_context, datacatalog_name, databases, tables, partitions, region):
 
-    # TEMP: get around datacatalog writer performance issue
-    limited_partitions = partitions.limit(10)
-
     (dyf_databases, dyf_tables, dyf_partitions) = transform_df_to_catalog_import_schema(
-        sql_context, glue_context, databases, tables, limited_partitions)
+        sql_context, glue_context, databases, tables, partitions)
 
     # load
     glue_context.write_dynamic_frame.from_options(
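
This hunk is the "partitions not migrated" fix: the temporary workaround capped the import at 10 partition records via limit(10), so everything beyond the first 10 partitions never reached the Data Catalog. A self-contained illustration with synthetic data (not repo code):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName('limit-sketch').getOrCreate()
    partitions = spark.range(500).toDF('partition_id')  # pretend 500 Hive partitions

    print(partitions.limit(10).count())  # 10  -> all the old import ever wrote
    print(partitions.count())            # 500 -> what the fixed import writes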
