Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 38 additions & 4 deletions snakemake_executor_plugin_slurm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,20 @@ class ExecutorSettings(ExecutorSettingsBase):
},
)

pass_command_as_script: bool = field(
default=False,
metadata={
"help": (
"Pass to sbatch and srun the command to be executed as a shell script"
" (fed through stdin) instead of wrapping it in the command line "
"call. Useful when a limit exists on SLURM command line length (ie. "
"max_submit_line_size)."
),
"env_var": False,
"required": False,
},
)

def __post_init__(self):
"""Validate settings after initialization."""
validate_executor_settings(self)
Expand Down Expand Up @@ -413,7 +427,10 @@ def warn_on_jobcontext(self, done=None):
done = True

def additional_general_args(self):
return "--executor slurm-jobstep --jobs 1"
general_args = "--executor slurm-jobstep --jobs 1"
if self.workflow.executor_settings.pass_command_as_script:
general_args += " --slurm-jobstep-pass-command-as-script"
return general_args

def run_job(self, job: JobExecutorInterface):
# Implement here how to run a job.
Expand Down Expand Up @@ -500,19 +517,31 @@ def run_job(self, job: JobExecutorInterface):

exec_job = self.format_job_exec(job)

# and finally the job to execute with all the snakemake parameters
call += f' --wrap="{exec_job}"'
if not self.workflow.executor_settings.pass_command_as_script:
# and finally wrap the job to execute with all the snakemake parameters
call += f' --wrap="{exec_job}"'
subprocess_stdin = None
else:
# format the job to execute with all the snakemake parameters into a script
sbatch_script = "\n".join(["#!/bin/sh", exec_job])
self.logger.debug(f"sbatch script:\n{sbatch_script}")
# feed the shell script to sbatch via stdin
call += " /dev/stdin"
subprocess_stdin = sbatch_script

self.logger.debug(f"sbatch call: {call}")
try:
process = subprocess.Popen(
call,
shell=True,
text=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
out, err = process.communicate()
out, err = process.communicate(
input=subprocess_stdin # feed the sbatch shell script through stdin
)
if process.returncode != 0:
raise subprocess.CalledProcessError(
process.returncode, call, output=err
Expand All @@ -524,6 +553,11 @@ def run_job(self, job: JobExecutorInterface):
"SLURM sbatch failed. "
f"The error message was '{e.output.strip()}'.\n"
f" sbatch call:\n {call}\n"
+ (
f" sbatch script:\n{sbatch_script}\n"
if subprocess_stdin is not None
else ""
)
),
)
return
Expand Down