25 commits
6edda4e - feat: extended is_aggr_* (vb-dbrks, Nov 27, 2025)
e8602ce - fmt: code formatting and cleanup (vb-dbrks, Nov 27, 2025)
ae6de97 - add: (vb-dbrks, Nov 27, 2025)
a661b93 - fix: test expectation (vb-dbrks, Nov 27, 2025)
91b5029 - fix + fmt: test assetion for arraytype (vb-dbrks, Nov 27, 2025)
817cf4a - docs: fixes (vb-dbrks, Nov 27, 2025)
45eab64 - fix: count_distinct does not support group_by because of the use of w… (vb-dbrks, Nov 27, 2025)
53ae329 - docs + fix: count_distinct is now supported with a generalised implem… (vb-dbrks, Nov 27, 2025)
1611dfb - docs + fmt: improved the doc language and code comments. (vb-dbrks, Nov 27, 2025)
d38f671 - test cov + docs: removed dead code and added a new test. Improved doc… (vb-dbrks, Nov 27, 2025)
b48ff6e - fix: docs build docstrings and dict syntax issue (vb-dbrks, Nov 27, 2025)
1f8a8df - docs: Spark/DBR version compatibility. (vb-dbrks, Nov 27, 2025)
3640b8d - review: addressing PR code review comments. (vb-dbrks, Nov 28, 2025)
e9bc7b3 - perf test: added a new set of performance tests for is_aggr count dis… (vb-dbrks, Nov 28, 2025)
9671ca2 - flaky test fix (vb-dbrks, Nov 28, 2025)
759edcf - docs: fix broken link (vb-dbrks, Nov 28, 2025)
69f31c9 - Add pytest-benchmark performance baseline (vb-dbrks, Nov 28, 2025)
20870c8 - docs: pr review addressed (vb-dbrks, Dec 2, 2025)
91610d0 - Merge main into feature branch: resolve conflicts in check_funcs.py a… (vb-dbrks, Dec 2, 2025)
1d0c96a - improvement: readability of error messages + update tests (vb-dbrks, Dec 2, 2025)
1f68600 - fix: stacklevel for warnings, trailing whitespace, add Column express… (vb-dbrks, Dec 2, 2025)
7cf0ccf - code hardening: (vb-dbrks, Dec 2, 2025)
24748ae - fix flaky test: test_apply_checks_and_save_in_tables_for_patterns_exc… (vb-dbrks, Dec 3, 2025)
c500cff - code improv: consistent aggregate violation messages with params support (vb-dbrks, Dec 5, 2025)
0b60cf8 - Merge branch 'main' into 929-feature-extend-is_aggr-check_funcs (vb-dbrks, Dec 5, 2025)
221 changes: 221 additions & 0 deletions demos/dqx_demo_library.py
@@ -690,6 +690,227 @@ def not_ends_with(column: str, suffix: str) -> Column:

# COMMAND ----------

# MAGIC %md
# MAGIC ### Extended Aggregate Functions for Data Quality Checks
# MAGIC
# MAGIC DQX now supports 20 curated aggregate functions for advanced data quality monitoring:
# MAGIC - **Statistical functions**: `stddev`, `variance`, `median`, `mode`, `skewness`, `kurtosis` for anomaly detection
# MAGIC - **Percentile functions**: `percentile`, `approx_percentile` for SLA monitoring
# MAGIC - **Cardinality functions**: `count_distinct`, `approx_count_distinct` (uses HyperLogLog++)
# MAGIC - **Any Databricks built-in aggregate**: Supported with runtime validation

# COMMAND ----------

# MAGIC %md
# MAGIC #### Example 1: Statistical Functions - Anomaly Detection with Standard Deviation
# MAGIC
# MAGIC Detect unusual variance in sensor readings per machine. High standard deviation indicates unstable sensors that may need calibration.

# COMMAND ----------

from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import DQDatasetRule
from databricks.labs.dqx import check_funcs
from databricks.sdk import WorkspaceClient

# Manufacturing sensor data with readings from multiple machines
manufacturing_df = spark.createDataFrame([
["M1", "2024-01-01", 20.1],
["M1", "2024-01-02", 20.3],
["M1", "2024-01-03", 20.2], # Machine 1: stable readings (low stddev)
["M2", "2024-01-01", 18.5],
["M2", "2024-01-02", 25.7],
["M2", "2024-01-03", 15.2], # Machine 2: unstable readings (high stddev) - should FAIL
["M3", "2024-01-01", 19.8],
["M3", "2024-01-02", 20.1],
["M3", "2024-01-03", 19.9], # Machine 3: stable readings
], "machine_id: string, date: string, temperature: double")

# Quality check: Standard deviation should not exceed 3.0 per machine
checks = [
DQDatasetRule(
criticality="error",
check_func=check_funcs.is_aggr_not_greater_than,
column="temperature",
check_func_kwargs={
"aggr_type": "stddev",
"group_by": ["machine_id"],
"limit": 3.0
},
),
]

dq_engine = DQEngine(WorkspaceClient())
result_df = dq_engine.apply_checks(manufacturing_df, checks)
display(result_df)

# COMMAND ----------

# MAGIC %md
# MAGIC #### Example 2: Approximate Aggregate Functions - Efficient Cardinality Estimation
# MAGIC
# MAGIC **`approx_count_distinct`** provides fast, memory-efficient cardinality estimation for large datasets.
# MAGIC
# MAGIC **From [Databricks Documentation](https://docs.databricks.com/aws/en/sql/language-manual/functions/approx_count_distinct.html):**
# MAGIC - Uses **HyperLogLog++** (HLL++) algorithm, a state-of-the-art cardinality estimator
# MAGIC - **Accurate within 5%** by default (configurable via `relativeSD` parameter)
# MAGIC - **Memory efficient**: Uses fixed memory regardless of cardinality
# MAGIC - **Ideal for**: High-cardinality columns, large datasets, real-time analytics
# MAGIC
# MAGIC **Use Case**: Monitor daily active users without expensive exact counting.

# COMMAND ----------

# User activity data with high cardinality
user_activity_df = spark.createDataFrame([
["2024-01-01", f"user_{i}"] for i in range(1, 95001) # 95,000 distinct users on day 1
] + [
["2024-01-02", f"user_{i}"] for i in range(1, 50001) # 50,000 distinct users on day 2
], "activity_date: string, user_id: string")

# Quality check: Ensure daily active users don't drop below 60,000
# Using approx_count_distinct is much faster than count_distinct for large datasets
checks = [
DQDatasetRule(
criticality="warn",
check_func=check_funcs.is_aggr_not_less_than,
column="user_id",
check_func_kwargs={
"aggr_type": "approx_count_distinct", # Fast approximate counting
"group_by": ["activity_date"],
"limit": 60000
},
),
]

result_df = dq_engine.apply_checks(user_activity_df, checks)
display(result_df)

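# COMMAND ----------

# MAGIC %md
# MAGIC For intuition, the underlying Spark function also exposes the relative standard deviation directly (`rsd` in PySpark, `relativeSD` in SQL). The cell below is plain PySpark rather than a DQX check, shown only as a sketch of the accuracy/cost trade-off.

# COMMAND ----------

from pyspark.sql import functions as F

# Approximate daily active users with ~1% relative error (the default is ~5%)
approx_dau_df = user_activity_df.groupBy("activity_date").agg(
    F.approx_count_distinct("user_id", rsd=0.01).alias("approx_dau")
)
display(approx_dau_df)
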
# COMMAND ----------

# MAGIC %md
# MAGIC #### Example 3: Non-Curated Aggregate Functions with Runtime Validation
# MAGIC
# MAGIC DQX supports any Databricks built-in aggregate function beyond the curated list:
# MAGIC - **Warning**: Non-curated functions trigger a warning
# MAGIC - **Runtime validation**: Ensures the function returns numeric values compatible with comparisons
# MAGIC - **Graceful errors**: Invalid aggregates (e.g., `collect_list` returning arrays) fail with clear messages
# MAGIC
# MAGIC **Note**: User-Defined Aggregate Functions (UDAFs) are not currently supported.

# COMMAND ----------

import warnings

# Sensor data with multiple readings per sensor
sensor_sample_df = spark.createDataFrame([
["S1", 45.2],
["S1", 45.8],
["S2", 78.1],
["S2", 78.5],
], "sensor_id: string, reading: double")

# Using a valid but non-curated aggregate function: any_value
# This will work but produce a warning
checks = [
DQDatasetRule(
criticality="warn",
check_func=check_funcs.is_aggr_not_greater_than,
column="reading",
check_func_kwargs={
"aggr_type": "any_value", # Not in curated list - triggers warning
"group_by": ["sensor_id"],
"limit": 100.0
},
),
]

# Capture warnings during execution
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
result_df = dq_engine.apply_checks(sensor_sample_df, checks)

# Display warning message if present
if w:
print(f"⚠️ Warning: {w[0].message}")

display(result_df)

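# COMMAND ----------

# MAGIC %md
# MAGIC The sketch below illustrates the "graceful errors" behaviour described above: `collect_list` returns an array, which cannot be compared against a numeric limit, so the check is expected to fail with a clear message. The exact exception type and wording may vary by DQX version.

# COMMAND ----------

# Hypothetical invalid check: collect_list is not a numeric aggregate
try:
    invalid_checks = [
        DQDatasetRule(
            criticality="error",
            check_func=check_funcs.is_aggr_not_greater_than,
            column="reading",
            check_func_kwargs={
                "aggr_type": "collect_list",  # returns ArrayType, not comparable to a numeric limit
                "group_by": ["sensor_id"],
                "limit": 100.0
            },
        ),
    ]
    dq_engine.apply_checks(sensor_sample_df, invalid_checks)
except Exception as e:
    print(f"Rejected as expected: {e}")
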
# COMMAND ----------

# MAGIC %md
# MAGIC #### Example 4: Percentile Functions for SLA Monitoring
# MAGIC
# MAGIC Monitor P95 latency to ensure 95% of API requests meet SLA requirements.
# MAGIC
# MAGIC **Using `aggr_params`:** Pass aggregate function parameters as a dictionary.
# MAGIC - Single parameter: `aggr_params={"percentile": 0.95}`
# MAGIC - Multiple parameters: `aggr_params={"percentile": 0.99, "accuracy": 10000}`

# COMMAND ----------

# API latency data in milliseconds
api_latency_df = spark.createDataFrame([
["2024-01-01", i * 10.0] for i in range(1, 101) # 10ms to 1000ms latencies
], "date: string, latency_ms: double")

# Quality check: P95 latency must be under 950ms
checks = [
DQDatasetRule(
criticality="error",
check_func=check_funcs.is_aggr_not_greater_than,
column="latency_ms",
check_func_kwargs={
"aggr_type": "percentile",
"aggr_params": {"percentile": 0.95}, # aggr_params as dict
"group_by": ["date"],
"limit": 950.0
},
),
]

result_df = dq_engine.apply_checks(api_latency_df, checks)
display(result_df)

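# COMMAND ----------

# MAGIC %md
# MAGIC A sketch of the multiple-parameter form mentioned above, assuming the same `aggr_params` dict convention: `approx_percentile` takes both a percentile and an accuracy, trading a little precision for speed on large datasets.

# COMMAND ----------

# Quality check sketch: approximate P99 latency should stay under 990ms
checks = [
    DQDatasetRule(
        criticality="warn",
        check_func=check_funcs.is_aggr_not_greater_than,
        column="latency_ms",
        check_func_kwargs={
            "aggr_type": "approx_percentile",
            "aggr_params": {"percentile": 0.99, "accuracy": 10000},  # multiple parameters
            "group_by": ["date"],
            "limit": 990.0
        },
    ),
]

result_df = dq_engine.apply_checks(api_latency_df, checks)
display(result_df)
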
# COMMAND ----------

# MAGIC %md
# MAGIC #### Example 5: Uniqueness Validation with Count Distinct
# MAGIC
# MAGIC Ensure referential integrity: each country should have exactly one country code.
# MAGIC
# MAGIC Use `count_distinct` for exact cardinality validation across groups.

# COMMAND ----------

# Country data with potential duplicates
country_df = spark.createDataFrame([
["US", "USA"],
["US", "USA"], # OK: same code
["FR", "FRA"],
["FR", "FRN"], # ERROR: different codes for same country
["DE", "DEU"],
], "country: string, country_code: string")

# Quality check: Each country must have exactly one distinct country code
checks = [
DQDatasetRule(
criticality="error",
check_func=check_funcs.is_aggr_not_greater_than,
column="country_code",
check_func_kwargs={
"aggr_type": "count_distinct", # Exact distinct count per group
"group_by": ["country"],
"limit": 1
},
),
]

result_df = dq_engine.apply_checks(country_df, checks)
display(result_df)

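# COMMAND ----------

# MAGIC %md
# MAGIC The same uniqueness check can also be expressed as metadata (a plain dict), mirroring the YAML form used in the DQX docs. This sketch assumes the engine's metadata entry point (`apply_checks_by_metadata`).

# COMMAND ----------

# Same check as above, defined declaratively
metadata_checks = [
    {
        "criticality": "error",
        "check": {
            "function": "is_aggr_not_greater_than",
            "arguments": {
                "column": "country_code",
                "aggr_type": "count_distinct",
                "group_by": ["country"],
                "limit": 1,
            },
        },
    },
]

result_df = dq_engine.apply_checks_by_metadata(country_df, metadata_checks)
display(result_df)
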
# COMMAND ----------

# MAGIC %md
# MAGIC ### Creating custom dataset-level checks
# MAGIC Requirement: Fail all readings from a sensor if any reading for that sensor exceeds a specified threshold from the sensor specification table.
139 changes: 139 additions & 0 deletions docs/dqx/docs/guide/quality_checks_definition.mdx
@@ -135,6 +135,41 @@ This approach provides static type checking and autocompletion in IDEs, making i
column="col1",
check_func_kwargs={"aggr_type": "avg", "group_by": ["col2"], "limit": 10.5},
),

# Extended aggregate functions for advanced data quality checks

DQDatasetRule( # Uniqueness check: each country should have one country code
criticality="error",
check_func=check_funcs.is_aggr_not_greater_than,
column="country_code",
check_func_kwargs={
"aggr_type": "count_distinct", # Exact distinct count (automatically uses two-stage aggregation)
"group_by": ["country"],
"limit": 1
},
),

DQDatasetRule( # Anomaly detection: detect unusual variance in sensor readings
criticality="warn",
check_func=check_funcs.is_aggr_not_greater_than,
column="temperature",
check_func_kwargs={
"aggr_type": "stddev",
"group_by": ["machine_id"],
"limit": 5.0
},
),

DQDatasetRule( # SLA monitoring: P95 latency must be under 1 second
criticality="error",
check_func=check_funcs.is_aggr_not_greater_than,
column="latency_ms",
check_func_kwargs={
"aggr_type": "percentile",
"aggr_params": {"percentile": 0.95},
"limit": 1000
},
),
]
```
</TabItem>
@@ -144,6 +179,110 @@ This approach provides static type checking and autocompletion in IDEs, making i
The validation of arguments and keyword arguments for the check function is automatically performed upon creating a `DQRowRule`.
</Admonition>

## Practical Use Cases for Extended Aggregates

### Uniqueness Validation with count_distinct

Ensure referential integrity by verifying that each entity has exactly one identifier:

```yaml
# Each country should have exactly one country code
- criticality: error
check:
function: is_aggr_not_greater_than
arguments:
column: country_code
aggr_type: count_distinct
group_by:
- country
limit: 1
```
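
For intuition, the quantity this check compares against `limit` is the exact number of distinct `country_code` values per `country`. A rough PySpark equivalent is sketched below (assuming a DataFrame `df` with those columns; DQX's internal implementation may differ):

```python
from pyspark.sql import functions as F

# Exact distinct country codes per country; groups where this exceeds 1 would be flagged
distinct_codes = df.groupBy("country").agg(
    F.count_distinct("country_code").alias("distinct_country_codes")
)
```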

### Anomaly Detection with Statistical Functions

Detect unusual patterns in sensor data or business metrics:

```yaml
# Alert on unusually high temperature variance per machine
- criticality: warn
check:
function: is_aggr_not_greater_than
arguments:
column: temperature
aggr_type: stddev
group_by:
- machine_id
limit: 5.0

# Monitor revenue stability across product lines
- criticality: error
check:
function: is_aggr_not_greater_than
arguments:
column: daily_revenue
aggr_type: variance
group_by:
- product_line
limit: 1000000.0
```

### SLA and Performance Monitoring with Percentiles

Monitor service performance and ensure SLA compliance:

```yaml
# P95 API latency must be under 1 second
- criticality: error
check:
function: is_aggr_not_greater_than
arguments:
column: latency_ms
aggr_type: percentile
aggr_params:
percentile: 0.95
limit: 1000

# P99 response time check (fast approximate)
- criticality: warn
check:
function: is_aggr_not_greater_than
arguments:
column: response_time_ms
aggr_type: approx_percentile
aggr_params:
percentile: 0.99
accuracy: 10000
limit: 5000

# Median baseline for order processing time
- criticality: warn
check:
function: is_aggr_not_less_than
arguments:
column: processing_time_sec
aggr_type: median
group_by:
- order_type
limit: 30.0
```

<Admonition type="tip" title="Choosing the Right Aggregate Function">
**Uniqueness & Cardinality:**
- `count_distinct` - Exact distinct counts (uniqueness validation)
- `approx_count_distinct` - Fast approximate counts (very large datasets)

**Statistical Monitoring:**
- `stddev` / `variance` - Detect anomalies and inconsistencies
- `median` - Baseline checks, central tendency
- `mode` - Most frequent value (categorical data)

**Performance & SLAs:**
- `percentile` - Exact P95/P99 for SLA compliance
- `approx_percentile` - Fast percentile estimates for large datasets

**Learn more:** See all [Databricks aggregate functions](https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-functions-builtin-alpha)
</Admonition>

### Checks defined using metadata (list of dictionaries)

Checks can be defined using declarative syntax as a list of dictionaries.
3 changes: 3 additions & 0 deletions docs/dqx/docs/reference/benchmarks.mdx
@@ -57,6 +57,9 @@ sidebar_position: 13
| test_benchmark_has_valid_schema | 0.172078 | 0.172141 | 0.163793 | 0.181081 | 0.006715 | 0.009295 | 0.167010 | 0.176305 | 6 | 0 | 2 | 5.81 |
| test_benchmark_has_x_coordinate_between | 0.217192 | 0.213656 | 0.209310 | 0.236233 | 0.011150 | 0.012638 | 0.209410 | 0.222048 | 5 | 0 | 1 | 4.60 |
| test_benchmark_has_y_coordinate_between | 0.218497 | 0.219630 | 0.209352 | 0.234111 | 0.010103 | 0.013743 | 0.209584 | 0.223327 | 5 | 0 | 1 | 4.58 |
| test_benchmark_is_aggr_approx_count_distinct_with_group_by | 0.242859 | 0.239567 | 0.223938 | 0.272970 | 0.018260 | 0.017771 | 0.232410 | 0.250181 | 5 | 0 | 2 | 4.12 |
| test_benchmark_is_aggr_count_distinct_no_group_by | 0.277439 | 0.272561 | 0.250425 | 0.320653 | 0.027686 | 0.038256 | 0.256090 | 0.294346 | 5 | 0 | 1 | 3.60 |
| test_benchmark_is_aggr_count_distinct_with_group_by | 0.248015 | 0.249024 | 0.231550 | 0.270066 | 0.016044 | 0.026386 | 0.233048 | 0.259434 | 5 | 0 | 2 | 4.03 |
| test_benchmark_is_aggr_equal | 0.304401 | 0.305693 | 0.266624 | 0.330403 | 0.026888 | 0.044641 | 0.284540 | 0.329181 | 5 | 0 | 1 | 3.29 |
| test_benchmark_is_aggr_not_equal | 0.296462 | 0.296800 | 0.275119 | 0.312035 | 0.013498 | 0.013448 | 0.291054 | 0.304502 | 5 | 0 | 2 | 3.37 |
| test_benchmark_is_aggr_not_greater_than | 0.307771 | 0.315185 | 0.277924 | 0.316280 | 0.016705 | 0.010701 | 0.304974 | 0.315675 | 5 | 1 | 1 | 3.25 |