|
13 | 13 | JobExecutorInterface, |
14 | 14 | ) |
15 | 15 | from snakemake_interface_executor_plugins.settings import ExecMode, CommonSettings |
| 16 | +from snakemake_interface_common.exceptions import WorkflowError |
16 | 17 |
|
17 | 18 |
|
18 | 19 | # Required: |
@@ -111,7 +112,15 @@ def run_job(self, job: JobExecutorInterface): |
111 | 112 | # The -n1 is important to avoid that srun executes the given command |
112 | 113 | # multiple times, depending on the relation between |
113 | 114 | # cpus per task and the number of CPU cores. |
114 | | - call = f"srun -n1 --cpu-bind=q {self.format_job_exec(job)}" |
| 115 | + |
| 116 | + # as of v22.11.0, the --cpu-per-task flag is needed to ensure that |
| 117 | + # the job can utilize the c-group's resources. |
| 118 | + # We set the limitation accordingly, assuming the submit executor |
| 119 | + # has set the resources correctly. |
| 120 | + |
| 121 | + call = "srun -n1 --cpu-bind=q " |
| 122 | + call += f"--cpus-per-task {get_cpus_per_task(job)} " |
| 123 | + call += f"{self.format_job_exec(job)}" |
115 | 124 |
|
116 | 125 | self.logger.debug(f"This job is a group job: {job.is_group()}") |
117 | 126 | self.logger.debug(f"The call for this job is: {call}") |
@@ -144,3 +153,16 @@ def cores(self): |
144 | 153 |
|
145 | 154 | def get_exec_mode(self) -> ExecMode: |
146 | 155 | return ExecMode.REMOTE |
| 156 | + |
| 157 | + |
| 158 | +def get_cpus_per_task(job: JobExecutorInterface): |
| 159 | + cpus_per_task = job.threads |
| 160 | + if job.resources.get("cpus_per_task"): |
| 161 | + if not isinstance(cpus_per_task, int): |
| 162 | + raise WorkflowError( |
| 163 | + f"cpus_per_task must be an integer, but is {cpus_per_task}" |
| 164 | + ) |
| 165 | + cpus_per_task = job.resources.cpus_per_task |
| 166 | + # ensure that at least 1 cpu is requested |
| 167 | + # because 0 is not allowed by slurm |
| 168 | + return max(1, cpus_per_task) |
0 commit comments