diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index 2abb5ba..30bd9fe 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -269,6 +269,20 @@ class ExecutorSettings(ExecutorSettingsBase): }, ) + pass_command_as_script: bool = field( + default=False, + metadata={ + "help": ( + "Pass to sbatch and srun the command to be executed as a shell script" + " (fed through stdin) instead of wrapping it in the command line " + "call. Useful when a limit exists on SLURM command line length (ie. " + "max_submit_line_size)." + ), + "env_var": False, + "required": False, + }, + ) + def __post_init__(self): """Validate settings after initialization.""" validate_executor_settings(self) @@ -413,7 +427,10 @@ def warn_on_jobcontext(self, done=None): done = True def additional_general_args(self): - return "--executor slurm-jobstep --jobs 1" + general_args = "--executor slurm-jobstep --jobs 1" + if self.workflow.executor_settings.pass_command_as_script: + general_args += " --slurm-jobstep-pass-command-as-script" + return general_args def run_job(self, job: JobExecutorInterface): # Implement here how to run a job. @@ -500,8 +517,17 @@ def run_job(self, job: JobExecutorInterface): exec_job = self.format_job_exec(job) - # and finally the job to execute with all the snakemake parameters - call += f' --wrap="{exec_job}"' + if not self.workflow.executor_settings.pass_command_as_script: + # and finally wrap the job to execute with all the snakemake parameters + call += f' --wrap="{exec_job}"' + subprocess_stdin = None + else: + # format the job to execute with all the snakemake parameters into a script + sbatch_script = "\n".join(["#!/bin/sh", exec_job]) + self.logger.debug(f"sbatch script:\n{sbatch_script}") + # feed the shell script to sbatch via stdin + call += " /dev/stdin" + subprocess_stdin = sbatch_script self.logger.debug(f"sbatch call: {call}") try: @@ -509,10 +535,13 @@ def run_job(self, job: JobExecutorInterface): call, shell=True, text=True, + stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) - out, err = process.communicate() + out, err = process.communicate( + input=subprocess_stdin # feed the sbatch shell script through stdin + ) if process.returncode != 0: raise subprocess.CalledProcessError( process.returncode, call, output=err @@ -524,6 +553,11 @@ def run_job(self, job: JobExecutorInterface): "SLURM sbatch failed. " f"The error message was '{e.output.strip()}'.\n" f" sbatch call:\n {call}\n" + + ( + f" sbatch script:\n{sbatch_script}\n" + if subprocess_stdin is not None + else "" + ) ), ) return