diff --git a/soda-athena/src/soda_athena/test_helpers/athena_data_source_test_helper.py b/soda-athena/src/soda_athena/test_helpers/athena_data_source_test_helper.py
index 6a99aba9c..8725b9bc2 100644
--- a/soda-athena/src/soda_athena/test_helpers/athena_data_source_test_helper.py
+++ b/soda-athena/src/soda_athena/test_helpers/athena_data_source_test_helper.py
@@ -47,7 +47,7 @@ def _create_data_source_yaml_str(self) -> str:
         """
 
     def _get_3_schema_dir(self):
-        schema_name = self.dataset_prefix[self.data_source_impl.sql_dialect.get_schema_prefix_index()]
+        schema_name = self.extract_schema_from_prefix()
         return f"{self.s3_test_dir}/staging-dir/{ATHENA_CATALOG}/{schema_name}"
 
     def drop_test_schema_if_exists(self) -> str:
diff --git a/soda-bigquery/src/soda_bigquery/common/data_sources/bigquery_data_source.py b/soda-bigquery/src/soda_bigquery/common/data_sources/bigquery_data_source.py
index 1b9837783..caee0ec50 100644
--- a/soda-bigquery/src/soda_bigquery/common/data_sources/bigquery_data_source.py
+++ b/soda-bigquery/src/soda_bigquery/common/data_sources/bigquery_data_source.py
@@ -96,10 +96,7 @@ def _build_table_namespace_for_schema_query(self, prefixes: list[str]) -> tuple[
             project_id=prefixes[0],
             dataset=None,  # We only need the project id to query the schemas, as it's always in the `INFORMATION_SCHEMA`
         )
-
-        schema_index: int | None = self.sql_dialect.get_schema_prefix_index()
-
-        schema_name: str = prefixes[schema_index] if schema_index is not None and schema_index < len(prefixes) else None
+        schema_name = self.extract_schema_from_prefix(prefixes)
 
         if schema_name is None:
             raise ValueError(f"Cannot determine schema name from prefixes: {prefixes}")
diff --git a/soda-core/src/soda_core/common/data_source_impl.py b/soda-core/src/soda_core/common/data_source_impl.py
index 249bb0ead..4dc762749 100644
--- a/soda-core/src/soda_core/common/data_source_impl.py
+++ b/soda-core/src/soda_core/common/data_source_impl.py
@@ -162,14 +162,13 @@ def get_columns_metadata(self, dataset_prefixes: list[str], dataset_name: str) -
         return self.sql_dialect.build_column_metadatas_from_query_result(query_result)
 
     def build_columns_metadata_query_str(self, dataset_prefixes: list[str], dataset_name: str) -> str:
-        database_index: int | None = self.sql_dialect.get_database_prefix_index()
-        schema_index: int | None = self.sql_dialect.get_schema_prefix_index()
+        schema_name: Optional[str] = self.extract_schema_from_prefix(dataset_prefixes)
+        database_name: Optional[str] = self.extract_database_from_prefix(dataset_prefixes)
+
         table_namespace: DataSourceNamespace = (
-            SchemaDataSourceNamespace(schema=dataset_prefixes[schema_index])
-            if database_index is None
-            else DbSchemaDataSourceNamespace(
-                database=dataset_prefixes[database_index], schema=dataset_prefixes[schema_index]
-            )
+            SchemaDataSourceNamespace(schema=schema_name)
+            if database_name is None
+            else DbSchemaDataSourceNamespace(database=database_name, schema=schema_name)
         )
 
         # BigQuery must be able to override to get the location
@@ -177,28 +176,36 @@ def build_columns_metadata_query_str(self, dataset_prefixes: list[str], dataset_
             table_namespace=table_namespace, table_name=dataset_name
         )
 
+    def extract_schema_from_prefix(self, prefixes: list[str]) -> Optional[str]:
+        schema_index: int | None = self.sql_dialect.get_schema_prefix_index()
+        if schema_index is None:
+            return None
+        schema_name: Optional[str] = prefixes[schema_index] if schema_index < len(prefixes) else None
+        return schema_name
+
+    def extract_database_from_prefix(self, prefixes: list[str]) -> Optional[str]:
+        database_index: int | None = self.sql_dialect.get_database_prefix_index()
+        if database_index is None:
+            return None
+        database_name: Optional[str] = prefixes[database_index] if database_index < len(prefixes) else None
+        return database_name
+
     def _build_table_namespace_for_schema_query(self, prefixes: list[str]) -> tuple[DataSourceNamespace, str]:
         """
         Builds the table namespace for the schema query.
         Returns the table namespace and the schema name.
         """
-        database_index: int | None = self.sql_dialect.get_database_prefix_index()
-        schema_index: int | None = self.sql_dialect.get_schema_prefix_index()
-
-        schema_name: str = prefixes[schema_index] if schema_index is not None and schema_index < len(prefixes) else None
+        schema_name: Optional[str] = self.extract_schema_from_prefix(prefixes)
+        database_name: str | None = self.extract_database_from_prefix(prefixes)
 
         if schema_name is None:
             raise ValueError(f"Cannot determine schema name from prefixes: {prefixes}")
-        database_name: str | None = (
-            prefixes[database_index] if database_index is not None and database_index < len(prefixes) else None
-        )
-
         table_namespace: DataSourceNamespace = (
-            SchemaDataSourceNamespace(schema=prefixes[schema_index])
-            if database_index is None
+            SchemaDataSourceNamespace(schema=schema_name)
+            if database_name is None
             else DbSchemaDataSourceNamespace(
                 database=database_name,
-                schema=prefixes[schema_index],
+                schema=schema_name,
             )
         )
         return table_namespace, schema_name
@@ -232,12 +239,9 @@ def verify_if_table_exists(self, prefixes: list[str], table_name: str) -> bool:
 
     def _get_fully_qualified_table_names(self, prefixes: list[str], table_name: str) -> list[FullyQualifiedTableName]:
         metadata_tables_query: MetadataTablesQuery = self.create_metadata_tables_query()
-        database_index = self.sql_dialect.get_database_prefix_index()
-        schema_index = self.sql_dialect.get_schema_prefix_index()
-        database_name = (
-            prefixes[database_index] if database_index is not None and database_index < len(prefixes) else None
-        )
-        schema_name = prefixes[schema_index] if schema_index is not None and schema_index < len(prefixes) else None
+        database_name = self.extract_database_from_prefix(prefixes)
+        schema_name = self.extract_schema_from_prefix(prefixes)
+
         fully_qualified_table_names: list[FullyQualifiedTableName] = metadata_tables_query.execute(
             database_name=database_name,
             schema_name=schema_name,
diff --git a/soda-sqlserver/src/soda_sqlserver/test_helpers/sqlserver_data_source_test_helper.py b/soda-sqlserver/src/soda_sqlserver/test_helpers/sqlserver_data_source_test_helper.py
index 5dc4867f9..2b50db9dd 100644
--- a/soda-sqlserver/src/soda_sqlserver/test_helpers/sqlserver_data_source_test_helper.py
+++ b/soda-sqlserver/src/soda_sqlserver/test_helpers/sqlserver_data_source_test_helper.py
@@ -46,7 +46,7 @@ def drop_test_schema_if_exists(self) -> None:
             drop_table_sql = dialect.build_drop_table_sql(DROP_TABLE(table_identifier))
             self.data_source_impl.execute_update(drop_table_sql)
         # Drop the schema if it exists.
-        schema_name = self.dataset_prefix[self.data_source_impl.sql_dialect.get_schema_prefix_index()]
+        schema_name = self.extract_schema_from_prefix()
         if self._does_schema_exist(schema_name):
             self.data_source_impl.execute_update(f"DROP SCHEMA {dialect.quote_default(schema_name)};")
 
diff --git a/soda-tests/src/helpers/data_source_test_helper.py b/soda-tests/src/helpers/data_source_test_helper.py
index 061897031..467b15bdb 100644
--- a/soda-tests/src/helpers/data_source_test_helper.py
+++ b/soda-tests/src/helpers/data_source_test_helper.py
@@ -277,13 +277,8 @@ def end_test_session_drop_schema(self) -> None:
         self.drop_test_schema_if_exists()
 
     def query_existing_test_tables(self) -> list[FullyQualifiedTableName]:
-        database: Optional[str] = None
-        if self.data_source_impl.sql_dialect.get_database_prefix_index() is not None:
-            database = self.dataset_prefix[self.data_source_impl.sql_dialect.get_database_prefix_index()]
-
-        schema: Optional[str] = None
-        if self.data_source_impl.sql_dialect.get_schema_prefix_index() is not None:
-            schema = self.dataset_prefix[self.data_source_impl.sql_dialect.get_schema_prefix_index()]
+        database: Optional[str] = self.extract_database_from_prefix()
+        schema: Optional[str] = self.extract_schema_from_prefix()
 
         metadata_tables_query: MetadataTablesQuery = self.data_source_impl.create_metadata_tables_query()
         fully_qualified_table_names: list[FullyQualifiedTableName] = metadata_tables_query.execute(
@@ -313,11 +308,16 @@ def create_test_schema_if_not_exists_sql(self) -> str:
 
     def post_test_schema_create_sql(self) -> str:
         return self.data_source_impl.sql_dialect.post_schema_create_sql(self.dataset_prefix)
 
+    def extract_database_from_prefix(self) -> Optional[str]:
+        return self.data_source_impl.extract_database_from_prefix(self.dataset_prefix)
+
+    def extract_schema_from_prefix(self) -> Optional[str]:
+        return self.data_source_impl.extract_schema_from_prefix(self.dataset_prefix)
+
     def drop_test_schema_if_exists(self) -> None:
-        schema_index = self.data_source_impl.sql_dialect.get_schema_prefix_index()
-        if schema_index is None:
+        schema = self.extract_schema_from_prefix()
+        if not schema:
             raise AssertionError("Data source does not support schemas")
-        schema = self.dataset_prefix[schema_index]
         self.drop_schema_if_exists(schema)
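
Reviewer note: below is a minimal, self-contained sketch of the contract the new `extract_schema_from_prefix` / `extract_database_from_prefix` helpers are intended to satisfy. The `StubDialect` and `StubDataSourceImpl` classes and the `[database, schema]` prefix ordering are hypothetical stand-ins for illustration only (they are not the soda-core classes); only the extraction logic mirrors the diff above.

```python
from typing import Optional


class StubDialect:
    """Hypothetical dialect whose dataset prefixes are ordered [database, schema]."""

    def get_database_prefix_index(self) -> Optional[int]:
        return 0

    def get_schema_prefix_index(self) -> Optional[int]:
        return 1


class StubDataSourceImpl:
    """Stand-in for DataSourceImpl; only the two new helpers are reproduced."""

    def __init__(self, sql_dialect: StubDialect) -> None:
        self.sql_dialect = sql_dialect

    def extract_schema_from_prefix(self, prefixes: list[str]) -> Optional[str]:
        # None when the dialect has no schema prefix or the prefix list is too short.
        schema_index = self.sql_dialect.get_schema_prefix_index()
        if schema_index is None:
            return None
        return prefixes[schema_index] if schema_index < len(prefixes) else None

    def extract_database_from_prefix(self, prefixes: list[str]) -> Optional[str]:
        # Same contract as above, but for the database part of the prefix.
        database_index = self.sql_dialect.get_database_prefix_index()
        if database_index is None:
            return None
        return prefixes[database_index] if database_index < len(prefixes) else None


if __name__ == "__main__":
    impl = StubDataSourceImpl(StubDialect())
    assert impl.extract_database_from_prefix(["my_db", "my_schema"]) == "my_db"
    assert impl.extract_schema_from_prefix(["my_db", "my_schema"]) == "my_schema"
    assert impl.extract_schema_from_prefix(["my_db"]) is None  # prefix list too short
```

Returning `None` both when the dialect defines no such prefix index and when the prefix list is shorter than expected is what lets call sites such as `_get_fully_qualified_table_names` and `query_existing_test_tables` drop their inline index and bounds checks.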