@@ -974,17 +974,9 @@ def build_prom_metrics(
 
 @dataclass
 class UCMKVConnectorStats(KVConnectorStats):
-    """
-    Container for UCM connector transfer performance metrics.
-    Data structure: {worker_rank: {metric_name: [values]}}
-    """
+    """Data structure: {worker_rank: {metric_name: [values]}}"""
 
-    def __post_init__(self):
-        # Initialize data as dict of dicts: {worker_rank: {metric_name: [values]}}
-        if not self.data:
-            self.data: dict[str, dict[str, list[float]]] = {}
-
-    def reset(self, metric_names: list[str] | None = None):
+    def reset(self):
         """Reset stats for all workers"""
         for worker_data in self.data.values():
             worker_data.clear()
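Note on the hunk above: the only state the simplified class keeps is the nested `data` mapping. A minimal sketch of that shape and of the clear-in-place reset behaviour, using a plain dict in place of the real `UCMKVConnectorStats.data` and invented metric names:

# Sketch only: {worker_rank: {metric_name: [values]}}; metric names are invented.
data: dict[str, dict[str, list[float]]] = {
    "0": {"save_duration_ms": [1.2, 0.9], "hit_rate": [0.75]},
    "1": {"save_duration_ms": [1.4]},
}

# reset() clears each worker's per-metric entries but keeps the worker_rank keys.
for worker_data in data.values():
    worker_data.clear()

assert data == {"0": {}, "1": {}}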
@@ -1024,30 +1016,25 @@ def aggregate(self, other: KVConnectorStats) -> KVConnectorStats:
             if worker_rank not in self.data:
                 self.data[worker_rank] = copy.deepcopy(worker_data)
             else:
-                # Aggregate metrics for this worker
                 for metric_name, values in worker_data.items():
                     if metric_name not in self.data[worker_rank]:
                         self.data[worker_rank][metric_name] = []
                     self.data[worker_rank][metric_name].extend(values)
         return self
 
     def reduce(self) -> dict[str, int | float]:
-        """
-        Reduce the observations to representative values for CLI logging.
-        """
+        """Reduce the observations to representative values for CLI logging"""
         result = {}
         is_count_metric = lambda k: any(x in k for x in ["num", "requests", "blocks"])
         is_hit_rate_metric = lambda k: "hit_rate" in k or "hit_rates" in k
 
-        # Output each worker's stats separately
         for worker_rank, worker_data in sorted(self.data.items()):
             for metric_name, values in worker_data.items():
                 # For hit rate metrics, only show worker_0
                 if is_hit_rate_metric(metric_name) and worker_rank != "0":
                     continue
 
                 suffix = "(total)" if is_count_metric(metric_name) else "(avg)"
-                # For hit rate metrics from worker_0, don't include worker prefix
                 if is_hit_rate_metric(metric_name) and worker_rank == "0":
                     worker_key = f"{metric_name} {suffix}"
                 else:
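The `else:` branch that formats per-worker keys is cut off by the hunk boundary, so the exact prefix format is not visible here. A hedged sketch of the naming logic shown above, with an assumed `worker_<rank>/<metric>` prefix and invented metric names:

is_count_metric = lambda k: any(x in k for x in ["num", "requests", "blocks"])
is_hit_rate_metric = lambda k: "hit_rate" in k or "hit_rates" in k

def key_for(worker_rank: str, metric_name: str) -> str | None:
    # Hit-rate metrics are reported once, from worker 0 only.
    if is_hit_rate_metric(metric_name) and worker_rank != "0":
        return None
    suffix = "(total)" if is_count_metric(metric_name) else "(avg)"
    if is_hit_rate_metric(metric_name) and worker_rank == "0":
        return f"{metric_name} {suffix}"  # no worker prefix
    return f"worker_{worker_rank}/{metric_name} {suffix}"  # assumed prefix format

print(key_for("0", "hit_rate"))          # hit_rate (avg)
print(key_for("1", "num_saved_blocks"))  # worker_1/num_saved_blocks (total)
print(key_for("1", "hit_rate"))          # None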
@@ -1069,7 +1056,7 @@ class UCMPromMetrics(KVConnectorPromMetrics):
     Records metrics from self.monitor data based on metrics_configs.yaml configuration.
     """
 
-    _config_cache: Dict[str, Dict[str, Any]] = {}  # Cache for loaded configs
+    _config_cache: Dict[str, Dict[str, Any]] = {}
 
     def __init__(
         self,
@@ -1105,11 +1092,9 @@ def _load_config(self, vllm_config: VllmConfig) -> Dict[str, Any]:
         if not metrics_config_path:
             return {}
 
-        # Check cache first
         if metrics_config_path in self._config_cache:
            return self._config_cache[metrics_config_path]
 
-        # Load and cache config
        try:
            with open(metrics_config_path, "r") as f:
                config = yaml.safe_load(f)
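The hunk above removes two comments but keeps the behaviour: parsed YAML is memoised per path in the class-level `_config_cache`, so every `UCMPromMetrics` instance pointing at the same `metrics_config_path` parses the file once. A self-contained sketch of that caching pattern (the module-level cache and function name are stand-ins, and the real method also handles load errors):

import yaml

_config_cache: dict[str, dict] = {}

def load_config(path: str) -> dict:
    # Return the cached parse if this path was seen before.
    if path in _config_cache:
        return _config_cache[path]
    with open(path, "r") as f:
        config = yaml.safe_load(f) or {}
    _config_cache[path] = config
    return config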
@@ -1131,12 +1116,10 @@ def _init_metrics_from_config(
         enabled = prometheus_config.get("enabled_metrics", {})
         metric_prefix = prometheus_config.get("metric_prefix", "ucm:")
 
-        # Add worker label to support per-worker metrics
         extended_labelnames = (
             labelnames + ["worker_id"] if "worker_id" not in labelnames else labelnames
         )
 
-        # Metric type configurations
         metric_types_config = {
             "counter": (
                 self._counter_cls,
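Per-worker metrics work by appending a `worker_id` label to the engine's existing label names and creating one base metric per configured name. A minimal sketch with `prometheus_client` directly; the real code goes through the `self._counter_cls` / `self._gauge_cls` / `self._histogram_cls` wrappers, and the label and metric names below are invented:

from prometheus_client import Counter, Histogram

labelnames = ["model_name"]                       # assumed engine-level labels
extended_labelnames = labelnames + ["worker_id"]  # as in the hunk above

# One base metric per configured name, using the "ucm:" prefix default.
num_saved_blocks = Counter(
    "ucm:num_saved_blocks",
    "Blocks saved to UCM (illustrative documentation)",
    labelnames=extended_labelnames,
)
save_duration_ms = Histogram(
    "ucm:save_duration_ms",
    "Per-request save latency in ms (illustrative)",
    labelnames=extended_labelnames,
    buckets=[1, 5, 10, 50, 100, 500],
)

# Per-worker children are created later via .labels(..., worker_id).
num_saved_blocks.labels("my-model", "1").inc(8)
save_duration_ms.labels("my-model", "1").observe(3.2)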
@@ -1171,7 +1154,6 @@ def _init_metrics_from_config(
             prometheus_name = f"{metric_prefix}{name}"
             attr_name = f"{metric_type}_{name}"
 
-            # Create metric with extended labelnames (including worker)
             metric_kwargs = {
                 "name": prometheus_name,
                 "documentation": doc,
@@ -1185,7 +1167,6 @@ def _init_metrics_from_config(
                 metric_kwargs["buckets"] = metric_cfg.get("buckets", [])
 
             metric = metric_cls(**metric_kwargs)
-            # Store base metric for creating per-worker labeled instances
             setattr(self, attr_name, metric)
             self.metric_mappings[name] = {
                 "type": metric_type,
                 "extended_labelnames": extended_labelnames,
@@ -1194,9 +1175,7 @@
             }
 
     def observe(self, transfer_stats_data: dict[str, Any], engine_idx: int = 0):
-        """
-        Record transfer statistics to Prometheus metrics based on configuration.
-        """
+        """Record transfer statistics to Prometheus metrics based on configuration."""
         if transfer_stats_data and isinstance(transfer_stats_data, dict):
             first_key = next(iter(transfer_stats_data.keys()), None)
             if first_key and isinstance(transfer_stats_data[first_key], dict):
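`observe()` sniffs the payload shape: when the first value is itself a dict, the data is treated as `{worker_rank: {stat_name: value}}` and each worker's slice is handed to the per-worker path shown in the next hunk. A small sketch of that detection, with invented stat names:

# Assumed payload shape after the connector stats are reduced and shipped over.
transfer_stats_data = {
    "0": {"num_saved_blocks": 8, "save_duration_ms": 3.2},
    "1": {"num_saved_blocks": 5, "save_duration_ms": 4.1},
}

first_key = next(iter(transfer_stats_data.keys()), None)
per_worker = first_key is not None and isinstance(transfer_stats_data[first_key], dict)

if per_worker:
    for worker_rank, worker_stats in transfer_stats_data.items():
        print(worker_rank, worker_stats)  # stand-in for _observe_worker_stats(...)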
@@ -1217,7 +1196,6 @@ def _observe_worker_stats(
         base_labelvalues = self._per_engine_labelvalues.get(engine_idx, [])
         extended_labelvalues = list(base_labelvalues) + [str(worker_rank)]
 
-        # Dynamically log metrics based on configuration
         for stat_name, value in worker_stats.items():
             try:
                 if stat_name not in self.metric_mappings:
@@ -1232,7 +1210,6 @@ def _observe_worker_stats(
                 metric_type = metric_mapped["type"]
                 extended_labelnames = metric_mapped.get("extended_labelnames", [])
 
-                # Create metric instance with worker label
                 if "worker_id" in extended_labelnames:
                     per_engine_metric = base_metric.labels(*extended_labelvalues)
                 else:
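How the labelled child is finally updated is outside this hunk; a common pattern for a type-keyed mapping like `metric_mappings` is to dispatch on the stored type (counters inc, gauges set, histograms observe). A hedged sketch with `prometheus_client`; the metric, label values, and dispatch below are assumptions, not the connector's actual code:

from prometheus_client import Histogram

# Invented base metric mirroring one metric_mappings entry.
base_metric = Histogram(
    "ucm:save_duration_ms_demo",
    "demo latency (illustrative)",
    labelnames=["model_name", "worker_id"],
    buckets=[1, 5, 10, 50],
)
mapping = {"type": "histogram", "extended_labelnames": ["model_name", "worker_id"]}

base_labelvalues = ["my-model"]                  # assumed per-engine label values
extended_labelvalues = base_labelvalues + ["1"]  # worker rank appended, as above

child = (
    base_metric.labels(*extended_labelvalues)
    if "worker_id" in mapping["extended_labelnames"]
    else base_metric
)

# Assumed dispatch; only the histogram branch runs for this demo object.
if mapping["type"] == "counter":
    child.inc(3.2)
elif mapping["type"] == "gauge":
    child.set(3.2)
else:
    child.observe(3.2)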