1+ """MicroMetricLogger implementation for sparse attention hub."""
2+
3+ import json
4+ import os
5+ import time
6+ from collections import deque
7+ from dataclasses import dataclass , asdict
8+ from datetime import datetime
9+ from typing import Any , Dict , List , Optional , Union
10+ import inspect
11+
12+
13+ @dataclass
14+ class LogEvent :
15+ """Log event data structure."""
16+ timestamp : datetime
17+ metric : str # Metric identifier string
18+ value : Union [None , Any ]
19+ metadata : Dict [str , Any ] # Additional context (layer, head, etc.)
20+ location : str # Auto-inferred: "module.function" or "class.method"
21+
22+
23+ class MicroMetricLogger :
24+ """Singleton logger for micro metrics with queue-based architecture."""
25+
26+ _instance : Optional ["MicroMetricLogger" ] = None
27+ _initialized : bool = False
28+
29+ # Class-level storage for registered metrics (works without initialization)
30+ _registered_metrics : Dict [str , type ] = {} # identifier -> dtype mapping
31+
32+ def __new__ (cls , * args , ** kwargs ) -> "MicroMetricLogger" :
33+ if cls ._instance is None :
34+ cls ._instance = super ().__new__ (cls )
35+ return cls ._instance
36+
37+ def __init__ (self ,
38+ log_path : Optional [str ] = None ,
39+ flush_every : int = 100 , # Flush every N events
40+ flush_interval : float = 60.0 , # Flush every N seconds
41+ enabled_metrics : Union [List [str ], str ] = None ): # List of string identifiers to enable, or "all"
42+ if not self ._initialized :
43+ self .log_path = log_path
44+ self .flush_every = flush_every
45+ self .flush_interval = flush_interval
46+
47+ # Internal state
48+ self .log_queue : deque = deque (maxlen = 10000 ) # Circular buffer
49+ self .enabled_metrics : set = set ()
50+ self .last_flush_time = time .time ()
51+
52+ # Enable metrics if log_path is provided
53+ if self .log_path is not None :
54+ self ._ensure_log_directory ()
55+ self .enable_metrics (enabled_metrics )
56+
57+ MicroMetricLogger ._initialized = True
58+
59+ # main registration function
60+
61+ @classmethod
62+ def register_metric (cls , identifier : str , dtype : type ) -> None :
63+ """Register a metric with its string identifier and expected data type.
64+
65+ This works at class level and doesn't require initialization.
66+ """
67+ if identifier in cls ._registered_metrics :
68+ print (f"Warning: Metric '{ identifier } ' is being re-registered" )
69+ cls ._registered_metrics [identifier ] = dtype
70+
71+ @classmethod
72+ def get_registered_metrics (cls ) -> Dict [str , type ]:
73+ """Get all registered metrics at class level."""
74+ return cls ._registered_metrics .copy ()
75+
76+
77+ # helper methods
78+
79+ def _ensure_log_directory (self ) -> None :
80+ """Ensure the log directory exists."""
81+ if self .log_path is not None :
82+ os .makedirs (self .log_path , exist_ok = True )
83+
84+ def _get_calling_location (self ) -> str :
85+ """Get the calling location using inspect module."""
86+ try :
87+ # Get the calling frame (skip this method and the log method)
88+ caller_frame = inspect .currentframe ().f_back .f_back
89+ if caller_frame is None :
90+ return "unknown"
91+
92+ # Get module name
93+ module = inspect .getmodule (caller_frame )
94+ module_name = module .__name__ if module else "unknown"
95+
96+ # Get function/class name
97+ function_name = caller_frame .f_code .co_name
98+
99+ # Try to get class name if it's a method
100+ class_name = None
101+ if 'self' in caller_frame .f_locals :
102+ class_name = caller_frame .f_locals ['self' ].__class__ .__name__
103+
104+ if class_name :
105+ return f"{ module_name } .{ class_name } .{ function_name } "
106+ else :
107+ return f"{ module_name } .{ function_name } "
108+ except Exception :
109+ return "unknown"
110+
111+ def __del__ (self ):
112+ """Cleanup when logger is destroyed."""
113+ self .flush () # Final flush
114+
115+
116+ # api
117+
118+ def enable_metrics (self , metrics : Union [List [str ], str ] = None ) -> None :
119+ """Enable logging for specific metrics.
120+
121+ Args:
122+ metrics: List of metric identifiers to enable, or "all" for all registered metrics.
123+ If None, enables no metrics (empty list).
124+ """
125+ if metrics == "all" :
126+ self .enabled_metrics = set (self ._registered_metrics .keys ())
127+ elif isinstance (metrics , (list , set )):
128+ # Only enable metrics that are registered
129+ valid_metrics = set (metrics ) & set (self ._registered_metrics .keys ())
130+ invalid_metrics = set (metrics ) - set (self ._registered_metrics .keys ())
131+ if invalid_metrics :
132+ print (f"Warning: Attempting to enable unregistered metrics: { invalid_metrics } " )
133+ self .enabled_metrics = valid_metrics
134+ else :
135+ # Default to empty set
136+ self .enabled_metrics = set ()
137+
138+ def log (self , identifier : str , value : Any , metadata : Dict [str , Any ] = None ) -> None :
139+ """Log a metric value with optional metadata. Location is auto-inferred.
140+
141+ This only works if log_path is defined.
142+ """
143+ # Check if logging is configured
144+ if self .log_path is None :
145+ print (f"Warning: Cannot log metric '{ identifier } ' - log_path not defined. Use configure_logging() first." )
146+ return
147+
148+ # Check if metric is enabled
149+ if identifier not in self .enabled_metrics :
150+ print (f"Warning: Attempting to log metric '{ identifier } ' which is not enabled" )
151+ return
152+
153+ # Create log event
154+ event = LogEvent (
155+ timestamp = datetime .now (),
156+ metric = identifier ,
157+ value = value ,
158+ metadata = metadata or {},
159+ location = self ._get_calling_location ()
160+ )
161+
162+ # Add to queue
163+ self .log_queue .append (event )
164+
165+ # Check if we should flush
166+ if len (self .log_queue ) >= self .flush_every :
167+ self .flush ()
168+
169+ def configure_logging (self , log_path : str , enabled_metrics : Union [List [str ], str ] = None ) -> None :
170+ """Configure logging with a log path and optionally enable metrics.
171+
172+ This must be called before logging can work.
173+ """
174+ self .log_path = log_path
175+ self ._ensure_log_directory ()
176+ self .enable_metrics (enabled_metrics )
177+
178+ def flush (self ) -> None :
179+ """Force flush the current queue to disk."""
180+ if not self .log_queue or self .log_path is None :
181+ return
182+
183+ # Get current timestamp for filename
184+ timestamp = datetime .now ().strftime ("%Y%m%d" )
185+ filename = f"metrics_{ timestamp } .jsonl"
186+ filepath = os .path .join (self .log_path , filename )
187+
188+ # Write events to file
189+ with open (filepath , "a" , encoding = "utf-8" ) as f :
190+ while self .log_queue :
191+ event = self .log_queue .popleft ()
192+ # Convert dataclass to dict and serialize
193+ event_dict = asdict (event )
194+ # Convert datetime to ISO format string
195+ event_dict ["timestamp" ] = event_dict ["timestamp" ].isoformat ()
196+ f .write (json .dumps (event_dict ) + "\n " )
197+
198+ self .last_flush_time = time .time ()
199+
200+ def is_metric_enabled (self , identifier : str ) -> bool :
201+ """Check if a specific metric is requested for logging."""
202+ return identifier in self .enabled_metrics
203+
204+ def get_enabled_metrics (self ) -> set :
205+ """Get currently enabled metrics."""
206+ return self .enabled_metrics .copy ()
207+
208+ def is_logging_configured (self ) -> bool :
209+ """Check if logging is configured (log_path is set)."""
210+ return self .log_path is not None
0 commit comments