diff --git a/snakemake_executor_plugin_slurm_jobstep/__init__.py b/snakemake_executor_plugin_slurm_jobstep/__init__.py index ae92e06..3d6e3d8 100644 --- a/snakemake_executor_plugin_slurm_jobstep/__init__.py +++ b/snakemake_executor_plugin_slurm_jobstep/__init__.py @@ -7,12 +7,17 @@ import socket import subprocess import sys +from dataclasses import dataclass, field from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo from snakemake_interface_executor_plugins.executors.real import RealExecutor from snakemake_interface_executor_plugins.jobs import ( JobExecutorInterface, ) -from snakemake_interface_executor_plugins.settings import ExecMode, CommonSettings +from snakemake_interface_executor_plugins.settings import ( + CommonSettings, + ExecMode, + ExecutorSettingsBase, +) from snakemake_interface_common.exceptions import WorkflowError @@ -38,6 +43,25 @@ ) +@dataclass +class ExecutorSettings(ExecutorSettingsBase): + """Settings for the SLURM jobstep executor plugin.""" + + pass_command_as_script: bool = field( + default=False, + metadata={ + "help": ( + "Pass to srun the command to be executed as a shell script " + "(fed through stdin) instead of wrapping it in the command line " + "call. Useful when a limit exists on SLURM command line length (ie. " + "max_submit_line_size)." + ), + "env_var": False, + "required": False, + }, + ) + + # Required: # Implementation of your executor class Executor(RealExecutor): @@ -58,6 +82,7 @@ def run_job(self, job: JobExecutorInterface): # snakemake_interface_executor_plugins.executors.base.SubmittedJobInfo. jobsteps = dict() + srun_script = None # TODO revisit special handling for group job levels via srun at a later stage # if job.is_group(): @@ -118,14 +143,29 @@ def run_job(self, job: JobExecutorInterface): call = "srun -n1 --cpu-bind=q " call += f" {get_cpu_setting(job, self.gpu_job)} " - call += f" {self.format_job_exec(job)}" + if self.workflow.executor_settings.pass_command_as_script: + # format the job to execute with all the snakemake parameters + # into a script + srun_script = self.format_job_exec(job) + # the process will read the srun script from stdin + call += " sh -s" + else: + call += f" {self.format_job_exec(job)}" self.logger.debug(f"This job is a group job: {job.is_group()}") self.logger.debug(f"The call for this job is: {call}") self.logger.debug(f"Job is running on host: {socket.gethostname()}") + if srun_script is not None: + self.logger.debug(f"The script for this job is: \n{srun_script}") # this dict is to support the to be implemented feature of oversubscription in # "ordinary" group jobs. - jobsteps[job] = subprocess.Popen(call, shell=True) + jobsteps[job] = subprocess.Popen( + call, shell=True, text=True, stdin=subprocess.PIPE + ) + if srun_script is not None: + # pass the srun bash script via stdin + jobsteps[job].stdin.write(srun_script) + jobsteps[job].stdin.close() job_info = SubmittedJobInfo(job) self.report_job_submission(job_info)