55use aws_lambda_events:: event:: cloudwatch_events:: CloudWatchEvent ;
66use aws_sdk_cloudwatch:: {
77 primitives:: DateTime ,
8- types:: { MetricDatum , StandardUnit } ,
8+ types:: { Dimension , MetricDatum , StandardUnit } ,
99} ;
10- use deltalake:: datafusion:: common:: * ;
11- use deltalake:: datafusion:: execution:: context:: SessionContext ;
10+ use deltalake_core:: arrow:: { array:: PrimitiveArray , datatypes:: Int64Type } ;
11+ use deltalake_core:: datafusion:: common:: * ;
12+ use deltalake_core:: datafusion:: execution:: context:: SessionContext ;
1213use lambda_runtime:: { run, service_fn, Error , LambdaEvent } ;
1314use tracing:: log:: * ;
1415
16+ use std:: collections:: HashMap ;
1517use std:: sync:: Arc ;
1618use std:: time:: SystemTime ;
1719
1820mod config;
1921
2022async fn function_handler ( _event : LambdaEvent < CloudWatchEvent > ) -> Result < ( ) , Error > {
23+ deltalake_aws:: register_handlers ( None ) ;
24+
2125 let aws_config = aws_config:: load_defaults ( aws_config:: BehaviorVersion :: latest ( ) ) . await ;
2226 let cloudwatch = aws_sdk_cloudwatch:: Client :: new ( & aws_config) ;
2327
@@ -31,7 +35,7 @@ async fn function_handler(_event: LambdaEvent<CloudWatchEvent>) -> Result<(), Er
3135 for gauge in gauges. iter ( ) {
3236 debug ! ( "Querying the {name} table" ) ;
3337 let ctx = SessionContext :: new ( ) ;
34- let table = deltalake :: open_table ( & gauge. url )
38+ let table = deltalake_core :: open_table ( & gauge. url )
3539 . await
3640 . expect ( "Failed to register table" ) ;
3741 ctx. register_table ( "source" , Arc :: new ( table) )
@@ -64,6 +68,68 @@ async fn function_handler(_event: LambdaEvent<CloudWatchEvent>) -> Result<(), Er
6468 . await ?;
6569 debug ! ( "Result of CloudWatch send: {res:?}" ) ;
6670 }
config::Measurement::DimensionalCount => {
    // Turn each result row into one CloudWatch metric datum: the value is the
    // row's `count` column and the dimensions are every other column.
    let batches = df.collect().await.expect("Failed to collect batches");
    debug!("I see this many batches: {}", batches.len());

    // Interestingly the collect produces a lot of zero row batches
    for batch in batches.iter().filter(|b| b.num_rows() > 0) {
        if let Some(_counts) = batch.column_by_name("count") {
            // Fetching the count column just to ensure that it exists before doing
            // any more computation
            let schema = batch.schema();
            let fields = schema.fields();

            for row in 0..batch.num_rows() {
                // Per-row accumulators: column-name -> string value pairs for
                // the CloudWatch dimensions, plus the extracted count.
                let mut dimensions: HashMap<String, String> = HashMap::new();
                let mut counted = false;
                let mut count = 0;

                // Walk every column of the batch; `fields[idx]` gives the
                // column name matching `batch.columns()[idx]`.
                for (idx, column) in batch.columns().iter().enumerate() {
                    let field = &fields[idx];
                    let name = field.name();
                    if name == "count" {
                        // NOTE(review): assumes the `count` column is Int64 —
                        // `as_primitive_array` panics on a dtype mismatch.
                        // TODO confirm the query always produces Int64 here.
                        let arr: &PrimitiveArray<Int64Type> =
                            arrow::array::cast::as_primitive_array(&column);
                        count = arr.value(row);
                        counted = true;
                    } else {
                        // NOTE(review): assumes every non-count column is a
                        // Utf8 string array — `as_string_array` panics
                        // otherwise. Verify against the gauge queries.
                        let arr = arrow::array::cast::as_string_array(&column);
                        dimensions.insert(name.into(), arr.value(row).into());
                    }
                }

                // Only emit a datum if this row actually carried a count.
                if counted {
                    debug!("{count}: {dimensions:?}");
                    let mut dims: Vec<Dimension> = vec![];

                    // Convert the collected name/value pairs into CloudWatch
                    // Dimension structs. HashMap iteration order is arbitrary;
                    // CloudWatch treats dimensions as an unordered set.
                    for (key, value) in dimensions.iter() {
                        dims.push(
                            Dimension::builder().name(key).value(value).build(),
                        );
                    }
                    // One PutMetricData call per row, published under the
                    // DataLake/<table name> namespace as a Count unit.
                    let datum = MetricDatum::builder()
                        .metric_name(&gauge.name)
                        .timestamp(DateTime::from(SystemTime::now()))
                        .set_dimensions(Some(dims))
                        .value(count as f64)
                        .unit(StandardUnit::Count)
                        .build();

                    let res = cloudwatch
                        .put_metric_data()
                        .namespace(format!("DataLake/{name}"))
                        .metric_data(datum)
                        .send()
                        .await?;
                    debug!("Result of CloudWatch send: {res:?}");
                }
            }
        } else {
            error!("The result set must have a column named `count`");
        }
    }
}
67133 }
68134 }
69135 }
0 commit comments