@@ -1017,27 +1017,18 @@ def is_empty(self) -> bool:
10171017 def aggregate (self , other : KVConnectorStats ) -> KVConnectorStats :
10181018 """Aggregate stats from another worker, preserving per-worker separation"""
10191019 if not other .is_empty ():
1020- if isinstance (other , UCMKVConnectorStats ):
1021- # Other is also UCMKVConnectorStats with per-worker data
1022- for worker_rank , worker_data in other .data .items ():
1023- if worker_rank not in self .data :
1024- self .data [worker_rank ] = copy .deepcopy (worker_data )
1025- else :
1026- # Aggregate metrics for this worker
1027- for metric_name , values in worker_data .items ():
1028- if metric_name not in self .data [worker_rank ]:
1029- self .data [worker_rank ][metric_name ] = []
1030- self .data [worker_rank ][metric_name ].extend (values )
1031- else :
1032- # Other is a different type, treat as single worker "0"
1033- worker_key = "0"
1034- if worker_key not in self .data :
1035- self .data [worker_key ] = {}
1036-
1037- for metric_name , values in other .data .items ():
1038- if metric_name not in self .data [worker_key ]:
1039- self .data [worker_key ][metric_name ] = []
1040- self .data [worker_key ][metric_name ].extend (values )
1020+ assert isinstance (
1021+ other , UCMKVConnectorStats
1022+ ), "Expected UCMKVConnectorStats"
1023+ for worker_rank , worker_data in other .data .items ():
1024+ if worker_rank not in self .data :
1025+ self .data [worker_rank ] = copy .deepcopy (worker_data )
1026+ else :
1027+ # Aggregate metrics for this worker
1028+ for metric_name , values in worker_data .items ():
1029+ if metric_name not in self .data [worker_rank ]:
1030+ self .data [worker_rank ][metric_name ] = []
1031+ self .data [worker_rank ][metric_name ].extend (values )
10411032 return self
10421033
10431034 def reduce (self ) -> dict [str , int | float ]:
@@ -1202,17 +1193,10 @@ def _init_metrics_from_config(
12021193 "extended_labelnames" : extended_labelnames ,
12031194 }
12041195
1205- def observe (
1206- self ,
1207- transfer_stats_data : dict [str , Any ] | KVConnectorStats ,
1208- engine_idx : int = 0 ,
1209- ):
1196+ def observe (self , transfer_stats_data : dict [str , Any ], engine_idx : int = 0 ):
12101197 """
12111198 Record transfer statistics to Prometheus metrics based on configuration.
12121199 """
1213- if isinstance (transfer_stats_data , KVConnectorStats ):
1214- transfer_stats_data = transfer_stats_data .data
1215-
12161200 if transfer_stats_data and isinstance (transfer_stats_data , dict ):
12171201 first_key = next (iter (transfer_stats_data .keys ()), None )
12181202 if first_key and isinstance (transfer_stats_data [first_key ], dict ):
0 commit comments