2 changes: 2 additions & 0 deletions docs/further.md
@@ -70,6 +70,8 @@ The SLURM executor plugin supports automatic partition selection based on job re

*Jobs that explicitly specify a `slurm_partition` resource will bypass automatic selection and use the specified partition directly.*

> **Note:** Partition selection supports specifying the target cluster using any of the resource keys `cluster`, `clusters`, or `slurm_cluster` in your workflow profile or the partition configuration file. All three are treated equivalently by the plugin.
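
For example, a partition entry in the limits file (described in the next section) that is tied to a specific cluster might look like the following sketch. The partition names, the cluster name, and the limit values are illustrative; the limit keys mirror the fixtures used in this PR's tests.

```yaml
partitions:
  normal-small:
    cluster: normal        # `clusters` or `slurm_cluster` are accepted equivalently
    max_runtime: 360
    max_threads: 32
    max_mem_mb: 64000
  shared:                  # no cluster key: eligible only for jobs that do not request a cluster
    max_runtime: 360
    max_threads: 32
    max_mem_mb: 64000
```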

##### Partition Limits Specification

To enable automatic partition selection, create a YAML configuration file that defines the available partitions and their resource limits. This file should be structured as follows:
37 changes: 35 additions & 2 deletions snakemake_executor_plugin_slurm/__init__.py
@@ -868,10 +868,43 @@ def get_partition_arg(self, job: JobExecutorInterface):
else raises an error - implicitly.
"""
partition = None

# Check if a specific partition is requested
if job.resources.get("slurm_partition"):
partition = job.resources.slurm_partition
elif self._partitions:
# But also check if there's a cluster requirement that might override it
job_cluster = (
job.resources.get("slurm_cluster")
or job.resources.get("cluster")
or job.resources.get("clusters")
)
Comment on lines +875 to +879
⚠️ Potential issue | 🟡 Minor

Inconsistent cluster key priority order with submit_string.py.

The resolution order here (`slurm_cluster` → `cluster` → `clusters`) differs from submit_string.py (lines 48-51), which uses (`cluster` → `clusters` → `slurm_cluster`). If a job specifies multiple cluster keys with different values, partition validation may use a different cluster than what's passed to sbatch.

             job_cluster = (
-                job.resources.get("slurm_cluster")
-                or job.resources.get("cluster")
+                job.resources.get("cluster")
                 or job.resources.get("clusters")
+                or job.resources.get("slurm_cluster")
             )
🤖 Prompt for AI Agents
In snakemake_executor_plugin_slurm/__init__.py around lines 875 to 879, the
cluster key lookup currently prefers slurm_cluster before cluster and clusters,
which is inconsistent with submit_string.py; change the resolution order to
match submit_string.py by checking job.resources for "cluster" then "clusters"
then "slurm_cluster" so partition validation uses the same cluster value passed
to sbatch, and update any related comments or docstrings to reflect the
canonical priority.


if job_cluster and self._partitions:
# If a cluster is specified, verify the partition exists and matches
# Otherwise, use auto-selection to find a partition for that cluster
partition_obj = next(
(
p
for p in self._partitions
if p.name == job.resources.slurm_partition
),
None,
)
if (
partition_obj
and partition_obj.partition_cluster
and partition_obj.partition_cluster != job_cluster
):
# Partition exists but is for a different cluster - use auto-selection
partition = get_best_partition(self._partitions, job, self.logger)
else:
partition = job.resources.slurm_partition
else:
partition = job.resources.slurm_partition

# If no partition was selected yet, try auto-selection
if not partition and self._partitions:
partition = get_best_partition(self._partitions, job, self.logger)

# we didn't get a partition yet, so try the fallback.
if not partition:
if self._fallback_partition is None:
25 changes: 17 additions & 8 deletions snakemake_executor_plugin_slurm/partitions.py
@@ -39,12 +39,16 @@ def read_partition_file(partition_file: Path) -> List["Partition"]:
raise KeyError("Partition name cannot be empty")

# Extract optional cluster name from partition config
cluster = partition_config.pop("cluster", None)
cluster = None
for key in ("slurm_cluster", "cluster", "clusters"):
if key in partition_config:
cluster = partition_config.pop(key)
break

out.append(
Partition(
name=partition_name,
cluster=cluster,
partition_cluster=cluster,
limits=PartitionLimits(**partition_config),
)
)
@@ -60,7 +64,7 @@ def get_best_partition(
for p in candidate_partitions:
score = p.score_job_fit(job)
logger.debug(f"Partition '{p.name}' score for job {job.name}: {score}")
if score is not None:
if score is not None and score > 0:
scored_partitions.append((p, score))

if scored_partitions:
@@ -241,7 +245,7 @@ class Partition:

name: str
limits: PartitionLimits
cluster: Optional[str] = None
partition_cluster: Optional[str] = None

def score_job_fit(self, job: JobExecutorInterface) -> Optional[float]:
"""
@@ -269,14 +273,19 @@ def score_job_fit(self, job: JobExecutorInterface) -> Optional[float]:
# Accept multiple possible resource names for cluster specification
job_cluster = (
job.resources.get("slurm_cluster")
or job.resources.get("clusters")
or job.resources.get("cluster")
or job.resources.get("clusters")
)

# Enforce strict cluster eligibility:
# - If the job specifies a cluster, only partitions with a matching cluster are eligible
# - If the job does not specify a cluster, only partitions without a cluster are eligible
if job_cluster is not None:
# Job specifies a cluster - partition must match
if self.cluster is not None and self.cluster != job_cluster:
return None # Partition is for a different cluster
if self.partition_cluster != job_cluster:
return None # Not eligible
else:
if self.partition_cluster is not None:
return None # Not eligible

for resource_key, limit in numerical_resources.items():
job_requirement = job.resources.get(resource_key, 0)
11 changes: 9 additions & 2 deletions snakemake_executor_plugin_slurm/submit_string.py
@@ -42,8 +42,15 @@ def get_submit_command(job, params):
# "- p '{partition_name}'"
call += f" {params.partition}"

if job.resources.get("clusters"):
call += f" --clusters {safe_quote(job.resources.clusters)}"
# Add cluster specification if provided
# Check for cluster first (singular), then fall back to clusters (plural)
# and slurm_cluster for backwards compatibility
cluster_val = (
job.resources.get("cluster")
or job.resources.get("clusters")
or job.resources.get("slurm_cluster")
)
if cluster_val:
call += f" --cluster {safe_quote(cluster_val)}"

if job.resources.get("runtime"):
call += f" -t {safe_quote(job.resources.runtime)}"
2 changes: 1 addition & 1 deletion snakemake_executor_plugin_slurm/validation.py
@@ -132,7 +132,7 @@ def validate_executor_settings(settings, logger=None):
if settings.init_seconds_before_status_checks is not None:
if (
not isinstance(settings.init_seconds_before_status_checks, int)
or settings.init_seconds_before_status_checks > 0
or settings.init_seconds_before_status_checks < 1
):
raise WorkflowError(
"init-seconds-before-status-checks must be a positive integer."
55 changes: 49 additions & 6 deletions tests/test_partition_selection.py
@@ -591,8 +591,8 @@ def test_cluster_specification_via_slurm_cluster(
# Verify cluster field is read correctly
# Find partitions by name instead of assuming order
partition_map = {p.name: p for p in partitions}
assert partition_map["normal-small"].cluster == "normal"
assert partition_map["deviating-small"].cluster == "deviating"
assert partition_map["normal-small"].partition_cluster == "normal"
assert partition_map["deviating-small"].partition_cluster == "deviating"

# Job targeting 'normal' cluster
job = mock_job(threads=16, mem_mb=32000, slurm_cluster="normal")
@@ -664,10 +664,54 @@ def test_cluster_mismatch_excludes_partitions(
finally:
temp_path.unlink()

def test_job_with_cluster_does_not_select_partition_without_cluster(
self, multicluster_partition_config, temp_yaml_file, mock_job, mock_logger
):
"""
Test that jobs with a cluster requirement do not select partitions without a cluster.
"""
config = dict(multicluster_partition_config)
config["partitions"]["no-cluster"] = {
"max_runtime": 360,
"max_threads": 32,
"max_mem_mb": 64000,
}
temp_path = temp_yaml_file(config)
try:
partitions = read_partition_file(temp_path)
job = mock_job(threads=16, mem_mb=32000, slurm_cluster="normal")
selected_partition = get_best_partition(partitions, job, mock_logger)
# Should select a partition with cluster 'normal', not 'no-cluster'
assert selected_partition in ["normal-small", "normal-large"]
finally:
temp_path.unlink()

def test_job_without_cluster_can_select_partition_without_cluster(
self, multicluster_partition_config, temp_yaml_file, mock_job, mock_logger
):
"""
Test that jobs without cluster requirement can select partitions without a cluster.
"""
config = dict(multicluster_partition_config)
config["partitions"]["no-cluster"] = {
"max_runtime": 360,
"max_threads": 32,
"max_mem_mb": 64000,
}
temp_path = temp_yaml_file(config)
try:
partitions = read_partition_file(temp_path)
job = mock_job(threads=16, mem_mb=32000)
selected_partition = get_best_partition(partitions, job, mock_logger)
# Should be able to select 'no-cluster' partition
assert selected_partition in ["normal-small", "normal-large", "no-cluster"]
finally:
temp_path.unlink()

def test_job_without_cluster_uses_any_partition(
self, multicluster_partition_config, temp_yaml_file, mock_job, mock_logger
):
"""Test that jobs without cluster specification can use any partition."""
"""Test that jobs without cluster specification can use any partition without a cluster assignment."""
temp_path = temp_yaml_file(multicluster_partition_config)

try:
Expand All @@ -677,9 +721,8 @@ def test_job_without_cluster_uses_any_partition(
job = mock_job(threads=16, mem_mb=32000)
selected_partition = get_best_partition(partitions, job, mock_logger)

# Should select a partition (any cluster is acceptable)
assert selected_partition is not None
assert mock_logger.info.call_count >= 1
# Should return None since all partitions have a cluster assignment
assert selected_partition is None
finally:
temp_path.unlink()

Expand Down