Skip to content

Commit b650eaa

Browse files
committed
feat: new improved cli validation
1 parent afc48dc commit b650eaa

File tree

2 files changed

+78
-81
lines changed

2 files changed

+78
-81
lines changed

snakemake_executor_plugin_slurm/__init__.py

Lines changed: 35 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,11 @@
4646
from .efficiency_report import create_efficiency_report
4747
from .submit_string import get_submit_command
4848
from .partitions import read_partition_file, get_best_partition
49-
from .validation import validate_slurm_extra, validate_executor_settings
49+
from .validation import (
50+
validate_slurm_extra,
51+
validate_executor_settings,
52+
validate_status_command_settings,
53+
)
5054

5155

5256
def _get_status_command_default():
@@ -266,8 +270,7 @@ class ExecutorSettings(ExecutorSettingsBase):
266270

267271
def __post_init__(self):
268272
"""Validate settings after initialization."""
269-
# Run all validation checks
270-
validate_executor_settins(self)
273+
validate_executor_settings(self)
271274

272275

273276
# Required:
@@ -329,61 +332,10 @@ def __post_init__(self, test_mode: bool = False):
329332
else None
330333
)
331334
atexit.register(self.clean_old_logs)
332-
333-
# Validate status_command configuration if the field exists
334-
self._validate_status_command_settings()
335-
336-
def _validate_status_command_settings(self):
337-
"""Validate and provide feedback about status_command configuration."""
338-
if hasattr(self.workflow.executor_settings, "status_command"):
339-
status_command = self.workflow.executor_settings.status_command
340-
if status_command:
341-
min_job_age = get_min_job_age()
342-
sacct_available = is_query_tool_available("sacct")
343-
344-
# Threshold: 3x initial status check interval (default 40s)
345-
initial_interval = getattr(
346-
self.workflow.executor_settings,
347-
"init_seconds_before_status_checks",
348-
40,
349-
)
350-
dynamic_check_threshold = 3 * initial_interval
351-
352-
if not sacct_available and status_command == "sacct":
353-
self.logger.warning(
354-
"The 'sacct' command is not available on this system. "
355-
"Using 'squeue' instead for job status queries."
356-
)
357-
elif sacct_available and min_job_age is not None:
358-
if (
359-
min_job_age < dynamic_check_threshold
360-
and status_command == "squeue"
361-
):
362-
self.logger.warning(
363-
f"MinJobAge is {min_job_age} seconds "
364-
f"(< {dynamic_check_threshold}s). "
365-
f"This may cause 'squeue' to miss recently finished jobs "
366-
"that have been purged from slurmctld, leading to job "
367-
"status queries being impossible with 'squeue'. "
368-
"Consider using 'sacct' instead or let your admini- "
369-
"strator increase MinJobAge. "
370-
"(Threshold is 3x status check interval: 3 x "
371-
f"{initial_interval}s = "
372-
f"{dynamic_check_threshold}s)"
373-
)
374-
elif (
375-
min_job_age >= dynamic_check_threshold
376-
and status_command == "sacct"
377-
):
378-
self.logger.warning(
379-
f"MinJobAge is {min_job_age} seconds (>= "
380-
f"{dynamic_check_threshold}s). "
381-
f"The 'squeue' command should work reliably for "
382-
"status queries. "
383-
"(Threshold is 3x status check interval: 3 x "
384-
f"{initial_interval}s = "
385-
f"{dynamic_check_threshold}s)"
386-
)
335+
# Warn about a questionable status_command choice (checks live in validation.py)
336+
validate_status_command_settings(
337+
self.workflow.executor_settings, self.logger
338+
)
387339

388340
def get_status_command(self):
389341
"""Get the status command to use, with fallback logic."""
@@ -654,13 +606,32 @@ async def check_active_jobs(
654606
missing_sacct_status = set()
655607

656608
# decide which status command to use
657-
status_command = self.get_status_command()
658-
# Getting the actual command with parameters.
659-
# Here, the command will be a list generated with
660-
# shlex.split().
661-
if status_command == "sacct":
609+
status_command_name = self.get_status_command()
610+
min_job_age = get_min_job_age()
611+
initial_interval = getattr(
612+
self.workflow.executor_settings,
613+
"init_seconds_before_status_checks",
614+
40,
615+
)
616+
dynamic_check_threshold = 3 * initial_interval
617+
if status_command_name == "squeue":
618+
if (
619+
min_job_age is None
620+
or min_job_age < dynamic_check_threshold
621+
) and is_query_tool_available("sacct"):
622+
self.logger.info(
623+
"Falling back to 'sacct' for status queries "
624+
f"(MinJobAge={min_job_age}; threshold={dynamic_check_threshold}s)."
625+
)
626+
status_command_name = "sacct"
627+
if status_command_name == "sacct" and not is_query_tool_available("sacct"):
628+
self.logger.info(
629+
"'sacct' unavailable, using 'squeue' for status queries."
630+
)
631+
status_command_name = "squeue"
632+
if status_command_name == "sacct":
662633
status_command = query_job_status_sacct(self.run_uuid)
663-
elif status_command == "squeue":
634+
else:
664635
status_command = query_job_status_squeue(self.run_uuid)
665636

666637
# this code is inspired by the snakemake profile:

snakemake_executor_plugin_slurm/validation.py

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import re
66
from pathlib import Path
77
from snakemake_interface_common.exceptions import WorkflowError
8+
from .job_status_query import get_min_job_age, is_query_tool_available
89

910

1011
def get_forbidden_slurm_options():
@@ -76,33 +77,58 @@ def validate_slurm_extra(job):
7677
)
7778

7879

79-
def validate_executor_settings(settings):
80+
def validate_status_command_settings(settings, logger):
    """Warn when the configured ``status_command`` looks like a poor choice.

    Nothing is raised on behalf of the checks themselves and ``settings`` is
    not modified; the only effect is at most one ``logger.warning`` call.

    Args:
        settings: Object carrying the executor settings. Only the attributes
            ``status_command`` and ``init_seconds_before_status_checks`` are
            read; either may be missing or ``None``.
        logger: Object providing a ``warning(message)`` method.
    """
    # A missing attribute and an unset (falsy) value are treated alike:
    # there is nothing to give feedback on.
    status_command = getattr(settings, "status_command", None)
    if not status_command:
        return

    # Check tool availability first so that get_min_job_age() is only
    # called when its result can actually influence a warning.
    if not is_query_tool_available("sacct"):
        if status_command == "sacct":
            logger.warning(
                "The 'sacct' command is not available. Falling back to 'squeue'."
            )
        return

    min_job_age = get_min_job_age()
    if min_job_age is None:
        # Without a known MinJobAge no recommendation can be made.
        return

    # getattr's default only covers a *missing* attribute. The setting may
    # also be present but None, which would make the multiplication below
    # raise TypeError, so fall back to the same default of 40 explicitly.
    initial_interval = getattr(settings, "init_seconds_before_status_checks", None)
    if initial_interval is None:
        initial_interval = 40
    # Threshold is three times the initial status check interval.
    dynamic_check_threshold = 3 * initial_interval

    if min_job_age < dynamic_check_threshold:
        if status_command == "squeue":
            logger.warning(
                f"MinJobAge {min_job_age}s (< {dynamic_check_threshold}s). "
                "This may cause 'squeue' to miss recently finished jobs. "
                "Consider using 'sacct' or increasing MinJobAge."
            )
    elif status_command == "sacct":
        # Here min_job_age >= dynamic_check_threshold.
        logger.warning(
            f"MinJobAge {min_job_age}s (>= {dynamic_check_threshold}s). "
            "'squeue' should work reliably for status queries."
        )
111+
112+
113+
def validate_executor_settings(settings, logger=None):
80114
"""
81115
Validate ExecutorSettings fields for correctness
82116
(better user feedback in case of wrong inputs)
83-
84-
Args:
85-
settings: ExecutorSettins instance to validate
86-
87-
Raises:
88-
WorkflowError - if any validation check fails
89117
"""
90118
# status_command: only allow known values
91119
if settings.status_command is not None:
92120
if settings.status_command not in {"sacct", "squeue"}:
93121
raise WorkflowError(
94122
"Invalid status command. Allowed values are 'sacct' or 'squeue'."
95123
)
96-
97-
# status_attempts: require positive integer
124+
# status_attempts
98125
if settings.status_attempts is not None:
99126
if (
100127
not isinstance(settings.status_attempts, int)
101128
or settings.status_attempts < 1
102129
):
103130
raise WorkflowError("status_attempts must be a positive integer")
104-
105-
# init_settings_befor_status_checks: require non-negative integer
131+
# init_seconds_before_status_checks
106132
if settings.init_seconds_before_status_checks is not None:
107133
if (
108134
not isinstance(settings.init_seconds_before_status_checks, int)
@@ -111,8 +137,7 @@ def validate_executor_settings(settings):
111137
raise WorkflowError(
112138
"init-seconds-before-status-checks must be a positive integer."
113139
)
114-
115-
# efficiency_threshold, if set must be within (0, 1]
140+
# efficiency_threshold
116141
if settings.efficiency_threshold is not None:
117142
try:
118143
thr = float(settings.efficiency_threshold)
@@ -124,18 +149,19 @@ def validate_executor_settings(settings):
124149
raise WorkflowError(
125150
"efficiency-threshold must be a number in range (0, 1]."
126151
)
127-
128-
# partition_config: if provided, it must exist
152+
# partition_config
129153
if settings.partition_config is not None:
130154
p = Path(settings.partition_config)
131155
if not p.exists():
132156
raise WorkflowError(
133157
f"Partition configuration file not found, given was {p}."
134158
)
135-
136-
# delete_logfiles_older_than: if provided, should be reasonable
159+
# delete_logfiles_older_than
137160
if settings.delete_logfiles_older_than is not None:
138161
if not isinstance(settings.delete_logfiles_older_than, int):
139162
raise WorkflowError(
140163
"delete-logfiles-older-than must be an integer (days)."
141164
)
165+
# status_command warnings (optional logger)
166+
if logger:
167+
validate_status_command_settings(settings, logger)

0 commit comments

Comments
 (0)