20 changes: 17 additions & 3 deletions docs/dqx/docs/reference/quality_checks.mdx
@@ -66,10 +66,10 @@ You can also define your own custom checks in Python (see [Creating custom check
| `is_geometrycollection` | Checks whether the values in the input column are geometrycollection geometries/geographies. This function requires Databricks serverless compute or runtime >= 17.1. | `column`: column to check (can be a string column name or a column expression) |
| `is_ogc_valid` | Checks whether the values in the input column are valid geometries in the OGC sense. For example, a bowtie polygon is invalid because it has a self-intersection. This function requires Databricks serverless compute or runtime >= 17.1. | `column`: column to check (can be a string column name or a column expression) |
| `is_non_empty_geometry` | Checks whether the values in the input column are non-empty geometries. This function requires Databricks serverless compute or runtime >= 17.1. | `column`: column to check (can be a string column name or a column expression) |
| `is_not_null_island` | Checks whether the values in the input column are not NULL island geometries (i.e. POINT(0 0), POINTZ(0 0 0), or POINTZM(0 0 0 0)). This function requires Databricks serverless compute or runtime >= 17.1. | `column`: column to check (can be a string column name or a column expression) |
| `has_dimension` | Checks whether the values in the input column are geometries of the specified dimension (2D projected dimension). This function requires Databricks serverless compute or runtime >= 17.1. | `column`: column to check (can be a string column name or a column expression); `dimension`: dimension to check |
| `has_x_coordinate_between` | Checks whether the values in the input column are geometries with x coordinate between the provided boundaries. This function requires Databricks serverless compute or runtime >= 17.1. | `column`: column to check (can be a string column name or a column expression); `min_value`: minimum value; `max_value`: maximum value |
| `has_y_coordinate_between` | Checks whether the values in the input column are geometries with y coordinate between the provided boundaries. This function requires Databricks serverless compute or runtime >= 17.1. | `column`: column to check (can be a string column name or a column expression); `min_value`: minimum value; `max_value`: maximum value |
</details>
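As a concrete illustration of the parameterized checks in the table above, here is a minimal YAML sketch for `has_x_coordinate_between`; the column name `point_geom` and the longitude-style bounds are illustrative, and the format follows the YAML examples shown later on this page:

```yaml
# has_x_coordinate_between check
- criticality: error
  check:
    function: has_x_coordinate_between
    arguments:
      column: point_geom
      min_value: -180.0
      max_value: 180.0
```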

<Admonition type="warning" title="Applicability">
@@ -584,7 +584,14 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen
    function: is_non_empty_geometry
    arguments:
      column: point_geom


# is_not_null_island check
- criticality: error
  check:
    function: is_not_null_island
    arguments:
      column: point_geom

# has_dimension check
- criticality: error
  check:
@@ -1048,6 +1055,13 @@ checks = [
column="point_geom"
),

    # is_not_null_island check
    DQRowRule(
        criticality="error",
        check_func=geo_check_funcs.is_not_null_island,
        column="point_geom"
    ),

    # has_dimension check
    DQRowRule(
        criticality="error",
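Bridging the two configuration styles shown above: YAML-defined checks are applied through DQX's metadata-based API rather than as `DQRowRule` objects. A minimal sketch, assuming `DQEngine.apply_checks_by_metadata` accepts the parsed list of check dictionaries and that an active SparkSession is available (notebook-style); the input data is illustrative:

```python
import yaml
from pyspark.sql import SparkSession
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine

# Checks parsed from YAML -- the same shape as the snippets in this file.
checks = yaml.safe_load(
    """
- criticality: error
  check:
    function: is_not_null_island
    arguments:
      column: point_geom
"""
)

spark = SparkSession.builder.getOrCreate()
input_df = spark.createDataFrame([("POINT(0 0)",), ("POINT(1 1)",)], "point_geom: string")

dq_engine = DQEngine(WorkspaceClient())
# Appends DQX's result columns (by default `_errors` and `_warnings`) to the input DataFrame.
checked_df = dq_engine.apply_checks_by_metadata(input_df, checks)
```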
36 changes: 36 additions & 0 deletions src/databricks/labs/dqx/geo/check_funcs.py
@@ -357,6 +357,42 @@ def is_non_empty_geometry(column: str | Column) -> Column:
    )


@register_rule("row")
def is_not_null_island(column: str | Column) -> Column:
"""Checks whether the values in the input column are NULL island geometries (e.g. POINT(0 0), POINTZ(0 0 0), or
POINTZM(0 0 0 0)).

Args:
column: column to check; can be a string column name or a column expression

Returns:
Column object indicating whether the values in the input column are NULL island geometries

Note:
This function requires Databricks serverless compute or runtime 17.1 or above.
"""
col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column)
# NOTE: This function is currently only available in Databricks runtime 17.1 or above or in
# Databricks SQL, due to the use of the `try_to_geometry`, `st_geometrytype`, `st_x`, and `st_y` functions.
try_geom_expr = f"try_to_geometry({col_str_norm})"
geom_cond = F.expr(f"{try_geom_expr} IS NULL")

is_point_cond = F.expr(f"st_geometrytype({try_geom_expr}) = '{POINT_TYPE}'")
null_xy_cond = F.expr(f"st_x({try_geom_expr}) = 0.0 AND st_y({try_geom_expr}) = 0.0")
null_z_cond = F.expr(f"coalesce(st_z({try_geom_expr}), -1) = 0.0")
null_m_cond = F.expr(f"coalesce(st_m({try_geom_expr}), -1) = 0.0")

is_point_null_island = is_point_cond & null_xy_cond & null_z_cond & null_m_cond
condition = F.when(col_expr.isNull(), F.lit(None)).otherwise(~geom_cond & is_point_cond & is_point_null_island)
condition_str = f"column `{col_expr_str}` contains a null island"

return make_condition(
condition,
F.lit(condition_str),
f"{col_str_norm}_contains_null_island",
)


@register_rule("row")
def has_dimension(column: str | Column, dimension: int) -> Column:
"""Checks whether the geometries/geographies in the input column have a given dimension.
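For a quick sanity check of the new function's semantics, here is a minimal usage sketch applying `is_not_null_island` directly to a DataFrame of WKT strings (it assumes a geo-compatible environment, i.e. serverless compute or DBR >= 17.1; the data and column name are illustrative):

```python
from pyspark.sql import SparkSession

from databricks.labs.dqx.geo.check_funcs import is_not_null_island

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("POINT(0 0)",), ("POINT(1 1)",), (None,)], "geom: string")

# The check yields a condition column named `geom_contains_null_island`:
# an error message for null islands (here POINT(0 0)) and NULL otherwise.
df.select("geom", is_not_null_island("geom")).show(truncate=False)
```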
11 changes: 11 additions & 0 deletions tests/integration/test_apply_checks.py
@@ -6584,6 +6584,17 @@ def test_apply_checks_all_geo_checks_using_classes(skip_if_runtime_not_geo_compa
            check_func=geo_check_funcs.is_non_empty_geometry,
            column=F.col("point_geom"),
        ),
        # is_not_null_island check
        DQRowRule(
            criticality="error",
            check_func=geo_check_funcs.is_not_null_island,
            column="point_geom",
        ),
        DQRowRule(
            criticality="error",
            check_func=geo_check_funcs.is_not_null_island,
            column=F.col("point_geom"),
        ),
        # has_dimension check
        DQRowRule(
            criticality="error",
36 changes: 36 additions & 0 deletions tests/integration/test_row_checks_geo.py
@@ -13,6 +13,7 @@
    is_multilinestring,
    is_multipoint,
    is_multipolygon,
    is_not_null_island,
    is_point,
    is_polygon,
    is_ogc_valid,
@@ -333,6 +334,41 @@ def test_is_non_empty_geometry(skip_if_runtime_not_geo_compatible, spark):
    assert_df_equality(actual, expected, ignore_nullable=True)


def test_is_not_null_island(skip_if_runtime_not_geo_compatible, spark):
    input_schema = "geom: string, geomz: string, geomzm: string"
    test_df = spark.createDataFrame(
        [
            ["POINT(1 1)", "POINTZ(1 1 1)", "POINTZM(1 1 1 1)"],
            ["POINT(0 0)", "POINTZ(0 0 0)", "POINTZM(0 0 0 0)"],
            ["LINESTRING(0 0, 1 1)", "LINESTRING(0 0, 1 1)", "LINESTRING(0 0, 1 1)"],
            ["nonsense", "nonsense", "nonsense"],
            [None, None, None],
        ],
        input_schema,
    )

    actual = test_df.select(is_not_null_island("geom"), is_not_null_island("geomz"), is_not_null_island("geomzm"))

    checked_schema = (
        "geom_contains_null_island: string, geomz_contains_null_island: string, geomzm_contains_null_island: string"
    )
    expected = spark.createDataFrame(
        [
            [None, None, None],
            [
                "column `geom` contains a null island",
                "column `geomz` contains a null island",
                "column `geomzm` contains a null island",
            ],
            [None, None, None],
            [None, None, None],
            [None, None, None],
        ],
        checked_schema,
    )
    assert_df_equality(actual, expected, ignore_nullable=True)


def test_has_dimension(skip_if_runtime_not_geo_compatible, spark):
    input_schema = "geom: string"
    test_df = spark.createDataFrame(
14 changes: 14 additions & 0 deletions tests/perf/test_apply_checks.py
@@ -1547,6 +1547,20 @@ def test_benchmark_is_non_empty_geometry(skip_if_runtime_not_geo_compatible, ben
    assert actual_count == EXPECTED_ROWS


def test_benchmark_is_not_null_island(skip_if_runtime_not_geo_compatible, benchmark, ws, generated_geo_df):
    dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS)
    checks = [
        DQRowRule(
            criticality="error",
            check_func=geo_check_funcs.is_not_null_island,
            column="point_geom",
        )
    ]
    checked = dq_engine.apply_checks(generated_geo_df, checks)
    actual_count = benchmark(lambda: checked.count())
    assert actual_count == EXPECTED_ROWS


def test_benchmark_has_dimension(skip_if_runtime_not_geo_compatible, benchmark, ws, generated_geo_df):
    dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS)
    checks = [
7 changes: 7 additions & 0 deletions tests/resources/all_row_geo_checks.yaml
@@ -92,6 +92,13 @@
    arguments:
      column: point_geom

# is_not_null_island check
- criticality: error
  check:
    function: is_not_null_island
    arguments:
      column: point_geom

# has_dimension check
- criticality: error
  check: