Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 96 additions & 32 deletions examples/cli/mcp_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,71 +4,141 @@

import asyncio
import os
import json
from dotenv import load_dotenv
from multimind import (
OpenAIModel, ClaudeModel,
MCPParser, MCPExecutor
OpenAIModel, ClaudeModel, MCPExecutor
)

async def main():
# Load environment variables
load_dotenv()

# Create models
openai_model = OpenAIModel(
model="gpt-3.5-turbo",
temperature=0.7
)
# Check which API keys are available
openai_key = os.getenv("OPENAI_API_KEY")
anthropic_key = os.getenv("ANTHROPIC_API_KEY") or os.getenv("CLAUDE_API_KEY")

claude_model = ClaudeModel(
model="claude-3-sonnet-20240229",
temperature=0.7
)
has_openai = openai_key is not None and openai_key.strip() != ""
has_claude = anthropic_key is not None and anthropic_key.strip() != ""

if not has_openai and not has_claude:
print("Error: No API keys found. Please set OPENAI_API_KEY or ANTHROPIC_API_KEY/CLAUDE_API_KEY")
return

# Create MCP executor
executor = MCPExecutor()

# Register models
executor.register_model("gpt-3.5", openai_model)
executor.register_model("claude-3", claude_model)
# Create and register available models
available_models = []
model_registry = {}

# Define MCP workflow
workflow = {
"version": "1.0.0",
"models": [
{
if has_openai:
try:
openai_model = OpenAIModel(
model_name="gpt-3.5-turbo",
temperature=0.7
)
executor.register_model("gpt-3.5", openai_model)
model_registry["gpt-3.5"] = {
"name": "gpt-3.5",
"type": "openai",
"config": {
"model": "gpt-3.5-turbo",
"temperature": 0.7
}
},
{
}
available_models.append("gpt-3.5")
print("✓ OpenAI model registered")
except Exception as e:
print(f"Warning: Failed to initialize OpenAI model: {e}")
has_openai = False

if has_claude:
try:
claude_model = ClaudeModel(
model_name="claude-3-sonnet-20240229",
temperature=0.7
)
executor.register_model("claude-3", claude_model)
model_registry["claude-3"] = {
"name": "claude-3",
"type": "claude",
"config": {
"model": "claude-3-sonnet-20240229",
"temperature": 0.7
}
}
],
available_models.append("claude-3")
print("✓ Claude model registered")
except Exception as e:
print(f"Warning: Failed to initialize Claude model: {e}")
has_claude = False

# Determine which models to use for each step
if has_openai and has_claude:
# Both models available - use both
initial_model = "gpt-3.5"
review_model = "claude-3"
print("\nUsing both models: OpenAI for initial analysis, Claude for expert review")
elif has_openai:
# Only OpenAI available - use it for both steps
initial_model = "gpt-3.5"
review_model = "gpt-3.5"
print("\nUsing OpenAI for both steps (Claude API key not found)")
else:
# Only Claude available - use it for both steps
initial_model = "claude-3"
review_model = "claude-3"
print("\nUsing Claude for both steps (OpenAI API key not found)")

# Ask user for topic (or use default if empty)
print("\n" + "="*50)
print("MCP Workflow - Topic Analysis")
print("="*50)
user_topic = input("\nEnter a topic to analyze (or press Enter for default): ").strip()

if not user_topic:
topic = "The Future of Artificial Intelligence"
print(f"\nUsing default topic: {topic}")
else:
topic = user_topic
print(f"\nAnalyzing topic: {topic}")

# Calculate quality check word from topic
# Extract meaningful words from topic for quality check (skip common words)
topic_words = topic.lower().split()
articles = ["the", "a", "an", "of", "in", "on", "at", "to", "for", "and", "or", "but"]
significant_words = [w for w in topic_words if w not in articles]

# Use the last significant word (usually the main subject) or first if only one
# This works better when there are typos in the topic
if len(significant_words) > 1:
quality_check_word = significant_words[-1] # Usually the main noun
elif significant_words:
quality_check_word = significant_words[0]
else:
quality_check_word = "analysis"

# Build workflow dynamically based on available models
workflow_models = [model_registry[model] for model in available_models]

workflow = {
"version": "1.0.0",
"models": workflow_models,
"workflow": {
"steps": [
{
"id": "initial_analysis",
"type": "model",
"config": {
"model": "gpt-3.5",
"model": initial_model,
"prompt_template": "Analyze the following topic: {topic}\nProvide a detailed analysis."
}
},
{
"id": "expert_review",
"type": "model",
"config": {
"model": "claude-3",
"model": review_model,
"prompt_template": "Review and enhance the following analysis:\n{initial_analysis}\nProvide expert insights and additional perspectives."
}
},
Expand All @@ -85,7 +155,7 @@ async def main():
"type": "condition",
"config": {
"type": "contains",
"value": "key insights"
"value": quality_check_word # Dynamically set based on topic
}
}
],
Expand All @@ -110,12 +180,6 @@ async def main():
}
}

# Save workflow to file (optional)
with open("workflow.json", "w") as f:
json.dump(workflow, f, indent=2)

# Execute workflow
topic = "The Future of Artificial Intelligence"
results = await executor.execute(workflow, {"topic": topic})

# Print results
Expand Down
20 changes: 14 additions & 6 deletions multimind/mcp/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async def _execute_step(
raise ValueError(f"Unsupported step type: {step_type}")

# Update workflow state
self.workflow_state[step_id] = resul
self.workflow_state[step_id] = result

def _get_step_inputs(
self,
Expand All @@ -74,13 +74,21 @@ def _get_step_inputs(
"""Get inputs for a step from workflow state."""
inputs = {}

# Find incoming connections
# Find incoming connections (outputs from previous steps)
for conn in spec["workflow"]["connections"]:
if conn["to"] == step["id"]:
from_step = conn["from"]
if from_step in self.workflow_state:
inputs[from_step] = self.workflow_state[from_step]

# Also include initial context values (like 'topic') that aren't step outputs
# Get all step IDs to distinguish between step outputs and initial context
step_ids = {s["id"] for s in spec["workflow"]["steps"]}
for key, value in self.workflow_state.items():
# Include values that are not step outputs (initial context)
if key not in step_ids and key not in inputs:
inputs[key] = value

return inputs

async def _execute_model_step(
Expand Down Expand Up @@ -119,9 +127,9 @@ async def _execute_transform_step(
return separator.join(str(v) for v in inputs.values())

elif transform_type == "extract":
# Extract specific fields from inpu
# Extract specific fields from input
field = config["field"]
input_key = list(inputs.keys())[0] # Use first inpu
input_key = list(inputs.keys())[0] # Use first input
return inputs[input_key].get(field)

else:
Expand All @@ -135,7 +143,7 @@ async def _execute_condition_step(
) -> bool:
"""Execute a condition step."""
condition_type = config["type"]
input_key = list(inputs.keys())[0] # Use first inpu
input_key = list(inputs.keys())[0] # Use first input
value = inputs[input_key]

if condition_type == "equals":
Expand Down Expand Up @@ -164,4 +172,4 @@ def _prepare_model_prompt(
if placeholder in prompt:
prompt = prompt.replace(placeholder, str(value))

return promp
return prompt
20 changes: 18 additions & 2 deletions multimind/models/claude.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Anthropic Claude model implementation.
"""

import os
from typing import List, Dict, Any, Optional, AsyncGenerator, Union
from anthropic import AsyncAnthropic
from .base import BaseLLM
Expand All @@ -16,6 +17,9 @@ def __init__(
**kwargs
):
super().__init__(model_name, **kwargs)
# Load API key from environment if not provided
if api_key is None:
api_key = os.getenv("ANTHROPIC_API_KEY") or os.getenv("CLAUDE_API_KEY")
self.client = AsyncAnthropic(api_key=api_key)

async def generate(
Expand All @@ -26,6 +30,9 @@ async def generate(
**kwargs
) -> str:
"""Generate text using Claude's completion API."""
# Anthropic API requires max_tokens to be set
if max_tokens is None:
max_tokens = 1024 # Default value
response = await self.client.messages.create(
model=self.model_name,
messages=[{"role": "user", "content": prompt}],
Expand All @@ -43,6 +50,9 @@ async def generate_stream(
**kwargs
) -> AsyncGenerator[str, None]:
"""Generate streaming text using Claude's completion API."""
# Anthropic API requires max_tokens to be set
if max_tokens is None:
max_tokens = 1024 # Default value
stream = await self.client.messages.create(
model=self.model_name,
messages=[{"role": "user", "content": prompt}],
Expand All @@ -63,14 +73,17 @@ async def chat(
**kwargs
) -> str:
"""Generate chat completion using Claude's chat API."""
# Anthropic API requires max_tokens to be set
if max_tokens is None:
max_tokens = 1024 # Default value
response = await self.client.messages.create(
model=self.model_name,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
**kwargs
)
return response.content[0].tex
return response.content[0].text

async def chat_stream(
self,
Expand All @@ -80,6 +93,9 @@ async def chat_stream(
**kwargs
) -> AsyncGenerator[str, None]:
"""Generate streaming chat completion using Claude's chat API."""
# Anthropic API requires max_tokens to be set
if max_tokens is None:
max_tokens = 1024 # Default value
stream = await self.client.messages.create(
model=self.model_name,
messages=messages,
Expand All @@ -90,7 +106,7 @@ async def chat_stream(
)
async for chunk in stream:
if chunk.type == "content_block_delta" and chunk.delta.text:
yield chunk.delta.tex
yield chunk.delta.text

async def embeddings(
self,
Expand Down
4 changes: 4 additions & 0 deletions multimind/models/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
OpenAI model implementation.
"""

import os
import openai
from typing import List, Dict, Any, Optional, AsyncGenerator, Union, cast
from openai.types.chat import ChatCompletionMessageParam
Expand All @@ -17,6 +18,9 @@ def __init__(
**kwargs
):
super().__init__(model_name, **kwargs)
# Load API key from environment if not provided
if api_key is None:
api_key = os.getenv("OPENAI_API_KEY")
self.client = openai.AsyncOpenAI(api_key=api_key)
# Set pricing based on model
if "gpt-4" in model_name:
Expand Down
Loading
Loading