diff --git a/examples/cli/README.md b/examples/cli/README.md index a132f170..f031153a 100644 --- a/examples/cli/README.md +++ b/examples/cli/README.md @@ -1,262 +1,52 @@ -# MultiMind CLI Examples - -## Compliance and Governance CLI - -The `compliance_cli.py` provides a command-line interface for managing compliance and governance features. Here are some example commands: - -### Dataset Ingestion - -```bash -# Ingest a new dataset with compliance checks -multimind governance ingest \ - --dataset-id "customer_support_2024" \ - --name "Customer Support Dataset" \ - --description "Customer support conversations for AI training" \ - --data-categories "personal,sensitive" \ - --metadata '{"source": "customer_support_tickets", "preprocessing": "anonymized"}' -``` - -### Output Validation - -```bash -# Validate agent output for compliance -multimind governance validate-output \ - --output-id "response_123" \ - --content "The customer's account balance is $1,000" \ - --user-id "user_123" \ - --purpose "customer_support" -``` - -### Anomaly Monitoring - -```bash -# Monitor for compliance anomalies -multimind governance monitor-anomalies \ - --start-time "2024-03-01T00:00:00" \ - --severity "high,critical" -``` - -### Compliance Reports - -```bash -# Generate monthly compliance report -multimind governance export-logs \ - --report-id "monthly_2024_03" \ - --period "30d" \ - --format "pdf" -``` - -### DSAR Handling - -```bash -# Export user data for DSAR -multimind governance dsar export \ - --user-id "user_123" \ - --request-id "dsar_2024_001" \ - --format "json" - -# Erase user data -multimind governance dsar erase \ - --user-id "user_123" \ - --request-id "erasure_2024_001" \ - --verify -``` - -### Model Version Approval - -```bash -# Request approval for a new model version -multimind governance model-approve \ - --model-id "invoice-processor-v2" \ - --approver "alice@acme.com" \ - --metadata '{"version": "2.0.0", "changes": "Improved accuracy"}' -``` - -### Third-Party Plugin Vetting - -```bash -# Register and vet a new plugin -multimind governance plugin-register \ - --name "sentiment-analyzer" \ - --source "github.com/org/repo" \ - --checks "dependency_scan,license_check,cve_lookup" -``` - -### Continuous Compliance Testing - -```bash -# Run nightly compliance test suite -multimind governance test-run \ - --suite "nightly-safety" \ - --ticket-system "jira" \ - --project "COMPLIANCE" -``` - -### Embedding Drift Detection - -```bash -# Check for embedding drift -multimind governance drift-check \ - --store "prod-embeddings" \ - --threshold 0.15 -``` - -### Risk Score Override - -```bash -# Override risk score for a request -multimind governance risk-override \ - --request-id "abc123" \ - --new-score 0.3 \ - --reason "Low sensitivity" \ - --officer-id "compliance_officer_001" -``` - -### Log Chain Verification - -```bash -# Verify tamper-evident log chain -multimind governance audit-verify \ - --chain-id "chain-789" \ - --start-time "2024-03-01T00:00:00" \ - --end-time "2024-03-31T23:59:59" -``` - -### Policy Management - -```bash -# Publish new policy version -multimind governance policy-publish \ - --policy-file "new-gdpr.rego" \ - --version "1.2.0" \ - --metadata '{"author": "compliance_team", "changes": "Updated data retention rules"}' -``` - -### Incident Response - -```bash -# Create and handle incident -multimind governance incident-create \ - --type "policy-violation" \ - --details-file "violation123.json" \ - --severity "high" \ - --playbook "policy_violation_response" -``` - -### Consent 
Management - -```bash -# Check for expiring consents -multimind governance consent-check \ - --days 7 \ - --channels "email,in_app" -``` - -### DPIA Assignment - -```bash -# Assign DPIA review task -multimind governance dpia-assign \ - --dataset-id "medical-records" \ - --assignee "compliance-team" \ - --priority "high" \ - --due-days 14 -``` - -## Common Use Cases - -1. **Dataset Onboarding** - - Enforce consent checks - - Tag sensitive data - - Trigger DPIA if needed - -2. **Pre-training Pipeline** - - Validate data compliance - - Check for required documentation - - Ensure proper consent - -3. **Agent Response Validation** - - Check output against schemas - - Verify policy compliance - - Log validation results - -4. **Live Monitoring** - - Stream audit logs - - Detect anomalies - - Alert on violations - -5. **Compliance Reporting** - - Generate monthly reports - - Track policy violations - - Monitor DPIA status - -6. **DSAR Processing** - - Export user data - - Handle erasure requests - - Maintain audit trail - -## Additional Use Cases - -1. **Model Governance** - - Version approval workflows - - Change tracking - - Audit trail maintenance - -2. **Plugin Security** - - Dependency scanning - - License compliance - - Vulnerability assessment - -3. **Continuous Testing** - - Automated test suites - - Integration with ticketing - - Failure tracking - -4. **Drift Monitoring** - - Embedding distribution analysis - - Threshold-based alerts - - Retraining triggers - -5. **Risk Management** - - Score overrides - - Officer approvals - - Audit trail - -6. **Log Security** - - Chain verification - - Tamper detection - - Historical analysis - -7. **Policy Control** - - Version management - - Hot reloading - - Change tracking - -8. **Incident Handling** - - Automated playbooks - - Severity management - - Response coordination - -9. **Consent Management** - - Expiry tracking - - Multi-channel notifications - - Renewal workflows - -10. **DPIA Management** - - Task assignment - - Priority handling - - Due date tracking - -## Requirements - -- Python 3.8+ -- Click -- MultiMind SDK -- Required dependencies (see requirements.txt) - -## Notes - -- All commands are asynchronous and use the `asyncio` runtime -- Commands can be integrated into CI/CD pipelines -- Use `--help` with any command to see detailed options -- Consider using environment variables for sensitive configuration \ No newline at end of file +# MultiMindSDK Example CLI Scripts + +This folder contains **example CLI scripts** for specific use cases and demos. These are not the main CLI entry points, but serve as templates and learning resources for developers. 
+
+## What You'll Find Here
+
+Each script demonstrates how to use the MultiMindSDK CLI features for a particular task, such as:
+- Ensemble workflows
+- Chat with different models
+- Compliance and data analysis
+- Knowledge management
+- Task runners and usage tracking
+- Scientific research and more
+
+## How to Use
+
+- **Run an example script:**
+  ```bash
+  python examples/cli/<script_name>.py [args]
+  ```
+- **See help for a script:**
+  ```bash
+  python examples/cli/<script_name>.py --help
+  ```
+
+## Example Scripts
+
+| Script Name                  | Purpose / Use Case                                |
+|------------------------------|---------------------------------------------------|
+| ensemble_cli.py              | Ensemble agent workflows and voting               |
+| chat_ollama_cli.py           | Chat with Ollama models via CLI                   |
+| chat_with_gpt.py             | Chat with OpenAI GPT models via CLI               |
+| compliance_cli.py            | Compliance workflows and validation               |
+| data_analysis_cli.py         | Data analysis workflows                           |
+| knowledge_management_cli.py  | Knowledge management and retrieval                |
+| mcp_workflow.py              | MCP workflow orchestration                        |
+| multi_model_wrapper_cli.py   | Multi-model wrapper CLI                           |
+| prompt_chain.py              | Prompt chaining and orchestration                 |
+| scientific_research_cli.py   | Scientific research workflow demo                 |
+| software_development_cli.py  | Software development workflow demo                |
+| task_runner.py               | Task runner and orchestration                     |
+| usage_tracking.py            | Usage tracking and analytics                      |
+| basic_agent.py               | Basic agent CLI example                           |
+
+## Developer Notes
+- These scripts are for learning, prototyping, and extension.
+- For main CLI entry points, see the `multimind/cli/` folder and its README.
+- Each script is self-contained and can be adapted for your own workflows.
+
+---
+
+Happy experimenting! 🚀
\ No newline at end of file
diff --git a/examples/evolutionary/README.md b/examples/evolutionary/README.md
new file mode 100644
index 00000000..9600d1fd
--- /dev/null
+++ b/examples/evolutionary/README.md
@@ -0,0 +1,44 @@
+# Evolutionary Agentic AI Examples
+
+This folder contains **modular, developer-friendly demos** for evolutionary and agentic workflows in MultiMindSDK.
+
+## What You'll Find Here
+
+- **agent_evolution_demo.py**: Run multiple agents in parallel, select the best with a multi-objective judge, and track performance over time.
+- **agent_evolution_mutation_demo.py**: Adds mutation; evolve your agent pipeline between rounds and see how performance changes.
+- **agent_workflow_demo.py**: Shows how to build and run a simple agent workflow (DAG) using the modular pipeline system.
+
+## Quickstart
+
+1. **Install dependencies** (from the project root):
+   ```bash
+   pip install -r requirements.txt
+   ```
+2. **Run a demo** (from the project root):
+   ```bash
+   python examples/evolutionary/agent_evolution_demo.py
+   python examples/evolutionary/agent_evolution_mutation_demo.py
+   python examples/evolutionary/agent_workflow_demo.py
+   ```
+
+## Modularity & Extensibility
+
+- All examples use **modular agent classes**; swap in your own agents, judges, or memory modules.
+- The architecture supports easy extension: add new agents, mutate pipelines, or plug in custom scoring and memory.
+- Each script is self-contained but can be combined for more advanced workflows (e.g., hybrid symbolic + evolutionary).
+
+## How to Extend
+
+- **Add new agent strategies**: Subclass `MinimalAgent` or any agent and override the `run` method.
+- **Customize judging**: Pass your own judge function or agent to the `AgentArena`.
+
+- **Integrate memory or reasoning**: Use `GraphMemoryAgent`, `ThinkerAgent`, or `SelfReflectAgent` in your pipeline.
+- **Experiment with mutation**: Use `MetaControllerAgent` and `AgentMutator` to evolve your agent pipeline.
+
+## More Info
+
+- See the main project docs for architectural details and advanced usage.
+- All code is designed for clarity, modularity, and rapid prototyping.
+
+---
+
+Happy hacking! 🚀
\ No newline at end of file
diff --git a/examples/evolutionary/advanced_graph_memory_integration_demo.py b/examples/evolutionary/advanced_graph_memory_integration_demo.py
new file mode 100644
index 00000000..2b69e0b5
--- /dev/null
+++ b/examples/evolutionary/advanced_graph_memory_integration_demo.py
@@ -0,0 +1,53 @@
+from multimind.memory.graph_memory_agent import GraphMemoryAgent
+from multimind.memory.memory_deduplicator import MemoryDeduplicator
+from multimind.memory.memory_scorer import MemoryScorer
+from multimind.memory.memory_merge_engine import MemoryMergeEngine
+from datetime import datetime, timedelta
+
+# Custom semantic similarity: treat 'car' and 'automobile' as similar
+SIMILAR_WORDS = {('car', 'automobile'), ('automobile', 'car')}
+def semantic_similarity(t1, t2):
+    s1, p1, o1 = t1
+    s2, p2, o2 = t2
+    return (s1 == s2 and p1 == p2 and (o1 == o2 or (o1, o2) in SIMILAR_WORDS))
+
+def contradiction_fn(t1, t2):
+    # Contradiction if same subject/predicate but object is 'yes' vs 'no'
+    s1, p1, o1 = t1
+    s2, p2, o2 = t2
+    return s1 == s2 and p1 == p2 and {o1, o2} == {'yes', 'no'}
+
+# Custom merge: prefer latest timestamp
+def custom_merge(meta1, meta2):
+    return meta1 if meta1.get('timestamp', datetime.min) > meta2.get('timestamp', datetime.min) else meta2
+
+def main():
+    deduplicator = MemoryDeduplicator(similarity_fn=semantic_similarity, contradiction_fn=contradiction_fn)
+    scorer = MemoryScorer(strategy='importance')
+    merger = MemoryMergeEngine(merge_fn=custom_merge)
+    memory = GraphMemoryAgent(deduplicator=deduplicator, scorer=scorer, merger=merger)
+
+    # Add facts
+    print("Adding: ('Alice', 'drives', 'car')")
+    memory.add_fact('Alice', 'drives', 'car', importance=0.8, timestamp=datetime.utcnow())
+    print("Adding: ('Alice', 'drives', 'automobile') [semantic duplicate]")
+    memory.add_fact('Alice', 'drives', 'automobile', importance=0.9, timestamp=datetime.utcnow() + timedelta(seconds=1))
+    print("Adding: ('Bob', 'has_license', 'yes')")
+    memory.add_fact('Bob', 'has_license', 'yes', importance=0.7, timestamp=datetime.utcnow())
+    print("Adding: ('Bob', 'has_license', 'no') [contradiction]")
+    added = memory.add_fact('Bob', 'has_license', 'no', importance=0.6, timestamp=datetime.utcnow() + timedelta(seconds=2))
+    print("Contradictory add result:", added)
+
+    # Query all facts
+    print("\nAll facts:")
+    for fact in memory.get_facts():
+        print(fact)
+
+    # Timeline query
+    since = datetime.utcnow() - timedelta(seconds=2)
+    print(f"\nTimeline since {since}:")
+    for fact in memory.get_timeline(since):
+        print(fact)
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file
diff --git a/examples/evolutionary/agent_evolution_demo.py b/examples/evolutionary/agent_evolution_demo.py
new file mode 100644
index 00000000..355dc4d9
--- /dev/null
+++ b/examples/evolutionary/agent_evolution_demo.py
@@ -0,0 +1 @@
+# (No code changes, just moving the file to examples/evolutionary/ for modular organization)
\ No newline at end of file
diff --git a/examples/evolutionary/agent_evolution_mutation_demo.py b/examples/evolutionary/agent_evolution_mutation_demo.py
new file mode
100644 index 00000000..355dc4d9 --- /dev/null +++ b/examples/evolutionary/agent_evolution_mutation_demo.py @@ -0,0 +1 @@ +# (No code changes, just moving the file to examples/evolutionary/ for modular organization) \ No newline at end of file diff --git a/examples/evolutionary/agent_workflow_demo.py b/examples/evolutionary/agent_workflow_demo.py new file mode 100644 index 00000000..355dc4d9 --- /dev/null +++ b/examples/evolutionary/agent_workflow_demo.py @@ -0,0 +1 @@ +# (No code changes, just moving the file to examples/evolutionary/ for modular organization) \ No newline at end of file diff --git a/examples/evolutionary/hybrid_evolutionary_reflexive_demo.py b/examples/evolutionary/hybrid_evolutionary_reflexive_demo.py new file mode 100644 index 00000000..1539ed8d --- /dev/null +++ b/examples/evolutionary/hybrid_evolutionary_reflexive_demo.py @@ -0,0 +1,69 @@ +from multimind.agents.reflexive.planner_agent import PlannerAgent +from multimind.agents.reflexive.thinker_agent import ThinkerAgent +from multimind.agents.reflexive.judge_agent import JudgeAgent +from multimind.agents.reflexive.rewriter_agent import RewriterAgent +from multimind.agents.reflexive.self_reflect_agent import SelfReflectAgent +from multimind.core.evolution.agent_arena import AgentArena +from multimind.core.evolution.multi_objective_judge_agent import MultiObjectiveJudgeAgent +from multimind.core.evolution.meta_controller_agent import MetaControllerAgent +from multimind.core.evolution.agent_mutator import AgentMutator + +# Dummy memory agent for ThinkerAgent (can be replaced with GraphMemoryAgent) +class DummyMemory: + def query(self, predicate=None, obj=None): + if obj == 'write a report': + return [('Research', 'related_to', 'write a report', {})] + return [] + +# Reflexive pipeline agent: planner -> thinker -> judge -> rewriter -> self-reflect +class ReflexivePipelineAgent: + def __init__(self, name, memory=None): + self.name = name + self.planner = PlannerAgent() + self.thinker = ThinkerAgent(memory=memory) + self.judge = JudgeAgent() + self.rewriter = RewriterAgent() + self.self_reflect = SelfReflectAgent(self.thinker, memory=memory) + + async def run(self, prompt): + sub_tasks = self.planner.plan(prompt) + all_reasonings = [] + for task in sub_tasks: + reasoning = self.thinker.think(task) + all_reasonings.extend(reasoning) + judged = self.judge.evaluate(all_reasonings) + rewritten = [self.rewriter.rewrite(j['output'], feedback=f"{self.name} feedback") for j in judged] + improved_plan = self.self_reflect.run_reflexion(prompt, feedback=f"{self.name} self-improvement") + return { + 'agent': self.name, + 'sub_tasks': sub_tasks, + 'reasonings': all_reasonings, + 'judged': judged, + 'rewritten': rewritten, + 'improved_plan': improved_plan + } + +import asyncio + +async def main(): + memory = DummyMemory() + agents = [ReflexivePipelineAgent(f"ReflexiveAgent{i+1}", memory=memory) for i in range(3)] + judge = MultiObjectiveJudgeAgent(weights={'accuracy': 1.0, 'cost': 1.0, 'speed': 1.0, 'creativity': 1.0}) + arena = AgentArena(agents, lambda outputs: judge.score([{'accuracy': len(a['rewritten']), 'cost': 1.0, 'speed': 1.0, 'creativity': 1.0} for a in outputs])) + meta_controller = MetaControllerAgent() + agent_mutator = AgentMutator() + + prompt = "Research the topic and write a report, then review the report for clarity" + + for round_num in range(2): + print(f"\n=== Hybrid Evolutionary + Reflexive Round {round_num+1} ===") + result = await arena.run(prompt) + for i, output in enumerate(result['outputs']): + print(f"Agent 
{i+1} output:", output) + print("Best output (by judge):", result['best']) + # Mutate agents for next round + meta_controller.mutate_dag(arena, {a.name: a for a in agents}) + agent_mutator.mutate(arena, {a.name: a for a in agents}) + +if __name__ == '__main__': + asyncio.run(main()) \ No newline at end of file diff --git a/examples/evolutionary/reflexive_agent_pipeline_demo.py b/examples/evolutionary/reflexive_agent_pipeline_demo.py new file mode 100644 index 00000000..65680117 --- /dev/null +++ b/examples/evolutionary/reflexive_agent_pipeline_demo.py @@ -0,0 +1,48 @@ +from multimind.agents.reflexive.planner_agent import PlannerAgent +from multimind.agents.reflexive.thinker_agent import ThinkerAgent +from multimind.agents.reflexive.judge_agent import JudgeAgent +from multimind.agents.reflexive.rewriter_agent import RewriterAgent +from multimind.agents.reflexive.self_reflect_agent import SelfReflectAgent + +# Dummy memory agent for ThinkerAgent (can be replaced with GraphMemoryAgent) +class DummyMemory: + def query(self, predicate=None, obj=None): + # For demo, return a fact if the goal is 'write a report' + if obj == 'write a report': + return [('Research', 'related_to', 'write a report', {})] + return [] + +def main(): + prompt = "Research the topic and write a report, then review the report for clarity" + planner = PlannerAgent() + memory = DummyMemory() + thinker = ThinkerAgent(memory=memory) + judge = JudgeAgent() + rewriter = RewriterAgent() + self_reflect = SelfReflectAgent(thinker, memory=memory) + + # Step 1: Planning + sub_tasks = planner.plan(prompt) + print("Sub-tasks:", sub_tasks) + + # Step 2: Thinking/Reasoning + all_reasonings = [] + for task in sub_tasks: + reasoning = thinker.think(task) + print(f"Reasoning for '{task}':", reasoning) + all_reasonings.append(reasoning) + + # Step 3: Judging + judged = judge.evaluate([step for reasoning in all_reasonings for step in reasoning]) + print("Judged steps:", judged) + + # Step 4: Rewriting (refinement) + rewritten = [rewriter.rewrite(j['output'], feedback="Improve clarity") for j in judged] + print("Rewritten steps:", rewritten) + + # Step 5: Reflexion (self-improvement loop) + improved_plan = self_reflect.run_reflexion(prompt, feedback="Make the report more concise") + print("Improved plan after reflexion:", improved_plan) + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/examples/orchestration/dag_workflow_example.py b/examples/orchestration/dag_workflow_example.py new file mode 100644 index 00000000..ce483860 --- /dev/null +++ b/examples/orchestration/dag_workflow_example.py @@ -0,0 +1,48 @@ +import asyncio +from multimind.orchestration.dag_engine import DAGWorkflowEngine +from multimind.agents.thinker_agent import ThinkerAgent +from multimind.agents.self_reflect_agent import SelfReflectAgent +from multimind.memory.graph_memory import GraphMemoryAgent + +# Mock GraphMemoryAgent for demonstration +class DummyGraphMemoryAgent: + def query(self, predicate=None, obj=None): + return [("A", "related_to", obj, {})] + def add_fact(self, *args, **kwargs): + pass + +def simple_agent(name): + def run(goal=None): + return f"{name} completed with goal: {goal}" + return run + +async def main(): + dag = DAGWorkflowEngine() + memory_agent = DummyGraphMemoryAgent() + thinker = ThinkerAgent(memory_agent) + self_reflector = SelfReflectAgent(thinker, memory_agent) + + # Add nodes: two parallel, one downstream + dag.add_agent("think", thinker, dependencies=None) + dag.add_agent("reflect", self_reflector, 
dependencies=["think"]) + dag.add_agent("simpleA", simple_agent("A"), dependencies=["think"]) + dag.add_agent("simpleB", simple_agent("B"), dependencies=["think"]) + dag.add_agent("join", simple_agent("Join"), dependencies=["simpleA", "simpleB", "reflect"]) + + # Mutate DAG at runtime: add a new node after "join" + def mutation_fn(graph): + graph.add_node("final", agent=simple_agent("Final")) + graph.add_edge("join", "final") + dag.mutate_graph(mutation_fn) + + # Prepare input map for agents that require arguments + input_map = { + "think": {"goal": "demo-goal"}, + "reflect": {"goal": "demo-goal", "feedback": "improve"} + } + results = await dag.execute(input_map) + for node, result in results.items(): + print(f"{node}: {result}") + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/examples/orchestration/yaml/README.md b/examples/orchestration/yaml/README.md new file mode 100644 index 00000000..c122ca73 --- /dev/null +++ b/examples/orchestration/yaml/README.md @@ -0,0 +1,49 @@ +# YAML-to-DAG Orchestration Examples + +This folder demonstrates how to define and execute agent workflows as Directed Acyclic Graphs (DAGs) using YAML configuration files and the MultiMindSDK orchestration engine. + +## ๐Ÿงฉ How It Works +- **Pipelines** are defined declaratively in YAML, specifying agents, branches, and conditions. +- The `build_dag_from_yaml` function parses the YAML and constructs a `DAGWorkflowEngine`. +- Each runner script registers mock agents, loads the YAML, and executes the workflow. + +## ๐Ÿš€ Quickstart + +1. **Register your agents** in the runner script using `register_agent('name', constructor)`. +2. **Run the example**: + ```bash + python basic_linear_runner.py + python branching_runner.py + python conditional_router_runner.py + python performance_adaptive_runner.py + python symbolic_evolution_runner.py + ``` +3. **See the results** printed for each node in the pipeline. + +## ๐Ÿ“„ Example Pipelines + +| YAML File | Description | +|---------------------------|--------------------------------------------------| +| basic_linear.yaml | Simple linear pipeline: retriever โ†’ llm โ†’ judge | +| branching.yaml | Parallel branches after retriever | +| conditional_router.yaml | Conditional subflow based on judge.score | +| performance_adaptive.yaml | Branches based on judge.latency (performance) | +| symbolic_evolution.yaml | Symbolic evolution: insert improver if needed | + +## ๐Ÿ› ๏ธ How to Register Agents + +In each runner, register your agent classes or functions: +```python +register_agent('retriever', lambda: RetrieverAgent()) +register_agent('llm', lambda: LLM()) +# ... +``` + +## ๐Ÿ“ Extending +- Add new YAML files for more complex workflows. +- Implement real agent logic and register them in the runners. +- Use the `MetaControllerAgent` for runtime graph mutation and adaptation. + +## ๐Ÿ“š References +- See `multimind/orchestration/yaml_dag_parser.py` for the parser implementation. +- See `multimind/orchestration/dag_engine.py` for the DAG execution engine. 
\ No newline at end of file diff --git a/examples/orchestration/yaml/basic_linear.yaml b/examples/orchestration/yaml/basic_linear.yaml new file mode 100644 index 00000000..517a059c --- /dev/null +++ b/examples/orchestration/yaml/basic_linear.yaml @@ -0,0 +1,4 @@ +pipeline: + - name: retriever + - name: llm + - name: judge \ No newline at end of file diff --git a/examples/orchestration/yaml/basic_linear_runner.py b/examples/orchestration/yaml/basic_linear_runner.py new file mode 100644 index 00000000..2802a9dc --- /dev/null +++ b/examples/orchestration/yaml/basic_linear_runner.py @@ -0,0 +1,22 @@ +import asyncio +from multimind.orchestration.yaml_dag_parser import build_dag_from_yaml, register_agent + +# Simple mock agent +class MockAgent: + def __init__(self, name): + self.name = name + def run(self, **kwargs): + return f"{self.name} done" + +register_agent('retriever', lambda: MockAgent('retriever')) +register_agent('llm', lambda: MockAgent('llm')) +register_agent('judge', lambda: MockAgent('judge')) + +def main(): + dag = build_dag_from_yaml('examples/orchestration/yaml/basic_linear.yaml') + results = asyncio.run(dag.execute()) + for node, result in results.items(): + print(f"{node}: {result}") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/orchestration/yaml/branching.yaml b/examples/orchestration/yaml/branching.yaml new file mode 100644 index 00000000..cb83b34e --- /dev/null +++ b/examples/orchestration/yaml/branching.yaml @@ -0,0 +1,7 @@ +pipeline: + - name: retriever + - [ + {name: summarizer}, + {name: llm} + ] + - name: judge \ No newline at end of file diff --git a/examples/orchestration/yaml/branching_runner.py b/examples/orchestration/yaml/branching_runner.py new file mode 100644 index 00000000..e3923d97 --- /dev/null +++ b/examples/orchestration/yaml/branching_runner.py @@ -0,0 +1,22 @@ +import asyncio +from multimind.orchestration.yaml_dag_parser import build_dag_from_yaml, register_agent + +class MockAgent: + def __init__(self, name): + self.name = name + def run(self, **kwargs): + return f"{self.name} done" + +register_agent('retriever', lambda: MockAgent('retriever')) +register_agent('summarizer', lambda: MockAgent('summarizer')) +register_agent('llm', lambda: MockAgent('llm')) +register_agent('judge', lambda: MockAgent('judge')) + +def main(): + dag = build_dag_from_yaml('examples/orchestration/yaml/branching.yaml') + results = asyncio.run(dag.execute()) + for node, result in results.items(): + print(f"{node}: {result}") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/orchestration/yaml/conditional_router.yaml b/examples/orchestration/yaml/conditional_router.yaml new file mode 100644 index 00000000..d05fa28a --- /dev/null +++ b/examples/orchestration/yaml/conditional_router.yaml @@ -0,0 +1,10 @@ +pipeline: + - name: retriever + - name: llm + - name: judge + - if: + condition: judge.score < 0.5 + then: + - name: summarizer + - name: rewriter + - name: llm \ No newline at end of file diff --git a/examples/orchestration/yaml/conditional_router_runner.py b/examples/orchestration/yaml/conditional_router_runner.py new file mode 100644 index 00000000..a78da2de --- /dev/null +++ b/examples/orchestration/yaml/conditional_router_runner.py @@ -0,0 +1,23 @@ +import asyncio +from multimind.orchestration.yaml_dag_parser import build_dag_from_yaml, register_agent + +class MockAgent: + def __init__(self, name): + self.name = name + def run(self, **kwargs): + return f"{self.name} done" + 
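+# Note: conditional_router.yaml routes on `judge.score < 0.5`. How that
+# expression is evaluated against the judge node's result is defined by the
+# parser (see multimind/orchestration/yaml_dag_parser.py). These MockAgents
+# return plain strings, so a real pipeline would need a judge that exposes a
+# numeric `score` for the condition to be meaningful.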
+register_agent('retriever', lambda: MockAgent('retriever')) +register_agent('llm', lambda: MockAgent('llm')) +register_agent('judge', lambda: MockAgent('judge')) +register_agent('summarizer', lambda: MockAgent('summarizer')) +register_agent('rewriter', lambda: MockAgent('rewriter')) + +def main(): + dag = build_dag_from_yaml('examples/orchestration/yaml/conditional_router.yaml') + results = asyncio.run(dag.execute()) + for node, result in results.items(): + print(f"{node}: {result}") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/orchestration/yaml/performance_adaptive.yaml b/examples/orchestration/yaml/performance_adaptive.yaml new file mode 100644 index 00000000..769b1341 --- /dev/null +++ b/examples/orchestration/yaml/performance_adaptive.yaml @@ -0,0 +1,10 @@ +pipeline: + - name: retriever + - name: llm + - name: judge + - if: + condition: judge.latency > 2.0 + then: + - name: fast_llm + else: + - name: standard_llm \ No newline at end of file diff --git a/examples/orchestration/yaml/performance_adaptive_runner.py b/examples/orchestration/yaml/performance_adaptive_runner.py new file mode 100644 index 00000000..278ad2e2 --- /dev/null +++ b/examples/orchestration/yaml/performance_adaptive_runner.py @@ -0,0 +1,23 @@ +import asyncio +from multimind.orchestration.yaml_dag_parser import build_dag_from_yaml, register_agent + +class MockAgent: + def __init__(self, name): + self.name = name + def run(self, **kwargs): + return f"{self.name} done" + +register_agent('retriever', lambda: MockAgent('retriever')) +register_agent('llm', lambda: MockAgent('llm')) +register_agent('judge', lambda: MockAgent('judge')) +register_agent('fast_llm', lambda: MockAgent('fast_llm')) +register_agent('standard_llm', lambda: MockAgent('standard_llm')) + +def main(): + dag = build_dag_from_yaml('examples/orchestration/yaml/performance_adaptive.yaml') + results = asyncio.run(dag.execute()) + for node, result in results.items(): + print(f"{node}: {result}") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/orchestration/yaml/real_agents_runner.py b/examples/orchestration/yaml/real_agents_runner.py new file mode 100644 index 00000000..73c24523 --- /dev/null +++ b/examples/orchestration/yaml/real_agents_runner.py @@ -0,0 +1,34 @@ +import asyncio +from multimind.orchestration.yaml_dag_parser import build_dag_from_yaml, register_agent +from multimind.agents.thinker_agent import ThinkerAgent +from multimind.agents.self_reflect_agent import SelfReflectAgent + +# Dummy memory agent for demonstration +class DummyGraphMemoryAgent: + def query(self, predicate=None, obj=None): + return [("A", "related_to", obj, {})] + def add_fact(self, *args, **kwargs): + pass + +memory_agent = DummyGraphMemoryAgent() +thinker = lambda: ThinkerAgent(memory_agent) +self_reflector = lambda: SelfReflectAgent(ThinkerAgent(memory_agent), memory_agent) + +register_agent('retriever', thinker) +register_agent('llm', self_reflector) +register_agent('judge', thinker) + +def main(): + dag = build_dag_from_yaml('examples/orchestration/yaml/basic_linear.yaml') + # Provide input for agents that require it + input_map = { + 'retriever': {'goal': 'demo-goal'}, + 'llm': {'goal': 'demo-goal', 'feedback': 'improve'}, + 'judge': {'goal': 'demo-goal'} + } + results = asyncio.run(dag.execute(input_map)) + for node, result in results.items(): + print(f"{node}: {result}") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git 
a/examples/orchestration/yaml/symbolic_evolution.yaml b/examples/orchestration/yaml/symbolic_evolution.yaml
new file mode 100644
index 00000000..aef5ea93
--- /dev/null
+++ b/examples/orchestration/yaml/symbolic_evolution.yaml
@@ -0,0 +1,9 @@
+pipeline:
+  - name: retriever
+  - name: llm
+  - name: judge
+  - if:
+      condition: judge.quality < 0.7
+      then:
+        - name: improver
+        - name: llm
\ No newline at end of file
diff --git a/examples/orchestration/yaml/symbolic_evolution_runner.py b/examples/orchestration/yaml/symbolic_evolution_runner.py
new file mode 100644
index 00000000..dc85f2e7
--- /dev/null
+++ b/examples/orchestration/yaml/symbolic_evolution_runner.py
@@ -0,0 +1,22 @@
+import asyncio
+from multimind.orchestration.yaml_dag_parser import build_dag_from_yaml, register_agent
+
+class MockAgent:
+    def __init__(self, name):
+        self.name = name
+    def run(self, **kwargs):
+        return f"{self.name} done"
+
+register_agent('retriever', lambda: MockAgent('retriever'))
+register_agent('llm', lambda: MockAgent('llm'))
+register_agent('judge', lambda: MockAgent('judge'))
+register_agent('improver', lambda: MockAgent('improver'))
+
+def main():
+    dag = build_dag_from_yaml('examples/orchestration/yaml/symbolic_evolution.yaml')
+    results = asyncio.run(dag.execute())
+    for node, result in results.items():
+        print(f"{node}: {result}")
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/examples/orchestration/yaml/visualize_yaml_dag.py b/examples/orchestration/yaml/visualize_yaml_dag.py
new file mode 100644
index 00000000..71e95058
--- /dev/null
+++ b/examples/orchestration/yaml/visualize_yaml_dag.py
@@ -0,0 +1,25 @@
+import sys
+from multimind.orchestration.yaml_dag_parser import build_dag_from_yaml, register_agent
+from multimind.orchestration.dag_visualization import visualize_dag
+
+# Register mock agents for demonstration
+class MockAgent:
+    def __init__(self, name):
+        self.name = name
+    def run(self, **kwargs):
+        return f"{self.name} done"
+
+for name in [
+    'retriever', 'llm', 'judge', 'summarizer', 'rewriter', 'fast_llm', 'standard_llm', 'improver']:
+    register_agent(name, lambda n=name: MockAgent(n))
+
+def main():
+    if len(sys.argv) < 2:
+        print("Usage: python visualize_yaml_dag.py <yaml_file>")
+        sys.exit(1)
+    yaml_file = sys.argv[1]
+    dag = build_dag_from_yaml(yaml_file)
+    visualize_dag(dag)
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/multimind/agents/agent_loader.py b/multimind/agents/agent_loader.py
index 68cc4bcc..a4f6ea5f 100644
--- a/multimind/agents/agent_loader.py
+++ b/multimind/agents/agent_loader.py
@@ -27,13 +27,18 @@ def register_tool(self, name: str, tool: BaseTool) -> None:
 
     def load_agent(
         self,
-        config_path: str,
+        config_path_or_dict,
         model: Optional[BaseLLM] = None,
         tools: Optional[List[BaseTool]] = None
     ) -> Agent:
-        """Load an agent from a configuration file."""
+        """Load an agent from a configuration file or dict."""
+        # If config is a dict with 'class', instantiate directly (for demo)
+        if isinstance(config_path_or_dict, dict) and 'class' in config_path_or_dict:
+            agent_class = config_path_or_dict['class']
+            kwargs = config_path_or_dict.get('kwargs', {})
+            return agent_class(**kwargs)
         # Load config
-        with open(config_path, 'r') as f:
+        with open(config_path_or_dict, 'r') as f:
             config = json.load(f)
 
         # Validate config
diff --git a/multimind/agents/fact_extractor_agent.py b/multimind/agents/fact_extractor_agent.py
new file mode 100644
index 00000000..c241f98b
--- /dev/null
+++ b/multimind/agents/fact_extractor_agent.py
@@ -0,0 +1,35 @@
+import re
+from typing import List, Tuple
+from multimind.memory.graph_memory import GraphMemoryAgent
+
+class FactExtractorAgent:
+    """
+    Parses LLM outputs or text into (subject, predicate, object) triples and adds them to GraphMemoryAgent.
+    For demo, uses a simple regex for extraction.
+    """
+    def __init__(self, memory_agent: GraphMemoryAgent):
+        self.memory_agent = memory_agent
+
+    def extract_facts(self, text: str) -> List[Tuple[str, str, str]]:
+        """
+        Extract triples from text. For demo, expects lines like '<subject> <predicate> <object>'.
+        """
+        triples = []
+        pattern = re.compile(r'([\w\s]+)\s+(\w+)\s+([\w\s]+)')
+        for line in text.splitlines():
+            match = pattern.match(line.strip())
+            if match:
+                subj, pred, obj = match.groups()
+                triples.append((subj.strip(), pred.strip(), obj.strip()))
+        return triples
+
+    def add_facts_from_text(self, text: str) -> int:
+        """
+        Extracts and adds all found triples to memory. Returns number added.
+        """
+        triples = self.extract_facts(text)
+        count = 0
+        for subj, pred, obj in triples:
+            if self.memory_agent.add_fact(subj, pred, obj):
+                count += 1
+        return count
\ No newline at end of file
diff --git a/multimind/agents/meta_controller_agent.py b/multimind/agents/meta_controller_agent.py
new file mode 100644
index 00000000..f807838c
--- /dev/null
+++ b/multimind/agents/meta_controller_agent.py
@@ -0,0 +1,74 @@
+from typing import Callable, Any
+import networkx as nx
+
+class MetaControllerAgent:
+    """
+    Observes DAGWorkflowEngine execution and mutates the DAG dynamically.
+    Can insert, remove, or reorder nodes/edges at runtime based on feedback or results.
+    """
+    def __init__(self, mutation_policy: Callable[[nx.DiGraph, dict], None]):
+        """
+        mutation_policy: function that takes (graph, results) and mutates the graph in-place
+        """
+        self.mutation_policy = mutation_policy
+
+    def observe_and_mutate(self, dag_engine: Any, results: dict):
+        """
+        Observe the current DAG and results, then mutate the DAG in-place.
+        dag_engine: instance of DAGWorkflowEngine
+        results: dict of node results so far
+        """
+        self.mutation_policy(dag_engine.graph, results)
+
+    @staticmethod
+    def insert_node_if_metric_high(node_name: str, metric_key: str, threshold: float, new_node: str, agent_ctor):
+        """
+        Returns a mutation policy that inserts new_node after node_name if results[node_name][metric_key] > threshold.
+        """
+        def policy(graph: nx.DiGraph, results: dict):
+            if node_name in results and results[node_name].get(metric_key, 0) > threshold:
+                # Insert new_node after node_name
+                succs = list(graph.successors(node_name))
+                graph.add_node(new_node, agent=agent_ctor())
+                for succ in succs:
+                    graph.remove_edge(node_name, succ)
+                    graph.add_edge(new_node, succ)
+                graph.add_edge(node_name, new_node)
+        return policy
+
+    @staticmethod
+    def remove_node_if_metric_low(node_name: str, metric_key: str, threshold: float):
+        """
+        Returns a mutation policy that removes node_name if results[node_name][metric_key] < threshold.
+        """
+        def policy(graph: nx.DiGraph, results: dict):
+            if node_name in results and results[node_name].get(metric_key, 0) < threshold:
+                preds = list(graph.predecessors(node_name))
+                succs = list(graph.successors(node_name))
+                graph.remove_node(node_name)
+                for p in preds:
+                    for s in succs:
+                        graph.add_edge(p, s)
+        return policy
+
+    @staticmethod
+    def replace_node(node_name: str, new_node: str, agent_ctor):
+        """
+        Returns a mutation policy that replaces node_name with new_node.
+ """ + def policy(graph: nx.DiGraph, results: dict): + if node_name in graph.nodes: + preds = list(graph.predecessors(node_name)) + succs = list(graph.successors(node_name)) + graph.remove_node(node_name) + graph.add_node(new_node, agent=agent_ctor()) + for p in preds: + graph.add_edge(p, new_node) + for s in succs: + graph.add_edge(new_node, s) + return policy + + # Example usage: + # policy = MetaControllerAgent.insert_node_if_metric_high('judge', 'latency', 2.0, 'fast_llm', FastLLMAgent) + # meta_agent = MetaControllerAgent(policy) + # meta_agent.observe_and_mutate(dag_engine, results) \ No newline at end of file diff --git a/multimind/agents/minimal_agent.py b/multimind/agents/minimal_agent.py new file mode 100644 index 00000000..86aa29ea --- /dev/null +++ b/multimind/agents/minimal_agent.py @@ -0,0 +1,15 @@ +class MinimalAgent: + """ + A minimal agent for demonstration. It echoes or transforms input data. + """ + def __init__(self, name: str, transform: str = None): + self.name = name + self.transform = transform + + async def run(self, input_data): + if self.transform == 'upper': + return str(input_data).upper() + elif self.transform == 'reverse': + return str(input_data)[::-1] + else: + return f"{self.name} received: {input_data}" \ No newline at end of file diff --git a/multimind/agents/reflexive/fact_extractor_agent.py b/multimind/agents/reflexive/fact_extractor_agent.py new file mode 100644 index 00000000..0f03d492 --- /dev/null +++ b/multimind/agents/reflexive/fact_extractor_agent.py @@ -0,0 +1,42 @@ +import re +from typing import List, Tuple, Callable, Optional + +class FactExtractorAgent: + """ + Parses LLM outputs or text into (subject, predicate, object) triples and adds them to a memory agent (e.g., GraphMemoryAgent). + Supports custom extraction functions, regex, or LLM-based extraction. + Modular and developer-friendly for use in pipelines or reflexive loops. + """ + def __init__(self, memory_agent=None, extract_fn: Optional[Callable[[str], List[Tuple[str, str, str]]]] = None): + """ + memory_agent: Optional memory agent (e.g., GraphMemoryAgent) to store extracted facts. + extract_fn: Optional custom function to extract triples. If None, uses default regex-based extraction. + The function signature is (text) -> list of (subject, predicate, object). + """ + self.memory_agent = memory_agent + self.extract_fn = extract_fn or self.default_extract + + def default_extract(self, text: str) -> List[Tuple[str, str, str]]: + """ + Default extraction: expects lines like 'A B'. + """ + triples = [] + pattern = re.compile(r'([\w\s]+)\s+(\w+)\s+([\w\s]+)') + for line in text.splitlines(): + match = pattern.match(line.strip()) + if match: + subj, pred, obj = match.groups() + triples.append((subj.strip(), pred.strip(), obj.strip())) + return triples + + def extract_and_store(self, text: str) -> int: + """ + Extracts triples from text and adds them to memory. Returns number added. 
+ """ + triples = self.extract_fn(text) + count = 0 + if self.memory_agent and hasattr(self.memory_agent, 'add_fact'): + for subj, pred, obj in triples: + if self.memory_agent.add_fact(subj, pred, obj): + count += 1 + return count \ No newline at end of file diff --git a/multimind/agents/reflexive/judge_agent.py b/multimind/agents/reflexive/judge_agent.py new file mode 100644 index 00000000..622fee72 --- /dev/null +++ b/multimind/agents/reflexive/judge_agent.py @@ -0,0 +1,36 @@ +from typing import Any, Callable, List, Dict, Optional + +class JudgeAgent: + """ + Evaluates outputs for accuracy, quality, or fitness. + Supports custom scoring functions, LLM-based scoring, or heuristics. + Can be used standalone or as part of a reflexive loop. + """ + def __init__(self, scoring_fn: Optional[Callable[[Any], float]] = None): + """ + scoring_fn: Optional custom function to score outputs. If None, uses default heuristic. + """ + self.scoring_fn = scoring_fn or self.default_scoring + + def default_scoring(self, output: Any) -> float: + """ + Default heuristic: score by length (for demo). Override for real use. + """ + return float(len(str(output))) + + def evaluate(self, outputs: List[Any]) -> List[Dict[str, Any]]: + """ + Evaluate a list of outputs, returning a list of dicts with output and score. + """ + results = [] + for output in outputs: + score = self.scoring_fn(output) + results.append({'output': output, 'score': score}) + return results + + def best(self, outputs: List[Any]) -> Any: + """ + Return the output with the highest score. + """ + results = self.evaluate(outputs) + return max(results, key=lambda x: x['score'])['output'] \ No newline at end of file diff --git a/multimind/agents/reflexive/planner_agent.py b/multimind/agents/reflexive/planner_agent.py new file mode 100644 index 00000000..d30613a1 --- /dev/null +++ b/multimind/agents/reflexive/planner_agent.py @@ -0,0 +1,29 @@ +from typing import Any, Callable, List, Optional + +class PlannerAgent: + """ + Breaks high-level prompts into sub-tasks. + Supports custom planning functions, LLM-based decomposition, or heuristics. + Modular and developer-friendly for use in pipelines or reflexive loops. + """ + def __init__(self, plan_fn: Optional[Callable[[str], List[str]]] = None): + """ + plan_fn: Optional custom function to decompose prompts. If None, uses default heuristic. + The function signature is (prompt) -> list of sub-tasks. + """ + self.plan_fn = plan_fn or self.default_plan + + def default_plan(self, prompt: str) -> List[str]: + """ + Default heuristic: split prompt by 'and', 'then', or commas (for demo). Override for real use. + """ + import re + # Split on 'and', 'then', or commas + parts = re.split(r'\band\b|\bthen\b|,', prompt, flags=re.IGNORECASE) + return [p.strip() for p in parts if p.strip()] + + def plan(self, prompt: str) -> List[str]: + """ + Break the high-level prompt into sub-tasks using the planning function. + """ + return self.plan_fn(prompt) \ No newline at end of file diff --git a/multimind/agents/reflexive/rewriter_agent.py b/multimind/agents/reflexive/rewriter_agent.py new file mode 100644 index 00000000..18c82d2a --- /dev/null +++ b/multimind/agents/reflexive/rewriter_agent.py @@ -0,0 +1,28 @@ +from typing import Any, Callable, Optional + +class RewriterAgent: + """ + Refines or rewrites outputs based on Judge feedback. + Supports custom rewrite functions, LLM-based rewriting, or heuristics. + Modular and developer-friendly for use in reflexive loops or pipelines. 
+ """ + def __init__(self, rewrite_fn: Optional[Callable[[Any, Optional[Any]], Any]] = None): + """ + rewrite_fn: Optional custom function to rewrite outputs. If None, uses default heuristic. + The function signature is (output, feedback) -> new_output. + """ + self.rewrite_fn = rewrite_fn or self.default_rewrite + + def default_rewrite(self, output: Any, feedback: Optional[Any] = None) -> Any: + """ + Default heuristic: append feedback to output (for demo). Override for real use. + """ + if feedback: + return f"{output} [Rewritten with feedback: {feedback}]" + return f"{output} [Rewritten]" + + def rewrite(self, output: Any, feedback: Optional[Any] = None) -> Any: + """ + Refine or rewrite the output using the rewrite function and optional feedback. + """ + return self.rewrite_fn(output, feedback) \ No newline at end of file diff --git a/multimind/agents/reflexive/self_reflect_agent.py b/multimind/agents/reflexive/self_reflect_agent.py new file mode 100644 index 00000000..10c62e2d --- /dev/null +++ b/multimind/agents/reflexive/self_reflect_agent.py @@ -0,0 +1,35 @@ +from typing import List, Optional, Any +from multimind.agents.reflexive.thinker_agent import ThinkerAgent + +class SelfReflectAgent: + """ + Loops through past plans/memories and updates them using feedback. + Integrates with ThinkerAgent and memory for reflexive improvement. + Modular and developer-friendly for use in pipelines or reflexive loops. + """ + def __init__(self, thinker_agent: ThinkerAgent, memory=None): + self.thinker_agent = thinker_agent + self.memory = memory + self.past_plans: List[List[str]] = [] + self.feedback_history: List[Any] = [] + + def run_reflexion(self, goal: str, feedback: Optional[Any] = None) -> List[str]: + """ + Runs a self-improvement loop: plans, reflects, and updates memory with feedback. + Returns the improved plan. + """ + plan = self.thinker_agent.think(goal) + if feedback: + plan = plan + [f"Reflection: {feedback}"] + self.feedback_history.append(feedback) + self.past_plans.append(plan) + # Optionally, store the improved plan in memory as a new fact + if self.memory and hasattr(self.memory, 'add_fact'): + self.memory.add_fact(goal, 'improved_plan', str(plan)) + return plan + + def get_past_plans(self) -> List[List[str]]: + """ + Returns all past plans for inspection or further reflection. + """ + return self.past_plans \ No newline at end of file diff --git a/multimind/agents/reflexive/thinker_agent.py b/multimind/agents/reflexive/thinker_agent.py new file mode 100644 index 00000000..370f28f6 --- /dev/null +++ b/multimind/agents/reflexive/thinker_agent.py @@ -0,0 +1,38 @@ +from typing import Any, Callable, List, Optional + +class ThinkerAgent: + """ + Performs strategic planning and reflection. + Integrates with memory (e.g., GraphMemoryAgent). + Supports custom reasoning functions, chain-of-thought, or LLM-based logic. + Modular and developer-friendly for use in pipelines or reflexive loops. + """ + def __init__(self, memory=None, reason_fn: Optional[Callable[[str, Any], List[str]]] = None): + """ + memory: Optional memory module (e.g., GraphMemoryAgent) for context. + reason_fn: Optional custom function for reasoning. If None, uses default heuristic. + The function signature is (goal, memory) -> list of reasoning steps. + """ + self.memory = memory + self.reason_fn = reason_fn or self.default_reason + + def default_reason(self, goal: str, memory: Any = None) -> List[str]: + """ + Default heuristic: if memory is present, query for facts related to the goal and chain as steps. 
+ Otherwise, return a generic plan. + """ + steps = [] + if self.memory and hasattr(self.memory, 'query'): + facts = self.memory.query(predicate='related_to', obj=goal) + for subj, pred, obj, meta in facts: + steps.append(f"Step: Use {subj} because it is {pred} {obj}") + if not steps: + steps.append(f"No direct memory found for goal '{goal}'. Try researching or asking for more info.") + return steps + + def think(self, goal: str) -> List[str]: + """ + Perform strategic planning and reflection for the given goal. + Returns a list of reasoning steps. + """ + return self.reason_fn(goal, self.memory) \ No newline at end of file diff --git a/multimind/agents/retriever_agent.py b/multimind/agents/retriever_agent.py new file mode 100644 index 00000000..051c2100 --- /dev/null +++ b/multimind/agents/retriever_agent.py @@ -0,0 +1,6 @@ +class RetrieverAgent: + """ + Retrieves relevant facts from both symbolic and semantic memory. + TODO: Implement hybrid retrieval logic and API. + """ + pass \ No newline at end of file diff --git a/multimind/agents/self_reflect_agent.py b/multimind/agents/self_reflect_agent.py new file mode 100644 index 00000000..d7d20aec --- /dev/null +++ b/multimind/agents/self_reflect_agent.py @@ -0,0 +1,34 @@ +from typing import List, Optional +from multimind.agents.thinker_agent import ThinkerAgent +from multimind.memory.graph_memory import GraphMemoryAgent + +class SelfReflectAgent: + """ + Loops through past plans/memories and updates them using feedback. + Integrates with ThinkerAgent and GraphMemoryAgent for reflexive improvement. + """ + def __init__(self, thinker_agent: ThinkerAgent, memory_agent: GraphMemoryAgent): + self.thinker_agent = thinker_agent + self.memory_agent = memory_agent + self.past_plans: List[List[str]] = [] + self.feedback_history: List[str] = [] + + def run_reflexion(self, goal: str, feedback: Optional[str] = None) -> List[str]: + """ + Runs a self-improvement loop: plans, reflects, and updates memory with feedback. + Returns the improved plan. + """ + plan = self.thinker_agent.plan(goal) + if feedback: + plan = self.thinker_agent.reflect(plan, feedback) + self.feedback_history.append(feedback) + self.past_plans.append(plan) + # Optionally, store the improved plan in memory as a new fact + self.memory_agent.add_fact(goal, 'improved_plan', str(plan)) + return plan + + def get_past_plans(self) -> List[List[str]]: + """ + Returns all past plans for inspection or further reflection. + """ + return self.past_plans \ No newline at end of file diff --git a/multimind/agents/task_feedback_loop.py b/multimind/agents/task_feedback_loop.py new file mode 100644 index 00000000..98e4d07d --- /dev/null +++ b/multimind/agents/task_feedback_loop.py @@ -0,0 +1,6 @@ +class TaskFeedbackLoop: + """ + Stores task outcomes and judgments for later reuse. + TODO: Implement feedback storage and retrieval logic. + """ + pass \ No newline at end of file diff --git a/multimind/agents/thinker_agent.py b/multimind/agents/thinker_agent.py new file mode 100644 index 00000000..e2ed6785 --- /dev/null +++ b/multimind/agents/thinker_agent.py @@ -0,0 +1,33 @@ +from typing import List, Optional +from multimind.memory.graph_memory import GraphMemoryAgent + +class ThinkerAgent: + """ + General-purpose reasoning agent (multi-domain, chain-of-thought capable). + Performs strategic planning and reflection using GraphMemoryAgent. 
+ """ + def __init__(self, memory_agent: GraphMemoryAgent): + self.memory_agent = memory_agent + + def plan(self, goal: str, context: Optional[str] = None) -> List[str]: + """ + Generate a simple plan (list of steps) to achieve a goal, using memory. + For demo: queries memory for relevant facts and chains them as steps. + """ + # Query memory for facts related to the goal + facts = self.memory_agent.query(predicate='related_to', obj=goal) + steps = [] + for subj, pred, obj, meta in facts: + steps.append(f"Step: Use {subj} because it is {pred} {obj}") + if not steps: + steps.append(f"No direct memory found for goal '{goal}'. Try researching or asking for more info.") + return steps + + def reflect(self, last_plan: List[str], feedback: Optional[str] = None) -> List[str]: + """ + Reflect on the last plan and optionally update it based on feedback. + For demo: just appends feedback if provided. + """ + if feedback: + return last_plan + [f"Reflection: {feedback}"] + return last_plan \ No newline at end of file diff --git a/multimind/cli/README.md b/multimind/cli/README.md index a0984757..8b8c17af 100644 --- a/multimind/cli/README.md +++ b/multimind/cli/README.md @@ -1,156 +1,88 @@ -# MultiMind SDK Model Conversion CLI +# MultiMindSDK CLI Tools -A command-line interface for converting models between different formats using the MultiMind SDK. +This directory contains the main command-line tools for the MultiMindSDK. Each CLI is modular, developer-friendly, and focused on a specific feature or workflow. -## Installation +## Table of Contents +- [Agentic Workflow CLI (`agentic.py`)](#agentic-workflow-cli-agenticpy) +- [Feature-Specific CLIs](#feature-specific-clis) +- [How to Use](#how-to-use) +- [Entry Point](#entry-point) -The CLI is automatically installed with the MultiMind SDK: +--- -```bash -pip install multimind-sdk -``` +## Agentic Workflow CLI (`agentic.py`) -## Usage +**Run agentic, evolutionary, and reflexive workflow demos from the command line.** -Basic usage: +### Usage ```bash -multimind convert --source --target --model-path --output-dir +python -m multimind.cli.agentic list-examples +python -m multimind.cli.agentic run-demo reflexive +python -m multimind.cli.agentic run-demo hybrid +python -m multimind.cli.agentic info ``` -### Examples - -1. Convert HuggingFace model to GGUF: +### Commands +| Command | Description | +|----------------|--------------------------------------------------| +| list-examples | List all available agentic workflow demos | +| run-demo | Run a specific demo (e.g., reflexive, hybrid) | +| info | Show CLI info and available examples | + +--- + +## Feature-Specific CLIs + +| File | Purpose | +|-------------------------|----------------------------------------------| +| context_transfer.py | CLI for context transfer workflows | +| model_conversion_cli.py | CLI for model conversion and export | +| multi_model_cli.py | CLI for multi-model management | +| compliance.py | CLI for compliance workflows | +| chat.py | CLI for chat and conversational agents | +| config.py | CLI for configuration management | +| models.py | CLI for model registry and management | +| __main__.py | Main entry point (see below) | + +Each CLI is self-contained and can be run as a module: ```bash -multimind convert \ - --source huggingface \ - --target gguf \ - --model-path Qwen/Qwen1.5-7B \ - --output-dir ./models \ - --quantization q4_k_m \ - --context-length 4096 \ - --validate \ - --test +python -m multimind.cli. [args] ``` -2. 
Convert PyTorch model to Safetensors: -```bash -multimind convert \ - --source pytorch \ - --target safetensors \ - --model-path ./model.pt \ - --output-dir ./converted \ - --compression lz4 \ - --compression-level 9 \ - --device cuda \ - --metadata author=JohnDoe version=1.0 -``` +--- -3. Convert TensorFlow model to TFLite: -```bash -multimind convert \ - --source tensorflow \ - --target tflite \ - --model-path ./model \ - --output-dir ./converted \ - --optimizations DEFAULT OPTIMIZE_FOR_LATENCY \ - --quantization int8 -``` +## How to Use -4. Convert ONNX model to ONNX Runtime: +- **List all available agentic workflow demos:** + ```bash + python -m multimind.cli.agentic list-examples + ``` +- **Run a specific agentic workflow demo:** + ```bash + python -m multimind.cli.agentic run-demo reflexive + ``` +- **Run a feature-specific CLI:** + ```bash + python -m multimind.cli.model_conversion_cli --help + ``` + +--- + +## Entry Point + +The file `__main__.py` allows you to run the CLI package directly: ```bash -multimind convert \ - --source onnx \ - --target ort \ - --model-path ./model.onnx \ - --output-dir ./converted \ - --optimization-level all \ - --device cuda +python -m multimind.cli ``` +This should dispatch to a main menu or point you to the right CLI tools. + +--- + +## Developer Notes +- All CLI tools are modular and can be extended independently. +- For more examples and advanced usage, see the `examples/cli/` and `examples/evolutionary/` folders. +- Each CLI tool provides `--help` for command-line usage details. + +--- -## Supported Formats - -### Source Formats -- `huggingface`: HuggingFace models -- `pytorch`: PyTorch models -- `tensorflow`: TensorFlow models -- `onnx`: ONNX models -- `ollama`: Ollama models - -### Target Formats -- `gguf`: GGUF format (for Ollama) -- `safetensors`: Safetensors format -- `tflite`: TensorFlow Lite format -- `ort`: ONNX Runtime format -- `onnx`: ONNX format - -## Options - -### Required Arguments -- `--source`: Source model format -- `--target`: Target model format -- `--model-path`: Path to source model or HuggingFace model ID -- `--output-dir`: Directory to save converted model - -### Optional Arguments -- `--quantization`: Quantization method - - For GGUF: `q4_k_m`, `q4_0`, `q5_k_m`, `q8_0` - - For TFLite: `int8`, `fp16` -- `--compression`: Compression method for Safetensors - - `lz4`: Fast compression - - `zstd`: Better compression ratio -- `--compression-level`: Compression level (1-9) -- `--optimizations`: Optimization methods - - For TFLite: `DEFAULT`, `OPTIMIZE_FOR_LATENCY`, etc. -- `--optimization-level`: ONNX Runtime optimization level - - `basic`: Basic optimizations - - `all`: All optimizations - - `extreme`: Maximum optimizations -- `--device`: Device to use - - `cpu`: CPU only - - `cuda`: GPU acceleration -- `--context-length`: Context length for GGUF models -- `--metadata`: Additional metadata (key=value pairs) -- `--validate`: Validate model before and after conversion -- `--test`: Test converted model -- `--verbose`: Enable verbose output - -## Best Practices - -1. **Model Validation** - - Always use `--validate` for production conversions - - Check model metadata with `--verbose` - -2. **Optimization** - - Use appropriate quantization for your use case - - Consider hardware constraints - - Test performance impact - -3. **Testing** - - Use `--test` to verify converted models - - Test with real-world inputs - - Monitor performance metrics - -## Troubleshooting - -1. 
**Memory Issues**
-   - Reduce batch size
-   - Use CPU if GPU memory is insufficient
-   - Try different quantization methods
-
-2. **Conversion Failures**
-   - Check format compatibility
-   - Verify model structure
-   - Update conversion tools
-
-3. **Performance Issues**
-   - Adjust optimization levels
-   - Try different quantization methods
-   - Monitor hardware usage
-
-## Contributing
-
-To add support for new formats or features:
-1. Implement the converter in `multimind/model_conversion/`
-2. Add format validation
-3. Update the CLI interface
-4. Add tests and documentation
\ No newline at end of file
+Happy hacking! 🚀
\ No newline at end of file
diff --git a/multimind/cli/agentic.py b/multimind/cli/agentic.py
new file mode 100644
index 00000000..c448f729
--- /dev/null
+++ b/multimind/cli/agentic.py
@@ -0,0 +1,69 @@
+import argparse
+import asyncio
+import sys
+import importlib
+from pathlib import Path
+
+EXAMPLES_DIR = Path(__file__).parent.parent.parent / 'examples' / 'evolutionary'
+
+EXAMPLES = {
+    'reflexive': 'reflexive_agent_pipeline_demo',
+    'hybrid': 'hybrid_evolutionary_reflexive_demo',
+}
+
+DESCRIPTION = """
+MultiMindSDK Agentic CLI
+------------------------
+Run agentic, evolutionary, and reflexive workflow demos from the command line.
+"""
+
+def list_examples():
+    print("Available examples:")
+    for key, val in EXAMPLES.items():
+        print(f"  {key}: {val}.py")
+
+def run_demo(example: str):
+    if example not in EXAMPLES:
+        print(f"Unknown example: {example}")
+        list_examples()
+        sys.exit(1)
+    module_name = f"examples.evolutionary.{EXAMPLES[example]}"
+    try:
+        module = importlib.import_module(module_name)
+        main_fn = getattr(module, 'main', None)
+        if main_fn is None:
+            print(f"No main() function found in {module_name}")
+        elif asyncio.iscoroutinefunction(main_fn):
+            asyncio.run(main_fn())
+        else:
+            main_fn()
+    except Exception as e:
+        print(f"Error running {module_name}: {e}")
+        sys.exit(1)
+
+def info():
+    print(DESCRIPTION)
+    list_examples()
+
+def main():
+    parser = argparse.ArgumentParser(description=DESCRIPTION)
+    subparsers = parser.add_subparsers(dest='command')
+
+    parser_list = subparsers.add_parser('list-examples', help='List available agentic workflow examples')
+    parser_run = subparsers.add_parser('run-demo', help='Run a specific agentic workflow demo')
+    parser_run.add_argument('example', choices=EXAMPLES.keys(), help='Example to run')
+    parser_info = subparsers.add_parser('info', help='Show CLI info and available examples')
+
+    args = parser.parse_args()
+
+    if args.command == 'list-examples':
+        list_examples()
+    elif args.command == 'run-demo':
+        run_demo(args.example)
+    elif args.command == 'info' or args.command is None:
+        info()
+    else:
+        parser.print_help()
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file
diff --git a/multimind/core/evolution/agent_arena.py b/multimind/core/evolution/agent_arena.py
new file mode 100644
index 00000000..003c5b88
--- /dev/null
+++ b/multimind/core/evolution/agent_arena.py
@@ -0,0 +1,27 @@
+from typing import Any, List, Dict, Callable
+import asyncio
+
+class AgentArena:
+    """
+    Runs multiple agents in parallel for the same task and selects the best output using a judge agent.
+    """
+    def __init__(self, agents: List[Any], judge: Callable[[List[Any]], Any]):
+        self.agents = agents
+        self.judge = judge  # Function or agent that selects the best output
+
+    async def run(self, input_data: Any) -> Dict[str, Any]:
+        """
+        Runs all agents in parallel, collects outputs, and selects the best.
+        Returns a dict with all outputs and the selected best.
+        """
+        tasks = [agent.run(input_data) for agent in self.agents]
+        outputs = await asyncio.gather(*tasks)
+        best = self.judge(outputs)
+        # Judges may be plain functions or coroutines (like longest_output_judge below)
+        if asyncio.iscoroutine(best):
+            best = await best
+        return {
+            'outputs': outputs,
+            'best': best
+        }
+
+# Example judge function for demo: pick the longest output
+async def longest_output_judge(outputs: List[Any]) -> Any:
+    return max(outputs, key=lambda x: len(str(x)))
\ No newline at end of file
diff --git a/multimind/core/evolution/agent_mutator.py b/multimind/core/evolution/agent_mutator.py
new file mode 100644
index 00000000..4001428b
--- /dev/null
+++ b/multimind/core/evolution/agent_mutator.py
@@ -0,0 +1,21 @@
+import random
+
+class AgentMutator:
+    """
+    Randomly or heuristically changes agent order, config, or structure.
+    For now, this is a placeholder that logs and mutates a dummy config value.
+    TODO: Implement advanced mutation operators and heuristics.
+    """
+    def mutate(self, graph, agents):
+        """
+        Mutate agent configurations. For now, randomly pick an agent and set a dummy attribute.
+        """
+        nodes = list(graph.nodes)
+        if nodes:
+            target = random.choice(nodes)
+            agent = agents[target]
+            print(f"[AgentMutator] Mutating agent: {target}")
+            # Set a dummy attribute for demonstration
+            setattr(agent, '_mutated', True)
+        else:
+            print("[AgentMutator] No agents to mutate.")
\ No newline at end of file
diff --git a/multimind/core/evolution/evolution_memory.py b/multimind/core/evolution/evolution_memory.py
new file mode 100644
index 00000000..1fdb9ac1
--- /dev/null
+++ b/multimind/core/evolution/evolution_memory.py
@@ -0,0 +1,43 @@
+from typing import Any, Dict, List, Tuple
+from datetime import datetime
+
+class EvolutionMemory:
+    """
+    Stores performance history of each agent-chain for learning and selection over time.
+    Allows recording, retrieval, and summary of pipeline performance.
+    """
+    def __init__(self):
+        # Key: pipeline_id (str or tuple), Value: list of (timestamp, metrics dict)
+        self.history: Dict[Any, List[Tuple[datetime, Dict[str, Any]]]] = {}
+
+    def record(self, pipeline_id: Any, metrics: Dict[str, Any]):
+        """
+        Record performance metrics for a given pipeline (agent-chain).
+        """
+        if pipeline_id not in self.history:
+            self.history[pipeline_id] = []
+        self.history[pipeline_id].append((datetime.utcnow(), metrics))
+
+    def get_history(self, pipeline_id: Any) -> List[Tuple[datetime, Dict[str, Any]]]:
+        """
+        Retrieve the full performance history for a pipeline.
+        """
+        return self.history.get(pipeline_id, [])
+
+    def summarize(self, pipeline_id: Any) -> Dict[str, Any]:
+        """
+        Summarize performance for a pipeline (e.g., average accuracy, cost, etc.).
+        """
+        records = self.get_history(pipeline_id)
+        if not records:
+            return {}
+        summary = {}
+        count = len(records)
+        # Aggregate metrics
+        for _, metrics in records:
+            for k, v in metrics.items():
+                summary[k] = summary.get(k, 0) + float(v)
+        for k in summary:
+            summary[k] /= count
+        summary['records'] = count
+        return summary
\ No newline at end of file
diff --git a/multimind/core/evolution/meta_controller_agent.py b/multimind/core/evolution/meta_controller_agent.py
new file mode 100644
index 00000000..50d08dd9
--- /dev/null
+++ b/multimind/core/evolution/meta_controller_agent.py
@@ -0,0 +1,22 @@
+import random
+
+class MetaControllerAgent:
+    """
+    Mutates the agent DAG during runtime based on feedback/performance.
+    For now, this is a placeholder that logs and optionally swaps two nodes.
+    TODO: Implement advanced feedback-driven mutation logic.
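+
+    Example (sketch; `runner` stands for any object exposing a networkx
+    graph and a dict of agents, e.g. an AgentWorkflowRunner):
+        controller = MetaControllerAgent()
+        controller.mutate_dag(runner.graph, runner.agents)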
+ """ + def mutate_dag(self, graph, agents): + """ + Mutate the DAG structure. For now, randomly swap two nodes if possible. + """ + nodes = list(graph.nodes) + if len(nodes) >= 2: + a, b = random.sample(nodes, 2) + print(f"[MetaControllerAgent] Swapping nodes: {a} <-> {b}") + # Swap agent objects (not edges) + agents[a], agents[b] = agents[b], agents[a] + else: + print("[MetaControllerAgent] Not enough nodes to swap.") + + pass \ No newline at end of file diff --git a/multimind/core/evolution/multi_objective_judge_agent.py b/multimind/core/evolution/multi_objective_judge_agent.py new file mode 100644 index 00000000..745105c4 --- /dev/null +++ b/multimind/core/evolution/multi_objective_judge_agent.py @@ -0,0 +1,19 @@ +from typing import List, Dict, Any + +class MultiObjectiveJudgeAgent: + """ + Scores outputs by multiple dimensions (accuracy, cost, speed, creativity). + For demo, expects each output to be a dict with these keys and returns the one with the highest total score. + """ + def __init__(self, weights: Dict[str, float] = None): + # Weights for each objective (default: equal) + self.weights = weights or {'accuracy': 1.0, 'cost': 1.0, 'speed': 1.0, 'creativity': 1.0} + + def score(self, outputs: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + Scores each output and returns the one with the highest weighted sum. + """ + def weighted_sum(output): + return sum(self.weights.get(k, 0) * float(output.get(k, 0)) for k in self.weights) + best = max(outputs, key=weighted_sum) + return best \ No newline at end of file diff --git a/multimind/core/pipeline/agent_workflow_runner.py b/multimind/core/pipeline/agent_workflow_runner.py new file mode 100644 index 00000000..5f324af1 --- /dev/null +++ b/multimind/core/pipeline/agent_workflow_runner.py @@ -0,0 +1,158 @@ +import networkx as nx +from typing import Dict, Any, List, Optional, Callable, Union +from multimind.agents.agent_loader import AgentLoader +from multimind.core.evolution.meta_controller_agent import MetaControllerAgent +from multimind.core.evolution.agent_mutator import AgentMutator +from multimind.core.evolution.agent_arena import AgentArena +from multimind.core.evolution.evolution_memory import EvolutionMemory +from multimind.core.evolution.multi_objective_judge_agent import MultiObjectiveJudgeAgent + +class AgentWorkflowRunner: + """ + Runs agents from a YAML/JSON-defined DAG (supports reflexion + mutation + evolutionary workflows). + Loads agents, builds a directed acyclic graph (DAG), and executes the workflow. + Supports runtime mutation (MetaControllerAgent, AgentMutator), reflexion/self-improvement hooks, and evolutionary workflows. + + Example usage: + dag_config = { + 'nodes': [ + {'id': 'agent1', 'config': 'path/to/agent1.json'}, + {'id': 'agent2', 'config': 'path/to/agent2.json'} + ], + 'edges': [ + {'from': 'agent1', 'to': 'agent2'} + ] + } + runner = AgentWorkflowRunner(dag_config) + output = await runner.run(input_data) + + Example evolutionary usage: + dag_config = { ... } + runner = AgentWorkflowRunner(dag_config, ...) + await runner.run_evolutionary(input_data, rounds=3) + """ + def __init__(self, dag_config: Dict[str, Any], agent_loader: Optional[AgentLoader] = None, + meta_controller: Optional[MetaControllerAgent] = None, + agent_mutator: Optional[AgentMutator] = None, + evolution_memory: Optional[EvolutionMemory] = None): + """ + dag_config: Dict defining nodes (agents) and edges (data flow). + agent_loader: Optional AgentLoader instance for loading agents from config. 
+ meta_controller: Optional MetaControllerAgent for runtime DAG mutation. + agent_mutator: Optional AgentMutator for agent mutation logic. + evolution_memory: Optional EvolutionMemory for recording evolutionary results. + """ + self.dag_config = dag_config + self.agent_loader = agent_loader or AgentLoader() + self.meta_controller = meta_controller + self.agent_mutator = agent_mutator + self.evolution_memory = evolution_memory or EvolutionMemory() + self.graph = nx.DiGraph() + self.agents = {} + self._build_dag() + + def _build_dag(self): + """Builds the DAG and loads agents from config.""" + nodes = self.dag_config.get('nodes', []) + edges = self.dag_config.get('edges', []) + for node in nodes: + agent_id = node['id'] + agent_cfg = node['config'] + # Support AgentArena as a node + if isinstance(agent_cfg, dict) and agent_cfg.get('type') == 'arena': + agents = [self.agent_loader.load_agent(cfg) for cfg in agent_cfg['agents']] + judge = agent_cfg['judge'] + self.agents[agent_id] = AgentArena(agents, judge) + else: + agent = self.agent_loader.load_agent(agent_cfg) + self.agents[agent_id] = agent + self.graph.add_node(agent_id) + for edge in edges: + self.graph.add_edge(edge['from'], edge['to']) + + async def run(self, input_data: Any) -> Any: + """ + Executes the DAG, passing data between agents according to the graph. + Returns the output of the final node(s). + Supports runtime mutation and reflexion hooks. + """ + # Optionally mutate DAG before execution + if self.meta_controller: + self.meta_controller.mutate_dag(self.graph, self.agents) + if self.agent_mutator: + self.agent_mutator.mutate(self.graph, self.agents) + + # Find input nodes (no predecessors) + input_nodes = [n for n in self.graph.nodes if self.graph.in_degree(n) == 0] + data_map = {n: None for n in self.graph.nodes} + for n in input_nodes: + data_map[n] = input_data + # Topological sort for execution order + for node in nx.topological_sort(self.graph): + agent = self.agents[node] + # Gather inputs from predecessors + preds = list(self.graph.predecessors(node)) + if preds: + # For now, just take the output of the last predecessor + input_val = data_map[preds[-1]] + else: + input_val = data_map[node] + # Run agent (assume async call) + if hasattr(agent, 'run') and callable(getattr(agent, 'run')): + data_map[node] = await agent.run(input_val) + else: + data_map[node] = input_val # Pass-through if no run method + # Optionally trigger reflexion/self-improvement + await self.self_reflect(data_map) + # Return outputs from nodes with no successors + output_nodes = [n for n in self.graph.nodes if self.graph.out_degree(n) == 0] + return {n: data_map[n] for n in output_nodes} + + def mutate_dag(self): + """ + Mutate the DAG structure at runtime using MetaControllerAgent or AgentMutator. + """ + if self.meta_controller: + self.meta_controller.mutate_dag(self.graph, self.agents) + if self.agent_mutator: + self.agent_mutator.mutate(self.graph, self.agents) + + async def self_reflect(self, data_map: Dict[str, Any]): + """ + Reflexion/self-improvement hook after DAG execution. + TODO: Integrate SelfReflectAgent or feedback loop here. + """ + # Placeholder for reflexion logic + pass + + async def run_evolutionary(self, input_data: Any, rounds: int = 3) -> Any: + """ + Runs the workflow for several rounds, mutating agents and recording results in EvolutionMemory. + Assumes the first node is an AgentArena. 
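+
+        Example (sketch; assumes a config whose first node is an AgentArena,
+        as described in _build_dag):
+            runner = AgentWorkflowRunner(arena_config, agent_mutator=AgentMutator())
+            best_per_round = await runner.run_evolutionary("write a tagline", rounds=3)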
+ """ + node_ids = list(self.graph.nodes) + if not node_ids: + return None + arena_node = node_ids[0] + arena: AgentArena = self.agents[arena_node] + results = [] + for i in range(rounds): + print(f'=== Evolutionary Round {i+1} ===') + result = await arena.run(input_data) + print('Outputs:', result['outputs']) + print('Best:', result['best']) + self.evolution_memory.record(f'round_{i+1}', result['best']) + results.append(result['best']) + # Mutate agents for next round + if self.meta_controller: + self.meta_controller.mutate_dag(arena, {a.name: a for a in arena.agents}) + if self.agent_mutator: + self.agent_mutator.mutate(arena, {a.name: a for a in arena.agents}) + print('\nEvolution Summary:') + for i in range(rounds): + summary = self.evolution_memory.summarize(f'round_{i+1}') + print(f'Round {i+1} summary:', summary) + return results + + # TODO: Add methods for runtime mutation (MetaControllerAgent, AgentMutator) + # TODO: Add reflexion/self-improvement hooks \ No newline at end of file diff --git a/multimind/core/pipeline/context_scorer_agent.py b/multimind/core/pipeline/context_scorer_agent.py new file mode 100644 index 00000000..d308482b --- /dev/null +++ b/multimind/core/pipeline/context_scorer_agent.py @@ -0,0 +1,6 @@ +class ContextScorerAgent: + """ + Selects and filters memory before LLM injection based on token budget and relevance. + TODO: Implement context scoring and filtering logic. + """ + pass \ No newline at end of file diff --git a/multimind/core/pipeline/memory_manager_plus.py b/multimind/core/pipeline/memory_manager_plus.py new file mode 100644 index 00000000..fca48d3e --- /dev/null +++ b/multimind/core/pipeline/memory_manager_plus.py @@ -0,0 +1,6 @@ +class MemoryManagerAgentPlus: + """ + Routes queries between vector, graph, summary, and timeline memory. + TODO: Implement unified memory manager interface and routing logic. + """ + pass \ No newline at end of file diff --git a/multimind/memory/graph_memory.py b/multimind/memory/graph_memory.py new file mode 100644 index 00000000..d08ff3e9 --- /dev/null +++ b/multimind/memory/graph_memory.py @@ -0,0 +1,35 @@ +from typing import Any, Dict, List, Optional, Tuple +from datetime import datetime +from .triple_store import MemoryTripleStore +from .memory_deduplicator import MemoryDeduplicator +from .memory_scorer import MemoryScorer + +class GraphMemoryAgent: + """ + Handles (subject, predicate, object) symbolic memory graph with CRUD operations. + Integrates deduplication and scoring. 
+ """ + def __init__(self): + self.triple_store = MemoryTripleStore() + self.deduplicator = MemoryDeduplicator() + self.scorer = MemoryScorer() + + def add_fact(self, subject: str, predicate: str, obj: str, **metadata) -> bool: + """Add a fact as a triple, with deduplication and scoring.""" + if self.deduplicator.is_duplicate(subject, predicate, obj, self.triple_store): + return False + score = self.scorer.score(subject, predicate, obj, metadata) + self.triple_store.add_triple(subject, predicate, obj, score=score, timestamp=datetime.utcnow(), **metadata) + return True + + def get_facts(self) -> List[Tuple[str, str, str, Dict[str, Any]]]: + """Return all facts (triples with metadata).""" + return self.triple_store.get_triples() + + def query(self, subject: Optional[str] = None, predicate: Optional[str] = None, obj: Optional[str] = None) -> List[Tuple[str, str, str, Dict[str, Any]]]: + """Query facts by subject, predicate, or object (wildcards allowed).""" + return self.triple_store.find_triples(subject, predicate, obj) + + def get_timeline(self, since: Optional[datetime] = None) -> List[Tuple[str, str, str, Dict[str, Any]]]: + """Return facts added/updated since a given time.""" + return self.triple_store.get_timeline(since) \ No newline at end of file diff --git a/multimind/memory/graph_memory_agent.py b/multimind/memory/graph_memory_agent.py new file mode 100644 index 00000000..f36010f7 --- /dev/null +++ b/multimind/memory/graph_memory_agent.py @@ -0,0 +1,74 @@ +from typing import Any, Dict, List, Optional, Tuple +from datetime import datetime +from .triple_store import MemoryTripleStore +from .memory_deduplicator import MemoryDeduplicator +from .memory_scorer import MemoryScorer +from .memory_merge_engine import MemoryMergeEngine + +class GraphMemoryAgent: + """ + Advanced symbolic memory agent for (subject, predicate, object) triples. + Integrates deduplication, scoring, merging, and timeline. Supports backend switching. + Modular and developer-friendly for use in agentic workflows. + """ + def __init__(self, backend: Optional[str] = 'networkx', + deduplicator: Optional[MemoryDeduplicator] = None, + scorer: Optional[MemoryScorer] = None, + merger: Optional[MemoryMergeEngine] = None): + # For now, only networkx backend is implemented; extend for Neo4j, etc. + self.backend = backend + self.triple_store = MemoryTripleStore() # Could swap for Neo4jTripleStore, etc. + self.deduplicator = deduplicator or MemoryDeduplicator() + self.scorer = scorer or MemoryScorer() + self.merger = merger or MemoryMergeEngine() + + def add_fact(self, subject: str, predicate: str, obj: str, **metadata) -> bool: + """ + Add a fact as a triple, with deduplication, scoring, and timeline. + If duplicate or contradictory, can merge or reject. 
+ """ + if self.deduplicator.is_duplicate(subject, predicate, obj, self.triple_store): + # Optionally merge with existing + for u, p, o, meta in self.triple_store.find_triples(subject, predicate, obj): + merged = self.merger.merge(meta, metadata) + self.triple_store.update_triple(u, p, o, **merged) + return False + if self.deduplicator.is_contradictory(subject, predicate, obj, self.triple_store): + # Optionally reject or handle contradiction + return False + score = self.scorer.score(subject, predicate, obj, metadata) + self.triple_store.add_triple(subject, predicate, obj, score=score, timestamp=datetime.utcnow(), **metadata) + self.scorer.update_frequency(subject, predicate, obj) + return True + + def get_facts(self) -> List[Tuple[str, str, str, Dict[str, Any]]]: + """ + Return all facts (triples with metadata). + """ + return self.triple_store.get_triples() + + def query(self, subject: Optional[str] = None, predicate: Optional[str] = None, obj: Optional[str] = None) -> List[Tuple[str, str, str, Dict[str, Any]]]: + """ + Query facts by subject, predicate, or object (wildcards allowed). + """ + return self.triple_store.find_triples(subject, predicate, obj) + + def get_timeline(self, since: Optional[datetime] = None) -> List[Tuple[str, str, str, Dict[str, Any]]]: + """ + Return facts added/updated since a given time. + """ + return self.triple_store.get_timeline(since) + + def switch_backend(self, backend: str): + """ + Switch the underlying triple store backend (e.g., networkx, Neo4j). + For now, only networkx is implemented. + """ + # Placeholder for backend switching logic + if backend == 'networkx': + self.triple_store = MemoryTripleStore() + # elif backend == 'neo4j': + # self.triple_store = Neo4jTripleStore(...) + else: + raise NotImplementedError(f"Backend '{backend}' not supported yet.") + self.backend = backend \ No newline at end of file diff --git a/multimind/memory/memory_deduplicator.py b/multimind/memory/memory_deduplicator.py new file mode 100644 index 00000000..25273f5a --- /dev/null +++ b/multimind/memory/memory_deduplicator.py @@ -0,0 +1,45 @@ +from typing import Any, Callable, Optional + +class MemoryDeduplicator: + """ + Detects and prevents redundant or contradictory facts in the triple store. + Supports exact, semantic/fuzzy, and contradiction-based deduplication. + """ + def __init__(self, contradiction_fn: Optional[Callable] = None, similarity_fn: Optional[Callable] = None): + self.contradiction_fn = contradiction_fn or self.default_contradiction + self.similarity_fn = similarity_fn or self.default_similarity + + def is_duplicate(self, subject, predicate, obj, triple_store) -> bool: + """ + Returns True if the triple (subject, predicate, object) already exists (exact or semantic match). + """ + # Exact match + matches = triple_store.find_triples(subject, predicate, obj) + if matches: + return True + # Semantic/fuzzy match + for u, p, o, meta in triple_store.get_triples(): + if self.similarity_fn((subject, predicate, obj), (u, p, o)): + return True + return False + + def is_contradictory(self, subject, predicate, obj, triple_store) -> bool: + """ + Returns True if the triple contradicts any existing triple (using contradiction_fn). + """ + for u, p, o, meta in triple_store.get_triples(): + if self.contradiction_fn((subject, predicate, obj), (u, p, o)): + return True + return False + + def default_similarity(self, t1, t2) -> bool: + """ + Default: only exact match. Override for semantic/fuzzy matching. 
+ """ + return t1 == t2 + + def default_contradiction(self, t1, t2) -> bool: + """ + Default: no contradiction. Override for custom logic (e.g., antonyms, negation). + """ + return False \ No newline at end of file diff --git a/multimind/memory/memory_merge_engine.py b/multimind/memory/memory_merge_engine.py new file mode 100644 index 00000000..fd16382d --- /dev/null +++ b/multimind/memory/memory_merge_engine.py @@ -0,0 +1,56 @@ +from typing import Any, Callable, Dict, Optional + +class MemoryMergeEngine: + """ + Uses LLM, rules, or custom logic to merge, reject, or update existing memory entries. + Supports pluggable merge strategies and developer-friendly API. + """ + def __init__(self, merge_fn: Optional[Callable] = None, reject_fn: Optional[Callable] = None, update_fn: Optional[Callable] = None): + self.merge_fn = merge_fn or self.default_merge + self.reject_fn = reject_fn or self.default_reject + self.update_fn = update_fn or self.default_update + + def merge(self, entry1: Dict[str, Any], entry2: Dict[str, Any]) -> Dict[str, Any]: + """ + Merge two memory entries using the merge function. + """ + return self.merge_fn(entry1, entry2) + + def reject(self, entry1: Dict[str, Any], entry2: Dict[str, Any]) -> bool: + """ + Decide whether to reject merging two entries (e.g., if contradictory). + """ + return self.reject_fn(entry1, entry2) + + def update(self, entry: Dict[str, Any], updates: Dict[str, Any]) -> Dict[str, Any]: + """ + Update a memory entry with new information. + """ + return self.update_fn(entry, updates) + + def default_merge(self, entry1: Dict[str, Any], entry2: Dict[str, Any]) -> Dict[str, Any]: + """ + Default: merge by combining fields, preferring non-null and latest timestamp. + Override for LLM or advanced logic. + """ + merged = entry1.copy() + for k, v in entry2.items(): + if v is not None: + if k == 'timestamp': + merged[k] = max(merged.get(k, v), v) + else: + merged[k] = v + return merged + + def default_reject(self, entry1: Dict[str, Any], entry2: Dict[str, Any]) -> bool: + """ + Default: never reject. Override for contradiction or LLM-based logic. + """ + return False + + def default_update(self, entry: Dict[str, Any], updates: Dict[str, Any]) -> Dict[str, Any]: + """ + Default: update entry with new fields. + """ + entry.update(updates) + return entry \ No newline at end of file diff --git a/multimind/memory/memory_scorer.py b/multimind/memory/memory_scorer.py new file mode 100644 index 00000000..e56e6bcf --- /dev/null +++ b/multimind/memory/memory_scorer.py @@ -0,0 +1,44 @@ +from datetime import datetime +from typing import Any, Callable, Dict, Optional + +class MemoryScorer: + """ + Assigns relevance scores to memory entries for injection or decay. + Supports pluggable scoring strategies: recency, frequency, importance, LLM-based, etc. + """ + def __init__(self, strategy: Optional[str] = 'recency', custom_scorer: Optional[Callable] = None): + self.strategy = strategy + self.custom_scorer = custom_scorer + self.frequency_map: Dict[Any, int] = {} # For frequency-based scoring + + def score(self, subject, predicate, obj, metadata): + """ + Assign a score to the triple using the selected strategy. 
+ """ + if self.custom_scorer: + return self.custom_scorer(subject, predicate, obj, metadata) + if self.strategy == 'recency': + now = datetime.utcnow().timestamp() + ts = metadata.get('timestamp') + if ts is None: + return 1.0 + age = max(1, now - ts.timestamp()) + return 1.0 / age + elif self.strategy == 'frequency': + key = (subject, predicate, obj) + freq = self.frequency_map.get(key, 1) + return float(freq) + elif self.strategy == 'importance': + return float(metadata.get('importance', 1.0)) + elif self.strategy == 'llm': + # Placeholder: integrate with LLM for semantic scoring + return float(metadata.get('llm_score', 1.0)) + else: + return 1.0 + + def update_frequency(self, subject, predicate, obj): + """ + Update frequency count for a triple (call this when accessed/added). + """ + key = (subject, predicate, obj) + self.frequency_map[key] = self.frequency_map.get(key, 0) + 1 \ No newline at end of file diff --git a/multimind/memory/temporal_mixin.py b/multimind/memory/temporal_mixin.py new file mode 100644 index 00000000..713366ad --- /dev/null +++ b/multimind/memory/temporal_mixin.py @@ -0,0 +1,21 @@ +from datetime import datetime +from typing import Any, Dict, List, Optional + +class TemporalMemoryMixin: + """ + Mixin to add timestamps to memory entries and provide timeline tracking/querying. + Can be used with any memory class that stores entries as dicts or objects. + """ + def add_with_timestamp(self, entry: Dict[str, Any], timestamp: Optional[datetime] = None) -> Dict[str, Any]: + """ + Add a timestamp to the entry (in-place) and return it. + """ + entry['timestamp'] = timestamp or datetime.utcnow() + return entry + + def get_timeline(self, entries: List[Dict[str, Any]], since: Optional[datetime] = None) -> List[Dict[str, Any]]: + """ + Return entries added/updated since a given time, sorted by timestamp. + """ + timeline = [e for e in entries if 'timestamp' in e and (since is None or e['timestamp'] >= since)] + return sorted(timeline, key=lambda x: x['timestamp']) \ No newline at end of file diff --git a/multimind/memory/timeline_memory_agent.py b/multimind/memory/timeline_memory_agent.py new file mode 100644 index 00000000..e985c205 --- /dev/null +++ b/multimind/memory/timeline_memory_agent.py @@ -0,0 +1,6 @@ +class TimelineMemoryAgent: + """ + Tracks memory/events in chronological order. + TODO: Implement timeline memory agent interface and event storage. + """ + pass \ No newline at end of file diff --git a/multimind/memory/triple_store.py b/multimind/memory/triple_store.py new file mode 100644 index 00000000..6ffefb57 --- /dev/null +++ b/multimind/memory/triple_store.py @@ -0,0 +1,51 @@ +import networkx as nx +from typing import Any, Dict, List, Optional, Tuple +from datetime import datetime + +class MemoryTripleStore: + """ + Stores and manages (subject, predicate, object) triples using a MultiDiGraph. + Each edge can have metadata: timestamp, score, etc. 
+ """ + def __init__(self): + self.graph = nx.MultiDiGraph() + + def add_triple(self, subject: str, predicate: str, obj: str, score: float = 1.0, timestamp: Optional[datetime] = None, **metadata): + """Add a triple with optional score and timestamp.""" + timestamp = timestamp or datetime.utcnow() + self.graph.add_edge(subject, obj, key=predicate, score=score, timestamp=timestamp, **metadata) + + def remove_triple(self, subject: str, predicate: str, obj: str): + """Remove a triple if it exists.""" + if self.graph.has_edge(subject, obj, key=predicate): + self.graph.remove_edge(subject, obj, key=predicate) + + def update_triple(self, subject: str, predicate: str, obj: str, **updates): + """Update metadata for a triple.""" + if self.graph.has_edge(subject, obj, key=predicate): + edge_data = self.graph.get_edge_data(subject, obj, key=predicate) + edge_data.update(**updates) + + def get_triples(self) -> List[Tuple[str, str, str, Dict[str, Any]]]: + """Return all triples with metadata.""" + triples = [] + for u, v, k, d in self.graph.edges(keys=True, data=True): + triples.append((u, k, v, d)) + return triples + + def find_triples(self, subject: Optional[str] = None, predicate: Optional[str] = None, obj: Optional[str] = None) -> List[Tuple[str, str, str, Dict[str, Any]]]: + """Query triples by subject, predicate, or object (wildcards allowed).""" + results = [] + for u, v, k, d in self.graph.edges(keys=True, data=True): + if (subject is None or u == subject) and (predicate is None or k == predicate) and (obj is None or v == obj): + results.append((u, k, v, d)) + return results + + def get_timeline(self, since: Optional[datetime] = None) -> List[Tuple[str, str, str, Dict[str, Any]]]: + """Return triples added/updated since a given time.""" + timeline = [] + for u, v, k, d in self.graph.edges(keys=True, data=True): + ts = d.get('timestamp') + if since is None or (ts and ts >= since): + timeline.append((u, k, v, d)) + return sorted(timeline, key=lambda x: x[3].get('timestamp', datetime.min)) \ No newline at end of file diff --git a/multimind/memory/unified_memory_store.py b/multimind/memory/unified_memory_store.py new file mode 100644 index 00000000..2a13522a --- /dev/null +++ b/multimind/memory/unified_memory_store.py @@ -0,0 +1,6 @@ +class UnifiedMemoryStore: + """ + Abstracts memory backend switching across modalities (vector, graph, etc.). + TODO: Implement unified memory interface and backend switching. + """ + pass \ No newline at end of file diff --git a/multimind/orchestration/dag_engine.py b/multimind/orchestration/dag_engine.py new file mode 100644 index 00000000..680a2109 --- /dev/null +++ b/multimind/orchestration/dag_engine.py @@ -0,0 +1,80 @@ +import networkx as nx +from typing import Any, Callable, Dict, List, Optional, Set +import asyncio + +class DAGWorkflowEngine: + """ + Orchestrates agent workflows as a directed acyclic graph (DAG). + Supports dynamic mutation and parallel execution of independent nodes. + Each node is an agent (callable or object with a .run() method). 
+ """ + def __init__(self): + self.graph = nx.DiGraph() + self.node_results: Dict[str, Any] = {} + + def add_agent(self, name: str, agent: Any, dependencies: Optional[List[str]] = None): + """Add an agent node to the DAG with optional dependencies (edges).""" + self.graph.add_node(name, agent=agent) + if dependencies: + for dep in dependencies: + self.graph.add_edge(dep, name) + + def remove_agent(self, name: str): + """Remove an agent node and its edges from the DAG.""" + self.graph.remove_node(name) + if name in self.node_results: + del self.node_results[name] + + def update_dependencies(self, name: str, new_dependencies: List[str]): + """Update dependencies (edges) for a given node.""" + self.graph.remove_edges_from(list(self.graph.in_edges(name))) + for dep in new_dependencies: + self.graph.add_edge(dep, name) + + def get_ready_nodes(self, completed: Set[str]) -> List[str]: + """Return nodes whose dependencies are all satisfied and not yet run.""" + ready = [] + for node in self.graph.nodes: + if node in completed: + continue + preds = set(self.graph.predecessors(node)) + if preds.issubset(completed): + ready.append(node) + return ready + + async def _run_agent(self, name: str, agent: Any, inputs: Dict[str, Any]) -> Any: + if hasattr(agent, 'run'): + return await asyncio.to_thread(agent.run, **inputs) + elif callable(agent): + return await asyncio.to_thread(agent, **inputs) + else: + raise ValueError(f"Agent {name} is not callable or does not have a .run() method.") + + async def execute(self, input_map: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + """ + Execute the DAG, running agents in parallel where possible. + input_map: Optional dict mapping node names to input kwargs. + Returns a dict of node results. + """ + input_map = input_map or {} + completed: Set[str] = set() + pending: Set[str] = set() + while len(completed) < len(self.graph.nodes): + ready = self.get_ready_nodes(completed) + if not ready: + raise RuntimeError("Cyclic dependency or no runnable nodes left.") + tasks = [] + for node in ready: + agent = self.graph.nodes[node]['agent'] + inputs = input_map.get(node, {}) + tasks.append((node, self._run_agent(node, agent, inputs))) + pending.add(node) + results = await asyncio.gather(*(t[1] for t in tasks)) + for idx, (node, _) in enumerate(tasks): + self.node_results[node] = results[idx] + completed.add(node) + return self.node_results + + def mutate_graph(self, mutation_fn: Callable[[nx.DiGraph], None]): + """Apply a mutation function to the underlying graph (for MetaControllerAgent, etc).""" + mutation_fn(self.graph) \ No newline at end of file diff --git a/multimind/orchestration/dag_visualization.py b/multimind/orchestration/dag_visualization.py new file mode 100644 index 00000000..57cb2e3c --- /dev/null +++ b/multimind/orchestration/dag_visualization.py @@ -0,0 +1,13 @@ +import matplotlib.pyplot as plt +import networkx as nx +from multimind.orchestration.dag_engine import DAGWorkflowEngine + +def visualize_dag(dag: DAGWorkflowEngine, filename: str = None): + G = dag.graph + pos = nx.spring_layout(G) + labels = {node: node for node in G.nodes} + plt.figure(figsize=(10, 6)) + nx.draw(G, pos, with_labels=True, labels=labels, node_color='lightblue', node_size=2000, font_size=10, font_weight='bold', arrows=True) + if filename: + plt.savefig(filename) + plt.show() \ No newline at end of file diff --git a/multimind/orchestration/yaml_dag_parser.py b/multimind/orchestration/yaml_dag_parser.py new file mode 100644 index 00000000..4331f63b --- /dev/null +++ 
b/multimind/orchestration/yaml_dag_parser.py @@ -0,0 +1,49 @@ +import yaml +from multimind.orchestration.dag_engine import DAGWorkflowEngine +from typing import Any, Dict + +# Registry for agent constructors (to be extended as needed) +AGENT_REGISTRY = {} + +def register_agent(name: str, constructor): + AGENT_REGISTRY[name] = constructor + +def build_dag_from_yaml(yaml_path: str) -> DAGWorkflowEngine: + with open(yaml_path, 'r') as f: + config = yaml.safe_load(f) + dag = DAGWorkflowEngine() + node_params = {} + def add_node(node, dependencies=None): + if isinstance(node, dict): + name = node['name'] + params = node.get('params', {}) + else: + name = node + params = {} + agent_ctor = AGENT_REGISTRY.get(name) + if agent_ctor is None: + raise ValueError(f"Agent '{name}' not registered.") + dag.add_agent(name, agent_ctor(), dependencies) + node_params[name] = params + def walk_pipeline(pipeline, prev=None): + if isinstance(pipeline, list): + last = prev + for node in pipeline: + if isinstance(node, dict) and any(k in node for k in ('if', 'switch', 'fallback')): + # Handle router/conditional/fallback + for k in ('if', 'switch', 'fallback'): + if k in node: + cond = node[k] + cond_name = f"{k}_{len(dag.graph.nodes)}" + dag.add_agent(cond_name, lambda: None, [last] if last else None) + for branch in cond['then']: + walk_pipeline(branch, cond_name) + last = cond_name + else: + add_node(node, [last] if last else None) + last = node['name'] if isinstance(node, dict) else node + else: + add_node(pipeline, [prev] if prev else None) + walk_pipeline(config['pipeline']) + dag.node_params = node_params + return dag \ No newline at end of file diff --git a/multimind/utils/agent_loader.py b/multimind/utils/agent_loader.py new file mode 100644 index 00000000..c014f7e1 --- /dev/null +++ b/multimind/utils/agent_loader.py @@ -0,0 +1,92 @@ +import json +from typing import Dict, Any, Optional, List, Union +from pathlib import Path + +try: + import yaml + HAS_YAML = True +except ImportError: + HAS_YAML = False + +from multimind.agents.agent import Agent +from multimind.agents.memory import AgentMemory +from multimind.agents.tools.base import BaseTool +from multimind.models.base import BaseLLM + +class AgentLoader: + """ + Loads agent configurations from Python dict, JSON, or YAML files. + Supports developer-friendly modular agent loading for pipelines and workflows. + """ + def __init__(self, model_registry: Optional[Dict[str, BaseLLM]] = None): + self.model_registry = model_registry or {} + self.tool_registry: Dict[str, BaseTool] = {} + + def register_model(self, name: str, model: BaseLLM) -> None: + self.model_registry[name] = model + + def register_tool(self, name: str, tool: BaseTool) -> None: + self.tool_registry[name] = tool + + def load_agent( + self, + config_path_or_dict: Union[str, Dict[str, Any]], + model: Optional[BaseLLM] = None, + tools: Optional[List[BaseTool]] = None + ) -> Agent: + """ + Load an agent from a configuration file (YAML/JSON) or dict. 
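+
+        Example (sketch; the config path and model name are illustrative):
+            loader = AgentLoader()
+            loader.register_model("gpt-4", my_llm)
+            agent = loader.load_agent("configs/support_agent.yaml")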
+ """ + # If config is a dict with 'class', instantiate directly (for demo) + if isinstance(config_path_or_dict, dict) and 'class' in config_path_or_dict: + agent_class = config_path_or_dict['class'] + kwargs = config_path_or_dict.get('kwargs', {}) + return agent_class(**kwargs) + # If config is a dict, treat as config + if isinstance(config_path_or_dict, dict): + config = config_path_or_dict + else: + # Load from file (YAML or JSON) + path = Path(config_path_or_dict) + if not path.exists(): + raise FileNotFoundError(f"Agent config file not found: {config_path_or_dict}") + if path.suffix in ['.yaml', '.yml']: + if not HAS_YAML: + raise ImportError("PyYAML is required for YAML config support.") + with open(path, 'r') as f: + config = yaml.safe_load(f) + elif path.suffix == '.json': + with open(path, 'r') as f: + config = json.load(f) + else: + raise ValueError(f"Unsupported config file type: {path.suffix}") + # Validate config + required_keys = {"model", "system_prompt"} + if not all(key in config for key in required_keys): + raise ValueError(f"Agent config must contain: {required_keys}") + # Get or create model + if model is None: + model_name = config["model"] + if model_name not in self.model_registry: + raise ValueError(f"Model not registered: {model_name}") + model = self.model_registry[model_name] + # Get or create tools + if tools is None: + tools = [] + for tool_name in config.get("tools", []): + if tool_name not in self.tool_registry: + raise ValueError(f"Tool not registered: {tool_name}") + tools.append(self.tool_registry[tool_name]) + # Create memory + memory_config = config.get("memory", {}) + memory = AgentMemory( + max_history=memory_config.get("max_history", 100) + ) + # Create agent + agent = Agent( + model=model, + memory=memory, + tools=tools, + system_prompt=config["system_prompt"] + ) + return agent \ No newline at end of file diff --git a/multimind/utils/trace_logger.py b/multimind/utils/trace_logger.py new file mode 100644 index 00000000..afb921e5 --- /dev/null +++ b/multimind/utils/trace_logger.py @@ -0,0 +1,55 @@ +import logging +from datetime import datetime +from typing import Any, Dict, List, Optional + +class TraceLogger: + """ + Logs agent actions, inputs, outputs, and memory updates with timestamps and context. + Can be plugged into agents, pipelines, or memory modules for developer-friendly tracing. + """ + def __init__(self, name: str = 'TraceLogger', log_to_console: bool = True): + self.name = name + self.log_to_console = log_to_console + self.logs: List[Dict[str, Any]] = [] + if log_to_console: + self.logger = logging.getLogger(name) + if not self.logger.hasHandlers(): + handler = logging.StreamHandler() + formatter = logging.Formatter('[%(asctime)s] %(message)s') + handler.setFormatter(formatter) + self.logger.addHandler(handler) + self.logger.setLevel(logging.INFO) + else: + self.logger = None + + def log(self, action: str, agent: Optional[str] = None, input_data: Any = None, output_data: Any = None, memory_update: Any = None, context: Optional[Dict[str, Any]] = None): + """ + Log an action with optional agent, input, output, memory update, and context. 
+ """ + entry = { + 'timestamp': datetime.utcnow().isoformat(), + 'action': action, + 'agent': agent, + 'input': input_data, + 'output': output_data, + 'memory_update': memory_update, + 'context': context or {} + } + self.logs.append(entry) + if self.logger: + msg = f"[{action}] agent={agent} input={input_data} output={output_data} memory_update={memory_update} context={context}" + self.logger.info(msg) + + def get_logs(self, filter_action: Optional[str] = None) -> List[Dict[str, Any]]: + """ + Retrieve all logs, optionally filtered by action type. + """ + if filter_action: + return [log for log in self.logs if log['action'] == filter_action] + return self.logs + + def clear(self): + """ + Clear all logs. + """ + self.logs.clear() \ No newline at end of file diff --git a/multimind/utils/unified_memory.py b/multimind/utils/unified_memory.py new file mode 100644 index 00000000..39677f4d --- /dev/null +++ b/multimind/utils/unified_memory.py @@ -0,0 +1,72 @@ +from typing import Any, Dict, Optional, List + +class UnifiedMemoryStore: + """ + Abstracts access to multiple memory backends (vector, graph, key-value, etc.). + Allows switching, routing, and unified querying for developer-friendly workflows. + """ + def __init__(self, vector_store=None, graph_store=None, kv_store=None): + self.backends = { + 'vector': vector_store, + 'graph': graph_store, + 'kv': kv_store + } + self.active_backend = 'vector' if vector_store else ( + 'graph' if graph_store else ( + 'kv' if kv_store else None)) + + def switch_backend(self, backend: str): + """ + Switch the active memory backend (vector, graph, kv). + """ + if backend not in self.backends or self.backends[backend] is None: + raise ValueError(f"Backend '{backend}' is not available.") + self.active_backend = backend + + def add(self, *args, **kwargs) -> Any: + """ + Add an entry to the active backend. + """ + backend = self.backends.get(self.active_backend) + if backend is None: + raise RuntimeError("No active memory backend.") + if hasattr(backend, 'add'): + return backend.add(*args, **kwargs) + elif hasattr(backend, 'add_triple'): + return backend.add_triple(*args, **kwargs) + else: + raise NotImplementedError(f"Add not implemented for backend {self.active_backend}.") + + def get(self, *args, **kwargs) -> Any: + """ + Get an entry from the active backend. + """ + backend = self.backends.get(self.active_backend) + if backend is None: + raise RuntimeError("No active memory backend.") + if hasattr(backend, 'get'): + return backend.get(*args, **kwargs) + elif hasattr(backend, 'get_triples'): + return backend.get_triples(*args, **kwargs) + else: + raise NotImplementedError(f"Get not implemented for backend {self.active_backend}.") + + def query(self, *args, **kwargs) -> Any: + """ + Query the active backend (supports vector search, graph query, or key lookup). + """ + backend = self.backends.get(self.active_backend) + if backend is None: + raise RuntimeError("No active memory backend.") + if hasattr(backend, 'query'): + return backend.query(*args, **kwargs) + elif hasattr(backend, 'find_triples'): + return backend.find_triples(*args, **kwargs) + else: + raise NotImplementedError(f"Query not implemented for backend {self.active_backend}.") + + def available_backends(self) -> List[str]: + """ + List all available memory backends. + """ + return [k for k, v in self.backends.items() if v is not None] \ No newline at end of file
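+
+# Example usage (a minimal sketch; assumes a graph-style backend such as
+# MemoryTripleStore, whose add_triple/find_triples methods this store
+# falls back to):
+#   store = UnifiedMemoryStore(graph_store=MemoryTripleStore())
+#   store.add('alice', 'knows', 'bob')      # dispatched to add_triple
+#   store.query(subject='alice')            # dispatched to find_triples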