From bd1e649dcf51ef8a14cdf56fba24a08218543ed8 Mon Sep 17 00:00:00 2001 From: tombaeyens Date: Fri, 6 Jun 2025 08:43:10 +0200 Subject: [PATCH 01/26] failed rows wip --- .../impl/contract_verification_impl.py | 20 +++++++++++++++++++ .../model/data_source/data_source.py | 3 ++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py index 59c5321fd..d97679f48 100644 --- a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py +++ b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py @@ -12,6 +12,7 @@ from soda_core.common.data_source_results import QueryResult from soda_core.common.dataset_identifier import DatasetIdentifier from soda_core.common.exceptions import InvalidRegexException, SodaCoreException +from soda_core.common.extensions import Extensions from soda_core.common.logging_constants import Emoticons, ExtraKeys, soda_logger from soda_core.common.logs import Location, Logs from soda_core.common.soda_cloud import SodaCloud @@ -47,6 +48,19 @@ logger: logging.Logger = soda_logger +class ContractVerificationResultHandler: + + @classmethod + def instance(cls, identifier: Optional[str] = None) -> ContractVerificationResultHandler: + # TODO: replace with plugin extension mechanism + return Extensions.find_class_method( + "soda.failed_rows.failed_rows", "FailedRows", "create" + )() + + def handle(self, contract_impl: ContractImpl, contract_verification_result: ContractVerificationResult): + pass + + class ContractVerificationSessionImpl: @classmethod def execute( @@ -466,6 +480,12 @@ def verify(self) -> ContractVerificationResult: else: logger.debug(f"Not sending results to Soda Cloud {Emoticons.CROSS_MARK}") + contract_verification_result_handler: Optional[ContractVerificationResultHandler] = ( + ContractVerificationResultHandler.instance() + ) + if contract_verification_result_handler: + contract_verification_result_handler.handle(self, contract_verification_result) + return contract_verification_result def build_log_summary(self, contract_verification_result: ContractVerificationResult) -> str: diff --git a/soda-core/src/soda_core/model/data_source/data_source.py b/soda-core/src/soda_core/model/data_source/data_source.py index b447910e2..626c251ff 100644 --- a/soda-core/src/soda_core/model/data_source/data_source.py +++ b/soda-core/src/soda_core/model/data_source/data_source.py @@ -1,5 +1,5 @@ import abc -from typing import Literal +from typing import Literal, Optional from pydantic import BaseModel, Field from soda_core.model.data_source.data_source_connection_properties import ( @@ -20,6 +20,7 @@ class DataSourceBase( connection_properties: DataSourceConnectionProperties = Field( ..., alias="connection", description="Data source connection details" ) + failed_rows_table_name_template: Optional[str] = Field(default=None, description="Data source name") @classmethod def get_class_type(cls) -> str: From f37e783bf574f6380430f214a72c75e858c3aed8 Mon Sep 17 00:00:00 2001 From: tombaeyens Date: Fri, 6 Jun 2025 17:07:43 +0200 Subject: [PATCH 02/26] Failed rows wip --- soda-core/src/soda_core/common/soda_cloud.py | 9 ++--- soda-core/src/soda_core/common/sql_dialect.py | 4 ++ .../impl/check_types/invalidity_check.py | 6 ++- .../impl/contract_verification_impl.py | 40 ++++++++++++++----- .../soda_core/contracts/impl/contract_yaml.py | 4 ++ .../model/data_source/data_source.py | 1 - .../data_sources/postgres_data_source.py | 4 ++ .../model/data_source/postgres_data_source.py | 3 +- .../src/helpers/data_source_test_helper.py | 3 +- .../databricks_data_source_test_helper.py | 3 -- .../snowflake_data_source_test_helper.py | 5 --- 11 files changed, 55 insertions(+), 27 deletions(-) diff --git a/soda-core/src/soda_core/common/soda_cloud.py b/soda-core/src/soda_core/common/soda_cloud.py index 027695ad9..890ee1cec 100644 --- a/soda-core/src/soda_core/common/soda_cloud.py +++ b/soda-core/src/soda_core/common/soda_cloud.py @@ -227,9 +227,9 @@ def upload_contract_file(self, contract: Contract) -> Optional[str]: soda_cloud_file_path: str = f"{contract.soda_qualified_dataset_name.lower()}.yml" return self._upload_scan_yaml_file(yaml_str=contract_yaml_source_str, soda_cloud_file_path=soda_cloud_file_path) - def send_contract_result(self, contract_verification_result: ContractVerificationResult) -> bool: + def send_contract_result(self, contract_verification_result: ContractVerificationResult) -> Optional[str]: """ - Returns True if a 200 OK was received, False otherwise + Returns A scanId string if a 200 OK was received, None otherwise """ contract_verification_result = _build_contract_result_json( contract_verification_result=contract_verification_result @@ -243,11 +243,10 @@ def send_contract_result(self, contract_verification_result: ContractVerificatio response_json = response.json() if isinstance(response_json, dict): cloud_url: Optional[str] = response_json.get("cloudUrl") + scan_id: Optional[str] = response_json.get("scanId") if isinstance(cloud_url, str): logger.info(f"To view the dataset on Soda Cloud, see {cloud_url}") - return True - else: - return False + return scan_id def send_contract_skeleton(self, contract_yaml_str: str, soda_cloud_file_path: str) -> None: file_id: Optional[str] = self._upload_scan_yaml_file( diff --git a/soda-core/src/soda_core/common/sql_dialect.py b/soda-core/src/soda_core/common/sql_dialect.py index d29266bb2..42e0ad8f9 100644 --- a/soda-core/src/soda_core/common/sql_dialect.py +++ b/soda-core/src/soda_core/common/sql_dialect.py @@ -85,6 +85,10 @@ def escape_string(self, value: str): def escape_regex(self, value: str): return value + def create_schema_if_not_exists_sql(self, schema_name: str) -> str: + quoted_schema_name: str = self.quote_default(schema_name) + return f"CREATE SCHEMA IF NOT EXISTS {quoted_schema_name};" + def build_select_sql(self, select_elements: list) -> str: statement_lines: list[str] = [] statement_lines.extend(self._build_cte_sql_lines(select_elements)) diff --git a/soda-core/src/soda_core/contracts/impl/check_types/invalidity_check.py b/soda-core/src/soda_core/contracts/impl/check_types/invalidity_check.py index 3551c049d..08a1dff75 100644 --- a/soda-core/src/soda_core/contracts/impl/check_types/invalidity_check.py +++ b/soda-core/src/soda_core/contracts/impl/check_types/invalidity_check.py @@ -163,15 +163,17 @@ def __init__( ) def sql_expression(self) -> SqlExpression: + return SUM(CASE_WHEN(self.sql_condition_expression(), LITERAL(1))) + + def sql_condition_expression(self) -> SqlExpression: column_name: str = self.column_impl.column_yaml.name - invalid_count_condition: SqlExpression = AND.optional( + return AND.optional( [ SqlExpressionStr.optional(self.check_filter), NOT.optional(self.missing_and_validity.is_missing_expr(column_name)), self.missing_and_validity.is_invalid_expr(column_name), ] ) - return SUM(CASE_WHEN(invalid_count_condition, LITERAL(1))) def convert_db_value(self, value) -> int: # Note: expression SUM(CASE WHEN "id" IS NULL THEN 1 ELSE 0 END) gives NULL / None as a result if diff --git a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py index d97679f48..59976c2d8 100644 --- a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py +++ b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py @@ -48,16 +48,23 @@ logger: logging.Logger = soda_logger -class ContractVerificationResultHandler: +class ContractVerificationHandler: @classmethod - def instance(cls, identifier: Optional[str] = None) -> ContractVerificationResultHandler: + def instance(cls, identifier: Optional[str] = None) -> ContractVerificationHandler: # TODO: replace with plugin extension mechanism return Extensions.find_class_method( "soda.failed_rows.failed_rows", "FailedRows", "create" )() - def handle(self, contract_impl: ContractImpl, contract_verification_result: ContractVerificationResult): + def handle( + self, + contract_impl: ContractImpl, + data_source_impl: DataSourceImpl, + contract_verification_result: ContractVerificationResult, + soda_cloud: SodaCloud, + scan_id: str + ): pass @@ -468,23 +475,30 @@ def verify(self) -> ContractVerificationResult: contract_verification_result.log_records = self.logs.pop_log_records() + scan_id: Optional[str] = None if self.soda_cloud and self.publish_results: file_id: Optional[str] = self.soda_cloud.upload_contract_file(contract_verification_result.contract) if file_id: # Side effect to pass file id to console logging later on. TODO reconsider this contract.source.soda_cloud_file_id = file_id # send_contract_result will use contract.source.soda_cloud_file_id - response_ok: bool = self.soda_cloud.send_contract_result(contract_verification_result) - if not response_ok: + scan_id = self.soda_cloud.send_contract_result(contract_verification_result) + if not scan_id: contract_verification_result.sending_results_to_soda_cloud_failed = True else: logger.debug(f"Not sending results to Soda Cloud {Emoticons.CROSS_MARK}") - contract_verification_result_handler: Optional[ContractVerificationResultHandler] = ( - ContractVerificationResultHandler.instance() + contract_verification_handler: Optional[ContractVerificationHandler] = ( + ContractVerificationHandler.instance() ) - if contract_verification_result_handler: - contract_verification_result_handler.handle(self, contract_verification_result) + if contract_verification_handler: + contract_verification_handler.handle( + contract_impl=self, + data_source_impl=self.data_source_impl, + contract_verification_result=contract_verification_result, + soda_cloud=self.soda_cloud, + scan_id=scan_id, + ) return contract_verification_result @@ -1131,6 +1145,10 @@ def __eq__(self, other): return False return self.id == other.id + @abstractmethod + def sql_condition_expression(self) -> Optional[SqlExpression]: + pass + class AggregationMetricImpl(MetricImpl): def __init__( @@ -1153,6 +1171,10 @@ def __init__( def sql_expression(self) -> SqlExpression: pass + @abstractmethod + def sql_condition_expression(self) -> SqlExpression: + pass + def convert_db_value(self, value: any) -> any: return value diff --git a/soda-core/src/soda_core/contracts/impl/contract_yaml.py b/soda-core/src/soda_core/contracts/impl/contract_yaml.py index 95af4b9af..fcbaf7475 100644 --- a/soda-core/src/soda_core/contracts/impl/contract_yaml.py +++ b/soda-core/src/soda_core/contracts/impl/contract_yaml.py @@ -550,6 +550,10 @@ def __init__(self, type_name: str, check_yaml_object: YamlObject): qualifier = check_yaml_object.read_value("qualifier") if check_yaml_object else None self.qualifier: Optional[str] = str(qualifier) if qualifier is not None else None self.filter: Optional[str] = check_yaml_object.read_string_opt("filter") if check_yaml_object else None + self.store_failed_rows: Optional[bool] = ( + check_yaml_object.read_bool_opt("store_failed_rows", default_value=False) + if check_yaml_object else None + ) if self.filter: self.filter = self.filter.strip() diff --git a/soda-core/src/soda_core/model/data_source/data_source.py b/soda-core/src/soda_core/model/data_source/data_source.py index 626c251ff..77522becb 100644 --- a/soda-core/src/soda_core/model/data_source/data_source.py +++ b/soda-core/src/soda_core/model/data_source/data_source.py @@ -20,7 +20,6 @@ class DataSourceBase( connection_properties: DataSourceConnectionProperties = Field( ..., alias="connection", description="Data source connection details" ) - failed_rows_table_name_template: Optional[str] = Field(default=None, description="Data source name") @classmethod def get_class_type(cls) -> str: diff --git a/soda-postgres/src/soda_postgres/common/data_sources/postgres_data_source.py b/soda-postgres/src/soda_postgres/common/data_sources/postgres_data_source.py index ff6f7c107..4d3897360 100644 --- a/soda-postgres/src/soda_postgres/common/data_sources/postgres_data_source.py +++ b/soda-postgres/src/soda_postgres/common/data_sources/postgres_data_source.py @@ -30,3 +30,7 @@ def __init__(self): def _build_regex_like_sql(self, matches: REGEX_LIKE) -> str: expression: str = self.build_expression_sql(matches.expression) return f"{expression} ~ '{matches.regex_pattern}'" + + def create_schema_if_not_exists_sql(self, schema_name: str) -> str: + quoted_schema_name: str = self.quote_default(schema_name) + return f"CREATE SCHEMA IF NOT EXISTS {quoted_schema_name} AUTHORIZATION CURRENT_USER;" diff --git a/soda-postgres/src/soda_postgres/model/data_source/postgres_data_source.py b/soda-postgres/src/soda_postgres/model/data_source/postgres_data_source.py index 33a0f9cfd..2e8c9efdb 100644 --- a/soda-postgres/src/soda_postgres/model/data_source/postgres_data_source.py +++ b/soda-postgres/src/soda_postgres/model/data_source/postgres_data_source.py @@ -1,5 +1,5 @@ import abc -from typing import Literal +from typing import Literal, Optional from pydantic import Field, field_validator from soda_core.model.data_source.data_source import DataSourceBase @@ -15,6 +15,7 @@ class PostgresDataSource(DataSourceBase, abc.ABC): connection_properties: PostgresConnectionProperties = Field( ..., alias="connection", description="Data source connection details" ) + dwh_schema: Optional[str] = Field(default=None, description="Diagnostic warehouse schema") @field_validator("connection_properties", mode="before") def infer_connection_type(cls, value): diff --git a/soda-tests/src/helpers/data_source_test_helper.py b/soda-tests/src/helpers/data_source_test_helper.py index bef5b2720..350f9769c 100644 --- a/soda-tests/src/helpers/data_source_test_helper.py +++ b/soda-tests/src/helpers/data_source_test_helper.py @@ -281,7 +281,8 @@ def create_test_schema_if_not_exists(self) -> None: self.data_source_impl.execute_update(sql) def create_test_schema_if_not_exists_sql(self) -> str: - return f"CREATE SCHEMA IF NOT EXISTS {self.dataset_prefix[1]} AUTHORIZATION CURRENT_USER;" + schema_name: str = self.dataset_prefix[1] + return self.data_source_impl.sql_dialect.create_schema_if_not_exists_sql(schema_name) def drop_test_schema_if_exists(self) -> None: sql: str = self.drop_test_schema_if_exists_sql() diff --git a/soda-tests/src/helpers/databricks_data_source_test_helper.py b/soda-tests/src/helpers/databricks_data_source_test_helper.py index d39736cda..991d97f53 100644 --- a/soda-tests/src/helpers/databricks_data_source_test_helper.py +++ b/soda-tests/src/helpers/databricks_data_source_test_helper.py @@ -34,9 +34,6 @@ def _create_data_source_yaml_str(self) -> str: catalog: {os.getenv("DATABRICKS_CATALOG", "unity_catalog")} """ - def create_test_schema_if_not_exists_sql(self) -> str: - return f"CREATE SCHEMA IF NOT EXISTS {self.dataset_prefix[1]};" - def _get_contract_data_type_dict(self) -> dict[str, str]: return { TestDataType.TEXT: "varchar", diff --git a/soda-tests/src/helpers/snowflake_data_source_test_helper.py b/soda-tests/src/helpers/snowflake_data_source_test_helper.py index 636a2991e..e1366e980 100644 --- a/soda-tests/src/helpers/snowflake_data_source_test_helper.py +++ b/soda-tests/src/helpers/snowflake_data_source_test_helper.py @@ -34,11 +34,6 @@ def _create_data_source_yaml_str(self) -> str: database: {self.dataset_prefix[0]} """ - def create_test_schema_if_not_exists_sql(self) -> str: - sql_dialect: "SqlDialect" = self.data_source_impl.sql_dialect - schema_name: str = self.dataset_prefix[1] - return f"CREATE SCHEMA IF NOT EXISTS {sql_dialect.quote_default(schema_name)};" - def _adjust_schema_name(self, schema_name: str) -> str: return schema_name.upper() From b097f278fc2e7f10edd850978c400e60eed43268 Mon Sep 17 00:00:00 2001 From: tombaeyens Date: Mon, 9 Jun 2025 13:20:37 +0200 Subject: [PATCH 03/26] failed rows wip --- soda-core/src/soda_core/common/soda_cloud.py | 25 +++++++++++++++ .../impl/check_types/missing_check.py | 15 ++++++--- .../impl/contract_verification_impl.py | 31 ++++++++++++------- .../soda_core/contracts/impl/contract_yaml.py | 3 +- .../model/data_source/data_source.py | 2 +- 5 files changed, 56 insertions(+), 20 deletions(-) diff --git a/soda-core/src/soda_core/common/soda_cloud.py b/soda-core/src/soda_core/common/soda_cloud.py index 890ee1cec..939c659fb 100644 --- a/soda-core/src/soda_core/common/soda_cloud.py +++ b/soda-core/src/soda_core/common/soda_cloud.py @@ -868,6 +868,9 @@ def _get_token(self) -> str: assert self.token, "No token in login response?!" return self.token + def send_failed_rows_diagnostics(self, scan_id: str, failed_rows_diagnostics: list[FailedRowsDiagnostic]): + print(f"TODO sending failed rows diagnostics for scan {scan_id} to Soda Cloud: {failed_rows_diagnostics}") + def to_jsonnable(o) -> object: if o is None or isinstance(o, str) or isinstance(o, int) or isinstance(o, float) or isinstance(o, bool): @@ -1168,3 +1171,25 @@ def _append_exception_to_cloud_log_dicts(cloud_log_dicts: list[dict], exception: exc_cloud_log_dict["index"] = len(cloud_log_dicts) cloud_log_dicts.append(exc_cloud_log_dict) return cloud_log_dicts + + +class FailedRowsDiagnostic: + def __init__(self, check_identity: str, name: str, query: str): + self.check_identity: str = check_identity + self.name: str = name + self.query: str = query + + +class QuerySourceFailedRowsDiagnostic(FailedRowsDiagnostic): + def __init__(self, check_identity: str, name: str, query: str): + super().__init__(check_identity, name, query) + + +class StoreKeysFailedRowsDiagnostic(FailedRowsDiagnostic): + def __init__(self, check_identity: str, name: str, query: str): + super().__init__(check_identity, name, query) + + +class StoreDataFailedRowsDiagnostic(FailedRowsDiagnostic): + def __init__(self, check_identity: str, name: str, query: str): + super().__init__(check_identity, name, query) diff --git a/soda-core/src/soda_core/contracts/impl/check_types/missing_check.py b/soda-core/src/soda_core/contracts/impl/check_types/missing_check.py index c7e094c17..acc901c11 100644 --- a/soda-core/src/soda_core/contracts/impl/check_types/missing_check.py +++ b/soda-core/src/soda_core/contracts/impl/check_types/missing_check.py @@ -58,7 +58,7 @@ def __init__( ) self.metric_name = "missing_percent" if check_yaml.metric == "percent" else "missing_count" - self.missing_count_metric = self._resolve_metric( + self.missing_count_metric_impl = self._resolve_metric( MissingCountMetricImpl(contract_impl=contract_impl, column_impl=column_impl, check_impl=self) ) @@ -69,7 +69,7 @@ def __init__( self.missing_percent_metric_impl: MetricImpl = self.contract_impl.metrics_resolver.resolve_metric( DerivedPercentageMetricImpl( metric_type="missing_percent", - fraction_metric_impl=self.missing_count_metric, + fraction_metric_impl=self.missing_count_metric_impl, total_metric_impl=self.row_count_metric_impl, ) ) @@ -79,7 +79,7 @@ def evaluate(self, measurement_values: MeasurementValues, contract: Contract) -> diagnostic_metric_values: dict[str, float] = {} - missing_count: int = measurement_values.get_value(self.missing_count_metric) + missing_count: int = measurement_values.get_value(self.missing_count_metric_impl) diagnostic_metric_values["missing_count"] = missing_count row_count: int = measurement_values.get_value(self.row_count_metric_impl) @@ -104,6 +104,9 @@ def evaluate(self, measurement_values: MeasurementValues, contract: Contract) -> diagnostic_metric_values=diagnostic_metric_values, ) + def get_threshold_metric_impl(self) -> Optional[MetricImpl]: + return self.missing_count_metric_impl + class MissingCountMetricImpl(AggregationMetricImpl): def __init__( @@ -121,14 +124,16 @@ def __init__( ) def sql_expression(self) -> SqlExpression: + return SUM(CASE_WHEN(self.sql_condition_expression(), LITERAL(1))) + + def sql_condition_expression(self) -> SqlExpression: column_name: str = self.column_impl.column_yaml.name not_missing_and_invalid_expr = self.missing_and_validity.is_missing_expr(column_name) - missing_count_condition: SqlExpression = ( + return ( not_missing_and_invalid_expr if not self.check_filter else AND([SqlExpressionStr(self.check_filter), not_missing_and_invalid_expr]) ) - return SUM(CASE_WHEN(missing_count_condition, LITERAL(1), LITERAL(0))) def convert_db_value(self, value) -> int: # Note: expression SUM(CASE WHEN "id" IS NULL THEN 1 ELSE 0 END) gives NULL / None as a result if diff --git a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py index 59976c2d8..4b964affe 100644 --- a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py +++ b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py @@ -49,21 +49,22 @@ class ContractVerificationHandler: - @classmethod def instance(cls, identifier: Optional[str] = None) -> ContractVerificationHandler: # TODO: replace with plugin extension mechanism return Extensions.find_class_method( - "soda.failed_rows.failed_rows", "FailedRows", "create" + module_name="soda.failed_rows_extractor.failed_rows_extractor", + class_name="FailedRowsExtractor", + method_name="create", )() def handle( - self, - contract_impl: ContractImpl, - data_source_impl: DataSourceImpl, - contract_verification_result: ContractVerificationResult, - soda_cloud: SodaCloud, - scan_id: str + self, + contract_impl: ContractImpl, + data_source_impl: DataSourceImpl, + contract_verification_result: ContractVerificationResult, + soda_cloud: SodaCloud, + scan_id: str, ): pass @@ -488,9 +489,7 @@ def verify(self) -> ContractVerificationResult: else: logger.debug(f"Not sending results to Soda Cloud {Emoticons.CROSS_MARK}") - contract_verification_handler: Optional[ContractVerificationHandler] = ( - ContractVerificationHandler.instance() - ) + contract_verification_handler: Optional[ContractVerificationHandler] = ContractVerificationHandler.instance() if contract_verification_handler: contract_verification_handler.handle( contract_impl=self, @@ -1077,6 +1076,12 @@ def _build_definition(self) -> str: def _build_threshold(self) -> Optional[Threshold]: return self.threshold.to_threshold_info() if self.threshold else None + def get_threshold_metric_impl(self) -> Optional[MetricImpl]: + """ + Used in extensions + """ + return None + class MissingAndValidityCheckImpl(CheckImpl): def __init__( @@ -1173,7 +1178,9 @@ def sql_expression(self) -> SqlExpression: @abstractmethod def sql_condition_expression(self) -> SqlExpression: - pass + """ + Used in extensions + """ def convert_db_value(self, value: any) -> any: return value diff --git a/soda-core/src/soda_core/contracts/impl/contract_yaml.py b/soda-core/src/soda_core/contracts/impl/contract_yaml.py index fcbaf7475..4052f7a4a 100644 --- a/soda-core/src/soda_core/contracts/impl/contract_yaml.py +++ b/soda-core/src/soda_core/contracts/impl/contract_yaml.py @@ -551,8 +551,7 @@ def __init__(self, type_name: str, check_yaml_object: YamlObject): self.qualifier: Optional[str] = str(qualifier) if qualifier is not None else None self.filter: Optional[str] = check_yaml_object.read_string_opt("filter") if check_yaml_object else None self.store_failed_rows: Optional[bool] = ( - check_yaml_object.read_bool_opt("store_failed_rows", default_value=False) - if check_yaml_object else None + check_yaml_object.read_bool_opt("store_failed_rows", default_value=False) if check_yaml_object else None ) if self.filter: self.filter = self.filter.strip() diff --git a/soda-core/src/soda_core/model/data_source/data_source.py b/soda-core/src/soda_core/model/data_source/data_source.py index 77522becb..b447910e2 100644 --- a/soda-core/src/soda_core/model/data_source/data_source.py +++ b/soda-core/src/soda_core/model/data_source/data_source.py @@ -1,5 +1,5 @@ import abc -from typing import Literal, Optional +from typing import Literal from pydantic import BaseModel, Field from soda_core.model.data_source.data_source_connection_properties import ( From a286dbc9a264a2a936a8f9d9d57da9b9ddc4dbd9 Mon Sep 17 00:00:00 2001 From: tombaeyens Date: Wed, 11 Jun 2025 10:29:21 +0200 Subject: [PATCH 04/26] Fixed extension mechanism --- dev-requirements.txt | 11 +---------- soda-core/src/soda_core/common/extensions.py | 7 +++---- .../contracts/impl/contract_verification_impl.py | 8 +++++--- 3 files changed, 9 insertions(+), 17 deletions(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index 33de8e444..51988662f 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,5 +1,5 @@ # -# This file is autogenerated by pip-compile with Python 3.10 +# This file is autogenerated by pip-compile with Python 3.11 # by the following command: # # pip-compile dev-requirements.in @@ -18,8 +18,6 @@ distlib==0.3.8 # via virtualenv docopt==0.6.2 # via tbump -exceptiongroup==1.3.0 - # via pytest filelock==3.16.1 # via virtualenv freezegun==1.5.1 @@ -62,15 +60,8 @@ tabulate==0.8.10 # via cli-ui tbump==6.11.0 # via -r dev-requirements.in -tomli==2.2.1 - # via - # build - # pip-tools - # pytest tomlkit==0.11.8 # via tbump -typing-extensions==4.14.0 - # via exceptiongroup unidecode==1.3.8 # via cli-ui virtualenv==20.26.6 diff --git a/soda-core/src/soda_core/common/extensions.py b/soda-core/src/soda_core/common/extensions.py index a8e495b83..3e2e09bac 100644 --- a/soda-core/src/soda_core/common/extensions.py +++ b/soda-core/src/soda_core/common/extensions.py @@ -11,7 +11,6 @@ def find_class_method(cls, module_name: str, class_name: str, method_name: str) module = import_module(module_name) class_ = getattr(module, class_name) return getattr(class_, method_name) - except AttributeError as e: - raise ExtensionException( - message=f"Feature '{class_name}.{method_name}' requires the Soda Extensions to be installed." - ) + except (AttributeError, ModuleNotFoundError) as e: + # Extension not installed + return None diff --git a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py index 4b964affe..3a5c5f91f 100644 --- a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py +++ b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py @@ -5,6 +5,7 @@ from datetime import timezone from enum import Enum from io import StringIO +from typing import Callable from ruamel.yaml import YAML from soda_core.common.consistent_hash_builder import ConsistentHashBuilder @@ -50,13 +51,14 @@ class ContractVerificationHandler: @classmethod - def instance(cls, identifier: Optional[str] = None) -> ContractVerificationHandler: + def instance(cls, identifier: Optional[str] = None) -> Optional[ContractVerificationHandler]: # TODO: replace with plugin extension mechanism - return Extensions.find_class_method( + create_method: Callable[..., Optional[ContractVerificationHandler]] = Extensions.find_class_method( module_name="soda.failed_rows_extractor.failed_rows_extractor", class_name="FailedRowsExtractor", method_name="create", - )() + ) + return create_method() if create_method else None def handle( self, From 8b47953bccf9f37c3f5c2de2c503a76828553b3b Mon Sep 17 00:00:00 2001 From: tombaeyens Date: Thu, 12 Jun 2025 10:22:20 +0200 Subject: [PATCH 05/26] Failed rows wip --- soda-core/src/soda_core/common/extensions.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/soda-core/src/soda_core/common/extensions.py b/soda-core/src/soda_core/common/extensions.py index 3e2e09bac..2947a60e8 100644 --- a/soda-core/src/soda_core/common/extensions.py +++ b/soda-core/src/soda_core/common/extensions.py @@ -1,8 +1,6 @@ from importlib import import_module from typing import Callable, Optional -from soda_core.common.exceptions import ExtensionException - class Extensions: @classmethod From 1f5ca20418ffd153a7486ad4396dfffd932e28a6 Mon Sep 17 00:00:00 2001 From: Niels Nuyttens Date: Fri, 6 Jun 2025 14:58:49 +0200 Subject: [PATCH 06/26] Implement proper status handling for both local and remote contract verification (#2302) --- soda-core/src/soda_core/common/soda_cloud.py | 95 ++++++++++++------- .../contracts/contract_verification.py | 17 +++- .../impl/contract_verification_impl.py | 25 ++++- 3 files changed, 95 insertions(+), 42 deletions(-) diff --git a/soda-core/src/soda_core/common/soda_cloud.py b/soda-core/src/soda_core/common/soda_cloud.py index 939c659fb..457d373ef 100644 --- a/soda-core/src/soda_core/common/soda_cloud.py +++ b/soda-core/src/soda_core/common/soda_cloud.py @@ -49,6 +49,7 @@ CheckResult, Contract, ContractVerificationResult, + ContractVerificationStatus, Threshold, YamlFileContentInfo, ) @@ -468,6 +469,7 @@ def verify_contract_on_agent( check_results=[], sending_results_to_soda_cloud_failed=False, log_records=None, + status=ContractVerificationStatus.UNKNOWN, ) can_publish_and_verify, reason = self.can_publish_and_verify_contract( @@ -516,10 +518,12 @@ def verify_contract_on_agent( logger.warning("Did not receive a Scan ID from Soda Cloud") return verification_result - scan_is_finished, contract_dataset_cloud_url = self._poll_remote_scan_finished( + scan_is_finished, contract_dataset_cloud_url, scan_status = self._poll_remote_scan_finished( scan_id=scan_id, blocking_timeout_in_minutes=blocking_timeout_in_minutes ) + verification_result.status = _map_remote_scan_status_to_contract_verification_status(scan_status) + logger.debug(f"Asking Soda Cloud the logs of scan {scan_id}") logs_response: Response = self._get_scan_logs(scan_id=scan_id) logger.debug(f"Soda Cloud responded with {json.dumps(dict(logs_response.headers))}\n{logs_response.text}") @@ -676,7 +680,9 @@ def fetch_data_source_configuration_for_dataset(self, dataset_identifier: str) - return response_dict.get("contents") - def _poll_remote_scan_finished(self, scan_id: str, blocking_timeout_in_minutes: int) -> tuple[bool, Optional[str]]: + def _poll_remote_scan_finished( + self, scan_id: str, blocking_timeout_in_minutes: int + ) -> tuple[bool, Optional[str], Optional[RemoteScanStatus]]: """ Returns a tuple of 2 values: * A boolean indicating if the scan finished (true means scan finished. false means there was a timeout or retry exceeded) @@ -693,42 +699,46 @@ def _poll_remote_scan_finished(self, scan_id: str, blocking_timeout_in_minutes: ) response = self._get_scan_status(scan_id) logger.debug(f"Soda Cloud responded with {json.dumps(dict(response.headers))}\n{response.text}") - if response: - response_body_dict: Optional[dict] = response.json() if response else None - scan_state: str = response_body_dict.get("state") if response_body_dict else None - contract_dataset_cloud_url: Optional[str] = ( - response_body_dict.get("contractDatasetCloudUrl") if response_body_dict else None - ) - - logger.info(f"Scan {scan_id} has state '{scan_state}'") - - if RemoteScanStatus.from_value(scan_state).is_final_state: - return True, contract_dataset_cloud_url - - time_to_wait_in_seconds: float = 5 - next_poll_time_str = response.headers.get("X-Soda-Next-Poll-Time") - if next_poll_time_str: - logger.debug( - f"Soda Cloud suggested to ask scan {scan_id} status again at '{next_poll_time_str}' " - f"via header X-Soda-Next-Poll-Time" - ) - next_poll_time: datetime = convert_str_to_datetime(next_poll_time_str) - if isinstance(next_poll_time, datetime): - now = datetime.now(timezone.utc) - time_to_wait = next_poll_time - now - time_to_wait_in_seconds = time_to_wait.total_seconds() - else: - time_to_wait_in_seconds = 60 - if time_to_wait_in_seconds > 0: - logger.debug( - f"Sleeping {time_to_wait_in_seconds} seconds before asking " - f"Soda Cloud scan {scan_id} status again in ." - ) - sleep(time_to_wait_in_seconds) - else: + if not response: logger.error(f"Failed to poll remote scan status. " f"Response: {response}") + continue + + response_body_dict: Optional[dict] = response.json() if response else None + contract_dataset_cloud_url: Optional[str] = ( + response_body_dict.get("contractDatasetCloudUrl") if response_body_dict else None + ) + + if "state" not in response_body_dict: + continue + scan_state = RemoteScanStatus.from_value(response_body_dict["state"]) + + logger.info(f"Scan {scan_id} has state '{scan_state.value_}'") - return False, None + if scan_state.is_final_state: + return True, contract_dataset_cloud_url, scan_state + + time_to_wait_in_seconds: float = 5 + next_poll_time_str = response.headers.get("X-Soda-Next-Poll-Time") + if next_poll_time_str: + logger.debug( + f"Soda Cloud suggested to ask scan {scan_id} status again at '{next_poll_time_str}' " + f"via header X-Soda-Next-Poll-Time" + ) + next_poll_time: datetime = convert_str_to_datetime(next_poll_time_str) + if isinstance(next_poll_time, datetime): + now = datetime.now(timezone.utc) + time_to_wait = next_poll_time - now + time_to_wait_in_seconds = time_to_wait.total_seconds() + else: + time_to_wait_in_seconds = 60 + if time_to_wait_in_seconds > 0: + logger.debug( + f"Sleeping {time_to_wait_in_seconds} seconds before asking " + f"Soda Cloud scan {scan_id} status again in ." + ) + sleep(time_to_wait_in_seconds) + + return False, None, None def _get_scan_status(self, scan_id: str) -> Response: return self._execute_rest_get( @@ -1087,6 +1097,19 @@ def _build_diagnostics_json_dict(check_result: CheckResult) -> Optional[dict]: ).model_dump() +def _map_remote_scan_status_to_contract_verification_status( + scan_status: RemoteScanStatus, +) -> ContractVerificationStatus: + if scan_status in (RemoteScanStatus.COMPLETED, RemoteScanStatus.COMPLETED_WITH_WARNINGS): + return ContractVerificationStatus.PASSED + elif scan_status in (RemoteScanStatus.COMPLETED_WITH_FAILURES, RemoteScanStatus.FAILED): + return ContractVerificationStatus.FAILED + elif scan_status in RemoteScanStatus.COMPLETED_WITH_ERRORS: + return ContractVerificationStatus.ERROR + else: + return ContractVerificationStatus.UNKNOWN + + def _build_fail_threshold(check_result: CheckResult) -> Optional[SodaCloudThresholdDiagnostic]: threshold: Threshold = check_result.check.threshold if threshold: diff --git a/soda-core/src/soda_core/contracts/contract_verification.py b/soda-core/src/soda_core/contracts/contract_verification.py index cdb3fe50e..8c150b448 100644 --- a/soda-core/src/soda_core/contracts/contract_verification.py +++ b/soda-core/src/soda_core/contracts/contract_verification.py @@ -294,6 +294,13 @@ def __init__(self, metric_id: str, value: any, metric_name: Optional[str]): self.value: any = value +class ContractVerificationStatus(Enum): + UNKNOWN = "UNKNOWN" + FAILED = "FAILED" + PASSED = "PASSED" + ERROR = "ERROR" + + class ContractVerificationResult: """ This is the immutable data structure containing all the results from a single contract verification. @@ -307,6 +314,7 @@ def __init__( data_timestamp: Optional[datetime], started_timestamp: datetime, ended_timestamp: datetime, + status: ContractVerificationStatus, measurements: list[Measurement], check_results: list[CheckResult], sending_results_to_soda_cloud_failed: bool, @@ -321,6 +329,7 @@ def __init__( self.check_results: list[CheckResult] = check_results self.sending_results_to_soda_cloud_failed: bool = sending_results_to_soda_cloud_failed self.log_records: Optional[list[LogRecord]] = log_records + self.status = status def get_logs(self) -> list[str]: return [r.msg for r in self.log_records] @@ -335,9 +344,7 @@ def get_errors_str(self) -> str: return "\n".join(self.get_errors()) def has_errors(self) -> bool: - if self.log_records is None: - return False - return any(r.levelno >= ERROR for r in self.log_records) + return self.status is ContractVerificationStatus.ERROR def is_failed(self) -> bool: """ @@ -346,14 +353,14 @@ def is_failed(self) -> bool: Only looks at check results. Ignores execution errors in the logs. """ - return any(check_result.outcome == CheckOutcome.FAILED for check_result in self.check_results) + return self.status is ContractVerificationStatus.FAILED def is_passed(self) -> bool: """ Returns true if there are no checks that have failed. Ignores execution errors in the logs. """ - return not self.is_failed() + return self.status is ContractVerificationStatus.PASSED def is_ok(self) -> bool: return not self.is_failed() and not self.has_errors() diff --git a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py index 3a5c5f91f..28b1a19c5 100644 --- a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py +++ b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py @@ -25,10 +25,12 @@ ) from soda_core.contracts.contract_verification import ( Check, + CheckOutcome, CheckResult, Contract, ContractVerificationResult, ContractVerificationSessionResult, + ContractVerificationStatus, DataSource, Measurement, Threshold, @@ -428,6 +430,7 @@ def verify(self) -> ContractVerificationResult: data_source: Optional[DataSource] = None check_results: list[CheckResult] = [] measurements: list[Measurement] = [] + contract_verification_status = ContractVerificationStatus.UNKNOWN verb: str = "Validating" if self.only_validate_without_execute else "Verifying" logger.info( @@ -438,7 +441,9 @@ def verify(self) -> ContractVerificationResult: if self.data_source_impl: data_source = self.data_source_impl.build_data_source() - if not self.logs.has_errors(): + if self.logs.has_errors(): + contract_verification_status = ContractVerificationStatus.ERROR + else: # Executing the queries will set the value of the metrics linked to queries if not self.only_validate_without_execute: for query in self.queries: @@ -462,6 +467,8 @@ def verify(self) -> ContractVerificationResult: ) check_results.append(check_result) + contract_verification_status = _get_contract_verification_status(self.logs.records, check_results) + contract_verification_result: ContractVerificationResult = ContractVerificationResult( contract=contract, data_source=data_source, @@ -471,6 +478,7 @@ def verify(self) -> ContractVerificationResult: measurements=measurements, check_results=check_results, sending_results_to_soda_cloud_failed=False, + status=contract_verification_status, ) if not self.only_validate_without_execute: @@ -602,6 +610,21 @@ def _verify_duplicate_identities(cls, all_check_impls: list[CheckImpl]): checks_by_identity[check_impl.identity] = check_impl +def _get_contract_verification_status( + log_records: list[logging.LogRecord], check_results: list[CheckResult] +) -> ContractVerificationStatus: + if any(r.levelno >= logging.ERROR for r in log_records): + return ContractVerificationStatus.ERROR + + if any(check_result.outcome == CheckOutcome.FAILED for check_result in check_results): + return ContractVerificationStatus.FAILED + + if all(check_result.outcome == CheckOutcome.PASSED for check_result in check_results): + return ContractVerificationStatus.PASSED + + return ContractVerificationStatus.UNKNOWN + + class MeasurementValues: def __init__(self, measurements: list[Measurement]): self.metric_values_by_id: dict[str, any] = { From 2351523d18fd3f056cdc87408dd0c160b4645dcb Mon Sep 17 00:00:00 2001 From: Milan Lukac Date: Mon, 9 Jun 2025 22:13:39 +0200 Subject: [PATCH 07/26] Add dotenv to deps (#2305) --- soda-core/setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/soda-core/setup.py b/soda-core/setup.py index 5be056d09..74b4c5eb8 100644 --- a/soda-core/setup.py +++ b/soda-core/setup.py @@ -20,6 +20,7 @@ "opentelemetry-api>=1.16.0,<2.0.0", "opentelemetry-exporter-otlp-proto-http>=1.16.0,<2.0.0", "tabulate[widechars]", + "python-dotenv~=1.0", ] setup( From 9b99876cf84f01391f94a3e8076a13023d3d7d72 Mon Sep 17 00:00:00 2001 From: Milan Lukac Date: Tue, 10 Jun 2025 14:25:43 +0200 Subject: [PATCH 08/26] Fix diagnostics, support date-like in freshness (#2307) * Fix diagnostics, support date-like in freshness * Push the fix towards cloud, not internals * Fix annotation --- soda-core/src/soda_core/common/soda_cloud.py | 8 ++++++-- .../impl/check_types/aggregate_check.py | 2 +- .../impl/check_types/duplicate_check.py | 1 + .../impl/check_types/freshness_check.py | 18 ++++++++++++++++-- .../impl/check_types/row_count_check.py | 1 + .../contracts/impl/check_types/schema_check.py | 12 ++++++++++++ .../impl/contract_verification_impl.py | 2 +- 7 files changed, 38 insertions(+), 6 deletions(-) diff --git a/soda-core/src/soda_core/common/soda_cloud.py b/soda-core/src/soda_core/common/soda_cloud.py index 457d373ef..f6bdd36bf 100644 --- a/soda-core/src/soda_core/common/soda_cloud.py +++ b/soda-core/src/soda_core/common/soda_cloud.py @@ -1084,7 +1084,11 @@ def _build_diagnostics_json_dict(check_result: CheckResult) -> Optional[dict]: else None ) - check_result_value: Optional[Number] = check_result.get_threshold_value() + # TODO: this default is here only because check.diaignostics.value is a required non-nullable field in the api. + # Re-think this when we switch to the new diagnostics format and/or api accepts Null or something else that reflects the Core model. + # A poor default, but it really needs to be a number and not None or Nan. + # It's guaranteed to be 0 for non-evaluated checks, which is not ideal, but better than stopping ingestion. + check_result_value: Number = check_result.get_threshold_value() or 0 fail_threshold: SodaCloudThresholdDiagnostic = _build_fail_threshold(check_result) @@ -1094,7 +1098,7 @@ def _build_diagnostics_json_dict(check_result: CheckResult) -> Optional[dict]: freshness=freshness_diagnostics, value=check_result_value, fail=fail_threshold, - ).model_dump() + ).model_dump(by_alias=True) def _map_remote_scan_status_to_contract_verification_status( diff --git a/soda-core/src/soda_core/contracts/impl/check_types/aggregate_check.py b/soda-core/src/soda_core/contracts/impl/check_types/aggregate_check.py index aab24a5a6..cd1b4646f 100644 --- a/soda-core/src/soda_core/contracts/impl/check_types/aggregate_check.py +++ b/soda-core/src/soda_core/contracts/impl/check_types/aggregate_check.py @@ -57,7 +57,7 @@ def __init__( threshold_yaml=check_yaml.threshold, ) - self.function: Optional[str] = check_yaml.function.lower() if check_yaml.function else None + self.function: str = check_yaml.function.lower() if self.function and not contract_impl.data_source_impl.sql_dialect.supports_function(self.function): logger.error( diff --git a/soda-core/src/soda_core/contracts/impl/check_types/duplicate_check.py b/soda-core/src/soda_core/contracts/impl/check_types/duplicate_check.py index 1228804bb..af74c1705 100644 --- a/soda-core/src/soda_core/contracts/impl/check_types/duplicate_check.py +++ b/soda-core/src/soda_core/contracts/impl/check_types/duplicate_check.py @@ -144,6 +144,7 @@ def evaluate(self, measurement_values: MeasurementValues, contract: Contract) -> contract=contract, check=self._build_check_info(), outcome=outcome, + threshold_metric_name=self.metric_name, diagnostic_metric_values=diagnostic_metric_values, ) diff --git a/soda-core/src/soda_core/contracts/impl/check_types/freshness_check.py b/soda-core/src/soda_core/contracts/impl/check_types/freshness_check.py index f50454249..25832a133 100644 --- a/soda-core/src/soda_core/contracts/impl/check_types/freshness_check.py +++ b/soda-core/src/soda_core/contracts/impl/check_types/freshness_check.py @@ -1,7 +1,7 @@ from __future__ import annotations import logging -from datetime import timedelta, timezone +from datetime import date, datetime, timedelta, timezone from soda_core.common.datetime_conversions import convert_str_to_datetime from soda_core.common.logging_constants import soda_logger @@ -129,7 +129,21 @@ def evaluate(self, measurement_values: MeasurementValues, contract: Contract) -> def _get_max_timestamp(self, measurement_values: MeasurementValues) -> Optional[datetime]: max_timestamp: Optional[datetime] = measurement_values.get_value(self.max_timestamp_metric) if not isinstance(max_timestamp, datetime): - logger.error(f"Freshness column '{self.column}' does not have timestamp values: {max_timestamp}") + if isinstance(max_timestamp, date): + max_timestamp = datetime.combine(max_timestamp, datetime.min.time()) + elif isinstance(max_timestamp, str): + max_timestamp = convert_str_to_datetime(max_timestamp) + else: + logger.error( + f"Freshness metric '{self.max_timestamp_metric.type}' for column '{self.column}' " + f"has an invalid data type '({type(max_timestamp).__name__})'. " + f"Is the column a timestamp or a timestamp-compatible type?" + ) + + if not isinstance(max_timestamp, datetime): + logger.error(f"Freshness column '{self.column}' does not have timestamp values: '{max_timestamp}'") + max_timestamp = None + return max_timestamp def _get_max_timestamp_utc(self, max_timestamp: Optional[datetime]) -> Optional[datetime]: diff --git a/soda-core/src/soda_core/contracts/impl/check_types/row_count_check.py b/soda-core/src/soda_core/contracts/impl/check_types/row_count_check.py index ca870d64f..8f2e7d4ec 100644 --- a/soda-core/src/soda_core/contracts/impl/check_types/row_count_check.py +++ b/soda-core/src/soda_core/contracts/impl/check_types/row_count_check.py @@ -79,6 +79,7 @@ def evaluate(self, measurement_values: MeasurementValues, contract: Contract) -> contract=contract, check=self._build_check_info(), outcome=outcome, + threshold_metric_name=self.row_count_metric.type, diagnostic_metric_values=diagnostic_metric_values, ) diff --git a/soda-core/src/soda_core/contracts/impl/check_types/schema_check.py b/soda-core/src/soda_core/contracts/impl/check_types/schema_check.py index bcd88cc59..2f209f1a5 100644 --- a/soda-core/src/soda_core/contracts/impl/check_types/schema_check.py +++ b/soda-core/src/soda_core/contracts/impl/check_types/schema_check.py @@ -239,11 +239,23 @@ def __init__( column_data_type_mismatches: list[ColumnDataTypeMismatch], are_columns_out_of_order: bool, ): + schema_events = sum( + ( + len(expected_column_names_not_actual), + len(actual_column_names_not_expected), + len(column_data_type_mismatches), + ) + ) + diagnostic_metric_values = {"schema_events_count": schema_events} + super().__init__( contract=contract, check=check, outcome=outcome, + threshold_metric_name="schema_events_count", + diagnostic_metric_values=diagnostic_metric_values, ) + self.expected_columns: list[ColumnMetadata] = expected_columns self.actual_columns: list[ColumnMetadata] = actual_columns self.expected_column_names_not_actual: list[str] = expected_column_names_not_actual diff --git a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py index 28b1a19c5..23600a0b7 100644 --- a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py +++ b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py @@ -544,7 +544,7 @@ def build_log_summary(self, contract_verification_result: ContractVerificationRe else: table_lines.append(["Not Evaluated", not_evaluated_count, Emoticons.WHITE_CHECK_MARK]) if error_count > 0: - table_lines.append(["Runtime Errors {Emoticons.CROSS_MARK}", error_count, Emoticons.CROSS_MARK]) + table_lines.append(["Runtime Errors", error_count, Emoticons.CROSS_MARK]) else: table_lines.append(["Runtime Errors", error_count, Emoticons.WHITE_CHECK_MARK]) From 6744f3db8ec9ac6430cd96c9f34cc361aca20701 Mon Sep 17 00:00:00 2001 From: Milan Lukac Date: Tue, 10 Jun 2025 15:06:03 +0200 Subject: [PATCH 09/26] Bump contract json schema (#2308) --- .../soda_data_contract_json_schema_1_0_0.json | 1574 ++++++++++++----- 1 file changed, 1131 insertions(+), 443 deletions(-) diff --git a/soda-core/src/soda_core/contracts/soda_data_contract_json_schema_1_0_0.json b/soda-core/src/soda_core/contracts/soda_data_contract_json_schema_1_0_0.json index da8db4047..89a360c06 100644 --- a/soda-core/src/soda_core/contracts/soda_data_contract_json_schema_1_0_0.json +++ b/soda-core/src/soda_core/contracts/soda_data_contract_json_schema_1_0_0.json @@ -1,480 +1,1168 @@ { "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://soda.io/soda_data_contract_json_schema_1_0_0.json", - "title": "Soda data contract style 3", - "description": "A data contract", + "title": "Soda Data Contract JSON Schema", + "description": "Soda Data Contract", "type": "object", "additionalProperties": false, "properties": { - "dataset": { - "description": "The name of the dataset", - "type": "string", - "x-schema-key": "special-key" - }, - "dataset_prefix": { - "description": "The prefix of the dataset", - "type": "array", - "items": { - "type": [ - "string" - ] + "dataset": { + "description": "The qualified name of the dataset, which is the combination dataset name, datasource name and database information. Format is [datasource]/[dataset_prefix_1]/.../[dataset_prefix_N]/[dataset]", + "type": "string" }, - "minItems": 1 - }, - "data_source": { - "description": "The name of the data_source", - "type": "string" - }, - "schema": { - "description": "The name of the schema within the data source (on bigquery, this schema property this refers to a dataset)", - "type": "string" - }, - "description": { - "description": "The description of the dataset", - "type": "string" - }, - "columns": { - "description": "The list of columns, also known as 'the schema' of the dataset.", - "type": "array", - "items": { - "type": "object", - "allOf": [ - { - "type": "object", - "required": [ - "name" - ], - "properties": { - "name": { - "description": "The name of the column as in the SQL data_source", - "type": "string" - }, - "description": { - "description": "The description to be used anywhere this column is shown to users", - "type": "string" - }, - "data_type": { - "description": "The SQL data type as in the data_source", - "anyOf": [ - { - "type": "string" - }, - { - "type": "string", - "enum": [ - "VARCHAR", - "CHAR", - "TEXT", - "STRING", - "INT", - "SMALLINT", - "TINYINT", - "BIGINT", - "INTEGER", - "DECIMAL", - "NUMERIC", - "DOUBLE", - "PRECISION", - "DOUBLE PRECISION", - "FLOAT", - "FLOAT4", - "FLOAT8", - "REAL", - "CLOB", - "BLOB", - "BINARY", - "VARBINARY", - "JSON", - "JSONB", - "XML", - "BOOLEAN", - "DATE", - "TIME", - "TIMESTAMP", - "TIMESTAMP_TZ" - ] + "description": { + "description": "The description of the dataset", + "type": "string" + }, + "filter": { + "description": "The filter where clause to be applied to the dataset", + "type": "string" + }, + "variables": { + "description": "The variables to be used within the contract file", + "type": "object", + "additionalProperties": { + "type": [ + "object", + "null" + ], + "properties": { + "default": { + "type": [ + "string", + "number" + ] } - ] }, - "character_maximum_length": { - "description": "Length is only checked by the schema check if character_maximum_length is specified in the column and supported by the data source.", - "type": "number" - }, - "optional": { - "description": "When set to true, the schema check will not fail if the column is not present. Default is required.", - "type": "boolean" - }, - "checks": { - "description": "Checks for this column", - "type": "array", - "additionalItems": false, - "minItems": 1, - "items": { - "anyOf": [ - { + "additionalProperties": false + } + }, + "soda_agent": { + "description": "The agent configuration to be used within the contract file", + "type": "object", + "properties": { + "checks_schedule": { + "type": "object", + "properties": { + "cron": { + "type": "string", + "description": "Cron expression in the format '* * * * *' (minute hour day month weekday). Requires string in quotes." + }, + "timezone": { + "type": "string", + "description": "The timezone to be used within the contract file" + }, + "variables": { + "type": "object", + "description": "The variables to be used within the contract file", + "additionalProperties": { + "type": [ + "string", + "number" + ], + "additionalProperties": false + } + } + } + } + } + }, + "columns": { + "description": "The list of columns, also known as 'the schema' of the dataset.", + "type": "array", + "items": { + "type": "object", + "allOf": [ + { "type": "object", + "required": [ + "name" + ], "properties": { - "missing": { - "oneOf": [ - { - "type": "object", - "additionalProperties": false, - "properties": { - "name": { - "description": "The display name for the check used in Soda Cloud", - "type": "string" - }, - "attributes": { - "$ref": "#/$defs/attributes" - }, - "threshold": { - "$ref": "#/$defs/threshold" - }, - "filter": { - "description": "Specifies a sql expression filter that should be applied on the metric", - "type": "string" - }, - "missing_values": { - "description": "Customized list of missing values. NULL is always considered missing so that does not have to be specified. If no customization is needed, consider specifying not_null:true instead.", - "type": "array", - "items": { - "type": [ - "integer", - "string" - ] - } - }, - "missing_format": { - "type": "object", - "additionalProperties": false, - "required": [ - "regex", "name" - ], - "properties": { - "regex": { - "description": "Customized regex to identify missing values. The flavor of regex depends on the SQL engine / data_source. NULL is always considered missing so that does not have to be specified.", - "type": "string" - }, - "name": { - "description": "Name that describes the regex pattern.", + "name": { + "description": "The name of the column as in the SQL data_source", + "type": "string" + }, + "description": { + "description": "The description to be used anywhere this column is shown to users", + "type": "string" + }, + "data_type": { + "description": "The SQL data type as in the data_source", + "anyOf": [ + { "type": "string" - } + }, + { + "type": "string", + "enum": [ + "VARCHAR", + "CHAR", + "TEXT", + "STRING", + "INT", + "SMALLINT", + "TINYINT", + "BIGINT", + "INTEGER", + "DECIMAL", + "NUMERIC", + "DOUBLE", + "PRECISION", + "DOUBLE PRECISION", + "FLOAT", + "FLOAT4", + "FLOAT8", + "REAL", + "CLOB", + "BLOB", + "BINARY", + "VARBINARY", + "JSON", + "JSONB", + "XML", + "BOOLEAN", + "DATE", + "TIME", + "TIMESTAMP", + "TIMESTAMP_TZ" + ] } - } + ] + }, + "character_maximum_length": { + "description": "Length is only checked by the schema check if character_maximum_length is specified in the column and supported by the data source.", + "type": "number" + }, + "optional": { + "description": "When set to true, the schema check will not fail if the column is not present. Default is required.", + "type": "boolean" + }, + "checks": { + "description": "Checks for this column", + "type": "array", + "additionalItems": false, + "minItems": 1, + "items": { + "anyOf": [ + { + "type": "object", + "properties": { + "missing": { + "oneOf": [ + { + "type": "object", + "additionalProperties": false, + "properties": { + "name": { + "description": "The display name for the check used in Soda Cloud", + "type": "string" + }, + "qualifier": { + "description": "The qualifier for the check used in Soda Cloud", + "type": "string" + }, + "attributes": { + "$ref": "#/$defs/attributes" + }, + "threshold": { + "$ref": "#/$defs/threshold" + }, + "filter": { + "description": "Specifies a sql expression filter that should be applied on the metric", + "type": "string" + }, + "missing_values": { + "description": "Customized list of missing values. NULL is always considered missing so that does not have to be specified. If no customization is needed, consider specifying not_null:true instead.", + "type": "array", + "items": { + "type": [ + "integer", + "string" + ] + } + }, + "missing_format": { + "type": "object", + "additionalProperties": false, + "required": [ + "regex", + "name" + ], + "properties": { + "regex": { + "description": "Customized regex to identify missing values. The flavor of regex depends on the SQL engine / data_source. NULL is always considered missing so that does not have to be specified.", + "type": "string" + }, + "name": { + "description": "Name that describes the regex pattern.", + "type": "string" + } + } + } + } + }, + { + "type": "null" + } + ] + }, + "invalid": { + "oneOf": [ + { + "type": "object", + "additionalProperties": false, + "properties": { + "name": { + "description": "The display name for the check used in Soda Cloud", + "type": "string" + }, + "qualifier": { + "description": "The qualifier for the check used in Soda Cloud", + "type": "string" + }, + "attributes": { + "$ref": "#/$defs/attributes" + }, + "threshold": { + "$ref": "#/$defs/threshold" + }, + "filter": { + "description": "Specifies a sql expression filter that should be applied on the metric", + "type": "string" + }, + "valid_values": { + "description": "A list of valid values. Only supports all strings or all numbers. Implies an invalid_count check in Soda.", + "type": "array", + "items": { + "type": [ + "number", + "string" + ] + } + }, + "valid_format": { + "type": "object", + "additionalProperties": false, + "required": [ + "regex", + "name" + ], + "properties": { + "regex": { + "description": "Customized regex to identify valid values. The flavor of regex depends on the SQL engine / data_source. NULL is always considered missing so that does not have to be specified.", + "type": "string" + }, + "name": { + "description": "Name that describes the regex pattern.", + "type": "string" + } + } + }, + "invalid_values": { + "description": "A list of valid values. Only supports all strings or all numbers. Implies an invalid_count check in Soda.", + "type": "array", + "items": { + "type": [ + "number", + "string" + ] + } + }, + "invalid_format": { + "type": "object", + "additionalProperties": false, + "required": [ + "regex", + "name" + ], + "properties": { + "regex": { + "description": "Customized regex to identify invalid values. The flavor of regex depends on the SQL engine / data_source. NULL is always considered missing so that does not have to be specified.", + "type": "string" + }, + "name": { + "description": "Name that describes the regex pattern.", + "type": "string" + } + } + }, + "valid_length": { + "description": "The length of the valid values", + "type": [ + "number", + "string" + ] + }, + "valid_min_length": { + "description": "The minimum length of the valid values", + "type": [ + "number", + "string" + ] + }, + "valid_max_length": { + "description": "The maximum length of the valid values", + "type": [ + "number", + "string" + ] + }, + "valid_min": { + "description": "The minimum value of the valid values", + "type": [ + "number", + "string" + ] + }, + "valid_max": { + "description": "The maximum value of the valid values", + "type": [ + "number", + "string" + ] + }, + "valid_reference_data": { + "type": "object", + "description": "Verify valid values occur in a reference dataset", + "additionalProperties": false, + "required": [ + "dataset", + "column" + ], + "properties": { + "dataset": { + "description": "The dataset to be used for the reference data check. Dataset qualified name is required.", + "type": "string" + }, + "column": { + "description": "The column to be used for the reference data check", + "type": "string" + } + } + } + } + }, + { + "type": "null" + } + ] + }, + "aggregate": { + "oneOf": [ + { + "type": "object", + "additionalProperties": false, + "properties": { + "name": { + "description": "The display name for the check used in Soda Cloud", + "type": "string" + }, + "qualifier": { + "description": "The qualifier for the check used in Soda Cloud", + "type": "string" + }, + "attributes": { + "$ref": "#/$defs/attributes" + }, + "threshold": { + "$ref": "#/$defs/threshold" + }, + "filter": { + "description": "Specifies a sql expression filter that should be applied on the metric", + "type": "string" + }, + "function": { + "description": "The function to be used for the aggregate check", + "enum": [ + "avg", + "sum", + "min", + "max", + "minLength", + "maxLength", + "avgLength" + ] + } + }, + "required": [ + "threshold", + "function" + ] + } + ] + }, + "duplicate": { + "oneOf": [ + { + "type": "object", + "additionalProperties": false, + "properties": { + "name": { + "description": "The display name for the check used in Soda Cloud", + "type": "string" + }, + "qualifier": { + "description": "The qualifier for the check used in Soda Cloud", + "type": "string" + }, + "attributes": { + "$ref": "#/$defs/attributes" + }, + "threshold": { + "$ref": "#/$defs/threshold" + }, + "filter": { + "description": "Specifies a sql expression filter that should be applied on the metric", + "type": "string" + } + } + }, + { + "type": "null" + } + ] + }, + "failed_rows": { + "oneOf": [ + { + "type": "object", + "additionalProperties": false, + "properties": { + "name": { + "description": "The display name for the check used in Soda Cloud", + "type": "string" + }, + "expression": { + "description": "The expression to be used for the failed rows check", + "type": "string" + }, + "query": { + "description": "The query to be used for the failed rows check", + "type": "string" + }, + "qualifier": { + "description": "The qualifier for the check used in Soda Cloud", + "type": "string" + }, + "threshold": { + "$ref": "#/$defs/threshold" + } + }, + "oneOf": [ + { + "required": [ + "expression" + ], + "not": { + "required": [ + "query" + ] + } + }, + { + "required": [ + "query" + ], + "not": { + "required": [ + "expression" + ] + } + } + ] + } + ] + }, + "metric": { + "oneOf": [ + { + "type": "object", + "additionalProperties": false, + "properties": { + "name": { + "description": "The display name for the check used in Soda Cloud", + "type": "string" + }, + "expression": { + "description": "The expression to be used for the failed rows check", + "type": "string" + }, + "query": { + "description": "The query to be used for the failed rows check", + "type": "string" + }, + "qualifier": { + "description": "The qualifier for the check used in Soda Cloud", + "type": "string" + }, + "threshold": { + "$ref": "#/$defs/threshold" + } + }, + "oneOf": [ + { + "required": [ + "expression" + ], + "not": { + "required": [ + "query" + ] + } + }, + { + "required": [ + "query" + ], + "not": { + "required": [ + "expression" + ] + } + } + ] + } + ] + } + }, + "additionalProperties": false + } + ] } - }, - { - "type": "null" - } - ] - }, - "invalid": { - "oneOf": [ - { - "type": "object", - "additionalProperties": false, - "properties": { - "name": { - "description": "The display name for the check used in Soda Cloud", - "type": "string" - }, - "attributes": { - "$ref": "#/$defs/attributes" - }, - "threshold": { - "$ref": "#/$defs/threshold" - }, - "filter": { - "description": "Specifies a sql expression filter that should be applied on the metric", - "type": "string" - }, - "valid_values": { - "description": "A list of valid values. Only supports all strings or all numbers. Implies an invalid_count check in Soda.", - "type": "array", - "items": { - "type": [ - "number", - "string" - ] - } - }, - "valid_format": { - "type": "object", - "additionalProperties": false, - "required": [ - "regex", "name" - ], - "properties": { - "regex": { - "description": "Customized regex to identify valid values. The flavor of regex depends on the SQL engine / data_source. NULL is always considered missing so that does not have to be specified.", - "type": "string" - }, - "name": { - "description": "Name that describes the regex pattern.", - "type": "string" - } - } - }, - "invalid_values": { - "description": "A list of valid values. Only supports all strings or all numbers. Implies an invalid_count check in Soda.", - "type": "array", - "items": { - "type": [ - "number", - "string" - ] - } - }, - "invalid_format": { - "type": "object", - "additionalProperties": false, - "required": [ - "regex", "name" - ], - "properties": { - "regex": { - "description": "Customized regex to identify invalid values. The flavor of regex depends on the SQL engine / data_source. NULL is always considered missing so that does not have to be specified.", - "type": "string" - }, - "name": { - "description": "Name that describes the regex pattern.", - "type": "string" - } + } + } + } + ] + } + }, + "checks": { + "description": "A list of checks for this dataset executed by a Soda", + "type": "array", + "minItems": 1, + "additionalItems": false, + "items": { + "anyOf": [ + { + "type": "object", + "properties": { + "row_count": { + "oneOf": [ + { + "type": "object", + "additionalProperties": false, + "properties": { + "name": { + "description": "The display name for the check used in Soda Cloud", + "type": "string" + }, + "qualifier": { + "description": "The qualifier for the check used in Soda Cloud", + "type": "string" + }, + "attributes": { + "$ref": "#/$defs/attributes" + }, + "threshold": { + "$ref": "#/$defs/threshold" + }, + "filter": { + "description": "Specifies a sql expression filter that should be applied on the metric", + "type": "string" + } + } + }, + { + "type": "null" } - } - } - }, - { - "type": "null" - } - ] - } + ] + } }, "additionalProperties": false - } - ] - } - } - } - } - ] - } - }, - "checks": { - "description": "A list of checks for this dataset executed by a Soda", - "type": "array", - "minItems": 1, - "additionalItems": false, - "items": { - "anyOf": [ - { - "type": "object", - "properties": { - "row_count": { - "oneOf": [ + }, { - "type": "object", - "additionalProperties": false, - "properties": { - "name": { - "description": "The display name for the check used in Soda Cloud", - "type": "string" - }, - "attributes": { - "$ref": "#/$defs/attributes" - }, - "threshold": { - "$ref": "#/$defs/threshold" + "type": "object", + "properties": { + "schema": { + "oneOf": [ + { + "type": "object", + "additionalProperties": false, + "description": "Use a schema check to validate the presence, absence or position of columns in a dataset, or to validate the type of data column contains. See documentation for more details: https://docs.soda.io/soda-cl/schema.html", + "properties": { + "name": { + "description": "The display name for the check used in Soda Cloud", + "type": "string" + }, + "qualifier": { + "description": "The qualifier for the check used in Soda Cloud", + "type": "string" + }, + "attributes": { + "$ref": "#/$defs/attributes" + }, + "threshold": { + "$ref": "#/$defs/threshold" + }, + "filter": { + "description": "Specifies a sql expression filter that should be applied on the metric", + "type": "string" + }, + "allow_extra_columns": { + "description": "Allow columns that are not in the schema", + "type": "boolean" + }, + "allow_other_column_order": { + "description": "Allow columns to be in a different order than the schema", + "type": "boolean" + } + } + }, + { + "type": "null" + } + ] + } }, - "filter": { - "description": "Specifies a sql expression filter that should be applied on the metric", - "type": "string" - } - } + "additionalProperties": false }, { - "type": "null" - } - ] - } - }, - "additionalProperties": false - }, - { - "type": "object", - "properties": { - "schema": { - "oneOf": [ - { - "type": "object", - "additionalProperties": false, - "description": "Use a schema check to validate the presence, absence or position of columns in a dataset, or to validate the type of data column contains. See documentation for more details: https://docs.soda.io/soda-cl/schema.html", - "properties": { - "name": { - "description": "The display name for the check used in Soda Cloud", - "type": "string" - }, - "attributes": { - "$ref": "#/$defs/attributes" + "type": "object", + "properties": { + "freshness": { + "oneOf": [ + { + "type": "object", + "additionalProperties": false, + "properties": { + "name": { + "description": "The display name for the check used in Soda Cloud", + "type": "string" + }, + "column": { + "description": "The column to be used for the freshness check", + "type": "string" + }, + "qualifier": { + "description": "The qualifier for the check used in Soda Cloud", + "type": "string" + }, + "attributes": { + "$ref": "#/$defs/attributes" + }, + "threshold": { + "$ref": "#/$defs/thresholdFreshness" + }, + "filter": { + "description": "Specifies a sql expression filter that should be applied on the metric", + "type": "string" + } + }, + "required": [ + "threshold", + "column" + ] + }, + { + "type": "null" + } + ] + } }, - "threshold": { - "$ref": "#/$defs/threshold" + "additionalProperties": false + }, + { + "type": "object", + "properties": { + "duplicate": { + "oneOf": [ + { + "type": "object", + "additionalProperties": false, + "properties": { + "name": { + "description": "The display name for the check used in Soda Cloud", + "type": "string" + }, + "columns": { + "description": "The columns to be used for the duplicate check", + "type": "array", + "items": { + "type": "string" + } + }, + "qualifier": { + "description": "The qualifier for the check used in Soda Cloud", + "type": "string" + }, + "attributes": { + "$ref": "#/$defs/attributes" + }, + "threshold": { + "$ref": "#/$defs/threshold" + }, + "filter": { + "description": "Specifies a sql expression filter that should be applied on the metric", + "type": "string" + } + }, + "required": [ + "columns" + ] + } + ] + } }, - "filter": { - "description": "Specifies a sql expression filter that should be applied on the metric", - "type": "string" + "additionalProperties": false + }, + { + "type": "object", + "properties": { + "failed_rows": { + "oneOf": [ + { + "type": "object", + "additionalProperties": false, + "properties": { + "name": { + "description": "The display name for the check used in Soda Cloud", + "type": "string" + }, + "expression": { + "description": "The expression to be used for the failed rows check", + "type": "string" + }, + "query": { + "description": "The query to be used for the failed rows check", + "type": "string" + }, + "qualifier": { + "description": "The qualifier for the check used in Soda Cloud", + "type": "string" + }, + "threshold": { + "$ref": "#/$defs/threshold" + } + }, + "oneOf": [ + { + "required": [ + "expression" + ], + "not": { + "required": [ + "query" + ] + } + }, + { + "required": [ + "query" + ], + "not": { + "required": [ + "expression" + ] + } + } + ] + } + ] + } }, - "extra_columns": { - "description": "Allow columns that are not in the schema", - "type": "boolean" - } - } + "additionalProperties": false }, { - "type": "null" + "type": "object", + "properties": { + "metric": { + "oneOf": [ + { + "type": "object", + "additionalProperties": false, + "properties": { + "name": { + "description": "The display name for the check used in Soda Cloud", + "type": "string" + }, + "expression": { + "description": "The expression to be used for the failed rows check", + "type": "string" + }, + "query": { + "description": "The query to be used for the failed rows check", + "type": "string" + }, + "qualifier": { + "description": "The qualifier for the check used in Soda Cloud", + "type": "string" + }, + "threshold": { + "$ref": "#/$defs/threshold" + } + }, + "oneOf": [ + { + "required": [ + "expression" + ], + "not": { + "required": [ + "query" + ] + } + }, + { + "required": [ + "query" + ], + "not": { + "required": [ + "expression" + ] + } + } + ] + } + ] + } + }, + "additionalProperties": false } - ] - } - }, - "additionalProperties": false + ] } - ] } - } }, "required": [ - "dataset", - "columns", - "dataset_prefix", - "data_source" + "dataset", + "columns" ], "$defs": { - "generic_check_properties": { - "description": "NOT USED YET, problem with inheritance and additionalProperties", - "type": "object", - "properties": { - "name": { - "description": "The display name for the check used in Soda Cloud", - "type": "string" - }, - "attributes": { - "$ref": "#/$defs/attributes" - }, - "threshold": { - "$ref": "#/$defs/threshold" - }, - "filter": { - "description": "Specifies a sql expression filter that should be applied on the metric", - "type": "string" - } - } - }, - "numeric_range": { - "anyOf": [ - { - "type": "array", - "items": { - "type": "number" - }, - "minItems": 2, - "maxItems": 2 - } - ] - }, - "format": { - "type": "string", - "enum": [ - "integer", - "positive integer", - "negative integer", - "decimal", - "positive decimal", - "negative decimal", - "decimal point", - "positive decimal point", - "negative decimal point", - "decimal comma", - "positive decimal comma", - "negative decimal comma", - "percentage", - "positive percentage", - "negative percentage", - "percentage point", - "positive percentage point", - "negative percentage point", - "percentage comma", - "positive percentage comma", - "negative percentage comma", - "money", - "money point", - "money comma", - "date us", - "date eu", - "date inverse", - "date iso 8601", - "time 24h", - "time 24h nosec", - "time 12h", - "time 12h nosec", - "timestamp 24h", - "timestamp 12h", - "uuid", - "ip address", - "ipv4 address", - "ipv6 address", - "email", - "phone number", - "credit card number" - ] - }, - "attributes": { - "type": "object" - }, - "threshold": { - "type": "object", - "additionalProperties": false, - "description": "The threshold for the check. See documentation for more details: https://docs.soda.io/#thresholds", - "properties": { - "metric": { - "description": "The type of the metric value to check", + "generic_check_properties": { + "description": "NOT USED YET, problem with inheritance and additionalProperties", + "type": "object", + "properties": { + "name": { + "description": "The display name for the check used in Soda Cloud", + "type": "string" + }, + "qualifier": { + "description": "The qualifier for the check used in Soda Cloud", + "type": "string" + }, + "attributes": { + "$ref": "#/$defs/attributes" + }, + "threshold": { + "$ref": "#/$defs/threshold" + }, + "filter": { + "description": "Specifies a sql expression filter that should be applied on the metric", + "type": "string" + } + } + }, + "numeric_range": { + "anyOf": [ + { + "type": "array", + "items": { + "type": "number" + }, + "minItems": 2, + "maxItems": 2 + } + ] + }, + "format": { + "type": "string", "enum": [ - "count", - "percent" + "integer", + "positive integer", + "negative integer", + "decimal", + "positive decimal", + "negative decimal", + "decimal point", + "positive decimal point", + "negative decimal point", + "decimal comma", + "positive decimal comma", + "negative decimal comma", + "percentage", + "positive percentage", + "negative percentage", + "percentage point", + "positive percentage point", + "negative percentage point", + "percentage comma", + "positive percentage comma", + "negative percentage comma", + "money", + "money point", + "money comma", + "date us", + "date eu", + "date inverse", + "date iso 8601", + "time 24h", + "time 24h nosec", + "time 12h", + "time 12h nosec", + "timestamp 24h", + "timestamp 12h", + "uuid", + "ip address", + "ipv4 address", + "ipv6 address", + "email", + "phone number", + "credit card number" + ] + }, + "attributes": { + "type": "object" + }, + "threshold": { + "type": "object", + "additionalProperties": false, + "description": "The threshold for the check. See documentation for more details: https://docs.soda.io/#thresholds", + "properties": { + "metric": { + "description": "The type of the metric value to check", + "enum": [ + "count", + "percent" + ] + }, + "must_be": { + "description": "YY The value the check metric (as specified in the type) must have for the check to pass; The check passes if the metric has the specified value, and fails otherwise; https://docs.soda.io/#thresholds", + "type": [ + "number", + "string" + ] + }, + "must_not_be": { + "description": "The value that the check metric (as specified in the type) may not have. The check passes if the metric doesn't have this value and fails otherwise.", + "type": [ + "number", + "string" + ] + }, + "must_be_greater_than": { + "description": "Specifies the threshold for the check. The check fails if the metric value is greater than the specified threshold value.", + "type": [ + "number", + "string" + ] + }, + "must_be_greater_than_or_equal": { + "description": "Specifies the threshold for the check. The check fails if the metric value is greater than or equal to the specified threshold value.", + "type": [ + "number", + "string" + ] + }, + "must_be_less_than": { + "description": "Specifies the threshold for the check. The check fails if the metric value is less than the specified threshold value.", + "type": [ + "number", + "string" + ] + }, + "must_be_less_than_or_equal": { + "description": "Specifies the threshold for the check. The check fails if the metric value is less than or equal to the specified threshold value.", + "type": [ + "number", + "string" + ] + }, + "must_be_between": { + "description": "Specifies the threshold for the check. The check fails if the metric value is not between the specified threshold values.", + "$ref": "#/$defs/threshold_between" + }, + "must_be_not_between": { + "description": "Specifies the threshold for the check. The check fails if the metric value is between the specified threshold values.", + "$ref": "#/$defs/threshold_between" + } + }, + "if": { + "type": "object", + "minProperties": 2 + }, + "then": { + "type": "object", + "properties": { + "metric": true + }, + "required": [ + "metric" + ] + }, + "allOf": [ + { + "type": "object", + "maxProperties": 2 + } + ] + }, + "threshold_between": { + "type": "object", + "additionalProperties": false, + "properties": { + "greater_than": { + "type": [ + "number", + "string" + ] + }, + "greater_than_or_equal": { + "type": [ + "number", + "string" + ] + }, + "less_than": { + "type": [ + "number", + "string" + ] + }, + "less_than_or_equal": { + "type": [ + "number", + "string" + ] + } + }, + "allOf": [ + { + "oneOf": [ + { + "required": [ + "greater_than" + ] + }, + { + "required": [ + "greater_than_or_equal" + ] + }, + { + "not": { + "anyOf": [ + { + "required": [ + "greater_than" + ] + }, + { + "required": [ + "greater_than_or_equal" + ] + } + ] + } + } + ] + }, + { + "oneOf": [ + { + "required": [ + "less_than" + ] + }, + { + "required": [ + "less_than_or_equal" + ] + }, + { + "not": { + "anyOf": [ + { + "required": [ + "less_than" + ] + }, + { + "required": [ + "less_than_or_equal" + ] + } + ] + } + } + ] + } + ] + }, + "thresholdFreshness": { + "type": "object", + "additionalProperties": false, + "description": "The threshold for the check. See documentation for more details: https://docs.soda.io/#thresholds", + "properties": { + "unit": { + "description": "The type of the metric value to check", + "enum": [ + "minute", + "hour", + "day" + ] + }, + "must_be": { + "description": "YY The value the check metric (as specified in the type) must have for the check to pass; The check passes if the metric has the specified value, and fails otherwise; https://docs.soda.io/#thresholds", + "type": "number" + }, + "must_not_be": { + "description": "The value that the check metric (as specified in the type) may not have. The check passes if the metric doesn't have this value and fails otherwise.", + "type": "number" + }, + "must_be_greater_than": { + "description": "Specifies the threshold for the check. The check fails if the metric value is greater than the specified threshold value.", + "type": "number" + }, + "must_be_greater_than_or_equal": { + "description": "Specifies the threshold for the check. The check fails if the metric value is greater than or equal to the specified threshold value.", + "type": "number" + }, + "must_be_less_than": { + "description": "Specifies the threshold for the check. The check fails if the metric value is less than the specified threshold value.", + "type": "number" + }, + "must_be_less_than_or_equal": { + "description": "Specifies the threshold for the check. The check fails if the metric value is less than or equal to the specified threshold value.", + "type": "number" + }, + "must_be_between": { + "description": "Specifies the threshold for the check. The check fails if the metric value is not between the specified threshold values.", + "$ref": "#/$defs/threshold_between" + }, + "must_be_not_between": { + "description": "Specifies the threshold for the check. The check fails if the metric value is between the specified threshold values.", + "$ref": "#/$defs/threshold_between" + } + }, + "if": { + "type": "object", + "minProperties": 2 + }, + "then": { + "type": "object", + "properties": { + "unit": true + }, + "required": [ + "unit" + ] + }, + "allOf": [ + { + "type": "object", + "maxProperties": 2 + } ] - }, - "must_be": { - "description": "YY The value the check metric (as specified in the type) must have for the check to pass; The check passes if the metric has the specified value, and fails otherwise; https://docs.soda.io/#thresholds", - "type": "number" - }, - "must_not_be": { - "description": "The value that the check metric (as specified in the type) may not have. The check passes if the metric doesn't have this value and fails otherwise.", - "type": "number" - }, - "must_be_greater_than": { - "description": "Specifies the threshold for the check. The check fails if the metric value is greater than the specified threshold value.", - "type": "number" - }, - "must_be_greater_than_or_equal": { - "description": "Specifies the threshold for the check. The check fails if the metric value is greater than or equal to the specified threshold value.", - "type": "number" - }, - "must_be_less_than": { - "description": "Specifies the threshold for the check. The check fails if the metric value is less than the specified threshold value.", - "type": "number" - }, - "must_be_less_than_or_equal": { - "description": "Specifies the threshold for the check. The check fails if the metric value is less than or equal to the specified threshold value.", - "type": "number" - }, - "must_be_between": { - "description": "Specifies a threshold range for the check. The check fails if the metric value is outside the range of values indicated. See thresholds in the docs for more information", - "$ref": "#/$defs/numeric_range" - }, - "must_not_be_between": { - "description": "Specifies a threshold range for the check. The check fails if the metric value is inside the range of values indicated.", - "$ref": "#/$defs/numeric_range" - } } - } } - } +} From 2ced0a805e487e42b975bb5af1e422be4412819c Mon Sep 17 00:00:00 2001 From: Milan Lukac Date: Tue, 10 Jun 2025 16:15:57 +0200 Subject: [PATCH 10/26] Fix freshness if no data, handle corner cases better (#2309) * Fix freshness if no data, hanlde corner cases better * no data->fail check * Fix test --- .../impl/check_types/freshness_check.py | 33 +++++++++++++------ .../tests/features/test_freshness_check.py | 32 ++++++++++++++++++ 2 files changed, 55 insertions(+), 10 deletions(-) diff --git a/soda-core/src/soda_core/contracts/impl/check_types/freshness_check.py b/soda-core/src/soda_core/contracts/impl/check_types/freshness_check.py index 25832a133..79649e2e3 100644 --- a/soda-core/src/soda_core/contracts/impl/check_types/freshness_check.py +++ b/soda-core/src/soda_core/contracts/impl/check_types/freshness_check.py @@ -91,7 +91,13 @@ def evaluate(self, measurement_values: MeasurementValues, contract: Contract) -> threshold_metric_name: str = f"freshness_in_{self.unit}s" threshold_value: Optional[float] = None - if data_timestamp_utc and max_timestamp_utc: + if max_timestamp_utc is None: + outcome = CheckOutcome.FAILED + + elif data_timestamp_utc is not None: + logger.debug( + f"Calculating freshness using '{max_timestamp}' as 'max' and '{data_timestamp}' as 'now' values" + ) freshness = data_timestamp_utc - max_timestamp_utc freshness_in_seconds = freshness.total_seconds() @@ -111,6 +117,8 @@ def evaluate(self, measurement_values: MeasurementValues, contract: Contract) -> else: outcome = CheckOutcome.FAILED + freshness_str: Optional[str] = str(freshness) if freshness is not None else None + return FreshnessCheckResult( contract=contract, check=self._build_check_info(), @@ -121,27 +129,32 @@ def evaluate(self, measurement_values: MeasurementValues, contract: Contract) -> max_timestamp_utc=max_timestamp_utc, data_timestamp=data_timestamp, data_timestamp_utc=data_timestamp_utc, - freshness=str(freshness), + freshness=freshness_str, freshness_in_seconds=freshness_in_seconds, unit=self.unit, ) def _get_max_timestamp(self, measurement_values: MeasurementValues) -> Optional[datetime]: max_timestamp: Optional[datetime] = measurement_values.get_value(self.max_timestamp_metric) - if not isinstance(max_timestamp, datetime): + if max_timestamp is None: + logger.warning( + f"Freshness metric '{self.max_timestamp_metric.type}' for column '{self.column}' " + f"returned no value. Does the table or partition have rows?" + ) + return None + elif not isinstance(max_timestamp, datetime): + logger.debug( + f"Attempting to convert freshness value '{max_timestamp}' of data type '{type(max_timestamp).__name__}' to datetime" + ) if isinstance(max_timestamp, date): max_timestamp = datetime.combine(max_timestamp, datetime.min.time()) elif isinstance(max_timestamp, str): max_timestamp = convert_str_to_datetime(max_timestamp) - else: - logger.error( - f"Freshness metric '{self.max_timestamp_metric.type}' for column '{self.column}' " - f"has an invalid data type '({type(max_timestamp).__name__})'. " - f"Is the column a timestamp or a timestamp-compatible type?" - ) if not isinstance(max_timestamp, datetime): - logger.error(f"Freshness column '{self.column}' does not have timestamp values: '{max_timestamp}'") + logger.error( + f"Freshness column '{self.column}' returned value '{max_timestamp}' of data type '{type(max_timestamp).__name__}' which is not a datetime or datetime-compatible type." + ) max_timestamp = None return max_timestamp diff --git a/soda-tests/tests/features/test_freshness_check.py b/soda-tests/tests/features/test_freshness_check.py index 36e88cb4c..db90d8b5b 100644 --- a/soda-tests/tests/features/test_freshness_check.py +++ b/soda-tests/tests/features/test_freshness_check.py @@ -115,3 +115,35 @@ def test_freshness_now_variable(data_source_test_helper: DataSourceTestHelper): assert str(check_result.freshness_in_seconds) == "3600.0" assert str(check_result.unit) == "hour" assert 0.99 < get_diagnostic_value(check_result, "freshness_in_hours") < 1.01 + + +def test_freshness_no_rows(data_source_test_helper: DataSourceTestHelper): + test_table = data_source_test_helper.ensure_test_table(test_table_specification) + + id_quoted = data_source_test_helper.quote_column("id") + + contract_yaml_str: str = f""" + filter: | + {id_quoted} > 10 + checks: + - freshness: + column: created_at + threshold: + must_be_less_than: 2 + """ + + with freeze_time(datetime(year=2025, month=1, day=4, hour=10, minute=0, second=0)): + contract_verification_result_t1: ContractVerificationResult = data_source_test_helper.assert_contract_fail( + test_table=test_table, contract_yaml_str=contract_yaml_str + ) + check_result: FreshnessCheckResult = contract_verification_result_t1.check_results[0] + assert check_result.max_timestamp is None + assert check_result.max_timestamp_utc is None + assert str(check_result.data_timestamp) == "2025-01-04 10:00:00+00:00" + assert str(check_result.data_timestamp_utc) == "2025-01-04 10:00:00+00:00" + assert check_result.freshness is None + assert check_result.freshness_in_seconds is None + assert str(check_result.unit) == "hour" + assert len(check_result.diagnostic_metric_values) == 0 + + assert not contract_verification_result_t1.has_errors() From b919b454f6adaf44516740184f0e80ccfcdf542e Mon Sep 17 00:00:00 2001 From: Milan Lukac Date: Tue, 10 Jun 2025 16:31:12 +0200 Subject: [PATCH 11/26] Use question mark char for not_evaluated (#2310) --- soda-core/src/soda_core/common/logging_constants.py | 1 + soda-core/src/soda_core/contracts/contract_verification.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/soda-core/src/soda_core/common/logging_constants.py b/soda-core/src/soda_core/common/logging_constants.py index e467f2cdf..b278da682 100644 --- a/soda-core/src/soda_core/common/logging_constants.py +++ b/soda-core/src/soda_core/common/logging_constants.py @@ -15,6 +15,7 @@ class Emoticons: POLICE_CAR_LIGHT: str = "\U0001f6a8" SEE_NO_EVIL: str = "\U0001f648" PINCHED_FINGERS: str = "\U0001f90c" + QUESTION_MARK: str = "\u2753" class ExtraKeys: diff --git a/soda-core/src/soda_core/contracts/contract_verification.py b/soda-core/src/soda_core/contracts/contract_verification.py index 8c150b448..e92906dc2 100644 --- a/soda-core/src/soda_core/contracts/contract_verification.py +++ b/soda-core/src/soda_core/contracts/contract_verification.py @@ -214,7 +214,7 @@ def outcome_emoticon(self) -> str: elif self.outcome == CheckOutcome.FAILED: return Emoticons.CROSS_MARK else: - return Emoticons.SEE_NO_EVIL + return Emoticons.QUESTION_MARK @property def is_passed(self) -> bool: From 83d75acd4003b7ffa4150948bbb24c5014eaf880 Mon Sep 17 00:00:00 2001 From: Niels Nuyttens Date: Thu, 12 Jun 2025 10:00:17 +0200 Subject: [PATCH 12/26] DTL-807: Floor the current freshness in seconds so it returns an `int` (#2314) * Floor the current freshness in seconds so it returns an `int`, I think this makes the most sense given the unit. This also matches what Soda Cloud is expecting. Adjusted tests accordingly. * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- .../soda_core/contracts/impl/check_types/freshness_check.py | 5 +++-- soda-tests/tests/features/test_freshness_check.py | 6 +++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/soda-core/src/soda_core/contracts/impl/check_types/freshness_check.py b/soda-core/src/soda_core/contracts/impl/check_types/freshness_check.py index 79649e2e3..40e820d54 100644 --- a/soda-core/src/soda_core/contracts/impl/check_types/freshness_check.py +++ b/soda-core/src/soda_core/contracts/impl/check_types/freshness_check.py @@ -2,6 +2,7 @@ import logging from datetime import date, datetime, timedelta, timezone +from math import floor from soda_core.common.datetime_conversions import convert_str_to_datetime from soda_core.common.logging_constants import soda_logger @@ -87,7 +88,7 @@ def evaluate(self, measurement_values: MeasurementValues, contract: Contract) -> data_timestamp_utc: datetime = self._get_now_timestamp_utc(data_timestamp) diagnostic_metric_values: dict[str, float] = {} freshness: Optional[timedelta] = None - freshness_in_seconds: Optional[float] = None + freshness_in_seconds: Optional[int] = None threshold_metric_name: str = f"freshness_in_{self.unit}s" threshold_value: Optional[float] = None @@ -99,7 +100,7 @@ def evaluate(self, measurement_values: MeasurementValues, contract: Contract) -> f"Calculating freshness using '{max_timestamp}' as 'max' and '{data_timestamp}' as 'now' values" ) freshness = data_timestamp_utc - max_timestamp_utc - freshness_in_seconds = freshness.total_seconds() + freshness_in_seconds = floor(freshness.total_seconds()) if self.unit == "minute": threshold_value = freshness_in_seconds / 60 diff --git a/soda-tests/tests/features/test_freshness_check.py b/soda-tests/tests/features/test_freshness_check.py index db90d8b5b..2a7ccd1ab 100644 --- a/soda-tests/tests/features/test_freshness_check.py +++ b/soda-tests/tests/features/test_freshness_check.py @@ -51,7 +51,7 @@ def test_freshness(data_source_test_helper: DataSourceTestHelper): assert str(check_result.data_timestamp) == "2025-01-04 10:00:00+00:00" assert str(check_result.data_timestamp_utc) == "2025-01-04 10:00:00+00:00" assert str(check_result.freshness) == "1:00:00" - assert str(check_result.freshness_in_seconds) == "3600.0" + assert str(check_result.freshness_in_seconds) == "3600" assert str(check_result.unit) == "hour" assert get_diagnostic_value(check_result, "freshness_in_hours") == 1 @@ -78,7 +78,7 @@ def test_freshness_in_days(data_source_test_helper: DataSourceTestHelper): assert str(check_result.data_timestamp) == "2025-01-05 11:00:00+00:00" assert str(check_result.data_timestamp_utc) == "2025-01-05 11:00:00+00:00" assert str(check_result.freshness) == "1 day, 2:00:00" - assert str(check_result.freshness_in_seconds) == "93600.0" + assert str(check_result.freshness_in_seconds) == "93600" assert str(check_result.unit) == "day" assert 1.08 < get_diagnostic_value(check_result, "freshness_in_days") < 1.09 @@ -112,7 +112,7 @@ def test_freshness_now_variable(data_source_test_helper: DataSourceTestHelper): assert str(check_result.data_timestamp) == "2025-01-04 10:00:00+00:00" assert str(check_result.data_timestamp_utc) == "2025-01-04 10:00:00+00:00" assert str(check_result.freshness) == "1:00:00" - assert str(check_result.freshness_in_seconds) == "3600.0" + assert str(check_result.freshness_in_seconds) == "3600" assert str(check_result.unit) == "hour" assert 0.99 < get_diagnostic_value(check_result, "freshness_in_hours") < 1.01 From 587ab8c9759be8be8468020897b38825a2d1ee15 Mon Sep 17 00:00:00 2001 From: Tom Baeyens Date: Mon, 16 Jun 2025 15:57:43 +0200 Subject: [PATCH 13/26] V4 datatime2 (#2311) Allow data_timestamp to be passed in the Python API --- .../src/soda_core/contracts/api/verify_api.py | 4 ++++ .../contracts/contract_verification.py | 2 ++ .../impl/check_types/freshness_check.py | 23 ++++++++++-------- .../impl/contract_verification_impl.py | 4 ++++ .../soda_core/contracts/impl/contract_yaml.py | 24 +++++++++++++++++-- 5 files changed, 45 insertions(+), 12 deletions(-) diff --git a/soda-core/src/soda_core/contracts/api/verify_api.py b/soda-core/src/soda_core/contracts/api/verify_api.py index 772d30f16..48edd10aa 100644 --- a/soda-core/src/soda_core/contracts/api/verify_api.py +++ b/soda-core/src/soda_core/contracts/api/verify_api.py @@ -23,6 +23,7 @@ def verify_contracts_locally( dataset_identifiers: Optional[list[str]] = None, soda_cloud_file_path: Optional[str] = None, variables: Optional[Dict[str, str]] = None, + data_timestamp: Optional[str] = None, publish: bool = False, ) -> ContractVerificationSessionResult: """ @@ -34,6 +35,7 @@ def verify_contracts_locally( data_source_file_path=data_source_file_path, soda_cloud_file_path=soda_cloud_file_path, variables=variables, + data_timestamp=data_timestamp, publish=publish, use_agent=False, ) @@ -73,6 +75,7 @@ def verify_contracts( publish: bool, use_agent: bool, variables: Optional[Dict[str, str]] = None, + data_timestamp: Optional[str] = None, verbose: bool = False, blocking_timeout_in_minutes: int = 60, ) -> ContractVerificationSessionResult: @@ -112,6 +115,7 @@ def verify_contracts( data_source_yaml_sources=[data_source_yaml_source], soda_cloud_impl=soda_cloud_client, variables=variables, + data_timestamp=data_timestamp, only_validate_without_execute=False, soda_cloud_publish_results=publish, soda_cloud_use_agent=use_agent, diff --git a/soda-core/src/soda_core/contracts/contract_verification.py b/soda-core/src/soda_core/contracts/contract_verification.py index e92906dc2..b18b55c8c 100644 --- a/soda-core/src/soda_core/contracts/contract_verification.py +++ b/soda-core/src/soda_core/contracts/contract_verification.py @@ -22,6 +22,7 @@ def execute( contract_yaml_sources: list[ContractYamlSource], only_validate_without_execute: bool = False, variables: Optional[dict[str, str]] = None, + data_timestamp: Optional[str] = None, data_source_impls: Optional[list["DataSourceImpl"]] = None, data_source_yaml_sources: Optional[list[DataSourceYamlSource]] = None, soda_cloud_impl: Optional["SodaCloud"] = None, @@ -38,6 +39,7 @@ def execute( contract_yaml_sources=contract_yaml_sources, only_validate_without_execute=only_validate_without_execute, variables=variables, + data_timestamp=data_timestamp, data_source_impls=data_source_impls, data_source_yaml_sources=data_source_yaml_sources, soda_cloud_impl=soda_cloud_impl, diff --git a/soda-core/src/soda_core/contracts/impl/check_types/freshness_check.py b/soda-core/src/soda_core/contracts/impl/check_types/freshness_check.py index 40e820d54..3e6d0f731 100644 --- a/soda-core/src/soda_core/contracts/impl/check_types/freshness_check.py +++ b/soda-core/src/soda_core/contracts/impl/check_types/freshness_check.py @@ -164,20 +164,23 @@ def _get_max_timestamp_utc(self, max_timestamp: Optional[datetime]) -> Optional[ return self._datetime_to_utc(max_timestamp) if isinstance(max_timestamp, datetime) else None def _get_now_timestamp(self) -> Optional[datetime]: - if self.now_variable is None or self.now_variable == "soda.NOW": - now_timestamp_str: str = self.soda_variable_values.get("NOW") + if self.now_variable is None: + return self.contract_impl.contract_yaml.data_timestamp else: now_timestamp_str: str = self.resolved_variable_values.get(self.now_variable) if now_timestamp_str is None: logger.error(f"Freshness variable '{self.now_variable}' not available") - - if not isinstance(now_timestamp_str, str): - logger.error(f"Freshness variable '{self.now_variable}' is not available") - else: - now_timestamp: Optional[datetime] = convert_str_to_datetime(now_timestamp_str) - if not isinstance(now_timestamp, datetime): - logger.error(f"Freshness variable '{self.now_variable}' is not a timestamp: {now_timestamp_str}") - return now_timestamp + return None + if not isinstance(now_timestamp_str, str): + logger.error( + f"Freshness variable '{self.now_variable}' has wrong " f"type: {type(now_timestamp_str).__name__}" + ) + return None + else: + now_timestamp: Optional[datetime] = convert_str_to_datetime(now_timestamp_str) + if not isinstance(now_timestamp, datetime): + logger.error(f"Freshness variable '{self.now_variable}' is not a timestamp: {now_timestamp_str}") + return now_timestamp def _get_now_timestamp_utc(self, now_timestamp: Optional[datetime]) -> Optional[datetime]: return self._datetime_to_utc(now_timestamp) if now_timestamp else None diff --git a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py index 23600a0b7..67b84ecb2 100644 --- a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py +++ b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py @@ -80,6 +80,7 @@ def execute( contract_yaml_sources: list[ContractYamlSource], only_validate_without_execute: bool = False, variables: Optional[dict[str, str]] = None, + data_timestamp: Optional[str] = None, data_source_impls: Optional[list[DataSourceImpl]] = None, data_source_yaml_sources: Optional[list[DataSourceYamlSource]] = None, soda_cloud_impl: Optional[SodaCloud] = None, @@ -150,6 +151,7 @@ def execute( contract_yaml_sources=contract_yaml_sources, only_validate_without_execute=only_validate_without_execute, provided_variable_values=variables, + data_timestamp=data_timestamp, data_source_impls=data_source_impls, data_source_yaml_sources=data_source_yaml_sources, soda_cloud_impl=soda_cloud_impl, @@ -164,6 +166,7 @@ def _execute_locally( contract_yaml_sources: list[ContractYamlSource], only_validate_without_execute: bool, provided_variable_values: dict[str, str], + data_timestamp: Optional[str], data_source_impls: list[DataSourceImpl], data_source_yaml_sources: list[DataSourceYamlSource], soda_cloud_impl: Optional[SodaCloud], @@ -184,6 +187,7 @@ def _execute_locally( contract_yaml: ContractYaml = ContractYaml.parse( contract_yaml_source=contract_yaml_source, provided_variable_values=provided_variable_values, + data_timestamp=data_timestamp, ) data_source_name: str = ( contract_yaml.dataset[: contract_yaml.dataset.find("/")] if contract_yaml.dataset else None diff --git a/soda-core/src/soda_core/contracts/impl/contract_yaml.py b/soda-core/src/soda_core/contracts/impl/contract_yaml.py index 4052f7a4a..ff2c924d8 100644 --- a/soda-core/src/soda_core/contracts/impl/contract_yaml.py +++ b/soda-core/src/soda_core/contracts/impl/contract_yaml.py @@ -133,6 +133,7 @@ def parse( cls, contract_yaml_source: ContractYamlSource, provided_variable_values: Optional[dict[str, str]] = None, + data_timestamp: Optional[str] = None, ) -> Optional[ContractYaml]: check_types_have_been_registered: bool = len(CheckYaml.check_yaml_parsers) > 0 if not check_types_have_been_registered: @@ -140,20 +141,27 @@ def parse( return ContractYaml( contract_yaml_source=contract_yaml_source, provided_variable_values=provided_variable_values, + data_timestamp=data_timestamp, ) def __init__( self, contract_yaml_source: ContractYamlSource, provided_variable_values: Optional[dict[str, str]], + data_timestamp: Optional[str] = None, ): self.contract_yaml_source: ContractYamlSource = contract_yaml_source self.contract_yaml_object: YamlObject = contract_yaml_source.parse() self.variables: list[VariableYaml] = self._parse_variable_yamls(contract_yaml_source, provided_variable_values) - self.data_timestamp: datetime = datetime.now(timezone.utc) - soda_variable_values: dict[str, str] = {"NOW": convert_datetime_to_str(self.data_timestamp)} + soda_now: datetime = datetime.now(timezone.utc) + self.data_timestamp: datetime = self._get_data_timestamp(data_timestamp, soda_now) + + soda_variable_values: dict[str, str] = { + "NOW": convert_datetime_to_str(soda_now), + "DATA_TIMESTAMP": convert_datetime_to_str(self.data_timestamp), + } self.resolved_variable_values: dict[str, str] = self._resolve_variable_values( variable_yamls=self.variables, @@ -385,6 +393,18 @@ def _parse_checks( return checks + def _get_data_timestamp(self, data_timestamp: Optional[str], default_soda_now: datetime) -> datetime: + if isinstance(data_timestamp, str): + parsed_data_timestamp = convert_str_to_datetime(data_timestamp) + if isinstance(parsed_data_timestamp, datetime): + return parsed_data_timestamp + else: + logging.error( + f"Provided 'data_timestamp' value is not a correct ISO 8601 " + f"timestamp format: '{data_timestamp}'" + ) + return default_soda_now + class VariableYaml: def __init__(self, variable_name: str, variable_yaml_object: YamlObject): From 100dd756e501422272b25e1c6d9fe340040486d0 Mon Sep 17 00:00:00 2001 From: Tom Baeyens Date: Mon, 16 Jun 2025 15:58:37 +0200 Subject: [PATCH 14/26] Default check names (#2317) Added default check names. --- .../contracts/contract_verification.py | 2 +- .../impl/check_types/duplicate_check.py | 2 -- .../impl/check_types/invalidity_check.py | 1 - .../impl/contract_verification_impl.py | 22 ++++++++++++++++++- .../tests/components/test_soda_cloud.py | 4 ++++ 5 files changed, 26 insertions(+), 5 deletions(-) diff --git a/soda-core/src/soda_core/contracts/contract_verification.py b/soda-core/src/soda_core/contracts/contract_verification.py index b18b55c8c..7fe685836 100644 --- a/soda-core/src/soda_core/contracts/contract_verification.py +++ b/soda-core/src/soda_core/contracts/contract_verification.py @@ -185,7 +185,7 @@ class Check: column_name: Optional[str] type: str qualifier: Optional[str] - name: str # Short description used in UI. Required. Between 1 and 4000 chars. User defined with key 'name' or auto-generated. + name: Optional[str] identity: str definition: str column_name: Optional[str] diff --git a/soda-core/src/soda_core/contracts/impl/check_types/duplicate_check.py b/soda-core/src/soda_core/contracts/impl/check_types/duplicate_check.py index af74c1705..7687b8bef 100644 --- a/soda-core/src/soda_core/contracts/impl/check_types/duplicate_check.py +++ b/soda-core/src/soda_core/contracts/impl/check_types/duplicate_check.py @@ -76,7 +76,6 @@ def __init__( ) self.metric_name = "duplicate_percent" if check_yaml.metric == "percent" else "duplicate_count" - self.name = check_yaml.name if check_yaml.name else self.type self.distinct_count_metric_impl: MetricImpl = self._resolve_metric( ColumnDistinctCountMetricImpl(contract_impl=contract_impl, column_impl=column_impl, check_impl=self) @@ -237,7 +236,6 @@ def __init__( ) self.metric_name = "duplicate_percent" if check_yaml.metric == "percent" else "duplicate_count" - self.name = check_yaml.name if check_yaml.name else self.type self.multi_column_distinct_count_metric_impl: MetricImpl = self._resolve_metric( MultiColumnDistinctCountMetricImpl(contract_impl=contract_impl, check_impl=self) diff --git a/soda-core/src/soda_core/contracts/impl/check_types/invalidity_check.py b/soda-core/src/soda_core/contracts/impl/check_types/invalidity_check.py index 08a1dff75..08d3fe7c1 100644 --- a/soda-core/src/soda_core/contracts/impl/check_types/invalidity_check.py +++ b/soda-core/src/soda_core/contracts/impl/check_types/invalidity_check.py @@ -79,7 +79,6 @@ def __init__( # TODO create better support in class hierarchy for common vs specific stuff. name is common. see other check type impls self.metric_name = "invalid_percent" if check_yaml.metric == "percent" else "invalid_count" - self.name = check_yaml.name if check_yaml.name else self.type self.invalid_count_metric_impl: Optional[MetricImpl] = None if self.missing_and_validity.has_reference_data(): diff --git a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py index 67b84ecb2..76a89ed3a 100644 --- a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py +++ b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py @@ -1034,9 +1034,9 @@ def __init__( self.contract_impl: ContractImpl = contract_impl self.check_yaml: CheckYaml = check_yaml + self.name: str = self._get_name_with_default(check_yaml) self.column_impl: Optional[ColumnImpl] = column_impl self.type: str = check_yaml.type_name - self.name: Optional[str] = check_yaml.name if check_yaml.name else self.type self.identity: str = self._build_identity( contract_impl=contract_impl, column_impl=column_impl, @@ -1049,6 +1049,26 @@ def __init__( self.queries: list[Query] = [] self.skip: bool = False + __DEFAULT_CHECK_NAMES_BY_TYPE: dict[str, str] = { + "schema": "Schema matches expected structure", + "row_count": "Row count meets expected threshold", + "freshness": "Data is fresh", + "missing": "No missing values", + "invalid": "No invalid values", + "duplicate": "No duplicate values", + "aggregate": "Metric function meets threshold", + "metric": "Metric meets threshold", + "failed_rows": "No rows violating the condition", + } + + def _get_name_with_default(self, check_yaml: CheckYaml) -> str: + if isinstance(check_yaml.name, str): + return check_yaml.name + default_check_name: Optional[str] = self.__DEFAULT_CHECK_NAMES_BY_TYPE.get(check_yaml.type_name) + if isinstance(default_check_name, str): + return default_check_name + return check_yaml.type_name + def _resolve_metric(self, metric_impl: MetricImpl) -> MetricImpl: resolved_metric_impl: MetricImpl = self.contract_impl.metrics_resolver.resolve_metric(metric_impl) self.metrics.append(resolved_metric_impl) diff --git a/soda-tests/tests/components/test_soda_cloud.py b/soda-tests/tests/components/test_soda_cloud.py index db01eae56..8f7fe1c91 100644 --- a/soda-tests/tests/components/test_soda_cloud.py +++ b/soda-tests/tests/components/test_soda_cloud.py @@ -99,6 +99,7 @@ def test_soda_cloud_results(data_source_test_helper: DataSourceTestHelper, env_v must_be_less_than_or_equal: 2 - missing: qualifier: 2 + name: Second missing check threshold: must_be_less_than_or_equal: 5 checks: @@ -134,6 +135,9 @@ def test_soda_cloud_results(data_source_test_helper: DataSourceTestHelper, env_v assert 49.99 < missing_percent < 50.01 assert isinstance(missing_percent, float) + assert "No missing values" == request_2.json["checks"][0]["name"] + assert "Second missing check" == request_2.json["checks"][1]["name"] + def test_execute_over_agent(data_source_test_helper: DataSourceTestHelper): test_table = data_source_test_helper.ensure_test_table(test_table_specification) From 2e31e469f134c26a1c24fa2c5f69107c5922c620 Mon Sep 17 00:00:00 2001 From: tombaeyens Date: Mon, 16 Jun 2025 15:41:48 +0200 Subject: [PATCH 15/26] failed rows wip --- .../soda_core/contracts/impl/contract_verification_impl.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py index 76a89ed3a..d1b1c8944 100644 --- a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py +++ b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py @@ -326,6 +326,11 @@ def __init__( dataset_identifier = DatasetIdentifier.parse(contract_yaml.dataset) self.dataset_prefix: list[str] = dataset_identifier.prefixes self.dataset_name = dataset_identifier.dataset_name + self.dataset_identifier: DatasetIdentifier = DatasetIdentifier( + data_source_name=self.data_source_impl.name, + prefixes=self.dataset_prefix, + dataset_name=self.dataset_name, + ) self.metrics_resolver: MetricsResolver = MetricsResolver() From f9e5fff60cfe756012cf7d32ce696bf087775897 Mon Sep 17 00:00:00 2001 From: tombaeyens Date: Tue, 17 Jun 2025 07:55:41 +0200 Subject: [PATCH 16/26] Failed rows extension wip --- .../src/soda_core/common/data_source_impl.py | 7 ++++ .../impl/contract_verification_impl.py | 36 +++++++++++-------- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/soda-core/src/soda_core/common/data_source_impl.py b/soda-core/src/soda_core/common/data_source_impl.py index a7b203a1f..f60c5bf51 100644 --- a/soda-core/src/soda_core/common/data_source_impl.py +++ b/soda-core/src/soda_core/common/data_source_impl.py @@ -6,6 +6,7 @@ from soda_core.common.data_source_connection import DataSourceConnection from soda_core.common.data_source_results import QueryResult, UpdateResult +from soda_core.common.dataset_identifier import DatasetIdentifier from soda_core.common.exceptions import DataSourceConnectionException from soda_core.common.logging_constants import soda_logger from soda_core.common.sql_dialect import SqlDialect @@ -203,6 +204,12 @@ def test_connection_error_message(self) -> Optional[str]: def build_data_source(self) -> DataSource: return DataSource(name=self.name, type=self.type_name) + def qualify_dataset_name(self, dataset_identifier: DatasetIdentifier) -> str: + assert dataset_identifier.data_source_name == self.name + return self.sql_dialect.qualify_dataset_name( + dataset_prefix=dataset_identifier.prefixes, dataset_name=dataset_identifier.dataset_name + ) + def quote_identifier(self, identifier: str) -> str: c = self.sql_dialect._get_default_quote_char() return f"{c}{identifier}{c}" diff --git a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py index d1b1c8944..d2594ce4f 100644 --- a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py +++ b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py @@ -303,7 +303,7 @@ def __init__( logs: Logs, contract_yaml: ContractYaml, only_validate_without_execute: bool, - data_source_impl: DataSourceImpl, + data_source_impl: Optional[DataSourceImpl], data_timestamp: datetime, soda_cloud: Optional[SodaCloud], publish_results: bool, @@ -326,26 +326,30 @@ def __init__( dataset_identifier = DatasetIdentifier.parse(contract_yaml.dataset) self.dataset_prefix: list[str] = dataset_identifier.prefixes self.dataset_name = dataset_identifier.dataset_name - self.dataset_identifier: DatasetIdentifier = DatasetIdentifier( - data_source_name=self.data_source_impl.name, - prefixes=self.dataset_prefix, - dataset_name=self.dataset_name, - ) self.metrics_resolver: MetricsResolver = MetricsResolver() self.column_impls: list[ColumnImpl] = [] self.check_impls: list[CheckImpl] = [] + # TODO replace usage of self.soda_qualified_dataset_name with self.dataset_identifier self.soda_qualified_dataset_name = contract_yaml.dataset - - self.sql_qualified_dataset_name: Optional[str] = ( - data_source_impl.sql_dialect.qualify_dataset_name( - dataset_prefix=self.dataset_prefix, dataset_name=self.dataset_name + # TODO replace usage of self.sql_qualified_dataset_name with self.dataset_identifier + self.sql_qualified_dataset_name: Optional[str] = None + + self.dataset_identifier: Optional[DatasetIdentifier] = None + if data_source_impl: + self.dataset_identifier = DatasetIdentifier( + data_source_name=self.data_source_impl.name, + prefixes=self.dataset_prefix, + dataset_name=self.dataset_name, + ) + # TODO replace usage of self.sql_qualified_dataset_name with self.dataset_identifier + self.sql_qualified_dataset_name = ( + data_source_impl.sql_dialect.qualify_dataset_name( + dataset_prefix=self.dataset_prefix, dataset_name=self.dataset_name + ) ) - if data_source_impl - else None - ) self.column_impls: list[ColumnImpl] = self._parse_columns(contract_yaml=contract_yaml) self.check_impls: list[CheckImpl] = self._parse_checks(contract_yaml) @@ -362,9 +366,11 @@ def __init__( ) self._verify_duplicate_identities(self.all_check_impls) - self.metrics: list[MetricImpl] = self.metrics_resolver.get_resolved_metrics() - self.queries: list[Query] = self._build_queries() if data_source_impl else [] + + self.queries: list[Query] = [] + if data_source_impl: + self.queries = self._build_queries() def _dataset_checks_came_before_columns_in_yaml(self) -> Optional[bool]: contract_keys: list[str] = self.contract_yaml.contract_yaml_object.keys() From 11b2eb87becacef4b7f36622d5896ef41a825973 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 17 Jun 2025 05:56:07 +0000 Subject: [PATCH 17/26] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../soda_core/contracts/impl/contract_verification_impl.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py index d2594ce4f..34edeb9a4 100644 --- a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py +++ b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py @@ -345,10 +345,8 @@ def __init__( dataset_name=self.dataset_name, ) # TODO replace usage of self.sql_qualified_dataset_name with self.dataset_identifier - self.sql_qualified_dataset_name = ( - data_source_impl.sql_dialect.qualify_dataset_name( - dataset_prefix=self.dataset_prefix, dataset_name=self.dataset_name - ) + self.sql_qualified_dataset_name = data_source_impl.sql_dialect.qualify_dataset_name( + dataset_prefix=self.dataset_prefix, dataset_name=self.dataset_name ) self.column_impls: list[ColumnImpl] = self._parse_columns(contract_yaml=contract_yaml) From ad9134e1ea1fb3445a3db5d5044c62ae87da6680 Mon Sep 17 00:00:00 2001 From: tombaeyens Date: Tue, 17 Jun 2025 16:44:35 +0200 Subject: [PATCH 18/26] Failed rows wip --- soda-core/src/soda_core/common/sql_dialect.py | 4 ++-- .../soda_postgres/model/data_source/postgres_data_source.py | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/soda-core/src/soda_core/common/sql_dialect.py b/soda-core/src/soda_core/common/sql_dialect.py index 42e0ad8f9..bc90fc848 100644 --- a/soda-core/src/soda_core/common/sql_dialect.py +++ b/soda-core/src/soda_core/common/sql_dialect.py @@ -89,14 +89,14 @@ def create_schema_if_not_exists_sql(self, schema_name: str) -> str: quoted_schema_name: str = self.quote_default(schema_name) return f"CREATE SCHEMA IF NOT EXISTS {quoted_schema_name};" - def build_select_sql(self, select_elements: list) -> str: + def build_select_sql(self, select_elements: list, add_semicolon: bool = True) -> str: statement_lines: list[str] = [] statement_lines.extend(self._build_cte_sql_lines(select_elements)) statement_lines.extend(self._build_select_sql_lines(select_elements)) statement_lines.extend(self._build_from_sql_lines(select_elements)) statement_lines.extend(self._build_where_sql_lines(select_elements)) statement_lines.extend(self._build_order_by_lines(select_elements)) - return "\n".join(statement_lines) + ";" + return "\n".join(statement_lines) + (";" if add_semicolon else "") def _build_select_sql_lines(self, select_elements: list) -> list[str]: select_field_sqls: list[str] = [] diff --git a/soda-postgres/src/soda_postgres/model/data_source/postgres_data_source.py b/soda-postgres/src/soda_postgres/model/data_source/postgres_data_source.py index 2e8c9efdb..33a0f9cfd 100644 --- a/soda-postgres/src/soda_postgres/model/data_source/postgres_data_source.py +++ b/soda-postgres/src/soda_postgres/model/data_source/postgres_data_source.py @@ -1,5 +1,5 @@ import abc -from typing import Literal, Optional +from typing import Literal from pydantic import Field, field_validator from soda_core.model.data_source.data_source import DataSourceBase @@ -15,7 +15,6 @@ class PostgresDataSource(DataSourceBase, abc.ABC): connection_properties: PostgresConnectionProperties = Field( ..., alias="connection", description="Data source connection details" ) - dwh_schema: Optional[str] = Field(default=None, description="Diagnostic warehouse schema") @field_validator("connection_properties", mode="before") def infer_connection_type(cls, value): From eeb99e6116124088bbfb0051c5a97b0fea7c8435 Mon Sep 17 00:00:00 2001 From: tombaeyens Date: Wed, 18 Jun 2025 11:32:59 +0200 Subject: [PATCH 19/26] Failed rows keys wip --- soda-core/src/soda_core/common/sql_dialect.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/soda-core/src/soda_core/common/sql_dialect.py b/soda-core/src/soda_core/common/sql_dialect.py index bc90fc848..670bb5b90 100644 --- a/soda-core/src/soda_core/common/sql_dialect.py +++ b/soda-core/src/soda_core/common/sql_dialect.py @@ -5,6 +5,7 @@ from numbers import Number from textwrap import dedent, indent +from soda_core.common.dataset_identifier import DatasetIdentifier from soda_core.common.sql_ast import * @@ -28,6 +29,11 @@ def quote_default(self, identifier: Optional[str]) -> Optional[str]: else None ) + def build_fully_qualified_sql_name(self, dataset_identifier: DatasetIdentifier) -> str: + return self.qualify_dataset_name( + dataset_prefix=dataset_identifier.prefixes, dataset_name=dataset_identifier.dataset_name + ) + def qualify_dataset_name(self, dataset_prefix: list[str], dataset_name: str) -> str: """ Creates a fully qualified table name, optionally quoting the table name From c5e4877e332540c228945528fcf4796b391311e2 Mon Sep 17 00:00:00 2001 From: tombaeyens Date: Wed, 18 Jun 2025 15:20:56 +0200 Subject: [PATCH 20/26] Failed rows keys wip --- .../contracts/impl/check_types/invalidity_check.py | 7 +++++-- .../soda_core/contracts/impl/contract_verification_impl.py | 3 ++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/soda-core/src/soda_core/contracts/impl/check_types/invalidity_check.py b/soda-core/src/soda_core/contracts/impl/check_types/invalidity_check.py index 08d3fe7c1..5533495b8 100644 --- a/soda-core/src/soda_core/contracts/impl/check_types/invalidity_check.py +++ b/soda-core/src/soda_core/contracts/impl/check_types/invalidity_check.py @@ -98,7 +98,7 @@ def __init__( ) else: self.invalid_count_metric_impl = self._resolve_metric( - InvalidCountMetric(contract_impl=contract_impl, column_impl=column_impl, check_impl=self) + InvalidCountMetricImpl(contract_impl=contract_impl, column_impl=column_impl, check_impl=self) ) self.row_count_metric = self._resolve_metric(RowCountMetricImpl(contract_impl=contract_impl, check_impl=self)) @@ -145,8 +145,11 @@ def evaluate(self, measurement_values: MeasurementValues, contract: Contract) -> diagnostic_metric_values=diagnostic_metric_values, ) + def get_threshold_metric_impl(self) -> Optional[MetricImpl]: + return self.invalid_count_metric_impl -class InvalidCountMetric(AggregationMetricImpl): + +class InvalidCountMetricImpl(AggregationMetricImpl): def __init__( self, contract_impl: ContractImpl, diff --git a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py index 34edeb9a4..86eb6fc2c 100644 --- a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py +++ b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py @@ -33,6 +33,7 @@ ContractVerificationStatus, DataSource, Measurement, + SodaException, Threshold, YamlFileContentInfo, ) @@ -1138,7 +1139,7 @@ def get_threshold_metric_impl(self) -> Optional[MetricImpl]: """ Used in extensions """ - return None + raise SodaException(f"Check type '{self.type}' does not support get_threshold_metric_impl'") class MissingAndValidityCheckImpl(CheckImpl): From 460a483c21ca0d700b669e62c1cc431af7fd1180 Mon Sep 17 00:00:00 2001 From: tombaeyens Date: Thu, 19 Jun 2025 11:26:28 +0200 Subject: [PATCH 21/26] Failed rows wip --- soda-core/src/soda_core/common/soda_cloud.py | 8 ++++---- .../contracts/impl/contract_verification_impl.py | 9 +++++---- soda-tests/tests/components/manual_test_agent_flow.py | 8 ++++++-- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/soda-core/src/soda_core/common/soda_cloud.py b/soda-core/src/soda_core/common/soda_cloud.py index f6bdd36bf..f7a61d473 100644 --- a/soda-core/src/soda_core/common/soda_cloud.py +++ b/soda-core/src/soda_core/common/soda_cloud.py @@ -228,7 +228,7 @@ def upload_contract_file(self, contract: Contract) -> Optional[str]: soda_cloud_file_path: str = f"{contract.soda_qualified_dataset_name.lower()}.yml" return self._upload_scan_yaml_file(yaml_str=contract_yaml_source_str, soda_cloud_file_path=soda_cloud_file_path) - def send_contract_result(self, contract_verification_result: ContractVerificationResult) -> Optional[str]: + def send_contract_result(self, contract_verification_result: ContractVerificationResult) -> Optional[dict]: """ Returns A scanId string if a 200 OK was received, None otherwise """ @@ -241,13 +241,13 @@ def send_contract_result(self, contract_verification_result: ContractVerificatio ) if response.status_code == 200: logger.info(f"{Emoticons.OK_HAND} Results sent to Soda Cloud") - response_json = response.json() + response_json: dict = response.json() if isinstance(response_json, dict): cloud_url: Optional[str] = response_json.get("cloudUrl") - scan_id: Optional[str] = response_json.get("scanId") if isinstance(cloud_url, str): logger.info(f"To view the dataset on Soda Cloud, see {cloud_url}") - return scan_id + return response_json + return None def send_contract_skeleton(self, contract_yaml_str: str, soda_cloud_file_path: str) -> None: file_id: Optional[str] = self._upload_scan_yaml_file( diff --git a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py index 86eb6fc2c..f0ad8a4c0 100644 --- a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py +++ b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py @@ -69,7 +69,7 @@ def handle( data_source_impl: DataSourceImpl, contract_verification_result: ContractVerificationResult, soda_cloud: SodaCloud, - scan_id: str, + soda_cloud_send_results_response_json: dict, ): pass @@ -500,14 +500,15 @@ def verify(self) -> ContractVerificationResult: contract_verification_result.log_records = self.logs.pop_log_records() - scan_id: Optional[str] = None + soda_cloud_response_json: Optional[dict] = None if self.soda_cloud and self.publish_results: file_id: Optional[str] = self.soda_cloud.upload_contract_file(contract_verification_result.contract) if file_id: # Side effect to pass file id to console logging later on. TODO reconsider this contract.source.soda_cloud_file_id = file_id # send_contract_result will use contract.source.soda_cloud_file_id - scan_id = self.soda_cloud.send_contract_result(contract_verification_result) + soda_cloud_response_json = self.soda_cloud.send_contract_result(contract_verification_result) + scan_id: Optional[str] = soda_cloud_response_json.get("scanId") if not scan_id: contract_verification_result.sending_results_to_soda_cloud_failed = True else: @@ -520,7 +521,7 @@ def verify(self) -> ContractVerificationResult: data_source_impl=self.data_source_impl, contract_verification_result=contract_verification_result, soda_cloud=self.soda_cloud, - scan_id=scan_id, + soda_cloud_send_results_response_json=soda_cloud_response_json, ) return contract_verification_result diff --git a/soda-tests/tests/components/manual_test_agent_flow.py b/soda-tests/tests/components/manual_test_agent_flow.py index 7fb93131d..e9276d3d5 100644 --- a/soda-tests/tests/components/manual_test_agent_flow.py +++ b/soda-tests/tests/components/manual_test_agent_flow.py @@ -2,6 +2,7 @@ from dotenv import load_dotenv from soda_core.common.logging_configuration import configure_logging +from soda_core.common.soda_cloud import SodaCloud from soda_core.common.yaml import ContractYamlSource, SodaCloudYamlSource from soda_core.contracts.contract_verification import ContractVerificationSession @@ -31,13 +32,16 @@ def main(): soda_cloud_yaml_str = dedent( """ soda_cloud: - bla: bla + api_key_id: ${SODA_CLOUD_API_KEY_ID} + api_key_secret: ${SODA_CLOUD_API_KEY_SECRET} """ ).strip() + soda_cloud: SodaCloud = SodaCloud.from_yaml_source(SodaCloudYamlSource.from_str(soda_cloud_yaml_str), provided_variable_values={}) + ContractVerificationSession.execute( contract_yaml_sources=[ContractYamlSource.from_str(contract_yaml_str)], - soda_cloud_yaml_source=SodaCloudYamlSource.from_str(soda_cloud_yaml_str), + soda_cloud_impl=soda_cloud, soda_cloud_use_agent=True, soda_cloud_use_agent_blocking_timeout_in_minutes=55, ) From d1d78f6a9b78563d639eba8b1ce4a789e88362f3 Mon Sep 17 00:00:00 2001 From: tombaeyens Date: Thu, 19 Jun 2025 13:08:19 +0200 Subject: [PATCH 22/26] failed rows wip --- .../soda_core/contracts/impl/contract_verification_impl.py | 3 +++ soda-core/src/soda_core/contracts/impl/contract_yaml.py | 6 +++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py index f0ad8a4c0..e59619261 100644 --- a/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py +++ b/soda-core/src/soda_core/contracts/impl/contract_verification_impl.py @@ -202,6 +202,7 @@ def _execute_locally( contract_yaml=contract_yaml, only_validate_without_execute=only_validate_without_execute, data_timestamp=contract_yaml.data_timestamp, + execution_timestamp=contract_yaml.execution_timestamp, data_source_impl=data_source_impl, soda_cloud=soda_cloud_impl, publish_results=soda_cloud_publish_results, @@ -306,6 +307,7 @@ def __init__( only_validate_without_execute: bool, data_source_impl: Optional[DataSourceImpl], data_timestamp: datetime, + execution_timestamp: datetime, soda_cloud: Optional[SodaCloud], publish_results: bool, ): @@ -320,6 +322,7 @@ def __init__( self.started_timestamp: datetime = datetime.now(tz=timezone.utc) + self.execution_timestamp: datetime = execution_timestamp self.data_timestamp: datetime = data_timestamp self.dataset_name: Optional[str] = None diff --git a/soda-core/src/soda_core/contracts/impl/contract_yaml.py b/soda-core/src/soda_core/contracts/impl/contract_yaml.py index ff2c924d8..93e8f1c62 100644 --- a/soda-core/src/soda_core/contracts/impl/contract_yaml.py +++ b/soda-core/src/soda_core/contracts/impl/contract_yaml.py @@ -155,11 +155,11 @@ def __init__( self.variables: list[VariableYaml] = self._parse_variable_yamls(contract_yaml_source, provided_variable_values) - soda_now: datetime = datetime.now(timezone.utc) - self.data_timestamp: datetime = self._get_data_timestamp(data_timestamp, soda_now) + self.execution_timestamp: datetime = datetime.now(timezone.utc) + self.data_timestamp: datetime = self._get_data_timestamp(data_timestamp, self.execution_timestamp) soda_variable_values: dict[str, str] = { - "NOW": convert_datetime_to_str(soda_now), + "NOW": convert_datetime_to_str(self.execution_timestamp), "DATA_TIMESTAMP": convert_datetime_to_str(self.data_timestamp), } From 920778ac6d95f821b63dbde558c42b1aec17bac5 Mon Sep 17 00:00:00 2001 From: tombaeyens Date: Fri, 20 Jun 2025 14:58:30 +0200 Subject: [PATCH 23/26] Failed rows wip --- soda-core/src/soda_core/common/data_source_impl.py | 6 +++--- .../soda_core/contracts/impl/check_types/schema_check.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/soda-core/src/soda_core/common/data_source_impl.py b/soda-core/src/soda_core/common/data_source_impl.py index f60c5bf51..c25e27d27 100644 --- a/soda-core/src/soda_core/common/data_source_impl.py +++ b/soda-core/src/soda_core/common/data_source_impl.py @@ -146,16 +146,16 @@ def get_max_aggregation_query_length(self) -> int: # BigQuery: No documented limit on query size, but practical limits on complexity and performance. return 63 * 1024 * 1024 - def is_different_data_type(self, expected_column: ColumnMetadata, actual_column_metadata: ColumnMetadata) -> bool: + def is_different_data_type(self, expected_column: ColumnMetadata, actual_column: ColumnMetadata) -> bool: canonical_expected_data_type: str = self.get_canonical_data_type(expected_column.data_type) - canonical_actual_data_type: str = self.get_canonical_data_type(actual_column_metadata.data_type) + canonical_actual_data_type: str = self.get_canonical_data_type(actual_column.data_type) if canonical_expected_data_type != canonical_actual_data_type: return True if ( isinstance(expected_column.character_maximum_length, int) - and expected_column.character_maximum_length != actual_column_metadata.character_maximum_length + and expected_column.character_maximum_length != actual_column.character_maximum_length ): return True diff --git a/soda-core/src/soda_core/contracts/impl/check_types/schema_check.py b/soda-core/src/soda_core/contracts/impl/check_types/schema_check.py index 2f209f1a5..36693f49e 100644 --- a/soda-core/src/soda_core/contracts/impl/check_types/schema_check.py +++ b/soda-core/src/soda_core/contracts/impl/check_types/schema_check.py @@ -141,7 +141,7 @@ def evaluate(self, measurement_values: MeasurementValues, contract: Contract) -> actual_column_metadata and expected_column.data_type and self.contract_impl.data_source_impl.is_different_data_type( - expected_column=expected_column, actual_column_metadata=actual_column_metadata + expected_column=expected_column, actual_column=actual_column_metadata ) ): column_data_type_mismatches.append( From 8560ca6323c74cab41d6368e9366c6e4975fc271 Mon Sep 17 00:00:00 2001 From: tombaeyens Date: Sat, 21 Jun 2025 10:58:09 +0200 Subject: [PATCH 24/26] Added diagnostics check results table --- .../soda_core/common/statements/metadata_columns_query.py | 6 ++++++ soda-tests/tests/features/test_duplicate_column_check.py | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/soda-core/src/soda_core/common/statements/metadata_columns_query.py b/soda-core/src/soda_core/common/statements/metadata_columns_query.py index e849586ff..31409bcb2 100644 --- a/soda-core/src/soda_core/common/statements/metadata_columns_query.py +++ b/soda-core/src/soda_core/common/statements/metadata_columns_query.py @@ -13,6 +13,12 @@ class ColumnMetadata: data_type: str character_maximum_length: Optional[int] + def get_data_type_ddl(self) -> str: + if self.character_maximum_length is None: + return self.data_type + else: + return f"{self.data_type}({self.character_maximum_length})" + class MetadataColumnsQuery: def __init__(self, sql_dialect: SqlDialect, data_source_connection: DataSourceConnection): diff --git a/soda-tests/tests/features/test_duplicate_column_check.py b/soda-tests/tests/features/test_duplicate_column_check.py index f38a25c51..fd80137b8 100644 --- a/soda-tests/tests/features/test_duplicate_column_check.py +++ b/soda-tests/tests/features/test_duplicate_column_check.py @@ -116,7 +116,7 @@ def test_duplicate_metric_typo_error(data_source_test_helper: DataSourceTestHelp metric: percentttt """, ) - assert "'metric' must be in ['count', 'percent']" == contract_verification_result.get_errors_str() + assert "'metric' must be in ['count', 'percent']" in contract_verification_result.get_errors_str() def test_duplicate_with_check_filter(data_source_test_helper: DataSourceTestHelper): From a23444bc90fefc6da04a05dd5048f2add71dba96 Mon Sep 17 00:00:00 2001 From: Milan Lukac Date: Mon, 23 Jun 2025 16:36:10 +0200 Subject: [PATCH 25/26] Failed rows config WIP --- .../model/data_source/data_source.py | 4 +- soda-core/src/soda_core/model/failed_rows.py | 79 +++++++++++++++++++ 2 files changed, 82 insertions(+), 1 deletion(-) create mode 100644 soda-core/src/soda_core/model/failed_rows.py diff --git a/soda-core/src/soda_core/model/data_source/data_source.py b/soda-core/src/soda_core/model/data_source/data_source.py index b447910e2..3fdbb6480 100644 --- a/soda-core/src/soda_core/model/data_source/data_source.py +++ b/soda-core/src/soda_core/model/data_source/data_source.py @@ -1,10 +1,11 @@ import abc -from typing import Literal +from typing import Literal, Optional from pydantic import BaseModel, Field from soda_core.model.data_source.data_source_connection_properties import ( DataSourceConnectionProperties, ) +from soda_core.model.failed_rows import FailedRowsConfigDatasource class DataSourceBase( @@ -20,6 +21,7 @@ class DataSourceBase( connection_properties: DataSourceConnectionProperties = Field( ..., alias="connection", description="Data source connection details" ) + failed_rows: Optional[FailedRowsConfigDatasource] = Field(..., description="Configuration for failed rows storage") @classmethod def get_class_type(cls) -> str: diff --git a/soda-core/src/soda_core/model/failed_rows.py b/soda-core/src/soda_core/model/failed_rows.py new file mode 100644 index 000000000..0fb90cf61 --- /dev/null +++ b/soda-core/src/soda_core/model/failed_rows.py @@ -0,0 +1,79 @@ +import abc +from enum import Enum +from typing import Optional + +from pydantic import BaseModel, Field + +DEFAULT_SCHEMA = "failed_rows" + + +class FailedRowsStrategy(str, Enum): + """Enum for failed rows strategy""" + + NONE = "none" + STORE_DIAGNOSTICS = "store_diagnostics" + STORE_KEYS = "store_keys" + STORE_DATA = "store_data" + + +class FailedRowsConfigBase( + BaseModel, + abc.ABC, + frozen=True, + extra="forbid", + validate_by_name=True, # Allow to use both field names and aliases when populating from dict +): + enabled: Optional[bool] = Field( + None, + description="Enable or disable the storage of failed rows. " "If set to false, failed rows will not be stored.", + ) + + +class FailedRowsConfigOrganisation(FailedRowsConfigBase): + """Top-level configuration for failed rows, used on Soda Cloud.""" + + # Overrides the `enabled` field on other levels as Cloud config will return a bool always. + enabled: bool = Field( + ..., + description="Enable or disable the storage of failed rows. " "If set to false, failed rows will not be stored.", + ) + + path_default: Optional[str] = Field( + "{{ data_source.database }}/{DEFAULT_SCHEMA}", # TODO: revisit + description="Path to the warehouse location where failed rows will be stored. ", + ) + enabled_by_default: bool = Field( + True, + description="Enable or disable the storage of failed rows by default. Does not override the `enabled` setting if `enabled` is set to false." + "If set to false, failed rows will not be stored unless explicitly enabled in the contract or check.", + ) + strategy_default: FailedRowsStrategy = Field( + FailedRowsStrategy.STORE_DIAGNOSTICS, description="Default strategy for storing failed rows." + ) + + +class FailedRowsConfigDatasource(FailedRowsConfigBase): + """Top-level configuration for failed rows, on data source level.""" + + path: Optional[str] = Field(..., description="Path to the warehouse location where failed rows will be stored.") + enabled_by_default: Optional[bool] = Field( + True, + description="Enable or disable the storage of failed rows by default. Does not override the `enabled` setting if `enabled` is set to false." + "If set to false, failed rows will not be stored unless explicitly enabled in the contract or check.", + ) + strategy_default: Optional[FailedRowsStrategy] = Field( + FailedRowsStrategy.STORE_DIAGNOSTICS, description="Default strategy for storing failed rows." + ) + + +class FailedRowsConfigContract(FailedRowsConfigBase): + """Configuration for failed rows at the contract level.""" + + path: Optional[str] = Field(..., description="Path to the warehouse location where failed rows will be stored.") + strategy: Optional[FailedRowsStrategy] = Field(..., description="Strategy for storing failed rows.") + + +class FailedRowsConfigCheck(FailedRowsConfigBase): + """Configuration for failed rows at the check level.""" + + strategy: Optional[FailedRowsStrategy] = Field(..., description="Strategy for storing failed rows.") From 2e94aac94efc18da8ace1b737b7481e2db346581 Mon Sep 17 00:00:00 2001 From: Milan Lukac Date: Mon, 23 Jun 2025 16:45:33 +0200 Subject: [PATCH 26/26] Failed rows config WIP --- soda-core/src/soda_core/model/data_source/data_source.py | 2 +- soda-core/src/soda_core/model/failed_rows.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/soda-core/src/soda_core/model/data_source/data_source.py b/soda-core/src/soda_core/model/data_source/data_source.py index 3fdbb6480..3f0562bb3 100644 --- a/soda-core/src/soda_core/model/data_source/data_source.py +++ b/soda-core/src/soda_core/model/data_source/data_source.py @@ -21,7 +21,7 @@ class DataSourceBase( connection_properties: DataSourceConnectionProperties = Field( ..., alias="connection", description="Data source connection details" ) - failed_rows: Optional[FailedRowsConfigDatasource] = Field(..., description="Configuration for failed rows storage") + failed_rows: Optional[FailedRowsConfigDatasource] = Field(None, description="Configuration for failed rows storage") @classmethod def get_class_type(cls) -> str: diff --git a/soda-core/src/soda_core/model/failed_rows.py b/soda-core/src/soda_core/model/failed_rows.py index 0fb90cf61..cf3cf0085 100644 --- a/soda-core/src/soda_core/model/failed_rows.py +++ b/soda-core/src/soda_core/model/failed_rows.py @@ -55,7 +55,7 @@ class FailedRowsConfigOrganisation(FailedRowsConfigBase): class FailedRowsConfigDatasource(FailedRowsConfigBase): """Top-level configuration for failed rows, on data source level.""" - path: Optional[str] = Field(..., description="Path to the warehouse location where failed rows will be stored.") + path: Optional[str] = Field(None, description="Path to the warehouse location where failed rows will be stored.") enabled_by_default: Optional[bool] = Field( True, description="Enable or disable the storage of failed rows by default. Does not override the `enabled` setting if `enabled` is set to false." @@ -69,11 +69,11 @@ class FailedRowsConfigDatasource(FailedRowsConfigBase): class FailedRowsConfigContract(FailedRowsConfigBase): """Configuration for failed rows at the contract level.""" - path: Optional[str] = Field(..., description="Path to the warehouse location where failed rows will be stored.") - strategy: Optional[FailedRowsStrategy] = Field(..., description="Strategy for storing failed rows.") + path: Optional[str] = Field(None, description="Path to the warehouse location where failed rows will be stored.") + strategy: Optional[FailedRowsStrategy] = Field(None, description="Strategy for storing failed rows.") class FailedRowsConfigCheck(FailedRowsConfigBase): """Configuration for failed rows at the check level.""" - strategy: Optional[FailedRowsStrategy] = Field(..., description="Strategy for storing failed rows.") + strategy: Optional[FailedRowsStrategy] = Field(None, description="Strategy for storing failed rows.")