From 12d5987ab8f3a8685cbc7574cd4bdaf0e6cf4ed8 Mon Sep 17 00:00:00 2001 From: luked Date: Mon, 1 Dec 2025 20:20:17 +0000 Subject: [PATCH] Fix incorrect ContextVar access in ParallelExecutor and add error logging --- dspy/utils/parallelizer.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/dspy/utils/parallelizer.py b/dspy/utils/parallelizer.py index c32f5e3ebb..f34be84a01 100644 --- a/dspy/utils/parallelizer.py +++ b/dspy/utils/parallelizer.py @@ -89,10 +89,10 @@ def worker(parent_overrides, submission_id, index, item): from dspy.dsp.utils.settings import thread_local_overrides original = thread_local_overrides.get() - token = thread_local_overrides.set({**original, **parent_overrides.copy()}) + new_overrides = {**original, **parent_overrides.copy()} if parent_overrides.get("usage_tracker"): - # Usage tracker needs to be deep copied across threads so that each thread tracks its own usage - thread_local_overrides.overrides["usage_tracker"] = copy.deepcopy(parent_overrides["usage_tracker"]) + new_overrides["usage_tracker"] = copy.deepcopy(parent_overrides["usage_tracker"]) + token = thread_local_overrides.set(new_overrides) try: return index, function(item) @@ -154,7 +154,9 @@ def all_done(): try: index, outcome = f.result() except Exception: - pass + logger.error(f"Worker failed: {e}") + if self.provide_traceback: + traceback.print_exc() else: if outcome != job_cancelled and results[index] is None: # Check if this is an exception