Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def _create_data_source_yaml_str(self) -> str:
"""

def _get_3_schema_dir(self):
    """Return the S3 staging directory for the test schema.

    The schema name is resolved through the shared helper on the data source
    impl instead of indexing the dataset prefix directly (the diff contained
    both variants; this is the merged, post-refactor form).
    """
    schema_name = self.extract_schema_from_prefix()
    return f"{self.s3_test_dir}/staging-dir/{ATHENA_CATALOG}/{schema_name}"

def drop_test_schema_if_exists(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,7 @@ def _build_table_namespace_for_schema_query(self, prefixes: list[str]) -> tuple[
project_id=prefixes[0],
dataset=None, # We only need the project id to query the schemas, as it's always in the `INFORMATION_SCHEMA`
)

schema_index: int | None = self.sql_dialect.get_schema_prefix_index()

schema_name: str = prefixes[schema_index] if schema_index is not None and schema_index < len(prefixes) else None
schema_name = self.extract_schema_from_prefix(prefixes)
if schema_name is None:
raise ValueError(f"Cannot determine schema name from prefixes: {prefixes}")

Expand Down
49 changes: 27 additions & 22 deletions soda-core/src/soda_core/common/data_source_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,43 +162,51 @@ def get_columns_metadata(self, dataset_prefixes: list[str], dataset_name: str) -
return self.sql_dialect.build_column_metadatas_from_query_result(query_result)

def build_columns_metadata_query_str(self, dataset_prefixes: list[str], dataset_name: str) -> str:
    """Build the SQL string that queries column metadata for one dataset.

    Args:
        dataset_prefixes: dialect-ordered name prefixes (e.g. [database, schema]).
        dataset_name: the table name whose columns are queried.

    Returns:
        The dialect-specific columns-metadata query string.
    """
    schema_name: Optional[str] = self.extract_schema_from_prefix(dataset_prefixes)
    database_name: Optional[str] = self.extract_database_from_prefix(dataset_prefixes)

    # Branch on the resolved database *name*: if the prefixes carry no
    # database, fall back to a schema-only namespace.
    table_namespace: DataSourceNamespace = (
        SchemaDataSourceNamespace(schema=schema_name)
        if database_name is None
        else DbSchemaDataSourceNamespace(database=database_name, schema=schema_name)
    )

    # BigQuery must be able to override to get the location
    return self.sql_dialect.build_columns_metadata_query_str(
        table_namespace=table_namespace, table_name=dataset_name
    )

def extract_schema_from_prefix(self, prefixes: list[str]) -> Optional[str]:
    """Return the schema name from *prefixes*, or None.

    None is returned when the dialect defines no schema prefix position, or
    when the prefixes list is shorter than the configured index.
    """
    schema_index: Optional[int] = self.sql_dialect.get_schema_prefix_index()
    if schema_index is None:
        return None
    # Guard against prefixes shorter than the dialect expects.
    # (Original annotated this local as `str` although it can be None.)
    return prefixes[schema_index] if schema_index < len(prefixes) else None

def extract_database_from_prefix(self, prefixes: list[str]) -> Optional[str]:
    """Return the database name from *prefixes*, or None.

    None is returned when the dialect defines no database prefix position, or
    when the prefixes list is shorter than the configured index.
    """
    database_index: Optional[int] = self.sql_dialect.get_database_prefix_index()
    if database_index is None:
        return None
    # Guard against prefixes shorter than the dialect expects.
    # (Original annotated this local as `str` although it can be None.)
    return prefixes[database_index] if database_index < len(prefixes) else None

def _build_table_namespace_for_schema_query(self, prefixes: list[str]) -> tuple[DataSourceNamespace, str]:
    """
    Builds the table namespace for the schema query.
    Returns the table namespace and the schema name.

    Raises:
        ValueError: if no schema name can be derived from the prefixes.
    """
    schema_name: Optional[str] = self.extract_schema_from_prefix(prefixes)
    database_name: Optional[str] = self.extract_database_from_prefix(prefixes)

    if schema_name is None:
        raise ValueError(f"Cannot determine schema name from prefixes: {prefixes}")

    # Decide the namespace shape on the resolved database *name*, not on the
    # dialect's configured index: a configured index with too-short prefixes
    # must never produce DbSchemaDataSourceNamespace(database=None, ...).
    # This also removes the now-unused index lookups flagged in review.
    table_namespace: DataSourceNamespace = (
        SchemaDataSourceNamespace(schema=schema_name)
        if database_name is None
        else DbSchemaDataSourceNamespace(
            database=database_name,
            schema=schema_name,
        )
    )
    return table_namespace, schema_name
Expand Down Expand Up @@ -232,12 +240,9 @@ def verify_if_table_exists(self, prefixes: list[str], table_name: str) -> bool:

def _get_fully_qualified_table_names(self, prefixes: list[str], table_name: str) -> list[FullyQualifiedTableName]:
metadata_tables_query: MetadataTablesQuery = self.create_metadata_tables_query()
database_index = self.sql_dialect.get_database_prefix_index()
schema_index = self.sql_dialect.get_schema_prefix_index()
database_name = (
prefixes[database_index] if database_index is not None and database_index < len(prefixes) else None
)
schema_name = prefixes[schema_index] if schema_index is not None and schema_index < len(prefixes) else None
database_name = self.extract_database_from_prefix(prefixes)
schema_name = self.extract_schema_from_prefix(prefixes)

fully_qualified_table_names: list[FullyQualifiedTableName] = metadata_tables_query.execute(
database_name=database_name,
schema_name=schema_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def drop_test_schema_if_exists(self) -> None:
drop_table_sql = dialect.build_drop_table_sql(DROP_TABLE(table_identifier))
self.data_source_impl.execute_update(drop_table_sql)
# Drop the schema if it exists.
schema_name = self.dataset_prefix[self.data_source_impl.sql_dialect.get_schema_prefix_index()]
schema_name = self.extract_schema_from_prefix()
if self._does_schema_exist(schema_name):
self.data_source_impl.execute_update(f"DROP SCHEMA {dialect.quote_default(schema_name)};")

Expand Down
20 changes: 10 additions & 10 deletions soda-tests/src/helpers/data_source_test_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,13 +277,8 @@ def end_test_session_drop_schema(self) -> None:
self.drop_test_schema_if_exists()

def query_existing_test_tables(self) -> list[FullyQualifiedTableName]:
database: Optional[str] = None
if self.data_source_impl.sql_dialect.get_database_prefix_index() is not None:
database = self.dataset_prefix[self.data_source_impl.sql_dialect.get_database_prefix_index()]

schema: Optional[str] = None
if self.data_source_impl.sql_dialect.get_schema_prefix_index() is not None:
schema = self.dataset_prefix[self.data_source_impl.sql_dialect.get_schema_prefix_index()]
database: Optional[str] = self.extract_database_from_prefix()
schema: Optional[str] = self.extract_schema_from_prefix()

metadata_tables_query: MetadataTablesQuery = self.data_source_impl.create_metadata_tables_query()
fully_qualified_table_names: list[FullyQualifiedTableName] = metadata_tables_query.execute(
Expand Down Expand Up @@ -313,11 +308,16 @@ def create_test_schema_if_not_exists_sql(self) -> str:
def post_test_schema_create_sql(self) -> str:
return self.data_source_impl.sql_dialect.post_schema_create_sql(self.dataset_prefix)

def extract_database_from_prefix(self) -> Optional[str]:
    """Resolve the database name for this helper's dataset prefix."""
    impl = self.data_source_impl
    return impl.extract_database_from_prefix(self.dataset_prefix)

def extract_schema_from_prefix(self) -> Optional[str]:
    """Resolve the schema name for this helper's dataset prefix."""
    impl = self.data_source_impl
    return impl.extract_schema_from_prefix(self.dataset_prefix)

def drop_test_schema_if_exists(self) -> None:
    """Drop the test schema, asserting the data source supports schemas.

    Raises:
        AssertionError: if no schema name can be resolved from the dataset
            prefix (data source has no schema concept, or the prefix is
            missing/empty).
    """
    schema = self.extract_schema_from_prefix()
    if not schema:
        raise AssertionError("Data source does not support schemas")

    self.drop_schema_if_exists(schema)

Expand Down
Loading