Refactor: Extract SQL Queries from the Codebase into a Separate Query Layer #13

@avi-2002

Description


📋 Issue Summary

The codebase currently contains hardcoded SQL queries directly embedded within the QueryPatternMatcher class in pattern_learning.py. This violates the separation of concerns principle and makes the code harder to maintain, test, and modify.

🔍 Current State Analysis

Files with SQL Queries:

  • src/backend/crew_ai/optimization/pattern_learning.py
    • Lines 60-67: Table creation (CREATE TABLE IF NOT EXISTS keyword_stats)
    • Lines 69-72: Index creation (CREATE INDEX IF NOT EXISTS idx_keyword_stats_name)
    • Lines 181-187: Insert/Update query (INSERT INTO ... ON CONFLICT ... DO UPDATE SET)
    • Lines 271-274: Select query (SELECT keyword_name, usage_count, last_used FROM keyword_stats)

Current Issues:

  1. ❌ Poor Separation of Concerns: SQL logic mixed with business logic
  2. ❌ Hard to Test: Cannot easily mock database operations
  3. ❌ Limited Reusability: SQL queries cannot be shared across modules
  4. ❌ Maintenance Overhead: Query changes require modifying multiple parts of code
  5. ❌ No Query Optimization: queries live as scattered inline strings, with no single place to review or tune them
  6. ❌ Difficult Migration: Hard to switch databases or ORM frameworks

🎯 Proposed Solution

Create a dedicated data access layer (DAL) to separate SQL queries from business logic.

Architecture:

src/backend/crew_ai/optimization/
├── pattern_learning.py          # Business logic only
├── data_access/
│   ├── __init__.py
│   ├── queries.py               # SQL query definitions
│   ├── repository.py            # Database operations
│   └── models.py                # Data models (optional)

Implementation Plan:

Step 1: Create Query Constants Module (queries.py)

"""SQL queries for pattern learning system."""

# Schema queries
CREATE_KEYWORD_STATS_TABLE = """
    CREATE TABLE IF NOT EXISTS keyword_stats (
        keyword_name TEXT PRIMARY KEY,
        usage_count INTEGER DEFAULT 1,
        last_used TEXT NOT NULL
    )
"""

# Kept for parity with the existing schema; note that SQLite already creates
# an implicit unique index for keyword_name as the TEXT PRIMARY KEY.
CREATE_KEYWORD_STATS_INDEX = """
    CREATE INDEX IF NOT EXISTS idx_keyword_stats_name
    ON keyword_stats(keyword_name)
"""

# Data manipulation queries
INSERT_OR_UPDATE_KEYWORD = """
    INSERT INTO keyword_stats (keyword_name, usage_count, last_used)
    VALUES (?, 1, ?)
    ON CONFLICT(keyword_name) DO UPDATE SET
        usage_count = usage_count + 1,
        last_used = ?
"""

SELECT_ALL_KEYWORD_STATS = """
    SELECT keyword_name, usage_count, last_used
    FROM keyword_stats
    ORDER BY usage_count DESC
"""

SELECT_KEYWORD_BY_NAME = """
    SELECT keyword_name, usage_count, last_used
    FROM keyword_stats
    WHERE keyword_name = ?
"""

Step 2: Create Repository Layer (repository.py)

"""Repository pattern for database operations."""

import sqlite3
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

from .queries import (
    CREATE_KEYWORD_STATS_INDEX,
    CREATE_KEYWORD_STATS_TABLE,
    INSERT_OR_UPDATE_KEYWORD,
    SELECT_ALL_KEYWORD_STATS,
    SELECT_KEYWORD_BY_NAME,
)

class KeywordStatsRepository:
    """Repository for keyword statistics database operations."""
    
    def __init__(self, db_path: str):
        self.db_path = db_path
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
    
    def initialize_schema(self) -> None:
        """Create database schema if it doesn't exist."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute(CREATE_KEYWORD_STATS_TABLE)
            cursor.execute(CREATE_KEYWORD_STATS_INDEX)
            conn.commit()
    
    def upsert_keyword(self, keyword: str, timestamp: str) -> None:
        """Insert or update keyword statistics."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute(INSERT_OR_UPDATE_KEYWORD, (keyword, timestamp, timestamp))
            conn.commit()
    
    def get_all_stats(self) -> Dict[str, Dict[str, Any]]:
        """Get all keyword statistics."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute(SELECT_ALL_KEYWORD_STATS)
            
            stats = {}
            for keyword_name, usage_count, last_used in cursor.fetchall():
                stats[keyword_name] = {
                    "usage_count": usage_count,
                    "last_used": last_used
                }
            return stats
    
    def get_keyword_stats(self, keyword: str) -> Optional[Tuple[str, int, str]]:
        """Get statistics for a single keyword, or None if it has not been seen."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute(SELECT_KEYWORD_BY_NAME, (keyword,))
            return cursor.fetchone()
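
Taken together, a minimal usage sketch of the repository (the path and keyword are illustrative):

repo = KeywordStatsRepository("./data/pattern_learning.db")
repo.initialize_schema()
repo.upsert_keyword("Click Element", "2025-01-01T00:00:00")
print(repo.get_all_stats())
# {'Click Element': {'usage_count': 1, 'last_used': '2025-01-01T00:00:00'}}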

Step 3: Update pattern_learning.py

"""Pattern Learning System for Query-Keyword Association."""

from datetime import datetime
from typing import Dict

from .data_access.repository import KeywordStatsRepository

class QueryPatternMatcher:
    """Learn and predict keyword usage patterns."""
    
    def __init__(self, db_path: str = "./data/pattern_learning.db", chroma_store=None):
        self.db_path = db_path
        self.chroma_store = chroma_store
        
        # Use repository instead of direct SQL
        self.repository = KeywordStatsRepository(db_path)
        self.repository.initialize_schema()
        
        # ChromaDB initialization...
    
    def learn_from_execution(self, user_query: str, generated_code: str):
        """Extract keywords and store pattern."""
        used_keywords = self._extract_keywords_from_code(generated_code)
        timestamp = datetime.now().isoformat()
        
        # Store in ChromaDB...
        
        # Use repository for SQL operations
        for keyword in used_keywords:
            self.repository.upsert_keyword(keyword, timestamp)
    
    def get_keyword_stats(self) -> Dict[str, Dict]:
        """Get statistics about keyword usage."""
        return self.repository.get_all_stats()

✅ Benefits

  1. 🎯 Separation of Concerns: Clear boundary between data access and business logic
  2. 🧪 Better Testability: the repository can be mocked in unit tests (see the sketch after this list)
  3. ♻️ Reusability: Queries can be shared across modules
  4. 🔧 Easier Maintenance: Query changes in one place
  5. 📈 Performance: Easier to optimize queries
  6. 🔄 Migration Ready: Simpler to switch databases/ORMs (e.g., SQLAlchemy)
  7. 📚 Documentation: Queries are self-documenting with clear names
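
To make the testability point concrete, a sketch of a unit test that swaps the repository for a mock, assuming the Step 3 refactor is in place (the query string and generated code are illustrative):

from unittest.mock import MagicMock

def test_learn_from_execution_records_keywords():
    matcher = QueryPatternMatcher(db_path=":memory:")
    matcher.repository = MagicMock()  # no real database is touched

    matcher.learn_from_execution("click the submit button",
                                 "Click Element    id=submit")

    # Every extracted keyword should be persisted through the repository.
    matcher.repository.upsert_keyword.assert_called()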

🚀 Migration Path

Phase 1: Setup (Non-breaking)

  1. Create data_access/ folder structure
  2. Add queries.py with all SQL constants
  3. Add repository.py with database operations

Phase 2: Refactor (Backward compatible)

  1. Update QueryPatternMatcher to use repository
  2. Keep old methods as deprecated
  3. Add deprecation warnings
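
For item 3, a minimal deprecation shim; the old method name _save_keyword_stats is hypothetical and stands in for whatever inline-SQL method is being retired:

import warnings

# Inside QueryPatternMatcher:
def _save_keyword_stats(self, keyword: str, timestamp: str) -> None:
    """Deprecated: use self.repository.upsert_keyword() instead."""
    warnings.warn(
        "_save_keyword_stats is deprecated; use KeywordStatsRepository.upsert_keyword",
        DeprecationWarning,
        stacklevel=2,
    )
    self.repository.upsert_keyword(keyword, timestamp)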

Phase 3: Cleanup

  1. Remove deprecated code
  2. Update tests
  3. Update documentation

📝 Testing Strategy

# tests/test_keyword_repository.py
import pytest
from src.backend.crew_ai.optimization.data_access.repository import KeywordStatsRepository

def test_upsert_keyword(tmp_path):
    db_path = tmp_path / "test.db"
    repo = KeywordStatsRepository(str(db_path))
    repo.initialize_schema()
    
    timestamp = "2025-01-01T00:00:00"
    repo.upsert_keyword("Click Element", timestamp)
    
    stats = repo.get_all_stats()
    assert "Click Element" in stats
    assert stats["Click Element"]["usage_count"] == 1

🔗 Related Issues

  • Improves code maintainability (aligns with SonarQube quality improvements)
  • Prepares for potential ORM integration (SQLAlchemy)
  • Supports future database migration needs

💡 Future Enhancements

  1. Connection Pooling: For better performance
  2. Query Builder: Type-safe query construction
  3. Migration System: Alembic for schema versioning
  4. ORM Integration: Consider SQLAlchemy for complex queries
  5. Caching Layer: Redis for frequently accessed stats
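
For item 4, a hypothetical SQLAlchemy 2.0 mapping of the same table, to give a sense of the migration target (column names mirror the current schema; this is illustrative only, not a committed design):

from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class KeywordStat(Base):
    __tablename__ = "keyword_stats"

    keyword_name: Mapped[str] = mapped_column(primary_key=True)
    usage_count: Mapped[int] = mapped_column(default=1)  # Python-side default
    last_used: Mapped[str] = mapped_column(nullable=False)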

📌 Acceptance Criteria

  • SQL queries extracted to queries.py
  • Repository pattern implemented in repository.py
  • QueryPatternMatcher refactored to use repository
  • All existing tests pass
  • New unit tests for repository added
  • Documentation updated
  • No performance regression
  • Backward compatibility maintained during migration

🏷️ Labels

refactoring, code-quality, technical-debt, database, architecture


Priority: Medium
Estimated Effort: 4-6 hours
Impact: High (improves maintainability and testability)
