From c2e9be73ae54b6a4abf8984cd507ab6acb6396fe Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Wed, 12 Nov 2025 17:38:43 +0000 Subject: [PATCH 1/4] Add session-leader wait stats --- slurm_usage.py | 251 +++++++++++++++++++++++++++++++++++- tests/test_visualization.py | 187 +++++++++++++++++++++++++++ 2 files changed, 433 insertions(+), 5 deletions(-) diff --git a/slurm_usage.py b/slurm_usage.py index bf1b6de..b7ced7e 100755 --- a/slurm_usage.py +++ b/slurm_usage.py @@ -1188,6 +1188,8 @@ def _load_recent_data( for f in parquet_files: try: df = pl.read_parquet(f) + if data_type == "processed": + df = _ensure_processed_df_schema(df) dfs.append(df) except (OSError, pl.exceptions.ComputeError) as e: # noqa: PERF203 console.print(f"[yellow]Warning: Could not read {f}: {e}[/yellow]") @@ -1383,6 +1385,27 @@ def _processed_jobs_to_dataframe( return pl.DataFrame(data_dicts, schema=schema) +def _ensure_processed_df_schema(df: pl.DataFrame) -> pl.DataFrame: + """Cast processed DataFrame columns to the expected schema.""" + schema = ProcessedJob.get_polars_schema() + if df.is_empty(): + return pl.DataFrame(schema=schema) + + casts: list[pl.Expr] = [] + for column, dtype in schema.items(): + if column in df.columns: + if df.schema[column] != dtype: + casts.append(pl.col(column).cast(dtype, strict=False).alias(column)) + else: + casts.append(pl.lit(None).cast(dtype).alias(column)) + + if casts: + df = df.with_columns(casts) + + # Ensure consistent column ordering and drop unexpected columns + return df.select(list(schema.keys())) + + def _save_processed_jobs_to_parquet( processed_jobs: list[ProcessedJob], file_path: Path, @@ -1434,7 +1457,7 @@ def _fetch_jobs_for_date( # noqa: PLR0912 # Check if we need to re-collect this date if skip_if_complete and processed_file.exists(): try: - df = pl.read_parquet(processed_file) + df = _ensure_processed_df_schema(pl.read_parquet(processed_file)) incomplete = df.filter(pl.col("state").is_in(INCOMPLETE_JOB_STATES)).height if incomplete == 0: if completion_tracker: @@ -1448,7 +1471,7 @@ def _fetch_jobs_for_date( # noqa: PLR0912 existing_job_states: dict[str, str] = {} if processed_file.exists(): try: - existing_df = pl.read_parquet(processed_file) + existing_df = _ensure_processed_df_schema(pl.read_parquet(processed_file)) for row in existing_df.iter_rows(named=True): existing_job_states[row["job_id"]] = row["state"] except (OSError, pl.exceptions.ComputeError): @@ -1479,7 +1502,7 @@ def _fetch_jobs_for_date( # noqa: PLR0912 is_complete = False if processed_file.exists(): try: - df = pl.read_parquet(processed_file) + df = _ensure_processed_df_schema(pl.read_parquet(processed_file)) incomplete = df.filter(pl.col("state").is_in(INCOMPLETE_JOB_STATES)).height is_complete = incomplete == 0 if is_complete and completion_tracker: @@ -1878,6 +1901,22 @@ def _create_node_usage_stats(df: pl.DataFrame) -> None: # ============================================================================ +class SessionLeaderWaitStats(NamedTuple): + """Aggregated wait metrics for session-leader jobs.""" + + leader_jobs: int + total_jobs: int + leader_share: float + users: int + sample_jobs: int + mean_wait_hours: float | None + median_wait_hours: float | None + p90_wait_hours: float | None + p95_wait_hours: float | None + over_two_hours: int + over_six_hours: int + + def _prepare_dataframe_for_analysis(df: pl.DataFrame, config: Config) -> pl.DataFrame: """Prepare DataFrame with additional columns needed for analysis. @@ -1926,6 +1965,207 @@ def _prepare_dataframe_for_analysis(df: pl.DataFrame, config: Config) -> pl.Data ) +def _format_wait_hours(value: float | None) -> str: + """Format a floating hour value as HH:MM.""" + if value is None: + return "N/A" + + total_minutes = int(round(value * 60)) + hours, minutes = divmod(total_minutes, 60) + return f"{hours:02d}:{minutes:02d}" + + +def _identify_session_leader_jobs(df: pl.DataFrame, idle_hours: int = 6) -> pl.DataFrame: + """Return jobs that start a new session after user inactivity.""" + if df.is_empty() or "submit_time" not in df.columns or "user" not in df.columns: + return pl.DataFrame(schema=df.schema) + + relevant_df = df.filter(pl.col("submit_time").is_not_null()) + if relevant_df.is_empty(): + return pl.DataFrame(schema=df.schema) + + idle_seconds = idle_hours * 3600 + + previous_user = pl.col("user").shift(1) + previous_submit = pl.col("submit_time").shift(1) + + leaders_df = ( + relevant_df.sort(["user", "submit_time"]) + .with_columns( + pl.when(previous_user.is_null() | (pl.col("user") != previous_user)) + .then(True) + .otherwise( + ( + (pl.col("submit_time") - previous_submit).dt.total_seconds() + >= idle_seconds + ), + ) + .alias("is_session_leader"), + ) + .filter(pl.col("is_session_leader")) + .drop("is_session_leader") + ) + + return leaders_df + + +def _calculate_session_leader_wait_stats( + leaders_df: pl.DataFrame, + leaders_with_wait: pl.DataFrame, + total_jobs: int, +) -> SessionLeaderWaitStats | None: + """Calculate aggregate wait metrics for session leaders.""" + if leaders_df.is_empty() or total_jobs == 0: + return None + + leader_jobs = len(leaders_df) + leader_share = (leader_jobs / total_jobs) * 100 if total_jobs else 0.0 + user_count = leaders_df["user"].n_unique() + sample_jobs = len(leaders_with_wait) + + if sample_jobs == 0: + return SessionLeaderWaitStats( + leader_jobs=leader_jobs, + total_jobs=total_jobs, + leader_share=leader_share, + users=user_count, + sample_jobs=0, + mean_wait_hours=None, + median_wait_hours=None, + p90_wait_hours=None, + p95_wait_hours=None, + over_two_hours=0, + over_six_hours=0, + ) + + wait_metrics = leaders_with_wait.select( + [ + pl.col("wait_seconds").mean().alias("mean_wait_seconds"), + pl.col("wait_seconds").median().alias("median_wait_seconds"), + pl.col("wait_seconds").quantile(0.9, interpolation="nearest").alias("p90_wait_seconds"), + pl.col("wait_seconds").quantile(0.95, interpolation="nearest").alias("p95_wait_seconds"), + pl.col("wait_seconds").gt(7200).sum().alias("over_two_hours"), + pl.col("wait_seconds").gt(21600).sum().alias("over_six_hours"), + ], + ).to_dicts()[0] + + def _seconds_to_hours(value: float | None) -> float | None: + if value is None: + return None + return float(value) / 3600 + + return SessionLeaderWaitStats( + leader_jobs=leader_jobs, + total_jobs=total_jobs, + leader_share=leader_share, + users=user_count, + sample_jobs=sample_jobs, + mean_wait_hours=_seconds_to_hours(wait_metrics["mean_wait_seconds"]), + median_wait_hours=_seconds_to_hours(wait_metrics["median_wait_seconds"]), + p90_wait_hours=_seconds_to_hours(wait_metrics["p90_wait_seconds"]), + p95_wait_hours=_seconds_to_hours(wait_metrics["p95_wait_seconds"]), + over_two_hours=int(wait_metrics["over_two_hours"]), + over_six_hours=int(wait_metrics["over_six_hours"]), + ) + + +def _create_session_leader_wait_section(df: pl.DataFrame, idle_hours: int = 6) -> None: + """Render wait metrics for first jobs submitted after idle periods.""" + if df.is_empty(): + return + + leaders_df = _identify_session_leader_jobs(df, idle_hours) + if leaders_df.is_empty(): + console.print("[yellow]Not enough submit time data to evaluate first-job waits.[/yellow]") + return + + leaders_with_wait = leaders_df.filter(pl.col("wait_seconds").is_not_null()) + stats = _calculate_session_leader_wait_stats(leaders_df, leaders_with_wait, len(df)) + if stats is None: + console.print("[yellow]Not enough submit time data to evaluate first-job waits.[/yellow]") + return + + console.print( + Panel.fit( + f"First Job After ≥{idle_hours}h Idle — Wait Summary", + style="bold cyan", + box=box.DOUBLE_EDGE, + ), + ) + + summary_table = Table(box=box.ROUNDED) + summary_table.add_column("Metric", style="cyan") + summary_table.add_column("Value", justify="right", style="yellow") + summary_table.add_column("Notes") + + summary_table.add_row( + "Session-Leader Jobs", + f"{stats.leader_jobs:,}", + f"{stats.leader_share:.1f}% of all jobs", + ) + summary_table.add_row("Users Represented", f"{stats.users:,}", "") + summary_table.add_row("Sampled Jobs (with waits)", f"{stats.sample_jobs:,}", "") + summary_table.add_row("Mean Wait (hh:mm)", _format_wait_hours(stats.mean_wait_hours), "") + summary_table.add_row("Median Wait (hh:mm)", _format_wait_hours(stats.median_wait_hours), "") + summary_table.add_row("90th Percentile", _format_wait_hours(stats.p90_wait_hours), "") + summary_table.add_row("95th Percentile", _format_wait_hours(stats.p95_wait_hours), "") + + if stats.sample_jobs > 0: + over_two_pct = (stats.over_two_hours / stats.sample_jobs) * 100 if stats.sample_jobs else 0.0 + over_six_pct = (stats.over_six_hours / stats.sample_jobs) * 100 if stats.sample_jobs else 0.0 + summary_table.add_row( + "Leaders >2h Wait", + f"{stats.over_two_hours:,}", + f"{over_two_pct:.0f}% of sampled", + ) + summary_table.add_row( + "Leaders >6h Wait", + f"{stats.over_six_hours:,}", + f"{over_six_pct:.0f}% of sampled", + ) + else: + summary_table.add_row("Leaders >2h Wait", "0", "No wait data") + summary_table.add_row("Leaders >6h Wait", "0", "No wait data") + + console.print(summary_table) + + if stats.sample_jobs == 0: + return + + wait_hours = (leaders_with_wait["wait_seconds"] / 3600).to_list() + bin_definitions = [ + ("<15m", 0.0, 0.25), + ("15m-2h", 0.25, 2.0), + ("2-6h", 2.0, 6.0), + ("6-12h", 6.0, 12.0), + ("12-18h", 12.0, 18.0), + (">18h", 18.0, float("inf")), + ] + + labels: list[str] = [] + counts: list[int] = [] + for label, start, end in bin_definitions: + if end == float("inf"): + count = sum(1 for hours in wait_hours if hours >= start) + else: + count = sum(1 for hours in wait_hours if start <= hours < end) + labels.append(label) + counts.append(count) + + if any(counts): + console.print("\n") + _create_bar_chart( + labels, + counts, + "First-Job Wait Distribution", + width=40, + top_n=len(labels), + unit="jobs", + show_percentage=True, + item_type="bins", + ) + + def _create_user_statistics_section(df: pl.DataFrame) -> None: """Create and display user statistics table and charts. @@ -2266,6 +2506,7 @@ def _create_summary_stats(df: pl.DataFrame, config: Config) -> None: _create_node_usage_stats(prepared_df) _create_efficiency_analysis_section(prepared_df) _create_cluster_summary_section(prepared_df) + _create_session_leader_wait_section(prepared_df) def _create_daily_usage_chart(df: pl.DataFrame) -> None: @@ -2459,7 +2700,7 @@ def collect( # noqa: PLR0912, PLR0915 # We have new processed jobs to save/merge if processed_file.exists(): # Load existing data - existing_df = pl.read_parquet(processed_file) + existing_df = _ensure_processed_df_schema(pl.read_parquet(processed_file)) new_df = _processed_jobs_to_dataframe(result.processed_jobs) # Merge: keep the most recent version of each job @@ -2517,7 +2758,7 @@ def collect( # noqa: PLR0912, PLR0915 file_path = config.processed_data_dir / f"{date_str}.parquet" if file_path.exists(): with contextlib.suppress(Exception): - dfs.append(pl.read_parquet(file_path)) + dfs.append(_ensure_processed_df_schema(pl.read_parquet(file_path))) if dfs: df = pl.concat(dfs).unique(subset=["job_id"], keep="last") diff --git a/tests/test_visualization.py b/tests/test_visualization.py index 6694472..96bc1f7 100644 --- a/tests/test_visualization.py +++ b/tests/test_visualization.py @@ -2,6 +2,7 @@ from __future__ import annotations +import math import os import sys from datetime import datetime, timedelta, timezone @@ -273,6 +274,158 @@ def test_create_node_usage_stats_empty(self, mock_print: MagicMock) -> None: # # Should handle empty data gracefully +class TestSessionLeaderWaitMetrics: + """Test session leader wait metric helpers.""" + + def test_identify_session_leader_jobs(self, test_dates: dict[str, str]) -> None: + """Ensure session leaders are detected after idle periods.""" + base_day = test_dates["today"] + data = [ + { + "job_id": "job1", + "user": "alice", + "submit_time": datetime.fromisoformat(f"{base_day}T08:00:00"), + "wait_seconds": 600.0, + }, + { + "job_id": "job2", + "user": "alice", + "submit_time": datetime.fromisoformat(f"{base_day}T09:00:00"), + "wait_seconds": 800.0, + }, + { + "job_id": "job3", + "user": "alice", + "submit_time": datetime.fromisoformat(f"{base_day}T16:00:00"), + "wait_seconds": 900.0, + }, + { + "job_id": "job4", + "user": "bob", + "submit_time": datetime.fromisoformat(f"{base_day}T12:00:00"), + "wait_seconds": 1_200.0, + }, + { + "job_id": "job5", + "user": "bob", + "submit_time": datetime.fromisoformat(f"{base_day}T15:00:00"), + "wait_seconds": 1_500.0, + }, + { + "job_id": "job6", + "user": "bob", + "submit_time": datetime.fromisoformat(f"{base_day}T23:30:00"), + "wait_seconds": 1_800.0, + }, + ] + + df = pl.DataFrame(data) + leaders = slurm_usage._identify_session_leader_jobs(df, idle_hours=6) + + assert set(leaders["job_id"].to_list()) == {"job1", "job3", "job4", "job6"} + + def test_calculate_session_leader_wait_stats(self) -> None: + """Verify wait statistics aggregation.""" + leader_data = [ + {"user": "alice", "wait_seconds": 7_200.0}, + {"user": "bob", "wait_seconds": 18_000.0}, + {"user": "bob", "wait_seconds": 3_600.0}, + ] + leaders_df = pl.DataFrame(leader_data) + leaders_with_wait = leaders_df.filter(pl.col("wait_seconds").is_not_null()) + + stats = slurm_usage._calculate_session_leader_wait_stats(leaders_df, leaders_with_wait, total_jobs=10) + + assert stats is not None + assert stats.leader_jobs == 3 + assert math.isclose(stats.leader_share, 30.0) + assert stats.users == 2 + assert stats.sample_jobs == 3 + assert stats.over_two_hours == 1 + assert stats.over_six_hours == 0 + assert stats.mean_wait_hours is not None and math.isclose(stats.mean_wait_hours, 8 / 3, rel_tol=1e-5) + assert stats.median_wait_hours is not None and math.isclose(stats.median_wait_hours, 2.0, rel_tol=1e-5) + assert stats.p90_wait_hours is not None and math.isclose(stats.p90_wait_hours, 5.0, rel_tol=1e-5) + assert stats.p95_wait_hours is not None and math.isclose(stats.p95_wait_hours, 5.0, rel_tol=1e-5) + + def test_session_leader_stats_without_waits(self) -> None: + """Ensure stats handle missing wait data.""" + data = [{"user": "carol", "wait_seconds": None}] + leaders_df = pl.DataFrame(data) + leaders_with_wait = leaders_df.filter(pl.col("wait_seconds").is_not_null()) + + stats = slurm_usage._calculate_session_leader_wait_stats(leaders_df, leaders_with_wait, total_jobs=5) + + assert stats is not None + assert stats.sample_jobs == 0 + assert stats.mean_wait_hours is None + assert stats.over_two_hours == 0 + + @patch("slurm_usage.console.print") + def test_session_leader_section_handles_missing_submit(self, mock_print: MagicMock) -> None: + """Session leader section should warn when submit data missing.""" + df = pl.DataFrame({"user": ["alice"], "submit_time": [None], "wait_seconds": [None]}) + + slurm_usage._create_session_leader_wait_section(df) + + assert any("Not enough" in str(call.args[0]) for call in mock_print.call_args_list) + + @patch("slurm_usage._create_bar_chart") + @patch("slurm_usage.console.print") + def test_session_leader_section_without_waits( + self, + mock_print: MagicMock, + mock_chart: MagicMock, + ) -> None: + """Ensure section renders summary even when no wait values.""" + base_time = datetime(2025, 1, 1, tzinfo=UTC) + df = pl.DataFrame( + { + "user": ["alice", "alice"], + "submit_time": [base_time, base_time + timedelta(hours=8)], + "wait_seconds": [None, None], + }, + ) + + slurm_usage._create_session_leader_wait_section(df) + + assert mock_print.call_count >= 2 + mock_chart.assert_not_called() + + @patch("slurm_usage._create_bar_chart") + @patch("slurm_usage.console.print") + def test_session_leader_section_with_waits( + self, + mock_print: MagicMock, + mock_chart: MagicMock, + ) -> None: + """Ensure wait section shows histogram when data exists.""" + base_time = datetime(2025, 1, 1, tzinfo=UTC) + df = pl.DataFrame( + { + "user": ["alice", "alice", "bob"], + "submit_time": [ + base_time, + base_time + timedelta(hours=7), + base_time, + ], + "wait_seconds": [ + 600.0, # <15m bin + 18_000.0, # 5h -> 2-6h bin and >2h + 30_000.0, # 8h -> 6-12h bin and >6h + ], + }, + ) + + slurm_usage._create_session_leader_wait_section(df) + + mock_chart.assert_called_once() + labels, counts = mock_chart.call_args.args[:2] + assert "2-6h" in labels + assert sum(counts) == 3 + assert mock_print.call_count >= 2 + + class TestSummaryStatistics: """Test summary statistics generation.""" @@ -379,6 +532,40 @@ def test_create_summary_stats_with_groups(self, mock_print: MagicMock, tmp_path: assert mock_print.called +class TestProcessedSchemaHelpers: + """Tests for processed schema normalization.""" + + def test_ensure_schema_adds_missing_columns(self) -> None: + """Missing columns should be added with proper dtypes.""" + df = pl.DataFrame({"job_id": ["job-1"]}) + result = slurm_usage._ensure_processed_df_schema(df) + schema = slurm_usage.ProcessedJob.get_polars_schema() + + assert result.columns == list(schema.keys()) + assert result.schema == schema + + def test_ensure_schema_casts_datetime_fields(self) -> None: + """String timestamps should be converted to UTC datetime columns.""" + df = pl.DataFrame( + { + "job_id": ["job-2"], + "user": ["alice"], + "submit_time": ["2025-01-02T00:00:00+00:00"], + "start_time": ["2025-01-02T08:00:00+00:00"], + "processed_date": ["2025-01-03T10:00:00+00:00"], + }, + ) + + result = slurm_usage._ensure_processed_df_schema(df) + schema = slurm_usage.ProcessedJob.get_polars_schema() + + assert result.schema["submit_time"] == schema["submit_time"] + assert result["submit_time"][0].year == 2025 + assert result["start_time"][0].hour == 8 + processed_dt = result["processed_date"][0] + assert processed_dt.tzinfo is not None + assert processed_dt.utcoffset() == timedelta(0) + class TestErrorHandling: """Test error handling in various functions.""" From fedd93a699bd633272bcd70d3fed7fc695da1d0f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 12 Nov 2025 17:48:48 +0000 Subject: [PATCH 2/4] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- slurm_usage.py | 5 +---- tests/test_visualization.py | 7 ++++--- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/slurm_usage.py b/slurm_usage.py index b7ced7e..fd69167 100755 --- a/slurm_usage.py +++ b/slurm_usage.py @@ -1995,10 +1995,7 @@ def _identify_session_leader_jobs(df: pl.DataFrame, idle_hours: int = 6) -> pl.D pl.when(previous_user.is_null() | (pl.col("user") != previous_user)) .then(True) .otherwise( - ( - (pl.col("submit_time") - previous_submit).dt.total_seconds() - >= idle_seconds - ), + ((pl.col("submit_time") - previous_submit).dt.total_seconds() >= idle_seconds), ) .alias("is_session_leader"), ) diff --git a/tests/test_visualization.py b/tests/test_visualization.py index 96bc1f7..2d7a480 100644 --- a/tests/test_visualization.py +++ b/tests/test_visualization.py @@ -410,9 +410,9 @@ def test_session_leader_section_with_waits( base_time, ], "wait_seconds": [ - 600.0, # <15m bin - 18_000.0, # 5h -> 2-6h bin and >2h - 30_000.0, # 8h -> 6-12h bin and >6h + 600.0, # <15m bin + 18_000.0, # 5h -> 2-6h bin and >2h + 30_000.0, # 8h -> 6-12h bin and >6h ], }, ) @@ -566,6 +566,7 @@ def test_ensure_schema_casts_datetime_fields(self) -> None: assert processed_dt.tzinfo is not None assert processed_dt.utcoffset() == timedelta(0) + class TestErrorHandling: """Test error handling in various functions.""" From c14a19ee654facecd302e9691e421038e30b7d31 Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Wed, 12 Nov 2025 18:10:18 +0000 Subject: [PATCH 3/4] Add per-user session leader stats --- slurm_usage.py | 59 +++++++++++++++++++++++++++++++++++++ tests/test_visualization.py | 23 +++++++++++++++ 2 files changed, 82 insertions(+) diff --git a/slurm_usage.py b/slurm_usage.py index fd69167..a4a9572 100755 --- a/slurm_usage.py +++ b/slurm_usage.py @@ -2006,6 +2006,39 @@ def _identify_session_leader_jobs(df: pl.DataFrame, idle_hours: int = 6) -> pl.D return leaders_df +def _compute_user_session_leader_stats(leaders_with_wait: pl.DataFrame) -> pl.DataFrame: + """Aggregate per-user wait metrics for session leaders.""" + if leaders_with_wait.is_empty(): + return pl.DataFrame( + schema={ + "user": pl.Utf8, + "leader_jobs": pl.Int64, + "mean_wait_seconds": pl.Float64, + "median_wait_seconds": pl.Float64, + "p90_wait_seconds": pl.Float64, + "max_wait_seconds": pl.Float64, + "over_two_hours": pl.Int64, + "over_six_hours": pl.Int64, + }, + ) + + return ( + leaders_with_wait.group_by("user") + .agg( + [ + pl.len().alias("leader_jobs"), + pl.col("wait_seconds").mean().alias("mean_wait_seconds"), + pl.col("wait_seconds").median().alias("median_wait_seconds"), + pl.col("wait_seconds").quantile(0.9, interpolation="nearest").alias("p90_wait_seconds"), + pl.col("wait_seconds").max().alias("max_wait_seconds"), + pl.col("wait_seconds").gt(7200).sum().alias("over_two_hours"), + pl.col("wait_seconds").gt(21600).sum().alias("over_six_hours"), + ], + ) + .sort(["mean_wait_seconds", "leader_jobs"], descending=[True, True]) + ) + + def _calculate_session_leader_wait_stats( leaders_df: pl.DataFrame, leaders_with_wait: pl.DataFrame, @@ -2162,6 +2195,32 @@ def _create_session_leader_wait_section(df: pl.DataFrame, idle_hours: int = 6) - item_type="bins", ) + user_stats = _compute_user_session_leader_stats(leaders_with_wait) + if not user_stats.is_empty(): + user_table = Table(title="Session Leader Wait by User", box=box.SIMPLE) + user_table.add_column("User", style="cyan") + user_table.add_column("Leaders", justify="right") + user_table.add_column("Mean", justify="right", style="yellow") + user_table.add_column("Median", justify="right") + user_table.add_column("P90", justify="right") + user_table.add_column("Max", justify="right") + user_table.add_column(">2h", justify="right") + user_table.add_column(">6h", justify="right") + + for row in user_stats.head(15).iter_rows(named=True): + user_table.add_row( + row["user"][:18], + f"{row['leader_jobs']:,}", + _format_wait_hours(row["mean_wait_seconds"] / 3600 if row["mean_wait_seconds"] is not None else None), + _format_wait_hours(row["median_wait_seconds"] / 3600 if row["median_wait_seconds"] is not None else None), + _format_wait_hours(row["p90_wait_seconds"] / 3600 if row["p90_wait_seconds"] is not None else None), + _format_wait_hours(row["max_wait_seconds"] / 3600 if row["max_wait_seconds"] is not None else None), + f"{row['over_two_hours']:,}", + f"{row['over_six_hours']:,}", + ) + + console.print(user_table) + def _create_user_statistics_section(df: pl.DataFrame) -> None: """Create and display user statistics table and charts. diff --git a/tests/test_visualization.py b/tests/test_visualization.py index 2d7a480..1e7f22a 100644 --- a/tests/test_visualization.py +++ b/tests/test_visualization.py @@ -361,6 +361,29 @@ def test_session_leader_stats_without_waits(self) -> None: assert stats.mean_wait_hours is None assert stats.over_two_hours == 0 + def test_compute_user_session_leader_stats_empty(self) -> None: + """Empty inputs should return empty schema-consistent DataFrame.""" + empty_df = pl.DataFrame(schema={"user": pl.Utf8, "wait_seconds": pl.Float64}) + result = slurm_usage._compute_user_session_leader_stats(empty_df) + assert result.is_empty() + + def test_compute_user_session_leader_stats_values(self) -> None: + """Per-user aggregation should capture mean/percentile info.""" + df = pl.DataFrame( + { + "user": ["alice", "alice", "bob", "bob", "bob"], + "wait_seconds": [3600.0, 7200.0, 60.0, 90.0, 10800.0], + }, + ) + + stats_df = slurm_usage._compute_user_session_leader_stats(df) + stats = {row["user"]: row for row in stats_df.iter_rows(named=True)} + + assert stats["alice"]["leader_jobs"] == 2 + assert math.isclose(stats["alice"]["mean_wait_seconds"], 5400.0) + assert stats["bob"]["leader_jobs"] == 3 + assert stats["bob"]["over_two_hours"] == 1 + @patch("slurm_usage.console.print") def test_session_leader_section_handles_missing_submit(self, mock_print: MagicMock) -> None: """Session leader section should warn when submit data missing.""" From 5112dea0e2bd0f11c6cf5eec8c4f989f0417783d Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Wed, 12 Nov 2025 20:44:52 +0000 Subject: [PATCH 4/4] Add configurable session waits and wait hotlist --- config.example.yaml | 3 + slurm_usage.py | 157 +++++++++++++++++++++++++++++++----- tests/test_visualization.py | 50 +++++++++++- 3 files changed, 188 insertions(+), 22 deletions(-) diff --git a/config.example.yaml b/config.example.yaml index 1947ba9..4bddf80 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -19,6 +19,9 @@ groups: - henry - irene +# Idle hours before a user's job counts as a new session leader (default 1) +session_leader_idle_hours: 1 + # Data directory configuration (optional) # - If not specified or set to null, defaults to ./data (current working directory) # - Set to an explicit path to use a custom location diff --git a/slurm_usage.py b/slurm_usage.py index a4a9572..1afd38b 100755 --- a/slurm_usage.py +++ b/slurm_usage.py @@ -177,9 +177,15 @@ class Config(BaseModel): data_dir: Path groups: dict[str, list[str]] = Field(default_factory=dict) user_to_group: dict[str, str] = Field(default_factory=dict, exclude=True) + session_leader_idle_hours: int = 1 @classmethod - def create(cls, data_dir: Path | None = None, groups: dict[str, list[str]] | None = None) -> Config: + def create( + cls, + data_dir: Path | None = None, + groups: dict[str, list[str]] | None = None, + session_leader_idle_hours: int | None = None, + ) -> Config: """Create a Config instance with proper data directory resolution. Args: @@ -211,10 +217,18 @@ def create(cls, data_dir: Path | None = None, groups: dict[str, list[str]] | Non # Check config file, otherwise use default ./data data_dir = Path(file_config["data_dir"]) if "data_dir" in file_config and file_config["data_dir"] is not None else Path("data") + if session_leader_idle_hours is None: + session_leader_idle_hours = file_config.get("session_leader_idle_hours") + if session_leader_idle_hours is None: + session_leader_idle_hours = 1 + else: + session_leader_idle_hours = int(session_leader_idle_hours) + return cls( data_dir=data_dir, groups=groups, user_to_group=user_to_group, + session_leader_idle_hours=session_leader_idle_hours, ) def get_user_group(self, user: str) -> str: @@ -1957,13 +1971,23 @@ def _prepare_dataframe_for_analysis(df: pl.DataFrame, config: Config) -> pl.Data ) # Calculate wait time (in seconds) for jobs that have both submit and start times - return df.with_columns( + wait_seconds_expr = ( pl.when((pl.col("submit_time").is_not_null()) & (pl.col("start_time").is_not_null())) .then((pl.col("start_time") - pl.col("submit_time")).dt.total_seconds()) + .when( + # Treat cancelled-before-start jobs as waiting until cancellation time + (pl.col("start_time").is_null()) + & (pl.col("submit_time").is_not_null()) + & (pl.col("end_time").is_not_null()) + & (pl.col("state") == "CANCELLED"), + ) + .then((pl.col("end_time") - pl.col("submit_time")).dt.total_seconds()) .otherwise(None) - .alias("wait_seconds"), + .alias("wait_seconds") ) + return df.with_columns(wait_seconds_expr) + def _format_wait_hours(value: float | None) -> str: """Format a floating hour value as HH:MM.""" @@ -1975,6 +1999,18 @@ def _format_wait_hours(value: float | None) -> str: return f"{hours:02d}:{minutes:02d}" +def _seconds_to_hours(value: float | None) -> float | None: + """Convert seconds to hours as float.""" + if value is None: + return None + return float(value) / 3600 + + +def _format_wait_from_seconds(value: float | None) -> str: + """Format seconds-based wait into HH:MM.""" + return _format_wait_hours(_seconds_to_hours(value)) + + def _identify_session_leader_jobs(df: pl.DataFrame, idle_hours: int = 6) -> pl.DataFrame: """Return jobs that start a new session after user inactivity.""" if df.is_empty() or "submit_time" not in df.columns or "user" not in df.columns: @@ -2079,11 +2115,6 @@ def _calculate_session_leader_wait_stats( ], ).to_dicts()[0] - def _seconds_to_hours(value: float | None) -> float | None: - if value is None: - return None - return float(value) / 3600 - return SessionLeaderWaitStats( leader_jobs=leader_jobs, total_jobs=total_jobs, @@ -2099,7 +2130,7 @@ def _seconds_to_hours(value: float | None) -> float | None: ) -def _create_session_leader_wait_section(df: pl.DataFrame, idle_hours: int = 6) -> None: +def _create_session_leader_wait_section(df: pl.DataFrame, idle_hours: int = 1) -> None: """Render wait metrics for first jobs submitted after idle periods.""" if df.is_empty(): return @@ -2211,10 +2242,10 @@ def _create_session_leader_wait_section(df: pl.DataFrame, idle_hours: int = 6) - user_table.add_row( row["user"][:18], f"{row['leader_jobs']:,}", - _format_wait_hours(row["mean_wait_seconds"] / 3600 if row["mean_wait_seconds"] is not None else None), - _format_wait_hours(row["median_wait_seconds"] / 3600 if row["median_wait_seconds"] is not None else None), - _format_wait_hours(row["p90_wait_seconds"] / 3600 if row["p90_wait_seconds"] is not None else None), - _format_wait_hours(row["max_wait_seconds"] / 3600 if row["max_wait_seconds"] is not None else None), + _format_wait_from_seconds(row["mean_wait_seconds"]), + _format_wait_from_seconds(row["median_wait_seconds"]), + _format_wait_from_seconds(row["p90_wait_seconds"]), + _format_wait_from_seconds(row["max_wait_seconds"]), f"{row['over_two_hours']:,}", f"{row['over_six_hours']:,}", ) @@ -2222,6 +2253,84 @@ def _create_session_leader_wait_section(df: pl.DataFrame, idle_hours: int = 6) - console.print(user_table) +def _compute_user_wait_hotlist(waited_df: pl.DataFrame) -> pl.DataFrame: + """Aggregate wait statistics for all jobs with wait data.""" + if waited_df.is_empty(): + return pl.DataFrame( + schema={ + "user": pl.Utf8, + "jobs_with_wait": pl.Int64, + "mean_wait_seconds": pl.Float64, + "median_wait_seconds": pl.Float64, + "p90_wait_seconds": pl.Float64, + "p95_wait_seconds": pl.Float64, + "max_wait_seconds": pl.Float64, + "over_two_hours": pl.Int64, + "over_six_hours": pl.Int64, + "over_twenty_four_hours": pl.Int64, + }, + ) + + return ( + waited_df.group_by("user") + .agg( + [ + pl.len().alias("jobs_with_wait"), + pl.col("wait_seconds").mean().alias("mean_wait_seconds"), + pl.col("wait_seconds").median().alias("median_wait_seconds"), + pl.col("wait_seconds").quantile(0.9, interpolation="nearest").alias("p90_wait_seconds"), + pl.col("wait_seconds").quantile(0.95, interpolation="nearest").alias("p95_wait_seconds"), + pl.col("wait_seconds").max().alias("max_wait_seconds"), + pl.col("wait_seconds").gt(7200).sum().alias("over_two_hours"), + pl.col("wait_seconds").gt(21600).sum().alias("over_six_hours"), + pl.col("wait_seconds").gt(86400).sum().alias("over_twenty_four_hours"), + ], + ) + .sort(["max_wait_seconds", "jobs_with_wait"], descending=[True, True]) + ) + + +def _create_all_job_wait_hotlist(df: pl.DataFrame) -> None: + """Display per-user wait metrics across all jobs.""" + waited_df = df.filter(pl.col("wait_seconds").is_not_null()) + if waited_df.is_empty(): + return + + user_stats = _compute_user_wait_hotlist(waited_df) + if user_stats.is_empty(): + return + + console.print(Panel.fit("All Jobs Wait Hotlist", style="bold cyan", box=box.DOUBLE_EDGE)) + + table = Table(title="Users with Longest Waits", box=box.ROUNDED) + table.add_column("User", style="cyan") + table.add_column("Jobs", justify="right") + table.add_column("Mean", justify="right", style="yellow") + table.add_column("Median", justify="right") + table.add_column("P90", justify="right") + table.add_column("P95", justify="right") + table.add_column("Max", justify="right", style="red") + table.add_column(">2h", justify="right") + table.add_column(">6h", justify="right") + table.add_column(">24h", justify="right") + + for row in user_stats.head(15).iter_rows(named=True): + table.add_row( + row["user"][:18], + f"{row['jobs_with_wait']:,}", + _format_wait_from_seconds(row["mean_wait_seconds"]), + _format_wait_from_seconds(row["median_wait_seconds"]), + _format_wait_from_seconds(row["p90_wait_seconds"]), + _format_wait_from_seconds(row["p95_wait_seconds"]), + _format_wait_from_seconds(row["max_wait_seconds"]), + f"{row['over_two_hours']:,}", + f"{row['over_six_hours']:,}", + f"{row['over_twenty_four_hours']:,}", + ) + + console.print(table) + + def _create_user_statistics_section(df: pl.DataFrame) -> None: """Create and display user statistics table and charts. @@ -2545,7 +2654,7 @@ def _create_cluster_summary_section(df: pl.DataFrame) -> None: console.print(cluster_summary) -def _create_summary_stats(df: pl.DataFrame, config: Config) -> None: +def _create_summary_stats(df: pl.DataFrame, config: Config, leader_idle_hours: int | None = None) -> None: """Create and display comprehensive resource usage statistics. Args: @@ -2557,12 +2666,14 @@ def _create_summary_stats(df: pl.DataFrame, config: Config) -> None: return prepared_df = _prepare_dataframe_for_analysis(df, config) + idle_hours = leader_idle_hours if leader_idle_hours is not None else config.session_leader_idle_hours _create_user_statistics_section(prepared_df) _create_group_statistics_section(prepared_df) _create_node_usage_stats(prepared_df) _create_efficiency_analysis_section(prepared_df) _create_cluster_summary_section(prepared_df) - _create_session_leader_wait_section(prepared_df) + _create_session_leader_wait_section(prepared_df, idle_hours=idle_hours) + _create_all_job_wait_hotlist(prepared_df) def _create_daily_usage_chart(df: pl.DataFrame) -> None: @@ -2669,6 +2780,7 @@ def collect( # noqa: PLR0912, PLR0915 data_dir: Annotated[Path | None, typer.Option("--data-dir", help="Data directory (default: ./data)")] = None, show_summary: Annotated[bool, typer.Option("--summary/--no-summary", help="Show summary after collection")] = True, # noqa: FBT002 n_parallel: Annotated[int, typer.Option("--n-parallel", "-n", help="Number of parallel workers for date-based collection")] = 4, + leader_idle_hours: Annotated[int | None, typer.Option("--leader-idle-hours", help="Idle hours before a new session leader is counted")] = None, ) -> None: """Collect job data from SLURM using parallel date-based queries.""" mode_text = "[yellow]MOCK DATA MODE[/yellow]\n" if USE_MOCK_DATA else "" @@ -2679,8 +2791,11 @@ def collect( # noqa: PLR0912, PLR0915 ), ) + if leader_idle_hours is not None and leader_idle_hours < 1: + raise typer.BadParameter("leader-idle-hours must be at least 1") + # Create config and ensure directories exist - config = Config.create(data_dir=data_dir) + config = Config.create(data_dir=data_dir, session_leader_idle_hours=leader_idle_hours) config.ensure_directories_exist() if USE_MOCK_DATA: @@ -2823,7 +2938,7 @@ def collect( # noqa: PLR0912, PLR0915 ) # Show daily usage trends first _create_daily_usage_chart(df) - _create_summary_stats(df, config) + _create_summary_stats(df, config, leader_idle_hours=leader_idle_hours) console.print("\n[bold green]✓ Collection complete[/bold green]") console.print(f" Total records processed: {total_processed:,}") @@ -2833,6 +2948,7 @@ def collect( # noqa: PLR0912, PLR0915 def analyze( data_dir: Annotated[Path | None, typer.Option("--data-dir", help="Data directory (default: ./data)")] = None, days: Annotated[int, typer.Option("--days", "-d", help="Days to analyze")] = 7, + leader_idle_hours: Annotated[int | None, typer.Option("--leader-idle-hours", help="Idle hours before a new session leader is counted")] = None, ) -> None: """Analyze collected job data.""" console.print( @@ -2842,7 +2958,10 @@ def analyze( ), ) - config = Config.create(data_dir=data_dir) + if leader_idle_hours is not None and leader_idle_hours < 1: + raise typer.BadParameter("leader-idle-hours must be at least 1") + + config = Config.create(data_dir=data_dir, session_leader_idle_hours=leader_idle_hours) df = _load_recent_data(config, days) if df is None or df.is_empty(): @@ -2854,7 +2973,7 @@ def analyze( # Show daily usage trends first _create_daily_usage_chart(df) - _create_summary_stats(df, config) + _create_summary_stats(df, config, leader_idle_hours=leader_idle_hours) # State distribution - optimized console.print("\n[bold]State Distribution:[/bold]") diff --git a/tests/test_visualization.py b/tests/test_visualization.py index 1e7f22a..4212225 100644 --- a/tests/test_visualization.py +++ b/tests/test_visualization.py @@ -361,6 +361,28 @@ def test_session_leader_stats_without_waits(self) -> None: assert stats.mean_wait_hours is None assert stats.over_two_hours == 0 + def test_compute_user_wait_hotlist_empty(self) -> None: + """All-job wait hotlist handles empty frames.""" + empty = pl.DataFrame(schema={"user": pl.Utf8, "wait_seconds": pl.Float64}) + result = slurm_usage._compute_user_wait_hotlist(empty) + assert result.is_empty() + + def test_compute_user_wait_hotlist_values(self) -> None: + """All-job wait hotlist returns per-user metrics.""" + df = pl.DataFrame( + { + "user": ["alice", "alice", "bob", "bob"], + "wait_seconds": [7200.0, 90_000.0, 60.0, 100_000.0], + }, + ) + + stats_df = slurm_usage._compute_user_wait_hotlist(df) + stats = {row["user"]: row for row in stats_df.iter_rows(named=True)} + + assert stats["alice"]["jobs_with_wait"] == 2 + assert stats["alice"]["over_twenty_four_hours"] == 1 + assert stats["bob"]["over_two_hours"] == 1 + def test_compute_user_session_leader_stats_empty(self) -> None: """Empty inputs should return empty schema-consistent DataFrame.""" empty_df = pl.DataFrame(schema={"user": pl.Utf8, "wait_seconds": pl.Float64}) @@ -389,7 +411,7 @@ def test_session_leader_section_handles_missing_submit(self, mock_print: MagicMo """Session leader section should warn when submit data missing.""" df = pl.DataFrame({"user": ["alice"], "submit_time": [None], "wait_seconds": [None]}) - slurm_usage._create_session_leader_wait_section(df) + slurm_usage._create_session_leader_wait_section(df, idle_hours=6) assert any("Not enough" in str(call.args[0]) for call in mock_print.call_args_list) @@ -410,7 +432,7 @@ def test_session_leader_section_without_waits( }, ) - slurm_usage._create_session_leader_wait_section(df) + slurm_usage._create_session_leader_wait_section(df, idle_hours=6) assert mock_print.call_count >= 2 mock_chart.assert_not_called() @@ -440,7 +462,7 @@ def test_session_leader_section_with_waits( }, ) - slurm_usage._create_session_leader_wait_section(df) + slurm_usage._create_session_leader_wait_section(df, idle_hours=6) mock_chart.assert_called_once() labels, counts = mock_chart.call_args.args[:2] @@ -554,6 +576,28 @@ def test_create_summary_stats_with_groups(self, mock_print: MagicMock, tmp_path: # Should handle groups properly assert mock_print.called + def test_prepare_dataframe_adds_cancelled_wait(self, tmp_path: Path, test_dates: dict[str, str]) -> None: + """Cancelled-before-start jobs should get wait_seconds based on cancellation time.""" + config = slurm_usage.Config.create(data_dir=tmp_path) + submit_time = datetime.fromisoformat(f"{test_dates['today']}T08:00:00").replace(tzinfo=UTC) + end_time = submit_time + timedelta(hours=5) + + df = pl.DataFrame( + { + "user": ["user1"], + "state": ["CANCELLED"], + "submit_time": [submit_time], + "start_time": pl.Series("start_time", [None], dtype=pl.Datetime("us", "UTC")), + "end_time": [end_time], + "elapsed_seconds": [0], + "alloc_cpus": [1], + "req_mem_mb": [1024.0], + }, + ) + + prepared = slurm_usage._prepare_dataframe_for_analysis(df, config) + assert prepared["wait_seconds"][0] == 5 * 3600 + class TestProcessedSchemaHelpers: """Tests for processed schema normalization."""