diff --git a/docs/dqx/docs/reference/quality_checks.mdx b/docs/dqx/docs/reference/quality_checks.mdx index 8131ebbce..c3024e6cc 100644 --- a/docs/dqx/docs/reference/quality_checks.mdx +++ b/docs/dqx/docs/reference/quality_checks.mdx @@ -41,6 +41,9 @@ You can also define your own custom checks in Python (see [Creating custom check | `is_not_greater_than` | Checks whether the values in the input column are not greater than the provided limit. | `column`: column to check (can be a string column name or a column expression); `limit`: limit as number, date, timestamp, column name or sql expression | | `is_valid_date` | Checks whether the values in the input column have valid date formats. | `column`: column to check (can be a string column name or a column expression); `date_format`: optional date format (e.g. 'yyyy-mm-dd') | | `is_valid_timestamp` | Checks whether the values in the input column have valid timestamp formats. | `column`: column to check (can be a string column name or a column expression); `timestamp_format`: optional timestamp format (e.g. 'yyyy-mm-dd HH:mm:ss') | +| `is_valid_json` | Checks whether the values in the input column are valid JSON strings. | `column`: column to check (can be a string column name or a column expression) | +| `has_json_keys` | Checks whether the values in the input column contain specific keys in the outermost JSON object. | `column`: column to check (can be a string column name or a column expression); `keys`: a list of JSON keys to verify within the outermost JSON object; `require_all`: optional boolean flag to require all keys to be present | +| `has_valid_json_schema` | Checks whether the values in the specified column, which contain JSON strings, conform to the expected schema. This check is **not strict**: extra fields in the JSON that are not defined in the schema are ignored. | `column`: column to check (can be a string column name or a column expression); `schema`: the schema as a DDL string (e.g., "id INT, name STRING") or StructType object | | `is_not_in_future` | Checks whether the values in the input column contain a timestamp that is not in the future, where 'future' is defined as current_timestamp + offset (in seconds). | `column`: column to check (can be a string column name or a column expression); `offset`: offset to use; `curr_timestamp`: current timestamp, if not provided current_timestamp() function is used | | `is_not_in_near_future` | Checks whether the values in the input column contain a timestamp that is not in the near future, where 'near future' is defined as greater than the current timestamp but less than the current_timestamp + offset (in seconds). | `column`: column to check (can be a string column name or a column expression); `offset`: offset to use; `curr_timestamp`: current timestamp, if not provided current_timestamp() function is used | | `is_older_than_n_days` | Checks whether the values in one input column are at least N days older than the values in another column.
| `column`: column to check (can be a string column name or a column expression); `days`: number of days; `curr_date`: current date, if not provided current_date() function is used; `negate`: if the condition should be negated | @@ -326,6 +329,41 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen column: col5 date_format: yyyy-MM-dd +# is_valid_json check +- criticality: error + check: + function: is_valid_json + arguments: + column: col_json_str + +# has_json_keys check +- criticality: error + check: + function: has_json_keys + arguments: + column: col_json_str + keys: + - key1 + +- criticality: error + name: col_json_str_does_not_have_json_keys2 + check: + function: has_json_keys + arguments: + column: col_json_str + keys: + - key1 + - key2 + require_all: false + +- criticality: error + name: col_json_str2_has_invalid_json_schema + check: + function: has_valid_json_schema + arguments: + column: col_json_str2 + schema: "STRUCT<a: INT, b: INT>" + # is_valid_timestamp check - criticality: error check: @@ -535,42 +573,42 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen function: is_linestring arguments: column: linestring_geom - + # is_polygon check - criticality: error check: function: is_polygon arguments: column: polygon_geom - + # is_multipoint check - criticality: error check: function: is_multipoint arguments: column: multipoint_geom - + # is_multilinestring check - criticality: error check: function: is_multilinestring arguments: column: multilinestring_geom - + # is_multipolygon check - criticality: error check: function: is_multipolygon arguments: column: multipolygon_geom - + # is_geometrycollection check - criticality: error check: function: is_geometrycollection arguments: column: geometrycollection_geom - + # is_ogc_valid check - criticality: error check: @@ -584,7 +622,7 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen function: is_non_empty_geometry arguments: column: point_geom - + # has_dimension check - criticality: error check: @@ -592,7 +630,7 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen arguments: column: polygon_geom dimension: 2 - + # has_x_coordinate_between check - criticality: error check: @@ -601,7 +639,7 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen column: polygon_geom min_value: 0.0 max_value: 10.0 - + # has_y_coordinate_between check - criticality: error check: @@ -610,6 +648,7 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen column: polygon_geom min_value: 0.0 max_value: 10.0 + ``` @@ -882,6 +921,38 @@ checks = [ name="col6_is_not_valid_timestamp2" ), + # is_valid_json check + DQRowRule( + criticality="error", + check_func=check_funcs.is_valid_json, + column="col_json_str" + ), + + # has_json_keys check + DQRowRule( + criticality="error", + check_func=check_funcs.has_json_keys, + column="col_json_str", # or as expr: F.col("col_json_str") + check_func_kwargs={"keys": ["key1"]}, + name="col_json_str_has_json_keys" + ), + + DQRowRule( + criticality="error", + check_func=check_funcs.has_json_keys, + column="col_json_str", # or as expr: F.col("col_json_str") + check_func_kwargs={"keys": ["key1", "key2"], "require_all": False}, + name="col_json_str_has_json_keys2" + ), + + DQRowRule( + criticality="error", + check_func=check_funcs.has_valid_json_schema, + column="col_json_str2", # or as expr: F.col("col_json_str2") + check_func_kwargs={"schema": "STRUCT<a: INT, b: INT>"}, + 
name="col_json_str2_has_valid_json_schema" + ), + # is_not_in_future check DQRowRule( criticality="error", @@ -1019,7 +1090,7 @@ checks = [ check_func=geo_check_funcs.is_multilinestring, column="multilinestring_geom" ), - + # is_multipolygon check DQRowRule( criticality="error", @@ -3258,7 +3329,7 @@ The PII detection extras include a built-in `does_not_contain_pii` check that us function: does_not_contain_pii arguments: column: description - + # PII detection check with custom threshold and named entities - criticality: error check: @@ -3275,7 +3346,7 @@ The PII detection extras include a built-in `does_not_contain_pii` check that us ```python from databricks.labs.dqx.rule import DQRowRule from databricks.labs.dqx.pii.pii_detection_funcs import does_not_contain_pii - + checks = [ # Basic PII detection check DQRowRule( @@ -3293,7 +3364,7 @@ The PII detection extras include a built-in `does_not_contain_pii` check that us check_func_kwargs={"threshold": 0.8, "entities": ["PERSON", "EMAIL_ADDRESS"]} ), ] - ``` + ``` @@ -3330,7 +3401,7 @@ These can be loaded using `NLPEngineConfig`: from databricks.labs.dqx.rule import DQRowRule from databricks.labs.dqx.pii.pii_detection_funcs import does_not_contain_pii from databricks.labs.dqx.pii.nlp_engine_config import NLPEngineConfig - + checks = [ # PII detection check using spacy as a named entity recognizer DQRowRule( @@ -3339,7 +3410,7 @@ These can be loaded using `NLPEngineConfig`: column="description", check_func=does_not_contain_pii, check_func_kwargs={"nlp_engine_config": NLPEngineConfig.SPACY_MEDIUM} - ), + ), ] ``` @@ -3359,7 +3430,7 @@ Using custom models for named-entity recognition may require you to install thes from databricks.labs.dqx.rule import DQRowRule from databricks.labs.dqx.engine import DQEngine from databricks.sdk import WorkspaceClient - + nlp_engine_config = { 'nlp_engine_name': 'transformers_stanford_deidentifier_base', 'models': [ @@ -3402,9 +3473,9 @@ Using custom models for named-entity recognition may require you to install thes column="description", check_func=does_not_contain_pii, check_func_kwargs={"nlp_engine_config": nlp_engine_config}, - ), + ), ] - + dq_engine = DQEngine(WorkspaceClient()) df = spark.read.table("main.default.table") valid_df, quarantine_df = dq_engine.apply_checks_and_split(df, checks) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index 906147f95..c3cf61507 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1966,6 +1966,152 @@ def apply(df: DataFrame, spark: SparkSession, ref_dfs: dict[str, DataFrame]) -> return condition, apply +@register_rule("row") +def is_valid_json(column: str | Column) -> Column: + """ + Checks whether the values in the input column are valid JSON strings. + + Args: + column: Column name (str) or Column expression to check for valid JSON. + + Returns: + A Spark Column representing the condition for invalid JSON strings. 
+ """ + col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) + return make_condition( + ~F.when(col_expr.isNotNull(), F.try_parse_json(col_expr_str).isNotNull()), + F.concat_ws( + "", + F.lit("Value '"), + col_expr.cast("string"), + F.lit(f"' in Column '{col_expr_str}' is not a valid JSON string"), + ), + f"{col_str_norm}_is_not_valid_json", + ) + + +@register_rule("row") +def has_json_keys(column: str | Column, keys: list[str], require_all: bool = True) -> Column: + """ + Checks whether the values in the input column contain specific keys in the outermost JSON object. + + Args: + column: The name of the column or the column expression to check for JSON keys. + keys: A list of JSON keys to verify within the outermost JSON object. + require_all: If True, all specified keys must be present. If False, at least one key must be present. + + Returns: + Column: A Spark Column representing the condition for missing JSON keys. + """ + if not keys: + raise InvalidParameterError("The 'keys' parameter must be a non-empty list of strings.") + if any(not isinstance(k, str) for k in keys): + raise InvalidParameterError("All keys must be of type string.") + + col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) + json_keys_array = F.json_object_keys(col_expr) + required_keys = F.array_distinct(F.array(*[F.lit(k) for k in keys])) + + json_validation_error = is_valid_json(col_expr_str) + is_invalid_json = json_validation_error.isNotNull() + + has_json_keys_msg = F.concat_ws( + "", + F.lit("Value '"), + F.when(col_expr.isNull(), F.lit("null")).otherwise(col_expr.cast("string")), + F.lit(f"' in Column '{col_expr_str}' is missing keys in the list: ["), + F.concat_ws(", ", F.lit(keys)), + F.lit("]"), + ) + message = F.when(is_invalid_json, json_validation_error).otherwise(has_json_keys_msg) + + if require_all: + missing = F.array_except(required_keys, json_keys_array) + condition_when_valid = F.size(missing) == 0 + else: + condition_when_valid = F.arrays_overlap(json_keys_array, required_keys) + + condition = F.when(~is_invalid_json, condition_when_valid).otherwise(F.lit(False)) + + return make_condition( + ~condition, + message, + f"{col_str_norm}_does_not_have_json_keys", + ) + + +@register_rule("row") +def has_valid_json_schema(column: str | Column, schema: str | types.StructType) -> Column: + """ + Validates that JSON strings in the specified column conform to an expected schema. + + This check is not strict. Extra fields in the JSON that are not defined in the schema are ignored. + Please note that the depth of nested structures is limited to 15. + + Args: + column: Column name or Column expression containing JSON strings. + schema: Expected schema as a DDL string (e.g., "id INT, name STRING") or StructType. + + Returns: + Column: A Spark Column representing whether the column conforms to the expected JSON schema. + + Raises: + InvalidParameterError: If the schema string is invalid or cannot be parsed, or if + the input schema is neither a string nor a StructType. 
+ """ + + _expected_schema = _get_schema(schema) + schema_str = _expected_schema.simpleString() + col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) + + json_validation_error = is_valid_json(col_str_norm) + is_invalid_json = json_validation_error.isNotNull() + + # Add unique corrupt-record field to isolate parse errors + corrupt_record_name = f"{uuid.uuid4().hex[:8]}_dqx_corrupt_record" + + extended_schema = types.StructType( + _expected_schema.fields + [types.StructField(corrupt_record_name, types.StringType(), True)] + ) + + # Attempt to parse JSON using the extended schema + parsed_struct = F.from_json( + col_expr, + extended_schema, + options={"columnNameOfCorruptRecord": corrupt_record_name}, + ) + + # Core conformity: must be valid JSON and not corrupt + is_not_corrupt = parsed_struct[corrupt_record_name].isNull() + base_conformity = ~is_invalid_json & is_not_corrupt + + # Field presence checks (non-null + exists) + field_presence_checks = _generate_field_presence_checks(_expected_schema, parsed_struct) + has_missing_or_null_fields = F.array_contains( + F.array(*[F.coalesce(expr, F.lit(False)) for expr in field_presence_checks]), + False, + ) + + is_conforming = base_conformity & ~has_missing_or_null_fields + condition = is_conforming | col_expr.isNull() + + error_msg = F.concat_ws( + "", + F.lit("Value '"), + F.when(col_expr.isNull(), F.lit("null")).otherwise(col_expr.cast("string")), + F.lit(f"' in Column '{col_expr_str}' does not conform to expected JSON schema: "), + F.lit(schema_str), + ) + + final_error_msg = F.when(is_invalid_json, json_validation_error).otherwise(error_msg) + + return make_condition( + ~condition, + final_error_msg, + f"{col_str_norm}_has_invalid_json_schema", + ) + + def _get_schema(input_schema: str | types.StructType, columns: list[str] | None = None) -> types.StructType: """ Normalize the input schema into a Spark StructType schema. @@ -2191,6 +2337,38 @@ def _is_compatible_atomic_type(actual_type: types.AtomicType, expected_type: typ return False +def _generate_field_presence_checks( + expected_schema: types.StructType, parsed_struct_col: Column, max_depth: int = 15, current_depth: int = 0 +) -> list[Column]: + """ + Recursively generate Spark Column expressions that verify each field defined in the expected + schema is present and non-null within a parsed struct column. + + Args: + expected_schema: The StructType defining the expected JSON schema. + parsed_struct_col: The parsed struct column (e.g., from from_json) to validate. + max_depth: Maximum recursion depth to prevent excessive nesting. Default is 10. + current_depth: Current recursion depth. + + Returns: + A list of Column expressions, one per field in the expected schema, that evaluate to True + if the corresponding field is non-null. 
+ """ + if current_depth > max_depth: + return [] + + validations = [] + for field in expected_schema.fields: + field_ref = parsed_struct_col[field.name] + if isinstance(field.dataType, types.StructType): + validations += _generate_field_presence_checks( + field.dataType, field_ref, max_depth=max_depth, current_depth=current_depth + 1 + ) + else: + validations.append(field_ref.isNotNull()) + return validations + + def _match_rows( df: DataFrame, ref_df: DataFrame, diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py old mode 100644 new mode 100755 index 764e7120d..0fcf99f86 --- a/tests/integration/test_apply_checks.py +++ b/tests/integration/test_apply_checks.py @@ -5235,7 +5235,7 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak schema = ( "col1: string, col2: int, col3: int, col4 array, col5: date, col6: timestamp, " "col7: map, col8: struct, col10: int, col11: string, " - "col_ipv4: string, col_ipv6: string" + "col_ipv4: string, col_ipv6: string, col_json_str: string, col_json_str2: string" ) test_df = spark.createDataFrame( [ @@ -5252,6 +5252,8 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak "val2", "192.168.1.1", "2001:0db8:85a3:08d3:1319:8a2e:0370:7344", + '{"key1": "1"}', + '{"a" : 1, "b": 2}', ], [ "val2", @@ -5266,6 +5268,8 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak "val2", "192.168.1.2", "2001:0db8:85a3:08d3:ffff:ffff:ffff:ffff", + '{"key1": "1", "key2": "2"}', + '{ "a" : 1, "b": 1000, "c": {"1": 8}}', ], [ "val3", @@ -5280,6 +5284,8 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak "val2", "192.168.1.3", "2001:db8:85a3:8d3:1319:8a2e:3.112.115.68", + '{"key1": "[1, 2, 3]"}', + '{ "a" : 1, "b": 1023455, "c": null }', ], ], schema, @@ -5318,6 +5324,8 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak "val2", "192.168.1.1", "2001:0db8:85a3:08d3:1319:8a2e:0370:7344", + '{"key1": "1"}', + '{"a" : 1, "b": 2}', None, None, ], @@ -5334,6 +5342,8 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak "val2", "192.168.1.2", "2001:0db8:85a3:08d3:ffff:ffff:ffff:ffff", + '{"key1": "1", "key2": "2"}', + '{ "a" : 1, "b": 1000, "c": {"1": 8}}', None, None, ], @@ -5350,6 +5360,8 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak "val2", "192.168.1.3", "2001:db8:85a3:8d3:1319:8a2e:3.112.115.68", + '{"key1": "[1, 2, 3]"}', + '{ "a" : 1, "b": 1023455, "c": null }', None, None, ], @@ -5503,7 +5515,7 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): schema = ( "col1: string, col2: int, col3: int, col4 array, col5: date, col6: timestamp, " "col7: map, col8: struct, col10: int, col11: string, " - "col_ipv4: string, col_ipv6: string" + "col_ipv4: string, col_ipv6: string, col_json_str: string, col_json_str2: string" ) test_df = spark.createDataFrame( [ @@ -5520,6 +5532,8 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "val2", "192.168.1.0", "2001:0db8:85a3:08d3:0000:0000:0000:0001", + '{"key1": "1"}', + '{"a" : 1, "b": 2}', ], [ "val2", @@ -5534,6 +5548,8 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "val2", "192.168.1.1", "2001:0db8:85a3:08d3:0000:0000:0000:1", + '{"key1": "1", "key2": "2"}', + '{ "a" : 1, "b": 1000, "c": {"1": 8}}', ], [ "val3", @@ -5548,6 +5564,8 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "val2", "192.168.1.2", "2001:0db8:85a3:08d3:0000::2", + '{"key1": "[1, 2, 3]"}', + '{ 
"a" : 1, "b": 1023455, "c": null }', ], ], schema, @@ -5574,6 +5592,8 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "val2", "192.168.1.0", "2001:0db8:85a3:08d3:0000:0000:0000:0001", + '{"key1": "1"}', + '{"a" : 1, "b": 2}', None, None, ], @@ -5590,6 +5610,8 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "val2", "192.168.1.1", "2001:0db8:85a3:08d3:0000:0000:0000:1", + '{"key1": "1", "key2": "2"}', + '{ "a" : 1, "b": 1000, "c": {"1": 8}}', None, None, ], @@ -5606,6 +5628,8 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "val2", "192.168.1.2", "2001:0db8:85a3:08d3:0000::2", + '{"key1": "[1, 2, 3]"}', + '{ "a" : 1, "b": 1023455, "c": null }', None, None, ], @@ -6318,6 +6342,31 @@ def test_apply_checks_all_checks_using_classes(ws, spark): column="col6", check_func_kwargs={"window_minutes": 1, "min_records_per_window": 1, "lookback_windows": 3}, ), + # is_valid_json check + DQRowRule( + criticality="error", + check_func=check_funcs.is_valid_json, + column="col_json_str", + ), + # has_json_keys check + DQRowRule( + criticality="error", + check_func=check_funcs.has_json_keys, + column="col_json_str", + check_func_kwargs={"keys": ["key1", "key2"], "require_all": False}, + ), + DQRowRule( + criticality="error", + check_func=check_funcs.has_json_keys, + column="col_json_str", + check_func_kwargs={"keys": ["key1"]}, + ), + DQRowRule( + criticality="error", + check_func=check_funcs.has_valid_json_schema, + column="col_json_str2", + check_func_kwargs={"schema": "STRUCT"}, + ), ] dq_engine = DQEngine(ws) @@ -6325,7 +6374,7 @@ def test_apply_checks_all_checks_using_classes(ws, spark): schema = ( "col1: string, col2: int, col3: int, col4 array, col5: date, col6: timestamp, " "col7: map, col8: struct, col10: int, col11: string, " - "col_ipv4: string, col_ipv6: string" + "col_ipv4: string, col_ipv6: string, col_json_str: string, col_json_str2: string" ) test_df = spark.createDataFrame( [ @@ -6342,6 +6391,8 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "val2", "255.255.255.255", "2001:0db8:85a3:08d3:1319:8a2e:0370:7344", + '{"key1": "1"}', + '{"a" : 1, "b": 2}', ], [ "val2", @@ -6356,6 +6407,8 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "val2", "255.255.255.1", "2001:0db8:85a3:08d3:ffff:ffff:ffff:ffff", + '{"key1": "1", "key2": "2"}', + '{ "a" : 1, "b": 1000, "c": {"1": 8}}', ], [ "val3", @@ -6370,6 +6423,8 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "val2", "255.255.255.2", "2001:db8:85a3:8d3:1319:8a2e:3.112.115.68", + '{"key1": "[1, 2, 3]"}', + '{ "a" : 1, "b": 1023455, "c": null }', ], ], schema, @@ -6393,6 +6448,8 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "val2", "255.255.255.255", "2001:0db8:85a3:08d3:1319:8a2e:0370:7344", + '{"key1": "1"}', + '{"a" : 1, "b": 2}', None, None, ], @@ -6409,6 +6466,8 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "val2", "255.255.255.1", "2001:0db8:85a3:08d3:ffff:ffff:ffff:ffff", + '{"key1": "1", "key2": "2"}', + '{ "a" : 1, "b": 1000, "c": {"1": 8}}', None, None, ], @@ -6425,6 +6484,8 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "val2", "255.255.255.2", "2001:db8:85a3:8d3:1319:8a2e:3.112.115.68", + '{"key1": "[1, 2, 3]"}', + '{ "a" : 1, "b": 1023455, "c": null }', None, None, ], diff --git a/tests/integration/test_row_checks.py b/tests/integration/test_row_checks.py index da5f0bd81..8e55eb67d 100644 --- a/tests/integration/test_row_checks.py +++ b/tests/integration/test_row_checks.py @@ -25,6 +25,9 @@ is_not_null_and_is_in_list, 
is_not_null_and_not_empty_array, is_valid_date, + is_valid_json, + has_json_keys, + has_valid_json_schema, is_valid_timestamp, is_valid_ipv4_address, is_ipv4_address_in_cidr, @@ -2855,3 +2858,224 @@ def test_col_is_equal_to(spark, set_utc_timezone): ) assert_df_equality(actual, expected, ignore_nullable=True) + + +def test_is_valid_json(spark): + schema = "a: string, b: string" + test_df = spark.createDataFrame( + [ + ['{"key": "value"}', '{"key": value}'], + ['{"number": 123}', '{"number": 123}'], + ['{"array": [1, 2, 3]}', '{"array": [1, 2, 3}'], + ['Not a JSON string', 'Also not JSON'], + [None, None], + ['123', '"a string"'], + ['true', 'null'], + ['[]', '{}'], + ['{"a": 1,}', '{key: "value"}'], + ['[1, 2,', '{"a": "b"'], + ["{'a': 'b'}", ''], + [' {"a": 1} ', '{"b": 2}\n'], + ], + schema, + ) + + actual = test_df.select(is_valid_json("a"), is_valid_json("b")) + + expected_schema = "a_is_not_valid_json: string, b_is_not_valid_json: string" + + expected = spark.createDataFrame( + [ + [None, "Value '{\"key\": value}' in Column 'b' is not a valid JSON string"], + [None, None], + [None, "Value '{\"array\": [1, 2, 3}' in Column 'b' is not a valid JSON string"], + [ + "Value 'Not a JSON string' in Column 'a' is not a valid JSON string", + "Value 'Also not JSON' in Column 'b' is not a valid JSON string", + ], + [None, None], + [None, None], + [None, None], + [None, None], + [ + "Value '{\"a\": 1,}' in Column 'a' is not a valid JSON string", + "Value '{key: \"value\"}' in Column 'b' is not a valid JSON string", + ], + [ + "Value '[1, 2,' in Column 'a' is not a valid JSON string", + "Value '{\"a\": \"b\"' in Column 'b' is not a valid JSON string", + ], + [ + "Value '{'a': 'b'}' in Column 'a' is not a valid JSON string", + "Value '' in Column 'b' is not a valid JSON string", + ], + [None, None], + ], + expected_schema, + ) + + assert_df_equality(actual, expected, ignore_nullable=True) + + +def test_has_json_keys_require_all_true(spark): + schema = "a: string, b: string" + test_df = spark.createDataFrame( + [ + ['{"key": "value", "another_key": 123}', '{"key": "value"}'], + ['{"number": 123}', '{"number": 123, "extra": true}'], + ['{"array": [1, 2, 3]}', '{"array": {1, 2, 3}]'], + ['{"key": "value"}', '{"missing_key": "value"}'], + [None, None], + ['Not a JSON string', '{"key": "value"}'], + ['{"key": "value"}', 'Not a JSON string'], + ['{"key": "value"}', None], + [None, '{"key": "value"}'], + ['{"nested": {"inner_key": "inner_value"}}', '{"nested": {"inner_key": "inner_value"}}'], + ], + schema, + ) + + actual = test_df.select( + has_json_keys("a", ["key", "another_key"]), + has_json_keys("b", ["key"]), + ) + + expected_schema = "a_does_not_have_json_keys: string, b_does_not_have_json_keys: string" + + expected = spark.createDataFrame( + [ + [None, None], + [ + "Value '{\"number\": 123}' in Column 'a' is missing keys in the list: [key, another_key]", + "Value '{\"number\": 123, \"extra\": true}' in Column 'b' is missing keys in the list: [key]", + ], + [ + "Value '{\"array\": [1, 2, 3]}' in Column 'a' is missing keys in the list: [key, another_key]", + "Value '{\"array\": {1, 2, 3}]' in Column 'b' is not a valid JSON string", + ], + [ + "Value '{\"key\": \"value\"}' in Column 'a' is missing keys in the list: [key, another_key]", + "Value '{\"missing_key\": \"value\"}' in Column 'b' is missing keys in the list: [key]", + ], + [None, None], + ["Value 'Not a JSON string' in Column 'a' is not a valid JSON string", None], + [ + "Value '{\"key\": \"value\"}' in Column 'a' is missing keys in the list: 
[key, another_key]", + "Value 'Not a JSON string' in Column 'b' is not a valid JSON string", + ], + ["Value '{\"key\": \"value\"}' in Column 'a' is missing keys in the list: [key, another_key]", None], + [None, None], + [ + "Value '{\"nested\": {\"inner_key\": \"inner_value\"}}' in Column 'a' is missing keys in the list: [key, another_key]", + "Value '{\"nested\": {\"inner_key\": \"inner_value\"}}' in Column 'b' is missing keys in the list: [key]", + ], + ], + expected_schema, + ) + assert_df_equality(actual, expected, ignore_nullable=True) + + +def test_has_json_keys_require_at_least_one(spark): + schema = "a: string, b: string" + required_keys = ["key", "another_key", "extra_key"] + + test_data = [ + ['{"key": 1, "another_key": 2, "extra_key": 3}', '{"key": 1, "another_key": 2, "extra_key": 3}'], + ['{"key": 1}', '{"key": 1}'], + ['{"number": 123}', '{"random_sample": 1523}'], + ['{}', '{}'], + ['{"key": "value"', '{"key": "value"'], + [None, 'Not a JSON string'], + ] + + test_df = spark.createDataFrame(test_data, schema) + + actual = test_df.select( + has_json_keys("a", required_keys, require_all=False), + has_json_keys("b", required_keys, require_all=False), + ) + + expected_schema = "a_does_not_have_json_keys: string, b_does_not_have_json_keys: string" + + expected = spark.createDataFrame( + [ + [None, None], + [None, None], + [ + "Value '{\"number\": 123}' in Column 'a' is missing keys in the list: [key, another_key, extra_key]", + "Value '{\"random_sample\": 1523}' in Column 'b' is missing keys in the list: [key, another_key, extra_key]", + ], + [ + "Value '{}' in Column 'a' is missing keys in the list: [key, another_key, extra_key]", + "Value '{}' in Column 'b' is missing keys in the list: [key, another_key, extra_key]", + ], + [ + "Value '{\"key\": \"value\"' in Column 'a' is not a valid JSON string", + "Value '{\"key\": \"value\"' in Column 'b' is not a valid JSON string", + ], + [None, "Value 'Not a JSON string' in Column 'b' is not a valid JSON string"], + ], + expected_schema, + ) + assert_df_equality(actual, expected, ignore_nullable=True) + + +def test_has_valid_json_schema(spark): + schema = "a: string, b: string" + test_df = spark.createDataFrame( + [ + ['{"a": 1, "b": 2}', '{"a": 3, "b": 4}'], + ['{"key": "value", "another_key": 123}', '{"key": "value"}'], + ['{"number": 123}', '{"number": 123, "extra": true}'], + ['{"array": [1, 2, 3]}', '{"array": {1, 2, 3}]'], + ['{"key": "value"}', '{"missing_key": "value"}'], + [None, None], + ['Not a JSON string', '{"key": "value"}'], + ['{"key": "value"}', 'Not a JSON string'], + ['{"key": "value"}', None], + ], + schema, + ) + + json_schema = "STRUCT" + expected_schema = "a_has_invalid_json_schema: string, b_has_invalid_json_schema: string" + expected = spark.createDataFrame( + [ + [None, None], + [ + "Value '{\"key\": \"value\", \"another_key\": 123}' in Column 'a' does not conform to expected JSON schema: struct", + "Value '{\"key\": \"value\"}' in Column 'b' does not conform to expected JSON schema: struct", + ], + [ + "Value '{\"number\": 123}' in Column 'a' does not conform to expected JSON schema: struct", + "Value '{\"number\": 123, \"extra\": true}' in Column 'b' does not conform to expected JSON schema: struct", + ], + [ + "Value '{\"array\": [1, 2, 3]}' in Column 'a' does not conform to expected JSON schema: struct", + "Value '{\"array\": {1, 2, 3}]' in Column 'b' is not a valid JSON string", + ], + [ + "Value '{\"key\": \"value\"}' in Column 'a' does not conform to expected JSON schema: struct", + "Value 
'{\"missing_key\": \"value\"}' in Column 'b' does not conform to expected JSON schema: struct", + ], + [None, None], + [ + "Value 'Not a JSON string' in Column 'a' is not a valid JSON string", + "Value '{\"key\": \"value\"}' in Column 'b' does not conform to expected JSON schema: struct", + ], + [ + "Value '{\"key\": \"value\"}' in Column 'a' does not conform to expected JSON schema: struct", + "Value 'Not a JSON string' in Column 'b' is not a valid JSON string", + ], + [ + "Value '{\"key\": \"value\"}' in Column 'a' does not conform to expected JSON schema: struct", + None, + ], + ], + expected_schema, + ) + actual = test_df.select( + has_valid_json_schema("a", json_schema), + has_valid_json_schema("b", json_schema), + ) + assert_df_equality(actual, expected, ignore_nullable=True) diff --git a/tests/perf/conftest.py b/tests/perf/conftest.py index 16b4437a7..eb29ff95e 100644 --- a/tests/perf/conftest.py +++ b/tests/perf/conftest.py @@ -31,7 +31,8 @@ SCHEMA_STR = ( "col1: int, col2: int, col3: int, col4: array, " "col5: date, col6: timestamp, col7: map, " - "col8: struct, col10: int, col_ipv4: string, col_ipv6: string" + "col8: struct, col10: int, col_ipv4: string, col_ipv6: string, " + "col_json_str: string" ) RUN_TIME = datetime(2025, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc) @@ -101,6 +102,7 @@ def generated_df(spark, rows=DEFAULT_ROWS): .withColumnSpec("col10") .withColumnSpec("col_ipv4", template=r"\n.\n.\n.\n") .withColumnSpec("col_ipv6", template="XXXX:XXXX:XXXX:XXXX:XXXX:XXXX:XXXX:XXXX") + .withColumnSpec("col_json_str", template=r"{'key1': '\w', 'key2': 'd\w'}") ) return spec.build() diff --git a/tests/perf/test_apply_checks.py b/tests/perf/test_apply_checks.py index 4785d8f3f..ad7305aa1 100644 --- a/tests/perf/test_apply_checks.py +++ b/tests/perf/test_apply_checks.py @@ -1630,6 +1630,69 @@ def test_benchmark_foreach_has_valid_schema(benchmark, ws, generated_string_df): assert actual_count == EXPECTED_ROWS +@pytest.mark.benchmark(group="test_benchmark_is_valid_json") +def test_benchmark_is_valid_json(benchmark, ws, generated_df): + dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS) + checks = [ + DQRowRule( + criticality="error", + check_func=check_funcs.is_valid_json, + column="col_json_str", + ), + ] + checked = dq_engine.apply_checks(generated_df, checks) + actual_count = benchmark(lambda: checked.count()) + assert actual_count == EXPECTED_ROWS + + +@pytest.mark.benchmark(group="test_benchmark_has_json_keys") +def test_benchmark_has_json_keys_require_at_least_one(benchmark, ws, generated_df): + dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS) + checks = [ + DQRowRule( + criticality="error", + check_func=check_funcs.has_json_keys, + column="col_json_str", + check_func_kwargs={"keys": ["key1", "key2"], "require_all": False}, + ), + ] + checked = dq_engine.apply_checks(generated_df, checks) + actual_count = benchmark(lambda: checked.count()) + assert actual_count == EXPECTED_ROWS + + +@pytest.mark.benchmark(group="test_benchmark_has_json_keys") +def test_benchmark_has_json_keys_require_all_true(benchmark, ws, generated_df): + dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS) + checks = [ + DQRowRule( + criticality="error", + check_func=check_funcs.has_json_keys, + column="col_json_str", + check_func_kwargs={"keys": ["key1", "key2"]}, + ), + ] + checked = dq_engine.apply_checks(generated_df, checks) + actual_count = benchmark(lambda: checked.count()) + assert actual_count == EXPECTED_ROWS + + 
+@pytest.mark.benchmark(group="test_benchmark_has_valid_json_schema") +def test_benchmark_has_valid_json_schema(benchmark, ws, generated_df): + dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS) + checks = [ + DQRowRule( + criticality="error", + check_func=check_funcs.has_valid_json_schema, + column="col_json_str", + check_func_kwargs={"schema": "STRUCT<key1: STRING, key2: STRING>"}, + ), + ] + checked = dq_engine.apply_checks(generated_df, checks) + actual_count = benchmark(lambda: checked.count()) + assert actual_count == EXPECTED_ROWS + + def test_benchmark_is_aggr_count_distinct_with_group_by(benchmark, ws, generated_df): """Benchmark count_distinct with group_by (uses two-stage aggregation: groupBy + join).""" dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS) @@ -1669,7 +1732,7 @@ def test_benchmark_is_aggr_approx_count_distinct_with_group_by(benchmark, ws, ge actual_count = benchmark(lambda: checked.count()) assert actual_count == EXPECTED_ROWS - + def test_benchmark_is_aggr_count_distinct_no_group_by(benchmark, ws, generated_df): """Benchmark count_distinct without group_by (baseline - uses standard aggregation).""" dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS) diff --git a/tests/resources/all_row_checks.yaml b/tests/resources/all_row_checks.yaml index a7e8f47d8..eb81d444b 100644 --- a/tests/resources/all_row_checks.yaml +++ b/tests/resources/all_row_checks.yaml @@ -240,6 +240,39 @@ arguments: column: col6 +# is_valid_json check +- criticality: error + check: + function: is_valid_json + arguments: + column: col_json_str + +# has_json_keys check +- criticality: error + check: + function: has_json_keys + arguments: + column: col_json_str + keys: + - key1 + +- criticality: error + name: col_json_str_does_not_have_json_keys2 + check: + function: has_json_keys + arguments: + column: col_json_str + keys: + - key1 + - key2 + require_all: false + +- criticality: error + check: + function: has_valid_json_schema + arguments: + column: col_json_str2 + schema: 'STRUCT<a: INT, b: INT>' + # is_not_in_future check - criticality: error check: diff --git a/tests/unit/test_build_rules.py b/tests/unit/test_build_rules.py index a34e26dfd..d21532872 100644 --- a/tests/unit/test_build_rules.py +++ b/tests/unit/test_build_rules.py @@ -26,6 +26,8 @@ is_not_less_than, is_not_greater_than, is_valid_date, + is_valid_json, + has_json_keys, regex_match, compare_datasets, ) @@ -1143,6 +1145,20 @@ def test_convert_dq_rules_to_metadata(): DQRowRule( criticality="error", check_func=is_valid_date, column="b", check_func_kwargs={"date_format": "yyyy-MM-dd"} ), + DQRowRule(criticality="error", check_func=is_valid_json, column="col_json_str"), + DQRowRule( + criticality="error", + check_func=has_json_keys, + column="col_json_str", + check_func_kwargs={"keys": ["key1"]}, + ), + DQRowRule( + name="col_json_str_has_no_json_key1_key2", + criticality="error", + check_func=has_json_keys, + column="col_json_str", + check_func_kwargs={"keys": ["key1", "key2"], "require_all": False}, + ), DQDatasetRule(criticality="error", check_func=is_unique, columns=["col1", "col2"]), DQDatasetRule( criticality="error", @@ -1302,6 +1318,27 @@ "arguments": {"column": "b", "date_format": "yyyy-MM-dd"}, }, }, + { + "name": "col_json_str_is_not_valid_json", + "criticality": "error", + "check": { + "function": "is_valid_json", + "arguments": {"column": "col_json_str"}, + }, + }, + { + "name": "col_json_str_does_not_have_json_keys", + "criticality": "error", + "check": {"function": "has_json_keys", "arguments": {"column": 
"col_json_str", "keys": ["key1"]}}, + }, + { + "name": "col_json_str_has_no_json_key1_key2", + "criticality": "error", + "check": { + "function": "has_json_keys", + "arguments": {"column": "col_json_str", "keys": ["key1", "key2"], "require_all": False}, + }, + }, { "name": "struct_col1_col2_is_not_unique", "criticality": "error", @@ -1479,6 +1516,7 @@ def test_metadata_round_trip_conversion_preserves_rules() -> None: DQRowRule( criticality="error", check_func=is_valid_date, column="b", check_func_kwargs={"date_format": "yyyy-MM-dd"} ), + DQRowRule(criticality="error", check_func=is_valid_json, column="b"), DQDatasetRule(criticality="error", check_func=is_unique, columns=["col1", "col2"]), DQDatasetRule( criticality="error",