From 6cd9f02c78aa99ebbf3b0715f21a396fda211d6e Mon Sep 17 00:00:00 2001
From: Eniwoke Cornelius
Date: Thu, 17 Jul 2025 18:41:32 +0100
Subject: [PATCH 01/43] move criticality of rule into _validate_attributes

---
 src/databricks/labs/dqx/rule.py | 26 +++++++++-----------------
 1 file changed, 9 insertions(+), 17 deletions(-)

diff --git a/src/databricks/labs/dqx/rule.py b/src/databricks/labs/dqx/rule.py
index 7d1e23a91..cb2f6c327 100644
--- a/src/databricks/labs/dqx/rule.py
+++ b/src/databricks/labs/dqx/rule.py
@@ -195,6 +195,15 @@ def _initialize_name_if_missing(self, check_condition: Column):
         object.__setattr__(self, "name", normalized_name)
 
     def _validate_attributes(self) -> None:
+        """Verify Criticality of rule"""
+        criticality = self.criticality
+        if criticality not in {Criticality.WARN.value, Criticality.ERROR.value}:
+            raise ValueError(
+                f"Invalid 'criticality' value: '{criticality}'. "
+                f"Expected '{Criticality.WARN.value}' or '{Criticality.ERROR.value}'. "
+                f"Check details: {self.name}"
+            )
+
         """Validate input attributes."""
         if self.column is not None and self.columns is not None:
             raise ValueError("Both 'column' and 'columns' cannot be provided at the same time.")
@@ -206,23 +215,6 @@ def get_check_condition(self) -> Column:
 
         :return: The Spark Column representing the check condition.
         """
-
-    @ft.cached_property
-    def check_criticality(self) -> str:
-        """Criticality of the check.
-
-        :return: string describing criticality - `warn` or `error`.
-        :raises ValueError: if criticality is invalid.
-        """
-        criticality = self.criticality
-        if criticality not in {Criticality.WARN.value, Criticality.ERROR.value}:
-            raise ValueError(
-                f"Invalid 'criticality' value: '{criticality}'. "
-                f"Expected '{Criticality.WARN.value}' or '{Criticality.ERROR.value}'. "
-                f"Check details: {self.name}"
-            )
-        return criticality
-
     @ft.cached_property
     def columns_as_string_expr(self) -> Column:
         """Spark Column expression representing the column(s) as a string (not normalized).

From 98371b705b18d263252252a469932d8344bc1a5d Mon Sep 17 00:00:00 2001
From: Eniwoke Cornelius
Date: Thu, 17 Jul 2025 18:42:07 +0100
Subject: [PATCH 02/43] since criticality is validated after creation, filter by criticality attribute

---
 src/databricks/labs/dqx/engine.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/databricks/labs/dqx/engine.py b/src/databricks/labs/dqx/engine.py
index 28d9f8a4f..72d3074d6 100644
--- a/src/databricks/labs/dqx/engine.py
+++ b/src/databricks/labs/dqx/engine.py
@@ -401,7 +401,7 @@ def _get_check_columns(checks: list[DQRule], criticality: str) -> list[DQRule]:
         :param criticality: criticality
         :return: list of check columns
         """
-        return [check for check in checks if check.check_criticality == criticality]
+        return [check for check in checks if check.criticality == criticality]
 
     def _append_empty_checks(self, df: DataFrame) -> DataFrame:
         """Append empty checks at the end of dataframe.
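Illustrative sketch (not part of the patch series): with the criticality check moved into `_validate_attributes`, an unsupported value should surface when the rule object is built, rather than when the engine later touched the removed `check_criticality` property. Import paths follow the ones used elsewhere in this series; the exact construction-time behaviour is an assumption based on the two commit messages above.

```python
from databricks.labs.dqx.check_funcs import is_valid_date  # any existing row check works here
from databricks.labs.dqx.rule import DQRowRule

# A supported criticality ('warn' or 'error') still builds a rule as before.
ok_rule = DQRowRule(criticality="warn", check_func=is_valid_date, column="col5")

# An unsupported criticality is expected to fail inside _validate_attributes,
# i.e. at construction time, with the message assembled in PATCH 01/43.
try:
    DQRowRule(criticality="fatal", check_func=is_valid_date, column="col5")
except ValueError as err:
    print(err)  # Invalid 'criticality' value: 'fatal'. Expected 'warn' or 'error'. ...
```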
From 82c7a2200aa40ed035304913757c8405f0106104 Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Wed, 15 Oct 2025 17:00:22 +0100 Subject: [PATCH 03/43] feat: add check for valid json --- src/databricks/labs/dqx/check_funcs.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index 962d550a8..a5bf9bbb7 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1747,6 +1747,27 @@ def apply(df: DataFrame) -> DataFrame: return condition, apply +@register_rule("row") +def is_valid_json(column: str | Column) -> Column: + """ + Build a condition to check if a column contains valid JSON strings. + + Args: + column: Column name (str) or Column expression to check for valid JSON. + + Returns: + A Spark Column representing the condition for invalid JSON strings. + """ + col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) + return make_condition( + F.when(F.try_parse_json(col_expr).isNull(), F.lit(None)).otherwise(F.lit(True)), + F.concat_ws( + "", F.lit("Value '"), col_expr.cast("string"), F.lit(f"' in Column '{col_expr_str}' is not a valid JSON") + ), + f"{col_str_norm}_is_not_valid_json", + ) + + def _get_schema(input_schema: str | types.StructType, columns: list[str] | None = None) -> types.StructType: """ Normalize the input schema into a Spark StructType schema. From f1ec4af5867190f2b32e7714cc4cf6ac2fe46071 Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Wed, 15 Oct 2025 19:16:46 +0100 Subject: [PATCH 04/43] feat: add checks for is_valid_json --- tests/unit/test_build_rules.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/unit/test_build_rules.py b/tests/unit/test_build_rules.py index ca9607554..ed01f936d 100644 --- a/tests/unit/test_build_rules.py +++ b/tests/unit/test_build_rules.py @@ -25,6 +25,7 @@ is_not_less_than, is_not_greater_than, is_valid_date, + is_valid_json, regex_match, compare_datasets, ) @@ -1124,6 +1125,9 @@ def test_convert_dq_rules_to_metadata(): DQRowRule( criticality="error", check_func=is_valid_date, column="b", check_func_kwargs={"date_format": "yyyy-MM-dd"} ), + DQRowRule( + criticality="error", check_func=is_valid_json, column="col_json_str" + ), DQDatasetRule(criticality="error", check_func=is_unique, columns=["col1", "col2"]), DQDatasetRule( criticality="error", @@ -1282,6 +1286,14 @@ def test_convert_dq_rules_to_metadata(): "arguments": {"column": "b", "date_format": "yyyy-MM-dd"}, }, }, + { + "name": "col_json_str_is_not_valid_json", + "criticality": "error", + "check": { + "function": "is_valid_json", + "arguments": {"column": "col_json_str"}, + }, + }, { "name": "struct_col1_col2_is_not_unique", "criticality": "error", @@ -1454,6 +1466,9 @@ def test_metadata_round_trip_conversion_preserves_rules() -> None: DQRowRule( criticality="error", check_func=is_valid_date, column="b", check_func_kwargs={"date_format": "yyyy-MM-dd"} ), + DQRowRule( + criticality="error", check_func=is_valid_json, column="b" + ), DQDatasetRule(criticality="error", check_func=is_unique, columns=["col1", "col2"]), DQDatasetRule( criticality="error", From dfa9649b3589b4ab7fbfff79ca2c493488844ace Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Wed, 15 Oct 2025 21:06:56 +0100 Subject: [PATCH 05/43] feat: add is_valid_json --- docs/dqx/docs/reference/quality_checks.mdx | 52 ++++++++++++------- src/databricks/labs/dqx/check_funcs.py | 4 +- .../llm/resources/yaml_checks_examples.yml | 5 
++ tests/integration/test_apply_checks.py | 30 +++++++++-- tests/resources/all_row_checks.yaml | 7 +++ tests/unit/test_build_rules.py | 8 +-- 6 files changed, 77 insertions(+), 29 deletions(-) mode change 100644 => 100755 tests/integration/test_apply_checks.py diff --git a/docs/dqx/docs/reference/quality_checks.mdx b/docs/dqx/docs/reference/quality_checks.mdx index 3b8a6c750..743361202 100644 --- a/docs/dqx/docs/reference/quality_checks.mdx +++ b/docs/dqx/docs/reference/quality_checks.mdx @@ -38,6 +38,7 @@ You can also define your own custom checks (see [Creating custom checks](#creati | `is_not_greater_than` | Checks whether the values in the input column are not greater than the provided limit. | `column`: column to check (can be a string column name or a column expression); `limit`: limit as number, date, timestamp, column name or sql expression | | `is_valid_date` | Checks whether the values in the input column have valid date formats. | `column`: column to check (can be a string column name or a column expression); `date_format`: optional date format (e.g. 'yyyy-mm-dd') | | `is_valid_timestamp` | Checks whether the values in the input column have valid timestamp formats. | `column`: column to check (can be a string column name or a column expression); `timestamp_format`: optional timestamp format (e.g. 'yyyy-mm-dd HH:mm:ss') | +| `is_valid_json` | Checks whether the values in the input column are valid JSON objects. | `column`: column to check (can be a string column name or a column expression) | | `is_not_in_future` | Checks whether the values in the input column contain a timestamp that is not in the future, where 'future' is defined as current_timestamp + offset (in seconds). | `column`: column to check (can be a string column name or a column expression); `offset`: offset to use; `curr_timestamp`: current timestamp, if not provided current_timestamp() function is used | | `is_not_in_near_future` | Checks whether the values in the input column contain a timestamp that is not in the near future, where 'near future' is defined as greater than the current timestamp but less than the current_timestamp + offset (in seconds). | `column`: column to check (can be a string column name or a column expression); `offset`: offset to use; `curr_timestamp`: current timestamp, if not provided current_timestamp() function is used | | `is_older_than_n_days` | Checks whether the values in one input column are at least N days older than the values in another column. 
| `column`: column to check (can be a string column name or a column expression); `days`: number of days; `curr_date`: current date, if not provided current_date() function is used; `negate`: if the condition should be negated | @@ -330,6 +331,13 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen column: col6 timestamp_format: yyyy-MM-dd HH:mm:ss +# is_valid_json check +- criticality: error + check: + function: is_valid_json + arguments: + column: col_json_str + - criticality: error name: col6_is_not_valid_timestamp2 check: @@ -531,42 +539,42 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen function: is_linestring arguments: column: linestring_geom - + # is_polygon check - criticality: error check: function: is_polygon arguments: column: polygon_geom - + # is_multipoint check - criticality: error check: function: is_multipoint arguments: column: multipoint_geom - + # is_multilinestring check - criticality: error check: function: is_multilinestring arguments: column: multilinestring_geom - + # is_multipolygon check - criticality: error check: function: is_multipolygon arguments: column: multipolygon_geom - + # is_geometrycollection check - criticality: error check: function: is_geometrycollection arguments: column: geometrycollection_geom - + # is_ogc_valid check - criticality: error check: @@ -580,7 +588,7 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen function: is_non_empty_geometry arguments: column: point_geom - + # has_dimension check - criticality: error check: @@ -588,7 +596,7 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen arguments: column: polygon_geom dimension: 2 - + # has_x_coordinate_between check - criticality: error check: @@ -597,7 +605,7 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen column: polygon_geom min_value: 0.0 max_value: 10.0 - + # has_y_coordinate_between check - criticality: error check: @@ -606,6 +614,7 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen column: polygon_geom min_value: 0.0 max_value: 10.0 + ``` @@ -878,6 +887,13 @@ checks = [ name="col6_is_not_valid_timestamp2" ), + # is_valid_json check + DQRowRule( + criticality="error", + check_func=check_funcs.is_valid_json, + column="col_json_str" + ), + # is_not_in_future check DQRowRule( criticality="error", @@ -1015,7 +1031,7 @@ checks = [ check_func=geo_check_funcs.is_multilinestring, column="multilinestring_geom" ), - + # is_multipolygon check DQRowRule( criticality="error", @@ -3021,7 +3037,7 @@ The PII detection extras include a built-in `does_not_contain_pii` check that us function: does_not_contain_pii arguments: column: description - + # PII detection check with custom threshold and named entities - criticality: error check: @@ -3038,7 +3054,7 @@ The PII detection extras include a built-in `does_not_contain_pii` check that us ```python from databricks.labs.dqx.rule import DQRowRule from databricks.labs.dqx.pii.pii_detection_funcs import does_not_contain_pii - + checks = [ # Basic PII detection check DQRowRule( @@ -3056,7 +3072,7 @@ The PII detection extras include a built-in `does_not_contain_pii` check that us check_func_kwargs={"threshold": 0.8, "entities": ["PERSON", "EMAIL_ADDRESS"]} ), ] - ``` + ``` @@ -3093,7 +3109,7 @@ These can be loaded using `NLPEngineConfig`: from databricks.labs.dqx.rule import DQRowRule from databricks.labs.dqx.pii.pii_detection_funcs import 
does_not_contain_pii from databricks.labs.dqx.pii.nlp_engine_config import NLPEngineConfig - + checks = [ # PII detection check using spacy as a named entity recognizer DQRowRule( @@ -3102,7 +3118,7 @@ These can be loaded using `NLPEngineConfig`: column="description", check_func=does_not_contain_pii, check_func_kwargs={"nlp_engine_config": NLPEngineConfig.SPACY_MEDIUM} - ), + ), ] ``` @@ -3122,7 +3138,7 @@ Using custom models for named-entity recognition may require you to install thes from databricks.labs.dqx.rule import DQRowRule from databricks.labs.dqx.engine import DQEngine from databricks.sdk import WorkspaceClient - + nlp_engine_config = { 'nlp_engine_name': 'transformers_stanford_deidentifier_base', 'models': [ @@ -3165,9 +3181,9 @@ Using custom models for named-entity recognition may require you to install thes column="description", check_func=does_not_contain_pii, check_func_kwargs={"nlp_engine_config": nlp_engine_config}, - ), + ), ] - + dq_engine = DQEngine(WorkspaceClient()) df = spark.read.table("main.default.table") valid_df, quarantine_df = dq_engine.apply_checks_and_split(df, checks) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index a5bf9bbb7..8e1ac12be 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1750,7 +1750,7 @@ def apply(df: DataFrame) -> DataFrame: @register_rule("row") def is_valid_json(column: str | Column) -> Column: """ - Build a condition to check if a column contains valid JSON strings. + Checks whether the values in the input column is a valid JSON string. Args: column: Column name (str) or Column expression to check for valid JSON. @@ -1760,7 +1760,7 @@ def is_valid_json(column: str | Column) -> Column: """ col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) return make_condition( - F.when(F.try_parse_json(col_expr).isNull(), F.lit(None)).otherwise(F.lit(True)), + F.when(F.col(col_expr_str).isNotNull() & F.try_parse_json(col_expr_str).isNull(), F.col(col_expr_str).isNull()), F.concat_ws( "", F.lit("Value '"), col_expr.cast("string"), F.lit(f"' in Column '{col_expr_str}' is not a valid JSON") ), diff --git a/src/databricks/labs/dqx/llm/resources/yaml_checks_examples.yml b/src/databricks/labs/dqx/llm/resources/yaml_checks_examples.yml index f127b75f2..60d92c168 100644 --- a/src/databricks/labs/dqx/llm/resources/yaml_checks_examples.yml +++ b/src/databricks/labs/dqx/llm/resources/yaml_checks_examples.yml @@ -195,6 +195,11 @@ arguments: column: col6 timestamp_format: yyyy-MM-dd HH:mm:ss +- criticality: error + check: + function: is_valid_json + arguments: + column: col_json_str - criticality: error name: col6_is_not_valid_timestamp2 check: diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py old mode 100644 new mode 100755 index 5c572800c..519b5fb11 --- a/tests/integration/test_apply_checks.py +++ b/tests/integration/test_apply_checks.py @@ -4470,7 +4470,7 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak schema = ( "col1: string, col2: int, col3: int, col4 array, col5: date, col6: timestamp, " "col7: map, col8: struct, col10: int, col11: string, " - "col_ipv4: string, col_ipv6: string" + "col_ipv4: string, col_ipv6: string, col_json_str: string" ) test_df = spark.createDataFrame( [ @@ -4487,6 +4487,7 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak "val2", "192.168.1.1", "2001:0db8:85a3:08d3:1319:8a2e:0370:7344", + "{}", ], [ 
"val2", @@ -4501,6 +4502,7 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak "val2", "192.168.1.2", "2001:0db8:85a3:08d3:ffff:ffff:ffff:ffff", + "{'key1': 1}", ], [ "val3", @@ -4515,6 +4517,7 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak "val2", "192.168.1.3", "2001:db8:85a3:8d3:1319:8a2e:3.112.115.68", + "{'key1': [1, 2, 3]}", ], ], schema, @@ -4553,6 +4556,7 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak "val2", "192.168.1.1", "2001:0db8:85a3:08d3:1319:8a2e:0370:7344", + "{}", None, None, ], @@ -4569,6 +4573,7 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak "val2", "192.168.1.2", "2001:0db8:85a3:08d3:ffff:ffff:ffff:ffff", + "{'key1': 1}", None, None, ], @@ -4585,6 +4590,7 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak "val2", "192.168.1.3", "2001:db8:85a3:8d3:1319:8a2e:3.112.115.68", + "{'key1': [1, 2, 3]}", None, None, ], @@ -4738,7 +4744,7 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): schema = ( "col1: string, col2: int, col3: int, col4 array, col5: date, col6: timestamp, " "col7: map, col8: struct, col10: int, col11: string, " - "col_ipv4: string, col_ipv6: string" + "col_ipv4: string, col_ipv6: string, col_json_str: string" ) test_df = spark.createDataFrame( [ @@ -4755,6 +4761,7 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "val2", "192.168.1.0", "2001:0db8:85a3:08d3:0000:0000:0000:0001", + "{}", ], [ "val2", @@ -4769,6 +4776,7 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "val2", "192.168.1.1", "2001:0db8:85a3:08d3:0000:0000:0000:1", + "{'key1': 1}", ], [ "val3", @@ -4783,6 +4791,7 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "val2", "192.168.1.2", "2001:0db8:85a3:08d3:0000::2", + "{'key1': [1, 2, 3]}", ], ], schema, @@ -4809,6 +4818,7 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "val2", "192.168.1.0", "2001:0db8:85a3:08d3:0000:0000:0000:0001", + "{}", None, None, ], @@ -4825,6 +4835,7 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "val2", "192.168.1.1", "2001:0db8:85a3:08d3:0000:0000:0000:1", + "{'key1': 1}", None, None, ], @@ -4841,6 +4852,7 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "val2", "192.168.1.2", "2001:0db8:85a3:08d3:0000::2", + "{'key1': [1, 2, 3]}", None, None, ], @@ -5553,6 +5565,12 @@ def test_apply_checks_all_checks_using_classes(ws, spark): column="col6", check_func_kwargs={"window_minutes": 1, "min_records_per_window": 1, "lookback_windows": 3}, ), + # is_valid_json check + DQRowRule( + criticality="error", + check_func=check_funcs.is_valid_json, + column="col_json_str", + ), ] dq_engine = DQEngine(ws) @@ -5560,7 +5578,7 @@ def test_apply_checks_all_checks_using_classes(ws, spark): schema = ( "col1: string, col2: int, col3: int, col4 array, col5: date, col6: timestamp, " "col7: map, col8: struct, col10: int, col11: string, " - "col_ipv4: string, col_ipv6: string" + "col_ipv4: string, col_ipv6: string, col_json_str: string" ) test_df = spark.createDataFrame( [ @@ -5577,6 +5595,7 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "val2", "255.255.255.255", "2001:0db8:85a3:08d3:1319:8a2e:0370:7344", + "{}", ], [ "val2", @@ -5591,6 +5610,7 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "val2", "255.255.255.1", "2001:0db8:85a3:08d3:ffff:ffff:ffff:ffff", + "{'key1': '1'}", ], [ "val3", @@ -5605,6 +5625,7 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "val2", "255.255.255.2", 
"2001:db8:85a3:8d3:1319:8a2e:3.112.115.68", + "{'key1': '[1, 2, 3]'}", ], ], schema, @@ -5628,6 +5649,7 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "val2", "255.255.255.255", "2001:0db8:85a3:08d3:1319:8a2e:0370:7344", + "{}", None, None, ], @@ -5644,6 +5666,7 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "val2", "255.255.255.1", "2001:0db8:85a3:08d3:ffff:ffff:ffff:ffff", + "{'key1': '1'}", None, None, ], @@ -5660,6 +5683,7 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "val2", "255.255.255.2", "2001:db8:85a3:8d3:1319:8a2e:3.112.115.68", + "{'key1': '[1, 2, 3]'}", None, None, ], diff --git a/tests/resources/all_row_checks.yaml b/tests/resources/all_row_checks.yaml index a7e8f47d8..f63a43cc3 100644 --- a/tests/resources/all_row_checks.yaml +++ b/tests/resources/all_row_checks.yaml @@ -240,6 +240,13 @@ arguments: column: col6 +# is_valid_json check +- criticality: error + check: + function: is_valid_json + arguments: + column: col_json_str + # is_not_in_future check - criticality: error check: diff --git a/tests/unit/test_build_rules.py b/tests/unit/test_build_rules.py index ed01f936d..5e9e3de46 100644 --- a/tests/unit/test_build_rules.py +++ b/tests/unit/test_build_rules.py @@ -1125,9 +1125,7 @@ def test_convert_dq_rules_to_metadata(): DQRowRule( criticality="error", check_func=is_valid_date, column="b", check_func_kwargs={"date_format": "yyyy-MM-dd"} ), - DQRowRule( - criticality="error", check_func=is_valid_json, column="col_json_str" - ), + DQRowRule(criticality="error", check_func=is_valid_json, column="col_json_str"), DQDatasetRule(criticality="error", check_func=is_unique, columns=["col1", "col2"]), DQDatasetRule( criticality="error", @@ -1466,9 +1464,7 @@ def test_metadata_round_trip_conversion_preserves_rules() -> None: DQRowRule( criticality="error", check_func=is_valid_date, column="b", check_func_kwargs={"date_format": "yyyy-MM-dd"} ), - DQRowRule( - criticality="error", check_func=is_valid_json, column="b" - ), + DQRowRule(criticality="error", check_func=is_valid_json, column="b"), DQDatasetRule(criticality="error", check_func=is_unique, columns=["col1", "col2"]), DQDatasetRule( criticality="error", From 89f281177911196bd5c358a6b1a7719c51a6591e Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Wed, 15 Oct 2025 22:17:39 +0100 Subject: [PATCH 06/43] feat: add has_json_keys --- src/databricks/labs/dqx/check_funcs.py | 37 ++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index 8e1ac12be..39b18c1da 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1768,6 +1768,43 @@ def is_valid_json(column: str | Column) -> Column: ) +@register_rule("row") +def has_json_keys(column: str | Column, keys: list[str | int], require_all: bool = True) -> Column: + """ + Checks whether the values in the input column contain specific JSON keys. + + Args: + column (str | Column): The name of the column or the column itself to check for JSON keys. + keys (list[str | int]): The list of JSON keys to check for. + require_all (bool): If True, all specified keys must be present. If False, at least one key must be present. + + Returns: + Column: A Spark Column representing the condition for missing JSON keys. 
+ """ + if not keys: + raise InvalidParameterError("The 'keys' parameter must be a non-empty list of strings.") + for key in keys: + if not isinstance(key, (str, int)): + raise InvalidParameterError("All keys must be of type str or int.") + + unique_keys_lit = F.lit(list(set(keys))) + col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) + json_keys_array = F.json_object_keys(col_expr) + + if require_all: + condition = F.size(F.array_except(unique_keys_lit, json_keys_array)) == 0 + else: + condition = F.when(is_valid_json(col_str_norm).isNull(), F.arrays_overlap(json_keys_array, unique_keys_lit)) + + return make_condition( + condition, + F.concat_ws( + "", F.lit("Value '"), col_expr.cast("string"), F.lit(f"' in Column '{col_expr_str}' is not a valid JSON") + ), + f"{col_str_norm}_has_json_keys", + ) + + def _get_schema(input_schema: str | types.StructType, columns: list[str] | None = None) -> types.StructType: """ Normalize the input schema into a Spark StructType schema. From 02466c18f354eb56c5176f81e498f6a37e973c26 Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Wed, 15 Oct 2025 22:40:40 +0100 Subject: [PATCH 07/43] refactor: change logic --- src/databricks/labs/dqx/check_funcs.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index 39b18c1da..a71c1140b 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1760,7 +1760,10 @@ def is_valid_json(column: str | Column) -> Column: """ col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) return make_condition( - F.when(F.col(col_expr_str).isNotNull() & F.try_parse_json(col_expr_str).isNull(), F.col(col_expr_str).isNull()), + F.when( + F.col(col_expr_str).isNotNull(), # preserve nulls + F.try_parse_json(col_expr_str).isNotNull() # True if parse ok, False if parse fails + ), F.concat_ws( "", F.lit("Value '"), col_expr.cast("string"), F.lit(f"' in Column '{col_expr_str}' is not a valid JSON") ), @@ -1769,13 +1772,13 @@ def is_valid_json(column: str | Column) -> Column: @register_rule("row") -def has_json_keys(column: str | Column, keys: list[str | int], require_all: bool = True) -> Column: +def has_json_keys(column: str | Column, keys: list[str], require_all: bool = True) -> Column: """ Checks whether the values in the input column contain specific JSON keys. Args: column (str | Column): The name of the column or the column itself to check for JSON keys. - keys (list[str | int]): The list of JSON keys to check for. + keys (list[str]): The list of JSON keys to check for. require_all (bool): If True, all specified keys must be present. If False, at least one key must be present. 
Returns: @@ -1784,8 +1787,8 @@ def has_json_keys(column: str | Column, keys: list[str | int], require_all: bool if not keys: raise InvalidParameterError("The 'keys' parameter must be a non-empty list of strings.") for key in keys: - if not isinstance(key, (str, int)): - raise InvalidParameterError("All keys must be of type str or int.") + if not isinstance(key, (str)): + raise InvalidParameterError("All keys must be of type string.") unique_keys_lit = F.lit(list(set(keys))) col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) From ccb6e05aed42e3f38fc2dc763155ef54e04d7a37 Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Wed, 15 Oct 2025 22:42:31 +0100 Subject: [PATCH 08/43] refactor: invert --- src/databricks/labs/dqx/check_funcs.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index a71c1140b..7f2f22e0b 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1760,9 +1760,9 @@ def is_valid_json(column: str | Column) -> Column: """ col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) return make_condition( - F.when( - F.col(col_expr_str).isNotNull(), # preserve nulls - F.try_parse_json(col_expr_str).isNotNull() # True if parse ok, False if parse fails + ~F.when( + F.col(col_expr_str).isNotNull(), + F.try_parse_json(col_expr_str).isNotNull() ), F.concat_ws( "", F.lit("Value '"), col_expr.cast("string"), F.lit(f"' in Column '{col_expr_str}' is not a valid JSON") From 156a9c29b8a278140ae25efa817e6d5701ded5e8 Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Wed, 15 Oct 2025 22:45:52 +0100 Subject: [PATCH 09/43] refactor: negate --- src/databricks/labs/dqx/check_funcs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index 7f2f22e0b..3a1822de0 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1800,7 +1800,7 @@ def has_json_keys(column: str | Column, keys: list[str], require_all: bool = Tru condition = F.when(is_valid_json(col_str_norm).isNull(), F.arrays_overlap(json_keys_array, unique_keys_lit)) return make_condition( - condition, + ~condition, F.concat_ws( "", F.lit("Value '"), col_expr.cast("string"), F.lit(f"' in Column '{col_expr_str}' is not a valid JSON") ), From 8d30ff6e1951af1a63d7cf16b87fce6933fbe223 Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Wed, 15 Oct 2025 23:01:32 +0100 Subject: [PATCH 10/43] refactor: update --- src/databricks/labs/dqx/check_funcs.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index 3a1822de0..f0054ad8f 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1760,10 +1760,7 @@ def is_valid_json(column: str | Column) -> Column: """ col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) return make_condition( - ~F.when( - F.col(col_expr_str).isNotNull(), - F.try_parse_json(col_expr_str).isNotNull() - ), + ~F.when(F.col(col_expr_str).isNotNull(), F.try_parse_json(col_expr_str).isNotNull()), F.concat_ws( "", F.lit("Value '"), col_expr.cast("string"), F.lit(f"' in Column '{col_expr_str}' is not a valid JSON") ), @@ -1802,7 +1799,12 @@ def has_json_keys(column: str | Column, keys: list[str], require_all: bool = Tru 
return make_condition( ~condition, F.concat_ws( - "", F.lit("Value '"), col_expr.cast("string"), F.lit(f"' in Column '{col_expr_str}' is not a valid JSON") + "", + F.lit("Value '"), + F.when(col_expr.isNull(), F.lit("null")).otherwise(col_expr.cast("string")), + F.lit(f"' in Column '{col_expr_str}' keys are not in the key list: ["), + F.concat_ws(", ", *keys), + F.lit("]"), ), f"{col_str_norm}_has_json_keys", ) From 1873d724ab82788950d9c7441c1d736d584909c7 Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Wed, 15 Oct 2025 23:17:15 +0100 Subject: [PATCH 11/43] refactor: update --- docs/dqx/docs/reference/quality_checks.mdx | 46 +++++++++++++++++++--- src/databricks/labs/dqx/check_funcs.py | 2 +- 2 files changed, 41 insertions(+), 7 deletions(-) diff --git a/docs/dqx/docs/reference/quality_checks.mdx b/docs/dqx/docs/reference/quality_checks.mdx index 743361202..a68ccbc54 100644 --- a/docs/dqx/docs/reference/quality_checks.mdx +++ b/docs/dqx/docs/reference/quality_checks.mdx @@ -39,6 +39,7 @@ You can also define your own custom checks (see [Creating custom checks](#creati | `is_valid_date` | Checks whether the values in the input column have valid date formats. | `column`: column to check (can be a string column name or a column expression); `date_format`: optional date format (e.g. 'yyyy-mm-dd') | | `is_valid_timestamp` | Checks whether the values in the input column have valid timestamp formats. | `column`: column to check (can be a string column name or a column expression); `timestamp_format`: optional timestamp format (e.g. 'yyyy-mm-dd HH:mm:ss') | | `is_valid_json` | Checks whether the values in the input column are valid JSON objects. | `column`: column to check (can be a string column name or a column expression) | +| `has_json_keys` | Checks whether the values in the input column contain specific JSON keys. | `column`: column to check (can be a string column name or a column expression); `keys`: list of JSON keys to check for | | `is_not_in_future` | Checks whether the values in the input column contain a timestamp that is not in the future, where 'future' is defined as current_timestamp + offset (in seconds). | `column`: column to check (can be a string column name or a column expression); `offset`: offset to use; `curr_timestamp`: current timestamp, if not provided current_timestamp() function is used | | `is_not_in_near_future` | Checks whether the values in the input column contain a timestamp that is not in the near future, where 'near future' is defined as greater than the current timestamp but less than the current_timestamp + offset (in seconds). | `column`: column to check (can be a string column name or a column expression); `offset`: offset to use; `curr_timestamp`: current timestamp, if not provided current_timestamp() function is used | | `is_older_than_n_days` | Checks whether the values in one input column are at least N days older than the values in another column. 
| `column`: column to check (can be a string column name or a column expression); `days`: number of days; `curr_date`: current date, if not provided current_date() function is used; `negate`: if the condition should be negated | @@ -323,20 +324,37 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen column: col5 date_format: yyyy-MM-dd -# is_valid_timestamp check +# is_valid_json check - criticality: error check: - function: is_valid_timestamp + function: is_valid_json arguments: - column: col6 - timestamp_format: yyyy-MM-dd HH:mm:ss + column: col_json_str -# is_valid_json check +# has_json_keys check - criticality: error check: - function: is_valid_json + function: has_json_keys arguments: column: col_json_str + keys: [key1, key2] + +- criticality: error + name: col_json_str_has_no_json_keys2 + check: + function: has_json_keys + arguments: + column: col_json_str + keys: [key1, key2] + require_all: False + +# is_valid_timestamp check +- criticality: error + check: + function: is_valid_timestamp + arguments: + column: col6 + timestamp_format: yyyy-MM-dd HH:mm:ss - criticality: error name: col6_is_not_valid_timestamp2 @@ -872,6 +890,22 @@ checks = [ name="col5_is_not_valid_date2" ), + DQRowRule( + criticality="error", + check_func=check_funcs.has_json_keys, + column="col_json_str", # or as expr: F.col("col_json_str") + check_func_kwargs={"keys": ["key1", "key2"]}, + name="col_json_str_has_json_keys" + ), + + DQRowRule( + criticality="error", + check_func=check_funcs.has_json_keys, + column="col_json_str", # or as expr: F.col("col_json_str") + check_func_kwargs={"keys": ["key1", "key2"], "require_all": False}, + name="col_json_str_has_json_keys" + ), + # is_valid_timestamp check DQRowRule( criticality="error", diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index f0054ad8f..553a6ccee 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1806,7 +1806,7 @@ def has_json_keys(column: str | Column, keys: list[str], require_all: bool = Tru F.concat_ws(", ", *keys), F.lit("]"), ), - f"{col_str_norm}_has_json_keys", + f"{col_str_norm}_has_no_json_keys", ) From 5109c273710831f504a1543e25fddaf708b20c30 Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Wed, 15 Oct 2025 23:23:18 +0100 Subject: [PATCH 12/43] refactor: update --- src/databricks/labs/dqx/check_funcs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index 553a6ccee..55c20733f 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1803,7 +1803,7 @@ def has_json_keys(column: str | Column, keys: list[str], require_all: bool = Tru F.lit("Value '"), F.when(col_expr.isNull(), F.lit("null")).otherwise(col_expr.cast("string")), F.lit(f"' in Column '{col_expr_str}' keys are not in the key list: ["), - F.concat_ws(", ", *keys), + F.concat_ws(", ", F.lit(keys)), F.lit("]"), ), f"{col_str_norm}_has_no_json_keys", From ceecf7d916375cd85a7d2a0c8da4d77122ae8921 Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Wed, 15 Oct 2025 23:33:40 +0100 Subject: [PATCH 13/43] refactor: updates --- docs/dqx/docs/reference/quality_checks.mdx | 9 ++++++--- tests/integration/test_apply_checks.py | 13 +++++++++++++ 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/docs/dqx/docs/reference/quality_checks.mdx b/docs/dqx/docs/reference/quality_checks.mdx index a68ccbc54..371500924 100644 --- 
a/docs/dqx/docs/reference/quality_checks.mdx +++ b/docs/dqx/docs/reference/quality_checks.mdx @@ -337,7 +337,8 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen function: has_json_keys arguments: column: col_json_str - keys: [key1, key2] + keys: + - key1 - criticality: error name: col_json_str_has_no_json_keys2 @@ -345,7 +346,9 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen function: has_json_keys arguments: column: col_json_str - keys: [key1, key2] + keys: + - key1 + - key2 require_all: False # is_valid_timestamp check @@ -894,7 +897,7 @@ checks = [ criticality="error", check_func=check_funcs.has_json_keys, column="col_json_str", # or as expr: F.col("col_json_str") - check_func_kwargs={"keys": ["key1", "key2"]}, + check_func_kwargs={"keys": ["key1"]}, name="col_json_str_has_json_keys" ), diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py index 519b5fb11..2ea063bf5 100755 --- a/tests/integration/test_apply_checks.py +++ b/tests/integration/test_apply_checks.py @@ -5571,6 +5571,19 @@ def test_apply_checks_all_checks_using_classes(ws, spark): check_func=check_funcs.is_valid_json, column="col_json_str", ), + # has_json_keys check + DQRowRule( + criticality="error", + check_func=check_funcs.has_json_keys, + column="col_json_str", + check_func_kwargs={"keys": ["key1", "key2"], "require_all": False}, + ), + DQRowRule( + criticality="error", + check_func=check_funcs.has_json_keys, + column="col_json_str", + check_func_kwargs={"keys": ["key1"]}, + ), ] dq_engine = DQEngine(ws) From 0c9408968b0be22b474ff075c082dad640c251d4 Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Wed, 15 Oct 2025 23:39:56 +0100 Subject: [PATCH 14/43] refactor: change and update --- .../llm/resources/yaml_checks_examples.yml | 25 ++++++++++++++--- tests/unit/test_build_rules.py | 27 +++++++++++++++++++ 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/src/databricks/labs/dqx/llm/resources/yaml_checks_examples.yml b/src/databricks/labs/dqx/llm/resources/yaml_checks_examples.yml index 60d92c168..54418ff10 100644 --- a/src/databricks/labs/dqx/llm/resources/yaml_checks_examples.yml +++ b/src/databricks/labs/dqx/llm/resources/yaml_checks_examples.yml @@ -191,15 +191,32 @@ date_format: yyyy-MM-dd - criticality: error check: - function: is_valid_timestamp + function: is_valid_json arguments: - column: col6 - timestamp_format: yyyy-MM-dd HH:mm:ss + column: col_json_str - criticality: error check: - function: is_valid_json + function: has_json_keys + arguments: + column: col_json_str + keys: + - key1 +- criticality: error + name: col_json_str_has_no_json_keys2 + check: + function: has_json_keys arguments: column: col_json_str + keys: + - key1 + - key2 + require_all: false +- criticality: error + check: + function: is_valid_timestamp + arguments: + column: col6 + timestamp_format: yyyy-MM-dd HH:mm:ss - criticality: error name: col6_is_not_valid_timestamp2 check: diff --git a/tests/unit/test_build_rules.py b/tests/unit/test_build_rules.py index 5e9e3de46..0e0dad3ce 100644 --- a/tests/unit/test_build_rules.py +++ b/tests/unit/test_build_rules.py @@ -26,6 +26,7 @@ is_not_greater_than, is_valid_date, is_valid_json, + has_json_keys, regex_match, compare_datasets, ) @@ -1126,6 +1127,19 @@ def test_convert_dq_rules_to_metadata(): criticality="error", check_func=is_valid_date, column="b", check_func_kwargs={"date_format": "yyyy-MM-dd"} ), DQRowRule(criticality="error", check_func=is_valid_json, 
column="col_json_str"), + DQRowRule( + criticality="error", + check_func=has_json_keys, + column="col_json_str", + check_func_kwargs={"keys": ["key1"]}, + ), + DQRowRule( + name="col_json_str_has_no_json_key1_key2", + criticality="error", + check_func=has_json_keys, + column="col_json_str", + check_func_kwargs={"keys": ["key1", "key2"], "require_all": False}, + ), DQDatasetRule(criticality="error", check_func=is_unique, columns=["col1", "col2"]), DQDatasetRule( criticality="error", @@ -1292,6 +1306,19 @@ def test_convert_dq_rules_to_metadata(): "arguments": {"column": "col_json_str"}, }, }, + { + "name": "col_json_str_has_no_json_keys", + "criticality": "error", + "check": {"function": "has_json_keys", "arguments": {"column": "col_json_str", "keys": ["key1"]}}, + }, + { + "name": "col_json_str_has_no_json_key1_key2", + "criticality": "error", + "check": { + "function": "has_json_keys", + "arguments": {"column": "col_json_str", "keys": ["key1", "key2"], "require_all": False}, + }, + }, { "name": "struct_col1_col2_is_not_unique", "criticality": "error", From 05365e0702bb1339cad4519b1552fe1bb5c90137 Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Thu, 16 Oct 2025 11:48:57 +0100 Subject: [PATCH 15/43] refactor: fix docs --- docs/dqx/docs/reference/quality_checks.mdx | 35 +++++++++++----------- src/databricks/labs/dqx/check_funcs.py | 6 ++-- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/docs/dqx/docs/reference/quality_checks.mdx b/docs/dqx/docs/reference/quality_checks.mdx index 371500924..c8df2908a 100644 --- a/docs/dqx/docs/reference/quality_checks.mdx +++ b/docs/dqx/docs/reference/quality_checks.mdx @@ -39,7 +39,7 @@ You can also define your own custom checks (see [Creating custom checks](#creati | `is_valid_date` | Checks whether the values in the input column have valid date formats. | `column`: column to check (can be a string column name or a column expression); `date_format`: optional date format (e.g. 'yyyy-mm-dd') | | `is_valid_timestamp` | Checks whether the values in the input column have valid timestamp formats. | `column`: column to check (can be a string column name or a column expression); `timestamp_format`: optional timestamp format (e.g. 'yyyy-mm-dd HH:mm:ss') | | `is_valid_json` | Checks whether the values in the input column are valid JSON objects. | `column`: column to check (can be a string column name or a column expression) | -| `has_json_keys` | Checks whether the values in the input column contain specific JSON keys. | `column`: column to check (can be a string column name or a column expression); `keys`: list of JSON keys to check for | +| `has_json_keys` | Checks whether the values in the input column contain specific JSON keys. | `column`: column to check (can be a string column name or a column expression); `keys`: list of JSON keys to check for; `require_all`: optional boolean flag to require all keys to be present | | `is_not_in_future` | Checks whether the values in the input column contain a timestamp that is not in the future, where 'future' is defined as current_timestamp + offset (in seconds). 
| `column`: column to check (can be a string column name or a column expression); `offset`: offset to use; `curr_timestamp`: current timestamp, if not provided current_timestamp() function is used | | `is_not_in_near_future` | Checks whether the values in the input column contain a timestamp that is not in the near future, where 'near future' is defined as greater than the current timestamp but less than the current_timestamp + offset (in seconds). | `column`: column to check (can be a string column name or a column expression); `offset`: offset to use; `curr_timestamp`: current timestamp, if not provided current_timestamp() function is used | | `is_older_than_n_days` | Checks whether the values in one input column are at least N days older than the values in another column. | `column`: column to check (can be a string column name or a column expression); `days`: number of days; `curr_date`: current date, if not provided current_date() function is used; `negate`: if the condition should be negated | @@ -893,22 +893,6 @@ checks = [ name="col5_is_not_valid_date2" ), - DQRowRule( - criticality="error", - check_func=check_funcs.has_json_keys, - column="col_json_str", # or as expr: F.col("col_json_str") - check_func_kwargs={"keys": ["key1"]}, - name="col_json_str_has_json_keys" - ), - - DQRowRule( - criticality="error", - check_func=check_funcs.has_json_keys, - column="col_json_str", # or as expr: F.col("col_json_str") - check_func_kwargs={"keys": ["key1", "key2"], "require_all": False}, - name="col_json_str_has_json_keys" - ), - # is_valid_timestamp check DQRowRule( criticality="error", @@ -931,6 +915,23 @@ checks = [ column="col_json_str" ), + # has_json_keys check + DQRowRule( + criticality="error", + check_func=check_funcs.has_json_keys, + column="col_json_str", # or as expr: F.col("col_json_str") + check_func_kwargs={"keys": ["key1"]}, + name="col_json_str_has_json_keys" + ), + + DQRowRule( + criticality="error", + check_func=check_funcs.has_json_keys, + column="col_json_str", # or as expr: F.col("col_json_str") + check_func_kwargs={"keys": ["key1", "key2"], "require_all": False}, + name="col_json_str_has_json_keys" + ), + # is_not_in_future check DQRowRule( criticality="error", diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index 55c20733f..de6258407 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1774,9 +1774,9 @@ def has_json_keys(column: str | Column, keys: list[str], require_all: bool = Tru Checks whether the values in the input column contain specific JSON keys. Args: - column (str | Column): The name of the column or the column itself to check for JSON keys. - keys (list[str]): The list of JSON keys to check for. - require_all (bool): If True, all specified keys must be present. If False, at least one key must be present. + column: The name of the column or the column itself to check for JSON keys. + keys: The list of JSON keys to check for. + require_all: If True, all specified keys must be present. If False, at least one key must be present. Returns: Column: A Spark Column representing the condition for missing JSON keys. 
From 7be64e6c28a8ad97c0d109c5cdf00985ef0d48da Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Thu, 16 Oct 2025 23:11:28 +0100 Subject: [PATCH 16/43] refactor: updates --- docs/dqx/docs/reference/quality_checks.mdx | 2 +- src/databricks/labs/dqx/check_funcs.py | 79 +++++++-- .../llm/resources/yaml_checks_examples.yml | 2 +- tests/integration/test_row_checks.py | 162 ++++++++++++++++++ tests/resources/all_row_checks.yaml | 19 ++ tests/unit/test_build_rules.py | 17 +- 6 files changed, 261 insertions(+), 20 deletions(-) diff --git a/docs/dqx/docs/reference/quality_checks.mdx b/docs/dqx/docs/reference/quality_checks.mdx index c8df2908a..aa83dfecb 100644 --- a/docs/dqx/docs/reference/quality_checks.mdx +++ b/docs/dqx/docs/reference/quality_checks.mdx @@ -341,7 +341,7 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen - key1 - criticality: error - name: col_json_str_has_no_json_keys2 + name: col_json_str_does_not_have_json_keys2 check: function: has_json_keys arguments: diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index de6258407..ae265567a 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1762,7 +1762,10 @@ def is_valid_json(column: str | Column) -> Column: return make_condition( ~F.when(F.col(col_expr_str).isNotNull(), F.try_parse_json(col_expr_str).isNotNull()), F.concat_ws( - "", F.lit("Value '"), col_expr.cast("string"), F.lit(f"' in Column '{col_expr_str}' is not a valid JSON") + "", + F.lit("Value '"), + col_expr.cast("string"), + F.lit(f"' in Column '{col_expr_str}' is not a valid JSON string"), ), f"{col_str_norm}_is_not_valid_json", ) @@ -1774,7 +1777,7 @@ def has_json_keys(column: str | Column, keys: list[str], require_all: bool = Tru Checks whether the values in the input column contain specific JSON keys. Args: - column: The name of the column or the column itself to check for JSON keys. + column: The name of the column or the column expression to check for JSON keys. keys: The list of JSON keys to check for. require_all: If True, all specified keys must be present. If False, at least one key must be present. 
@@ -1783,30 +1786,72 @@ def has_json_keys(column: str | Column, keys: list[str], require_all: bool = Tru """ if not keys: raise InvalidParameterError("The 'keys' parameter must be a non-empty list of strings.") - for key in keys: - if not isinstance(key, (str)): - raise InvalidParameterError("All keys must be of type string.") + if any(not isinstance(k, str) for k in keys): + raise InvalidParameterError("All keys must be of type string.") - unique_keys_lit = F.lit(list(set(keys))) col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) json_keys_array = F.json_object_keys(col_expr) + required_keys = F.array_distinct(F.array(*[F.lit(k) for k in keys])) + + json_validation_error = is_valid_json(col_str_norm) + is_invalid_json = json_validation_error.isNotNull() + + has_json_keys_msg = F.concat_ws( + "", + F.lit("Value '"), + F.when(col_expr.isNull(), F.lit("null")).otherwise(col_expr.cast("string")), + F.lit(f"' in Column '{col_expr_str}' is missing keys in the list: ["), + F.concat_ws(", ", F.lit(keys)), + F.lit("]"), + ) + message = F.when(is_invalid_json, json_validation_error).otherwise(has_json_keys_msg) if require_all: - condition = F.size(F.array_except(unique_keys_lit, json_keys_array)) == 0 + missing = F.array_except(required_keys, json_keys_array) + condition_when_valid = F.size(missing) == 0 else: - condition = F.when(is_valid_json(col_str_norm).isNull(), F.arrays_overlap(json_keys_array, unique_keys_lit)) + condition_when_valid = F.arrays_overlap(json_keys_array, required_keys) + + condition = F.when(~is_invalid_json, condition_when_valid).otherwise(F.lit(False)) return make_condition( ~condition, - F.concat_ws( - "", - F.lit("Value '"), - F.when(col_expr.isNull(), F.lit("null")).otherwise(col_expr.cast("string")), - F.lit(f"' in Column '{col_expr_str}' keys are not in the key list: ["), - F.concat_ws(", ", F.lit(keys)), - F.lit("]"), - ), - f"{col_str_norm}_has_no_json_keys", + message, + f"{col_str_norm}_does_not_have_json_keys", + ) + + +@register_rule("row") +def has_valid_json_schema(column: str | Column, schema: str | types.StructType) -> Column: + """ + Checks whether the values in the input column conform to a specified JSON schema. + + Args: + column: The name of the column or the column expression to check for JSON schema conformity. + schema: The expected JSON schema as a DDL string (e.g., "id INT, name STRING") or StructType object. + + Returns: + Column: A Spark Column representing the condition for JSON schema violations. 
+ """ + _expected_schema = types.StructType.fromDDL(schema) + col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) + + json_validation_error = is_valid_json(col_str_norm) + is_invalid_json = json_validation_error.isNotNull() + parsed_column = F.from_json(col_expr, _expected_schema) + condition = parsed_column.isNotNull() | col_expr.isNull() + has_json_schema_msg = F.concat_ws( + "", + F.lit("Value '"), + F.when(col_expr.isNull(), F.lit("null")).otherwise(col_expr.cast("string")), + F.lit(f"' in Column '{col_expr_str}' does not conform to the expected JSON schema: "), + F.lit(_expected_schema.simpleString()), + ) + + return make_condition( + ~condition, + F.when(is_invalid_json, json_validation_error).otherwise(has_json_schema_msg), + f"{col_str_norm}_has_invalid_json_schema", ) diff --git a/src/databricks/labs/dqx/llm/resources/yaml_checks_examples.yml b/src/databricks/labs/dqx/llm/resources/yaml_checks_examples.yml index 54418ff10..cfdda6745 100644 --- a/src/databricks/labs/dqx/llm/resources/yaml_checks_examples.yml +++ b/src/databricks/labs/dqx/llm/resources/yaml_checks_examples.yml @@ -202,7 +202,7 @@ keys: - key1 - criticality: error - name: col_json_str_has_no_json_keys2 + name: col_json_str_does_not_have_json_keys2 check: function: has_json_keys arguments: diff --git a/tests/integration/test_row_checks.py b/tests/integration/test_row_checks.py index 2fc713a03..af2d27520 100644 --- a/tests/integration/test_row_checks.py +++ b/tests/integration/test_row_checks.py @@ -23,6 +23,8 @@ is_not_null_and_is_in_list, is_not_null_and_not_empty_array, is_valid_date, + is_valid_json, + has_json_keys, is_valid_timestamp, is_valid_ipv4_address, is_ipv4_address_in_cidr, @@ -2726,3 +2728,163 @@ def test_col_is_equal_to(spark, set_utc_timezone): ) assert_df_equality(actual, expected, ignore_nullable=True) + + +def test_is_valid_json(spark): + schema = "a: string, b: string" + test_df = spark.createDataFrame( + [ + ['{"key": "value"}', '{"key": value}'], + ['{"number": 123}', '{"number": 123}'], + ['{"array": [1, 2, 3]}', '{"array": [1, 2, 3}'], + ['Not a JSON string', 'Also not JSON'], + [None, None], + ['123', '"a string"'], + ['true', 'null'], + ['[]', '{}'], + ['{"a": 1,}', '{key: "value"}'], + ['[1, 2,', '{"a": "b"'], + ["{'a': 'b'}", ''], + [' {"a": 1} ', '{"b": 2}\n'], + ], + schema, + ) + + actual = test_df.select(is_valid_json("a"), is_valid_json("b")) + + expected_schema = "a_is_not_valid_json: string, b_is_not_valid_json: string" + + expected = spark.createDataFrame( + [ + [None, "Value '{\"key\": value}' in Column 'b' is not a valid JSON string"], + [None, None], + [None, "Value '{\"array\": [1, 2, 3}' in Column 'b' is not a valid JSON string"], + [ + "Value 'Not a JSON string' in Column 'a' is not a valid JSON string", + "Value 'Also not JSON' in Column 'b' is not a valid JSON string", + ], + [None, None], + [None, None], + [None, None], + [None, None], + [ + "Value '{\"a\": 1,}' in Column 'a' is not a valid JSON string", + "Value '{key: \"value\"}' in Column 'b' is not a valid JSON string", + ], + [ + "Value '[1, 2,' in Column 'a' is not a valid JSON string", + "Value '{\"a\": \"b\"' in Column 'b' is not a valid JSON string", + ], + [ + "Value '{'a': 'b'}' in Column 'a' is not a valid JSON string", + "Value '' in Column 'b' is not a valid JSON string", + ], + [None, None], + ], + expected_schema, + ) + + assert_df_equality(actual, expected, ignore_nullable=True) + + +def test_has_json_keys_require_all_true(spark): + schema = "a: string, b: string" + test_df = 
spark.createDataFrame( + [ + ['{"key": "value", "another_key": 123}', '{"key": "value"}'], + ['{"number": 123}', '{"number": 123, "extra": true}'], + ['{"array": [1, 2, 3]}', '{"array": {1, 2, 3}]'], + ['{"key": "value"}', '{"missing_key": "value"}'], + [None, None], + ['Not a JSON string', '{"key": "value"}'], + ['{"key": "value"}', 'Not a JSON string'], + ['{"key": "value"}', None], + [None, '{"key": "value"}'], + ['{"nested": {"inner_key": "inner_value"}}', '{"nested": {"inner_key": "inner_value"}}'], + ], + schema, + ) + + actual = test_df.select( + has_json_keys("a", ["key", "another_key"]), + has_json_keys("b", ["key"]), + ) + + expected_schema = "a_does_not_have_json_keys: string, b_does_not_have_json_keys: string" + + expected = spark.createDataFrame( + [ + [None, None], + [ + "Value '{\"number\": 123}' in Column 'a' is missing keys in the list: [key, another_key]", + "Value '{\"number\": 123, \"extra\": true}' in Column 'b' is missing keys in the list: [key]", + ], + [ + "Value '{\"array\": [1, 2, 3]}' in Column 'a' is missing keys in the list: [key, another_key]", + "Value '{\"array\": {1, 2, 3}]' in Column 'b' is not a valid JSON string", + ], + [ + "Value '{\"key\": \"value\"}' in Column 'a' is missing keys in the list: [key, another_key]", + "Value '{\"missing_key\": \"value\"}' in Column 'b' is missing keys in the list: [key]", + ], + [None, None], + ["Value 'Not a JSON string' in Column 'a' is not a valid JSON string", None], + [ + "Value '{\"key\": \"value\"}' in Column 'a' is missing keys in the list: [key, another_key]", + "Value 'Not a JSON string' in Column 'b' is not a valid JSON string", + ], + ["Value '{\"key\": \"value\"}' in Column 'a' is missing keys in the list: [key, another_key]", None], + [None, None], + [ + "Value '{\"nested\": {\"inner_key\": \"inner_value\"}}' in Column 'a' is missing keys in the list: [key, another_key]", + "Value '{\"nested\": {\"inner_key\": \"inner_value\"}}' in Column 'b' is missing keys in the list: [key]", + ], + ], + expected_schema, + ) + assert_df_equality(actual, expected, ignore_nullable=True) + + +def test_has_json_keys_require_at_least_one(spark): + schema = "a: string, b: string" + required_keys = ["key", "another_key", "extra_key"] + + test_data = [ + ['{"key": 1, "another_key": 2, "extra_key": 3}', '{"key": 1, "another_key": 2, "extra_key": 3}'], + ['{"key": 1}', '{"key": 1}'], + ['{"number": 123}', '{"random_sample": 1523}'], + ['{}', '{}'], + ['{"key": "value"', '{"key": "value"'], + [None, 'Not a JSON string'], + ] + + test_df = spark.createDataFrame(test_data, schema) + + actual = test_df.select( + has_json_keys("a", required_keys, require_all=False), + has_json_keys("b", required_keys, require_all=False), + ) + + expected_schema = "a_does_not_have_json_keys: string, b_does_not_have_json_keys: string" + + expected = spark.createDataFrame( + [ + [None, None], + [None, None], + [ + "Value '{\"number\": 123}' in Column 'a' is missing keys in the list: [key, another_key, extra_key]", + "Value '{\"random_sample\": 1523}' in Column 'b' is missing keys in the list: [key, another_key, extra_key]", + ], + [ + "Value '{}' in Column 'a' is missing keys in the list: [key, another_key, extra_key]", + "Value '{}' in Column 'b' is missing keys in the list: [key, another_key, extra_key]", + ], + [ + "Value '{\"key\": \"value\"' in Column 'a' is not a valid JSON string", + "Value '{\"key\": \"value\"' in Column 'b' is not a valid JSON string", + ], + [None, "Value 'Not a JSON string' in Column 'b' is not a valid JSON string"], + ], + 
expected_schema, + ) + assert_df_equality(actual, expected, ignore_nullable=True) diff --git a/tests/resources/all_row_checks.yaml b/tests/resources/all_row_checks.yaml index f63a43cc3..e4f04074d 100644 --- a/tests/resources/all_row_checks.yaml +++ b/tests/resources/all_row_checks.yaml @@ -247,6 +247,25 @@ arguments: column: col_json_str +# has_json_keys check +- criticality: error + check: + function: has_json_keys + arguments: + column: col_json_str + keys: + - key1 + +- criticality: error + check: + function: has_json_keys + arguments: + column: col_json_str + keys: + - key1 + - key2 + require_all: false + # is_not_in_future check - criticality: error check: diff --git a/tests/unit/test_build_rules.py b/tests/unit/test_build_rules.py index 0e0dad3ce..41012eee4 100644 --- a/tests/unit/test_build_rules.py +++ b/tests/unit/test_build_rules.py @@ -27,6 +27,7 @@ is_valid_date, is_valid_json, has_json_keys, + has_valid_json_schema, regex_match, compare_datasets, ) @@ -1140,6 +1141,12 @@ def test_convert_dq_rules_to_metadata(): column="col_json_str", check_func_kwargs={"keys": ["key1", "key2"], "require_all": False}, ), + DQRowRule( + criticality="error", + check_func=has_valid_json_schema, + column="col_json_str", + check_func_kwargs={"schema": "STRUCT"}, + ), DQDatasetRule(criticality="error", check_func=is_unique, columns=["col1", "col2"]), DQDatasetRule( criticality="error", @@ -1307,7 +1314,7 @@ def test_convert_dq_rules_to_metadata(): }, }, { - "name": "col_json_str_has_no_json_keys", + "name": "col_json_str_does_not_have_json_keys", "criticality": "error", "check": {"function": "has_json_keys", "arguments": {"column": "col_json_str", "keys": ["key1"]}}, }, @@ -1319,6 +1326,14 @@ def test_convert_dq_rules_to_metadata(): "arguments": {"column": "col_json_str", "keys": ["key1", "key2"], "require_all": False}, }, }, + { + "name": "col_json_str_has_invalid_json_schema", + "criticality": "error", + "check": { + "function": "has_valid_json_schema", + "arguments": {"column": "col_json_str", "schema": "STRUCT"}, + }, + }, { "name": "struct_col1_col2_is_not_unique", "criticality": "error", From c7d8406e95cf3801972efbfef0ca50f35b5a9ab1 Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Thu, 16 Oct 2025 23:41:12 +0100 Subject: [PATCH 17/43] refactor: update logic --- src/databricks/labs/dqx/check_funcs.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index ae265567a..a569d55d9 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1833,13 +1833,22 @@ def has_valid_json_schema(column: str | Column, schema: str | types.StructType) Returns: Column: A Spark Column representing the condition for JSON schema violations. """ - _expected_schema = types.StructType.fromDDL(schema) + try: + if isinstance(schema, str): + _expected_schema = types.StructType.fromDDL(schema) + elif isinstance(schema, types.StructType): + _expected_schema = schema + except Exception as e: + raise InvalidParameterError(f"Invalid schema: {schema}. 
Error: {e}") + col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) json_validation_error = is_valid_json(col_str_norm) is_invalid_json = json_validation_error.isNotNull() parsed_column = F.from_json(col_expr, _expected_schema) - condition = parsed_column.isNotNull() | col_expr.isNull() + + condition = F.when(~is_invalid_json, parsed_column.isNotNull()).otherwise(F.lit(False)) + has_json_schema_msg = F.concat_ws( "", F.lit("Value '"), From 66cbb13bba37ed42f3c9433e662eab9f6ac7a0d8 Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Thu, 16 Oct 2025 23:47:15 +0100 Subject: [PATCH 18/43] refactor: explcit True --- src/databricks/labs/dqx/check_funcs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index a569d55d9..b0bfc3572 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1847,7 +1847,7 @@ def has_valid_json_schema(column: str | Column, schema: str | types.StructType) is_invalid_json = json_validation_error.isNotNull() parsed_column = F.from_json(col_expr, _expected_schema) - condition = F.when(~is_invalid_json, parsed_column.isNotNull()).otherwise(F.lit(False)) + condition = F.when(~is_invalid_json & parsed_column.isNotNull(), F.lit(True)).otherwise(F.lit(False)) has_json_schema_msg = F.concat_ws( "", From 70e19bd6ccba37c215b51092dad95bd84332ed07 Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Fri, 17 Oct 2025 00:38:16 +0100 Subject: [PATCH 19/43] refactor: remove repetition --- src/databricks/labs/dqx/check_funcs.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index b0bfc3572..9d9db511b 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1833,13 +1833,7 @@ def has_valid_json_schema(column: str | Column, schema: str | types.StructType) Returns: Column: A Spark Column representing the condition for JSON schema violations. """ - try: - if isinstance(schema, str): - _expected_schema = types.StructType.fromDDL(schema) - elif isinstance(schema, types.StructType): - _expected_schema = schema - except Exception as e: - raise InvalidParameterError(f"Invalid schema: {schema}. 
Error: {e}") + _expected_schema = _get_schema(schema) col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) From a168d64b3f9b5218935ceb95b860b968676293b0 Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Fri, 17 Oct 2025 00:51:35 +0100 Subject: [PATCH 20/43] refactor: remove as it depends on spark --- tests/unit/test_build_rules.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/tests/unit/test_build_rules.py b/tests/unit/test_build_rules.py index 41012eee4..f18b80949 100644 --- a/tests/unit/test_build_rules.py +++ b/tests/unit/test_build_rules.py @@ -27,7 +27,6 @@ is_valid_date, is_valid_json, has_json_keys, - has_valid_json_schema, regex_match, compare_datasets, ) @@ -1141,12 +1140,6 @@ def test_convert_dq_rules_to_metadata(): column="col_json_str", check_func_kwargs={"keys": ["key1", "key2"], "require_all": False}, ), - DQRowRule( - criticality="error", - check_func=has_valid_json_schema, - column="col_json_str", - check_func_kwargs={"schema": "STRUCT"}, - ), DQDatasetRule(criticality="error", check_func=is_unique, columns=["col1", "col2"]), DQDatasetRule( criticality="error", @@ -1326,14 +1319,6 @@ def test_convert_dq_rules_to_metadata(): "arguments": {"column": "col_json_str", "keys": ["key1", "key2"], "require_all": False}, }, }, - { - "name": "col_json_str_has_invalid_json_schema", - "criticality": "error", - "check": { - "function": "has_valid_json_schema", - "arguments": {"column": "col_json_str", "schema": "STRUCT"}, - }, - }, { "name": "struct_col1_col2_is_not_unique", "criticality": "error", From c3c23e72225be4e5fc796fa128d54f0afff4e582 Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Fri, 17 Oct 2025 01:18:28 +0100 Subject: [PATCH 21/43] feat: add perf test for 2 tests (remaining 1) --- tests/perf/conftest.py | 4 ++- tests/perf/test_apply_checks.py | 47 +++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/tests/perf/conftest.py b/tests/perf/conftest.py index e0897fc46..4fb3934b9 100644 --- a/tests/perf/conftest.py +++ b/tests/perf/conftest.py @@ -28,7 +28,8 @@ SCHEMA_STR = ( "col1: int, col2: int, col3: int, col4: array, " "col5: date, col6: timestamp, col7: map, " - "col8: struct, col10: int, col_ipv4: string, col_ipv6: string" + "col8: struct, col10: int, col_ipv4: string, col_ipv6: string, " + "col_json_str: string" ) RUN_TIME = datetime(2025, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc) @@ -98,6 +99,7 @@ def generated_df(spark, rows=DEFAULT_ROWS): .withColumnSpec("col10") .withColumnSpec("col_ipv4", template=r"\n.\n.\n.\n") .withColumnSpec("col_ipv6", template="XXXX:XXXX:XXXX:XXXX:XXXX:XXXX:XXXX:XXXX") + .withColumnSpec("col_json_str", template=r"{'\key1': '\w', '\key2': 'd\w'}") ) return spec.build() diff --git a/tests/perf/test_apply_checks.py b/tests/perf/test_apply_checks.py index 1f223d476..b5015075d 100644 --- a/tests/perf/test_apply_checks.py +++ b/tests/perf/test_apply_checks.py @@ -1591,3 +1591,50 @@ def test_benchmark_foreach_has_valid_schema(benchmark, ws, generated_string_df): benchmark.group += f"_{n_rows}_rows_{len(columns)}_columns" actual_count = benchmark(lambda: dq_engine.apply_checks(df, checks).count()) assert actual_count == EXPECTED_ROWS + + +@pytest.mark.benchmark(group="test_benchmark_is_valid_json") +def test_benchmark_is_valid_json(benchmark, ws, generated_df): + dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS) + checks = [ + DQRowRule( + criticality="error", + check_func=check_funcs.is_valid_json, + column="col_json_str", + ), + ] + 
checked = dq_engine.apply_checks(generated_df, checks) + actual_count = benchmark(lambda: checked.count()) + assert actual_count == EXPECTED_ROWS + + +@pytest.mark.benchmark(group="test_benchmark_has_json_keys") +def test_benchmark_has_json_keys_require_at_least_one(benchmark, ws, generated_df): + dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS) + checks = [ + DQRowRule( + criticality="error", + check_func=check_funcs.has_json_keys, + column="col_json_str", + check_func_kwargs={"keys": ["key1", "key2"], "require_all": False}, + ), + ] + checked = dq_engine.apply_checks(generated_df, checks) + actual_count = benchmark(lambda: checked.count()) + assert actual_count == EXPECTED_ROWS + + +@pytest.mark.benchmark(group="test_benchmark_has_json_keys") +def test_benchmark_has_json_keys_require_all_true(benchmark, ws, generated_df): + dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS) + checks = [ + DQRowRule( + criticality="error", + check_func=check_funcs.has_json_keys, + column="col_json_str", + check_func_kwargs={"keys": ["key1", "key2"]}, + ), + ] + checked = dq_engine.apply_checks(generated_df, checks) + actual_count = benchmark(lambda: checked.count()) + assert actual_count == EXPECTED_ROWS From 3e633123fe3a57022981c8c0be9844fb714ebd4f Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Tue, 21 Oct 2025 07:30:35 +0100 Subject: [PATCH 22/43] refactor: switch back to has_json_schema --- src/databricks/labs/dqx/check_funcs.py | 43 +++++++++++++++---- tests/integration/test_row_checks.py | 57 ++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 8 deletions(-) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index 9d9db511b..46bed0559 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1822,42 +1822,69 @@ def has_json_keys(column: str | Column, keys: list[str], require_all: bool = Tru @register_rule("row") -def has_valid_json_schema(column: str | Column, schema: str | types.StructType) -> Column: +def has_json_schema(column: str | Column, schema: str | types.StructType, strict: bool = False) -> Column: """ Checks whether the values in the input column conform to a specified JSON schema. Args: column: The name of the column or the column expression to check for JSON schema conformity. schema: The expected JSON schema as a DDL string (e.g., "id INT, name STRING") or StructType object. + strict: Whether to perform strict schema validation (default: False). For JSON schema, + strict mode means that the JSON must match the schema exactly (same fields, same order). Returns: Column: A Spark Column representing the condition for JSON schema violations. 
""" _expected_schema = _get_schema(schema) - col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) - json_validation_error = is_valid_json(col_str_norm) is_invalid_json = json_validation_error.isNotNull() - parsed_column = F.from_json(col_expr, _expected_schema) - condition = F.when(~is_invalid_json & parsed_column.isNotNull(), F.lit(True)).otherwise(F.lit(False)) + # Add this because of to avoid name clashes if a user already specified _corrupt_record field + unique_prefix = uuid.uuid4().hex[:8] + corrupt_record_name = f"{unique_prefix}_dqx_corrupt_record" + new_json_schema = types.StructType( + _expected_schema.fields + [types.StructField(corrupt_record_name, types.StringType(), True)] + ) + + parsed_struct = F.from_json(col_expr, new_json_schema, options={"columnNameOfCorruptRecord": corrupt_record_name}) + + is_not_corrupt = parsed_struct[corrupt_record_name].isNull() + base_conformity = ~is_invalid_json & is_not_corrupt + + _get_strict_schema_comparison() + _get_permissive_schema_comparison + + if strict: + expected_field_names = [f.name for f in _expected_schema.fields] + concatenated_fields = F.concat_ws("_", *[parsed_struct[name] for name in expected_field_names]) + has_content = concatenated_fields.isNotNull() + is_conforming = base_conformity & has_content + else: + is_conforming = base_conformity + + condition = is_conforming | col_expr.isNull() has_json_schema_msg = F.concat_ws( "", F.lit("Value '"), F.when(col_expr.isNull(), F.lit("null")).otherwise(col_expr.cast("string")), - F.lit(f"' in Column '{col_expr_str}' does not conform to the expected JSON schema: "), + F.lit(f"' in Column '{col_expr_str}' does not conform to the expected JSON schema (strict={strict}): "), F.lit(_expected_schema.simpleString()), ) + final_error_message = F.when( + is_invalid_json, + json_validation_error + ).otherwise( + has_json_schema_msg + ) return make_condition( ~condition, - F.when(is_invalid_json, json_validation_error).otherwise(has_json_schema_msg), + final_error_message, f"{col_str_norm}_has_invalid_json_schema", ) - def _get_schema(input_schema: str | types.StructType, columns: list[str] | None = None) -> types.StructType: """ Normalize the input schema into a Spark StructType schema. 
diff --git a/tests/integration/test_row_checks.py b/tests/integration/test_row_checks.py index af2d27520..b7067b760 100644 --- a/tests/integration/test_row_checks.py +++ b/tests/integration/test_row_checks.py @@ -25,6 +25,7 @@ is_valid_date, is_valid_json, has_json_keys, + has_json_schema, is_valid_timestamp, is_valid_ipv4_address, is_ipv4_address_in_cidr, @@ -2888,3 +2889,59 @@ def test_has_json_keys_require_at_least_one(spark): expected_schema, ) assert_df_equality(actual, expected, ignore_nullable=True) + + +def test_has_json_schema(spark): + schema = "a: string, b: string" + test_data = spark.createDataFrame( + [ + ['{"key": "value", "another_key": 123}', '{"key": "value"}'], + ['{"number": 123}', '{"number": 123, "extra": true}'], + ['{"array": [1, 2, 3]}', '{"array": {1, 2, 3}]'], + ['{"key": "value"}', '{"missing_key": "value"}'], + [None, None], + ['Not a JSON string', '{"key": "value"}'], + ['{"key": "value"}', 'Not a JSON string'], + ['{"key": "value"}', None], + ] + ) + test_df = spark.createDataFrame(test_data, schema) + + json_schema = "STRUCT" + expected_schema = "a_has_invalid_json_schema: string, b_has_invalid_json_schema: string" + expected = spark.createDataFrame( + [ + [ + "Value '{\"key\": \"value\", \"another_key\": 123}' in Column 'a' does not conform to the expected JSON schema: STRUCT", + "Value '{\"key\": \"value\"}' in Column 'b' does not conform to the expected JSON schema: STRUCT", + ], + [ + "Value '{\"number\": 123}' in Column 'a' does not conform to the expected JSON schema: STRUCT", + "Value '{\"number\": 123, \"extra\": true}' in Column 'b' does not conform to the expected JSON schema: STRUCT", + ], + [ + "Value '{\"array\": [1, 2, 3]}' in Column 'a' does not conform to the expected JSON schema: STRUCT", + "Value '{\"array\": {1, 2, 3}]}' in Column 'b' is not a valid JSON string", + ], + [ + "Value '{\"key\": \"value\"}' in Column 'a' does not conform to the expected JSON schema: STRUCT", + "Value '{\"missing_key\": \"value\"}' in Column 'b' does not conform to the expected JSON schema: STRUCT", + ], + [None, None], + ["Value 'Not a JSON string' in Column 'a' is not a valid JSON string", None], + [ + "Value '{\"key\": \"value\"}' in Column 'a' does not conform to the expected JSON schema: STRUCT", + "Value 'Not a JSON string' in Column 'b' is not a valid JSON string", + ], + [ + "Value '{\"key\": \"value\"}' in Column 'a' does not conform to the expected JSON schema: STRUCT", + None, + ], + ], + expected_schema, + ) + actual = test_df.select( + has_json_schema("a", json_schema), + has_json_schema("b", json_schema), + ) + assert_df_equality(actual, expected, ignore_nullable=True) From a72bdb15ae50d00756cdfbceb59d21e5b45415cc Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Tue, 21 Oct 2025 15:15:38 +0100 Subject: [PATCH 23/43] docs: document properly that function only checks outside keys --- docs/dqx/docs/reference/quality_checks.mdx | 2 +- src/databricks/labs/dqx/check_funcs.py | 20 ++++++++------------ tests/integration/test_row_checks.py | 8 ++++---- 3 files changed, 13 insertions(+), 17 deletions(-) diff --git a/docs/dqx/docs/reference/quality_checks.mdx b/docs/dqx/docs/reference/quality_checks.mdx index d17c80352..ab8c32de1 100644 --- a/docs/dqx/docs/reference/quality_checks.mdx +++ b/docs/dqx/docs/reference/quality_checks.mdx @@ -40,7 +40,7 @@ You can also define your own custom checks (see [Creating custom checks](#creati | `is_valid_date` | Checks whether the values in the input column have valid date formats. 
| `column`: column to check (can be a string column name or a column expression); `date_format`: optional date format (e.g. 'yyyy-mm-dd') | | `is_valid_timestamp` | Checks whether the values in the input column have valid timestamp formats. | `column`: column to check (can be a string column name or a column expression); `timestamp_format`: optional timestamp format (e.g. 'yyyy-mm-dd HH:mm:ss') | | `is_valid_json` | Checks whether the values in the input column are valid JSON objects. | `column`: column to check (can be a string column name or a column expression) | -| `has_json_keys` | Checks whether the values in the input column contain specific JSON keys. | `column`: column to check (can be a string column name or a column expression); `keys`: list of JSON keys to check for; `require_all`: optional boolean flag to require all keys to be present | +| `has_json_keys` | Checks whether the values in the input column contain specific keys in the outermost JSON object. | `column`: column to check (can be a string column name or a column expression); `keys`: A list of JSON keys to verify within the outermost JSON object; `require_all`: optional boolean flag to require all keys to be present | | `is_not_in_future` | Checks whether the values in the input column contain a timestamp that is not in the future, where 'future' is defined as current_timestamp + offset (in seconds). | `column`: column to check (can be a string column name or a column expression); `offset`: offset to use; `curr_timestamp`: current timestamp, if not provided current_timestamp() function is used | | `is_not_in_near_future` | Checks whether the values in the input column contain a timestamp that is not in the near future, where 'near future' is defined as greater than the current timestamp but less than the current_timestamp + offset (in seconds). | `column`: column to check (can be a string column name or a column expression); `offset`: offset to use; `curr_timestamp`: current timestamp, if not provided current_timestamp() function is used | | `is_older_than_n_days` | Checks whether the values in one input column are at least N days older than the values in another column. | `column`: column to check (can be a string column name or a column expression); `days`: number of days; `curr_date`: current date, if not provided current_date() function is used; `negate`: if the condition should be negated | diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index 46bed0559..e865edbe6 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1774,11 +1774,11 @@ def is_valid_json(column: str | Column) -> Column: @register_rule("row") def has_json_keys(column: str | Column, keys: list[str], require_all: bool = True) -> Column: """ - Checks whether the values in the input column contain specific JSON keys. + Checks whether the values in the input column contain specific keys in the outermost JSON object. Args: column: The name of the column or the column expression to check for JSON keys. - keys: The list of JSON keys to check for. + keys: A list of JSON keys to verify within the outermost JSON object. require_all: If True, all specified keys must be present. If False, at least one key must be present. 
Returns: @@ -1822,7 +1822,7 @@ def has_json_keys(column: str | Column, keys: list[str], require_all: bool = Tru @register_rule("row") -def has_json_schema(column: str | Column, schema: str | types.StructType, strict: bool = False) -> Column: +def has_valid_json_schema(column: str | Column, schema: str | types.StructType, strict: bool = False) -> Column: """ Checks whether the values in the input column conform to a specified JSON schema. @@ -1844,8 +1844,8 @@ def has_json_schema(column: str | Column, schema: str | types.StructType, strict unique_prefix = uuid.uuid4().hex[:8] corrupt_record_name = f"{unique_prefix}_dqx_corrupt_record" new_json_schema = types.StructType( - _expected_schema.fields + [types.StructField(corrupt_record_name, types.StringType(), True)] - ) + _expected_schema.fields + [types.StructField(corrupt_record_name, types.StringType(), True)] + ) parsed_struct = F.from_json(col_expr, new_json_schema, options={"columnNameOfCorruptRecord": corrupt_record_name}) @@ -1865,19 +1865,14 @@ def has_json_schema(column: str | Column, schema: str | types.StructType, strict condition = is_conforming | col_expr.isNull() - has_json_schema_msg = F.concat_ws( + has_valid_json_schema_msg = F.concat_ws( "", F.lit("Value '"), F.when(col_expr.isNull(), F.lit("null")).otherwise(col_expr.cast("string")), F.lit(f"' in Column '{col_expr_str}' does not conform to the expected JSON schema (strict={strict}): "), F.lit(_expected_schema.simpleString()), ) - final_error_message = F.when( - is_invalid_json, - json_validation_error - ).otherwise( - has_json_schema_msg - ) + final_error_message = F.when(is_invalid_json, json_validation_error).otherwise(has_valid_json_schema_msg) return make_condition( ~condition, @@ -1885,6 +1880,7 @@ def has_json_schema(column: str | Column, schema: str | types.StructType, strict f"{col_str_norm}_has_invalid_json_schema", ) + def _get_schema(input_schema: str | types.StructType, columns: list[str] | None = None) -> types.StructType: """ Normalize the input schema into a Spark StructType schema. 
diff --git a/tests/integration/test_row_checks.py b/tests/integration/test_row_checks.py index b7067b760..6c9da4b44 100644 --- a/tests/integration/test_row_checks.py +++ b/tests/integration/test_row_checks.py @@ -25,7 +25,7 @@ is_valid_date, is_valid_json, has_json_keys, - has_json_schema, + has_valid_json_schema, is_valid_timestamp, is_valid_ipv4_address, is_ipv4_address_in_cidr, @@ -2891,7 +2891,7 @@ def test_has_json_keys_require_at_least_one(spark): assert_df_equality(actual, expected, ignore_nullable=True) -def test_has_json_schema(spark): +def test_has_valid_json_schema(spark): schema = "a: string, b: string" test_data = spark.createDataFrame( [ @@ -2941,7 +2941,7 @@ def test_has_json_schema(spark): expected_schema, ) actual = test_df.select( - has_json_schema("a", json_schema), - has_json_schema("b", json_schema), + has_valid_json_schema("a", json_schema), + has_valid_json_schema("b", json_schema), ) assert_df_equality(actual, expected, ignore_nullable=True) From b8505e450a21a1019b8c445d1d896fe3671f1870 Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Tue, 21 Oct 2025 15:41:39 +0100 Subject: [PATCH 24/43] refactor: comment out to test --- src/databricks/labs/dqx/check_funcs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index e865edbe6..111f3e6db 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1852,8 +1852,8 @@ def has_valid_json_schema(column: str | Column, schema: str | types.StructType, is_not_corrupt = parsed_struct[corrupt_record_name].isNull() base_conformity = ~is_invalid_json & is_not_corrupt - _get_strict_schema_comparison() - _get_permissive_schema_comparison + # _get_strict_schema_comparison() + # _get_permissive_schema_comparison if strict: expected_field_names = [f.name for f in _expected_schema.fields] From a177c01406934054df2da06c6f9c9dd00c2f0bb3 Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Tue, 21 Oct 2025 15:59:57 +0100 Subject: [PATCH 25/43] refactor: try using transform for strict comparison --- src/databricks/labs/dqx/check_funcs.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index 111f3e6db..6adea0daf 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1830,7 +1830,8 @@ def has_valid_json_schema(column: str | Column, schema: str | types.StructType, column: The name of the column or the column expression to check for JSON schema conformity. schema: The expected JSON schema as a DDL string (e.g., "id INT, name STRING") or StructType object. strict: Whether to perform strict schema validation (default: False). For JSON schema, - strict mode means that the JSON must match the schema exactly (same fields, same order). + strict mode (True) means that the JSON must match the schema exactly (same fields, same order). + else, non-strict mode (False) allows extra fields in the JSON. Returns: Column: A Spark Column representing the condition for JSON schema violations. 
@@ -1857,8 +1858,15 @@ def has_valid_json_schema(column: str | Column, schema: str | types.StructType, if strict: expected_field_names = [f.name for f in _expected_schema.fields] - concatenated_fields = F.concat_ws("_", *[parsed_struct[name] for name in expected_field_names]) - has_content = concatenated_fields.isNotNull() + # concatenated_fields = F.concat_ws("_", *[parsed_struct[name] for name in expected_field_names]) + # has_content = concatenated_fields.isNotNull() + has_content = F.when( + ~is_invalid_json, + F.transform( + F.schema_of_json(F.to_json(col_expr)), + lambda inferred_schema: _get_strict_schema_comparison(inferred_schema, _expected_schema), + ).isNotNull(), + ).otherwise(F.lit(False)) is_conforming = base_conformity & has_content else: is_conforming = base_conformity From e0c34381c56c8a49493e9e826ed7dad56c4d270f Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Tue, 21 Oct 2025 18:55:33 +0100 Subject: [PATCH 26/43] feat: implement changes --- src/databricks/labs/dqx/check_funcs.py | 78 +++++++++++++++++--------- 1 file changed, 50 insertions(+), 28 deletions(-) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index 6adea0daf..e2ae3a928 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1829,62 +1829,63 @@ def has_valid_json_schema(column: str | Column, schema: str | types.StructType, Args: column: The name of the column or the column expression to check for JSON schema conformity. schema: The expected JSON schema as a DDL string (e.g., "id INT, name STRING") or StructType object. - strict: Whether to perform strict schema validation (default: False). For JSON schema, - strict mode (True) means that the JSON must match the schema exactly (same fields, same order). - else, non-strict mode (False) allows extra fields in the JSON. + strict: Whether to perform strict schema validation. In strict mode, the JSON must match the schema + exactly (no extra fields, all required fields present). Non-strict mode allows extra fields. Returns: - Column: A Spark Column representing the condition for JSON schema violations. + Column: A Spark Column indicating schema validity; False if violations are found. 
""" _expected_schema = _get_schema(schema) col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) + + # Check JSON string validity first json_validation_error = is_valid_json(col_str_norm) is_invalid_json = json_validation_error.isNotNull() - # Add this because of to avoid name clashes if a user already specified _corrupt_record field + # Use a unique corrupt record field to detect parse issues unique_prefix = uuid.uuid4().hex[:8] corrupt_record_name = f"{unique_prefix}_dqx_corrupt_record" - new_json_schema = types.StructType( + + extended_schema = types.StructType( _expected_schema.fields + [types.StructField(corrupt_record_name, types.StringType(), True)] ) - parsed_struct = F.from_json(col_expr, new_json_schema, options={"columnNameOfCorruptRecord": corrupt_record_name}) + parsed_struct = F.from_json( + col_expr, + extended_schema, + options={"columnNameOfCorruptRecord": corrupt_record_name}, + ) is_not_corrupt = parsed_struct[corrupt_record_name].isNull() base_conformity = ~is_invalid_json & is_not_corrupt - # _get_strict_schema_comparison() - # _get_permissive_schema_comparison - if strict: - expected_field_names = [f.name for f in _expected_schema.fields] - # concatenated_fields = F.concat_ws("_", *[parsed_struct[name] for name in expected_field_names]) - # has_content = concatenated_fields.isNotNull() - has_content = F.when( - ~is_invalid_json, - F.transform( - F.schema_of_json(F.to_json(col_expr)), - lambda inferred_schema: _get_strict_schema_comparison(inferred_schema, _expected_schema), - ).isNotNull(), - ).otherwise(F.lit(False)) - is_conforming = base_conformity & has_content + map_json = F.from_json(col_expr, _expected_schema) + json_keys = F.map_keys(map_json) + expected_keys = [F.lit(f.name) for f in _expected_schema.fields] + + has_extra_fields = F.size(F.array_except(json_keys, F.array(*expected_keys))) > 0 + not_null_checks = _generate_not_null_expr(_expected_schema, parsed_struct) + has_null_fields = F.array_contains(F.array(*[F.coalesce(e, F.lit(False)) for e in not_null_checks]), False) + + is_conforming = base_conformity & ~has_extra_fields & ~has_null_fields else: - is_conforming = base_conformity + is_conforming = base_conformity & (F.size(F.json_object_keys(col_expr)) > 0) condition = is_conforming | col_expr.isNull() - - has_valid_json_schema_msg = F.concat_ws( + schema_str = _expected_schema.simpleString() + error_msg = F.concat_ws( "", F.lit("Value '"), F.when(col_expr.isNull(), F.lit("null")).otherwise(col_expr.cast("string")), - F.lit(f"' in Column '{col_expr_str}' does not conform to the expected JSON schema (strict={strict}): "), - F.lit(_expected_schema.simpleString()), + F.lit(f"' in Column '{col_expr_str}' does not conform to expected JSON schema (strict={strict}): "), + F.lit(schema_str), ) - final_error_message = F.when(is_invalid_json, json_validation_error).otherwise(has_valid_json_schema_msg) + final_error_msg = F.when(is_invalid_json, json_validation_error).otherwise(error_msg) return make_condition( ~condition, - final_error_message, + final_error_msg, f"{col_str_norm}_has_invalid_json_schema", ) @@ -2114,6 +2115,27 @@ def _is_compatible_atomic_type(actual_type: types.AtomicType, expected_type: typ return False +def _generate_not_null_expr(schema: types.StructType, col_name: Column) -> list[Column]: + """ + Generate a list of expressions that check for non-null values in the given schema. + + Args: + schema: The schema of the DataFrame. + col_name: The name of the column to check. 
+ + Returns: + A list of Column expressions that check for non-null values. + """ + exprs = [] + for field in schema.fields: + field_col = col_name[field.name] + if isinstance(field.dataType, types.StructType): + exprs += _generate_not_null_expr(field.dataType, field_col) + else: + exprs.append(field_col.isNotNull()) + return exprs + + def _match_rows( df: DataFrame, ref_df: DataFrame, From 3b0fd52fdc5814a85d488b2fa5ff915ca4ebc30f Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Wed, 22 Oct 2025 19:29:09 +0100 Subject: [PATCH 27/43] format and add tests --- src/databricks/labs/dqx/check_funcs.py | 70 +++++++++++++------------- tests/integration/test_apply_checks.py | 14 +++++- 2 files changed, 47 insertions(+), 37 deletions(-) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index e2ae3a928..58fb20a85 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1822,65 +1822,61 @@ def has_json_keys(column: str | Column, keys: list[str], require_all: bool = Tru @register_rule("row") -def has_valid_json_schema(column: str | Column, schema: str | types.StructType, strict: bool = False) -> Column: +def has_valid_json_schema(column: str | Column, schema: str | types.StructType) -> Column: """ - Checks whether the values in the input column conform to a specified JSON schema. + Validates that JSON strings in the specified column conform to an expected schema. Args: - column: The name of the column or the column expression to check for JSON schema conformity. - schema: The expected JSON schema as a DDL string (e.g., "id INT, name STRING") or StructType object. - strict: Whether to perform strict schema validation. In strict mode, the JSON must match the schema - exactly (no extra fields, all required fields present). Non-strict mode allows extra fields. + column: Column name or Column expression containing JSON strings. + schema: Expected schema as a DDL string (e.g., "id INT, name STRING") or StructType. Returns: - Column: A Spark Column indicating schema validity; False if violations are found. + Column: Boolean condition representing JSON schema validity. 
""" + _expected_schema = _get_schema(schema) + schema_str = _expected_schema.simpleString() col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) - # Check JSON string validity first json_validation_error = is_valid_json(col_str_norm) is_invalid_json = json_validation_error.isNotNull() - # Use a unique corrupt record field to detect parse issues - unique_prefix = uuid.uuid4().hex[:8] - corrupt_record_name = f"{unique_prefix}_dqx_corrupt_record" + # Add unique corrupt-record field to isolate parse errors + corrupt_record_name = f"{uuid.uuid4().hex[:8]}_dqx_corrupt_record" extended_schema = types.StructType( _expected_schema.fields + [types.StructField(corrupt_record_name, types.StringType(), True)] ) + # Attempt to parse JSON using the extended schema parsed_struct = F.from_json( col_expr, extended_schema, options={"columnNameOfCorruptRecord": corrupt_record_name}, ) + # Core conformity: must be valid JSON and not corrupt is_not_corrupt = parsed_struct[corrupt_record_name].isNull() base_conformity = ~is_invalid_json & is_not_corrupt - if strict: - map_json = F.from_json(col_expr, _expected_schema) - json_keys = F.map_keys(map_json) - expected_keys = [F.lit(f.name) for f in _expected_schema.fields] - - has_extra_fields = F.size(F.array_except(json_keys, F.array(*expected_keys))) > 0 - not_null_checks = _generate_not_null_expr(_expected_schema, parsed_struct) - has_null_fields = F.array_contains(F.array(*[F.coalesce(e, F.lit(False)) for e in not_null_checks]), False) - - is_conforming = base_conformity & ~has_extra_fields & ~has_null_fields - else: - is_conforming = base_conformity & (F.size(F.json_object_keys(col_expr)) > 0) + # Field presence checks (non-null + exists) + field_presence_checks = _generate_field_presence_checks(_expected_schema, parsed_struct) + has_missing_or_null_fields = F.array_contains( + F.array(*[F.coalesce(expr, F.lit(False)) for expr in field_presence_checks]), + False, + ) + is_conforming = base_conformity & ~has_missing_or_null_fields condition = is_conforming | col_expr.isNull() - schema_str = _expected_schema.simpleString() + error_msg = F.concat_ws( "", F.lit("Value '"), F.when(col_expr.isNull(), F.lit("null")).otherwise(col_expr.cast("string")), - F.lit(f"' in Column '{col_expr_str}' does not conform to expected JSON schema (strict={strict}): "), + F.lit(f"' in column '{col_expr_str}' does not conform to expected JSON schema: "), F.lit(schema_str), ) + final_error_msg = F.when(is_invalid_json, json_validation_error).otherwise(error_msg) return make_condition( @@ -2115,25 +2111,27 @@ def _is_compatible_atomic_type(actual_type: types.AtomicType, expected_type: typ return False -def _generate_not_null_expr(schema: types.StructType, col_name: Column) -> list[Column]: +def _generate_field_presence_checks(expected_schema: types.StructType, parsed_struct_col: Column) -> list[Column]: """ - Generate a list of expressions that check for non-null values in the given schema. + Recursively generate Spark Column expressions that verify each field defined in the expected + schema is present and non-null within a parsed struct column. Args: - schema: The schema of the DataFrame. - col_name: The name of the column to check. + expected_schema: The StructType defining the expected JSON schema. + parsed_struct_col: The parsed struct column (e.g., from from_json) to validate. Returns: - A list of Column expressions that check for non-null values. 
+ A list of Column expressions, one per field in the expected schema, that evaluate to True + if the corresponding field is non-null. """ - exprs = [] - for field in schema.fields: - field_col = col_name[field.name] + validations = [] + for field in expected_schema.fields: + field_ref = parsed_struct_col[field.name] if isinstance(field.dataType, types.StructType): - exprs += _generate_not_null_expr(field.dataType, field_col) + validations += _generate_field_presence_checks(field.dataType, field_ref) else: - exprs.append(field_col.isNotNull()) - return exprs + validations.append(field_ref.isNotNull()) + return validations def _match_rows( diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py index 2ea063bf5..b09f0a454 100755 --- a/tests/integration/test_apply_checks.py +++ b/tests/integration/test_apply_checks.py @@ -5584,6 +5584,12 @@ def test_apply_checks_all_checks_using_classes(ws, spark): column="col_json_str", check_func_kwargs={"keys": ["key1"]}, ), + DQRowRule( + criticality="error", + check_func=check_funcs.has_valid_json_schema, + column="col_json_str2", + check_func_kwargs={"schema": "STRUCT"}, + ), ] dq_engine = DQEngine(ws) @@ -5591,7 +5597,7 @@ def test_apply_checks_all_checks_using_classes(ws, spark): schema = ( "col1: string, col2: int, col3: int, col4 array, col5: date, col6: timestamp, " "col7: map, col8: struct, col10: int, col11: string, " - "col_ipv4: string, col_ipv6: string, col_json_str: string" + "col_ipv4: string, col_ipv6: string, col_json_str: string, col_json_str2" ) test_df = spark.createDataFrame( [ @@ -5609,6 +5615,7 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "255.255.255.255", "2001:0db8:85a3:08d3:1319:8a2e:0370:7344", "{}", + "{'a' : 1, 'b': 2}", ], [ "val2", @@ -5624,6 +5631,7 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "255.255.255.1", "2001:0db8:85a3:08d3:ffff:ffff:ffff:ffff", "{'key1': '1'}", + "{ 'a' : 1, 'b': 1000, 'c': {'1': 8}}", ], [ "val3", @@ -5639,6 +5647,7 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "255.255.255.2", "2001:db8:85a3:8d3:1319:8a2e:3.112.115.68", "{'key1': '[1, 2, 3]'}", + "{ 'a' : 1, 'b': 1023455, 'c': null }", ], ], schema, @@ -5663,6 +5672,7 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "255.255.255.255", "2001:0db8:85a3:08d3:1319:8a2e:0370:7344", "{}", + "{'a' : 1, 'b': 2}", None, None, ], @@ -5680,6 +5690,7 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "255.255.255.1", "2001:0db8:85a3:08d3:ffff:ffff:ffff:ffff", "{'key1': '1'}", + "{ 'a' : 1, 'b': 1000, 'c': {'1': 8}}", None, None, ], @@ -5697,6 +5708,7 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "255.255.255.2", "2001:db8:85a3:8d3:1319:8a2e:3.112.115.68", "{'key1': '[1, 2, 3]'}", + "{ 'a' : 1, 'b': 1023455, 'c': null }", None, None, ], From 7b19d00c2c9389a91d6721b9ee5e4406c2a4b424 Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Wed, 22 Oct 2025 20:01:48 +0100 Subject: [PATCH 28/43] refactor: add to markdown --- docs/dqx/docs/reference/quality_checks.mdx | 17 +++++++++++++++++ src/databricks/labs/dqx/check_funcs.py | 2 +- .../dqx/llm/resources/yaml_checks_examples.yml | 7 +++++++ tests/integration/test_apply_checks.py | 2 +- 4 files changed, 26 insertions(+), 2 deletions(-) diff --git a/docs/dqx/docs/reference/quality_checks.mdx b/docs/dqx/docs/reference/quality_checks.mdx index e1511d871..c5c11c8c6 100644 --- a/docs/dqx/docs/reference/quality_checks.mdx +++ b/docs/dqx/docs/reference/quality_checks.mdx @@ -41,6 +41,7 @@ You can 
also define your own custom checks (see [Creating custom checks](#creati | `is_valid_timestamp` | Checks whether the values in the input column have valid timestamp formats. | `column`: column to check (can be a string column name or a column expression); `timestamp_format`: optional timestamp format (e.g. 'yyyy-mm-dd HH:mm:ss') | | `is_valid_json` | Checks whether the values in the input column are valid JSON objects. | `column`: column to check (can be a string column name or a column expression) | | `has_json_keys` | Checks whether the values in the input column contain specific keys in the outermost JSON object. | `column`: column to check (can be a string column name or a column expression); `keys`: A list of JSON keys to verify within the outermost JSON object; `require_all`: optional boolean flag to require all keys to be present | +| `has_valid_json_schema` | Checks whether the values in the specified column, which contain JSON strings, conform to the expected schema. | `column`: column to check (can be a string column name or a column expression); `schema`: the schema as a DDL string (e.g., "id INT, name STRING") or StructType object; | | `is_not_in_future` | Checks whether the values in the input column contain a timestamp that is not in the future, where 'future' is defined as current_timestamp + offset (in seconds). | `column`: column to check (can be a string column name or a column expression); `offset`: offset to use; `curr_timestamp`: current timestamp, if not provided current_timestamp() function is used | | `is_not_in_near_future` | Checks whether the values in the input column contain a timestamp that is not in the near future, where 'near future' is defined as greater than the current timestamp but less than the current_timestamp + offset (in seconds). | `column`: column to check (can be a string column name or a column expression); `offset`: offset to use; `curr_timestamp`: current timestamp, if not provided current_timestamp() function is used | | `is_older_than_n_days` | Checks whether the values in one input column are at least N days older than the values in another column. | `column`: column to check (can be a string column name or a column expression); `days`: number of days; `curr_date`: current date, if not provided current_date() function is used; `negate`: if the condition should be negated | @@ -352,6 +353,14 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen - key2 require_all: False +- criticality: error + name: col_json_str2_has_invalid_json_schema + check: + function: has_valid_json_schema + arguments: + column: col_json_str2 + schema: "STRUCT" + # is_valid_timestamp check - criticality: error check: @@ -933,6 +942,14 @@ checks = [ name="col_json_str_has_json_keys" ), + DQRowRule( + criticality="error", + check_func=check_funcs.has_valid_json_schema, + column="col_json_str2", # or as expr: F.col("col_json_str") + check_func_kwargs={"schema": "STRUCT"}, + name="col_json_str2_has_valid_json_schema" + ), + # is_not_in_future check DQRowRule( criticality="error", diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index 58fb20a85..05ecdab75 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1831,7 +1831,7 @@ def has_valid_json_schema(column: str | Column, schema: str | types.StructType) schema: Expected schema as a DDL string (e.g., "id INT, name STRING") or StructType. 
Returns: - Column: Boolean condition representing JSON schema validity. + Column: A Spark Column representing the column conforms to the expected JSON schema. """ _expected_schema = _get_schema(schema) diff --git a/src/databricks/labs/dqx/llm/resources/yaml_checks_examples.yml b/src/databricks/labs/dqx/llm/resources/yaml_checks_examples.yml index cfdda6745..bc5fcb01d 100644 --- a/src/databricks/labs/dqx/llm/resources/yaml_checks_examples.yml +++ b/src/databricks/labs/dqx/llm/resources/yaml_checks_examples.yml @@ -211,6 +211,13 @@ - key1 - key2 require_all: false +- criticality: error + name: col_json_str2_has_invalid_json_schema + check: + function: has_valid_json_schema + arguments: + column: col_json_str2 + schema: 'STRUCT' - criticality: error check: function: is_valid_timestamp diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py index 5b40efa67..7a8a03afa 100755 --- a/tests/integration/test_apply_checks.py +++ b/tests/integration/test_apply_checks.py @@ -5588,7 +5588,7 @@ def test_apply_checks_all_checks_using_classes(ws, spark): criticality="error", check_func=check_funcs.has_valid_json_schema, column="col_json_str2", - check_func_kwargs={"schema": "STRUCT"}, + check_func_kwargs={"schema": "STRUCT"}, ), ] From 96cbc8e20ae99348acb7d82e09643f0721e793c4 Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Thu, 23 Oct 2025 00:15:17 +0100 Subject: [PATCH 29/43] updates --- tests/integration/test_apply_checks.py | 8 +++++++- tests/resources/all_row_checks.yaml | 7 +++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py index 7a8a03afa..12d3d0948 100755 --- a/tests/integration/test_apply_checks.py +++ b/tests/integration/test_apply_checks.py @@ -4744,7 +4744,7 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): schema = ( "col1: string, col2: int, col3: int, col4 array, col5: date, col6: timestamp, " "col7: map, col8: struct, col10: int, col11: string, " - "col_ipv4: string, col_ipv6: string, col_json_str: string" + "col_ipv4: string, col_ipv6: string, col_json_str: string, col_json_str2: string" ) test_df = spark.createDataFrame( [ @@ -4762,6 +4762,7 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "192.168.1.0", "2001:0db8:85a3:08d3:0000:0000:0000:0001", "{}", + "{'a': 1, 'b': 2}", ], [ "val2", @@ -4777,6 +4778,7 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "192.168.1.1", "2001:0db8:85a3:08d3:0000:0000:0000:1", "{'key1': 1}", + "{'a': 1, 'b': 2, 'c': 3}", ], [ "val3", @@ -4792,6 +4794,7 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "192.168.1.2", "2001:0db8:85a3:08d3:0000::2", "{'key1': [1, 2, 3]}", + "{'a': 1, 'b': 2, 'c': 1.222}", ], ], schema, @@ -4819,6 +4822,7 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "192.168.1.0", "2001:0db8:85a3:08d3:0000:0000:0000:0001", "{}", + "{'a': 1, 'b': 2}", None, None, ], @@ -4836,6 +4840,7 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "192.168.1.1", "2001:0db8:85a3:08d3:0000:0000:0000:1", "{'key1': 1}", + "{'a': 1, 'b': 2, 'c': 3}", None, None, ], @@ -4853,6 +4858,7 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "192.168.1.2", "2001:0db8:85a3:08d3:0000::2", "{'key1': [1, 2, 3]}", + "{'a': 1, 'b': 2, 'c': 1.222}", None, None, ], diff --git a/tests/resources/all_row_checks.yaml b/tests/resources/all_row_checks.yaml index e4f04074d..eb81d444b 100644 --- a/tests/resources/all_row_checks.yaml +++ b/tests/resources/all_row_checks.yaml @@ -266,6 +266,13 @@ - key2 
require_all: false +- criticality: error + check: + function: has_valid_json_schema + arguments: + column: col_json_str2 + schema: 'STRUCT' + # is_not_in_future check - criticality: error check: From fc7bd7ab71e84f9cdab9665fdb8180c745524379 Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Wed, 12 Nov 2025 11:13:50 +0000 Subject: [PATCH 30/43] refactor: add missing type in schema --- tests/integration/test_apply_checks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py index 8dbbcd4ec..45b74cce7 100755 --- a/tests/integration/test_apply_checks.py +++ b/tests/integration/test_apply_checks.py @@ -5877,7 +5877,7 @@ def test_apply_checks_all_checks_using_classes(ws, spark): schema = ( "col1: string, col2: int, col3: int, col4 array, col5: date, col6: timestamp, " "col7: map, col8: struct, col10: int, col11: string, " - "col_ipv4: string, col_ipv6: string, col_json_str: string, col_json_str2" + "col_ipv4: string, col_ipv6: string, col_json_str: string, col_json_str2: string" ) test_df = spark.createDataFrame( [ From a10ab226df9899808b3f48526c427dfd992409b7 Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Thu, 13 Nov 2025 10:06:30 +0000 Subject: [PATCH 31/43] refactor: update tests --- tests/integration/test_apply_checks.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py index 45b74cce7..deb203a12 100755 --- a/tests/integration/test_apply_checks.py +++ b/tests/integration/test_apply_checks.py @@ -5894,8 +5894,8 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "val2", "255.255.255.255", "2001:0db8:85a3:08d3:1319:8a2e:0370:7344", - "{}", - "{'a' : 1, 'b': 2}", + '{"key1": "1"}', + '{"a" : 1, "b": 2}', ], [ "val2", @@ -5910,8 +5910,8 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "val2", "255.255.255.1", "2001:0db8:85a3:08d3:ffff:ffff:ffff:ffff", - "{'key1': '1'}", - "{ 'a' : 1, 'b': 1000, 'c': {'1': 8}}", + '{"key1": "1", "key2": "2"}', + '{ "a" : 1, "b": 1000, "c": {"1": 8}}', ], [ "val3", @@ -5926,8 +5926,8 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "val2", "255.255.255.2", "2001:db8:85a3:8d3:1319:8a2e:3.112.115.68", - "{'key1': '[1, 2, 3]'}", - "{ 'a' : 1, 'b': 1023455, 'c': null }", + '{"key1": "[1, 2, 3]"}', + '{ "a" : 1, "b": 1023455, "c": null }', ], ], schema, @@ -5951,8 +5951,8 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "val2", "255.255.255.255", "2001:0db8:85a3:08d3:1319:8a2e:0370:7344", - "{}", - "{'a' : 1, 'b': 2}", + '{"key1": "1"}', + '{"a" : 1, "b": 2}', None, None, ], @@ -5969,8 +5969,8 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "val2", "255.255.255.1", "2001:0db8:85a3:08d3:ffff:ffff:ffff:ffff", - "{'key1': '1'}", - "{ 'a' : 1, 'b': 1000, 'c': {'1': 8}}", + '{"key1": "1", "key2": "2"}', + '{ "a" : 1, "b": 1000, "c": {"1": 8}}', None, None, ], @@ -5987,8 +5987,8 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "val2", "255.255.255.2", "2001:db8:85a3:8d3:1319:8a2e:3.112.115.68", - "{'key1': '[1, 2, 3]'}", - "{ 'a' : 1, 'b': 1023455, 'c': null }", + '{"key1": "[1, 2, 3]"}', + '{ "a" : 1, "b": 1023455, "c": null }', None, None, ], From b753071cf5c13bd34034f4eb66a22fbf9f3a3a3d Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Thu, 13 Nov 2025 10:09:59 +0000 Subject: [PATCH 32/43] feat: add has_valid_json_schema to perf --- tests/perf/test_apply_checks.py | 
16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/tests/perf/test_apply_checks.py b/tests/perf/test_apply_checks.py index 5afd445d6..c2ee80dea 100644 --- a/tests/perf/test_apply_checks.py +++ b/tests/perf/test_apply_checks.py @@ -1639,3 +1639,19 @@ def test_benchmark_has_json_keys_require_all_true(benchmark, ws, generated_df): checked = dq_engine.apply_checks(generated_df, checks) actual_count = benchmark(lambda: checked.count()) assert actual_count == EXPECTED_ROWS + + +@pytest.mark.benchmark(group="test_benchmark_has_valid_json_schema") +def test_benchmark_has_valid_json_schema(benchmark, ws, generated_df): + dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS) + checks = [ + DQRowRule( + criticality="error", + check_func=check_funcs.has_valid_json_schema, + column="col_json_str", + check_func_kwargs={"schema": "STRUCT"}, + ), + ] + checked = dq_engine.apply_checks(generated_df, checks) + actual_count = benchmark(lambda: checked.count()) + assert actual_count == EXPECTED_ROWS From 53a8f51434c13197a80d2006008379cfb29ab476 Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Thu, 13 Nov 2025 10:33:05 +0000 Subject: [PATCH 33/43] refactor: modify schema and dataframe --- tests/integration/test_apply_checks.py | 44 +++++++++++++++----------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py index deb203a12..f3eb1d750 100755 --- a/tests/integration/test_apply_checks.py +++ b/tests/integration/test_apply_checks.py @@ -4744,7 +4744,7 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak schema = ( "col1: string, col2: int, col3: int, col4 array, col5: date, col6: timestamp, " "col7: map, col8: struct, col10: int, col11: string, " - "col_ipv4: string, col_ipv6: string, col_json_str: string" + "col_ipv4: string, col_ipv6: string, col_json_str: string, col_json_str2: string" ) test_df = spark.createDataFrame( [ @@ -4761,7 +4761,8 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak "val2", "192.168.1.1", "2001:0db8:85a3:08d3:1319:8a2e:0370:7344", - "{}", + '{"key1": "1"}', + '{"a" : 1, "b": 2}', ], [ "val2", @@ -4776,7 +4777,8 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak "val2", "192.168.1.2", "2001:0db8:85a3:08d3:ffff:ffff:ffff:ffff", - "{'key1': 1}", + '{"key1": "1", "key2": "2"}', + '{ "a" : 1, "b": 1000, "c": {"1": 8}}', ], [ "val3", @@ -4791,7 +4793,8 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak "val2", "192.168.1.3", "2001:db8:85a3:8d3:1319:8a2e:3.112.115.68", - "{'key1': [1, 2, 3]}", + '{"key1": "[1, 2, 3]"}', + '{ "a" : 1, "b": 1023455, "c": null }', ], ], schema, @@ -4830,7 +4833,8 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak "val2", "192.168.1.1", "2001:0db8:85a3:08d3:1319:8a2e:0370:7344", - "{}", + '{"key1": "1"}', + '{"a" : 1, "b": 2}', None, None, ], @@ -4847,7 +4851,8 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak "val2", "192.168.1.2", "2001:0db8:85a3:08d3:ffff:ffff:ffff:ffff", - "{'key1': 1}", + '{"key1": "1", "key2": "2"}', + '{ "a" : 1, "b": 1000, "c": {"1": 8}}', None, None, ], @@ -4864,7 +4869,8 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak "val2", "192.168.1.3", "2001:db8:85a3:8d3:1319:8a2e:3.112.115.68", - "{'key1': [1, 2, 3]}", + '{"key1": "[1, 2, 3]"}', + '{ "a" : 1, "b": 1023455, "c": null }', None, 
None, ], @@ -5035,8 +5041,8 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "val2", "192.168.1.0", "2001:0db8:85a3:08d3:0000:0000:0000:0001", - "{}", - "{'a': 1, 'b': 2}", + '{"key1": "1"}', + '{"a" : 1, "b": 2}', ], [ "val2", @@ -5051,8 +5057,8 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "val2", "192.168.1.1", "2001:0db8:85a3:08d3:0000:0000:0000:1", - "{'key1': 1}", - "{'a': 1, 'b': 2, 'c': 3}", + '{"key1": "1", "key2": "2"}', + '{ "a" : 1, "b": 1000, "c": {"1": 8}}', ], [ "val3", @@ -5067,8 +5073,8 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "val2", "192.168.1.2", "2001:0db8:85a3:08d3:0000::2", - "{'key1': [1, 2, 3]}", - "{'a': 1, 'b': 2, 'c': 1.222}", + '{"key1": "[1, 2, 3]"}', + '{ "a" : 1, "b": 1023455, "c": null }', ], ], schema, @@ -5095,8 +5101,8 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "val2", "192.168.1.0", "2001:0db8:85a3:08d3:0000:0000:0000:0001", - "{}", - "{'a': 1, 'b': 2}", + '{"key1": "1"}', + '{"a" : 1, "b": 2}', None, None, ], @@ -5113,8 +5119,8 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "val2", "192.168.1.1", "2001:0db8:85a3:08d3:0000:0000:0000:1", - "{'key1': 1}", - "{'a': 1, 'b': 2, 'c': 3}", + '{"key1": "1", "key2": "2"}', + '{ "a" : 1, "b": 1000, "c": {"1": 8}}', None, None, ], @@ -5131,8 +5137,8 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "val2", "192.168.1.2", "2001:0db8:85a3:08d3:0000::2", - "{'key1': [1, 2, 3]}", - "{'a': 1, 'b': 2, 'c': 1.222}", + '{"key1": "[1, 2, 3]"}', + '{ "a" : 1, "b": 1023455, "c": null }', None, None, ], From 560ffd0ad95a7b05c303d96c8ef14ab3f992de3a Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Thu, 13 Nov 2025 11:39:36 +0000 Subject: [PATCH 34/43] refactor: add note that this is not strict validation --- src/databricks/labs/dqx/check_funcs.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index cf9572bd1..bf70eae22 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1853,6 +1853,10 @@ def has_valid_json_schema(column: str | Column, schema: str | types.StructType) """ Validates that JSON strings in the specified column conform to an expected schema. + Note: + This check is **not strict**. Extra fields in the JSON that are not defined + in the schema are ignored due to Spark's permissive parsing behavior. + Args: column: Column name or Column expression containing JSON strings. schema: Expected schema as a DDL string (e.g., "id INT, name STRING") or StructType. From ca62317a33838252821557c158d192fd7b890385 Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Thu, 13 Nov 2025 12:08:58 +0000 Subject: [PATCH 35/43] docs: update docs --- docs/dqx/docs/reference/quality_checks.mdx | 2 +- src/databricks/labs/dqx/check_funcs.py | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/docs/dqx/docs/reference/quality_checks.mdx b/docs/dqx/docs/reference/quality_checks.mdx index bc3f00f81..88765ff4b 100644 --- a/docs/dqx/docs/reference/quality_checks.mdx +++ b/docs/dqx/docs/reference/quality_checks.mdx @@ -43,7 +43,7 @@ You can also define your own custom checks in Python (see [Creating custom check | `is_valid_timestamp` | Checks whether the values in the input column have valid timestamp formats. | `column`: column to check (can be a string column name or a column expression); `timestamp_format`: optional timestamp format (e.g. 
'yyyy-mm-dd HH:mm:ss') | | `is_valid_json` | Checks whether the values in the input column are valid JSON objects. | `column`: column to check (can be a string column name or a column expression) | | `has_json_keys` | Checks whether the values in the input column contain specific keys in the outermost JSON object. | `column`: column to check (can be a string column name or a column expression); `keys`: A list of JSON keys to verify within the outermost JSON object; `require_all`: optional boolean flag to require all keys to be present | -| `has_valid_json_schema` | Checks whether the values in the specified column, which contain JSON strings, conform to the expected schema. | `column`: column to check (can be a string column name or a column expression); `schema`: the schema as a DDL string (e.g., "id INT, name STRING") or StructType object; | +| `has_valid_json_schema` | Checks whether the values in the specified column, which contain JSON strings, conform to the expected schema. This check is **not strict**. Extra fields in the JSON that are not defined in the schema are ignored. | `column`: column to check (can be a string column name or a column expression); `schema`: the schema as a DDL string (e.g., "id INT, name STRING") or StructType object; | | `is_not_in_future` | Checks whether the values in the input column contain a timestamp that is not in the future, where 'future' is defined as current_timestamp + offset (in seconds). | `column`: column to check (can be a string column name or a column expression); `offset`: offset to use; `curr_timestamp`: current timestamp, if not provided current_timestamp() function is used | | `is_not_in_near_future` | Checks whether the values in the input column contain a timestamp that is not in the near future, where 'near future' is defined as greater than the current timestamp but less than the current_timestamp + offset (in seconds). | `column`: column to check (can be a string column name or a column expression); `offset`: offset to use; `curr_timestamp`: current timestamp, if not provided current_timestamp() function is used | | `is_older_than_n_days` | Checks whether the values in one input column are at least N days older than the values in another column. | `column`: column to check (can be a string column name or a column expression); `days`: number of days; `curr_date`: current date, if not provided current_date() function is used; `negate`: if the condition should be negated | diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index bf70eae22..b7598699a 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1853,9 +1853,7 @@ def has_valid_json_schema(column: str | Column, schema: str | types.StructType) """ Validates that JSON strings in the specified column conform to an expected schema. - Note: - This check is **not strict**. Extra fields in the JSON that are not defined - in the schema are ignored due to Spark's permissive parsing behavior. + This check is not strict. Extra fields in the JSON that are not defined in the schema are ignored. Args: column: Column name or Column expression containing JSON strings. 
From 37d78034cbf7743faea32ed58da44b21fb4caf96 Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Tue, 25 Nov 2025 17:28:34 +0000 Subject: [PATCH 36/43] chore: change to uppercase --- src/databricks/labs/dqx/check_funcs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index b7598699a..9d7fc323c 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1902,7 +1902,7 @@ def has_valid_json_schema(column: str | Column, schema: str | types.StructType) "", F.lit("Value '"), F.when(col_expr.isNull(), F.lit("null")).otherwise(col_expr.cast("string")), - F.lit(f"' in column '{col_expr_str}' does not conform to expected JSON schema: "), + F.lit(f"' in Column '{col_expr_str}' does not conform to expected JSON schema: "), F.lit(schema_str), ) From 7225e6ca8e79dac43557f6cc07d42b4e4d4716ef Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Sun, 7 Dec 2025 21:14:12 +0100 Subject: [PATCH 37/43] Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/databricks/labs/dqx/check_funcs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index 0c38e6fcf..6d4b43938 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1836,7 +1836,7 @@ def apply(df: DataFrame) -> DataFrame: @register_rule("row") def is_valid_json(column: str | Column) -> Column: """ - Checks whether the values in the input column is a valid JSON string. + Checks whether the values in the input column are valid JSON strings. Args: column: Column name (str) or Column expression to check for valid JSON. From 4f56bb3aa4dc53592c86382fbb1f3eb80377226f Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Sun, 7 Dec 2025 21:15:26 +0100 Subject: [PATCH 38/43] Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- docs/dqx/docs/reference/quality_checks.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/dqx/docs/reference/quality_checks.mdx b/docs/dqx/docs/reference/quality_checks.mdx index 8bd981e8f..f23b37abc 100644 --- a/docs/dqx/docs/reference/quality_checks.mdx +++ b/docs/dqx/docs/reference/quality_checks.mdx @@ -41,7 +41,7 @@ You can also define your own custom checks in Python (see [Creating custom check | `is_not_greater_than` | Checks whether the values in the input column are not greater than the provided limit. | `column`: column to check (can be a string column name or a column expression); `limit`: limit as number, date, timestamp, column name or sql expression | | `is_valid_date` | Checks whether the values in the input column have valid date formats. | `column`: column to check (can be a string column name or a column expression); `date_format`: optional date format (e.g. 'yyyy-mm-dd') | | `is_valid_timestamp` | Checks whether the values in the input column have valid timestamp formats. | `column`: column to check (can be a string column name or a column expression); `timestamp_format`: optional timestamp format (e.g. 'yyyy-mm-dd HH:mm:ss') | -| `is_valid_json` | Checks whether the values in the input column are valid JSON objects. | `column`: column to check (can be a string column name or a column expression) | +| `is_valid_json` | Checks whether the values in the input column are valid JSON strings. 
| `column`: column to check (can be a string column name or a column expression) | | `has_json_keys` | Checks whether the values in the input column contain specific keys in the outermost JSON object. | `column`: column to check (can be a string column name or a column expression); `keys`: A list of JSON keys to verify within the outermost JSON object; `require_all`: optional boolean flag to require all keys to be present | | `has_valid_json_schema` | Checks whether the values in the specified column, which contain JSON strings, conform to the expected schema. This check is **not strict**. Extra fields in the JSON that are not defined in the schema are ignored. | `column`: column to check (can be a string column name or a column expression); `schema`: the schema as a DDL string (e.g., "id INT, name STRING") or StructType object; | | `is_not_in_future` | Checks whether the values in the input column contain a timestamp that is not in the future, where 'future' is defined as current_timestamp + offset (in seconds). | `column`: column to check (can be a string column name or a column expression); `offset`: offset to use; `curr_timestamp`: current timestamp, if not provided current_timestamp() function is used | From 26257978dc896d0d532861d59bd5638d057b746a Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Sun, 7 Dec 2025 21:25:54 +0100 Subject: [PATCH 39/43] Apply suggestion from @mwojtyczka --- src/databricks/labs/dqx/check_funcs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index 3a1ddf27c..1a1735e7e 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1925,7 +1925,7 @@ def is_valid_json(column: str | Column) -> Column: """ col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) return make_condition( - ~F.when(F.col(col_expr_str).isNotNull(), F.try_parse_json(col_expr_str).isNotNull()), + ~F.when(col_expr.isNotNull(), F.try_parse_json(col_expr_str).isNotNull()), F.concat_ws( "", F.lit("Value '"), From 640e88396966769ff41510d92c10ccb29af4f1d4 Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Sun, 7 Dec 2025 21:35:41 +0100 Subject: [PATCH 40/43] Apply suggestion from @mwojtyczka fix --- src/databricks/labs/dqx/check_funcs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index 1a1735e7e..ad8aaa0de 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1958,7 +1958,7 @@ def has_json_keys(column: str | Column, keys: list[str], require_all: bool = Tru json_keys_array = F.json_object_keys(col_expr) required_keys = F.array_distinct(F.array(*[F.lit(k) for k in keys])) - json_validation_error = is_valid_json(col_str_norm) + json_validation_error = is_valid_json(col_expr_str) is_invalid_json = json_validation_error.isNotNull() has_json_keys_msg = F.concat_ws( From afe2773f4fcce81590d4074ac08fd4c729c1b52c Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Sun, 7 Dec 2025 21:40:11 +0100 Subject: [PATCH 41/43] Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/databricks/labs/dqx/check_funcs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index ad8aaa0de..c5960d866 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ 
b/src/databricks/labs/dqx/check_funcs.py @@ -1998,7 +1998,7 @@ def has_valid_json_schema(column: str | Column, schema: str | types.StructType) schema: Expected schema as a DDL string (e.g., "id INT, name STRING") or StructType. Returns: - Column: A Spark Column representing the column conforms to the expected JSON schema. + Column: A Spark Column representing whether the column conforms to the expected JSON schema. """ _expected_schema = _get_schema(schema) From 946f271d24e4aaf7711b0a8df73c48f1f1660392 Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Sun, 7 Dec 2025 21:58:46 +0100 Subject: [PATCH 42/43] Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/perf/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/perf/conftest.py b/tests/perf/conftest.py index 616b52cf2..eb29ff95e 100644 --- a/tests/perf/conftest.py +++ b/tests/perf/conftest.py @@ -102,7 +102,7 @@ def generated_df(spark, rows=DEFAULT_ROWS): .withColumnSpec("col10") .withColumnSpec("col_ipv4", template=r"\n.\n.\n.\n") .withColumnSpec("col_ipv6", template="XXXX:XXXX:XXXX:XXXX:XXXX:XXXX:XXXX:XXXX") - .withColumnSpec("col_json_str", template=r"{'\key1': '\w', '\key2': 'd\w'}") + .withColumnSpec("col_json_str", template=r"{'key1': '\w', 'key2': 'd\w'}") ) return spec.build() From 75925e003ff004cb6cff20e3de9355e96e86fdb3 Mon Sep 17 00:00:00 2001 From: Eniwoke Cornelius Date: Mon, 8 Dec 2025 21:06:21 +0000 Subject: [PATCH 43/43] feat: add recursion limit of nested json to 15 --- src/databricks/labs/dqx/check_funcs.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index 73dfc095d..5a5ddcd2d 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -2019,6 +2019,7 @@ def has_valid_json_schema(column: str | Column, schema: str | types.StructType) Validates that JSON strings in the specified column conform to an expected schema. This check is not strict. Extra fields in the JSON that are not defined in the schema are ignored. + Please note that the depth of nested structures is limited to 15. Args: column: Column name or Column expression containing JSON strings. @@ -2026,6 +2027,10 @@ def has_valid_json_schema(column: str | Column, schema: str | types.StructType) Returns: Column: A Spark Column representing whether the column conforms to the expected JSON schema. + + Raises: + InvalidParameterError: If the schema string is invalid or cannot be parsed, or if + the input schema is neither a string nor a StructType. """ _expected_schema = _get_schema(schema) @@ -2305,7 +2310,9 @@ def _is_compatible_atomic_type(actual_type: types.AtomicType, expected_type: typ return False -def _generate_field_presence_checks(expected_schema: types.StructType, parsed_struct_col: Column) -> list[Column]: +def _generate_field_presence_checks( + expected_schema: types.StructType, parsed_struct_col: Column, max_depth: int = 15, current_depth: int = 0 +) -> list[Column]: """ Recursively generate Spark Column expressions that verify each field defined in the expected schema is present and non-null within a parsed struct column. @@ -2313,16 +2320,23 @@ def _generate_field_presence_checks(expected_schema: types.StructType, parsed_st Args: expected_schema: The StructType defining the expected JSON schema. parsed_struct_col: The parsed struct column (e.g., from from_json) to validate. 
+        max_depth: Maximum recursion depth to prevent excessive nesting. Default is 15.
+        current_depth: Current recursion depth.
 
     Returns:
         A list of Column expressions, one per field in the expected schema, that evaluate
         to True if the corresponding field is non-null.
     """
+    if current_depth > max_depth:
+        return []
+
     validations = []
     for field in expected_schema.fields:
         field_ref = parsed_struct_col[field.name]
         if isinstance(field.dataType, types.StructType):
-            validations += _generate_field_presence_checks(field.dataType, field_ref)
+            validations += _generate_field_presence_checks(
+                field.dataType, field_ref, max_depth=max_depth, current_depth=current_depth + 1
+            )
         else:
             validations.append(field_ref.isNotNull())
     return validations
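
A minimal end-to-end sketch of how the checks added in this series could be applied. It assumes a running SparkSession and an authenticated WorkspaceClient; the column name, sample rows, and DDL schema below are illustrative rather than taken from the patches, and import paths follow the module layout visible in the diffs (rule.py, engine.py, check_funcs.py).

from databricks.sdk import WorkspaceClient
from pyspark.sql import SparkSession

from databricks.labs.dqx import check_funcs
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import DQRowRule

spark = SparkSession.builder.getOrCreate()

# Illustrative input: one well-formed and one malformed JSON string.
df = spark.createDataFrame(
    [('{"id": 1, "name": "a"}',), ("{not json}",)],
    "col_json_str: string",
)

checks = [
    DQRowRule(criticality="error", check_func=check_funcs.is_valid_json, column="col_json_str"),
    DQRowRule(
        criticality="error",
        check_func=check_funcs.has_valid_json_schema,
        column="col_json_str",
        # Extra JSON fields not listed in the DDL schema are ignored (non-strict).
        check_func_kwargs={"schema": "id INT, name STRING"},
    ),
]

dq_engine = DQEngine(workspace_client=WorkspaceClient())
checked = dq_engine.apply_checks(df, checks)
checked.show(truncate=False)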
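
To make the "not strict" behaviour noted in patches 34 and 35 concrete, the check function can also be exercised directly as a column expression; the condition column returned by has_valid_json_schema is null for passing rows and carries a message otherwise. Per the field-presence checks added above, a row with extra undeclared fields passes, while a row missing a declared field or holding an unparsable string is flagged. The sample data here is hypothetical.

from pyspark.sql import SparkSession

from databricks.labs.dqx import check_funcs

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [
        ('{"id": 1, "name": "a", "extra": true}',),  # extra field -> ignored, should pass
        ('{"id": 2}',),                               # missing 'name' -> should be flagged
        ("not json",),                                # unparsable -> should be flagged
    ],
    "col_json_str: string",
)

condition = check_funcs.has_valid_json_schema("col_json_str", "id INT, name STRING")
df.select("col_json_str", condition).show(truncate=False)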