# Conversation

**LukeDevs** commented Dec 1, 2025

## Summary

Fixes a bug where `dspy.Parallel` silently returns `None` for all tasks when called from within a module's `forward()` method while MLflow autologging is enabled.

## Problem

When using `mlflow.dspy.autolog()`, calling `dspy.Parallel` from inside a module's `forward()` method causes all results to silently come back as `None`.

The root cause is twofold:

1. **Incorrect `ContextVar` access in `parallelizer.py` (line 95)**

`thread_local_overrides` is a `ContextVar`, which must be read and written through its `.get()` and `.set()` methods. The existing code instead tries to access a nonexistent `.overrides` attribute directly:

```python
thread_local_overrides.overrides["usage_tracker"] = copy.deepcopy(parent_overrides["usage_tracker"])
```

This raises `AttributeError: '_contextvars.ContextVar' object has no attribute 'overrides'`.

The bug only manifests when `usage_tracker` is present in the parent context, which happens when MLflow autologging is enabled.
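For reference, a minimal standard-library illustration of why the attribute access fails: a `ContextVar` exposes only `.get()`, `.set()`, and `.reset()`, so any stored mapping must be read out, rebuilt, and set back in.

```python
from contextvars import ContextVar

# Minimal sketch of ContextVar semantics (standard library, not dspy code).
overrides: ContextVar[dict] = ContextVar("overrides", default={})

current = overrides.get()                   # read the current mapping
token = overrides.set({**current, "k": 1})  # install a replacement, keep the token
assert overrides.get() == {"k": 1}
overrides.reset(token)                      # restore the previous value

# overrides.overrides  # AttributeError: 'ContextVar' object has no attribute 'overrides'
```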

2. **Silent exception handling (line ~155)**

Worker exceptions are caught and silently discarded:

```python
try:
    index, outcome = f.result()
except Exception:
    pass
```

This masked the underlying `AttributeError`: when a worker raises, its result slot is never assigned, so the caller sees `None` for every task.
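To make the failure mode concrete, here is a small self-contained sketch of the pattern (illustrative names, not dspy's actual internals): every worker raises, the bare `except` swallows it, and the preallocated result slots stay `None`.

```python
from concurrent.futures import ThreadPoolExecutor

def worker(i):
    # Stand-in for the per-thread setup that raised in parallelizer.py.
    raise AttributeError("'ContextVar' object has no attribute 'overrides'")

results = [None] * 3  # slots preallocated before the workers run
with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(worker, i) for i in range(3)]
    for f in futures:
        try:
            index, outcome = f.result()  # re-raises the worker's exception
            results[index] = outcome     # never reached
        except Exception:
            pass                         # error discarded; slot stays None

print(results)  # [None, None, None]
```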

## Solution

1. Restructured the `ContextVar` access to build the complete overrides dictionary before setting it:

```python
original = thread_local_overrides.get()
new_overrides = {**original, **parent_overrides.copy()}
if parent_overrides.get("usage_tracker"):
    new_overrides["usage_tracker"] = copy.deepcopy(parent_overrides["usage_tracker"])
token = thread_local_overrides.set(new_overrides)
```
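For context, the `token` returned by `.set()` is what lets the worker restore the parent's context once its task finishes. A sketch of the surrounding pattern, assuming the usual set/reset pairing (the exact structure in `parallelizer.py` may differ):

```python
token = thread_local_overrides.set(new_overrides)
try:
    ...  # run the task under the per-thread overrides
finally:
    thread_local_overrides.reset(token)  # restore the previous context
```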

2. Added error logging for worker failures (shown with the surrounding `try` for context):

```python
try:
    index, outcome = f.result()
except Exception as e:
    logger.error(f"Worker failed: {e}")
    if self.provide_traceback:
        traceback.print_exc()
```

## Reproduction

```python
import dspy
import mlflow

mlflow.dspy.autolog()  # Enables usage_tracker, triggering the bug

class ProcessItem(dspy.Module):
    def __init__(self):
        super().__init__()
        self.predictor = dspy.ChainOfThought("item -> result")

    def forward(self, item):
        return self.predictor(item=item)

class GetItems(dspy.Module):
    def __init__(self):
        super().__init__()
        self.predictor = dspy.ChainOfThought("objective -> task_items: list[str]")

    def forward(self, objective):
        return self.predictor(objective=objective)

class Pipeline(dspy.Module):
    def __init__(self):
        super().__init__()
        self.get_items = GetItems()
        self.process_item = ProcessItem()

    def forward(self, objective):
        result = self.get_items(objective=objective)

        parallel = dspy.Parallel(num_threads=4, provide_traceback=True)
        results = parallel([
            (self.process_item, {"item": item})
            for item in result.task_items
        ])
        return dspy.Prediction(results=results)  # Returns [None, None, None, ...] before the fix

lm = dspy.LM("azure/gpt-4")
dspy.configure(lm=lm)

pipeline = Pipeline()
results = pipeline(objective="do something")
print(results)
```

---

**Collaborator** commented on the exception-handling change in `parallelizer.py`:

```diff
         index, outcome = f.result()
-    except Exception:
-        pass
+    except Exception as e:
+        logger.error(f"Worker failed: {e}")
```

This is not really helpful since we catch exceptions by wrapping the original function in `_wrap_function`.
---

**Collaborator** commented on the `usage_tracker` change in `parallelizer.py`:

```diff
 if parent_overrides.get("usage_tracker"):
-    # Usage tracker needs to be deep copied across threads so that each thread tracks its own usage
-    thread_local_overrides.overrides["usage_tracker"] = copy.deepcopy(parent_overrides["usage_tracker"])
+    new_overrides["usage_tracker"] = copy.deepcopy(parent_overrides["usage_tracker"])
```

Looks reasonable, can we keep the comment? And also can you add a test?
