Skip to content

Commit 9948bd0

Browse files
author
fer
committed
feat(telemetry): Add unified telemetry emitter for Phase 3
Introduces TelemetryEmitter + TelemetryEvent providing JSONL + optional human mirror logging of canonical structural metrics. Includes canonical fields tetrad (Φ_s, |∇φ|, K_φ, ξ_C) via extended suite when available plus coherence_total & sense_index. Preserves all 10 invariants (read-only, no EPI mutation). Lint-compliant line wrapping. Next: operator introspection metadata & grammar-aware error factory integration.
1 parent 43091df commit 9948bd0

File tree

2 files changed

+359
-0
lines changed

2 files changed

+359
-0
lines changed

src/tnfr/metrics/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@
4545
glyphogram_series,
4646
latency_series,
4747
)
48+
from .telemetry import (
49+
TelemetryEmitter,
50+
TelemetryEvent,
51+
stream_telemetry,
52+
)
4853

4954
__all__ = (
5055
"register_metrics_callbacks",
@@ -76,4 +81,7 @@
7681
"compute_phase_coupling_strength",
7782
"is_phase_compatible",
7883
"compute_network_phase_alignment",
84+
"TelemetryEmitter",
85+
"TelemetryEvent",
86+
"stream_telemetry",
7987
)

src/tnfr/metrics/telemetry.py

Lines changed: 351 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,351 @@
1+
"""Unified telemetry emitter for TNFR Phase 3.
2+
3+
This module provides a lightweight, unified interface for exporting
4+
structural metrics and canonical field measurements during simulations.
5+
6+
Design Goals (Phase 3):
7+
-----------------------
8+
1. Physics fidelity: All metrics trace directly to TNFR invariants or
9+
canonical structural fields (Φ_s, |∇φ|, K_φ, ξ_C plus extended suite).
10+
2. Zero mutation: Telemetry collection MUST NOT mutate EPI or ΔNFR.
11+
3. Low overhead: Target <5% added wall time per sampling interval.
12+
4. Fractality aware: Works for nested EPIs (operational fractality).
13+
5. Reproducibility: Includes seed + run id for trajectory replay.
14+
6. Grammar alignment: Does not interfere with operator sequencing
15+
(U1-U4); U6 confinement data is read-only.
16+
17+
Core Concepts:
18+
--------------
19+
TelemetryEvent: Immutable snapshot of structural metrics.
20+
TelemetryEmitter: Context-managed collector writing JSON Lines and/or
21+
human-readable summaries. Batching is optional; immediate flush by
22+
default for reliability on long runs.
23+
24+
Minimal Public API:
25+
-------------------
26+
TelemetryEmitter(path).record(G, step=..., operator=..., extra=...)
27+
TelemetryEmitter(path).flush()
28+
29+
Extension Points:
30+
-----------------
31+
- Add selective sampling policies
32+
- Integrate performance guardrails (duration stats)
33+
- Attach operator introspection metadata (to be added in Phase 3 task)
34+
35+
Invariants Preserved:
36+
---------------------
37+
1. EPI changes only via operators (no mutation here)
38+
2. νf units preserved (Hz_str not altered)
39+
3. ΔNFR semantics retained (never reframed as loss)
40+
4. Operator closure untouched
41+
5. Phase verification external (we only read phase values)
42+
6. Lifecycle unaffected
43+
7. Fractality supported through recursive traversal utilities (future)
44+
8. Determinism: seed included if provided
45+
9. Structural metrics exported (C(t), Si, phase, νf + fields)
46+
10. Domain neutrality: No domain-specific assumptions
47+
48+
NOTE: This initial implementation focuses on correctness & clarity.
49+
Performance guardrails and operator introspection will hook into this
50+
emitter in subsequent Phase 3 steps.
51+
"""
52+
53+
from __future__ import annotations
54+
55+
from dataclasses import dataclass, asdict
56+
from datetime import datetime
57+
from pathlib import Path
58+
from typing import Any, Iterable, Mapping, MutableMapping
59+
import json
60+
import time
61+
62+
try: # Physics field computations (canonical tetrad + extended suite)
63+
from ..physics.fields import (
64+
compute_extended_canonical_suite, # returns dict
65+
compute_structural_potential,
66+
compute_phase_gradient,
67+
compute_phase_curvature,
68+
estimate_coherence_length,
69+
)
70+
except Exception: # pragma: no cover - graceful degradation
71+
compute_extended_canonical_suite = None # type: ignore
72+
compute_structural_potential = None # type: ignore
73+
compute_phase_gradient = None # type: ignore
74+
compute_phase_curvature = None # type: ignore
75+
estimate_coherence_length = None # type: ignore
76+
77+
try: # Existing metrics
78+
from .sense_index import sense_index # type: ignore
79+
except Exception: # pragma: no cover
80+
sense_index = None # type: ignore
81+
82+
try:
83+
from .coherence import compute_coherence # type: ignore
84+
except Exception: # pragma: no cover
85+
compute_coherence = None # type: ignore
86+
87+
__all__ = ["TelemetryEmitter", "TelemetryEvent"]
88+
89+
90+
@dataclass(frozen=True, slots=True)
91+
class TelemetryEvent:
92+
"""Immutable telemetry snapshot.
93+
94+
Fields
95+
------
96+
t_iso : str
97+
ISO8601 timestamp for wall-clock time.
98+
t_epoch : float
99+
Seconds since UNIX epoch.
100+
step : int | None
101+
Simulation step / operator index (if provided).
102+
operator : str | None
103+
Last applied operator mnemonic (AL, IL, OZ, etc.).
104+
metrics : Mapping[str, Any]
105+
Structural metrics dictionary.
106+
extra : Mapping[str, Any] | None
107+
User-supplied contextual additions (seed, run_id, notes, ...).
108+
"""
109+
110+
t_iso: str
111+
t_epoch: float
112+
step: int | None
113+
operator: str | None
114+
metrics: Mapping[str, Any]
115+
extra: Mapping[str, Any] | None = None
116+
117+
118+
class TelemetryEmitter:
119+
"""Unified telemetry collector for TNFR simulations.
120+
121+
Parameters
122+
----------
123+
path : str | Path
124+
Output file path (JSON Lines). Parent directories are created.
125+
flush_interval : int, default=1
126+
Number of events to batch before auto-flush. 1 = flush each event.
127+
include_extended : bool, default=True
128+
If True, compute extended canonical suite when available for
129+
efficiency; otherwise compute tetrad fields individually.
130+
safe : bool, default=True
131+
If True, wraps metric computations in try/except returning partial
132+
results on failure (never raises during record).
133+
human_mirror : bool, default=False
134+
If True, writes a sibling *.log file with concise summaries.
135+
136+
Notes
137+
-----
138+
The emitter never mutates graph state; it only reads node attributes.
139+
"""
140+
141+
def __init__(
142+
self,
143+
path: str | Path,
144+
*,
145+
flush_interval: int = 1,
146+
include_extended: bool = True,
147+
safe: bool = True,
148+
human_mirror: bool = False,
149+
) -> None:
150+
self.path = Path(path)
151+
self.path.parent.mkdir(parents=True, exist_ok=True)
152+
self.flush_interval = max(1, int(flush_interval))
153+
self.include_extended = bool(include_extended)
154+
self.safe = bool(safe)
155+
self.human_mirror = bool(human_mirror)
156+
self._buffer: list[TelemetryEvent] = []
157+
self._start_time = time.perf_counter()
158+
self._human_path = (
159+
self.path.with_suffix(".log") if self.human_mirror else None
160+
)
161+
162+
# ------------------------------------------------------------------
163+
# Context manager
164+
# ------------------------------------------------------------------
165+
def __enter__(self) -> "TelemetryEmitter": # noqa: D401
166+
return self
167+
168+
def __exit__(self, exc_type, exc, tb) -> None: # noqa: D401
169+
try:
170+
self.flush()
171+
finally:
172+
# No open handles to close (using append mode on demand)
173+
pass
174+
175+
# ------------------------------------------------------------------
176+
# Public API
177+
# ------------------------------------------------------------------
178+
def record(
179+
self,
180+
G: Any,
181+
*,
182+
step: int | None = None,
183+
operator: str | None = None,
184+
extra: Mapping[str, Any] | None = None,
185+
) -> TelemetryEvent:
186+
"""Capture a telemetry snapshot.
187+
188+
Parameters
189+
----------
190+
G : Any
191+
TNFR graph-like object with node attributes.
192+
step : int | None
193+
Simulation step index.
194+
operator : str | None
195+
Last operator mnemonic for sequencing context.
196+
extra : Mapping[str, Any] | None
197+
Additional context (seed, run_id, grammar_state, etc.).
198+
"""
199+
200+
metrics: MutableMapping[str, Any] = {}
201+
202+
def _compute() -> None:
203+
# Core structural metrics
204+
if compute_coherence is not None:
205+
try:
206+
metrics["coherence_total"] = float(compute_coherence(G))
207+
except Exception:
208+
if not self.safe:
209+
raise
210+
if sense_index is not None:
211+
try:
212+
metrics["sense_index"] = float(sense_index(G))
213+
except Exception:
214+
if not self.safe:
215+
raise
216+
217+
# Canonical field tetrad (plus extended suite if available)
218+
if (
219+
self.include_extended
220+
and compute_extended_canonical_suite is not None
221+
):
222+
try:
223+
suite = compute_extended_canonical_suite(G)
224+
if isinstance(suite, Mapping):
225+
for k, v in suite.items():
226+
metrics[k] = v
227+
except Exception:
228+
if not self.safe:
229+
raise
230+
else:
231+
# Tetrad individually
232+
if compute_structural_potential is not None:
233+
try:
234+
metrics["phi_s"] = compute_structural_potential(G)
235+
except Exception:
236+
if not self.safe:
237+
raise
238+
if compute_phase_gradient is not None:
239+
try:
240+
metrics["phase_grad"] = compute_phase_gradient(G)
241+
except Exception:
242+
if not self.safe:
243+
raise
244+
if compute_phase_curvature is not None:
245+
try:
246+
metrics["phase_curv"] = compute_phase_curvature(G)
247+
except Exception:
248+
if not self.safe:
249+
raise
250+
if estimate_coherence_length is not None:
251+
try:
252+
metrics["xi_c"] = estimate_coherence_length(G)
253+
except Exception:
254+
if not self.safe:
255+
raise
256+
257+
if self.safe:
258+
try:
259+
_compute()
260+
except Exception:
261+
# Swallow and proceed with partial metrics
262+
pass
263+
else:
264+
_compute()
265+
266+
event = TelemetryEvent(
267+
t_iso=datetime.utcnow().isoformat(timespec="seconds") + "Z",
268+
t_epoch=time.time(),
269+
step=step,
270+
operator=operator,
271+
metrics=dict(metrics),
272+
extra=dict(extra) if extra else None,
273+
)
274+
self._buffer.append(event)
275+
if len(self._buffer) >= self.flush_interval:
276+
self.flush()
277+
return event
278+
279+
def flush(self) -> None:
280+
"""Flush buffered telemetry events to disk."""
281+
if not self._buffer:
282+
return
283+
# JSON Lines write
284+
with self.path.open("a", encoding="utf-8") as fh:
285+
for ev in self._buffer:
286+
fh.write(json.dumps(asdict(ev), ensure_ascii=False) + "\n")
287+
if self._human_path is not None:
288+
with self._human_path.open("a", encoding="utf-8") as hf:
289+
for ev in self._buffer:
290+
coh = ev.metrics.get("coherence_total")
291+
si = ev.metrics.get("sense_index")
292+
phi = (
293+
ev.metrics.get("phi_s")
294+
or ev.metrics.get("structural_potential")
295+
)
296+
hf.write(
297+
(
298+
f"[{ev.step}] op={ev.operator} C={coh:.3f} "
299+
f"Si={si:.3f} Φ_s={phi} t={ev.t_iso}\n"
300+
)
301+
)
302+
self._buffer.clear()
303+
304+
# ------------------------------------------------------------------
305+
# Introspection / diagnostics
306+
# ------------------------------------------------------------------
307+
def stats(self) -> dict[str, Any]:
308+
"""Return emitter internal statistics (buffer + runtime)."""
309+
return {
310+
"buffer_len": len(self._buffer),
311+
"flush_interval": self.flush_interval,
312+
"include_extended": self.include_extended,
313+
"uptime_sec": time.perf_counter() - self._start_time,
314+
"path": str(self.path),
315+
}
316+
317+
318+
# Convenience helper -------------------------------------------------------
319+
def stream_telemetry(
320+
G: Any,
321+
*,
322+
emitter: TelemetryEmitter,
323+
steps: Iterable[int],
324+
operator_sequence: Iterable[str] | None = None,
325+
extra: Mapping[str, Any] | None = None,
326+
) -> list[TelemetryEvent]:
327+
"""Record telemetry across a sequence of steps.
328+
329+
Parameters
330+
----------
331+
G : Any
332+
TNFR graph instance.
333+
emitter : TelemetryEmitter
334+
Active telemetry emitter.
335+
steps : Iterable[int]
336+
Step indices to record.
337+
operator_sequence : Iterable[str] | None
338+
Optional operator mnemonics aligned with steps.
339+
extra : Mapping[str, Any] | None
340+
Additional context (seed/run id).
341+
"""
342+
343+
events: list[TelemetryEvent] = []
344+
ops_iter = (
345+
iter(operator_sequence) if operator_sequence is not None else None
346+
)
347+
for s in steps:
348+
op_name = next(ops_iter) if ops_iter is not None else None
349+
events.append(emitter.record(G, step=s, operator=op_name, extra=extra))
350+
emitter.flush()
351+
return events

0 commit comments

Comments
 (0)