Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 79 additions & 27 deletions src/databricks/labs/ucx/hive_metastore/table_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ def _convert_hms_table_to_external(self, src_table: Table) -> bool:
def _convert_wasbs_table_to_abfss(self, src_table: Table) -> bool:
"""
Converts a Hive metastore azure wasbs table to abfss using alter table command.
This command and workflow requires a single user cluster with DB16 or newer.
"""
logger.info(f"Changing HMS managed table {src_table.name} to External Table type.")
inventory_table = self._tables_crawler.full_name
Expand All @@ -392,7 +393,6 @@ def _convert_wasbs_table_to_abfss(self, src_table: Table) -> bool:
return False
try:
old_table = self._catalog.getTableMetadata(table_identifier)
entity_storage_locations = self._get_entity_storage_locations(old_table)
table_location = old_table.storage()
new_location = self._catalog_storage(
self._spark._jvm.scala.Some( # pylint: disable=protected-access
Expand All @@ -404,32 +404,84 @@ def _convert_wasbs_table_to_abfss(self, src_table: Table) -> bool:
table_location.compressed(),
table_location.properties(),
)
new_table = self._catalog_table(
old_table.identifier(),
old_table.tableType(),
new_location,
old_table.schema(),
old_table.provider(),
old_table.partitionColumnNames(),
old_table.bucketSpec(),
old_table.owner(),
old_table.createTime(),
old_table.lastAccessTime(),
old_table.createVersion(),
old_table.properties(),
old_table.stats(),
old_table.viewText(),
old_table.comment(),
old_table.unsupportedFeatures(),
old_table.tracksPartitionsInCatalog(),
old_table.schemaPreservesCase(),
old_table.ignoredProperties(),
old_table.viewOriginalText(),
# From DBR 16, there's a new constructor argument: entityStorageLocations (Seq[EntityStorageLocation])
# (We can't detect whether the argument is needed by the constructor, but assume that if the accessor
# is present on the source table then the argument is needed.)
*([entity_storage_locations] if entity_storage_locations is not None else []),
)
logger.info(f"Updating table {src_table.name} location from {src_table.location} to {new_table_location}")
if 'collation' in dir(old_table):
new_table = self._catalog_table(
old_table.identifier(),
old_table.tableType(),
new_location,
old_table.schema(),
old_table.provider(),
old_table.partitionColumnNames(),
old_table.bucketSpec(),
old_table.owner(),
old_table.createTime(),
old_table.lastAccessTime(),
old_table.createVersion(),
old_table.properties(),
old_table.stats(),
old_table.viewText(),
old_table.comment(),
old_table.collation(),
old_table.unsupportedFeatures(),
old_table.tracksPartitionsInCatalog(),
old_table.schemaPreservesCase(),
old_table.ignoredProperties(),
old_table.viewOriginalText(),
old_table.viewDependencyList(),
old_table.tableConstraints(),
old_table.deltaRuntimeProperties(),
old_table.pipelineUuid(),
old_table.rowFilter(),
old_table.columnMasks(),
old_table.enableAutoMaintenance(),
old_table.effectiveAutoMaintenanceFlag(),
old_table.baseTableId(),
old_table.baseTableLocation(),
old_table.accessPoint(),
old_table.deltaUniformIceberg(),
old_table.shallowClones(),
old_table.encryptionDetails(),
old_table.deltaSharingKind(),
old_table.reconciliationDefinition(),
old_table.parentTable(),
old_table.partitionLogLocation(),
old_table.capabilities(),
old_table.auxiliaryManagedLocation(),
old_table.provisioningInfo(),
old_table.useRemoteFiltering(),
old_table.deltaCoordinatedCommitsInfo(),
old_table.rowFiltersImplicitFromABAC(),
old_table.columnMasksImplicitFromABAC(),
old_table.entityStorageLocations(),
old_table.parentTableUuid(),
old_table.isArclightTableCreation(),
old_table.resourceName(),
old_table.governanceMetadata(),
)
else:
new_table = self._catalog_table(
old_table.identifier(),
old_table.tableType(),
new_location,
old_table.schema(),
old_table.provider(),
old_table.partitionColumnNames(),
old_table.bucketSpec(),
old_table.owner(),
old_table.createTime(),
old_table.lastAccessTime(),
old_table.createVersion(),
old_table.properties(),
old_table.stats(),
old_table.viewText(),
old_table.comment(),
old_table.unsupportedFeatures(),
old_table.tracksPartitionsInCatalog(),
old_table.schemaPreservesCase(),
old_table.ignoredProperties(),
old_table.viewOriginalText(),
)
self._catalog.alterTable(new_table)
except Exception as e: # pylint: disable=broad-exception-caught
logger.warning(f"Error converting HMS table {src_table.name} to abfss: {e}", exc_info=True)
Expand Down
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/hive_metastore/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ class ConvertWASBSToADLSGen2(Workflow):
def __init__(self):
super().__init__('convert-wasbs-to-adls-gen2-experimental')

@job_task(job_cluster="user_isolation", depends_on=[Assessment.crawl_tables])
@job_task(job_cluster="main", depends_on=[Assessment.crawl_tables])
def convert_wasbs_to_adls_gen2(self, ctx: RuntimeContext):
"""This workflow task converts WASBS paths to ADLS Gen2 paths in the Hive Metastore."""
ctx.tables_migrator.convert_wasbs_to_adls_gen2()
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/workspace_access/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,7 @@ def pick_owner_group(self, prompt: Prompts) -> str | None:
return None
if len(groups) == 1:
return groups[0].display_name
group_names = [group.display_name for group in groups]
group_names = [group.display_name for group in groups if group and group.display_name]
return prompt.choice("Select the group to be used as the owner group", group_names, max_attempts=3)

def user_in_group(self, group_name: str, user: User) -> bool:
Expand Down
Loading