From f2b56f259aa3d67581f4a85a8c1474ba7328fd04 Mon Sep 17 00:00:00 2001
From: sam-grant
Date: Wed, 22 Oct 2025 18:20:39 +0000
Subject: [PATCH 1/2] Added postprocess method to Skeleton

---
 pyutils/pyprocess.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/pyutils/pyprocess.py b/pyutils/pyprocess.py
index 11e4651..24cdc96 100644
--- a/pyutils/pyprocess.py
+++ b/pyutils/pyprocess.py
@@ -454,14 +454,18 @@ def execute(self):
             use_processes=self.use_processes
         )
 
+        # Postprocess
+        results = self.postprocess(results)
+
         self.logger.log(f"Analysis complete", "success")
 
         return results
 
-    def process_results(self):
+    def postprocess(self, results):
         """Run post processing on the results list
+        Placeholder method! You can override it
         """
-        pass
+        return results
 
         # Such as combination
\ No newline at end of file

From bd966d4e1c6870c06ae68b547a92079d0d02a369 Mon Sep 17 00:00:00 2001
From: sam-grant
Date: Thu, 23 Oct 2025 00:38:34 +0000
Subject: [PATCH 2/2] Preprocess remote file lists in batches, helps prevent
 errors from subprocess calls during parallelisation

---
 pyutils/pyimport.py  |  31 +++---
 pyutils/pyprocess.py | 257 +++++++++++++++++++++++++++++--------------
 pyutils/pyread.py    |  91 ---------------
 3 files changed, 187 insertions(+), 192 deletions(-)
 delete mode 100644 pyutils/pyread.py

diff --git a/pyutils/pyimport.py b/pyutils/pyimport.py
index de774e1..eedeec3 100644
--- a/pyutils/pyimport.py
+++ b/pyutils/pyimport.py
@@ -1,6 +1,5 @@
 import uproot
 import awkward as ak
-from .pyread import Reader
 from .pylogger import Logger
 
 class Importer:
@@ -9,40 +8,36 @@ class Importer:
     Intended to used via by the pyprocess Processor class
 
-    def __init__(self, file_name, branches, tree_path="EventNtuple/ntuple", use_remote=False, location="disk", schema="root", verbosity=1):
+    def __init__(self, file_name, branches, tree_path="EventNtuple/ntuple", verbosity=1):
         """Initialise the importer
 
         Args:
             file_name: Name of the file
             branches: Flat list or grouped dict of branches to import
             tree_path (str, opt): Path to the Ntuple in file directory. Default is "EventNtuple/ntuple".
-            use_remote: Flag for reading remote files
-            location: Remote files only. File location: tape (default), disk, scratch, nersc
-            schema: Remote files only. Schema used when writing the URL: root (default), http, path, dcap, sam
             verbosity: Print detail level (0: minimal, 1: medium, 2: maximum)
         """
         self.file_name = file_name
         self.branches = branches
         self.tree_path = tree_path
-        self.use_remote = use_remote
-        self.location = location
-        self.schema = schema
         self.verbosity = verbosity
 
         self.logger = Logger( # Start logger
             print_prefix = "[pyimport]",
             verbosity = verbosity
         )
-
-        # Create reader
-        self.reader = Reader(
-            use_remote=self.use_remote,
-            location=self.location,
-            schema=self.schema,
-            verbosity=self.verbosity
-        )
-
+
+    def _read_file(self, file_path):
+        """Open file with uproot"""
+        try:
+            file = uproot.open(file_path)
+            self.logger.log(f"Opened {file_path}", "success")
+            return file
+        except Exception as e:
+            self.logger.log(f"Exception while opening {file_path}: {e}", "warning")
+            raise # propagate exception up
+
     def import_branches(self):
         """Internal function to open ROOT file and import specified branches
 
         ...
         """
         try:
             # Open file
-            file = self.reader.read_file(self.file_name)
+            file = self._read_file(self.file_name)
             # Access the tree
             components = self.tree_path.split('/')
             current = file
diff --git a/pyutils/pyprocess.py b/pyutils/pyprocess.py
index 24cdc96..9d868ab 100644
--- a/pyutils/pyprocess.py
+++ b/pyutils/pyprocess.py
@@ -7,24 +7,28 @@
 import inspect
 import tqdm
 import functools
+import time
+import random
 
 from . import _env_manager
 from .pyimport import Importer
 from .pylogger import Logger
 
-def _worker_func(file_name, branches, tree_path, use_remote, location, schema, verbosity):
-    """Module-level worker function for processing files"""
+# def _worker_func(file_name, branches, tree_path, use_remote, location, schema, verbosity):
+def _worker_func(file_name, branches, tree_path, verbosity):
+    """Module-level worker function for processing files, safe for threads and processes."""
+    # Random stagger to avoid hammering I/O
+    time.sleep(random.uniform(0.05, 0.20)) # 50-200 ms delay
+
     # Create Importer and pass arguments
     importer = Importer(
         file_name=file_name,
         branches=branches,
         tree_path=tree_path,
-        use_remote=use_remote,
-        location=location,
-        schema=schema,
         verbosity=verbosity
     )
+
     return importer.import_branches()
-
+
 class Processor:
     """Interface for processing files or datasets"""
 
@@ -54,6 +58,9 @@ def __init__(self, tree_path="EventNtuple/ntuple", use_remote=False, location="t
         if self.use_remote:
             # Ensure mdh environment
             _env_manager.ensure_environment()
 
+        # tqdm progress bar format and styling
+        self.bar_format = "{desc}: {percentage:3.0f}%|{bar:30}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]"
+
         # Print out optional args
         confirm_str = f"Initialised Processor:\n\tpath = '{self.tree_path}'\n\tuse_remote = {self.use_remote}"
         if use_remote:
@@ -62,81 +69,161 @@ def __init__(self, tree_path="EventNtuple/ntuple", use_remote=False, location="t
         self.logger.log(confirm_str, "info")
 
-    def get_file_list(self, defname=None, file_list_path=None):
-        """Utility to get a list of files from a SAM definition OR a text file
-
+    def get_file_list(self, file_name=None, defname=None, file_list_path=None, batch_size=50, stagger=(0.05, 0.20), timeout=30):
+        """Utility to get a list of files from a single file, SAM definition, a text file, with remote prestaging.
+
         Args:
+            file_name: File name
             defname: SAM definition name
             file_list_path: Path to a plain text file containing file paths
+            batch_size: Number of files per mdh batch call
+            stagger: Tuple of min/max sleep time between batches
+            timeout: Timeout per mdh call (seconds)
 
         Returns:
-            List of file paths
+            List of file paths (local or resolved remote URLs)
         """
-
-        # Long help message for warnings about file lists of length zero
+
         help_message = f"""
-        Common causes:
-          - Authetification issues (trying running getToken)
+        Common causes of empty file list:
+          - Authentication issues (try running getToken)
           - Invalid SAM definition (defname='{defname}')
           - The files are not staged
           - Incorrect file location (location='{self.location}')
-          - use_remote is not True, if not working with local files (remote='{self.use_remote}')
+          - use_remote is not True (remote='{self.use_remote}')
         """
-
-        # Check if a file list path was provided
-        if file_list_path:
-            try:
-                if not os.path.exists(file_list_path):
-                    self.logger.log(f"File list path does not exist: {file_list_path}", "error")
-                    return []
-
-                self.logger.log(f"Loading file list from {file_list_path}", "info")
-
-                with open(file_list_path, "r") as file_list:
-                    file_list = file_list.readlines()
-
-                file_list = [line.strip() for line in file_list if line.strip()]
-
-                if (len(file_list) > 0):
-                    self.logger.log(f"Successfully loaded file list\n\tPath: {defname}\n\tCount: {len(file_list)} files", "success")
-                else:
-                    self.logger.log(f"File list has length {len(file_list)}{help_message}", "warning")
-
+
+        # Handle single files
+        # This is important, since process_functions are designed to call process_data with single files
+        if file_name:
+            file_list = [file_name]
+            # If the file is accessible, return immediately
+            if self.use_remote and file_name.startswith(("root://", "http://", "dcap://")):
                 return file_list
-
-            except Exception as e:
-                self.logger.log(f"Error reading file list from {file_list_path}: {e}", "error")
-                return []
+            elif os.path.exists(file_name):
+                return file_list
+            else:
+                self.logger.log(f"File does not exist: {file_name}", "warning")
+                return []
 
-        # Otherwise, try to use the SAM definition
+        # Load file list from file
+        elif file_list_path:
+            if not os.path.exists(file_list_path):
+                self.logger.log(f"File list path does not exist: {file_list_path}", "error")
+                return []
+            self.logger.log(f"Loading file list from {file_list_path}", "info")
+            with open(file_list_path, "r") as f:
+                file_list = [line.strip() for line in f if line.strip()]
+
+        # Load file list from SAM definition
        elif defname:
             self.logger.log(f"Loading file list for SAM definition: {defname}", "max")
             try:
-                # Setup commands for SAM query
                 commands = f"samweb list-files 'defname: {defname} with availability anylocation' | sort -V 2>/dev/null"
-
-                # Execute commands
-                file_list_output = subprocess.check_output(commands, shell=True, universal_newlines=True, stderr=subprocess.DEVNULL)
-                file_list = [line for line in file_list_output.splitlines() if line]
-
-                if (len(file_list) > 0):
-                    self.logger.log(f"Successfully loaded file list\n\tSAM definition: {defname}\n\tCount: {len(file_list)} files", "success")
-                else:
-                    self.logger.log(f"File list has length {len(file_list)}{help_message}", "warning")
-
-                # Return the file list
-                return file_list
-
+                output = subprocess.check_output(commands, shell=True, universal_newlines=True, stderr=subprocess.DEVNULL)
+                file_list = [line for line in output.splitlines() if line]
             except Exception as e:
                 self.logger.log(f"Exception while getting file list for {defname}: {e}", "error")
                 return []
+
+        else:
+            self.logger.log("Error: Either 'file_name', 'defname' or 'file_list_path' must be provided", "error")
+            return []
+
+        if len(file_list) == 0:
+            self.logger.log(f"File list has length zero{help_message}", "warning")
+            return []
+
+        ##### REMOTE FILE HANDLING #####
+        # Prestage remote URLs in batches using threads
+        if self.use_remote:
+            resolved_files = []
+            n_files = len(file_list)
+
+            if len(file_list) == 0:
+                self.logger.log(f"File list is empty{help_message}", "warning")
+                return []
+            # If there's only one file, resolve it directly without threads
+            if file_name:
+                self.logger.log("Single file detected, resolving directly", "info")
+                try:
+                    proc = subprocess.run(
+                        ["mdh", "print-url", "-l", self.location, "-s", self.schema, "-"],
+                        input=file_list[0],
+                        capture_output=True,
+                        text=True,
+                        timeout=timeout
+                    )
+                    if proc.returncode != 0:
+                        self.logger.log(f"Error prestaging single file: {proc.stderr}", "warning")
+                        return []
+                    file_list = [line.strip() for line in proc.stdout.splitlines() if line.strip()]
+                except subprocess.TimeoutExpired:
+                    self.logger.log("Timeout resolving single file", "warning")
+                    return []
+                except Exception as e:
+                    self.logger.log(f"Exception prestaging single file: {e}", "warning")
+                    return []
+                return file_list # skip the rest
+
+            # Otherwise, handle batches with threads
+            prestage_threads = min(n_files // batch_size + 1, 2 * os.cpu_count(), 50)
+
+            # Function to resolve one batch
+            def resolve_batch(batch_idx, batch_files):
+                # Optional stagger
+                time.sleep(random.uniform(*stagger))
+                try:
+                    proc = subprocess.run(
+                        ["mdh", "print-url", "-l", self.location, "-s", self.schema, "-"],
+                        input="\n".join(batch_files),
+                        capture_output=True,
+                        text=True,
+                        timeout=timeout
+                    )
+                    if proc.returncode != 0:
+                        self.logger.log(f"Error resolving batch {batch_idx}: {proc.stderr}", "warning")
+                        return []
+                    return [line.strip() for line in proc.stdout.splitlines() if line.strip()]
+                except subprocess.TimeoutExpired:
+                    self.logger.log(f"Timeout resolving batch {batch_idx}", "warning")
+                    return []
+                except Exception as e:
+                    self.logger.log(f"Exception resolving batch {batch_idx}: {e}", "warning")
+                    return []
+
+            # Create batches
+            batches = [(i // batch_size, file_list[i:i + batch_size])
+                       for i in range(0, n_files, batch_size)]
+
+            self.logger.log(f"Resolving remote file list in batches:\n\t{n_files} files, {len(batches)} batches, {prestage_threads} threads", "info")
+            # Threaded execution with progress bar
+            with ThreadPoolExecutor(max_workers=prestage_threads) as executor:
+                futures = {executor.submit(resolve_batch, idx, batch): idx for idx, batch in batches}
+
+                for future in tqdm.tqdm(
+                    as_completed(futures),
+                    total=len(futures),
+                    desc="Resolving",
+                    unit="batch",
+                    ncols=150,
+                    bar_format=self.bar_format,
+                    colour="cyan"
+                ):
+                    urls = future.result()
+                    resolved_files.extend(urls)
+
+            file_list = resolved_files
+
+        # Final logging
+        if len(file_list) > 0:
+            self.logger.log(f"Successfully loaded file list\n\tCount: {len(file_list)} files", "success")
         else:
-            self.logger.log("Error: Either 'defname' or 'file_list_path' must be provide", "error")
-            return []
-
+            self.logger.log(f"File list has length {len(file_list)}{help_message}", "warning")
+
+        return file_list
+
     def _process_files_parallel(self, file_list, worker_func, max_workers=None, use_processes=False):
         """Internal function to parallelise file operations with given a process function
 
@@ -152,11 +239,16 @@ def _process_files_parallel(self, file_list, worker_func, max_workers=None, use_
         if not file_list:
             self.logger.log("Error: Empty file list provided", "error")
             return None
-
+
+        # If single file, skip multiprocessing
+        if len(file_list) == 1:
+            result = worker_func(file_list[0])
+            return [result]
+
         if max_workers is None:
             # Return a sensible default for max threads
-            max_workers = min(len(file_list), os.cpu_count()) # FIXME: sensible for threads, not processes
-
+            max_workers = min(len(file_list), os.cpu_count()) # FIXME: sensible for threads, maybe not processes
+
         ExecutorClass = ProcessPoolExecutor if use_processes else ThreadPoolExecutor
         executor_type = "processes" if use_processes else "threads"
 
@@ -170,23 +262,22 @@ def _process_files_parallel(self, file_list, worker_func, max_workers=None, use_
         completed_files = 0
         failed_files = 0
 
-        # Set up tqdm format and styling
-        bar_format = "{desc}: {percentage:3.0f}%|{bar:30}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]"
-
         with tqdm.tqdm(
             total=total_files,
             desc="Processing",
             unit="file",
-            bar_format=bar_format,
+            bar_format=self.bar_format,
             colour="green",
             ncols=150
         ) as pbar:
 
             # Start thread pool executor
             with ExecutorClass(max_workers=max_workers) as executor:
-                # Create futures for each file processing task
-                futures = {executor.submit(worker_func, file_name): file_name for file_name in file_list}
-
+                # Create futures for each file processing task
+                futures = {
+                    executor.submit(worker_func, file_name): file_name
+                    for file_name in file_list
+                }
                 # Process results as they complete
                 for future in as_completed(futures):
                     file_name = futures[future]
@@ -227,7 +318,7 @@ def _process_files_parallel(self, file_list, worker_func, max_workers=None, use_
         # Return the results
         return results
 
-    def process_data(self, file_name=None, file_list_path=None, defname=None, branches=None, max_workers=None, custom_worker_func=None, use_processes=False):
+    def process_data(self, file_name=None, file_list_path=None, defname=None, branches=None, max_workers=None, custom_worker_func=None, use_processes=False, prestage_remote=True):
         """Process the data
 
         Args:
@@ -238,6 +329,7 @@ def process_data(self, file_name=None, file_list_path=None, defname=None, branch
             max_workers: Maximum number of parallel workers
             custom_worker_func: Optional custom processing function for each file
             use_processes: Whether to use processes rather than threads
+            prestage_remote (bool, opt): Prestage file list with XRootD paths
 
         Returns:
             - If custom_worker_func is None: a concatenated awkward array with imported data from all files
@@ -262,31 +354,32 @@ def process_data(self, file_name=None, file_list_path=None, defname=None, branch
             if len(sig.parameters) != 1:
                 self.logger.log(f"custom_worker_func must take exactly one argument (file_name)", "error")
                 return None
-
+
         # Set up process function
         if custom_worker_func is None: # Then use the default function
             worker_func = functools.partial(
                 _worker_func, # Module-level function
                 branches=branches,
                 tree_path=self.tree_path,
-                use_remote=self.use_remote,
-                location=self.location,
-                schema=self.schema,
                 verbosity=0 if file_name is None else self.worker_verbosity # multifile only
             )
         else: # Use the custom process function
             worker_func = custom_worker_func
 
-        # Handle the single file case
+        # Prepare file list
+        file_list = self.get_file_list(
+            file_name=file_name,
+            file_list_path=file_list_path,
+            defname=defname
+        )
+
+        # For single files, skip parallelisation
         if file_name:
-            result = worker_func(file_name) # Run the process
+            result = worker_func(file_list[0]) # Run the process
             self.logger.log(f"Completed process on {file_name}", "success")
             return result
 
-        # Prepare file list
-        file_list = self.get_file_list(file_list_path=file_list_path, defname=defname)
-
-        # Get list of results
+        # Process and return list of results
         results = self._process_files_parallel(
             file_list,
             worker_func,
@@ -463,9 +556,7 @@ def execute(self):
 
     def postprocess(self, results):
         """Run post processing on the results list
-        Placeholder method! You can override it
+        Placeholder method intended to be overridden
         """
         return results
-
-        # Such as combination
\ No newline at end of file
diff --git a/pyutils/pyread.py b/pyutils/pyread.py
deleted file mode 100644
index f76fd40..0000000
--- a/pyutils/pyread.py
+++ /dev/null
@@ -1,91 +0,0 @@
-#! /usr/bin/env python
-import uproot
-import os
-import subprocess
-from . import _env_manager
-from .pylogger import Logger
-
-class Reader:
-    """Unified interface for reading files, either locally or remotely"""
-
-    def __init__(self, use_remote=False, location="tape", schema="root", verbosity=1):
-        """Initialise the reader
-
-        Args:
-            use_remote (bool, opt): Whether to use remote access methods
-            location (str, opt): File location for remote files: 'tape' (default), 'disk', 'scratch', 'nersc'
-            schema (str, opt): Schema for remote file path: 'root' (default), 'http', 'path', 'dcap', 'sam'
-            verbosity (int, opt): Level of output detail (0: errors only, 1: info & warnings, 2: max)
-        """
-        self.use_remote = use_remote # access files on /pnfs from EAF
-        self.location = location
-        self.schema = schema
-
-        # Start logger
-        self.logger = Logger(
-            print_prefix = "[pyread]",
-            verbosity = verbosity
-        )
-
-        # Setup and validation for remote reading
-        if self.use_remote:
-            # Ensure mdh environment
-            _env_manager.ensure_environment()
-            # Check arguments
-            self.valid_locations = ["tape", "disk", "scratch", "nersc"]
-            if self.location not in self.valid_locations:
-                self.logger.log(f"Location '{location}' may not be valid. Expected one of {self.valid_locations}", "warning")
-            self.valid_schemas = ["root", "http", "path", "dcap", "sam"]
-            if self.schema not in self.valid_schemas:
-                self.logger.log(f"Schema '{schema}' may not be valid. Expected one of {self.valid_schemas}", "warning")
-
-    def read_file(self, file_path):
-        """Read a file using the appropriate method
-
-        Args:
-            file_path: Path to the file
-
-        Returns:
-            uproot file object
-        """
-        if self.use_remote:
-            return self._read_remote_file(file_path)
-        else:
-            return self._read_file(file_path)
-
-    def _read_file(self, file_path):
-        """Open file with uproot"""
-        try:
-            file = uproot.open(file_path)
-            self.logger.log(f"Opened {file_path}", "success")
-            return file
-        except Exception as e:
-            self.logger.log(f"Exception while opening {file_path}: {e}", "warning")
-            raise # propagate exception up
-
-    def _read_remote_file(self, file_path):
-        """Open a file from /pnfs via mdh - NO FALLBACKS"""
-        self.logger.log(f"Opening remote file: {file_path}", "info")
-        # Try the specified location
-        return self._attempt_remote_read(file_path, self.location)
-
-    def _attempt_remote_read(self, file_path, location):
-        """Attempt to read remote file with specific location"""
-        commands = f"mdh print-url {file_path} -l {location} -s {self.schema}"
-
-        this_file_path = subprocess.check_output(
-            commands,
-            shell=True,
-            universal_newlines=True,
-            stderr=subprocess.DEVNULL,
-            timeout=30
-        ).strip()
-
-        self.logger.log(f"Created file path: {this_file_path}", "info")
-
-        # Read the file
-        return self._read_file(this_file_path)
-
-    # Previously I had a fallback method which tried to read from multiple locations,
-    # but I think it is far easier to debug if you simply let the process fail than
-    # to deal with fallout from overwhelming the network with subprocess calls
\ No newline at end of file
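
Usage note (illustrative only, not part of the patches above): a minimal sketch of how the reworked interface might be called after these changes. It assumes Processor and Skeleton are both importable from pyutils.pyprocess, as the diffs and the first commit subject suggest; the subclass name, constructor arguments, SAM definition, and branch names are placeholders.

    import awkward as ak
    from pyutils.pyprocess import Processor, Skeleton  # import path assumed from the diffs

    # Multi-file processing: get_file_list() now resolves remote file lists
    # in mdh batches before the files are imported in parallel.
    processor = Processor(use_remote=True, location="disk")  # other constructor args left at defaults
    data = processor.process_data(
        defname="nts.mu2e.example.dataset.root",    # placeholder SAM definition name
        branches=["evtinfo.run", "evtinfo.event"],  # placeholder branch names
        max_workers=8,
    )

    # Overriding the postprocess() hook added to Skeleton in the first patch.
    # The default implementation returns the per-file results list unchanged;
    # an override could, for example, drop failed files and combine the rest.
    class ExampleAnalysis(Skeleton):  # hypothetical subclass
        def postprocess(self, results):
            results = [r for r in results if r is not None]
            return ak.concatenate(results) if results else None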