
Commit 0055e66

vb-dbrks and ghanse authored
Extended aggregation check function to support more aggregation types (#951)
## Changes

Extended the `is_aggr_*` check functions from supporting 5 basic aggregates to 20 curated aggregate functions with a hybrid "Curated + Custom" approach.

1. **Curated Aggregate Functions (20 total)**: added 15 new aggregate functions.
   - Cardinality: `count_distinct`, `approx_count_distinct`, `count_if`
   - Statistical: `stddev`, `stddev_pop`, `stddev_samp`, `variance`, `var_pop`, `var_samp`, `median`, `mode`, `skewness`, `kurtosis`
   - Percentile: `percentile`, `approx_percentile`
2. **Custom Aggregate Support**
   - Warning mechanism for non-curated aggregates (`UserWarning`, still executes)
   - Runtime validation with clear error messages for invalid return types
   - Human-readable violation messages (e.g., "Distinct value count 2..." instead of "Count_distinct 2...")
3. **New `aggr_params` Parameter**: added `aggr_params: dict[str, Any]` to all 4 `is_aggr_*` functions for aggregates requiring parameters (e.g., `percentile`, `approx_percentile`).
4. **`count_distinct` with `group_by` Support**: implemented two-stage aggregation (groupBy + join) for window-incompatible aggregates like `count_distinct` (a minimal sketch of this approach follows below).
5. **Bug Fixes & Improvements**
   - Fixed flaky test `test_apply_checks_and_save_in_tables_for_patterns_exclude_no_tables_matching`
   - Added performance benchmarks for `count_distinct` vs `approx_count_distinct`
   - Updated documentation with usage examples and performance guidance

### Linked issues

Closes #933 and #929

Resolves #..

### Tests

<!-- How is this tested? Please see the checklist below and also describe any other relevant tests -->

- [x] manually tested
- [x] added unit tests
- [x] added integration tests
- [x] added end-to-end tests
- [x] added performance tests

---------

Co-authored-by: vb-dbrks <vb-dbrks@users.noreply.github.com>
Co-authored-by: Greg Hansen <gregory.hansen@databricks.com>
1 parent 55df7fb commit 0055e66
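For context on item 4 of the commit message, here is a minimal PySpark sketch of the two-stage (groupBy + join) approach for window-incompatible aggregates such as `count_distinct`. This is an illustration only, not the DQX implementation; the column names `_aggr_value` and `_violates_limit` are hypothetical.

```python
# Minimal sketch of two-stage aggregation (illustration only, not the actual DQX code).
# count_distinct cannot be evaluated as a window function, so the aggregate is
# computed per group first and then joined back onto every row.
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [["US", "USA"], ["US", "USA"], ["FR", "FRA"], ["FR", "FRN"]],
    "country: string, country_code: string",
)
limit = 1  # allowed number of distinct country codes per country

# Stage 1: compute the aggregate per group.
aggr_df = df.groupBy("country").agg(
    F.countDistinct("country_code").alias("_aggr_value")  # hypothetical column name
)

# Stage 2: join the per-group aggregate back onto every row and flag violations,
# mirroring what a window function would provide for window-compatible aggregates.
checked_df = (
    df.join(aggr_df, on="country", how="left")
      .withColumn("_violates_limit", F.col("_aggr_value") > limit)
)
checked_df.show()
```

In DQX itself, per the commit notes, the violation surfaces as a human-readable message (e.g., "Distinct value count 2...") rather than a bare boolean flag.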

File tree

11 files changed, +1494 -161 lines


demos/dqx_demo_library.py

Lines changed: 221 additions & 0 deletions
@@ -690,6 +690,227 @@ def not_ends_with(column: str, suffix: str) -> Column:

# COMMAND ----------

# MAGIC %md
# MAGIC ### Extended Aggregate Functions for Data Quality Checks
# MAGIC
# MAGIC DQX now supports 20 curated aggregate functions for advanced data quality monitoring:
# MAGIC - **Statistical functions**: `stddev`, `variance`, `median`, `mode`, `skewness`, `kurtosis` for anomaly detection
# MAGIC - **Percentile functions**: `percentile`, `approx_percentile` for SLA monitoring
# MAGIC - **Cardinality functions**: `count_distinct`, `approx_count_distinct` (uses HyperLogLog++)
# MAGIC - **Any Databricks built-in aggregate**: Supported with runtime validation

# COMMAND ----------

# MAGIC %md
# MAGIC #### Example 1: Statistical Functions - Anomaly Detection with Standard Deviation
# MAGIC
# MAGIC Detect unusual variance in sensor readings per machine. High standard deviation indicates unstable sensors that may need calibration.

# COMMAND ----------

from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import DQDatasetRule
from databricks.labs.dqx import check_funcs
from databricks.sdk import WorkspaceClient

# Manufacturing sensor data with readings from multiple machines
manufacturing_df = spark.createDataFrame([
    ["M1", "2024-01-01", 20.1],
    ["M1", "2024-01-02", 20.3],
    ["M1", "2024-01-03", 20.2],  # Machine 1: stable readings (low stddev)
    ["M2", "2024-01-01", 18.5],
    ["M2", "2024-01-02", 25.7],
    ["M2", "2024-01-03", 15.2],  # Machine 2: unstable readings (high stddev) - should FAIL
    ["M3", "2024-01-01", 19.8],
    ["M3", "2024-01-02", 20.1],
    ["M3", "2024-01-03", 19.9],  # Machine 3: stable readings
], "machine_id: string, date: string, temperature: double")

# Quality check: Standard deviation should not exceed 3.0 per machine
checks = [
    DQDatasetRule(
        criticality="error",
        check_func=check_funcs.is_aggr_not_greater_than,
        column="temperature",
        check_func_kwargs={
            "aggr_type": "stddev",
            "group_by": ["machine_id"],
            "limit": 3.0
        },
    ),
]

dq_engine = DQEngine(WorkspaceClient())
result_df = dq_engine.apply_checks(manufacturing_df, checks)
display(result_df)

# COMMAND ----------

# MAGIC %md
# MAGIC #### Example 2: Approximate Aggregate Functions - Efficient Cardinality Estimation
# MAGIC
# MAGIC **`approx_count_distinct`** provides fast, memory-efficient cardinality estimation for large datasets.
# MAGIC
# MAGIC **From [Databricks Documentation](https://docs.databricks.com/aws/en/sql/language-manual/functions/approx_count_distinct.html):**
# MAGIC - Uses **HyperLogLog++** (HLL++) algorithm, a state-of-the-art cardinality estimator
# MAGIC - **Accurate within 5%** by default (configurable via `relativeSD` parameter)
# MAGIC - **Memory efficient**: Uses fixed memory regardless of cardinality
# MAGIC - **Ideal for**: High-cardinality columns, large datasets, real-time analytics
# MAGIC
# MAGIC **Use Case**: Monitor daily active users without expensive exact counting.

# COMMAND ----------

# User activity data with high cardinality
user_activity_df = spark.createDataFrame([
    ["2024-01-01", f"user_{i}"] for i in range(1, 95001)  # 95,000 distinct users on day 1
] + [
    ["2024-01-02", f"user_{i}"] for i in range(1, 50001)  # 50,000 distinct users on day 2
], "activity_date: string, user_id: string")

# Quality check: Ensure daily active users don't drop below 60,000
# Using approx_count_distinct is much faster than count_distinct for large datasets
checks = [
    DQDatasetRule(
        criticality="warn",
        check_func=check_funcs.is_aggr_not_less_than,
        column="user_id",
        check_func_kwargs={
            "aggr_type": "approx_count_distinct",  # Fast approximate counting
            "group_by": ["activity_date"],
            "limit": 60000
        },
    ),
]

result_df = dq_engine.apply_checks(user_activity_df, checks)
display(result_df)

# COMMAND ----------

# MAGIC %md
# MAGIC #### Example 3: Non-Curated Aggregate Functions with Runtime Validation
# MAGIC
# MAGIC DQX supports any Databricks built-in aggregate function beyond the curated list:
# MAGIC - **Warning**: Non-curated functions trigger a warning
# MAGIC - **Runtime validation**: Ensures the function returns numeric values compatible with comparisons
# MAGIC - **Graceful errors**: Invalid aggregates (e.g., `collect_list` returning arrays) fail with clear messages
# MAGIC
# MAGIC **Note**: User-Defined Aggregate Functions (UDAFs) are not currently supported.

# COMMAND ----------

import warnings

# Sensor data with multiple readings per sensor
sensor_sample_df = spark.createDataFrame([
    ["S1", 45.2],
    ["S1", 45.8],
    ["S2", 78.1],
    ["S2", 78.5],
], "sensor_id: string, reading: double")

# Using a valid but non-curated aggregate function: any_value
# This will work but produce a warning
checks = [
    DQDatasetRule(
        criticality="warn",
        check_func=check_funcs.is_aggr_not_greater_than,
        column="reading",
        check_func_kwargs={
            "aggr_type": "any_value",  # Not in curated list - triggers warning
            "group_by": ["sensor_id"],
            "limit": 100.0
        },
    ),
]

# Capture warnings during execution
with warnings.catch_warnings(record=True) as w:
    warnings.simplefilter("always")
    result_df = dq_engine.apply_checks(sensor_sample_df, checks)

# Display warning message if present
if w:
    print(f"⚠️ Warning: {w[0].message}")

display(result_df)

# COMMAND ----------

# MAGIC %md
# MAGIC #### Example 4: Percentile Functions for SLA Monitoring
# MAGIC
# MAGIC Monitor P95 latency to ensure 95% of API requests meet SLA requirements.
# MAGIC
# MAGIC **Using `aggr_params`:** Pass aggregate function parameters as a dictionary.
# MAGIC - Single parameter: `aggr_params={"percentile": 0.95}`
# MAGIC - Multiple parameters: `aggr_params={"percentile": 0.99, "accuracy": 10000}`

# COMMAND ----------

# API latency data in milliseconds
api_latency_df = spark.createDataFrame([
    ["2024-01-01", i * 10.0] for i in range(1, 101)  # 10ms to 1000ms latencies
], "date: string, latency_ms: double")

# Quality check: P95 latency must be under 950ms
checks = [
    DQDatasetRule(
        criticality="error",
        check_func=check_funcs.is_aggr_not_greater_than,
        column="latency_ms",
        check_func_kwargs={
            "aggr_type": "percentile",
            "aggr_params": {"percentile": 0.95},  # aggr_params as dict
            "group_by": ["date"],
            "limit": 950.0
        },
    ),
]

result_df = dq_engine.apply_checks(api_latency_df, checks)
display(result_df)

# COMMAND ----------

# MAGIC %md
# MAGIC #### Example 5: Uniqueness Validation with Count Distinct
# MAGIC
# MAGIC Ensure referential integrity: each country should have exactly one country code.
# MAGIC
# MAGIC Use `count_distinct` for exact cardinality validation across groups.

# COMMAND ----------

# Country data with potential duplicates
country_df = spark.createDataFrame([
    ["US", "USA"],
    ["US", "USA"],  # OK: same code
    ["FR", "FRA"],
    ["FR", "FRN"],  # ERROR: different codes for same country
    ["DE", "DEU"],
], "country: string, country_code: string")

# Quality check: Each country must have exactly one distinct country code
checks = [
    DQDatasetRule(
        criticality="error",
        check_func=check_funcs.is_aggr_not_greater_than,
        column="country_code",
        check_func_kwargs={
            "aggr_type": "count_distinct",  # Exact distinct count per group
            "group_by": ["country"],
            "limit": 1
        },
    ),
]

result_df = dq_engine.apply_checks(country_df, checks)
display(result_df)

# COMMAND ----------

# MAGIC %md
# MAGIC ### Creating custom dataset-level checks
# MAGIC Requirement: Fail all readings from a sensor if any reading for that sensor exceeds a specified threshold from the sensor specification table.
