diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d60eea60..e260a74b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -98,7 +98,7 @@ jobs: poetry install - name: Run pytest - run: poetry run coverage run -m pytest tests/tests.py -sv --tb=short --disable-warnings + run: poetry run coverage run -m pytest tests/ -sv --tb=short --disable-warnings - name: Run Coverage run: poetry run coverage report -m diff --git a/docs/further.md b/docs/further.md index 51ba55a9..b8eba526 100644 --- a/docs/further.md +++ b/docs/further.md @@ -64,6 +64,108 @@ See the [snakemake documentation on profiles](https://snakemake.readthedocs.io/e How and where you set configurations on factors like file size or increasing the runtime with every `attempt` of running a job (if [`--retries` is greater than `0`](https://snakemake.readthedocs.io/en/stable/executing/cli.html#snakemake.cli-get_argument_parser-behavior)). [There are detailed examples for these in the snakemake documentation.](https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#dynamic-resources) +#### Automatic Partition Selection + +The SLURM executor plugin supports automatic partition selection based on job resource requirements, via the command line option `--slurm-partition-config`. This feature allows the plugin to choose the most appropriate partition for each job, without the need to manually specify partitions for different job types. This also enables variable partition selection as a job's resource requirements change based on [dynamic resources](#dynamic-resource-specification), ensuring that jobs are always scheduled to an appropriate partition. + +*Jobs that explicitly specify a `slurm_partition` resource will bypass automatic selection and use the specified partition directly.* + +##### Partition Limits Specification + +To enable automatic partition selection, create a YAML configuration file that defines the available partitions and their resource limits. This file should be structured as follows: + +```yaml +partitions: + some_partition: + max_runtime: 100 + another_partition: + ... +``` +Where `some_partition` and `another_partition` are the names of the partition on your cluster, according to `sinfo`. 
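+
+The configuration file is passed to the plugin with `--slurm-partition-config <file>` on the command line or via a profile. Alternatively, the environment variable `SNAKEMAKE_SLURM_PARTITIONS` can point to such a file; if both are given, the command line flag takes precedence. As a minimal, illustrative profile sketch (the file name `partitions.yaml` is just an example, and the profile key is assumed to mirror the command line flag as usual for plugin settings):
+
+```yaml
+# config.yaml of a Snakemake profile (illustrative sketch)
+executor: slurm
+slurm-partition-config: "partitions.yaml"
+```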
+
+The following limits can be defined for each partition:
+
+| Parameter               | Type      | Description                           | Default   |
+| ----------------------- | --------- | ------------------------------------- | --------- |
+| `max_runtime`           | int       | Maximum walltime in minutes           | unlimited |
+| `max_mem_mb`            | int       | Maximum total memory in MB            | unlimited |
+| `max_mem_mb_per_cpu`    | int       | Maximum memory per CPU in MB          | unlimited |
+| `max_cpus_per_task`     | int       | Maximum CPUs per task                 | unlimited |
+| `max_nodes`             | int       | Maximum number of nodes               | unlimited |
+| `max_tasks`             | int       | Maximum number of tasks               | unlimited |
+| `max_tasks_per_node`    | int       | Maximum tasks per node                | unlimited |
+| `max_threads`           | int       | Maximum threads per node              | unlimited |
+| `max_gpu`               | int       | Maximum number of GPUs                | 0         |
+| `available_gpu_models`  | list[str] | List of available GPU models          | none      |
+| `max_cpus_per_gpu`      | int       | Maximum CPUs per GPU                  | unlimited |
+| `supports_mpi`          | bool      | Whether MPI jobs are supported        | true      |
+| `max_mpi_tasks`         | int       | Maximum MPI tasks                     | unlimited |
+| `available_constraints` | list[str] | List of available node constraints    | none      |
+| `cluster`               | str       | Cluster name in a multi-cluster setup | none      |
+
+##### Example Partition Configuration
+
+```yaml
+partitions:
+  standard:
+    max_runtime: 720        # 12 hours
+    max_mem_mb: 64000       # 64 GB
+    max_cpus_per_task: 24
+    max_nodes: 1
+
+  highmem:
+    max_runtime: 1440       # 24 hours
+    max_mem_mb: 512000      # 512 GB
+    max_mem_mb_per_cpu: 16000
+    max_cpus_per_task: 48
+    max_nodes: 1
+
+  gpu:
+    max_runtime: 2880       # 48 hours
+    max_mem_mb: 128000      # 128 GB
+    max_cpus_per_task: 32
+    max_gpu: 8
+    available_gpu_models: ["a100", "v100", "rtx3090"]
+    max_cpus_per_gpu: 8
+```
+
+The plugin also supports automatic partition selection in SLURM multi-cluster setups. You can specify which cluster each partition belongs to in your partition configuration file:
+
+```yaml
+partitions:
+  d-standard:
+    cluster: "deviating"
+    max_runtime: 8640       # 6 days, in minutes
+    max_nodes: 1
+    max_threads: 127
+  d-parallel:
+    cluster: "deviating"
+    supports_mpi: true
+    max_threads: 128
+    max_runtime: 8640       # 6 days
+  standard:
+    cluster: "other"
+    max_runtime: 8640       # 6 days
+    max_nodes: 1
+    max_threads: 127
+  parallel:
+    cluster: "other"
+    supports_mpi: true
+    max_threads: 128
+    max_runtime: 8640       # 6 days
+```
+
+##### How Partition Selection Works
+
+When automatic partition selection is enabled, the plugin evaluates each job's resource requirements against the defined partition limits and only considers partitions that can accommodate all of those requirements. When multiple partitions are compatible, a scoring algorithm favors partitions whose limits are closest to the job's needs, preventing jobs from being assigned to partitions with excessively high resource limits.
+
+The scoring algorithm sums the ratios of requested resources to partition limits (e.g., if a job requests 8 CPUs and a partition allows 16, this contributes 0.5 to the score). Higher scores indicate a tighter fit, so a job requesting 8 CPUs prefers a 16-CPU partition (score 0.5) over a 64-CPU partition (score 0.125).
+
+##### Fallback Behavior
+
+If no suitable partition is found for a job's resource requirements, the plugin logs a warning and falls back to the default SLURM behavior, which typically means the job runs on the cluster's default partition.
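+
+##### Illustrative Scoring Example
+
+The sketch below is not the plugin's implementation (see `snakemake_executor_plugin_slurm/partitions.py` for that); it only illustrates the scoring idea described above with made-up resource names, so the "tighter fit wins" behavior is easy to reproduce:
+
+```python
+from math import inf
+
+
+def score(requested: dict, limits: dict) -> float | None:
+    """Return None if any request exceeds a limit, else the sum of ratios."""
+    total = 0.0
+    for resource, need in requested.items():
+        limit = limits.get(resource, inf)
+        if need > limit:
+            return None  # this partition cannot accommodate the job
+        if limit != inf:
+            total += need / limit  # a tighter fit yields a higher score
+    return total
+
+
+job = {"cpus_per_task": 8}
+print(score(job, {"cpus_per_task": 16}))  # 0.5   -> preferred
+print(score(job, {"cpus_per_task": 64}))  # 0.125
+```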
+ #### Standard Resources diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index 583b0773..1a5f2b12 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -3,6 +3,7 @@ __email__ = "johannes.koester@uni-due.de" __license__ = "MIT" +import atexit import csv from io import StringIO import os @@ -35,6 +36,7 @@ ) from .efficiency_report import create_efficiency_report from .submit_string import get_submit_command +from .partitions import read_partition_file, get_best_partition from .validation import validate_slurm_extra @@ -113,6 +115,20 @@ class ExecutorSettings(ExecutorSettingsBase): "required": False, }, ) + partition_config: Optional[Path] = field( + default=None, + metadata={ + "help": "Path to YAML file defining partition limits for dynamic " + "partition selection. When provided, jobs will be dynamically " + "assigned to the best-fitting partition based on their resource " + "requirements. See documentation for complete list of available limits. " + "Alternatively, the environment variable SNAKEMAKE_SLURM_PARTITIONS " + "can be set to point to such a file. " + "If both are set, this flag takes precedence.", + "env_var": False, + "required": False, + }, + ) efficiency_report: bool = field( default=False, metadata={ @@ -201,6 +217,26 @@ def __post_init__(self, test_mode: bool = False): if self.workflow.executor_settings.logdir else Path(".snakemake/slurm_logs").resolve() ) + # Check the environment variable "SNAKEMAKE_SLURM_PARTITIONS", + # if set, read the partitions from the given file. Let the CLI + # option override this behavior. + if ( + os.getenv("SNAKEMAKE_SLURM_PARTITIONS") + and not self.workflow.executor_settings.partition_config + ): + partition_file = Path(os.getenv("SNAKEMAKE_SLURM_PARTITIONS")) + self.logger.info( + f"Reading SLURM partition configuration from " + f"environment variable file: {partition_file}" + ) + self._partitions = read_partition_file(partition_file) + else: + self._partitions = ( + read_partition_file(self.workflow.executor_settings.partition_config) + if self.workflow.executor_settings.partition_config + else None + ) + atexit.register(self.clean_old_logs) def shutdown(self) -> None: """ @@ -305,6 +341,8 @@ def run_job(self, job: JobExecutorInterface): if job.resources.get("slurm_extra"): self.check_slurm_extra(job) + # NOTE removed partition from below, such that partition + # selection can benefit from resource checking as the call is built up. job_params = { "run_uuid": self.run_uuid, "slurm_logfile": slurm_logfile, @@ -698,9 +736,13 @@ def get_partition_arg(self, job: JobExecutorInterface): returns a default partition, if applicable else raises an error - implicetly. """ + partition = None if job.resources.get("slurm_partition"): partition = job.resources.slurm_partition - else: + elif self._partitions: + partition = get_best_partition(self._partitions, job, self.logger) + # we didnt get a partition yet so try fallback. 
+ if not partition: if self._fallback_partition is None: self._fallback_partition = self.get_default_partition(job) partition = self._fallback_partition diff --git a/snakemake_executor_plugin_slurm/partitions.py b/snakemake_executor_plugin_slurm/partitions.py new file mode 100644 index 00000000..43a45f03 --- /dev/null +++ b/snakemake_executor_plugin_slurm/partitions.py @@ -0,0 +1,352 @@ +from dataclasses import dataclass +from typing import Optional, List +import yaml +from pathlib import Path +from math import inf, isinf +from snakemake_interface_common.exceptions import WorkflowError +from snakemake_interface_executor_plugins.jobs import ( + JobExecutorInterface, +) +from snakemake_interface_executor_plugins.logging import LoggerExecutorInterface + + +def read_partition_file(partition_file: Path) -> List["Partition"]: + """Read partition definitions from a YAML file""" + try: + with open(partition_file, "r") as f: + config = yaml.safe_load(f) + except FileNotFoundError: + raise WorkflowError(f"Partition file not found: {partition_file}") + except yaml.YAMLError as e: + raise WorkflowError(f"Error parsing partition file {partition_file}: {e}") + except Exception as e: + raise WorkflowError( + f"Unexpected error reading partition file {partition_file}: {e}" + ) + if not isinstance(config, dict) or "partitions" not in config: + raise WorkflowError( + f"Partition file {partition_file} is missing 'partitions' section" + ) + partitions_dict = config["partitions"] + if not isinstance(partitions_dict, dict): + raise WorkflowError( + f"'partitions' section in {partition_file} must be a mapping" + ) + out = [] + for partition_name, partition_config in partitions_dict.items(): + if not partition_name or not partition_name.strip(): + raise KeyError("Partition name cannot be empty") + + # Extract optional cluster name from partition config + cluster = partition_config.pop("cluster", None) + + out.append( + Partition( + name=partition_name, + cluster=cluster, + limits=PartitionLimits(**partition_config), + ) + ) + return out + + +def get_best_partition( + candidate_partitions: List["Partition"], + job: JobExecutorInterface, + logger: LoggerExecutorInterface, +) -> Optional[str]: + scored_partitions = [] + for p in candidate_partitions: + score = p.score_job_fit(job) + logger.debug(f"Partition '{p.name}' score for job {job.name}: {score}") + if score is not None: + scored_partitions.append((p, score)) + + if scored_partitions: + best_partition, best_score = max(scored_partitions, key=lambda x: x[1]) + partition = best_partition.name + logger.info( + f"Auto-selected partition '{partition}' for job {job.name} " + f"with score {best_score:.3f}" + ) + return partition + else: + logger.warning( + f"No suitable partition found for job {job.name} based on " + f"resource requirements. Falling back to default behavior." + ) + return None + + +def parse_gpu_requirements(job: JobExecutorInterface) -> tuple[int, Optional[str]]: + """Parse GPU requirements from job resources. Returns (count, model)""" + gpu_required = job.resources.get("gpu", 0) + gres = job.resources.get("gres", "") + + # Convert to int if it's a string representation of a number + if isinstance(gpu_required, str): + try: + gpu_required = int(gpu_required) + except ValueError: + gpu_required = 0 + + # Ensure gres is a string + if not isinstance(gres, str): + gres = str(gres) if gres else "" + + if "gpu" in gres and gpu_required: + raise WorkflowError( + "GPU resource specified in both 'gpu' and 'gres'. These are mutually exclusive." 
# noqa: E501 + ) + + if gpu_required: + return int(gpu_required), job.resources.get("gpu_model") + elif "gpu" in gres: + # Parse gres string format: gpu: or gpu:: + gpu_parts = [part for part in gres.split(",") if part.strip().startswith("gpu")] + if gpu_parts: + gpu_spec = gpu_parts[0].strip().split(":") + if len(gpu_spec) == 2: # gpu: + return int(gpu_spec[1]), None + elif len(gpu_spec) == 3: # gpu:: + return int(gpu_spec[2]), gpu_spec[1] + + return 0, None + + +def get_effective_threads(job: JobExecutorInterface) -> int: + """ + Get the effective thread count for a job. + First checks job.threads, then falls back to job.resources["threads"]. + This handles cases where threads is specified in the resources block. + """ + threads = job.threads + # If threads is default (1) or not set, check resources + if threads == 1 or threads is None: + resource_threads = job.resources.get("threads") + if resource_threads is not None: + try: + resource_threads = int(resource_threads) + except ValueError: + resource_threads = threads + threads = resource_threads if resource_threads > 1 else threads + + # ensuring a valid thread count + if threads is None or threads < 1: + threads = 1 + return threads + + +def get_job_cpu_requirement(job: JobExecutorInterface) -> tuple[int, str]: + """ + This uses the same logic as snakemake_executor_plugin_slurm_jobstep.get_cpu_setting, but returns a tuple instead of a arg string. # noqa: E501 + """ + + gpu_required = job.resources.get("gpu", 0) + gres = job.resources.get("gres", "") + + # Convert gpu_required to int if it's a string + if isinstance(gpu_required, str): + try: + gpu_required = int(gpu_required) + except ValueError: + gpu_required = 0 + + # Ensure gres is a string for the "in" check + if not isinstance(gres, str): + gres = str(gres) if gres else "" + + has_gpu = bool(gpu_required) or "gpu" in gres + + cpus_per_task = job.resources.get("cpus_per_task") + if cpus_per_task is not None: + # Convert to int if it's a string + if isinstance(cpus_per_task, str): + try: + cpus_per_task = int(cpus_per_task) + except ValueError: + cpus_per_task = 0 + else: + cpus_per_task = int(cpus_per_task) + + if cpus_per_task < 0: + raise WorkflowError("cpus_per_task cannot be negative") + # ensure that at least 1 cpu is requested because 0 is not allowed by slurm + return (max(1, cpus_per_task), "task") + + elif has_gpu: + cpus_per_gpu = job.resources.get("cpus_per_gpu") + if cpus_per_gpu is not None: + # Convert to int if it's a string + if isinstance(cpus_per_gpu, str): + try: + cpus_per_gpu = int(cpus_per_gpu) + except ValueError: + cpus_per_gpu = 0 + else: + cpus_per_gpu = int(cpus_per_gpu) + + if cpus_per_gpu <= 0: + return (0, "none") + return (cpus_per_gpu, "gpu") + + # Fall back to effective threads (checks both job.threads and resources.threads) + return (get_effective_threads(job), "task") + + +@dataclass +class PartitionLimits: + """Represents resource limits for a SLURM partition""" + + # Standard resources + max_runtime: float = inf # minutes + max_mem_mb: float = inf + max_mem_mb_per_cpu: float = inf + max_cpus_per_task: float = inf + max_threads: float = inf + + # SLURM-specific resources + max_nodes: float = inf + max_tasks: float = inf + max_tasks_per_node: float = inf + + # GPU resources + max_gpu: int = 0 + available_gpu_models: Optional[List[str]] = None + max_cpus_per_gpu: float = inf + + # MPI resources + supports_mpi: bool = True + max_mpi_tasks: float = inf + + # Node features/constraints + available_constraints: Optional[List[str]] = None + + 
+@dataclass +class Partition: + """Represents a SLURM partition with its properties and limits""" + + name: str + limits: PartitionLimits + cluster: Optional[str] = None + + def score_job_fit(self, job: JobExecutorInterface) -> Optional[float]: + """ + Check if a job can run on this partition. If not return none. + Calculate a score for how well a partition fits the job requirements + """ + + # try to score how closely a job matches a partition's limits, in order to handle case where multiple partitions can run a given job # noqa: E501 + # naive approach here is to just sum the ratio of requested resource to limit, of course this limits us to only consider numerical resources # noqa: E501 + # here a higher score indicates a better fit + # TODO decide how to handle unspecified limits, for now we assume inf for numerical limits, none for others. # noqa: E501 + score = 0.0 + + numerical_resources = { + "mem_mb": self.limits.max_mem_mb, + "mem_mb_per_cpu": self.limits.max_mem_mb_per_cpu, + "runtime": self.limits.max_runtime, + "nodes": self.limits.max_nodes, + "tasks": self.limits.max_tasks, + "tasks_per_node": self.limits.max_tasks_per_node, + "mpi_tasks": self.limits.max_mpi_tasks, + } + + # Check cluster compatibility, first: + # Accept multiple possible resource names for cluster specification + job_cluster = ( + job.resources.get("slurm_cluster") + or job.resources.get("clusters") + or job.resources.get("cluster") + ) + + if job_cluster is not None: + # Job specifies a cluster - partition must match + if self.cluster is not None and self.cluster != job_cluster: + return None # Partition is for a different cluster + + for resource_key, limit in numerical_resources.items(): + job_requirement = job.resources.get(resource_key, 0) + # Convert to numeric value if it's a string + if isinstance(job_requirement, str): + try: + job_requirement = float(job_requirement) + except ValueError: + job_requirement = 0 + elif not isinstance(job_requirement, (int, float)): + job_requirement = 0 + + if job_requirement > 0: + if not isinf(limit) and job_requirement > limit: + return None + if not isinf(limit): + score += job_requirement / limit + + # Check thread requirements (check both job.threads and resources.threads) + effective_threads = get_effective_threads(job) + if effective_threads is not None and effective_threads > 0: + if ( + not isinf(self.limits.max_threads) + and effective_threads > self.limits.max_threads + ): + # Debug: partition cannot accommodate threads + return None + if not isinf(self.limits.max_threads): + score += effective_threads / self.limits.max_threads + + cpu_count, cpu_type = get_job_cpu_requirement(job) + if cpu_type == "task" and cpu_count > 0: + # Check cpu_count against max_threads + if ( + not isinf(self.limits.max_threads) + and cpu_count > self.limits.max_threads + ): + return None + if not isinf(self.limits.max_threads): + score += cpu_count / self.limits.max_threads + + # Also check against max_cpus_per_task + if ( + not isinf(self.limits.max_cpus_per_task) + and cpu_count > self.limits.max_cpus_per_task + ): + return None + if not isinf(self.limits.max_cpus_per_task): + score += cpu_count / self.limits.max_cpus_per_task + elif cpu_type == "gpu" and cpu_count > 0: + if ( + not isinf(self.limits.max_cpus_per_gpu) + and cpu_count > self.limits.max_cpus_per_gpu + ): + return None + if not isinf(self.limits.max_cpus_per_gpu): + score += cpu_count / self.limits.max_cpus_per_gpu + + gpu_count, gpu_model = parse_gpu_requirements(job) + if gpu_count > 0: + if 
self.limits.max_gpu == 0 or gpu_count > self.limits.max_gpu: + return None + score += gpu_count / self.limits.max_gpu + + if gpu_model and self.limits.available_gpu_models: + if gpu_model not in self.limits.available_gpu_models: + return None + + if job.resources.get("mpi") and not self.limits.supports_mpi: + return None + + constraint = job.resources.get("constraint") + if constraint and self.limits.available_constraints: + # Ensure constraint is a string + if not isinstance(constraint, str): + constraint = str(constraint) + required_constraints = [ + c.strip() for c in constraint.split(",") if c.strip() + ] + if not all( + req in self.limits.available_constraints for req in required_constraints + ): + return None + + return score diff --git a/tests/test_partition_selection.py b/tests/test_partition_selection.py new file mode 100644 index 00000000..57ba4f20 --- /dev/null +++ b/tests/test_partition_selection.py @@ -0,0 +1,824 @@ +import tempfile +import yaml + +import pytest +from pathlib import Path +from unittest.mock import MagicMock +from snakemake_interface_common.exceptions import WorkflowError +from snakemake_executor_plugin_slurm.partitions import ( + read_partition_file, + get_best_partition, +) + + +class TestPartitionSelection: + @pytest.fixture + def basic_partition_config(self): + """Basic partition configuration with two partitions.""" + return { + "partitions": { + "default": { + "max_runtime": 1440, + "max_mem_mb": 128000, + "max_cpus_per_task": 32, + "supports_mpi": True, + }, + "gpu": { + "max_runtime": 720, + "max_mem_mb": 256000, + "max_gpu": 4, + "available_gpu_models": ["a100", "v100"], + "supports_mpi": False, + }, + } + } + + @pytest.fixture + def minimal_partition_config(self): + """Minimal partition configuration.""" + return {"partitions": {"minimal": {}}} + + @pytest.fixture + def comprehensive_partition_config(self): + """Comprehensive partition configuration with all limit types.""" + return { + "partitions": { + "comprehensive": { + # Standard resources + "max_runtime": 2880, + "max_mem_mb": 500000, + "max_mem_mb_per_cpu": 8000, + "max_cpus_per_task": 64, + # SLURM-specific resources + "max_nodes": 4, + "max_tasks": 256, + "max_tasks_per_node": 64, + # GPU resources + "max_gpu": 8, + "available_gpu_models": ["a100", "v100", "rtx3090"], + "max_cpus_per_gpu": 16, + # MPI resources + "supports_mpi": True, + "max_mpi_tasks": 512, + # Node features/constraints + "available_constraints": ["intel", "avx2", "highmem"], + } + } + } + + @pytest.fixture + def empty_partitions_config(self): + """Empty partitions configuration.""" + return {"partitions": {}} + + @pytest.fixture + def missing_name_config(self): + """Configuration with missing name field.""" + return {"partitions": {"": {}}} # Empty partition name + + @pytest.fixture + def invalid_key_config(self): + """Configuration with invalid key.""" + return {"invalid_key": []} + + @pytest.fixture + def temp_yaml_file(self): + """Helper fixture to create temporary YAML files.""" + + def _create_temp_file(config): + with tempfile.NamedTemporaryFile( + mode="w", suffix=".yaml", delete=False + ) as f: + yaml.dump(config, f) + return Path(f.name) + + return _create_temp_file + + def test_read_valid_partition_file(self, basic_partition_config, temp_yaml_file): + """Test reading a valid partition configuration file.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + + assert len(partitions) == 2 + + # Check first partition + assert partitions[0].name == "default" 
+ assert partitions[0].limits.max_runtime == 1440 + assert partitions[0].limits.max_mem_mb == 128000 + assert partitions[0].limits.max_cpus_per_task == 32 + assert partitions[0].limits.supports_mpi is True + + # Check second partition + assert partitions[1].name == "gpu" + assert partitions[1].limits.max_runtime == 720 + assert partitions[1].limits.max_gpu == 4 + assert partitions[1].limits.available_gpu_models == ["a100", "v100"] + assert partitions[1].limits.supports_mpi is False + + finally: + temp_path.unlink() + + def test_read_minimal_partition_file( + self, minimal_partition_config, temp_yaml_file + ): + """Test reading a partition file with minimal configuration.""" + from math import isinf + + temp_path = temp_yaml_file(minimal_partition_config) + + try: + partitions = read_partition_file(temp_path) + + assert len(partitions) == 1 + assert partitions[0].name == "minimal" + + # Check that all limits are inf + limits = partitions[0].limits + assert isinf(limits.max_runtime) + assert isinf(limits.max_mem_mb) + assert limits.max_gpu == 0 + assert limits.supports_mpi is True + + finally: + temp_path.unlink() + + def test_read_partition_file_with_all_limits( + self, comprehensive_partition_config, temp_yaml_file + ): + """Test reading a partition file with all possible limit types.""" + temp_path = temp_yaml_file(comprehensive_partition_config) + + try: + partitions = read_partition_file(temp_path) + + assert len(partitions) == 1 + limits = partitions[0].limits + + # Check standard resources + assert limits.max_runtime == 2880 + assert limits.max_mem_mb == 500000 + assert limits.max_mem_mb_per_cpu == 8000 + assert limits.max_cpus_per_task == 64 + + # Check SLURM-specific resources + assert limits.max_nodes == 4 + assert limits.max_tasks == 256 + assert limits.max_tasks_per_node == 64 + + # Check GPU resources + assert limits.max_gpu == 8 + assert limits.available_gpu_models == ["a100", "v100", "rtx3090"] + assert limits.max_cpus_per_gpu == 16 + + # Check MPI resources + assert limits.supports_mpi is True + assert limits.max_mpi_tasks == 512 + + # Check constraints + assert limits.available_constraints == ["intel", "avx2", "highmem"] + + finally: + temp_path.unlink() + + def test_read_empty_partitions_list(self, empty_partitions_config, temp_yaml_file): + """Test reading a file with empty partitions list.""" + temp_path = temp_yaml_file(empty_partitions_config) + + try: + partitions = read_partition_file(temp_path) + assert len(partitions) == 0 + + finally: + temp_path.unlink() + + def test_read_nonexistent_file(self): + """Test reading a non-existent file raises appropriate error.""" + + nonexistent_path = Path("/nonexistent/path/to/file.yaml") + + with pytest.raises(WorkflowError, match="Partition file not found"): + read_partition_file(nonexistent_path) + + def test_read_invalid_yaml_file(self): + """Test reading an invalid YAML file raises appropriate error.""" + + invalid_yaml = "partitions:\n - name: test\n invalid: {\n" + + with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: + f.write(invalid_yaml) + temp_path = Path(f.name) + + try: + with pytest.raises(WorkflowError, match="Error parsing partition file"): + read_partition_file(temp_path) + finally: + temp_path.unlink() + + def test_read_file_missing_partitions_key(self, invalid_key_config, temp_yaml_file): + """Test reading a file without 'partitions' key raises KeyError.""" + + temp_path = temp_yaml_file(invalid_key_config) + + try: + with pytest.raises(WorkflowError, match="missing 'partitions' 
section"): + read_partition_file(temp_path) + finally: + temp_path.unlink() + + def test_read_partition_missing_required_fields( + self, missing_name_config, temp_yaml_file + ): + """Test reading partition with missing required fields.""" + temp_path = temp_yaml_file(missing_name_config) + + try: + with pytest.raises(KeyError): + read_partition_file(temp_path) + finally: + temp_path.unlink() + + @pytest.fixture + def mock_job(self): + """Create a mock job with configurable resources and threads.""" + + def _create_job(threads=1, **resources): + mock_resources = MagicMock() + mock_resources.get.side_effect = lambda key, default=None: resources.get( + key, default + ) + + mock_job = MagicMock() + mock_job.resources = mock_resources + mock_job.threads = threads + mock_job.name = "test_job" + mock_job.is_group.return_value = False + mock_job.jobid = 1 + return mock_job + + return _create_job + + @pytest.fixture + def mock_logger(self): + """Create a mock logger.""" + return MagicMock() + + def test_basic_partition_selection_cpu_job( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test selecting partition for a basic CPU job.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job(threads=4, mem_mb=16000, runtime=60) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select 'default' partition as it supports CPU jobs + assert selected_partition == "default" + # Check that the final call contains the auto-selection message + assert mock_logger.info.call_count >= 1 + assert ( + "Auto-selected partition 'default'" + in mock_logger.info.call_args_list[-1][0][0] + ) + finally: + temp_path.unlink() + + def test_basic_partition_selection_gpu_job( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test selecting partition for a GPU job.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job( + threads=2, mem_mb=32000, runtime=300, gpu=2, gpu_model="a100" + ) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select 'gpu' partition as it supports GPU jobs + assert selected_partition == "gpu" + assert mock_logger.info.call_count >= 1 + assert ( + "Auto-selected partition 'gpu'" + in mock_logger.info.call_args_list[-1][0][0] + ) + finally: + temp_path.unlink() + + def test_no_suitable_partition( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test when no partition can accommodate the job requirements.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + # Job requires more memory than any partition allows + job = mock_job(threads=1, mem_mb=500000, runtime=60) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None when no suitable partition found + assert selected_partition is None + assert mock_logger.warning.call_count >= 1 + assert ( + "No suitable partition found" + in mock_logger.warning.call_args_list[-1][0][0] + ) + finally: + temp_path.unlink() + + def test_comprehensive_partition_selection( + self, comprehensive_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test partition selection with comprehensive limits.""" + temp_path = temp_yaml_file(comprehensive_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job( + threads=8, + mem_mb=64000, 
+ runtime=1200, + gpu=2, + gpu_model="a100", + constraint="intel,avx2", + ) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select the comprehensive partition + assert selected_partition == "comprehensive" + assert mock_logger.info.call_count >= 1 + assert ( + "Auto-selected partition 'comprehensive'" + in mock_logger.info.call_args_list[-1][0][0] + ) + finally: + temp_path.unlink() + + def test_constraint_mismatch( + self, comprehensive_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test job with constraints not available in partition.""" + temp_path = temp_yaml_file(comprehensive_partition_config) + + try: + partitions = read_partition_file(temp_path) + # Job requires constraint not available in partition + job = mock_job(threads=2, constraint="amd,gpu_direct") + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None due to constraint mismatch + assert selected_partition is None + assert mock_logger.warning.call_count >= 1 + assert ( + "No suitable partition found" + in mock_logger.warning.call_args_list[-1][0][0] + ) + finally: + temp_path.unlink() + + def test_mpi_job_selection( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test MPI job partition selection.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job(threads=1, mpi=True, tasks=16) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select 'default' partition as it supports MPI, 'gpu' doesn't + assert selected_partition == "default" + assert mock_logger.info.call_count >= 1 + assert ( + "Auto-selected partition 'default'" + in mock_logger.info.call_args_list[-1][0][0] + ) + finally: + temp_path.unlink() + + def test_gpu_model_mismatch( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test GPU job with unsupported GPU model.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + # Request GPU model not available in any partition + job = mock_job(threads=2, gpu=1, gpu_model="rtx4090") + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None due to GPU model mismatch + assert selected_partition is None + assert mock_logger.warning.call_count >= 1 + assert ( + "No suitable partition found" + in mock_logger.warning.call_args_list[-1][0][0] + ) + finally: + temp_path.unlink() + + def test_empty_partitions_list( + self, empty_partitions_config, temp_yaml_file, mock_job, mock_logger + ): + """Test partition selection with empty partitions list.""" + temp_path = temp_yaml_file(empty_partitions_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job(threads=1, mem_mb=1000) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None when no partitions available + assert selected_partition is None + assert mock_logger.warning.call_count >= 1 + assert ( + "No suitable partition found" + in mock_logger.warning.call_args_list[-1][0][0] + ) + finally: + temp_path.unlink() + + def test_gres_gpu_specification( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test GPU job specified via gres parameter.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job(threads=2, gres="gpu:v100:1", runtime=400) + + 
selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select 'gpu' partition as it supports v100 GPUs + assert selected_partition == "gpu" + assert mock_logger.info.call_count >= 1 + assert ( + "Auto-selected partition 'gpu'" + in mock_logger.info.call_args_list[-1][0][0] + ) + finally: + temp_path.unlink() + + def test_cpus_per_task_specification( + self, comprehensive_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test job with cpus_per_task specification.""" + temp_path = temp_yaml_file(comprehensive_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job(threads=1, cpus_per_task=32, mem_mb=64000) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select comprehensive partition as it can handle 32 cpus per task + assert selected_partition == "comprehensive" + assert mock_logger.info.call_count >= 1 + assert ( + "Auto-selected partition 'comprehensive'" + in mock_logger.info.call_args_list[-1][0][0] + ) + finally: + temp_path.unlink() + + def test_runtime_exceeds_limit( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test job with runtime exceeding partition limits.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + # Job requires more runtime than gpu partition allows (720 min max) + job = mock_job(threads=1, runtime=1000, gpu=1, gpu_model="a100") + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None as no partition can accommodate the runtime + assert selected_partition is None + assert mock_logger.warning.call_count >= 1 + assert ( + "No suitable partition found" + in mock_logger.warning.call_args_list[-1][0][0] + ) + finally: + temp_path.unlink() + + @pytest.fixture + def multicluster_partition_config(self): + """Partition configuration with multiple clusters.""" + return { + "partitions": { + "normal-small": { + "cluster": "normal", + "max_runtime": 360, + "max_threads": 32, + "max_mem_mb": 64000, + }, + "normal-large": { + "cluster": "normal", + "max_runtime": 1440, + "max_threads": 128, + "max_mem_mb": 256000, + }, + "deviating-small": { + "cluster": "deviating", + "max_runtime": 360, + "max_threads": 64, + "max_mem_mb": 128000, + }, + "deviating-gpu": { + "cluster": "deviating", + "max_runtime": 720, + "max_threads": 32, + "max_gpu": 4, + "available_gpu_models": ["a100"], + }, + } + } + + @pytest.fixture + def max_threads_partition_config(self): + """Partition configuration specifically testing max_threads limits.""" + return { + "partitions": { + "tiny": { + "max_threads": 8, + "max_mem_mb": 32000, + }, + "small": { + "max_threads": 32, + "max_mem_mb": 64000, + }, + "medium": { + "max_threads": 127, + "max_mem_mb": 128000, + }, + "large": { + "max_threads": 256, + "max_mem_mb": 512000, + }, + } + } + + def test_cluster_specification_via_slurm_cluster( + self, multicluster_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test cluster specification using slurm_cluster resource.""" + temp_path = temp_yaml_file(multicluster_partition_config) + + try: + partitions = read_partition_file(temp_path) + + # Verify cluster field is read correctly + # Find partitions by name instead of assuming order + partition_map = {p.name: p for p in partitions} + assert partition_map["normal-small"].cluster == "normal" + assert partition_map["deviating-small"].cluster == "deviating" + + # Job targeting 'normal' cluster + job = 
mock_job(threads=16, mem_mb=32000, slurm_cluster="normal") + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select a partition from 'normal' cluster + assert selected_partition in ["normal-small", "normal-large"] + assert mock_logger.info.call_count >= 1 + finally: + temp_path.unlink() + + def test_cluster_specification_via_clusters( + self, multicluster_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test cluster specification using clusters resource.""" + temp_path = temp_yaml_file(multicluster_partition_config) + + try: + partitions = read_partition_file(temp_path) + + # Job targeting 'deviating' cluster via 'clusters' resource + job = mock_job(threads=48, mem_mb=64000, clusters="deviating") + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select deviating-small partition + assert selected_partition == "deviating-small" + assert mock_logger.info.call_count >= 1 + finally: + temp_path.unlink() + + def test_cluster_specification_via_cluster( + self, multicluster_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test cluster specification using cluster resource.""" + temp_path = temp_yaml_file(multicluster_partition_config) + + try: + partitions = read_partition_file(temp_path) + + # Job targeting 'normal' cluster via 'cluster' resource + job = mock_job(threads=64, mem_mb=128000, cluster="normal") + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select normal-large partition + assert selected_partition == "normal-large" + assert mock_logger.info.call_count >= 1 + finally: + temp_path.unlink() + + def test_cluster_mismatch_excludes_partitions( + self, multicluster_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """ + Test that jobs requesting a cluster exclude partitions from other + clusters. 
+ """ + temp_path = temp_yaml_file(multicluster_partition_config) + + try: + partitions = read_partition_file(temp_path) + + # Job requesting GPU on 'normal' cluster (only deviating has GPU) + job = mock_job(threads=16, gpu=2, gpu_model="a100", slurm_cluster="normal") + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None as normal cluster has no GPU partitions + assert selected_partition is None + assert mock_logger.warning.call_count >= 1 + finally: + temp_path.unlink() + + def test_job_without_cluster_uses_any_partition( + self, multicluster_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test that jobs without cluster specification can use any partition.""" + temp_path = temp_yaml_file(multicluster_partition_config) + + try: + partitions = read_partition_file(temp_path) + + # Job without cluster specification + job = mock_job(threads=16, mem_mb=32000) + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select a partition (any cluster is acceptable) + assert selected_partition is not None + assert mock_logger.info.call_count >= 1 + finally: + temp_path.unlink() + + def test_max_threads_with_threads_resource( + self, max_threads_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test max_threads limit with threads resource.""" + temp_path = temp_yaml_file(max_threads_partition_config) + + try: + partitions = read_partition_file(temp_path) + + # Job with 4 threads - should prefer tiny partition + job = mock_job(threads=4, mem_mb=16000) + selected_partition = get_best_partition(partitions, job, mock_logger) + assert selected_partition == "tiny" + + # Job with 16 threads - should prefer small partition + mock_logger.reset_mock() + job = mock_job(threads=16, mem_mb=32000) + selected_partition = get_best_partition(partitions, job, mock_logger) + assert selected_partition == "small" + + # Job with 64 threads - should use medium partition + mock_logger.reset_mock() + job = mock_job(threads=64, mem_mb=64000) + selected_partition = get_best_partition(partitions, job, mock_logger) + assert selected_partition == "medium" + + finally: + temp_path.unlink() + + def test_max_threads_with_cpus_per_task_resource( + self, max_threads_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test max_threads limit with cpus_per_task resource.""" + temp_path = temp_yaml_file(max_threads_partition_config) + + try: + partitions = read_partition_file(temp_path) + + # Job with cpus_per_task=128 - exceeds medium (127), needs large + job = mock_job(threads=None, cpus_per_task=128, mem_mb=128000) + selected_partition = get_best_partition(partitions, job, mock_logger) + assert selected_partition == "large" + + # Job with cpus_per_task=127 - exactly matches medium + mock_logger.reset_mock() + job = mock_job(threads=None, cpus_per_task=127, mem_mb=64000) + selected_partition = get_best_partition(partitions, job, mock_logger) + assert selected_partition == "medium" + + finally: + temp_path.unlink() + + def test_max_threads_excludes_too_small_partitions( + self, max_threads_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test that partitions with insufficient max_threads are excluded.""" + temp_path = temp_yaml_file(max_threads_partition_config) + + try: + partitions = read_partition_file(temp_path) + + # Job with 128 threads - tiny, small, medium cannot accommodate + job = mock_job(threads=128, mem_mb=256000) + selected_partition = get_best_partition(partitions, job, mock_logger) + + # 
Only large partition should be selected + assert selected_partition == "large" + assert mock_logger.info.call_count >= 1 + + finally: + temp_path.unlink() + + def test_max_threads_job_exceeds_all_partitions( + self, max_threads_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test job requiring more threads than any partition supports.""" + temp_path = temp_yaml_file(max_threads_partition_config) + + try: + partitions = read_partition_file(temp_path) + + # Job with 512 threads - exceeds all partitions + job = mock_job(threads=512, mem_mb=128000) + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None + assert selected_partition is None + assert mock_logger.warning.call_count >= 1 + assert ( + "No suitable partition found" + in mock_logger.warning.call_args_list[-1][0][0] + ) + + finally: + temp_path.unlink() + + def test_multicluster_with_max_threads(self, temp_yaml_file, mock_job, mock_logger): + """Test combined cluster and max_threads constraints.""" + config = { + "partitions": { + "normal-small": { + "cluster": "normal", + "max_threads": 64, + "max_mem_mb": 128000, + }, + "normal-large": { + "cluster": "normal", + "max_threads": 256, + "max_mem_mb": 512000, + }, + "deviating-medium": { + "cluster": "deviating", + "max_threads": 128, + "max_mem_mb": 256000, + }, + } + } + temp_path = temp_yaml_file(config) + + try: + partitions = read_partition_file(temp_path) + + # Job targeting normal cluster with 128 threads + # normal-small (64) is too small, should use normal-large (256) + job = mock_job(threads=128, mem_mb=256000, slurm_cluster="normal") + selected_partition = get_best_partition(partitions, job, mock_logger) + + assert selected_partition == "normal-large" + assert mock_logger.info.call_count >= 1 + + # Job targeting deviating cluster with 128 threads + # Should exactly match deviating-medium + mock_logger.reset_mock() + job = mock_job(threads=128, mem_mb=128000, slurm_cluster="deviating") + selected_partition = get_best_partition(partitions, job, mock_logger) + + assert selected_partition == "deviating-medium" + assert mock_logger.info.call_count >= 1 + + finally: + temp_path.unlink() diff --git a/tests/tests.py b/tests/tests.py index 6dc47098..b683fbda 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -1,13 +1,11 @@ import os import re - from pathlib import Path from typing import Optional import snakemake.common.tests from snakemake_interface_executor_plugins.settings import ExecutorSettingsBase from unittest.mock import MagicMock, patch import pytest - from snakemake_executor_plugin_slurm import ExecutorSettings from snakemake_executor_plugin_slurm.efficiency_report import ( parse_sacct_data, @@ -15,6 +13,7 @@ ) from snakemake_executor_plugin_slurm.utils import set_gres_string from snakemake_executor_plugin_slurm.submit_string import get_submit_command + from snakemake_executor_plugin_slurm.validation import validate_slurm_extra from snakemake_interface_common.exceptions import WorkflowError import pandas as pd