Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions dspy/utils/parallelizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ def worker(parent_overrides, submission_id, index, item):
from dspy.dsp.utils.settings import thread_local_overrides

original = thread_local_overrides.get()
token = thread_local_overrides.set({**original, **parent_overrides.copy()})
new_overrides = {**original, **parent_overrides.copy()}
if parent_overrides.get("usage_tracker"):
# Usage tracker needs to be deep copied across threads so that each thread tracks its own usage
thread_local_overrides.overrides["usage_tracker"] = copy.deepcopy(parent_overrides["usage_tracker"])
new_overrides["usage_tracker"] = copy.deepcopy(parent_overrides["usage_tracker"])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks reasonable, can we keep the comment? And also can you add a test?

token = thread_local_overrides.set(new_overrides)

try:
return index, function(item)
Expand Down Expand Up @@ -154,7 +154,9 @@ def all_done():
try:
index, outcome = f.result()
except Exception:
pass
logger.error(f"Worker failed: {e}")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not really helpful since we catch exceptions by wrapping the original function in _wrap_function

if self.provide_traceback:
traceback.print_exc()
else:
if outcome != job_cancelled and results[index] is None:
# Check if this is an exception
Expand Down