Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def _create_data_source_yaml_str(self) -> str:
"""

def _get_3_schema_dir(self):
schema_name = self.dataset_prefix[self.data_source_impl.sql_dialect.get_schema_prefix_index()]
schema_name = self.extract_schema_from_prefix()
return f"{self.s3_test_dir}/staging-dir/{ATHENA_CATALOG}/{schema_name}"

def drop_test_schema_if_exists(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,7 @@ def _build_table_namespace_for_schema_query(self, prefixes: list[str]) -> tuple[
project_id=prefixes[0],
dataset=None, # We only need the project id to query the schemas, as it's always in the `INFORMATION_SCHEMA`
)

schema_index: int | None = self.sql_dialect.get_schema_prefix_index()

schema_name: str = prefixes[schema_index] if schema_index is not None and schema_index < len(prefixes) else None
schema_name = self.extract_schema_from_prefix(prefixes)
if schema_name is None:
raise ValueError(f"Cannot determine schema name from prefixes: {prefixes}")

Expand Down
51 changes: 29 additions & 22 deletions soda-core/src/soda_core/common/data_source_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,43 +162,53 @@
return self.sql_dialect.build_column_metadatas_from_query_result(query_result)

def build_columns_metadata_query_str(self, dataset_prefixes: list[str], dataset_name: str) -> str:
database_index: int | None = self.sql_dialect.get_database_prefix_index()
schema_index: int | None = self.sql_dialect.get_schema_prefix_index()
schema_name: str = self.extract_schema_from_prefix(dataset_prefixes)
database_name: str = self.extract_database_from_prefix(dataset_prefixes)

table_namespace: DataSourceNamespace = (
SchemaDataSourceNamespace(schema=dataset_prefixes[schema_index])
if database_index is None
else DbSchemaDataSourceNamespace(
database=dataset_prefixes[database_index], schema=dataset_prefixes[schema_index]
)
SchemaDataSourceNamespace(schema=schema_name)
if database_name is None
else DbSchemaDataSourceNamespace(database=database_name, schema=schema_name)
)

# BigQuery must be able to override to get the location
return self.sql_dialect.build_columns_metadata_query_str(
table_namespace=table_namespace, table_name=dataset_name
)

def extract_schema_from_prefix(self, prefixes: list[str]) -> str:
schema_index: int | None = self.sql_dialect.get_schema_prefix_index()
if schema_index is None:
return None

Check warning on line 182 in soda-core/src/soda_core/common/data_source_impl.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Return a value of type "str" instead of "NoneType" or update function "extract_schema_from_prefix" type hint.

See more on https://sonarcloud.io/project/issues?id=sodadata_soda-sql&issues=AZrMNQpRb3RSIyf1NCOi&open=AZrMNQpRb3RSIyf1NCOi&pullRequest=2504
schema_name: str = prefixes[schema_index] if schema_index is not None and schema_index < len(prefixes) else None

Check warning on line 183 in soda-core/src/soda_core/common/data_source_impl.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Fix this condition that always evaluates to true.

See more on https://sonarcloud.io/project/issues?id=sodadata_soda-sql&issues=AZrMNQpRb3RSIyf1NCOk&open=AZrMNQpRb3RSIyf1NCOk&pullRequest=2504
return schema_name

def extract_database_from_prefix(self, prefixes: list[str]) -> str:
database_index: int | None = self.sql_dialect.get_database_prefix_index()
if database_index is None:
return None

Check warning on line 189 in soda-core/src/soda_core/common/data_source_impl.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Return a value of type "str" instead of "NoneType" or update function "extract_database_from_prefix" type hint.

See more on https://sonarcloud.io/project/issues?id=sodadata_soda-sql&issues=AZrMNQpRb3RSIyf1NCOj&open=AZrMNQpRb3RSIyf1NCOj&pullRequest=2504
database_name: str = (
prefixes[database_index] if database_index is not None and database_index < len(prefixes) else None

Check warning on line 191 in soda-core/src/soda_core/common/data_source_impl.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Fix this condition that always evaluates to true.

See more on https://sonarcloud.io/project/issues?id=sodadata_soda-sql&issues=AZrMNWvw4nDx6yhhU1Qz&open=AZrMNWvw4nDx6yhhU1Qz&pullRequest=2504
)
return database_name

def _build_table_namespace_for_schema_query(self, prefixes: list[str]) -> tuple[DataSourceNamespace, str]:
"""
Builds the table namespace for the schema query.
Returns the table namespace and the schema name.
"""
schema_name: str = self.extract_schema_from_prefix(prefixes)
database_name: str | None = self.extract_database_from_prefix(prefixes)
database_index: int | None = self.sql_dialect.get_database_prefix_index()
schema_index: int | None = self.sql_dialect.get_schema_prefix_index()

schema_name: str = prefixes[schema_index] if schema_index is not None and schema_index < len(prefixes) else None
if schema_name is None:
raise ValueError(f"Cannot determine schema name from prefixes: {prefixes}")

database_name: str | None = (
prefixes[database_index] if database_index is not None and database_index < len(prefixes) else None
)

table_namespace: DataSourceNamespace = (
SchemaDataSourceNamespace(schema=prefixes[schema_index])
SchemaDataSourceNamespace(schema=schema_name)
if database_index is None
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable database_index is retrieved but never used in this method. After the refactoring, the method now uses database_name directly (extracted via extract_database_from_prefix) to determine whether to create a SchemaDataSourceNamespace or DbSchemaDataSourceNamespace. The check on line 208 should use database_name is None instead of database_index is None. Consider removing the unused variable and fixing the conditional check.

Copilot uses AI. Check for mistakes.
else DbSchemaDataSourceNamespace(
database=database_name,
schema=prefixes[schema_index],
schema=schema_name,
)
)
return table_namespace, schema_name
Expand Down Expand Up @@ -232,12 +242,9 @@

def _get_fully_qualified_table_names(self, prefixes: list[str], table_name: str) -> list[FullyQualifiedTableName]:
metadata_tables_query: MetadataTablesQuery = self.create_metadata_tables_query()
database_index = self.sql_dialect.get_database_prefix_index()
schema_index = self.sql_dialect.get_schema_prefix_index()
database_name = (
prefixes[database_index] if database_index is not None and database_index < len(prefixes) else None
)
schema_name = prefixes[schema_index] if schema_index is not None and schema_index < len(prefixes) else None
database_name = self.extract_database_from_prefix(prefixes)
schema_name = self.extract_schema_from_prefix(prefixes)

fully_qualified_table_names: list[FullyQualifiedTableName] = metadata_tables_query.execute(
database_name=database_name,
schema_name=schema_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def drop_test_schema_if_exists(self) -> None:
drop_table_sql = dialect.build_drop_table_sql(DROP_TABLE(table_identifier))
self.data_source_impl.execute_update(drop_table_sql)
# Drop the schema if it exists.
schema_name = self.dataset_prefix[self.data_source_impl.sql_dialect.get_schema_prefix_index()]
schema_name = self.extract_schema_from_prefix()
if self._does_schema_exist(schema_name):
self.data_source_impl.execute_update(f"DROP SCHEMA {dialect.quote_default(schema_name)};")

Expand Down
20 changes: 10 additions & 10 deletions soda-tests/src/helpers/data_source_test_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,13 +277,8 @@ def end_test_session_drop_schema(self) -> None:
self.drop_test_schema_if_exists()

def query_existing_test_tables(self) -> list[FullyQualifiedTableName]:
database: Optional[str] = None
if self.data_source_impl.sql_dialect.get_database_prefix_index() is not None:
database = self.dataset_prefix[self.data_source_impl.sql_dialect.get_database_prefix_index()]

schema: Optional[str] = None
if self.data_source_impl.sql_dialect.get_schema_prefix_index() is not None:
schema = self.dataset_prefix[self.data_source_impl.sql_dialect.get_schema_prefix_index()]
database: Optional[str] = self.extract_database_from_prefix()
schema: Optional[str] = self.extract_schema_from_prefix()

metadata_tables_query: MetadataTablesQuery = self.data_source_impl.create_metadata_tables_query()
fully_qualified_table_names: list[FullyQualifiedTableName] = metadata_tables_query.execute(
Expand Down Expand Up @@ -313,11 +308,16 @@ def create_test_schema_if_not_exists_sql(self) -> str:
def post_test_schema_create_sql(self) -> str:
return self.data_source_impl.sql_dialect.post_schema_create_sql(self.dataset_prefix)

def extract_database_from_prefix(self) -> str:
return self.data_source_impl.extract_database_from_prefix(self.dataset_prefix)

def extract_schema_from_prefix(self) -> str:
return self.data_source_impl.extract_schema_from_prefix(self.dataset_prefix)

def drop_test_schema_if_exists(self) -> None:
schema_index = self.data_source_impl.sql_dialect.get_schema_prefix_index()
if schema_index is None:
schema = self.extract_schema_from_prefix()
if not schema:
raise AssertionError("Data source does not support schemas")
schema = self.dataset_prefix[schema_index]

self.drop_schema_if_exists(schema)

Expand Down