From 3bb795fd5deb0c553f09133ab47386ec16860db9 Mon Sep 17 00:00:00 2001 From: Cade Mirchandani Date: Fri, 6 Jun 2025 13:20:44 -0700 Subject: [PATCH 01/33] feat: initial commit of automated partition selection --- snakemake_executor_plugin_slurm/__init__.py | 24 +- snakemake_executor_plugin_slurm/partitions.py | 267 +++++++++ .../submit_string.py | 3 - tests/tests.py | 524 ++++++++++++++++++ 4 files changed, 813 insertions(+), 5 deletions(-) create mode 100644 snakemake_executor_plugin_slurm/partitions.py diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index badda89a..de9d2e36 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -29,6 +29,7 @@ from .utils import delete_slurm_environment, delete_empty_dirs, set_gres_string from .submit_string import get_submit_command +from .partitions import read_partition_file, get_best_partition @dataclass @@ -106,6 +107,14 @@ class ExecutorSettings(ExecutorSettingsBase): "required": False, }, ) + partitions: Optional[Path] = field( + default=None, + metadata={ + "help": "Path to YAML file that specifies partitions. See docs for details.", + "env_var": False, + "required": False, + }, + ) # Required: @@ -149,6 +158,11 @@ def __post_init__(self, test_mode: bool = False): if self.workflow.executor_settings.logdir else Path(".snakemake/slurm_logs").resolve() ) + self._partitions = ( + read_partition_file(self.workflow.executor_settings.partitions) + if self.workflow.executor_settings.partitions + else None + ) atexit.register(self.clean_old_logs) def clean_old_logs(self) -> None: @@ -231,12 +245,12 @@ def run_job(self, job: JobExecutorInterface): if job.resources.get("slurm_extra"): self.check_slurm_extra(job) + # NOTE removed partition from below, such that partition selection can benefit from resource checking as the call is built up. 
job_params = { "run_uuid": self.run_uuid, "slurm_logfile": slurm_logfile, "comment_str": comment_str, "account": self.get_account_arg(job), - "partition": self.get_partition_arg(job), "workdir": self.workflow.workdir_init, } @@ -279,6 +293,8 @@ def run_job(self, job: JobExecutorInterface): "Probably not what you want." ) + call += self.get_partition_arg(job) + exec_job = self.format_job_exec(job) # and finally the job to execute with all the snakemake parameters @@ -618,9 +634,13 @@ def get_partition_arg(self, job: JobExecutorInterface): returns a default partition, if applicable else raises an error - implicetly. """ + partition = None if job.resources.get("slurm_partition"): partition = job.resources.slurm_partition - else: + elif self._partitions: + partition = get_best_partition(self._partitions, job, self.logger) + # we didnt get a partition yet so try fallback. + if not partition: if self._fallback_partition is None: self._fallback_partition = self.get_default_partition(job) partition = self._fallback_partition diff --git a/snakemake_executor_plugin_slurm/partitions.py b/snakemake_executor_plugin_slurm/partitions.py new file mode 100644 index 00000000..12c8d553 --- /dev/null +++ b/snakemake_executor_plugin_slurm/partitions.py @@ -0,0 +1,267 @@ +from dataclasses import dataclass +from typing import Optional, List +import yaml +from pathlib import Path +from math import inf, isinf +from snakemake_interface_common.exceptions import WorkflowError +from snakemake_interface_executor_plugins.jobs import ( + JobExecutorInterface, +) +from snakemake_interface_executor_plugins.logging import LoggerExecutorInterface + + +def read_partition_file(partition_file: Path) -> List["Partition"]: + with open(partition_file, "r") as f: + out = [] + partitions = yaml.safe_load(f)["partitions"] + for p in partitions: + out.append( + Partition( + name=p["name"], + limits=PartitionLimits(**p["limits"]), + description=p["description"], + ) + ) + return out + + +def get_best_partition( 
+ candidate_partitions: List["Partition"], + job: JobExecutorInterface, + logger: LoggerExecutorInterface, +) -> Optional[str]: + scored_partitions = [ + (p, score) + for p in candidate_partitions + if (score := p.score_job_fit(job)) is not None + ] + + if scored_partitions: + best_partition, best_score = max(scored_partitions, key=lambda x: x[1]) + partition = best_partition.name + logger.warning( + f"Auto-selected partition '{partition}' for job {job.name} " + f"with score {best_score:.3f}" + ) + return partition + else: + logger.warning( + f"No suitable partition found for job {job.name} based on " + f"resource requirements. Falling back to default behavior." + ) + return None + + +def parse_gpu_requirements(job: JobExecutorInterface) -> tuple[int, Optional[str]]: + """Parse GPU requirements from job resources. Returns (count, model)""" + gpu_required = job.resources.get("gpu", 0) + gres = job.resources.get("gres", "") + + # Convert to int if it's a string representation of a number + if isinstance(gpu_required, str): + try: + gpu_required = int(gpu_required) + except ValueError: + gpu_required = 0 + + # Ensure gres is a string + if not isinstance(gres, str): + gres = str(gres) if gres else "" + + if "gpu" in gres and gpu_required: + raise WorkflowError( + "GPU resource specified in both 'gpu' and 'gres'. These are mutually exclusive." 
+ ) + + if gpu_required: + return int(gpu_required), job.resources.get("gpu_model") + elif "gpu" in gres: + # Parse gres string format: gpu: or gpu:: + gpu_parts = [part for part in gres.split(",") if part.strip().startswith("gpu")] + if gpu_parts: + gpu_spec = gpu_parts[0].strip().split(":") + if len(gpu_spec) == 2: # gpu: + return int(gpu_spec[1]), None + elif len(gpu_spec) == 3: # gpu:: + return int(gpu_spec[2]), gpu_spec[1] + + return 0, None + + +def get_job_cpu_requirement(job: JobExecutorInterface) -> tuple[int, str]: + """ + This uses the same logic as snakemake_executor_plugin_slurm_jobstep.get_cpu_setting, but returns a tuple instead of a arg string. + """ + + gpu_required = job.resources.get("gpu", 0) + gres = job.resources.get("gres", "") + + # Convert gpu_required to int if it's a string + if isinstance(gpu_required, str): + try: + gpu_required = int(gpu_required) + except ValueError: + gpu_required = 0 + + # Ensure gres is a string for the "in" check + if not isinstance(gres, str): + gres = str(gres) if gres else "" + + has_gpu = bool(gpu_required) or "gpu" in gres + + cpus_per_task = job.resources.get("cpus_per_task") + if cpus_per_task is not None: + # Convert to int if it's a string + if isinstance(cpus_per_task, str): + try: + cpus_per_task = int(cpus_per_task) + except ValueError: + cpus_per_task = 0 + else: + cpus_per_task = int(cpus_per_task) + + if cpus_per_task < 0: + return (0, "none") + # ensure that at least 1 cpu is requested because 0 is not allowed by slurm + return (max(1, cpus_per_task), "task") + + elif has_gpu: + cpus_per_gpu = job.resources.get("cpus_per_gpu") + if cpus_per_gpu is not None: + # Convert to int if it's a string + if isinstance(cpus_per_gpu, str): + try: + cpus_per_gpu = int(cpus_per_gpu) + except ValueError: + cpus_per_gpu = 0 + else: + cpus_per_gpu = int(cpus_per_gpu) + + if cpus_per_gpu <= 0: + return (0, "none") + return (cpus_per_gpu, "gpu") + + return (job.threads, "task") + + +@dataclass +class PartitionLimits: 
+ """Represents resource limits for a SLURM partition""" + + # Standard resources + max_runtime: float = inf # minutes + max_mem_mb: float = inf + max_mem_mb_per_cpu: float = inf + max_cpus_per_task: float = inf + + # SLURM-specific resources + max_nodes: float = inf + max_tasks: float = inf + max_tasks_per_node: float = inf + + # GPU resources + max_gpu: int = 0 + available_gpu_models: Optional[List[str]] = None + max_cpus_per_gpu: float = inf + + # MPI resources + supports_mpi: bool = True + max_mpi_tasks: float = inf + + # Node features/constraints + available_constraints: Optional[List[str]] = None + + +@dataclass +class Partition: + """Represents a SLURM partition with its properties and limits""" + + name: str + limits: PartitionLimits + description: Optional[str] = None + + def score_job_fit(self, job: JobExecutorInterface) -> Optional[float]: + """ + Check if a job can run on this partition. If not return none. + Calculate a score for how well a partition fits the job requirements + """ + + # try to score how closely a job matches a partition's limits, in order to handle case where multiple partitions can run a given job + # naive approach here is to just sum the ratio of requested resource to limit, of course this limits us to only consider numerical resources + # here a higher score indicates a better fit + # TODO decide how to handle unspecified limits, for now we assume inf for numerical limits, none for others. 
+ + score = 0.0 + + numerical_resources = { + "mem_mb": self.limits.max_mem_mb, + "mem_mb_per_cpu": self.limits.max_mem_mb_per_cpu, + "runtime": self.limits.max_runtime, + "nodes": self.limits.max_nodes, + "tasks": self.limits.max_tasks, + "tasks_per_node": self.limits.max_tasks_per_node, + "mpi_tasks": self.limits.max_mpi_tasks, + } + + for resource_key, limit in numerical_resources.items(): + job_requirement = job.resources.get(resource_key, 0) + # Convert to numeric value if it's a string + if isinstance(job_requirement, str): + try: + job_requirement = float(job_requirement) + except ValueError: + job_requirement = 0 + elif not isinstance(job_requirement, (int, float)): + job_requirement = 0 + + if job_requirement > 0: + if not isinf(limit) and job_requirement > limit: + return None + if not isinf(limit): + score += job_requirement / limit + + cpu_count, cpu_type = get_job_cpu_requirement(job) + if cpu_type == "task" and cpu_count > 0: + if ( + not isinf(self.limits.max_cpus_per_task) + and cpu_count > self.limits.max_cpus_per_task + ): + return None + if not isinf(self.limits.max_cpus_per_task): + score += cpu_count / self.limits.max_cpus_per_task + elif cpu_type == "gpu" and cpu_count > 0: + if ( + not isinf(self.limits.max_cpus_per_gpu) + and cpu_count > self.limits.max_cpus_per_gpu + ): + return None + if not isinf(self.limits.max_cpus_per_gpu): + score += cpu_count / self.limits.max_cpus_per_gpu + + gpu_count, gpu_model = parse_gpu_requirements(job) + if gpu_count > 0: + if self.limits.max_gpu == 0 or gpu_count > self.limits.max_gpu: + return None + score += gpu_count / self.limits.max_gpu + + if gpu_model and self.limits.available_gpu_models: + if gpu_model not in self.limits.available_gpu_models: + return None + + if job.resources.get("mpi") and not self.limits.supports_mpi: + return None + + constraint = job.resources.get("constraint") + if constraint and self.limits.available_constraints: + # Ensure constraint is a string + if not 
isinstance(constraint, str): + constraint = str(constraint) + required_constraints = [ + c.strip() for c in constraint.split(",") if c.strip() + ] + if not all( + req in self.limits.available_constraints for req in required_constraints + ): + return None + + return score diff --git a/snakemake_executor_plugin_slurm/submit_string.py b/snakemake_executor_plugin_slurm/submit_string.py index 2da03f69..f6798314 100644 --- a/snakemake_executor_plugin_slurm/submit_string.py +++ b/snakemake_executor_plugin_slurm/submit_string.py @@ -24,9 +24,6 @@ def get_submit_command(job, params): # here, only the string is used, as it already contains # '-A {account_name}' call += f" {params.account}" - # here, only the string is used, as it already contains - # '- p {partition_name}' - call += f" {params.partition}" if job.resources.get("clusters"): call += f" --clusters {job.resources.clusters}" diff --git a/tests/tests.py b/tests/tests.py index f98328b7..ddfad201 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -3,10 +3,17 @@ from snakemake_interface_executor_plugins.settings import ExecutorSettingsBase from unittest.mock import MagicMock, patch import pytest +import tempfile +import yaml +from pathlib import Path from snakemake_executor_plugin_slurm import ExecutorSettings from snakemake_executor_plugin_slurm.utils import set_gres_string from snakemake_executor_plugin_slurm.submit_string import get_submit_command +from snakemake_executor_plugin_slurm.partitions import ( + read_partition_file, + get_best_partition, +) from snakemake_interface_common.exceptions import WorkflowError @@ -474,3 +481,520 @@ def test_wildcard_slash_replacement(self): # Verify no slashes remain in the wildcard string assert "/" not in wildcard_str + +class TestPartitionSelection: + @pytest.fixture + def basic_partition_config(self): + """Basic partition configuration with two partitions.""" + return { + "partitions": [ + { + "name": "default", + "description": "General purpose compute nodes", + "limits": 
{ + "max_runtime": 1440, + "max_mem_mb": 128000, + "max_cpus_per_task": 32, + "supports_mpi": True, + }, + }, + { + "name": "gpu", + "description": "GPU-enabled nodes", + "limits": { + "max_runtime": 720, + "max_mem_mb": 256000, + "max_gpu": 4, + "available_gpu_models": ["a100", "v100"], + "supports_mpi": False, + }, + }, + ] + } + + @pytest.fixture + def minimal_partition_config(self): + """Minimal partition configuration.""" + return { + "partitions": [ + { + "name": "minimal", + "description": "Minimal partition", + "limits": {}, + } + ] + } + + @pytest.fixture + def comprehensive_partition_config(self): + """Comprehensive partition configuration with all limit types.""" + return { + "partitions": [ + { + "name": "comprehensive", + "description": "Partition with all limits", + "limits": { + # Standard resources + "max_runtime": 2880, + "max_mem_mb": 500000, + "max_mem_mb_per_cpu": 8000, + "max_cpus_per_task": 64, + # SLURM-specific resources + "max_nodes": 4, + "max_tasks": 256, + "max_tasks_per_node": 64, + # GPU resources + "max_gpu": 8, + "available_gpu_models": ["a100", "v100", "rtx3090"], + "max_cpus_per_gpu": 16, + # MPI resources + "supports_mpi": True, + "max_mpi_tasks": 512, + # Node features/constraints + "available_constraints": ["intel", "avx2", "highmem"], + }, + } + ] + } + + @pytest.fixture + def empty_partitions_config(self): + """Empty partitions configuration.""" + return {"partitions": []} + + @pytest.fixture + def missing_name_config(self): + """Configuration with missing name field.""" + return { + "partitions": [ + { + # Missing 'name' field + "description": "Missing name", + "limits": {}, + } + ] + } + + @pytest.fixture + def invalid_key_config(self): + """Configuration with invalid key.""" + return {"invalid_key": []} + + @pytest.fixture + def temp_yaml_file(self): + """Helper fixture to create temporary YAML files.""" + + def _create_temp_file(config): + with tempfile.NamedTemporaryFile( + mode="w", suffix=".yaml", delete=False + ) as f: 
+ yaml.dump(config, f) + return Path(f.name) + + return _create_temp_file + + def test_read_valid_partition_file(self, basic_partition_config, temp_yaml_file): + """Test reading a valid partition configuration file.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + + assert len(partitions) == 2 + + # Check first partition + assert partitions[0].name == "default" + assert partitions[0].description == "General purpose compute nodes" + assert partitions[0].limits.max_runtime == 1440 + assert partitions[0].limits.max_mem_mb == 128000 + assert partitions[0].limits.max_cpus_per_task == 32 + assert partitions[0].limits.supports_mpi is True + + # Check second partition + assert partitions[1].name == "gpu" + assert partitions[1].description == "GPU-enabled nodes" + assert partitions[1].limits.max_runtime == 720 + assert partitions[1].limits.max_gpu == 4 + assert partitions[1].limits.available_gpu_models == ["a100", "v100"] + assert partitions[1].limits.supports_mpi is False + + finally: + temp_path.unlink() + + def test_read_minimal_partition_file( + self, minimal_partition_config, temp_yaml_file + ): + """Test reading a partition file with minimal configuration.""" + from math import isinf + + temp_path = temp_yaml_file(minimal_partition_config) + + try: + partitions = read_partition_file(temp_path) + + assert len(partitions) == 1 + assert partitions[0].name == "minimal" + assert partitions[0].description == "Minimal partition" + + # Check that all limits are inf + limits = partitions[0].limits + assert isinf(limits.max_runtime) + assert isinf(limits.max_mem_mb) + assert limits.max_gpu == 0 + assert limits.supports_mpi is True + + finally: + temp_path.unlink() + + def test_read_partition_file_with_all_limits( + self, comprehensive_partition_config, temp_yaml_file + ): + """Test reading a partition file with all possible limit types.""" + temp_path = temp_yaml_file(comprehensive_partition_config) + + try: + 
partitions = read_partition_file(temp_path) + + assert len(partitions) == 1 + limits = partitions[0].limits + + # Check standard resources + assert limits.max_runtime == 2880 + assert limits.max_mem_mb == 500000 + assert limits.max_mem_mb_per_cpu == 8000 + assert limits.max_cpus_per_task == 64 + + # Check SLURM-specific resources + assert limits.max_nodes == 4 + assert limits.max_tasks == 256 + assert limits.max_tasks_per_node == 64 + + # Check GPU resources + assert limits.max_gpu == 8 + assert limits.available_gpu_models == ["a100", "v100", "rtx3090"] + assert limits.max_cpus_per_gpu == 16 + + # Check MPI resources + assert limits.supports_mpi is True + assert limits.max_mpi_tasks == 512 + + # Check constraints + assert limits.available_constraints == ["intel", "avx2", "highmem"] + + finally: + temp_path.unlink() + + def test_read_empty_partitions_list(self, empty_partitions_config, temp_yaml_file): + """Test reading a file with empty partitions list.""" + temp_path = temp_yaml_file(empty_partitions_config) + + try: + partitions = read_partition_file(temp_path) + assert len(partitions) == 0 + + finally: + temp_path.unlink() + + def test_read_nonexistent_file(self): + """Test reading a non-existent file raises appropriate error.""" + nonexistent_path = Path("/nonexistent/path/to/file.yaml") + + with pytest.raises(FileNotFoundError): + read_partition_file(nonexistent_path) + + def test_read_invalid_yaml_file(self): + """Test reading an invalid YAML file raises appropriate error.""" + invalid_yaml = "partitions:\n - name: test\n invalid: {\n" + + with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: + f.write(invalid_yaml) + temp_path = Path(f.name) + + try: + with pytest.raises(yaml.YAMLError): + read_partition_file(temp_path) + finally: + temp_path.unlink() + + def test_read_file_missing_partitions_key(self, invalid_key_config, temp_yaml_file): + """Test reading a file without 'partitions' key raises KeyError.""" + temp_path = 
temp_yaml_file(invalid_key_config) + + try: + with pytest.raises(KeyError): + read_partition_file(temp_path) + finally: + temp_path.unlink() + + def test_read_partition_missing_required_fields( + self, missing_name_config, temp_yaml_file + ): + """Test reading partition with missing required fields.""" + temp_path = temp_yaml_file(missing_name_config) + + try: + with pytest.raises(KeyError): + read_partition_file(temp_path) + finally: + temp_path.unlink() + + @pytest.fixture + def mock_job(self): + """Create a mock job with configurable resources and threads.""" + + def _create_job(threads=1, **resources): + mock_resources = MagicMock() + mock_resources.get.side_effect = lambda key, default=None: resources.get( + key, default + ) + + mock_job = MagicMock() + mock_job.resources = mock_resources + mock_job.threads = threads + mock_job.name = "test_job" + return mock_job + + return _create_job + + @pytest.fixture + def mock_logger(self): + """Create a mock logger.""" + return MagicMock() + + def test_basic_partition_selection_cpu_job( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test selecting partition for a basic CPU job.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job(threads=4, mem_mb=16000, runtime=60) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select 'default' partition as it supports CPU jobs + assert selected_partition == "default" + mock_logger.warning.assert_called_once() + assert ( + "Auto-selected partition 'default'" + in mock_logger.warning.call_args[0][0] + ) + finally: + temp_path.unlink() + + def test_basic_partition_selection_gpu_job( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test selecting partition for a GPU job.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job( + threads=2, 
mem_mb=32000, runtime=300, gpu=2, gpu_model="a100" + ) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select 'gpu' partition as it supports GPU jobs + assert selected_partition == "gpu" + mock_logger.warning.assert_called_once() + assert ( + "Auto-selected partition 'gpu'" in mock_logger.warning.call_args[0][0] + ) + finally: + temp_path.unlink() + + def test_no_suitable_partition( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test when no partition can accommodate the job requirements.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + # Job requires more memory than any partition allows + job = mock_job(threads=1, mem_mb=500000, runtime=60) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None when no suitable partition found + assert selected_partition is None + mock_logger.warning.assert_called_once() + assert "No suitable partition found" in mock_logger.warning.call_args[0][0] + finally: + temp_path.unlink() + + def test_comprehensive_partition_selection( + self, comprehensive_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test partition selection with comprehensive limits.""" + temp_path = temp_yaml_file(comprehensive_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job( + threads=8, + mem_mb=64000, + runtime=1200, + gpu=2, + gpu_model="a100", + constraint="intel,avx2", + ) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select the comprehensive partition + assert selected_partition == "comprehensive" + mock_logger.warning.assert_called_once() + assert ( + "Auto-selected partition 'comprehensive'" + in mock_logger.warning.call_args[0][0] + ) + finally: + temp_path.unlink() + + def test_constraint_mismatch( + self, comprehensive_partition_config, temp_yaml_file, mock_job, mock_logger + ): 
+ """Test job with constraints not available in partition.""" + temp_path = temp_yaml_file(comprehensive_partition_config) + + try: + partitions = read_partition_file(temp_path) + # Job requires constraint not available in partition + job = mock_job(threads=2, constraint="amd,gpu_direct") + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None due to constraint mismatch + assert selected_partition is None + mock_logger.warning.assert_called_once() + assert "No suitable partition found" in mock_logger.warning.call_args[0][0] + finally: + temp_path.unlink() + + def test_mpi_job_selection( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test MPI job partition selection.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job(threads=1, mpi=True, tasks=16) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select 'default' partition as it supports MPI, 'gpu' doesn't + assert selected_partition == "default" + mock_logger.warning.assert_called_once() + assert ( + "Auto-selected partition 'default'" + in mock_logger.warning.call_args[0][0] + ) + finally: + temp_path.unlink() + + def test_gpu_model_mismatch( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test GPU job with unsupported GPU model.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + # Request GPU model not available in any partition + job = mock_job(threads=2, gpu=1, gpu_model="rtx4090") + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None due to GPU model mismatch + assert selected_partition is None + mock_logger.warning.assert_called_once() + assert "No suitable partition found" in mock_logger.warning.call_args[0][0] + finally: + temp_path.unlink() + + def test_empty_partitions_list( + self, 
empty_partitions_config, temp_yaml_file, mock_job, mock_logger + ): + """Test partition selection with empty partitions list.""" + temp_path = temp_yaml_file(empty_partitions_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job(threads=1, mem_mb=1000) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None when no partitions available + assert selected_partition is None + mock_logger.warning.assert_called_once() + assert "No suitable partition found" in mock_logger.warning.call_args[0][0] + finally: + temp_path.unlink() + + def test_gres_gpu_specification( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test GPU job specified via gres parameter.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job(threads=2, gres="gpu:v100:1", runtime=400) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select 'gpu' partition as it supports v100 GPUs + assert selected_partition == "gpu" + mock_logger.warning.assert_called_once() + assert ( + "Auto-selected partition 'gpu'" in mock_logger.warning.call_args[0][0] + ) + finally: + temp_path.unlink() + + def test_cpus_per_task_specification( + self, comprehensive_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test job with cpus_per_task specification.""" + temp_path = temp_yaml_file(comprehensive_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job(threads=1, cpus_per_task=32, mem_mb=64000) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select comprehensive partition as it can handle 32 cpus per task + assert selected_partition == "comprehensive" + mock_logger.warning.assert_called_once() + assert ( + "Auto-selected partition 'comprehensive'" + in mock_logger.warning.call_args[0][0] + ) + finally: + temp_path.unlink() + + def 
test_runtime_exceeds_limit( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test job with runtime exceeding partition limits.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + # Job requires more runtime than gpu partition allows (720 min max) + job = mock_job(threads=1, runtime=1000, gpu=1, gpu_model="a100") + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None as no partition can accommodate the runtime + assert selected_partition is None + mock_logger.warning.assert_called_once() + assert "No suitable partition found" in mock_logger.warning.call_args[0][0] + finally: + temp_path.unlink() \ No newline at end of file From 6f475c3b683f80d7d6157da92b9393cb0b9acb29 Mon Sep 17 00:00:00 2001 From: Cade Mirchandani Date: Fri, 6 Jun 2025 14:04:58 -0700 Subject: [PATCH 02/33] fix: flatten expected yaml structure; remove partition description --- snakemake_executor_plugin_slurm/partitions.py | 13 ++- tests/tests.py | 104 +++++++----------- 2 files changed, 46 insertions(+), 71 deletions(-) diff --git a/snakemake_executor_plugin_slurm/partitions.py b/snakemake_executor_plugin_slurm/partitions.py index 12c8d553..89c3edff 100644 --- a/snakemake_executor_plugin_slurm/partitions.py +++ b/snakemake_executor_plugin_slurm/partitions.py @@ -13,13 +13,15 @@ def read_partition_file(partition_file: Path) -> List["Partition"]: with open(partition_file, "r") as f: out = [] - partitions = yaml.safe_load(f)["partitions"] - for p in partitions: + partitions_dict = yaml.safe_load(f)["partitions"] + for partition_name, partition_config in partitions_dict.items(): + if not partition_name or not partition_name.strip(): + raise KeyError("Partition name cannot be empty") + out.append( Partition( - name=p["name"], - limits=PartitionLimits(**p["limits"]), - description=p["description"], + name=partition_name, + limits=PartitionLimits(**partition_config), ) ) 
return out @@ -178,7 +180,6 @@ class Partition: name: str limits: PartitionLimits - description: Optional[str] = None def score_job_fit(self, job: JobExecutorInterface) -> Optional[float]: """ diff --git a/tests/tests.py b/tests/tests.py index ddfad201..b75fe7c4 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -487,92 +487,69 @@ class TestPartitionSelection: def basic_partition_config(self): """Basic partition configuration with two partitions.""" return { - "partitions": [ - { - "name": "default", - "description": "General purpose compute nodes", - "limits": { - "max_runtime": 1440, - "max_mem_mb": 128000, - "max_cpus_per_task": 32, - "supports_mpi": True, - }, + "partitions": { + "default": { + "max_runtime": 1440, + "max_mem_mb": 128000, + "max_cpus_per_task": 32, + "supports_mpi": True, }, - { - "name": "gpu", - "description": "GPU-enabled nodes", - "limits": { - "max_runtime": 720, - "max_mem_mb": 256000, - "max_gpu": 4, - "available_gpu_models": ["a100", "v100"], - "supports_mpi": False, - }, + "gpu": { + "max_runtime": 720, + "max_mem_mb": 256000, + "max_gpu": 4, + "available_gpu_models": ["a100", "v100"], + "supports_mpi": False, }, - ] + } } @pytest.fixture def minimal_partition_config(self): """Minimal partition configuration.""" - return { - "partitions": [ - { - "name": "minimal", - "description": "Minimal partition", - "limits": {}, - } - ] - } + return {"partitions": {"minimal": {}}} @pytest.fixture def comprehensive_partition_config(self): """Comprehensive partition configuration with all limit types.""" return { - "partitions": [ - { - "name": "comprehensive", - "description": "Partition with all limits", - "limits": { - # Standard resources - "max_runtime": 2880, - "max_mem_mb": 500000, - "max_mem_mb_per_cpu": 8000, - "max_cpus_per_task": 64, - # SLURM-specific resources - "max_nodes": 4, - "max_tasks": 256, - "max_tasks_per_node": 64, - # GPU resources - "max_gpu": 8, - "available_gpu_models": ["a100", "v100", "rtx3090"], - "max_cpus_per_gpu": 
16, - # MPI resources - "supports_mpi": True, - "max_mpi_tasks": 512, - # Node features/constraints - "available_constraints": ["intel", "avx2", "highmem"], - }, + "partitions": { + "comprehensive": { + # Standard resources + "max_runtime": 2880, + "max_mem_mb": 500000, + "max_mem_mb_per_cpu": 8000, + "max_cpus_per_task": 64, + # SLURM-specific resources + "max_nodes": 4, + "max_tasks": 256, + "max_tasks_per_node": 64, + # GPU resources + "max_gpu": 8, + "available_gpu_models": ["a100", "v100", "rtx3090"], + "max_cpus_per_gpu": 16, + # MPI resources + "supports_mpi": True, + "max_mpi_tasks": 512, + # Node features/constraints + "available_constraints": ["intel", "avx2", "highmem"], } - ] + } } @pytest.fixture def empty_partitions_config(self): """Empty partitions configuration.""" - return {"partitions": []} + return {"partitions": {}} @pytest.fixture def missing_name_config(self): """Configuration with missing name field.""" return { - "partitions": [ - { - # Missing 'name' field - "description": "Missing name", - "limits": {}, + "partitions": { + "": { # Empty partition name } - ] + } } @pytest.fixture @@ -604,7 +581,6 @@ def test_read_valid_partition_file(self, basic_partition_config, temp_yaml_file) # Check first partition assert partitions[0].name == "default" - assert partitions[0].description == "General purpose compute nodes" assert partitions[0].limits.max_runtime == 1440 assert partitions[0].limits.max_mem_mb == 128000 assert partitions[0].limits.max_cpus_per_task == 32 @@ -612,7 +588,6 @@ def test_read_valid_partition_file(self, basic_partition_config, temp_yaml_file) # Check second partition assert partitions[1].name == "gpu" - assert partitions[1].description == "GPU-enabled nodes" assert partitions[1].limits.max_runtime == 720 assert partitions[1].limits.max_gpu == 4 assert partitions[1].limits.available_gpu_models == ["a100", "v100"] @@ -634,7 +609,6 @@ def test_read_minimal_partition_file( assert len(partitions) == 1 assert partitions[0].name == 
"minimal" - assert partitions[0].description == "Minimal partition" # Check that all limits are inf limits = partitions[0].limits From f8d342ea6116b322922d9453c7df54e3f17b493f Mon Sep 17 00:00:00 2001 From: Cade Mirchandani Date: Fri, 6 Jun 2025 14:10:41 -0700 Subject: [PATCH 03/33] fix: update cli arg name and help --- snakemake_executor_plugin_slurm/__init__.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index de9d2e36..3c026012 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -107,10 +107,21 @@ class ExecutorSettings(ExecutorSettingsBase): "required": False, }, ) - partitions: Optional[Path] = field( + partition_config: Optional[Path] = field( default=None, metadata={ - "help": "Path to YAML file that specifies partitions. See docs for details.", + "help": "Path to YAML file defining partition limits for automatic partition selection. " + "When provided, jobs will be automatically assigned to the best-fitting partition based on " + "their resource requirements. 
Expected format:\n" + "partitions:\n" + " short:\n" + " max_runtime: 30 # minutes\n" + " max_memory_mb: 4000\n" + " gpu:\n" + " max_runtime: 120\n" + " max_gpu: 2\n" + " available_gpu_models: ['v100', 'a100']\n" + "See documentation for complete list of available limits.", "env_var": False, "required": False, }, @@ -159,8 +170,8 @@ def __post_init__(self, test_mode: bool = False): else Path(".snakemake/slurm_logs").resolve() ) self._partitions = ( - read_partition_file(self.workflow.executor_settings.partitions) - if self.workflow.executor_settings.partitions + read_partition_file(self.workflow.executor_settings.partition_config) + if self.workflow.executor_settings.partition_config else None ) atexit.register(self.clean_old_logs) From f5511ff769b7c79813aeba4d0d9e6974bae8ceff Mon Sep 17 00:00:00 2001 From: Cade Mirchandani Date: Fri, 6 Jun 2025 14:52:58 -0700 Subject: [PATCH 04/33] docs: update docs and help --- docs/further.md | 73 +++++++++++++++++++++ snakemake_executor_plugin_slurm/__init__.py | 9 --- 2 files changed, 73 insertions(+), 9 deletions(-) diff --git a/docs/further.md b/docs/further.md index c0cd20af..6bdeafa0 100644 --- a/docs/further.md +++ b/docs/further.md @@ -64,6 +64,79 @@ See the [snakemake documentation on profiles](https://snakemake.readthedocs.io/e How and where you set configurations on factors like file size or increasing the runtime with every `attempt` of running a job (if [`--retries` is greater than `0`](https://snakemake.readthedocs.io/en/stable/executing/cli.html#snakemake.cli-get_argument_parser-behavior)). [There are detailed examples for these in the snakemake documentation.](https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#dynamic-resources) +#### Automatic Partition Selection + +The SLURM executor plugin supports automatic partition selection based on job resource requirements, via the command line option `--slurm-partition-config`. 
This feature allows the plugin to choose the most appropriate partition for each job, without the need to manually specify partitions for different job types. This also enables variable partition selection as a job's resource requirements change based on [dynamic resources](#dynamic-resource-specification), ensuring that jobs are always scheduled to an appropriate partition. + +*Jobs that explicitly specify a `slurm_partition` resource will bypass automatic selection and use the specified partition directly.* + +##### Partition Limits Specification + +To enable automatic partition selection, create a YAML configuration file that defines the available partitions and their resource limits. This file should be structured as follows: + +```yaml +partitions: + some_partition: + max_runtime: 100 + another_partition: + ... +``` +Where `some_partition` and `another_partition` are the names of the partition on your cluster, according to `sinfo`. + +The following limits can be defined for each partition: + +| Parameter | Type | Description | Default | +| ----------------------- | --------- | ---------------------------------- | --------- | +| `max_runtime` | int | Maximum walltime in minutes | unlimited | +| `max_mem_mb` | int | Maximum total memory in MB | unlimited | +| `max_mem_mb_per_cpu` | int | Maximum memory per CPU in MB | unlimited | +| `max_cpus_per_task` | int | Maximum CPUs per task | unlimited | +| `max_nodes` | int | Maximum number of nodes | unlimited | +| `max_tasks` | int | Maximum number of tasks | unlimited | +| `max_tasks_per_node` | int | Maximum tasks per node | unlimited | +| `max_gpu` | int | Maximum number of GPUs | 0 | +| `available_gpu_models` | list[str] | List of available GPU models | none | +| `max_cpus_per_gpu` | int | Maximum CPUs per GPU | unlimited | +| `supports_mpi` | bool | Whether MPI jobs are supported | true | +| `max_mpi_tasks` | int | Maximum MPI tasks | unlimited | +| `available_constraints` | list[str] | List of available node 
constraints | none | + +##### Example Partition Configuration + +```yaml +partitions: + standard: + max_runtime: 720 # 12 hours + max_mem_mb: 64000 # 64 GB + max_cpus_per_task: 24 + max_nodes: 1 + + highmem: + max_runtime: 1440 # 24 hours + max_mem_mb: 512000 # 512 GB + max_mem_mb_per_cpu: 16000 + max_cpus_per_task: 48 + max_nodes: 1 + + gpu: + max_runtime: 2880 # 48 hours + max_mem_mb: 128000 # 128 GB + max_cpus_per_task: 32 + max_gpu: 8 + available_gpu_models: ["a100", "v100", "rtx3090"] + max_cpus_per_gpu: 8 +``` + +##### How Partition Selection Works + +When automatic partition selection is enabled, the plugin evaluates each job's resource requirements against the defined partition limits to ensure the job is placed on a partition that can accommodate all of its requirements. When multiple partitions are compatible, the plugin uses a scoring algorithm that favors partitions with limits closer to the job's needs, preventing jobs from being assigned to partitions with excessively high resource limits. + +The scoring algorithm calculates a score by summing the ratios of requested resources to partition limits (e.g., if a job requests 8 CPUs and a partition allows 16, this contributes 0.5 to the score). Higher scores indicate better resource utilization, so a job requesting 8 CPUs would prefer a 16-CPU partition (score 0.5) over a 64-CPU partition (score 0.125). + +##### Fallback Behavior + +If no suitable partition is found based on the job's resource requirements, the plugin falls back to the default SLURM behavior, which typically uses the cluster's default partition or any partition specified explicitly in the job's resources. 
+ #### Standard Resources diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index 3c026012..1e4e04ac 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -112,15 +112,6 @@ class ExecutorSettings(ExecutorSettingsBase): metadata={ "help": "Path to YAML file defining partition limits for automatic partition selection. " "When provided, jobs will be automatically assigned to the best-fitting partition based on " - "their resource requirements. Expected format:\n" - "partitions:\n" - " short:\n" - " max_runtime: 30 # minutes\n" - " max_memory_mb: 4000\n" - " gpu:\n" - " max_runtime: 120\n" - " max_gpu: 2\n" - " available_gpu_models: ['v100', 'a100']\n" "See documentation for complete list of available limits.", "env_var": False, "required": False, From 01a94c6ac5d491b715380bad7a16027cfd3517b8 Mon Sep 17 00:00:00 2001 From: Cade Mirchandani Date: Sat, 7 Jun 2025 12:51:48 -0700 Subject: [PATCH 05/33] chore: linting; change automatic to dynamic partition selection --- snakemake_executor_plugin_slurm/__init__.py | 10 ++++++---- snakemake_executor_plugin_slurm/partitions.py | 10 +++++----- tests/tests.py | 3 ++- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index 1e4e04ac..f5a3897a 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -110,8 +110,9 @@ class ExecutorSettings(ExecutorSettingsBase): partition_config: Optional[Path] = field( default=None, metadata={ - "help": "Path to YAML file defining partition limits for automatic partition selection. " - "When provided, jobs will be automatically assigned to the best-fitting partition based on " + "help": "Path to YAML file defining partition limits for dynamic " + "partition selection. 
When provided, jobs will be dynamically " + "assigned to the best-fitting partition based on " "See documentation for complete list of available limits.", "env_var": False, "required": False, @@ -247,7 +248,8 @@ def run_job(self, job: JobExecutorInterface): if job.resources.get("slurm_extra"): self.check_slurm_extra(job) - # NOTE removed partition from below, such that partition selection can benefit from resource checking as the call is built up. + # NOTE removed partition from below, such that partition + # selection can benefit from resource checking as the call is built up. job_params = { "run_uuid": self.run_uuid, "slurm_logfile": slurm_logfile, @@ -396,7 +398,7 @@ async def check_active_jobs( # We use this sacct syntax for argument 'starttime' to keep it compatible # with slurm < 20.11 - sacct_starttime = f"{datetime.now() - timedelta(days = 2):%Y-%m-%dT%H:00}" + sacct_starttime = f"{datetime.now() - timedelta(days=2):%Y-%m-%dT%H:00}" # previously we had # f"--starttime now-2days --endtime now --name {self.run_uuid}" # in line 218 - once v20.11 is definitively not in use any more, diff --git a/snakemake_executor_plugin_slurm/partitions.py b/snakemake_executor_plugin_slurm/partitions.py index 89c3edff..834d2467 100644 --- a/snakemake_executor_plugin_slurm/partitions.py +++ b/snakemake_executor_plugin_slurm/partitions.py @@ -72,7 +72,7 @@ def parse_gpu_requirements(job: JobExecutorInterface) -> tuple[int, Optional[str if "gpu" in gres and gpu_required: raise WorkflowError( - "GPU resource specified in both 'gpu' and 'gres'. These are mutually exclusive." + "GPU resource specified in both 'gpu' and 'gres'. These are mutually exclusive." 
# noqa: E501 ) if gpu_required: @@ -92,7 +92,7 @@ def parse_gpu_requirements(job: JobExecutorInterface) -> tuple[int, Optional[str def get_job_cpu_requirement(job: JobExecutorInterface) -> tuple[int, str]: """ - This uses the same logic as snakemake_executor_plugin_slurm_jobstep.get_cpu_setting, but returns a tuple instead of a arg string. + This uses the same logic as snakemake_executor_plugin_slurm_jobstep.get_cpu_setting, but returns a tuple instead of a arg string. # noqa: E501 """ gpu_required = job.resources.get("gpu", 0) @@ -187,10 +187,10 @@ def score_job_fit(self, job: JobExecutorInterface) -> Optional[float]: Calculate a score for how well a partition fits the job requirements """ - # try to score how closely a job matches a partition's limits, in order to handle case where multiple partitions can run a given job - # naive approach here is to just sum the ratio of requested resource to limit, of course this limits us to only consider numerical resources + # try to score how closely a job matches a partition's limits, in order to handle case where multiple partitions can run a given job # noqa: E501 + # naive approach here is to just sum the ratio of requested resource to limit, of course this limits us to only consider numerical resources # noqa: E501 # here a higher score indicates a better fit - # TODO decide how to handle unspecified limits, for now we assume inf for numerical limits, none for others. + # TODO decide how to handle unspecified limits, for now we assume inf for numerical limits, none for others. 
# noqa: E501 score = 0.0 diff --git a/tests/tests.py b/tests/tests.py index b75fe7c4..41a81e6b 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -482,6 +482,7 @@ def test_wildcard_slash_replacement(self): # Verify no slashes remain in the wildcard string assert "/" not in wildcard_str + class TestPartitionSelection: @pytest.fixture def basic_partition_config(self): @@ -971,4 +972,4 @@ def test_runtime_exceeds_limit( mock_logger.warning.assert_called_once() assert "No suitable partition found" in mock_logger.warning.call_args[0][0] finally: - temp_path.unlink() \ No newline at end of file + temp_path.unlink() From bb8c47192bacd9baf343038faf21d88d623b99fc Mon Sep 17 00:00:00 2001 From: Cade Mirchandani Date: Sat, 7 Jun 2025 16:01:27 -0700 Subject: [PATCH 06/33] chore: format tests --- tests/tests.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/tests/tests.py b/tests/tests.py index 41a81e6b..18a0d6f8 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -546,12 +546,7 @@ def empty_partitions_config(self): @pytest.fixture def missing_name_config(self): """Configuration with missing name field.""" - return { - "partitions": { - "": { # Empty partition name - } - } - } + return {"partitions": {"": {}}} # Empty partition name @pytest.fixture def invalid_key_config(self): From 25c043bb8c9598043d4784ca6736f1bfa9615cbf Mon Sep 17 00:00:00 2001 From: meesters Date: Thu, 30 Oct 2025 11:49:36 +0100 Subject: [PATCH 07/33] fix: syntax error after merge conflict --- snakemake_executor_plugin_slurm/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index 05e5a55a..3c965bfa 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -121,6 +121,8 @@ class ExecutorSettings(ExecutorSettingsBase): "partition selection. 
When provided, jobs will be dynamically " "assigned to the best-fitting partition based on " "See documentation for complete list of available limits.", + }, + ) efficiency_report: bool = field( default=False, metadata={ From 7a6e58252400257a5865245062fdcf1ecbb2c4de Mon Sep 17 00:00:00 2001 From: meesters Date: Thu, 30 Oct 2025 11:50:58 +0100 Subject: [PATCH 08/33] fix: left over line from merge conflict --- tests/tests.py | 64 +++++++++++++++++++++++--------------------------- 1 file changed, 29 insertions(+), 35 deletions(-) diff --git a/tests/tests.py b/tests/tests.py index 86752918..fd3ed069 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -18,14 +18,11 @@ ) from snakemake_executor_plugin_slurm.utils import set_gres_string from snakemake_executor_plugin_slurm.submit_string import get_submit_command -<<<<<<< HEAD from snakemake_executor_plugin_slurm.partitions import ( read_partition_file, get_best_partition, ) -======= from snakemake_executor_plugin_slurm.validation import validate_slurm_extra ->>>>>>> c980d1f36f5e0e3e71a4c74893a8d3cea6f8df48 from snakemake_interface_common.exceptions import WorkflowError import pandas as pd @@ -798,7 +795,6 @@ def test_wildcard_slash_replacement(self): assert "/" not in wildcard_str -<<<<<<< HEAD class TestPartitionSelection: @pytest.fixture def basic_partition_config(self): @@ -1038,36 +1034,7 @@ def _create_job(threads=1, **resources): mock_job.resources = mock_resources mock_job.threads = threads mock_job.name = "test_job" -======= -class TestSlurmExtraValidation: - """Test cases for the validate_slurm_extra function.""" - - @pytest.fixture - def mock_job(self): - """Create a mock job with configurable slurm_extra resource.""" - - def _create_job(**resources): - mock_resources = MagicMock() - # Configure get method to return values from resources dict - mock_resources.get.side_effect = lambda key, default=None: resources.get( - key, default - ) - # Add direct attribute access for certain resources - for key, value in 
resources.items(): - setattr(mock_resources, key, value) - - mock_job = MagicMock() - mock_job.resources = mock_resources - mock_job.name = "test_job" - mock_job.wildcards = {} - mock_job.is_group.return_value = False - mock_job.jobid = 1 ->>>>>>> c980d1f36f5e0e3e71a4c74893a8d3cea6f8df48 - return mock_job - return _create_job - -<<<<<<< HEAD @pytest.fixture def mock_logger(self): """Create a mock logger.""" @@ -1310,7 +1277,35 @@ def test_runtime_exceeds_limit( assert "No suitable partition found" in mock_logger.warning.call_args[0][0] finally: temp_path.unlink() -======= + + +class TestSlurmExtraValidation: + """Test cases for the validate_slurm_extra function.""" + + @pytest.fixture + def mock_job(self): + """Create a mock job with configurable slurm_extra resource.""" + + def _create_job(**resources): + mock_resources = MagicMock() + # Configure get method to return values from resources dict + mock_resources.get.side_effect = lambda key, default=None: resources.get( + key, default + ) + # Add direct attribute access for certain resources + for key, value in resources.items(): + setattr(mock_resources, key, value) + + mock_job = MagicMock() + mock_job.resources = mock_resources + mock_job.name = "test_job" + mock_job.wildcards = {} + mock_job.is_group.return_value = False + mock_job.jobid = 1 + return mock_job + + return _create_job + def test_valid_slurm_extra(self, mock_job): """Test that validation passes with allowed SLURM options.""" job = mock_job(slurm_extra="--mail-type=END --mail-user=user@example.com") @@ -1359,4 +1354,3 @@ def test_multiple_forbidden_options(self, mock_job): # Should raise error for job-name (first one encountered) with pytest.raises(WorkflowError, match=r"job-name.*not allowed"): validate_slurm_extra(job) ->>>>>>> c980d1f36f5e0e3e71a4c74893a8d3cea6f8df48 From 4726ce6d986fc7926caa060ab0cf4c18ebfc73e0 Mon Sep 17 00:00:00 2001 From: meesters Date: Thu, 30 Oct 2025 11:52:42 +0100 Subject: [PATCH 09/33] fix: missing import --- 
snakemake_executor_plugin_slurm/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index 3c965bfa..b8e928a5 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -3,6 +3,7 @@ __email__ = "johannes.koester@uni-due.de" __license__ = "MIT" +import atexit import csv from io import StringIO import os From 3febf2c043c6a837105dafc8aae23334f48be5b9 Mon Sep 17 00:00:00 2001 From: meesters Date: Thu, 30 Oct 2025 11:53:26 +0100 Subject: [PATCH 10/33] fix: removed doubled import statement --- tests/tests.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/tests.py b/tests/tests.py index fd3ed069..89898d26 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -9,7 +9,6 @@ import pytest import tempfile import yaml -from pathlib import Path from snakemake_executor_plugin_slurm import ExecutorSettings from snakemake_executor_plugin_slurm.efficiency_report import ( From 65396ac642365921efa7ed7e37f78efaf00d24e4 Mon Sep 17 00:00:00 2001 From: meesters Date: Thu, 30 Oct 2025 12:18:42 +0100 Subject: [PATCH 11/33] fix: made an error when solving merge conflict - mock job for partitions was not correct --- tests/tests.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/tests.py b/tests/tests.py index 89898d26..ab7406e3 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -1033,6 +1033,10 @@ def _create_job(threads=1, **resources): mock_job.resources = mock_resources mock_job.threads = threads mock_job.name = "test_job" + mock_job.is_group.return_value = False + mock_job.jobid = 1 + return mock_job + return _create_job @pytest.fixture def mock_logger(self): From e652e696bd101b614f53cfb0c3b99260c0b0a603 Mon Sep 17 00:00:00 2001 From: meesters Date: Thu, 30 Oct 2025 12:21:39 +0100 Subject: [PATCH 12/33] fix: formatting --- tests/tests.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/tests.py b/tests/tests.py 
index ab7406e3..d881886b 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -1036,6 +1036,7 @@ def _create_job(threads=1, **resources): mock_job.is_group.return_value = False mock_job.jobid = 1 return mock_job + return _create_job @pytest.fixture From 982dfeeae4cea54e9015f52f0177303a0ed476b6 Mon Sep 17 00:00:00 2001 From: meesters Date: Fri, 14 Nov 2025 11:57:31 +0100 Subject: [PATCH 13/33] feat: allowing env variable to define the partition profile --- snakemake_executor_plugin_slurm/__init__.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index b8e928a5..43689eb6 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -212,10 +212,21 @@ def __post_init__(self, test_mode: bool = False): if self.workflow.executor_settings.logdir else Path(".snakemake/slurm_logs").resolve() ) - self._partitions = ( - read_partition_file(self.workflow.executor_settings.partition_config) - if self.workflow.executor_settings.partition_config - else None + # Check the environment variable "SNAKEMAKE_SLURM_PARTITIONS", + # if set, read the partitions from the given file. Let the CLI + # option override this behavior. 
+ if os.getenv("SNAKEMAKE_SLURM_PARTITIONS") and not self.workflow.executor_settings.partition_config: + partition_file = Path(os.getenv("SNAKEMAKE_SLURM_PARTITIONS")) + self.logger.info( + f"Reading SLURM partition configuration from " + f"environment variable file: {partition_file}" + ) + self._partitions = read_partition_file(partition_file) + else: + self._partitions = ( + read_partition_file(self.workflow.executor_settings.partition_config) + if self.workflow.executor_settings.partition_config + else None ) atexit.register(self.clean_old_logs) From da1b00905bd1e5afb48acc5913d004ef0c4833d1 Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Thu, 20 Nov 2025 12:14:14 +0100 Subject: [PATCH 14/33] feat: enabling environment variable for partition configuration file --- snakemake_executor_plugin_slurm/__init__.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index 43689eb6..70fbb28e 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -120,8 +120,13 @@ class ExecutorSettings(ExecutorSettingsBase): metadata={ "help": "Path to YAML file defining partition limits for dynamic " "partition selection. When provided, jobs will be dynamically " - "assigned to the best-fitting partition based on " - "See documentation for complete list of available limits.", + "assigned to the best-fitting partition based on their resource " + "requirements. See documentation for complete list of available limits." + "Alternatively, the environment variable SNAKEMAKE_SLURM_PARTITIONS " + "can be set to point to such a file. 
" + "If both are set, this flag takes precedence.", + "env_var": False, + "required": False, }, ) efficiency_report: bool = field( @@ -214,8 +219,11 @@ def __post_init__(self, test_mode: bool = False): ) # Check the environment variable "SNAKEMAKE_SLURM_PARTITIONS", # if set, read the partitions from the given file. Let the CLI - # option override this behavior. - if os.getenv("SNAKEMAKE_SLURM_PARTITIONS") and not self.workflow.executor_settings.partition_config: + # option override this behavior. + if ( + os.getenv("SNAKEMAKE_SLURM_PARTITIONS") + and not self.workflow.executor_settings.partition_config + ): partition_file = Path(os.getenv("SNAKEMAKE_SLURM_PARTITIONS")) self.logger.info( f"Reading SLURM partition configuration from " @@ -227,7 +235,7 @@ def __post_init__(self, test_mode: bool = False): read_partition_file(self.workflow.executor_settings.partition_config) if self.workflow.executor_settings.partition_config else None - ) + ) atexit.register(self.clean_old_logs) def shutdown(self) -> None: From 65f10070c46982c463ffc528422ba1b335b08fe3 Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Thu, 20 Nov 2025 12:14:43 +0100 Subject: [PATCH 15/33] feat: added 'threads' hardening selection score --- snakemake_executor_plugin_slurm/partitions.py | 96 +++++++++++++++---- 1 file changed, 77 insertions(+), 19 deletions(-) diff --git a/snakemake_executor_plugin_slurm/partitions.py b/snakemake_executor_plugin_slurm/partitions.py index 834d2467..d58d826f 100644 --- a/snakemake_executor_plugin_slurm/partitions.py +++ b/snakemake_executor_plugin_slurm/partitions.py @@ -11,20 +11,37 @@ def read_partition_file(partition_file: Path) -> List["Partition"]: - with open(partition_file, "r") as f: - out = [] - partitions_dict = yaml.safe_load(f)["partitions"] - for partition_name, partition_config in partitions_dict.items(): - if not partition_name or not partition_name.strip(): - raise KeyError("Partition name cannot be empty") - - out.append( - Partition( - 
name=partition_name, - limits=PartitionLimits(**partition_config), - ) + """Read partition definitions from a YAML file""" + try: + with open(partition_file, "r") as f: + config = yaml.safe_load(f) + except FileNotFoundError: + raise WorkflowError(f"Partition file not found: {partition_file}") + except yaml.YAMLError as e: + raise WorkflowError(f"Error parsing partition file {partition_file}: {e}") + except Exception as e: + raise WorkflowError(f"Unexpected error reading partition file {partition_file}: {e}") + if not isinstance(config, dict) or "partitions" not in config: + raise WorkflowError( + f"Partition file {partition_file} is missing 'partitions' section" + ) + partitions_dict = config["partitions"] + if not isinstance(partitions_dict, dict): + raise WorkflowError( + f"'partitions' section in {partition_file} must be a mapping" + ) + out = [] + for partition_name, partition_config in partitions_dict.items(): + if not partition_name or not partition_name.strip(): + raise KeyError("Partition name cannot be empty") + + out.append( + Partition( + name=partition_name, + limits=PartitionLimits(**partition_config), ) - return out + ) + return out def get_best_partition( @@ -32,11 +49,14 @@ def get_best_partition( job: JobExecutorInterface, logger: LoggerExecutorInterface, ) -> Optional[str]: - scored_partitions = [ - (p, score) - for p in candidate_partitions - if (score := p.score_job_fit(job)) is not None - ] + scored_partitions = [] + for p in candidate_partitions: + score = p.score_job_fit(job) + logger.warning( + f"Partition '{p.name}' score for job {job.name} (threads={job.resources.get('threads')}): {score}" + ) + if score is not None: + scored_partitions.append((p, score)) if scored_partitions: best_partition, best_score = max(scored_partitions, key=lambda x: x[1]) @@ -90,6 +110,26 @@ def parse_gpu_requirements(job: JobExecutorInterface) -> tuple[int, Optional[str return 0, None +def get_effective_threads(job: JobExecutorInterface) -> int: + """ + Get the 
effective thread count for a job. + First checks job.threads, then falls back to job.resources["threads"]. + This handles cases where threads is specified in the resources block. + """ + threads = job.threads + # If threads is default (1) or not set, check resources + if threads == 1 or threads is None: + resource_threads = job.resources.get("threads") + if resource_threads is not None: + if isinstance(resource_threads, str): + try: + resource_threads = int(resource_threads) + except ValueError: + resource_threads = threads + threads = resource_threads if resource_threads > 1 else threads + return threads + + def get_job_cpu_requirement(job: JobExecutorInterface) -> tuple[int, str]: """ This uses the same logic as snakemake_executor_plugin_slurm_jobstep.get_cpu_setting, but returns a tuple instead of a arg string. # noqa: E501 @@ -143,7 +183,8 @@ def get_job_cpu_requirement(job: JobExecutorInterface) -> tuple[int, str]: return (0, "none") return (cpus_per_gpu, "gpu") - return (job.threads, "task") + # Fall back to effective threads (checks both job.threads and resources.threads) + return (get_effective_threads(job), "task") @dataclass @@ -155,6 +196,7 @@ class PartitionLimits: max_mem_mb: float = inf max_mem_mb_per_cpu: float = inf max_cpus_per_task: float = inf + max_threads: float = inf # SLURM-specific resources max_nodes: float = inf @@ -221,8 +263,24 @@ def score_job_fit(self, job: JobExecutorInterface) -> Optional[float]: if not isinf(limit): score += job_requirement / limit + # Check thread requirements (check both job.threads and resources.threads) + effective_threads = get_effective_threads(job) + if effective_threads > 0: + if not isinf(self.limits.max_threads) and effective_threads > self.limits.max_threads: + # Debug: partition cannot accommodate threads + return None + if not isinf(self.limits.max_threads): + score += effective_threads / self.limits.max_threads + cpu_count, cpu_type = get_job_cpu_requirement(job) if cpu_type == "task" and cpu_count > 
0: + # Check cpu_count against max_threads + if not isinf(self.limits.max_threads) and cpu_count > self.limits.max_threads: + return None + if not isinf(self.limits.max_threads): + score += cpu_count / self.limits.max_threads + + # Also check against max_cpus_per_task if ( not isinf(self.limits.max_cpus_per_task) and cpu_count > self.limits.max_cpus_per_task From 9d01b96a7e6911858ae24520ca11ce5d6eb28375 Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Thu, 20 Nov 2025 13:00:37 +0100 Subject: [PATCH 16/33] feat: support cluster selection in multi-cluster env --- snakemake_executor_plugin_slurm/partitions.py | 37 ++++++++++++++++--- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/snakemake_executor_plugin_slurm/partitions.py b/snakemake_executor_plugin_slurm/partitions.py index d58d826f..0e980b59 100644 --- a/snakemake_executor_plugin_slurm/partitions.py +++ b/snakemake_executor_plugin_slurm/partitions.py @@ -20,7 +20,9 @@ def read_partition_file(partition_file: Path) -> List["Partition"]: except yaml.YAMLError as e: raise WorkflowError(f"Error parsing partition file {partition_file}: {e}") except Exception as e: - raise WorkflowError(f"Unexpected error reading partition file {partition_file}: {e}") + raise WorkflowError( + f"Unexpected error reading partition file {partition_file}: {e}" + ) if not isinstance(config, dict) or "partitions" not in config: raise WorkflowError( f"Partition file {partition_file} is missing 'partitions' section" @@ -35,9 +37,13 @@ def read_partition_file(partition_file: Path) -> List["Partition"]: if not partition_name or not partition_name.strip(): raise KeyError("Partition name cannot be empty") + # Extract optional cluster name from partition config + cluster = partition_config.pop("cluster", None) + out.append( Partition( name=partition_name, + cluster=cluster, limits=PartitionLimits(**partition_config), ) ) @@ -222,6 +228,7 @@ class Partition: name: str limits: PartitionLimits + cluster: Optional[str] = None def 
score_job_fit(self, job: JobExecutorInterface) -> Optional[float]: """ @@ -233,7 +240,6 @@ def score_job_fit(self, job: JobExecutorInterface) -> Optional[float]: # naive approach here is to just sum the ratio of requested resource to limit, of course this limits us to only consider numerical resources # noqa: E501 # here a higher score indicates a better fit # TODO decide how to handle unspecified limits, for now we assume inf for numerical limits, none for others. # noqa: E501 - score = 0.0 numerical_resources = { @@ -246,6 +252,21 @@ def score_job_fit(self, job: JobExecutorInterface) -> Optional[float]: "mpi_tasks": self.limits.max_mpi_tasks, } + # Check cluster compatibility, first: + # Accept multiple possible resource names for cluster specification + job_cluster = ( + job.resources.get("slurm_cluster") + or job.resources.get("clusters") + or job.resources.get("cluster") + ) + print( + f"Partition '{self.name}' checking job {job.name} for cluster compatibility: {job_cluster}" + ) + if job_cluster is not None: + # Job specifies a cluster - partition must match + if self.cluster is not None and self.cluster != job_cluster: + return None # Partition is for a different cluster + for resource_key, limit in numerical_resources.items(): job_requirement = job.resources.get(resource_key, 0) # Convert to numeric value if it's a string @@ -266,7 +287,10 @@ def score_job_fit(self, job: JobExecutorInterface) -> Optional[float]: # Check thread requirements (check both job.threads and resources.threads) effective_threads = get_effective_threads(job) if effective_threads > 0: - if not isinf(self.limits.max_threads) and effective_threads > self.limits.max_threads: + if ( + not isinf(self.limits.max_threads) + and effective_threads > self.limits.max_threads + ): # Debug: partition cannot accommodate threads return None if not isinf(self.limits.max_threads): @@ -275,11 +299,14 @@ def score_job_fit(self, job: JobExecutorInterface) -> Optional[float]: cpu_count, cpu_type = 
get_job_cpu_requirement(job) if cpu_type == "task" and cpu_count > 0: # Check cpu_count against max_threads - if not isinf(self.limits.max_threads) and cpu_count > self.limits.max_threads: + if ( + not isinf(self.limits.max_threads) + and cpu_count > self.limits.max_threads + ): return None if not isinf(self.limits.max_threads): score += cpu_count / self.limits.max_threads - + # Also check against max_cpus_per_task if ( not isinf(self.limits.max_cpus_per_task) From 38d7daf332f0ff8ad35d2237b0ca22987621aeb8 Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Thu, 20 Nov 2025 13:43:30 +0100 Subject: [PATCH 17/33] fix: sound thread checking --- snakemake_executor_plugin_slurm/partitions.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/snakemake_executor_plugin_slurm/partitions.py b/snakemake_executor_plugin_slurm/partitions.py index 0e980b59..a789a498 100644 --- a/snakemake_executor_plugin_slurm/partitions.py +++ b/snakemake_executor_plugin_slurm/partitions.py @@ -286,11 +286,8 @@ def score_job_fit(self, job: JobExecutorInterface) -> Optional[float]: # Check thread requirements (check both job.threads and resources.threads) effective_threads = get_effective_threads(job) - if effective_threads > 0: - if ( - not isinf(self.limits.max_threads) - and effective_threads > self.limits.max_threads - ): + if effective_threads is not None and effective_threads > 0: + if not isinf(self.limits.max_threads) and effective_threads > self.limits.max_threads: # Debug: partition cannot accommodate threads return None if not isinf(self.limits.max_threads): From 53ed396d55ec7d841642eb01c384e3a029dee228 Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Thu, 20 Nov 2025 13:44:31 +0100 Subject: [PATCH 18/33] fix: removed print statement (debugg leftover) --- snakemake_executor_plugin_slurm/partitions.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/snakemake_executor_plugin_slurm/partitions.py 
b/snakemake_executor_plugin_slurm/partitions.py index a789a498..2459ffd8 100644 --- a/snakemake_executor_plugin_slurm/partitions.py +++ b/snakemake_executor_plugin_slurm/partitions.py @@ -259,9 +259,7 @@ def score_job_fit(self, job: JobExecutorInterface) -> Optional[float]: or job.resources.get("clusters") or job.resources.get("cluster") ) - print( - f"Partition '{self.name}' checking job {job.name} for cluster compatibility: {job_cluster}" - ) + if job_cluster is not None: # Job specifies a cluster - partition must match if self.cluster is not None and self.cluster != job_cluster: From 2ee9f09f8b643d39f688f3314030384c82376a78 Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Thu, 20 Nov 2025 13:47:37 +0100 Subject: [PATCH 19/33] docs: added docs for this PR --- docs/further.md | 29 +++++++++++++++++++ snakemake_executor_plugin_slurm/partitions.py | 6 ++-- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/docs/further.md b/docs/further.md index dd999d39..b8eba526 100644 --- a/docs/further.md +++ b/docs/further.md @@ -94,12 +94,14 @@ The following limits can be defined for each partition: | `max_nodes` | int | Maximum number of nodes | unlimited | | `max_tasks` | int | Maximum number of tasks | unlimited | | `max_tasks_per_node` | int | Maximum tasks per node | unlimited | +| `max_threads` | int | Maximum threads per node | unlimited | | `max_gpu` | int | Maximum number of GPUs | 0 | | `available_gpu_models` | list[str] | List of available GPU models | none | | `max_cpus_per_gpu` | int | Maximum CPUs per GPU | unlimited | | `supports_mpi` | bool | Whether MPI jobs are supported | true | | `max_mpi_tasks` | int | Maximum MPI tasks | unlimited | | `available_constraints` | list[str] | List of available node constraints | none | +| `cluster` | str | Cluster name in multi-cluster setup | none | ##### Example Partition Configuration @@ -127,6 +129,33 @@ partitions: max_cpus_per_gpu: 8 ``` +The plugin supports automatic partition selection on clusters 
with SLURM multi-cluster setup. You can specify which cluster each partition belongs to in your partition configuration file: + +```yaml +partitions: + d-standard: + cluster: "deviating" + max_runtime: "6d" + max_nodes: 1 + max_threads: 127 + d-parallel: + cluster: "deviating" + supports_mpi: true + max_threads: 128 + max_runtime: "6d" + standard: + cluster: "other" + max_runtime: "6d" + max_nodes: 1 + max_threads: 127 + parallel: + cluster: "other" + supports_mpi: true + max_threads: 128 + max_runtime: "6d" +``` + + ##### How Partition Selection Works When automatic partition selection is enabled, the plugin evaluates each job's resource requirements against the defined partition limits to ensure the job is placed on a partition that can accommodate all of its requirements. When multiple partitions are compatible, the plugin uses a scoring algorithm that favors partitions with limits closer to the job's needs, preventing jobs from being assigned to partitions with excessively high resource limits. 
diff --git a/snakemake_executor_plugin_slurm/partitions.py b/snakemake_executor_plugin_slurm/partitions.py index 2459ffd8..5acad5b7 100644 --- a/snakemake_executor_plugin_slurm/partitions.py +++ b/snakemake_executor_plugin_slurm/partitions.py @@ -58,8 +58,8 @@ def get_best_partition( scored_partitions = [] for p in candidate_partitions: score = p.score_job_fit(job) - logger.warning( - f"Partition '{p.name}' score for job {job.name} (threads={job.resources.get('threads')}): {score}" + logger.debug( + f"Partition '{p.name}' score for job {job.name}: {score}" ) if score is not None: scored_partitions.append((p, score)) @@ -67,7 +67,7 @@ def get_best_partition( if scored_partitions: best_partition, best_score = max(scored_partitions, key=lambda x: x[1]) partition = best_partition.name - logger.warning( + logger.info( f"Auto-selected partition '{partition}' for job {job.name} " f"with score {best_score:.3f}" ) From 744b8b3b33c078ee07562ff71a5909cfaef856fd Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Thu, 20 Nov 2025 13:48:02 +0100 Subject: [PATCH 20/33] tests: added tests for this PR --- tests/test_partition_selection.py | 806 ++++++++++++++++++++++++++++++ tests/tests.py | 501 +------------------ 2 files changed, 811 insertions(+), 496 deletions(-) create mode 100644 tests/test_partition_selection.py diff --git a/tests/test_partition_selection.py b/tests/test_partition_selection.py new file mode 100644 index 00000000..3595959c --- /dev/null +++ b/tests/test_partition_selection.py @@ -0,0 +1,806 @@ +import tempfile +import yaml + +import pytest +from pathlib import Path +from unittest.mock import MagicMock +from snakemake_interface_common.exceptions import WorkflowError +from snakemake_executor_plugin_slurm.partitions import ( + read_partition_file, + get_best_partition, +) + + + +class TestPartitionSelection: + @pytest.fixture + def basic_partition_config(self): + """Basic partition configuration with two partitions.""" + return { + "partitions": { + 
"default": { + "max_runtime": 1440, + "max_mem_mb": 128000, + "max_cpus_per_task": 32, + "supports_mpi": True, + }, + "gpu": { + "max_runtime": 720, + "max_mem_mb": 256000, + "max_gpu": 4, + "available_gpu_models": ["a100", "v100"], + "supports_mpi": False, + }, + } + } + + @pytest.fixture + def minimal_partition_config(self): + """Minimal partition configuration.""" + return {"partitions": {"minimal": {}}} + + @pytest.fixture + def comprehensive_partition_config(self): + """Comprehensive partition configuration with all limit types.""" + return { + "partitions": { + "comprehensive": { + # Standard resources + "max_runtime": 2880, + "max_mem_mb": 500000, + "max_mem_mb_per_cpu": 8000, + "max_cpus_per_task": 64, + # SLURM-specific resources + "max_nodes": 4, + "max_tasks": 256, + "max_tasks_per_node": 64, + # GPU resources + "max_gpu": 8, + "available_gpu_models": ["a100", "v100", "rtx3090"], + "max_cpus_per_gpu": 16, + # MPI resources + "supports_mpi": True, + "max_mpi_tasks": 512, + # Node features/constraints + "available_constraints": ["intel", "avx2", "highmem"], + } + } + } + + @pytest.fixture + def empty_partitions_config(self): + """Empty partitions configuration.""" + return {"partitions": {}} + + @pytest.fixture + def missing_name_config(self): + """Configuration with missing name field.""" + return {"partitions": {"": {}}} # Empty partition name + + @pytest.fixture + def invalid_key_config(self): + """Configuration with invalid key.""" + return {"invalid_key": []} + + @pytest.fixture + def temp_yaml_file(self): + """Helper fixture to create temporary YAML files.""" + + def _create_temp_file(config): + with tempfile.NamedTemporaryFile( + mode="w", suffix=".yaml", delete=False + ) as f: + yaml.dump(config, f) + return Path(f.name) + + return _create_temp_file + + def test_read_valid_partition_file(self, basic_partition_config, temp_yaml_file): + """Test reading a valid partition configuration file.""" + temp_path = temp_yaml_file(basic_partition_config) + + 
try: + partitions = read_partition_file(temp_path) + + assert len(partitions) == 2 + + # Check first partition + assert partitions[0].name == "default" + assert partitions[0].limits.max_runtime == 1440 + assert partitions[0].limits.max_mem_mb == 128000 + assert partitions[0].limits.max_cpus_per_task == 32 + assert partitions[0].limits.supports_mpi is True + + # Check second partition + assert partitions[1].name == "gpu" + assert partitions[1].limits.max_runtime == 720 + assert partitions[1].limits.max_gpu == 4 + assert partitions[1].limits.available_gpu_models == ["a100", "v100"] + assert partitions[1].limits.supports_mpi is False + + finally: + temp_path.unlink() + + def test_read_minimal_partition_file( + self, minimal_partition_config, temp_yaml_file + ): + """Test reading a partition file with minimal configuration.""" + from math import isinf + + temp_path = temp_yaml_file(minimal_partition_config) + + try: + partitions = read_partition_file(temp_path) + + assert len(partitions) == 1 + assert partitions[0].name == "minimal" + + # Check that all limits are inf + limits = partitions[0].limits + assert isinf(limits.max_runtime) + assert isinf(limits.max_mem_mb) + assert limits.max_gpu == 0 + assert limits.supports_mpi is True + + finally: + temp_path.unlink() + + def test_read_partition_file_with_all_limits( + self, comprehensive_partition_config, temp_yaml_file + ): + """Test reading a partition file with all possible limit types.""" + temp_path = temp_yaml_file(comprehensive_partition_config) + + try: + partitions = read_partition_file(temp_path) + + assert len(partitions) == 1 + limits = partitions[0].limits + + # Check standard resources + assert limits.max_runtime == 2880 + assert limits.max_mem_mb == 500000 + assert limits.max_mem_mb_per_cpu == 8000 + assert limits.max_cpus_per_task == 64 + + # Check SLURM-specific resources + assert limits.max_nodes == 4 + assert limits.max_tasks == 256 + assert limits.max_tasks_per_node == 64 + + # Check GPU resources + 
assert limits.max_gpu == 8 + assert limits.available_gpu_models == ["a100", "v100", "rtx3090"] + assert limits.max_cpus_per_gpu == 16 + + # Check MPI resources + assert limits.supports_mpi is True + assert limits.max_mpi_tasks == 512 + + # Check constraints + assert limits.available_constraints == ["intel", "avx2", "highmem"] + + finally: + temp_path.unlink() + + def test_read_empty_partitions_list(self, empty_partitions_config, temp_yaml_file): + """Test reading a file with empty partitions list.""" + temp_path = temp_yaml_file(empty_partitions_config) + + try: + partitions = read_partition_file(temp_path) + assert len(partitions) == 0 + + finally: + temp_path.unlink() + + def test_read_nonexistent_file(self): + """Test reading a non-existent file raises appropriate error.""" + + nonexistent_path = Path("/nonexistent/path/to/file.yaml") + + with pytest.raises(WorkflowError, match="Partition file not found"): + read_partition_file(nonexistent_path) + + def test_read_invalid_yaml_file(self): + """Test reading an invalid YAML file raises appropriate error.""" + + invalid_yaml = "partitions:\n - name: test\n invalid: {\n" + + with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: + f.write(invalid_yaml) + temp_path = Path(f.name) + + try: + with pytest.raises(WorkflowError, match="Error parsing partition file"): + read_partition_file(temp_path) + finally: + temp_path.unlink() + + def test_read_file_missing_partitions_key(self, invalid_key_config, temp_yaml_file): + """Test reading a file without 'partitions' key raises KeyError.""" + + temp_path = temp_yaml_file(invalid_key_config) + + try: + with pytest.raises(WorkflowError, match="missing 'partitions' section"): + read_partition_file(temp_path) + finally: + temp_path.unlink() + + def test_read_partition_missing_required_fields( + self, missing_name_config, temp_yaml_file + ): + """Test reading partition with missing required fields.""" + temp_path = temp_yaml_file(missing_name_config) + + 
try: + with pytest.raises(KeyError): + read_partition_file(temp_path) + finally: + temp_path.unlink() + + @pytest.fixture + def mock_job(self): + """Create a mock job with configurable resources and threads.""" + + def _create_job(threads=1, **resources): + mock_resources = MagicMock() + mock_resources.get.side_effect = lambda key, default=None: resources.get( + key, default + ) + + mock_job = MagicMock() + mock_job.resources = mock_resources + mock_job.threads = threads + mock_job.name = "test_job" + mock_job.is_group.return_value = False + mock_job.jobid = 1 + return mock_job + + return _create_job + + @pytest.fixture + def mock_logger(self): + """Create a mock logger.""" + return MagicMock() + + def test_basic_partition_selection_cpu_job( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test selecting partition for a basic CPU job.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job(threads=4, mem_mb=16000, runtime=60) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select 'default' partition as it supports CPU jobs + assert selected_partition == "default" + # Check that the final call contains the auto-selection message + assert mock_logger.warning.call_count >= 1 + assert ( + "Auto-selected partition 'default'" + in mock_logger.warning.call_args_list[-1][0][0] + ) + finally: + temp_path.unlink() + + def test_basic_partition_selection_gpu_job( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test selecting partition for a GPU job.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job( + threads=2, mem_mb=32000, runtime=300, gpu=2, gpu_model="a100" + ) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select 'gpu' partition as it supports GPU jobs + assert selected_partition == 
"gpu" + assert mock_logger.warning.call_count >= 1 + assert ( + "Auto-selected partition 'gpu'" + in mock_logger.warning.call_args_list[-1][0][0] + ) + finally: + temp_path.unlink() + + def test_no_suitable_partition( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test when no partition can accommodate the job requirements.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + # Job requires more memory than any partition allows + job = mock_job(threads=1, mem_mb=500000, runtime=60) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None when no suitable partition found + assert selected_partition is None + assert mock_logger.warning.call_count >= 1 + assert "No suitable partition found" in mock_logger.warning.call_args_list[-1][0][0] + finally: + temp_path.unlink() + + def test_comprehensive_partition_selection( + self, comprehensive_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test partition selection with comprehensive limits.""" + temp_path = temp_yaml_file(comprehensive_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job( + threads=8, + mem_mb=64000, + runtime=1200, + gpu=2, + gpu_model="a100", + constraint="intel,avx2", + ) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select the comprehensive partition + assert selected_partition == "comprehensive" + assert mock_logger.warning.call_count >= 1 + assert ( + "Auto-selected partition 'comprehensive'" + in mock_logger.warning.call_args_list[-1][0][0] + ) + finally: + temp_path.unlink() + + def test_constraint_mismatch( + self, comprehensive_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test job with constraints not available in partition.""" + temp_path = temp_yaml_file(comprehensive_partition_config) + + try: + partitions = read_partition_file(temp_path) + # Job 
requires constraint not available in partition + job = mock_job(threads=2, constraint="amd,gpu_direct") + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None due to constraint mismatch + assert selected_partition is None + assert mock_logger.warning.call_count >= 1 + assert "No suitable partition found" in mock_logger.warning.call_args_list[-1][0][0] + finally: + temp_path.unlink() + + def test_mpi_job_selection( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test MPI job partition selection.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job(threads=1, mpi=True, tasks=16) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select 'default' partition as it supports MPI, 'gpu' doesn't + assert selected_partition == "default" + assert mock_logger.warning.call_count >= 1 + assert ( + "Auto-selected partition 'default'" + in mock_logger.warning.call_args_list[-1][0][0] + ) + finally: + temp_path.unlink() + + def test_gpu_model_mismatch( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test GPU job with unsupported GPU model.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + # Request GPU model not available in any partition + job = mock_job(threads=2, gpu=1, gpu_model="rtx4090") + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None due to GPU model mismatch + assert selected_partition is None + assert mock_logger.warning.call_count >= 1 + assert "No suitable partition found" in mock_logger.warning.call_args_list[-1][0][0] + finally: + temp_path.unlink() + + def test_empty_partitions_list( + self, empty_partitions_config, temp_yaml_file, mock_job, mock_logger + ): + """Test partition selection with empty partitions list.""" + temp_path = 
temp_yaml_file(empty_partitions_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job(threads=1, mem_mb=1000) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None when no partitions available + assert selected_partition is None + assert mock_logger.warning.call_count >= 1 + assert "No suitable partition found" in mock_logger.warning.call_args_list[-1][0][0] + finally: + temp_path.unlink() + + def test_gres_gpu_specification( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test GPU job specified via gres parameter.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job(threads=2, gres="gpu:v100:1", runtime=400) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select 'gpu' partition as it supports v100 GPUs + assert selected_partition == "gpu" + assert mock_logger.warning.call_count >= 1 + assert ( + "Auto-selected partition 'gpu'" + in mock_logger.warning.call_args_list[-1][0][0] + ) + finally: + temp_path.unlink() + + def test_cpus_per_task_specification( + self, comprehensive_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test job with cpus_per_task specification.""" + temp_path = temp_yaml_file(comprehensive_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job(threads=1, cpus_per_task=32, mem_mb=64000) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select comprehensive partition as it can handle 32 cpus per task + assert selected_partition == "comprehensive" + assert mock_logger.warning.call_count >= 1 + assert ( + "Auto-selected partition 'comprehensive'" + in mock_logger.warning.call_args_list[-1][0][0] + ) + finally: + temp_path.unlink() + + def test_runtime_exceeds_limit( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + 
"""Test job with runtime exceeding partition limits.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + # Job requires more runtime than gpu partition allows (720 min max) + job = mock_job(threads=1, runtime=1000, gpu=1, gpu_model="a100") + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None as no partition can accommodate the runtime + assert selected_partition is None + assert mock_logger.warning.call_count >= 1 + assert "No suitable partition found" in mock_logger.warning.call_args_list[-1][0][0] + finally: + temp_path.unlink() + + @pytest.fixture + def multicluster_partition_config(self): + """Partition configuration with multiple clusters.""" + return { + "partitions": { + "normal-small": { + "cluster": "normal", + "max_runtime": 360, + "max_threads": 32, + "max_mem_mb": 64000, + }, + "normal-large": { + "cluster": "normal", + "max_runtime": 1440, + "max_threads": 128, + "max_mem_mb": 256000, + }, + "deviating-small": { + "cluster": "deviating", + "max_runtime": 360, + "max_threads": 64, + "max_mem_mb": 128000, + }, + "deviating-gpu": { + "cluster": "deviating", + "max_runtime": 720, + "max_threads": 32, + "max_gpu": 4, + "available_gpu_models": ["a100"], + }, + } + } + + @pytest.fixture + def max_threads_partition_config(self): + """Partition configuration specifically testing max_threads limits.""" + return { + "partitions": { + "tiny": { + "max_threads": 8, + "max_mem_mb": 32000, + }, + "small": { + "max_threads": 32, + "max_mem_mb": 64000, + }, + "medium": { + "max_threads": 127, + "max_mem_mb": 128000, + }, + "large": { + "max_threads": 256, + "max_mem_mb": 512000, + }, + } + } + + def test_cluster_specification_via_slurm_cluster( + self, multicluster_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test cluster specification using slurm_cluster resource.""" + temp_path = temp_yaml_file(multicluster_partition_config) + + try: + 
partitions = read_partition_file(temp_path) + + # Verify cluster field is read correctly + # Find partitions by name instead of assuming order + partition_map = {p.name: p for p in partitions} + assert partition_map["normal-small"].cluster == "normal" + assert partition_map["deviating-small"].cluster == "deviating" + + # Job targeting 'normal' cluster + job = mock_job(threads=16, mem_mb=32000, slurm_cluster="normal") + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select a partition from 'normal' cluster + assert selected_partition in ["normal-small", "normal-large"] + assert mock_logger.warning.call_count >= 1 + finally: + temp_path.unlink() + + def test_cluster_specification_via_clusters( + self, multicluster_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test cluster specification using clusters resource.""" + temp_path = temp_yaml_file(multicluster_partition_config) + + try: + partitions = read_partition_file(temp_path) + + # Job targeting 'deviating' cluster via 'clusters' resource + job = mock_job(threads=48, mem_mb=64000, clusters="deviating") + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select deviating-small partition + assert selected_partition == "deviating-small" + assert mock_logger.warning.call_count >= 1 + finally: + temp_path.unlink() + + def test_cluster_specification_via_cluster( + self, multicluster_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test cluster specification using cluster resource.""" + temp_path = temp_yaml_file(multicluster_partition_config) + + try: + partitions = read_partition_file(temp_path) + + # Job targeting 'normal' cluster via 'cluster' resource + job = mock_job(threads=64, mem_mb=128000, cluster="normal") + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select normal-large partition + assert selected_partition == "normal-large" + assert mock_logger.warning.call_count >= 1 
+ finally: + temp_path.unlink() + + def test_cluster_mismatch_excludes_partitions( + self, multicluster_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test that jobs requesting a cluster exclude partitions from other clusters.""" + temp_path = temp_yaml_file(multicluster_partition_config) + + try: + partitions = read_partition_file(temp_path) + + # Job requesting GPU on 'normal' cluster (only deviating has GPU) + job = mock_job(threads=16, gpu=2, gpu_model="a100", slurm_cluster="normal") + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None as normal cluster has no GPU partitions + assert selected_partition is None + assert mock_logger.warning.call_count >= 1 + finally: + temp_path.unlink() + + def test_job_without_cluster_uses_any_partition( + self, multicluster_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test that jobs without cluster specification can use any partition.""" + temp_path = temp_yaml_file(multicluster_partition_config) + + try: + partitions = read_partition_file(temp_path) + + # Job without cluster specification + job = mock_job(threads=16, mem_mb=32000) + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select a partition (any cluster is acceptable) + assert selected_partition is not None + assert mock_logger.warning.call_count >= 1 + finally: + temp_path.unlink() + + def test_max_threads_with_threads_resource( + self, max_threads_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test max_threads limit with threads resource.""" + temp_path = temp_yaml_file(max_threads_partition_config) + + try: + partitions = read_partition_file(temp_path) + + # Job with 4 threads - should prefer tiny partition + job = mock_job(threads=4, mem_mb=16000) + selected_partition = get_best_partition(partitions, job, mock_logger) + assert selected_partition == "tiny" + + # Job with 16 threads - should prefer small partition + 
mock_logger.reset_mock() + job = mock_job(threads=16, mem_mb=32000) + selected_partition = get_best_partition(partitions, job, mock_logger) + assert selected_partition == "small" + + # Job with 64 threads - should use medium partition + mock_logger.reset_mock() + job = mock_job(threads=64, mem_mb=64000) + selected_partition = get_best_partition(partitions, job, mock_logger) + assert selected_partition == "medium" + + finally: + temp_path.unlink() + + def test_max_threads_with_cpus_per_task_resource( + self, max_threads_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test max_threads limit with cpus_per_task resource.""" + temp_path = temp_yaml_file(max_threads_partition_config) + + try: + partitions = read_partition_file(temp_path) + + # Job with cpus_per_task=128 - exceeds medium (127), needs large + job = mock_job(threads=None, cpus_per_task=128, mem_mb=128000) + selected_partition = get_best_partition(partitions, job, mock_logger) + assert selected_partition == "large" + + # Job with cpus_per_task=127 - exactly matches medium + mock_logger.reset_mock() + job = mock_job(threads=None, cpus_per_task=127, mem_mb=64000) + selected_partition = get_best_partition(partitions, job, mock_logger) + assert selected_partition == "medium" + + finally: + temp_path.unlink() + + def test_max_threads_excludes_too_small_partitions( + self, max_threads_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test that partitions with insufficient max_threads are excluded.""" + temp_path = temp_yaml_file(max_threads_partition_config) + + try: + partitions = read_partition_file(temp_path) + + # Job with 128 threads - tiny, small, medium cannot accommodate + job = mock_job(threads=128, mem_mb=256000) + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Only large partition should be selected + assert selected_partition == "large" + assert mock_logger.warning.call_count >= 1 + + finally: + temp_path.unlink() + + def 
test_max_threads_job_exceeds_all_partitions( + self, max_threads_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test job requiring more threads than any partition supports.""" + temp_path = temp_yaml_file(max_threads_partition_config) + + try: + partitions = read_partition_file(temp_path) + + # Job with 512 threads - exceeds all partitions + job = mock_job(threads=512, mem_mb=128000) + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None + assert selected_partition is None + assert mock_logger.warning.call_count >= 1 + assert "No suitable partition found" in mock_logger.warning.call_args_list[-1][0][0] + + finally: + temp_path.unlink() + + def test_multicluster_with_max_threads( + self, temp_yaml_file, mock_job, mock_logger + ): + """Test combined cluster and max_threads constraints.""" + config = { + "partitions": { + "normal-small": { + "cluster": "normal", + "max_threads": 64, + "max_mem_mb": 128000, + }, + "normal-large": { + "cluster": "normal", + "max_threads": 256, + "max_mem_mb": 512000, + }, + "deviating-medium": { + "cluster": "deviating", + "max_threads": 128, + "max_mem_mb": 256000, + }, + } + } + temp_path = temp_yaml_file(config) + + try: + partitions = read_partition_file(temp_path) + + # Job targeting normal cluster with 128 threads + # normal-small (64) is too small, should use normal-large (256) + job = mock_job(threads=128, mem_mb=256000, slurm_cluster="normal") + selected_partition = get_best_partition(partitions, job, mock_logger) + + assert selected_partition == "normal-large" + assert mock_logger.warning.call_count >= 1 + + # Job targeting deviating cluster with 128 threads + # Should exactly match deviating-medium + mock_logger.reset_mock() + job = mock_job(threads=128, mem_mb=128000, slurm_cluster="deviating") + selected_partition = get_best_partition(partitions, job, mock_logger) + + assert selected_partition == "deviating-medium" + assert mock_logger.warning.call_count >= 1 + + 
finally: + temp_path.unlink() \ No newline at end of file diff --git a/tests/tests.py b/tests/tests.py index d881886b..cae305d2 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -7,8 +7,7 @@ from snakemake_interface_executor_plugins.settings import ExecutorSettingsBase from unittest.mock import MagicMock, patch import pytest -import tempfile -import yaml + from snakemake_executor_plugin_slurm import ExecutorSettings from snakemake_executor_plugin_slurm.efficiency_report import ( @@ -17,14 +16,14 @@ ) from snakemake_executor_plugin_slurm.utils import set_gres_string from snakemake_executor_plugin_slurm.submit_string import get_submit_command -from snakemake_executor_plugin_slurm.partitions import ( - read_partition_file, - get_best_partition, -) + from snakemake_executor_plugin_slurm.validation import validate_slurm_extra from snakemake_interface_common.exceptions import WorkflowError import pandas as pd +# Import partition selection tests +from .test_partition_selection import * # noqa: F401, F403 + class TestWorkflows(snakemake.common.tests.TestWorkflowsLocalStorageBase): __test__ = True @@ -793,496 +792,6 @@ def test_wildcard_slash_replacement(self): # Verify no slashes remain in the wildcard string assert "/" not in wildcard_str - -class TestPartitionSelection: - @pytest.fixture - def basic_partition_config(self): - """Basic partition configuration with two partitions.""" - return { - "partitions": { - "default": { - "max_runtime": 1440, - "max_mem_mb": 128000, - "max_cpus_per_task": 32, - "supports_mpi": True, - }, - "gpu": { - "max_runtime": 720, - "max_mem_mb": 256000, - "max_gpu": 4, - "available_gpu_models": ["a100", "v100"], - "supports_mpi": False, - }, - } - } - - @pytest.fixture - def minimal_partition_config(self): - """Minimal partition configuration.""" - return {"partitions": {"minimal": {}}} - - @pytest.fixture - def comprehensive_partition_config(self): - """Comprehensive partition configuration with all limit types.""" - return { - 
"partitions": { - "comprehensive": { - # Standard resources - "max_runtime": 2880, - "max_mem_mb": 500000, - "max_mem_mb_per_cpu": 8000, - "max_cpus_per_task": 64, - # SLURM-specific resources - "max_nodes": 4, - "max_tasks": 256, - "max_tasks_per_node": 64, - # GPU resources - "max_gpu": 8, - "available_gpu_models": ["a100", "v100", "rtx3090"], - "max_cpus_per_gpu": 16, - # MPI resources - "supports_mpi": True, - "max_mpi_tasks": 512, - # Node features/constraints - "available_constraints": ["intel", "avx2", "highmem"], - } - } - } - - @pytest.fixture - def empty_partitions_config(self): - """Empty partitions configuration.""" - return {"partitions": {}} - - @pytest.fixture - def missing_name_config(self): - """Configuration with missing name field.""" - return {"partitions": {"": {}}} # Empty partition name - - @pytest.fixture - def invalid_key_config(self): - """Configuration with invalid key.""" - return {"invalid_key": []} - - @pytest.fixture - def temp_yaml_file(self): - """Helper fixture to create temporary YAML files.""" - - def _create_temp_file(config): - with tempfile.NamedTemporaryFile( - mode="w", suffix=".yaml", delete=False - ) as f: - yaml.dump(config, f) - return Path(f.name) - - return _create_temp_file - - def test_read_valid_partition_file(self, basic_partition_config, temp_yaml_file): - """Test reading a valid partition configuration file.""" - temp_path = temp_yaml_file(basic_partition_config) - - try: - partitions = read_partition_file(temp_path) - - assert len(partitions) == 2 - - # Check first partition - assert partitions[0].name == "default" - assert partitions[0].limits.max_runtime == 1440 - assert partitions[0].limits.max_mem_mb == 128000 - assert partitions[0].limits.max_cpus_per_task == 32 - assert partitions[0].limits.supports_mpi is True - - # Check second partition - assert partitions[1].name == "gpu" - assert partitions[1].limits.max_runtime == 720 - assert partitions[1].limits.max_gpu == 4 - assert 
partitions[1].limits.available_gpu_models == ["a100", "v100"] - assert partitions[1].limits.supports_mpi is False - - finally: - temp_path.unlink() - - def test_read_minimal_partition_file( - self, minimal_partition_config, temp_yaml_file - ): - """Test reading a partition file with minimal configuration.""" - from math import isinf - - temp_path = temp_yaml_file(minimal_partition_config) - - try: - partitions = read_partition_file(temp_path) - - assert len(partitions) == 1 - assert partitions[0].name == "minimal" - - # Check that all limits are inf - limits = partitions[0].limits - assert isinf(limits.max_runtime) - assert isinf(limits.max_mem_mb) - assert limits.max_gpu == 0 - assert limits.supports_mpi is True - - finally: - temp_path.unlink() - - def test_read_partition_file_with_all_limits( - self, comprehensive_partition_config, temp_yaml_file - ): - """Test reading a partition file with all possible limit types.""" - temp_path = temp_yaml_file(comprehensive_partition_config) - - try: - partitions = read_partition_file(temp_path) - - assert len(partitions) == 1 - limits = partitions[0].limits - - # Check standard resources - assert limits.max_runtime == 2880 - assert limits.max_mem_mb == 500000 - assert limits.max_mem_mb_per_cpu == 8000 - assert limits.max_cpus_per_task == 64 - - # Check SLURM-specific resources - assert limits.max_nodes == 4 - assert limits.max_tasks == 256 - assert limits.max_tasks_per_node == 64 - - # Check GPU resources - assert limits.max_gpu == 8 - assert limits.available_gpu_models == ["a100", "v100", "rtx3090"] - assert limits.max_cpus_per_gpu == 16 - - # Check MPI resources - assert limits.supports_mpi is True - assert limits.max_mpi_tasks == 512 - - # Check constraints - assert limits.available_constraints == ["intel", "avx2", "highmem"] - - finally: - temp_path.unlink() - - def test_read_empty_partitions_list(self, empty_partitions_config, temp_yaml_file): - """Test reading a file with empty partitions list.""" - temp_path = 
temp_yaml_file(empty_partitions_config) - - try: - partitions = read_partition_file(temp_path) - assert len(partitions) == 0 - - finally: - temp_path.unlink() - - def test_read_nonexistent_file(self): - """Test reading a non-existent file raises appropriate error.""" - nonexistent_path = Path("/nonexistent/path/to/file.yaml") - - with pytest.raises(FileNotFoundError): - read_partition_file(nonexistent_path) - - def test_read_invalid_yaml_file(self): - """Test reading an invalid YAML file raises appropriate error.""" - invalid_yaml = "partitions:\n - name: test\n invalid: {\n" - - with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: - f.write(invalid_yaml) - temp_path = Path(f.name) - - try: - with pytest.raises(yaml.YAMLError): - read_partition_file(temp_path) - finally: - temp_path.unlink() - - def test_read_file_missing_partitions_key(self, invalid_key_config, temp_yaml_file): - """Test reading a file without 'partitions' key raises KeyError.""" - temp_path = temp_yaml_file(invalid_key_config) - - try: - with pytest.raises(KeyError): - read_partition_file(temp_path) - finally: - temp_path.unlink() - - def test_read_partition_missing_required_fields( - self, missing_name_config, temp_yaml_file - ): - """Test reading partition with missing required fields.""" - temp_path = temp_yaml_file(missing_name_config) - - try: - with pytest.raises(KeyError): - read_partition_file(temp_path) - finally: - temp_path.unlink() - - @pytest.fixture - def mock_job(self): - """Create a mock job with configurable resources and threads.""" - - def _create_job(threads=1, **resources): - mock_resources = MagicMock() - mock_resources.get.side_effect = lambda key, default=None: resources.get( - key, default - ) - - mock_job = MagicMock() - mock_job.resources = mock_resources - mock_job.threads = threads - mock_job.name = "test_job" - mock_job.is_group.return_value = False - mock_job.jobid = 1 - return mock_job - - return _create_job - - @pytest.fixture - def 
mock_logger(self): - """Create a mock logger.""" - return MagicMock() - - def test_basic_partition_selection_cpu_job( - self, basic_partition_config, temp_yaml_file, mock_job, mock_logger - ): - """Test selecting partition for a basic CPU job.""" - temp_path = temp_yaml_file(basic_partition_config) - - try: - partitions = read_partition_file(temp_path) - job = mock_job(threads=4, mem_mb=16000, runtime=60) - - selected_partition = get_best_partition(partitions, job, mock_logger) - - # Should select 'default' partition as it supports CPU jobs - assert selected_partition == "default" - mock_logger.warning.assert_called_once() - assert ( - "Auto-selected partition 'default'" - in mock_logger.warning.call_args[0][0] - ) - finally: - temp_path.unlink() - - def test_basic_partition_selection_gpu_job( - self, basic_partition_config, temp_yaml_file, mock_job, mock_logger - ): - """Test selecting partition for a GPU job.""" - temp_path = temp_yaml_file(basic_partition_config) - - try: - partitions = read_partition_file(temp_path) - job = mock_job( - threads=2, mem_mb=32000, runtime=300, gpu=2, gpu_model="a100" - ) - - selected_partition = get_best_partition(partitions, job, mock_logger) - - # Should select 'gpu' partition as it supports GPU jobs - assert selected_partition == "gpu" - mock_logger.warning.assert_called_once() - assert ( - "Auto-selected partition 'gpu'" in mock_logger.warning.call_args[0][0] - ) - finally: - temp_path.unlink() - - def test_no_suitable_partition( - self, basic_partition_config, temp_yaml_file, mock_job, mock_logger - ): - """Test when no partition can accommodate the job requirements.""" - temp_path = temp_yaml_file(basic_partition_config) - - try: - partitions = read_partition_file(temp_path) - # Job requires more memory than any partition allows - job = mock_job(threads=1, mem_mb=500000, runtime=60) - - selected_partition = get_best_partition(partitions, job, mock_logger) - - # Should return None when no suitable partition found - assert 
selected_partition is None - mock_logger.warning.assert_called_once() - assert "No suitable partition found" in mock_logger.warning.call_args[0][0] - finally: - temp_path.unlink() - - def test_comprehensive_partition_selection( - self, comprehensive_partition_config, temp_yaml_file, mock_job, mock_logger - ): - """Test partition selection with comprehensive limits.""" - temp_path = temp_yaml_file(comprehensive_partition_config) - - try: - partitions = read_partition_file(temp_path) - job = mock_job( - threads=8, - mem_mb=64000, - runtime=1200, - gpu=2, - gpu_model="a100", - constraint="intel,avx2", - ) - - selected_partition = get_best_partition(partitions, job, mock_logger) - - # Should select the comprehensive partition - assert selected_partition == "comprehensive" - mock_logger.warning.assert_called_once() - assert ( - "Auto-selected partition 'comprehensive'" - in mock_logger.warning.call_args[0][0] - ) - finally: - temp_path.unlink() - - def test_constraint_mismatch( - self, comprehensive_partition_config, temp_yaml_file, mock_job, mock_logger - ): - """Test job with constraints not available in partition.""" - temp_path = temp_yaml_file(comprehensive_partition_config) - - try: - partitions = read_partition_file(temp_path) - # Job requires constraint not available in partition - job = mock_job(threads=2, constraint="amd,gpu_direct") - - selected_partition = get_best_partition(partitions, job, mock_logger) - - # Should return None due to constraint mismatch - assert selected_partition is None - mock_logger.warning.assert_called_once() - assert "No suitable partition found" in mock_logger.warning.call_args[0][0] - finally: - temp_path.unlink() - - def test_mpi_job_selection( - self, basic_partition_config, temp_yaml_file, mock_job, mock_logger - ): - """Test MPI job partition selection.""" - temp_path = temp_yaml_file(basic_partition_config) - - try: - partitions = read_partition_file(temp_path) - job = mock_job(threads=1, mpi=True, tasks=16) - - 
selected_partition = get_best_partition(partitions, job, mock_logger) - - # Should select 'default' partition as it supports MPI, 'gpu' doesn't - assert selected_partition == "default" - mock_logger.warning.assert_called_once() - assert ( - "Auto-selected partition 'default'" - in mock_logger.warning.call_args[0][0] - ) - finally: - temp_path.unlink() - - def test_gpu_model_mismatch( - self, basic_partition_config, temp_yaml_file, mock_job, mock_logger - ): - """Test GPU job with unsupported GPU model.""" - temp_path = temp_yaml_file(basic_partition_config) - - try: - partitions = read_partition_file(temp_path) - # Request GPU model not available in any partition - job = mock_job(threads=2, gpu=1, gpu_model="rtx4090") - - selected_partition = get_best_partition(partitions, job, mock_logger) - - # Should return None due to GPU model mismatch - assert selected_partition is None - mock_logger.warning.assert_called_once() - assert "No suitable partition found" in mock_logger.warning.call_args[0][0] - finally: - temp_path.unlink() - - def test_empty_partitions_list( - self, empty_partitions_config, temp_yaml_file, mock_job, mock_logger - ): - """Test partition selection with empty partitions list.""" - temp_path = temp_yaml_file(empty_partitions_config) - - try: - partitions = read_partition_file(temp_path) - job = mock_job(threads=1, mem_mb=1000) - - selected_partition = get_best_partition(partitions, job, mock_logger) - - # Should return None when no partitions available - assert selected_partition is None - mock_logger.warning.assert_called_once() - assert "No suitable partition found" in mock_logger.warning.call_args[0][0] - finally: - temp_path.unlink() - - def test_gres_gpu_specification( - self, basic_partition_config, temp_yaml_file, mock_job, mock_logger - ): - """Test GPU job specified via gres parameter.""" - temp_path = temp_yaml_file(basic_partition_config) - - try: - partitions = read_partition_file(temp_path) - job = mock_job(threads=2, gres="gpu:v100:1", 
runtime=400) - - selected_partition = get_best_partition(partitions, job, mock_logger) - - # Should select 'gpu' partition as it supports v100 GPUs - assert selected_partition == "gpu" - mock_logger.warning.assert_called_once() - assert ( - "Auto-selected partition 'gpu'" in mock_logger.warning.call_args[0][0] - ) - finally: - temp_path.unlink() - - def test_cpus_per_task_specification( - self, comprehensive_partition_config, temp_yaml_file, mock_job, mock_logger - ): - """Test job with cpus_per_task specification.""" - temp_path = temp_yaml_file(comprehensive_partition_config) - - try: - partitions = read_partition_file(temp_path) - job = mock_job(threads=1, cpus_per_task=32, mem_mb=64000) - - selected_partition = get_best_partition(partitions, job, mock_logger) - - # Should select comprehensive partition as it can handle 32 cpus per task - assert selected_partition == "comprehensive" - mock_logger.warning.assert_called_once() - assert ( - "Auto-selected partition 'comprehensive'" - in mock_logger.warning.call_args[0][0] - ) - finally: - temp_path.unlink() - - def test_runtime_exceeds_limit( - self, basic_partition_config, temp_yaml_file, mock_job, mock_logger - ): - """Test job with runtime exceeding partition limits.""" - temp_path = temp_yaml_file(basic_partition_config) - - try: - partitions = read_partition_file(temp_path) - # Job requires more runtime than gpu partition allows (720 min max) - job = mock_job(threads=1, runtime=1000, gpu=1, gpu_model="a100") - - selected_partition = get_best_partition(partitions, job, mock_logger) - - # Should return None as no partition can accommodate the runtime - assert selected_partition is None - mock_logger.warning.assert_called_once() - assert "No suitable partition found" in mock_logger.warning.call_args[0][0] - finally: - temp_path.unlink() - - class TestSlurmExtraValidation: """Test cases for the validate_slurm_extra function.""" From be655d15c2329888f598efccccc61092e1ad5196 Mon Sep 17 00:00:00 2001 From: Christian 
Meesters Date: Thu, 20 Nov 2025 13:50:52 +0100 Subject: [PATCH 21/33] fix: formatting --- snakemake_executor_plugin_slurm/partitions.py | 9 +- tests/test_partition_selection.py | 145 ++++++++++-------- tests/tests.py | 1 + 3 files changed, 86 insertions(+), 69 deletions(-) diff --git a/snakemake_executor_plugin_slurm/partitions.py b/snakemake_executor_plugin_slurm/partitions.py index 5acad5b7..514503b8 100644 --- a/snakemake_executor_plugin_slurm/partitions.py +++ b/snakemake_executor_plugin_slurm/partitions.py @@ -58,9 +58,7 @@ def get_best_partition( scored_partitions = [] for p in candidate_partitions: score = p.score_job_fit(job) - logger.debug( - f"Partition '{p.name}' score for job {job.name}: {score}" - ) + logger.debug(f"Partition '{p.name}' score for job {job.name}: {score}") if score is not None: scored_partitions.append((p, score)) @@ -285,7 +283,10 @@ def score_job_fit(self, job: JobExecutorInterface) -> Optional[float]: # Check thread requirements (check both job.threads and resources.threads) effective_threads = get_effective_threads(job) if effective_threads is not None and effective_threads > 0: - if not isinf(self.limits.max_threads) and effective_threads > self.limits.max_threads: + if ( + not isinf(self.limits.max_threads) + and effective_threads > self.limits.max_threads + ): # Debug: partition cannot accommodate threads return None if not isinf(self.limits.max_threads): diff --git a/tests/test_partition_selection.py b/tests/test_partition_selection.py index 3595959c..a36da66d 100644 --- a/tests/test_partition_selection.py +++ b/tests/test_partition_selection.py @@ -11,7 +11,6 @@ ) - class TestPartitionSelection: @pytest.fixture def basic_partition_config(self): @@ -196,7 +195,7 @@ def test_read_empty_partitions_list(self, empty_partitions_config, temp_yaml_fil def test_read_nonexistent_file(self): """Test reading a non-existent file raises appropriate error.""" - + nonexistent_path = Path("/nonexistent/path/to/file.yaml") with 
pytest.raises(WorkflowError, match="Partition file not found"): @@ -204,7 +203,7 @@ def test_read_nonexistent_file(self): def test_read_invalid_yaml_file(self): """Test reading an invalid YAML file raises appropriate error.""" - + invalid_yaml = "partitions:\n - name: test\n invalid: {\n" with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: @@ -219,7 +218,7 @@ def test_read_invalid_yaml_file(self): def test_read_file_missing_partitions_key(self, invalid_key_config, temp_yaml_file): """Test reading a file without 'partitions' key raises KeyError.""" - + temp_path = temp_yaml_file(invalid_key_config) try: @@ -306,7 +305,7 @@ def test_basic_partition_selection_gpu_job( assert selected_partition == "gpu" assert mock_logger.warning.call_count >= 1 assert ( - "Auto-selected partition 'gpu'" + "Auto-selected partition 'gpu'" in mock_logger.warning.call_args_list[-1][0][0] ) finally: @@ -328,7 +327,10 @@ def test_no_suitable_partition( # Should return None when no suitable partition found assert selected_partition is None assert mock_logger.warning.call_count >= 1 - assert "No suitable partition found" in mock_logger.warning.call_args_list[-1][0][0] + assert ( + "No suitable partition found" + in mock_logger.warning.call_args_list[-1][0][0] + ) finally: temp_path.unlink() @@ -377,7 +379,10 @@ def test_constraint_mismatch( # Should return None due to constraint mismatch assert selected_partition is None assert mock_logger.warning.call_count >= 1 - assert "No suitable partition found" in mock_logger.warning.call_args_list[-1][0][0] + assert ( + "No suitable partition found" + in mock_logger.warning.call_args_list[-1][0][0] + ) finally: temp_path.unlink() @@ -419,7 +424,10 @@ def test_gpu_model_mismatch( # Should return None due to GPU model mismatch assert selected_partition is None assert mock_logger.warning.call_count >= 1 - assert "No suitable partition found" in mock_logger.warning.call_args_list[-1][0][0] + assert ( + "No suitable partition 
found" + in mock_logger.warning.call_args_list[-1][0][0] + ) finally: temp_path.unlink() @@ -438,7 +446,10 @@ def test_empty_partitions_list( # Should return None when no partitions available assert selected_partition is None assert mock_logger.warning.call_count >= 1 - assert "No suitable partition found" in mock_logger.warning.call_args_list[-1][0][0] + assert ( + "No suitable partition found" + in mock_logger.warning.call_args_list[-1][0][0] + ) finally: temp_path.unlink() @@ -458,7 +469,7 @@ def test_gres_gpu_specification( assert selected_partition == "gpu" assert mock_logger.warning.call_count >= 1 assert ( - "Auto-selected partition 'gpu'" + "Auto-selected partition 'gpu'" in mock_logger.warning.call_args_list[-1][0][0] ) finally: @@ -502,7 +513,10 @@ def test_runtime_exceeds_limit( # Should return None as no partition can accommodate the runtime assert selected_partition is None assert mock_logger.warning.call_count >= 1 - assert "No suitable partition found" in mock_logger.warning.call_args_list[-1][0][0] + assert ( + "No suitable partition found" + in mock_logger.warning.call_args_list[-1][0][0] + ) finally: temp_path.unlink() @@ -564,29 +578,29 @@ def max_threads_partition_config(self): } def test_cluster_specification_via_slurm_cluster( - self, multicluster_partition_config, temp_yaml_file, mock_job, mock_logger - ): - """Test cluster specification using slurm_cluster resource.""" - temp_path = temp_yaml_file(multicluster_partition_config) - - try: - partitions = read_partition_file(temp_path) - - # Verify cluster field is read correctly - # Find partitions by name instead of assuming order - partition_map = {p.name: p for p in partitions} - assert partition_map["normal-small"].cluster == "normal" - assert partition_map["deviating-small"].cluster == "deviating" - - # Job targeting 'normal' cluster - job = mock_job(threads=16, mem_mb=32000, slurm_cluster="normal") - selected_partition = get_best_partition(partitions, job, mock_logger) - - # Should select 
a partition from 'normal' cluster - assert selected_partition in ["normal-small", "normal-large"] - assert mock_logger.warning.call_count >= 1 - finally: - temp_path.unlink() + self, multicluster_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test cluster specification using slurm_cluster resource.""" + temp_path = temp_yaml_file(multicluster_partition_config) + + try: + partitions = read_partition_file(temp_path) + + # Verify cluster field is read correctly + # Find partitions by name instead of assuming order + partition_map = {p.name: p for p in partitions} + assert partition_map["normal-small"].cluster == "normal" + assert partition_map["deviating-small"].cluster == "deviating" + + # Job targeting 'normal' cluster + job = mock_job(threads=16, mem_mb=32000, slurm_cluster="normal") + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select a partition from 'normal' cluster + assert selected_partition in ["normal-small", "normal-large"] + assert mock_logger.warning.call_count >= 1 + finally: + temp_path.unlink() def test_cluster_specification_via_clusters( self, multicluster_partition_config, temp_yaml_file, mock_job, mock_logger @@ -596,11 +610,11 @@ def test_cluster_specification_via_clusters( try: partitions = read_partition_file(temp_path) - + # Job targeting 'deviating' cluster via 'clusters' resource job = mock_job(threads=48, mem_mb=64000, clusters="deviating") selected_partition = get_best_partition(partitions, job, mock_logger) - + # Should select deviating-small partition assert selected_partition == "deviating-small" assert mock_logger.warning.call_count >= 1 @@ -615,11 +629,11 @@ def test_cluster_specification_via_cluster( try: partitions = read_partition_file(temp_path) - + # Job targeting 'normal' cluster via 'cluster' resource job = mock_job(threads=64, mem_mb=128000, cluster="normal") selected_partition = get_best_partition(partitions, job, mock_logger) - + # Should select normal-large partition 
assert selected_partition == "normal-large" assert mock_logger.warning.call_count >= 1 @@ -634,11 +648,11 @@ def test_cluster_mismatch_excludes_partitions( try: partitions = read_partition_file(temp_path) - + # Job requesting GPU on 'normal' cluster (only deviating has GPU) job = mock_job(threads=16, gpu=2, gpu_model="a100", slurm_cluster="normal") selected_partition = get_best_partition(partitions, job, mock_logger) - + # Should return None as normal cluster has no GPU partitions assert selected_partition is None assert mock_logger.warning.call_count >= 1 @@ -653,11 +667,11 @@ def test_job_without_cluster_uses_any_partition( try: partitions = read_partition_file(temp_path) - + # Job without cluster specification job = mock_job(threads=16, mem_mb=32000) selected_partition = get_best_partition(partitions, job, mock_logger) - + # Should select a partition (any cluster is acceptable) assert selected_partition is not None assert mock_logger.warning.call_count >= 1 @@ -672,24 +686,24 @@ def test_max_threads_with_threads_resource( try: partitions = read_partition_file(temp_path) - + # Job with 4 threads - should prefer tiny partition job = mock_job(threads=4, mem_mb=16000) selected_partition = get_best_partition(partitions, job, mock_logger) assert selected_partition == "tiny" - + # Job with 16 threads - should prefer small partition mock_logger.reset_mock() job = mock_job(threads=16, mem_mb=32000) selected_partition = get_best_partition(partitions, job, mock_logger) assert selected_partition == "small" - + # Job with 64 threads - should use medium partition mock_logger.reset_mock() job = mock_job(threads=64, mem_mb=64000) selected_partition = get_best_partition(partitions, job, mock_logger) assert selected_partition == "medium" - + finally: temp_path.unlink() @@ -701,18 +715,18 @@ def test_max_threads_with_cpus_per_task_resource( try: partitions = read_partition_file(temp_path) - + # Job with cpus_per_task=128 - exceeds medium (127), needs large job = 
mock_job(threads=None, cpus_per_task=128, mem_mb=128000) selected_partition = get_best_partition(partitions, job, mock_logger) assert selected_partition == "large" - + # Job with cpus_per_task=127 - exactly matches medium mock_logger.reset_mock() job = mock_job(threads=None, cpus_per_task=127, mem_mb=64000) selected_partition = get_best_partition(partitions, job, mock_logger) assert selected_partition == "medium" - + finally: temp_path.unlink() @@ -724,15 +738,15 @@ def test_max_threads_excludes_too_small_partitions( try: partitions = read_partition_file(temp_path) - + # Job with 128 threads - tiny, small, medium cannot accommodate job = mock_job(threads=128, mem_mb=256000) selected_partition = get_best_partition(partitions, job, mock_logger) - + # Only large partition should be selected assert selected_partition == "large" assert mock_logger.warning.call_count >= 1 - + finally: temp_path.unlink() @@ -744,22 +758,23 @@ def test_max_threads_job_exceeds_all_partitions( try: partitions = read_partition_file(temp_path) - + # Job with 512 threads - exceeds all partitions job = mock_job(threads=512, mem_mb=128000) selected_partition = get_best_partition(partitions, job, mock_logger) - + # Should return None assert selected_partition is None assert mock_logger.warning.call_count >= 1 - assert "No suitable partition found" in mock_logger.warning.call_args_list[-1][0][0] - + assert ( + "No suitable partition found" + in mock_logger.warning.call_args_list[-1][0][0] + ) + finally: temp_path.unlink() - def test_multicluster_with_max_threads( - self, temp_yaml_file, mock_job, mock_logger - ): + def test_multicluster_with_max_threads(self, temp_yaml_file, mock_job, mock_logger): """Test combined cluster and max_threads constraints.""" config = { "partitions": { @@ -784,23 +799,23 @@ def test_multicluster_with_max_threads( try: partitions = read_partition_file(temp_path) - + # Job targeting normal cluster with 128 threads # normal-small (64) is too small, should use normal-large 
(256) job = mock_job(threads=128, mem_mb=256000, slurm_cluster="normal") selected_partition = get_best_partition(partitions, job, mock_logger) - + assert selected_partition == "normal-large" assert mock_logger.warning.call_count >= 1 - + # Job targeting deviating cluster with 128 threads # Should exactly match deviating-medium mock_logger.reset_mock() job = mock_job(threads=128, mem_mb=128000, slurm_cluster="deviating") selected_partition = get_best_partition(partitions, job, mock_logger) - + assert selected_partition == "deviating-medium" assert mock_logger.warning.call_count >= 1 - + finally: - temp_path.unlink() \ No newline at end of file + temp_path.unlink() diff --git a/tests/tests.py b/tests/tests.py index cae305d2..d5809174 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -792,6 +792,7 @@ def test_wildcard_slash_replacement(self): # Verify no slashes remain in the wildcard string assert "/" not in wildcard_str + class TestSlurmExtraValidation: """Test cases for the validate_slurm_extra function.""" From dd40f24b4f50bf690841150e9ca0d7d3de83b11e Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Thu, 20 Nov 2025 14:33:40 +0100 Subject: [PATCH 22/33] fix: formatting --- tests/test_partition_selection.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/test_partition_selection.py b/tests/test_partition_selection.py index a36da66d..23a2796f 100644 --- a/tests/test_partition_selection.py +++ b/tests/test_partition_selection.py @@ -643,7 +643,10 @@ def test_cluster_specification_via_cluster( def test_cluster_mismatch_excludes_partitions( self, multicluster_partition_config, temp_yaml_file, mock_job, mock_logger ): - """Test that jobs requesting a cluster exclude partitions from other clusters.""" + """ + Test that jobs requesting a cluster exclude partitions from other + clusters. 
+ """ temp_path = temp_yaml_file(multicluster_partition_config) try: From fd2336e69861f666c12539e9d54c6c3d7eaaa1ad Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Thu, 20 Nov 2025 14:33:57 +0100 Subject: [PATCH 23/33] fix: relative import --- tests/tests.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/tests.py b/tests/tests.py index d5809174..f194ece7 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -22,7 +22,11 @@ import pandas as pd # Import partition selection tests -from .test_partition_selection import * # noqa: F401, F403 +try: + from .test_partition_selection import * +except ImportError: + # When run directly, the relative import may fail + from test_partition_selection import * class TestWorkflows(snakemake.common.tests.TestWorkflowsLocalStorageBase): From adb276c857cc81c9379f1968cf2a97ecc33ecb23 Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Thu, 20 Nov 2025 15:19:29 +0100 Subject: [PATCH 24/33] feat: first step to refactoring, trying Snakemake's dpath --- tests/tests.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/tests/tests.py b/tests/tests.py index f194ece7..a5c31820 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -21,13 +21,12 @@ from snakemake_interface_common.exceptions import WorkflowError import pandas as pd -# Import partition selection tests -try: - from .test_partition_selection import * -except ImportError: - # When run directly, the relative import may fail - from test_partition_selection import * +sys.path.insert(0, os.path.dirname(__file__)) +from snakemake.common import run, dpath + +def test_partition_selection(): + dpath(run("test_partition_selection")) class TestWorkflows(snakemake.common.tests.TestWorkflowsLocalStorageBase): __test__ = True From f96cdc567b5dd549f24bcdd0e86e7fca08847036 Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Thu, 20 Nov 2025 15:20:42 +0100 Subject: [PATCH 25/33] fix: added missing whitespace --- 
snakemake_executor_plugin_slurm/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index 70fbb28e..1a5f2b12 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -121,7 +121,7 @@ class ExecutorSettings(ExecutorSettingsBase): "help": "Path to YAML file defining partition limits for dynamic " "partition selection. When provided, jobs will be dynamically " "assigned to the best-fitting partition based on their resource " - "requirements. See documentation for complete list of available limits." + "requirements. See documentation for complete list of available limits. " "Alternatively, the environment variable SNAKEMAKE_SLURM_PARTITIONS " "can be set to point to such a file. " "If both are set, this flag takes precedence.", From 64e99b9a1138f0314342dedc5bbabf6b0578d7f9 Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Thu, 20 Nov 2025 15:22:04 +0100 Subject: [PATCH 26/33] fix: reordered --- tests/tests.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/tests.py b/tests/tests.py index a5c31820..07e9459c 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -1,5 +1,8 @@ import os import re +import sys + +sys.path.insert(0, os.path.dirname(__file__)) from pathlib import Path from typing import Optional @@ -21,8 +24,6 @@ from snakemake_interface_common.exceptions import WorkflowError import pandas as pd -sys.path.insert(0, os.path.dirname(__file__)) - from snakemake.common import run, dpath def test_partition_selection(): From 5b53def2faee76d0ea6e70c159df609da5a4314b Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Thu, 20 Nov 2025 15:25:04 +0100 Subject: [PATCH 27/33] fix: no negative value for cpus_per_task --- snakemake_executor_plugin_slurm/partitions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/snakemake_executor_plugin_slurm/partitions.py b/snakemake_executor_plugin_slurm/partitions.py index 514503b8..d627b6ff 100644 --- a/snakemake_executor_plugin_slurm/partitions.py +++ b/snakemake_executor_plugin_slurm/partitions.py @@ -167,7 +167,7 @@ def get_job_cpu_requirement(job: JobExecutorInterface) -> tuple[int, str]: cpus_per_task = int(cpus_per_task) if cpus_per_task < 0: - return (0, "none") + raise WorkflowError("cpus_per_task cannot be negative") # ensure that at least 1 cpu is requested because 0 is not allowed by slurm return (max(1, cpus_per_task), "task") From 0781a2b44490287ae528e8be141c5c80a7901fe3 Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Thu, 20 Nov 2025 15:30:32 +0100 Subject: [PATCH 28/33] fix: mock logger level --- tests/test_partition_selection.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/tests/test_partition_selection.py b/tests/test_partition_selection.py index 23a2796f..fc1161bd 100644 --- a/tests/test_partition_selection.py +++ b/tests/test_partition_selection.py @@ -279,10 +279,10 @@ def test_basic_partition_selection_cpu_job( # Should select 'default' partition as it supports CPU jobs assert selected_partition == "default" # Check that the final call contains the auto-selection message - assert mock_logger.warning.call_count >= 1 + assert mock_logger.info.call_count >= 1 assert ( "Auto-selected partition 'default'" - in mock_logger.warning.call_args_list[-1][0][0] + in mock_logger.info.call_args_list[-1][0][0] ) finally: temp_path.unlink() @@ -303,10 +303,10 @@ def test_basic_partition_selection_gpu_job( # Should select 'gpu' partition as it supports GPU jobs assert selected_partition == "gpu" - assert mock_logger.warning.call_count >= 1 + assert mock_logger.info.call_count >= 1 assert ( "Auto-selected partition 'gpu'" - in mock_logger.warning.call_args_list[-1][0][0] + in mock_logger.info.call_args_list[-1][0][0] ) finally: temp_path.unlink() @@ -326,10 +326,10 @@ 
def test_no_suitable_partition( # Should return None when no suitable partition found assert selected_partition is None - assert mock_logger.warning.call_count >= 1 + assert mock_logger.info.call_count >= 1 assert ( "No suitable partition found" - in mock_logger.warning.call_args_list[-1][0][0] + in mock_logger.info.call_args_list[-1][0][0] ) finally: temp_path.unlink() @@ -355,10 +355,10 @@ def test_comprehensive_partition_selection( # Should select the comprehensive partition assert selected_partition == "comprehensive" - assert mock_logger.warning.call_count >= 1 + assert mock_logger.info.call_count >= 1 assert ( "Auto-selected partition 'comprehensive'" - in mock_logger.warning.call_args_list[-1][0][0] + in mock_logger.info.call_args_list[-1][0][0] ) finally: temp_path.unlink() @@ -400,10 +400,10 @@ def test_mpi_job_selection( # Should select 'default' partition as it supports MPI, 'gpu' doesn't assert selected_partition == "default" - assert mock_logger.warning.call_count >= 1 + assert mock_logger.info.call_count >= 1 assert ( "Auto-selected partition 'default'" - in mock_logger.warning.call_args_list[-1][0][0] + in mock_logger.info.call_args_list[-1][0][0] ) finally: temp_path.unlink() @@ -467,10 +467,10 @@ def test_gres_gpu_specification( # Should select 'gpu' partition as it supports v100 GPUs assert selected_partition == "gpu" - assert mock_logger.warning.call_count >= 1 + assert mock_logger.info.call_count >= 1 assert ( "Auto-selected partition 'gpu'" - in mock_logger.warning.call_args_list[-1][0][0] + in mock_logger.info.call_args_list[-1][0][0] ) finally: temp_path.unlink() @@ -489,10 +489,10 @@ def test_cpus_per_task_specification( # Should select comprehensive partition as it can handle 32 cpus per task assert selected_partition == "comprehensive" - assert mock_logger.warning.call_count >= 1 + assert mock_logger.info.call_count >= 1 assert ( "Auto-selected partition 'comprehensive'" - in mock_logger.warning.call_args_list[-1][0][0] + in 
mock_logger.info.call_args_list[-1][0][0] ) finally: temp_path.unlink() From f8538ed0fb46f5b8d3416f55db3d9ff4f17925e0 Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Thu, 20 Nov 2025 15:34:14 +0100 Subject: [PATCH 29/33] fix: threads check abbreviated --- snakemake_executor_plugin_slurm/partitions.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/snakemake_executor_plugin_slurm/partitions.py b/snakemake_executor_plugin_slurm/partitions.py index d627b6ff..c8032a21 100644 --- a/snakemake_executor_plugin_slurm/partitions.py +++ b/snakemake_executor_plugin_slurm/partitions.py @@ -125,12 +125,15 @@ def get_effective_threads(job: JobExecutorInterface) -> int: if threads == 1 or threads is None: resource_threads = job.resources.get("threads") if resource_threads is not None: - if isinstance(resource_threads, str): - try: - resource_threads = int(resource_threads) - except ValueError: - resource_threads = threads + try: + resource_threads = int(resource_threads) + except ValueError: + resource_threads = threads threads = resource_threads if resource_threads > 1 else threads + + # ensuring a valid thread count + if threads is None or threads < 1: + threads = 1 return threads From 8aeb8c58696eb6a518ae48ca8c7e399cd14cb51b Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Thu, 20 Nov 2025 15:36:49 +0100 Subject: [PATCH 30/33] fix: import order --- tests/tests.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/tests.py b/tests/tests.py index 07e9459c..67f0c7ea 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -1,17 +1,12 @@ import os import re import sys - -sys.path.insert(0, os.path.dirname(__file__)) - from pathlib import Path from typing import Optional import snakemake.common.tests from snakemake_interface_executor_plugins.settings import ExecutorSettingsBase from unittest.mock import MagicMock, patch import pytest - - from snakemake_executor_plugin_slurm import ExecutorSettings from 
snakemake_executor_plugin_slurm.efficiency_report import ( parse_sacct_data, @@ -26,9 +21,13 @@ from snakemake.common import run, dpath +sys.path.insert(0, os.path.dirname(__file__)) + + def test_partition_selection(): dpath(run("test_partition_selection")) + class TestWorkflows(snakemake.common.tests.TestWorkflowsLocalStorageBase): __test__ = True From db00ab1fdd0ad3a7ad8072af5f3a9a50cabf2d29 Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Thu, 20 Nov 2025 15:38:04 +0100 Subject: [PATCH 31/33] fix: formatting, gnarf! --- snakemake_executor_plugin_slurm/partitions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/snakemake_executor_plugin_slurm/partitions.py b/snakemake_executor_plugin_slurm/partitions.py index c8032a21..43a45f03 100644 --- a/snakemake_executor_plugin_slurm/partitions.py +++ b/snakemake_executor_plugin_slurm/partitions.py @@ -130,7 +130,7 @@ def get_effective_threads(job: JobExecutorInterface) -> int: except ValueError: resource_threads = threads threads = resource_threads if resource_threads > 1 else threads - + # ensuring a valid thread count if threads is None or threads < 1: threads = 1 From eb9933dcb9eddb1bf79ee2b878c495d14bf7a70f Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Thu, 20 Nov 2025 16:37:07 +0100 Subject: [PATCH 32/33] fix: attempt to run modularized tests in one go without additional imports --- .github/workflows/ci.yml | 2 +- tests/tests.py | 9 --------- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d60eea60..e260a74b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -98,7 +98,7 @@ jobs: poetry install - name: Run pytest - run: poetry run coverage run -m pytest tests/tests.py -sv --tb=short --disable-warnings + run: poetry run coverage run -m pytest tests/ -sv --tb=short --disable-warnings - name: Run Coverage run: poetry run coverage report -m diff --git a/tests/tests.py b/tests/tests.py index 
67f0c7ea..b683fbda 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -1,6 +1,5 @@ import os import re -import sys from pathlib import Path from typing import Optional import snakemake.common.tests @@ -19,14 +18,6 @@ from snakemake_interface_common.exceptions import WorkflowError import pandas as pd -from snakemake.common import run, dpath - -sys.path.insert(0, os.path.dirname(__file__)) - - -def test_partition_selection(): - dpath(run("test_partition_selection")) - class TestWorkflows(snakemake.common.tests.TestWorkflowsLocalStorageBase): __test__ = True From 7c662bbff2a3564662e8e1b87563fa4863e323bd Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Thu, 20 Nov 2025 16:47:42 +0100 Subject: [PATCH 33/33] fix: mock warnings --- tests/test_partition_selection.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/test_partition_selection.py b/tests/test_partition_selection.py index fc1161bd..57ba4f20 100644 --- a/tests/test_partition_selection.py +++ b/tests/test_partition_selection.py @@ -326,10 +326,10 @@ def test_no_suitable_partition( # Should return None when no suitable partition found assert selected_partition is None - assert mock_logger.info.call_count >= 1 + assert mock_logger.warning.call_count >= 1 assert ( "No suitable partition found" - in mock_logger.info.call_args_list[-1][0][0] + in mock_logger.warning.call_args_list[-1][0][0] ) finally: temp_path.unlink() @@ -598,7 +598,7 @@ def test_cluster_specification_via_slurm_cluster( # Should select a partition from 'normal' cluster assert selected_partition in ["normal-small", "normal-large"] - assert mock_logger.warning.call_count >= 1 + assert mock_logger.info.call_count >= 1 finally: temp_path.unlink() @@ -617,7 +617,7 @@ def test_cluster_specification_via_clusters( # Should select deviating-small partition assert selected_partition == "deviating-small" - assert mock_logger.warning.call_count >= 1 + assert mock_logger.info.call_count >= 1 finally: 
temp_path.unlink() @@ -636,7 +636,7 @@ def test_cluster_specification_via_cluster( # Should select normal-large partition assert selected_partition == "normal-large" - assert mock_logger.warning.call_count >= 1 + assert mock_logger.info.call_count >= 1 finally: temp_path.unlink() @@ -677,7 +677,7 @@ def test_job_without_cluster_uses_any_partition( # Should select a partition (any cluster is acceptable) assert selected_partition is not None - assert mock_logger.warning.call_count >= 1 + assert mock_logger.info.call_count >= 1 finally: temp_path.unlink() @@ -748,7 +748,7 @@ def test_max_threads_excludes_too_small_partitions( # Only large partition should be selected assert selected_partition == "large" - assert mock_logger.warning.call_count >= 1 + assert mock_logger.info.call_count >= 1 finally: temp_path.unlink() @@ -809,7 +809,7 @@ def test_multicluster_with_max_threads(self, temp_yaml_file, mock_job, mock_logg selected_partition = get_best_partition(partitions, job, mock_logger) assert selected_partition == "normal-large" - assert mock_logger.warning.call_count >= 1 + assert mock_logger.info.call_count >= 1 # Job targeting deviating cluster with 128 threads # Should exactly match deviating-medium @@ -818,7 +818,7 @@ def test_multicluster_with_max_threads(self, temp_yaml_file, mock_job, mock_logg selected_partition = get_best_partition(partitions, job, mock_logger) assert selected_partition == "deviating-medium" - assert mock_logger.warning.call_count >= 1 + assert mock_logger.info.call_count >= 1 finally: temp_path.unlink()