@@ -1010,43 +1010,43 @@ def aggregate(self, other: KVConnectorStats) -> KVConnectorStats:
10101010 """Aggregate stats from another worker, preserving per-worker separation"""
10111011 if other .is_empty ():
10121012 return self
1013-
1013+
10141014 assert isinstance (other , UCMKVConnectorStats ), "Expected UCMKVConnectorStats"
1015-
1015+
10161016 for worker_rank , worker_data in other .data .items ():
10171017 if worker_rank not in self .data :
10181018 self .data [worker_rank ] = copy .deepcopy (worker_data )
10191019 continue
1020-
1020+
10211021 for metric_name , values in worker_data .items ():
10221022 self .data [worker_rank ].setdefault (metric_name , []).extend (values )
1023-
1023+
10241024 return self
10251025
10261026 def reduce (self ) -> dict [str , int | float ]:
10271027 """Reduce the observations to representative values for CLI logging."""
10281028 result = {}
10291029 is_count_metric = lambda k : any (x in k for x in ["num" , "requests" , "blocks" ])
10301030 is_hit_rate_metric = lambda k : "hit_rate" in k or "hit_rates" in k
1031-
1031+
10321032 for worker_rank , worker_data in sorted (self .data .items ()):
10331033 for metric_name , values in worker_data .items ():
10341034 if is_hit_rate_metric (metric_name ) and worker_rank != "0" :
10351035 continue
1036-
1036+
10371037 suffix = "(total)" if is_count_metric (metric_name ) else "(avg)"
10381038 if is_hit_rate_metric (metric_name ) and worker_rank == "0" :
10391039 worker_key = f"{ metric_name } { suffix } "
10401040 else :
10411041 worker_key = f"worker_{ worker_rank } _{ metric_name } { suffix } "
1042-
1042+
10431043 if not values :
10441044 result [worker_key ] = 0 if is_count_metric (metric_name ) else 0.0
10451045 elif is_count_metric (metric_name ):
10461046 result [worker_key ] = int (sum (values ))
10471047 else :
10481048 result [worker_key ] = round (sum (values ) / len (values ), 3 )
1049-
1049+
10501050 return result
10511051
10521052
0 commit comments