@@ -1008,45 +1008,45 @@ def is_empty(self) -> bool:
10081008
10091009 def aggregate (self , other : KVConnectorStats ) -> KVConnectorStats :
10101010 """Aggregate stats from another worker, preserving per-worker separation"""
1011- if not other .is_empty ():
1012- assert isinstance (
1013- other , UCMKVConnectorStats
1014- ), "Expected UCMKVConnectorStats"
1015- for worker_rank , worker_data in other .data .items ():
1016- if worker_rank not in self .data :
1017- self .data [worker_rank ] = copy .deepcopy (worker_data )
1018- else :
1019- for metric_name , values in worker_data .items ():
1020- if metric_name not in self .data [worker_rank ]:
1021- self .data [worker_rank ][metric_name ] = []
1022- self .data [worker_rank ][metric_name ].extend (values )
1011+ if other .is_empty ():
1012+ return self
1013+
1014+ assert isinstance (other , UCMKVConnectorStats ), "Expected UCMKVConnectorStats"
1015+
1016+ for worker_rank , worker_data in other .data .items ():
1017+ if worker_rank not in self .data :
1018+ self .data [worker_rank ] = copy .deepcopy (worker_data )
1019+ continue
1020+
1021+ for metric_name , values in worker_data .items ():
1022+ self .data [worker_rank ].setdefault (metric_name , []).extend (values )
1023+
10231024 return self
10241025
10251026 def reduce (self ) -> dict [str , int | float ]:
1026- """Reduce the observations to representative values for CLI logging"""
1027+ """Reduce the observations to representative values for CLI logging. """
10271028 result = {}
10281029 is_count_metric = lambda k : any (x in k for x in ["num" , "requests" , "blocks" ])
10291030 is_hit_rate_metric = lambda k : "hit_rate" in k or "hit_rates" in k
1030-
1031+
10311032 for worker_rank , worker_data in sorted (self .data .items ()):
10321033 for metric_name , values in worker_data .items ():
1033- # For hit rate metrics, only show worker_0
10341034 if is_hit_rate_metric (metric_name ) and worker_rank != "0" :
10351035 continue
1036-
1036+
10371037 suffix = "(total)" if is_count_metric (metric_name ) else "(avg)"
10381038 if is_hit_rate_metric (metric_name ) and worker_rank == "0" :
10391039 worker_key = f"{ metric_name } { suffix } "
10401040 else :
10411041 worker_key = f"worker_{ worker_rank } _{ metric_name } { suffix } "
1042-
1042+
10431043 if not values :
10441044 result [worker_key ] = 0 if is_count_metric (metric_name ) else 0.0
10451045 elif is_count_metric (metric_name ):
10461046 result [worker_key ] = int (sum (values ))
10471047 else :
10481048 result [worker_key ] = round (sum (values ) / len (values ), 3 )
1049-
1049+
10501050 return result
10511051
10521052
0 commit comments