diff --git a/benchmark/raytune/OPTIMIZATION_EXPERIMENT.py b/benchmark/raytune/OPTIMIZATION_EXPERIMENT.py new file mode 100644 index 00000000..b47c38ea --- /dev/null +++ b/benchmark/raytune/OPTIMIZATION_EXPERIMENT.py @@ -0,0 +1,123 @@ +"""Run configuration for hyperparameter search. + +All configuration parameters for the hyperparameter search are defined here. +Modify this file to change search behavior without editing the main script. +""" + +import os +from typing import Dict, List, Optional + +# Model configurations +# HashAttention weight files and DoubleSparsity config files are resolved against the two directories below. +# Edit HASHATTENTION_WEIGHTS_DIR and DOUBLE_SPARSITY_CONFIG_DIR to point at your local copies. + +HASHATTENTION_WEIGHTS_DIR: str = "/data/apdesai/code/HashAttention-1.0/artifacts" +DOUBLE_SPARSITY_CONFIG_DIR: str = "/data/apdesai/code/DoubleSparse/config" +hashattention_dir: str = HASHATTENTION_WEIGHTS_DIR +doublesparsity_config_dir: str = DOUBLE_SPARSITY_CONFIG_DIR + + +MODEL_CONFIGS: Dict[str, Dict[str, str]] = { + "llama3.1-8b": { + "model_name": "meta-llama/Llama-3.1-8B-Instruct", + "hash_attention_weight_file": os.path.join(hashattention_dir, "llama3.1-8b-patch.64K.v1.hat_weights.pkl"), + "double_sparsity_config_file": os.path.join(doublesparsity_config_dir, "meta-llama/Llama-3.1-8B-Instruct.json"), + }, + "llama3.2-1b": { + "model_name": "meta-llama/Llama-3.2-1B-Instruct", + "hash_attention_weight_file": os.path.join(hashattention_dir, "DNE.pkl"), + "double_sparsity_config_file": os.path.join(doublesparsity_config_dir, "meta-llama/Llama-3.2-1B-Instruct.json"), + }, + "llama3.2-3b": { + "model_name": "meta-llama/Llama-3.2-3B-Instruct", + "hash_attention_weight_file": os.path.join(hashattention_dir, "DNE.pkl"), + "double_sparsity_config_file": os.path.join(doublesparsity_config_dir, "meta-llama/Llama-3.2-3B-Instruct.json"), + }, + "deepseek": { + "model_name": "deepseek-ai/DeepSeek-R1-Distill-Llama-8B", + "hash_attention_weight_file": os.path.join(hashattention_dir, 
"DeepSeek-R1-Distill-Llama-8B-patch-layers2-dim64-max-context-24K_hat_weights.pkl"), + }, + "mistral": { + "model_name": "mistralai/Mistral-7B-Instruct-v0.3", + "hash_attention_weight_file": os.path.join(hashattention_dir, "Mistral-7B-Instruct-v0.3.24K.20.500.hat_weights.pkl"), + }, + "qwen3-30b-moe": { + "model_name": "Qwen/Qwen3-30B-A3B-Instruct-2507", + "hash_attention_weight_file": os.path.join(hashattention_dir, "DNE.pkl"), + "double_sparsity_config_file": os.path.join(doublesparsity_config_dir, "Qwen/Qwen3-30B-A3B-Instruct-2507.json"), + }, + "qwen3-4b": { + "model_name": "Qwen/Qwen3-4B-Instruct-2507", + "hash_attention_weight_file": os.path.join(hashattention_dir, "DNE.pkl"), + "double_sparsity_config_file": os.path.join(doublesparsity_config_dir, "Qwen/Qwen3-4B-Instruct-2507.json"), + }, +} + +MODELS: List[str] = [ + "llama3.1-8b", + "llama3.2-1b", + "llama3.2-3b", + "qwen3-4b", + "qwen3-30b-moe", +] + +TASKS: List[str] = [ + "ruler32k/vt", + "ruler32k/qa_1", + "ruler32k/qa_2", + "ruler32k/fwe", + "ruler32k/niah_multikey_2", + "ruler32k/niah_multikey_3", +] + +SPARSITY_OBJECTIVES: List[int] = [ + 2, + 5, + 10, + 20, +] + +MEMORY_OBJECTIVES: List[int] = [ + 32, + 64, + 128, +] # Memory objectives (integers) for builders that need them; other builders ignore this list + +BUILDER_NAMES: List[str] = [ + # "dense", + # "double_sparsity", + # "hashattention_topk", + # "magicpig", + # "oracle_topk", + # "oracle_topp", + # "quest_topk", + # "vattention_hashattention", + # "vattention_oracle", + # "pqcache", + "vattention_pqcache", +] # Specify which builders to use (e.g., ["magicpig"], ["dense"], ["double_sparsity"]) + + +# SEARCH PARAMS +NUM_SAMPLES: int = 1 # Number of samples per hyperparameter search +SEARCH_MAX_NEW_TOKENS: int = 3 # Max new tokens for search trials +SEARCH_MAX_CONTEXT_LENGTH: int = 40000 # Max context length for search trials +SEARCH_MAX_REQUESTS: int = 3 # Max requests per search trial +OPTIMAL_CONFIGS_DIR: str = 
"/data/apdesai/code/DO_NOT_DELETE/vattention_pqcache_optimization/" # Directory for storing optimal configurations +RAY_RESULTS_DIR: str = "/tmp/ray_results" # Directory for Ray Tune results +SEARCH_TIMEOUT: int = 900 # Timeout per search trial in seconds +ACTORS_PER_GPU: int = 1 # Number of actors per GPU for resource allocation + + +""" DRY RUN +if true , it will do everything except the actual running of benchmark helper -- it will just return +randomly generated scores for each trial and choose based on that +""" +DRY_RUN: bool = False + + +""" If you use Time stamp then by default it will perform entire search again. +""" +USE_TIMESTAMP_FOR_RESULTS_DIR: bool = False +FORCE_SEARCH: bool = False # Force re-run of search even if configs exist + diff --git a/benchmark/raytune/README.md b/benchmark/raytune/README.md index 0dea73df..51904c98 100644 --- a/benchmark/raytune/README.md +++ b/benchmark/raytune/README.md @@ -1,99 +1,102 @@ -# Ray Tune Benchmark Suite +## Ray Tune Benchmark Suite -A distributed benchmark suite for sparse attention configurations using Ray for parallel execution. +Distributed benchmark suite for sparse attention configurations using Ray. -## Setup +### 1. Quick Start (Run existing builders on new models / settings / objectives) -### Environment Variables +- **Optimize configs** -For HashAttention configurations, set the weights directory: + 1. Edit `benchmark/raytune/OPTIMIZATION_EXPERIMENT.py` to choose: + - **Models**: `MODEL_CONFIGS`, `MODELS` + - **Tasks**: `TASKS` + - **Objectives**: `SPARSITY_OBJECTIVES`, `MEMORY_OBJECTIVES` + - **Builders**: `BUILDER_NAMES` + - **Search/runtime**: samples, timeouts, context limits, output dirs + 2. Run the optimization: ```bash -export SPARSE_ATTENTION_WEIGHTS_DIR=/path/to/your/hashattention/weights +python3 benchmark/raytune/run_optimize_configs.py ``` -The directory should contain the HashAttention weight files for your models (e.g., `llama3.1-8b-patch.64K.v1.hat_weights.pkl`). 
+ This writes one JSON config per (model, task, builder, objective) into the configured optimal-configs directory. -## Quick Start +- **Run benchmarks with optimized configs** -### 1. Optimize Configurations -Find optimal sparse attention configurations for your models: - -```bash -python3 benchmark/raytune/run_optimize_configs.py \ - --objective sparsity_10 \ - --optimal-configs-dir > \ - --num-samples 1 \ - --search-max-new-tokens 5 \ - --search-max-context-length 32678 \ - --search-max-requests 2 \ - --actors-per-gpu 1 -``` - -### 2. Run Benchmarks -Execute benchmarks using the optimized configurations: + Use the config directory produced above with `run_config_dir.py`: ```bash python3 benchmark/raytune/run_config_dir.py \ - --configs-dir \ + --configs-dir /path/to/optimal/configs \ --max-new-tokens 100 \ --max-context-length 32678 \ --max-requests 2 \ --actors-per-gpu 1 \ - --benchmark-results-dir ./test_bench.1/ + --benchmark-results-dir ./bench_results/ ``` -## Workflow - -### Phase 1: Configuration Optimization -Use `run_optimize_configs.py` to search for optimal sparse attention parameters: - -**Configuration Sources:** -- **Models**: Defined in `get_run_configuration()` function -- **Tasks**: Specified in the configuration -- **Sparse Configs**: Two types handled: - - `to_optimize_configs`: Configurations that need hyperparameter search - - `optimal_configs`: Pre-optimized configurations (used as-is) -- **Search Spaces**: Each config type can have its own search space defined separately. 
Example: - -```python -# Create a ResearchAttentionConfig with custom search spaces -config = ResearchAttentionConfig(masker_configs=[ - SinkMaskerConfig(sink_size=128), - LocalMaskerConfig(window_size=128), - OracleTopKConfig(heavy_size=0.10), - AdaptiveSamplingMaskerConfig( - base_rate_sampling=0.1, - epsilon=0.25, - delta=0.25, - init_offset=128, - local_offset=128 - ) -]) - -# Define search spaces for specific maskers -config.masker_configs[2].search_space = { - "heavy_size": tune.grid_search([0.01, 0.05, 0.1, 0.2]) -} -config.masker_configs[3].search_space = { - "base_rate_sampling": tune.grid_search([0.01, 0.02, 0.05]), - "epsilon": tune.grid_search([0.05, 0.1, 0.2]), - "delta": tune.grid_search([0.05, 0.1, 0.2]) -} -``` - -**Output**: Optimal configurations are written to `/run_/` directory with individual JSON files per model-task-config combination. - -### Phase 2: Benchmark Execution -Use `run_config_dir.py` to run full benchmarks with the found configurations: - -**Input**: Pass the config directory (e.g., `/run_/`) containing all the JSON configuration files generated in Phase 1. - -**Output**: Benchmark results saved to the specified `--benchmark-results-dir`. - -## Features - -- **Distributed Execution**: Ray-based parallel processing across multiple GPUs -- **Automatic Resource Management**: Efficient GPU utilization and task scheduling -- **Sparse Attention Support**: Multiple masker types and configurations -- **Comprehensive Metrics**: Detailed performance and accuracy measurements +### 2. Implementation of optimization + +- **Config builders**: For each sparse attention method, a config builder constructs a `ResearchAttentionConfig` (masker stack, defaults, and metadata) for a given model/task/objective. +- **Search spaces**: Builders attach Ray Tune search spaces (e.g. `config.masker_configs[i].search_space`) to selected hyperparameters; `run_optimize_configs.py` passes these to Ray. 
+- **Validity checker**: Each builder defines a small validity checker that rejects invalid hyperparameter combinations early so trials can be skipped before running the benchmark. + +High-level flow: + +```text +(model, task, objectives, builder name) + │ + ▼ + Config builder + ┌─────────┴────────────────────────────┐ + │ │ + ▼ ▼ +ResearchAttentionConfig Ray Tune search_space attached + │ + ▼ +Ray Tune iterates over configs ──► validity checker ──► + │ │ + ├─ valid ──► run benchmark trial + └─ invalid ──► skip early (no trial) +``` + +### 3. Adding a new builder + +- **Create a builder**: Copy an existing builder from `benchmark/raytune/config_builders/`, rename it, and adapt: + - masker composition and default parameters + - Ray Tune search spaces on the relevant hyperparameters + - the validity checker logic for early exit on bad configs +- **Wire it up**: + - Register the new builder name wherever builders are dispatched (e.g. builder registry/factory). + - Add the new name to `BUILDER_NAMES` in `OPTIMIZATION_EXPERIMENT.py` so it is included in optimization and benchmarking. + +**Example sketch (`vattention_pqcache`)** in `config_builders/vattention_pqcache.py` (Check the file for details) : + +- **1. Builder name**: + + - Decorator: `@register_builder("vattention_pqcache")` + - Class: `VAttentionPQCacheConfigBuilder` + +- **2. Search space**: + + - Base definition on the PQCache masker: + + ```python + config.masker_configs[2].search_space = { + "pq_group_factor": tune.grid_search([2, 4]), + "pq_bits": tune.grid_search([4, 8]), + "kmeans_iter": tune.grid_search([10]), + "metric": tune.grid_search(["euclidean"]), + } + ``` + + - Plus sparsity-dependent grids on PQCache + AdaptiveSampling (e.g. `config.masker_configs[2].search_space["heavy_size"] = ...`, `config.masker_configs[3].search_space = {...}` inside the `if sparsity_objective == ...` blocks). + +- **3. Validity checker**: + + - Function: `_validity_check(config, sparsity_val)` at the top of the file. 
+ - Attached to the config with: + + ```python + config.validity_constraint = partial(_validity_check, sparsity_val=sparsity_val) + ``` + diff --git a/benchmark/raytune/benchmark_helper.py b/benchmark/raytune/benchmark_helper.py new file mode 100644 index 00000000..149efedc --- /dev/null +++ b/benchmark/raytune/benchmark_helper.py @@ -0,0 +1,193 @@ +"""Benchmark helper for executing individual benchmark runs during config search.""" + +import json +import logging +import math +import os +import sys +from pathlib import Path +from typing import Dict, Tuple + +# Path setup +current_dir = Path(__file__).parent +root_path = current_dir.parent.parent +sys.path.extend([str(current_dir), str(root_path)]) +os.environ["PYTHONPATH"] = os.environ.get("PYTHONPATH", "") + f":{current_dir}:{root_path}" + +import torch + +from benchmark.executor_config import AdapterConfig +from benchmark.benchmark_registry import create_benchmark_instance +from sparse_attention_hub.adapters.huggingface import ModelAdapterHF +from sparse_attention_hub.metric_logging.logger import MicroMetricLogger +from config_builders.utility import OBJECTIVE_FUNCTIONS +from OPTIMIZATION_EXPERIMENT import DRY_RUN +import random + + +class BenchmarkHelper: + """Handles individual benchmark runs during config search. + + This class is responsible for executing a single benchmark run with a given + sparse attention configuration and returning the evaluation metrics (score, density, error). + """ + + def __init__(self, + base_result_dir: Path, + generation_kwargs: Dict[str, any], + request_kwargs: Dict[str, any]) -> None: + """Initialize the benchmark helper with configuration. 
+ + Args: + base_result_dir: Base directory under which __call__ creates one + result directory per benchmark run + generation_kwargs: Passed to benchmark.run_benchmark as + generation_kwargs + request_kwargs: Passed to benchmark.run_benchmark as + request_kwargs + """ + self.base_result_dir: Path = base_result_dir + self.adapter_config: AdapterConfig = AdapterConfig( + adapter_name="huggingface", + model_kwargs={"torch_dtype": torch.bfloat16}, + tokenizer_kwargs={"padding_side": "left"}, + ) + self.generation_kwargs: Dict[str, any] = generation_kwargs + self.request_kwargs: Dict[str, any] = request_kwargs + + def __call__(self, attention_config: any, task_name: str, model_name: str) -> Tuple[float, float, float]: + """Run benchmark and return (score, density, error) tuple. + + Args: + attention_config: Sparse attention configuration to test + task_name: Name of the benchmark task (may include subset, e.g., "benchmark/subset") + model_name: Name of the model to use + + Returns: + Tuple of (score, density, error) where: + - score: Combined objective score (lower is better) + - density: Attention density (0.0 to 1.0) + - error: Attention output error (0.0 to 1.0) + """ + try: + # Early validation check - skip expensive benchmark if constraint fails + if hasattr(attention_config, 'validity_constraint') and attention_config.validity_constraint is not None: + if not attention_config.validity_constraint(attention_config): + logging.info(f"Config failed validity constraint, returning penalty score") + return 100.0, 1.0, 1.0 # Penalty score, worst density, worst error + else: + raise ValueError(f"No validity constraint found for attention configuration: {attention_config}. If there is no validity constraint . 
just set lambda: True in builder.") + + if hasattr(attention_config, 'objective') and attention_config.objective is not None: + objective_function = OBJECTIVE_FUNCTIONS[attention_config.objective] + logging.info(f"Using objective function: {objective_function.__name__} for attention configuration: {attention_config}") + else: + raise ValueError(f"No objective function found for attention configuration: {attention_config}. If config is objective agnostic just set default in builder.") + + if DRY_RUN: + return random.random(), random.random(), random.random() + + benchmark_name: str + subset_name: str | None + benchmark_name, subset_name = task_name.split("/", 1) if "/" in task_name else (task_name, None) + + # Create result directory for this specific run + result_dir: Path = os.path.join(self.base_result_dir, f"{model_name}_{task_name}_{hash(str(attention_config)) % 1000000}") + os.makedirs(result_dir, exist_ok=True) + + # Create model adapter + adapter: ModelAdapterHF = ModelAdapterHF( + model_name=model_name, + sparse_attention_config=attention_config, + model_kwargs=self.adapter_config.model_kwargs, + tokenizer_kwargs=self.adapter_config.tokenizer_kwargs + ) + + # Create benchmark instance + benchmark = create_benchmark_instance( + benchmark_name=benchmark_name, + subsets=[subset_name] if subset_name else None + ) + print("The result directory is ", result_dir, flush=True) + # Setup micro metric logger + metric_logger: MicroMetricLogger = MicroMetricLogger() + metric_logger.configure_logging( + log_path=str(result_dir), + enabled_metrics=["research_attention_density", "research_attention_output_error"], + ) + + # Run benchmark directly + metrics = benchmark.run_benchmark( + adapter=adapter, + result_dir=str(result_dir), + generation_kwargs=self.generation_kwargs, + request_kwargs=self.request_kwargs + ) + + # Flush the metric logger to ensure all metrics are written + metric_logger.flush() + + # Extract micro metrics for sparse attention evaluation + 
micro_metrics: Dict[str, float] = self._extract_micro_metrics(result_dir) + error: float = micro_metrics["attention_error"] + density: float = micro_metrics["density"] + + # For dense configuration (density=1.0, error=0.0), use a simple score + if density == 1.0 and error == 0.0: + # Dense baseline: the objective function is bypassed and a fixed score is assigned + score: float = 100.0 # NOTE(review): same value as the validity-failure penalty and above the 5.0 exception penalty below - confirm intended + else: + # Use the selected objective function + score = objective_function(error, density) + # Also print to stdout so the test script can detect it + print(f"Objective: {objective_function.__name__}, Error: {error:.4f}, Density: {density:.4f}, Score: {score:.4f}") + logging.info(f"Objective: {objective_function.__name__}, Error: {error:.4f}, Density: {density:.4f}, Score: {score:.4f}") + + return score, density, error + + except Exception as e: + logging.error(f"Benchmark failed: {e}") + import traceback + traceback.print_exc() + + return 5.0, 1.0, 1.0 # Penalty score, worst-case density, and worst-case error + + def _extract_micro_metrics(self, result_dir: Path) -> Dict[str, float]: + """Extract attention error and density from micro metrics. 
+ + Args: + result_dir: Directory containing the micro_metrics.jsonl file + + Returns: + Dictionary with keys: + - attention_error: Average attention output error (0.0 to 1.0) + - density: Average attention density (0.0 to 1.0) + """ + micro_metrics_file: Path = os.path.join(result_dir, "micro_metrics.jsonl") + if not os.path.exists(micro_metrics_file): + # For dense configuration, micro_metrics.jsonl won't exist since no sparse attention is used + # Return default values: 0 error (perfect) and 1.0 density (fully dense) + logging.info(f"micro_metrics.jsonl not found in {result_dir}, using dense defaults") + return {"attention_error": 0.0, "density": 1.0} + + errors: list[float] = [] + densities: list[float] = [] + with open(micro_metrics_file, "r") as f: + for line in f: + try: + entry: dict = json.loads(line.strip()) + metric: str | None = entry.get("metric") + value: any = entry.get("value") + if value is not None and not (isinstance(value, float) and math.isnan(value)): + if metric == "research_attention_output_error": + errors.append(float(value)) + elif metric == "research_attention_density": + densities.append(float(value)) + except (json.JSONDecodeError, ValueError, TypeError): + continue + + return { + "attention_error": sum(errors) / len(errors) if errors else 1.0, + "density": sum(densities) / len(densities) if densities else 1.0 + } + diff --git a/benchmark/raytune/config_builders/__init__.py b/benchmark/raytune/config_builders/__init__.py new file mode 100644 index 00000000..acf2661f --- /dev/null +++ b/benchmark/raytune/config_builders/__init__.py @@ -0,0 +1,38 @@ +"""Configuration builders for sparse attention configs.""" + +from .base import BaseConfigBuilder +from .factory import get_config_builder, get_all_config_builders, register_builder + +# Import builders to trigger registration via decorators +from .dense import DenseConfigBuilder # noqa: E402, F401 +from .double_sparsity import DoubleSparsityConfigBuilder # noqa: E402, F401 +from 
.vattention_oracle import VAttentionOracleConfigBuilder # noqa: E402, F401 +from .vattention_hashattention import VAttentionHashAttentionConfigBuilder # noqa: E402, F401 +from .vattention_pqcache import VAttentionPQCacheConfigBuilder # noqa: E402, F401 +from .oracle_topk import OracleTopKConfigBuilder # noqa: E402, F401 +from .oracle_topp import OracleTopPConfigBuilder # noqa: E402, F401 +from .hashattention_topk import HashAttentionTopKConfigBuilder # noqa: E402, F401 +from .magicpig import MagicPigConfigBuilder # noqa: E402, F401 +from .pqcache import PQCacheConfigBuilder # noqa: E402, F401 +from .quest_top_k import QuestTopKConfigBuilder # noqa: E402, F401 +from .random_sampling import RandomSamplingConfigBuilder # noqa: E402, F401 + +__all__ = [ + "BaseConfigBuilder", + "DenseConfigBuilder", + "DoubleSparsityConfigBuilder", + "VAttentionOracleConfigBuilder", + "VAttentionHashAttentionConfigBuilder", + "VAttentionPQCacheConfigBuilder", + "OracleTopKConfigBuilder", + "OracleTopPConfigBuilder", + "HashAttentionTopKConfigBuilder", + "MagicPigConfigBuilder", + "PQCacheConfigBuilder", + "QuestTopKConfigBuilder", + "RandomSamplingConfigBuilder", + "get_config_builder", + "get_all_config_builders", + "register_builder", +] + diff --git a/benchmark/raytune/config_builders/base.py b/benchmark/raytune/config_builders/base.py new file mode 100644 index 00000000..276ca1ac --- /dev/null +++ b/benchmark/raytune/config_builders/base.py @@ -0,0 +1,39 @@ +"""Base class for configuration builders.""" + +from abc import ABC, abstractmethod +from typing import List, Optional, Tuple + +from sparse_attention_hub.sparse_attention.research_attention import ResearchAttentionConfig + + +class BaseConfigBuilder(ABC): + """Abstract base class for building sparse attention configurations. + + Each builder is responsible for creating configurations for a specific + sparse attention method or combination of methods. 
+ """ + + @abstractmethod + def build_configs( + self, + weight_file: Optional[str] = None, + objective: str = "default", + **kwargs + ) -> Tuple[List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]], + List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]]]: + """Build sparse attention configurations. + + Args: + weight_file: Path to weight file (required for some configs) + objective: Objective function name (e.g., "sparsity_5", "default") + **kwargs: Additional parameters specific to the builder + + Returns: + Tuple of (optimal_configs, to_optimize_configs) where each is a list + of (name, full_config, masker_classes) tuples. + + - optimal_configs: Configs that don't need hyperparameter search + - to_optimize_configs: Configs that need Ray Tune optimization + """ + pass + diff --git a/benchmark/raytune/config_builders/dense.py b/benchmark/raytune/config_builders/dense.py new file mode 100644 index 00000000..d084e94b --- /dev/null +++ b/benchmark/raytune/config_builders/dense.py @@ -0,0 +1,42 @@ +"""Configuration builder for dense (no sparse attention) model.""" + +from typing import List, Optional, Tuple, Dict + +from sparse_attention_hub.sparse_attention.research_attention import ResearchAttentionConfig + +from .base import BaseConfigBuilder +from .factory import register_builder + + +@register_builder("dense") +class DenseConfigBuilder(BaseConfigBuilder): + """Builder for dense (no sparse attention) configurations.""" + + def build_configs( + self, + model_config: Dict[str, str], + sparsity_objectives: List[int], + memory_objectives: List[int], + **kwargs + ) -> Tuple[List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]], + List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]]]: + """Get dense baseline configuration. 
+ + Ignores: + sparsity_objectives: List[int] - List of sparsity objectives + memory_objectives: List[int] - List of memory objectives + model_config: Dict[str, str] - Model configuration + + Returns: + Tuple of (optimal_configs, to_optimize_configs) + """ + optimal_configs: List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]] = [] + to_optimize_configs: List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]] = [] + + # Dense baseline: no sparse attention, so sparse_config and masker_classes are None + # Since dense doesn't depend on sparsity or memory objectives, we just return a single config + # with None values (no sparse attention configuration needed) + optimal_configs.append(("dense", None, None)) + + return optimal_configs, to_optimize_configs + diff --git a/benchmark/raytune/config_builders/double_sparsity.py b/benchmark/raytune/config_builders/double_sparsity.py new file mode 100644 index 00000000..8b3d2ce9 --- /dev/null +++ b/benchmark/raytune/config_builders/double_sparsity.py @@ -0,0 +1,88 @@ +"""Configuration builder for DoubleSparsity attention.""" + +from functools import partial +from typing import List, Optional, Tuple, Dict +import os +from ray import tune + +from sparse_attention_hub.sparse_attention.research_attention import ResearchAttentionConfig +from sparse_attention_hub.sparse_attention.research_attention.maskers.fixed.implementations import ( + DoubleSparsityTopKMaskerConfig, + LocalMaskerConfig, + SinkMaskerConfig, +) + +from .base import BaseConfigBuilder +from .factory import register_builder +from .utility import get_masker_list_name + +from logging import getLogger +logger = getLogger(__name__) + +def _validity_check(config: ResearchAttentionConfig, mem_obj: int) -> bool: + """Check if the config meets the memory objective constraint.""" + return (128 // config.masker_configs[2].group_factor) * config.masker_configs[2].label_bits == mem_obj + + +@register_builder("double_sparsity") +class 
DoubleSparsityConfigBuilder(BaseConfigBuilder): + """Builder for DoubleSparsity sparse attention configurations.""" + + def build_configs( + self, + model_config: Dict[str, str], + sparsity_objectives: List[int], + memory_objectives: List[int], + **kwargs + ) -> Tuple[List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]], + List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]]]: + """Get all double sparsity attention configurations. + + Uses: + sparsity_objectives: List[int] - List of sparsity objectives to build the configurations. + memory_objectives: List[int] - List of memory objectives to build the configurations. + model_config: Dict[str, str] - Model configuration; nothing is built when "double_sparsity_config_file" is absent or not on disk + + Returns: + Tuple of (optimal_configs, to_optimize_configs) + """ + optimal_configs: List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]] = [] + to_optimize_configs: List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]] = [] + + if model_config.get("double_sparsity_config_file") is None or not os.path.exists(model_config["double_sparsity_config_file"]): + logger.warning(f"Double sparsity config file {model_config.get('double_sparsity_config_file')} for model {model_config['model_name']} does not exist. 
Skipping Double Sparsity configurations.") + return optimal_configs, to_optimize_configs + + for sparsity_objective in sparsity_objectives: + for memory_objective in memory_objectives: + heavy_size: float = float(sparsity_objective) / 100.0 + aux_mem: int = memory_objective + + classes = [SinkMaskerConfig, LocalMaskerConfig, DoubleSparsityTopKMaskerConfig] + name: str = get_masker_list_name(classes, other_params={"builder": "double_sparsity", "sparsity_obj": sparsity_objective, "memory_obj": memory_objective}) + + config = ResearchAttentionConfig(masker_configs=[ + SinkMaskerConfig(sink_size=128), + LocalMaskerConfig(window_size=128), + DoubleSparsityTopKMaskerConfig( + heavy_size=heavy_size - (256.0 / 32768), + group_factor=8, + label_bits=2, + sorted_channel_file=model_config["double_sparsity_config_file"], + channel_selection="q_proj"), + ]) + + config.masker_configs[2].search_space = { + "channel_selection": tune.grid_search(["q_proj"]), + "group_factor": tune.grid_search([2, 4, 8, 16]), + "label_bits": tune.grid_search([1, 2, 4, 8, 16]), + } + # Set validity constraint to use the correct memory_objective for comparison + config.validity_constraint = partial(_validity_check, mem_obj=aux_mem) + # Set objective function + config.objective = sparsity_objective + + to_optimize_configs.append((name, config, classes)) + + return optimal_configs, to_optimize_configs + diff --git a/benchmark/raytune/config_builders/factory.py b/benchmark/raytune/config_builders/factory.py new file mode 100644 index 00000000..7560279a --- /dev/null +++ b/benchmark/raytune/config_builders/factory.py @@ -0,0 +1,96 @@ +"""Factory for creating configuration builders.""" + +from typing import Dict, List, Optional, Tuple + +from sparse_attention_hub.sparse_attention.research_attention import ResearchAttentionConfig + +from .base import BaseConfigBuilder + +# Registry of available config builders +_BUILDER_REGISTRY: Dict[str, type[BaseConfigBuilder]] = {} + + +def register_builder(name: str): 
+ """Decorator to register a configuration builder. + + Usage: + @register_builder("my_builder") + class MyBuilder(BaseConfigBuilder): + ... + + Args: + name: Name to register the builder under + """ + def decorator(builder_class: type[BaseConfigBuilder]) -> type[BaseConfigBuilder]: + if not issubclass(builder_class, BaseConfigBuilder): + raise TypeError(f"Builder class must inherit from BaseConfigBuilder") + _BUILDER_REGISTRY[name] = builder_class + return builder_class + return decorator + + +def get_config_builder(builder_name: str) -> BaseConfigBuilder: + """Get a configuration builder by name. + + Args: + builder_name: Name of the builder (e.g., "double_sparsity", "vattention_oracle") + + Returns: + Instance of the requested builder + + Raises: + ValueError: If builder_name is not registered + """ + if builder_name not in _BUILDER_REGISTRY: + available = ", ".join(_BUILDER_REGISTRY.keys()) + raise ValueError(f"Unknown builder '{builder_name}'. Available builders: {available}") + + builder_class = _BUILDER_REGISTRY[builder_name] + return builder_class() + + +def get_all_config_builders() -> Dict[str, BaseConfigBuilder]: + """Get all registered configuration builders. + + Returns: + Dictionary mapping builder names to builder instances + """ + return {name: get_config_builder(name) for name in _BUILDER_REGISTRY.keys()} + + +def build_all_configs( + model_config: Dict[str, str], + sparsity_objectives: List[int], + memory_objectives: List[int], + builder_names: List[str], + **kwargs +) -> Tuple[List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]], + List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]]]: + """Build configs using all specified builders. + + Args: + model_config: Model configuration dict forwarded to every builder + sparsity_objectives, memory_objectives: Objective lists forwarded to every builder + builder_names: Names of registered builders to use. Must be an iterable of names; None is not handled. 
# --- benchmark/raytune/config_builders/hashattention_topk.py ---

logger = logging.getLogger(__name__)


@register_builder("hashattention_topk")
class HashAttentionTopKConfigBuilder(BaseConfigBuilder):
    """Builder for HashAttention TopK sparse attention configurations."""

    def build_configs(
        self,
        model_config: Dict[str, str],
        sparsity_objectives: List[int],
        memory_objectives: List[int],
        **kwargs
    ) -> Tuple[List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]],
               List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]]]:
        """Build one fixed HashAttention TopK configuration per sparsity objective.

        Uses:
            sparsity_objectives: List[int] - Sparsity objectives in percent; each
                one yields a single configuration.
            model_config: Dict[str, str] - Model configuration; only the
                ``hash_attention_weight_file`` entry (and ``model_name`` for
                the warning message) is read.

        Ignores:
            memory_objectives: List[int] - List of memory objectives
            **kwargs: accepted for interface compatibility, unused here.

        Returns:
            Tuple of (optimal_configs, to_optimize_configs). Every entry is a
            ``(name, config, masker_classes)`` tuple. ``to_optimize_configs`` is
            always empty because nothing is searched. Both lists are empty when
            the weight file is not configured or does not exist on disk.
        """
        weight_file: Optional[str] = model_config.get("hash_attention_weight_file")

        optimal_configs: List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]] = []
        to_optimize_configs: List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]] = []

        # Without trained hash weights the masker cannot be built: skip, don't fail.
        if not weight_file or not os.path.isfile(weight_file):
            # .get() keeps a missing "model_name" key from raising inside the log call.
            logger.warning(f"Weight file {weight_file} for model {model_config.get('model_name')} does not exist. Skipping HashAttention TopK configurations.")
            return optimal_configs, to_optimize_configs

        for sparsity_objective in sparsity_objectives:
            # Objective is given in percent; the masker expects a fraction.
            heavy_size: float = float(sparsity_objective) / 100.0
            classes = [SinkMaskerConfig, LocalMaskerConfig, HashAttentionTopKMaskerConfig]
            name: str = get_masker_list_name(classes, other_params={"builder": "hashattention_topk", "sparsity_obj": sparsity_objective})

            config = ResearchAttentionConfig(masker_configs=[
                SinkMaskerConfig(sink_size=128),
                LocalMaskerConfig(window_size=128),
                HashAttentionTopKMaskerConfig(
                    # 256 = sink_size + window_size above; presumably expressed
                    # as a share of a 32768-token context - TODO confirm.
                    heavy_size=heavy_size - (256.0 / 32768),
                    hat_bits=32,
                    hat_mlp_layers=3,
                    hat_mlp_hidden_size=128,
                    hat_mlp_activation="silu",
                    hat_weight_file=weight_file
                ),
            ])
            # Set validity to default (doesn't depend on memory objectives)
            config.validity_constraint = lambda config: True
            # Record the sparsity objective this configuration targets
            config.objective = sparsity_objective

            optimal_configs.append((name, config, classes))

        return optimal_configs, to_optimize_configs


# --- benchmark/raytune/config_builders/magicpig.py ---

def _validity_check(config: ResearchAttentionConfig) -> bool:
    """Check that the MagicPig LSH parameters stay within the memory limit.

    Args:
        config: ResearchAttentionConfig whose third masker (index 2) is the
            MagicPig masker config.

    Returns:
        True if lsh_l * lsh_k <= 64 * 64, False otherwise.
    """
    magicpig_config = config.masker_configs[2]
    # Anything greater than 64 * 64 causes too much memory usage for 32K
    # context, so such configs are rejected. The previous `> 4096` comparison
    # was inverted: it rejected every point of the search grid below, whose
    # largest product is 128 * 16 = 2048.
    return (magicpig_config.lsh_l * magicpig_config.lsh_k) <= 4096


@register_builder("magicpig")
class MagicPigConfigBuilder(BaseConfigBuilder):
    """Builder for MagicPig sparse attention configurations."""

    def build_configs(
        self,
        model_config: Dict[str, str],
        sparsity_objectives: List[int],
        memory_objectives: List[int],
        **kwargs
    ) -> Tuple[List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]],
               List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]]]:
        """Build one searchable MagicPig configuration per sparsity objective.

        Uses:
            sparsity_objectives: List[int] - List of sparsity objectives to build the configurations.
        Ignores:
            memory_objectives: List[int] - List of memory objectives
            model_config: Dict[str, str] - Model configuration
            **kwargs: accepted for interface compatibility, unused here.

        Returns:
            Tuple of (optimal_configs, to_optimize_configs). ``optimal_configs``
            is always empty; every configuration carries a search space over
            ``lsh_l`` / ``lsh_k`` and is returned in ``to_optimize_configs``.
        """
        optimal_configs: List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]] = []
        to_optimize_configs: List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]] = []

        for sparsity_objective in sparsity_objectives:
            classes = [SinkMaskerConfig, LocalMaskerConfig, MagicPigConfig]
            name: str = get_masker_list_name(classes, other_params={"builder": "magicpig", "sparsity_obj": sparsity_objective})

            config = ResearchAttentionConfig(masker_configs=[
                SinkMaskerConfig(sink_size=128),
                LocalMaskerConfig(window_size=128),
                MagicPigConfig(
                    # Initial values only; NOTE these are not members of the
                    # grids below - presumably replaced by the search.
                    lsh_l=8,
                    lsh_k=64
                )
            ])

            # Set up search space for LSH parameters
            config.masker_configs[2].search_space = {
                "lsh_l": tune.grid_search([16, 32, 64, 128]),
                "lsh_k": tune.grid_search([2, 4, 8, 16]),
            }

            # Reject LSH settings whose lsh_l * lsh_k product is too large
            config.validity_constraint = _validity_check
            # Record the sparsity objective this configuration targets
            config.objective = sparsity_objective

            to_optimize_configs.append((name, config, classes))

        return optimal_configs, to_optimize_configs
# --- benchmark/raytune/config_builders/oracle_topk.py ---

@register_builder("oracle_topk")
class OracleTopKConfigBuilder(BaseConfigBuilder):
    """Config builder producing ready-to-run Oracle TopK attention setups."""

    def build_configs(
        self,
        model_config: Dict[str, str],
        sparsity_objectives: List[int],
        memory_objectives: List[int],
        **kwargs
    ) -> Tuple[List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]],
               List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]]]:
        """Create one Oracle TopK configuration for each sparsity objective.

        Uses:
            sparsity_objectives: List[int] - Sparsity objectives in percent.
        Ignores:
            memory_objectives: List[int] - List of memory objectives
            model_config: Dict[str, str] - Model configuration

        Returns:
            Tuple of (optimal_configs, to_optimize_configs); the second list is
            always empty because no parameter is searched.
        """
        ready: List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]] = []
        pending: List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]] = []

        for objective in sparsity_objectives:
            # Percent -> fraction of the context kept by the top-k masker.
            fraction = float(objective) / 100.0
            masker_types = [SinkMaskerConfig, LocalMaskerConfig, OracleTopKConfig]
            label: str = get_masker_list_name(
                masker_types,
                other_params={"builder": "oracle_topk", "sparsity_obj": objective},
            )

            sink = SinkMaskerConfig(sink_size=128)
            local = LocalMaskerConfig(window_size=128)
            # The sink and local maskers above account for 256 tokens.
            top_k = OracleTopKConfig(heavy_size=fraction - (256.0 / 32768))
            attention_config = ResearchAttentionConfig(masker_configs=[sink, local, top_k])

            # Every configuration is valid; nothing depends on memory here.
            attention_config.validity_constraint = lambda config: True
            attention_config.objective = objective

            ready.append((label, attention_config, masker_types))

        return ready, pending
# --- benchmark/raytune/config_builders/oracle_topp.py ---

@register_builder("oracle_topp")
class OracleTopPConfigBuilder(BaseConfigBuilder):
    """Config builder producing searchable Oracle TopP attention setups."""

    def build_configs(
        self,
        model_config: Dict[str, str],
        sparsity_objectives: List[int],
        memory_objectives: List[int],
        **kwargs
    ) -> Tuple[List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]],
               List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]]]:
        """Create one Oracle TopP configuration, with a ``top_p`` grid, per objective.

        Uses:
            sparsity_objectives: List[int] - Sparsity objectives to build for.
        Ignores:
            memory_objectives: List[int] - List of memory objectives
            model_config: Dict[str, str] - Model configuration

        Returns:
            Tuple of (optimal_configs, to_optimize_configs); the first list is
            always empty because ``top_p`` still has to be searched.
        """
        ready: List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]] = []
        pending: List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]] = []

        for objective in sparsity_objectives:
            masker_types = [SinkMaskerConfig, LocalMaskerConfig, OracleTopPMaskerConfig]
            label: str = get_masker_list_name(
                masker_types,
                other_params={"builder": "oracle_topp", "sparsity_obj": objective},
            )

            # top_p starts at 0.7, one of the grid values listed below.
            top_p_masker = OracleTopPMaskerConfig(top_p=0.7)
            attention_config = ResearchAttentionConfig(masker_configs=[
                SinkMaskerConfig(sink_size=128),
                LocalMaskerConfig(window_size=128),
                top_p_masker,
            ])

            # Candidate cumulative-probability thresholds for the search.
            top_p_grid = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.85, 0.9, 0.95, 0.98, 0.99]
            attention_config.masker_configs[2].search_space = {
                "top_p": tune.grid_search(top_p_grid),
            }

            # Every configuration is valid; nothing depends on memory here.
            attention_config.validity_constraint = lambda config: True
            attention_config.objective = objective

            pending.append((label, attention_config, masker_types))

        return ready, pending
# --- benchmark/raytune/config_builders/pqcache.py ---

@register_builder("pqcache")
class PQCacheConfigBuilder(BaseConfigBuilder):
    """Builder for PQCache sparse attention configurations."""

    def build_configs(
        self,
        model_config: Dict[str, str],
        sparsity_objectives: List[int],
        memory_objectives: List[int],
        **kwargs
    ) -> Tuple[List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]],
               List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]]]:
        """Build one searchable PQCache configuration per sparsity objective.

        Uses:
            sparsity_objectives: List[int] - List of sparsity objectives to build the configurations.
        Ignores:
            memory_objectives: List[int] - List of memory objectives
            model_config: Dict[str, str] - Model configuration
            **kwargs: accepted for interface compatibility, unused here.

        Returns:
            Tuple of (optimal_configs, to_optimize_configs). ``optimal_configs``
            is always empty; each configuration carries a search space over the
            product-quantisation parameters.
        """
        optimal_configs: List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]] = []
        to_optimize_configs: List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]] = []

        for sparsity_objective in sparsity_objectives:
            # Objective is given in percent; the masker expects a fraction.
            heavy_size: float = float(sparsity_objective) / 100.0
            classes = [SinkMaskerConfig, LocalMaskerConfig, PQCacheConfig]
            name: str = get_masker_list_name(classes, other_params={"builder": "pqcache", "sparsity_obj": sparsity_objective})

            config = ResearchAttentionConfig(masker_configs=[
                SinkMaskerConfig(sink_size=128),
                LocalMaskerConfig(window_size=128),
                PQCacheConfig(
                    # 256 = sink_size + window_size above.
                    heavy_size=heavy_size - (256.0 / 32768),
                    pq_group_factor=2,  # Default value: head_dim=128 // pq_sub_dim=64 = 2
                    pq_bits=6,  # Default value from search space
                    kmeans_iter=10,  # Default value from search space
                    init_offset=128,  # Matches sink_size
                    metric="euclidean",  # Default value from search space
                )
            ])

            # Set up search space for PQCache parameters
            # Note: pq_group_factor = head_dim // pq_sub_dim
            # Assuming head_dim=128: pq_sub_dim=64 -> pq_group_factor=2, pq_sub_dim=32 -> pq_group_factor=4
            config.masker_configs[2].search_space = {
                "pq_group_factor": tune.grid_search([2, 4]),  # Corresponds to pq_sub_dim=[64, 32] for head_dim=128
                "pq_bits": tune.grid_search([4, 6, 8]),
                "kmeans_iter": tune.grid_search([10]),
                "metric": tune.grid_search(["euclidean"]),
            }

            # Set validity to default (doesn't depend on memory objectives)
            config.validity_constraint = lambda config: True
            # Record the sparsity objective this configuration targets
            config.objective = sparsity_objective

            to_optimize_configs.append((name, config, classes))

        return optimal_configs, to_optimize_configs


# --- benchmark/raytune/config_builders/quest_top_k.py ---

def _validity_check(config: ResearchAttentionConfig, mem_obj: int) -> bool:
    """Check if the config meets the memory objective constraint.

    Args:
        config: ResearchAttentionConfig whose third masker (index 2) exposes
            ``label_bits`` and ``page_size``.
        mem_obj: Memory objective the configuration has to match exactly.

    Returns:
        True if ``2 * (128 * label_bits) / page_size`` equals ``mem_obj``.
    """
    quest_config = config.masker_configs[2]
    # NOTE(review): 128 is presumably the attention head dimension - confirm.
    return mem_obj == 2 * (128 * quest_config.label_bits) / quest_config.page_size


@register_builder("quest_topk")
class QuestTopKConfigBuilder(BaseConfigBuilder):
    """Builder for Quest TopK sparse attention configurations."""

    def build_configs(
        self,
        model_config: Dict[str, str],
        sparsity_objectives: List[int],
        memory_objectives: List[int],
        **kwargs
    ) -> Tuple[List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]],
               List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]]]:
        """Build one searchable Quest TopK configuration per (sparsity, memory) pair.

        Uses:
            sparsity_objectives: List[int] - List of sparsity objectives to build the configurations.
            memory_objectives: List[int] - List of memory objectives to build the configurations.
        Ignores:
            model_config: Dict[str, str] - Model configuration
            **kwargs: accepted for interface compatibility, unused here.

        Returns:
            Tuple of (optimal_configs, to_optimize_configs). ``optimal_configs``
            is always empty; each configuration carries a search space over
            ``page_size`` / ``label_bits`` constrained by its memory objective.
        """
        optimal_configs: List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]] = []
        to_optimize_configs: List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]] = []

        for sparsity_objective in sparsity_objectives:
            # Percent -> fraction, minus the 256 tokens taken by the sink and
            # local maskers. This is subtracted exactly once; the earlier code
            # subtracted it again when constructing the masker config.
            heavy_size: float = float(sparsity_objective) / 100.0 - (256.0 / 32768)

            for memory_objective in memory_objectives:
                aux_mem: int = memory_objective

                classes = [SinkMaskerConfig, LocalMaskerConfig, QuestTopKMaskerConfig]
                name: str = get_masker_list_name(classes, other_params={"builder": "quest_topk", "sparsity_obj": sparsity_objective, "memory_obj": memory_objective})

                config = ResearchAttentionConfig(masker_configs=[
                    SinkMaskerConfig(sink_size=128),
                    LocalMaskerConfig(window_size=128),
                    QuestTopKMaskerConfig(
                        heavy_size=heavy_size,
                        page_size=128,
                        label_bits=16),
                ])

                config.masker_configs[2].search_space = {
                    "page_size": tune.grid_search([8, 16, 32, 64, 128]),
                    "label_bits": tune.grid_search([2, 4, 8, 16]),
                }
                # partial() binds this iteration's memory objective, avoiding
                # the late-binding problem a closure over the loop variable has.
                config.validity_constraint = partial(_validity_check, mem_obj=aux_mem)
                # Record the sparsity objective this configuration targets
                config.objective = sparsity_objective

                to_optimize_configs.append((name, config, classes))

        return optimal_configs, to_optimize_configs
# --- benchmark/raytune/config_builders/random_sampling.py ---

@register_builder("random_sampling")
class RandomSamplingConfigBuilder(BaseConfigBuilder):
    """Builder for Random Sampling sparse attention configurations."""

    def build_configs(
        self,
        model_config: Dict[str, str],
        sparsity_objectives: List[int],
        memory_objectives: List[int],
        **kwargs
    ) -> Tuple[List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]],
               List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]]]:
        """Build one fixed Random Sampling configuration per sparsity objective.

        Uses:
            sparsity_objectives: List[int] - List of sparsity objectives to build the configurations.
        Ignores:
            memory_objectives: List[int] - List of memory objectives
            model_config: Dict[str, str] - Model configuration
            **kwargs: accepted for interface compatibility, unused here.

        Returns:
            Tuple of (optimal_configs, to_optimize_configs); the second list is
            always empty because no parameter is searched.
        """
        optimal_configs: List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]] = []
        to_optimize_configs: List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]] = []

        # One list object is shared by every returned tuple.
        classes = [SinkMaskerConfig, LocalMaskerConfig, RandomSamplingMaskerConfig]

        for sparsity_objective in sparsity_objectives:
            # Objective is given in percent; the masker expects a fraction.
            budget_size: float = float(sparsity_objective) / 100.0
            name: str = get_masker_list_name(classes, other_params={"builder": "random_sampling", "sparsity_obj": sparsity_objective})
            config = ResearchAttentionConfig(masker_configs=[
                SinkMaskerConfig(sink_size=128),
                LocalMaskerConfig(window_size=128),
                # 256 = sink_size + window_size above.
                RandomSamplingMaskerConfig(sampling_rate=budget_size - (256.0 / 32768))
            ])
            # Set validity to default (doesn't depend on memory objectives)
            config.validity_constraint = lambda config: True
            # Record the sparsity objective this configuration targets
            config.objective = sparsity_objective

            optimal_configs.append((name, config, classes))

        return optimal_configs, to_optimize_configs


# --- benchmark/raytune/config_builders/utility.py ---

# Pre-defined objective functions for common sparsity levels.
# Keys are the integer sparsity objective in percent; -1 selects the generic
# fallback objective.
OBJECTIVE_FUNCTIONS = {
    2: create_sparsity_objective(0.02),
    5: create_sparsity_objective(0.05),
    10: create_sparsity_objective(0.10),
    15: create_sparsity_objective(0.15),
    20: create_sparsity_objective(0.20),
    25: create_sparsity_objective(0.25),
    -1: lambda error, density: error + 0.1 * density + (5.0 if density > 0.5 else 0.0),
}


# --- benchmark/raytune/config_builders/vattention_hashattention.py ---

logger = logging.getLogger(__name__)

# Slack for the budget comparison: grid values are decimal fractions, so e.g.
# 0.05 + 0.1 evaluates to 0.15000000000000002 and would fail an exact
# `<= 0.15` test although the configuration is exactly on budget.
_BUDGET_TOLERANCE: float = 1e-9

# Search grids per sparsity objective (percent). New lists are created from
# these on every build so tune never shares state with this table.
_SEARCH_GRIDS: Dict[int, Dict[str, List[float]]] = {
    2: {
        "heavy_size": [0.005, 0.01, 0.02 - (256.0 / 32768)],
        "base_rate_sampling": [0, 0.005, 0.01],
        "epsilon": [0.1, 0.2, 0.3, 0.4],
        "delta": [0.1, 0.2, 0.3, 0.4],
    },
    5: {
        "heavy_size": [0.01, 0.025, 0.05],
        "base_rate_sampling": [0, 0.01, 0.02, 0.03],
        "epsilon": [0.05, 0.1, 0.2, 0.3],
        "delta": [0.05, 0.1, 0.2, 0.3],
    },
    10: {
        "heavy_size": [0.025, 0.05, 0.075, 0.1],
        "base_rate_sampling": [0, 0.025, 0.05, 0.075],
        "epsilon": [0.025, 0.05, 0.075],
        "delta": [0.025, 0.05, 0.075],
    },
    15: {
        "heavy_size": [0.05, 0.1, 0.15],
        "base_rate_sampling": [0, 0.04, 0.06, 0.1],
        "epsilon": [0.01, 0.025, 0.05, 0.1],
        "delta": [0.01, 0.025, 0.05, 0.1],
    },
    20: {
        "heavy_size": [0.05, 0.1, 0.15],
        "base_rate_sampling": [0.05, 0.1, 0.15],
        "epsilon": [0.01, 0.025, 0.05, 0.1],
        "delta": [0.01, 0.025, 0.05, 0.1],
    },
}


def _validity_check(config: ResearchAttentionConfig, sparsity_val: float) -> bool:
    """Check if the config meets the sparsity constraint.

    Args:
        config: ResearchAttentionConfig whose masker at index 2 exposes
            ``heavy_size`` and whose masker at index 3 exposes
            ``base_rate_sampling``.
        sparsity_val: Target sparsity as a fraction.

    Returns:
        True if heavy_size + base_rate_sampling does not exceed sparsity_val
        (up to a small floating-point tolerance), False otherwise.
    """
    budget = config.masker_configs[2].heavy_size + config.masker_configs[3].base_rate_sampling
    return budget <= sparsity_val + _BUDGET_TOLERANCE


@register_builder("vattention_hashattention")
class VAttentionHashAttentionConfigBuilder(BaseConfigBuilder):
    """Builder for VAttention HashAttention TopK sparse attention configurations."""

    def build_configs(
        self,
        model_config: Dict[str, str],
        sparsity_objectives: List[int],
        memory_objectives: List[int],
        **kwargs
    ) -> Tuple[List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]],
               List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]]]:
        """Build one searchable HashAttention + adaptive-sampling config per objective.

        Uses:
            sparsity_objectives: List[int] - Sparsity objectives in percent;
                only 2, 5, 10, 15 and 20 have a search grid.
            model_config: Dict[str, str] - Model configuration; only the
                ``hash_attention_weight_file`` entry (and ``model_name`` for
                the warning message) is read.
        Ignores:
            memory_objectives: List[int] - List of memory objectives
            **kwargs: accepted for interface compatibility, unused here.

        Returns:
            Tuple of (optimal_configs, to_optimize_configs). ``optimal_configs``
            is always empty. Both lists are empty when the weight file is not
            configured or does not exist on disk.

        Raises:
            ValueError: If a sparsity objective has no predefined search grid.
        """
        # No assert here: a missing or absent weight file is a reason to skip
        # this builder (handled below), not to abort the whole run.
        weight_file: Optional[str] = model_config.get("hash_attention_weight_file")

        optimal_configs: List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]] = []
        to_optimize_configs: List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]] = []

        if not weight_file or not os.path.isfile(weight_file):
            # .get() keeps a missing "model_name" key from raising inside the log call.
            logger.warning(f"Weight file {weight_file} for model {model_config.get('model_name')} does not exist. Skipping HashAttention TopK configurations.")
            return optimal_configs, to_optimize_configs

        for sparsity_objective in sparsity_objectives:
            sparsity_val: float = float(sparsity_objective) / 100.0
            classes = [SinkMaskerConfig, LocalMaskerConfig, HashAttentionTopKMaskerConfig, AdaptiveSamplingMaskerConfig]
            name: str = get_masker_list_name(classes, other_params={"builder": "vattention_hashattention", "sparsity_obj": sparsity_objective})
            # The values below are starting points only; heavy_size,
            # base_rate_sampling, epsilon and delta get a search grid further down.
            config = ResearchAttentionConfig(masker_configs=[
                SinkMaskerConfig(sink_size=128),
                LocalMaskerConfig(window_size=128),
                HashAttentionTopKMaskerConfig(
                    heavy_size=0.05,
                    hat_bits=32,
                    hat_mlp_layers=3,
                    hat_mlp_hidden_size=128,
                    hat_mlp_activation="silu",
                    hat_weight_file=weight_file
                ),
                AdaptiveSamplingMaskerConfig(
                    base_rate_sampling=0.05,
                    epsilon=0.05,
                    delta=0.05,
                    init_offset=128,
                    local_offset=128
                )
            ])

            grids = _SEARCH_GRIDS.get(sparsity_objective)
            if grids is None:
                raise ValueError(f"sparsity_objective not supported: {sparsity_objective}")

            # Adaptive sampling with HashAttention top k
            config.masker_configs[2].search_space = {
                "heavy_size": tune.grid_search(list(grids["heavy_size"])),
            }
            config.masker_configs[3].search_space = {
                "base_rate_sampling": tune.grid_search(list(grids["base_rate_sampling"])),
                "epsilon": tune.grid_search(list(grids["epsilon"])),
                "delta": tune.grid_search(list(grids["delta"])),
            }

            # Set validity constraint to use the correct sparsity value for comparison
            config.validity_constraint = partial(_validity_check, sparsity_val=sparsity_val)
            # Record the sparsity objective this configuration targets
            config.objective = sparsity_objective

            to_optimize_configs.append((name, config, classes))

        return optimal_configs, to_optimize_configs
# --- benchmark/raytune/config_builders/vattention_oracle.py ---

# Slack for the budget comparison: grid values are decimal fractions, so e.g.
# 0.05 + 0.1 evaluates to 0.15000000000000002 and would fail an exact
# `<= 0.15` test although the configuration is exactly on budget.
_BUDGET_TOLERANCE: float = 1e-9

# Search grids per sparsity objective (percent). New lists are created from
# these on every build so tune never shares state with this table.
_SEARCH_GRIDS: Dict[int, Dict[str, List[float]]] = {
    2: {
        "heavy_size": [0.005, 0.01, 0.02 - (256.0 / 32768)],
        "base_rate_sampling": [0, 0.005, 0.01],
        "epsilon": [0.1, 0.2, 0.3, 0.4],
        "delta": [0.1, 0.2, 0.3, 0.4],
    },
    5: {
        "heavy_size": [0.01, 0.025, 0.05],
        "base_rate_sampling": [0, 0.01, 0.02, 0.03],
        "epsilon": [0.05, 0.1, 0.2, 0.3],
        "delta": [0.05, 0.1, 0.2, 0.3],
    },
    10: {
        "heavy_size": [0.025, 0.05, 0.075, 0.1],
        "base_rate_sampling": [0, 0.025, 0.05, 0.075],
        "epsilon": [0.025, 0.05, 0.075],
        "delta": [0.025, 0.05, 0.075],
    },
    15: {
        "heavy_size": [0.05, 0.1, 0.15],
        "base_rate_sampling": [0, 0.04, 0.06, 0.1],
        "epsilon": [0.01, 0.025, 0.05, 0.1],
        "delta": [0.01, 0.025, 0.05, 0.1],
    },
    20: {
        "heavy_size": [0.05, 0.1, 0.15],
        "base_rate_sampling": [0.05, 0.1, 0.15],
        "epsilon": [0.01, 0.025, 0.05, 0.1],
        "delta": [0.01, 0.025, 0.05, 0.1],
    },
}


def _validity_check(config: ResearchAttentionConfig, sparsity_val: float) -> bool:
    """Check if the config meets the sparsity constraint.

    Args:
        config: ResearchAttentionConfig whose masker at index 2 exposes
            ``heavy_size`` and whose masker at index 3 exposes
            ``base_rate_sampling``.
        sparsity_val: Target sparsity as a fraction.

    Returns:
        True if heavy_size + base_rate_sampling does not exceed sparsity_val
        (up to a small floating-point tolerance), False otherwise.
    """
    budget = config.masker_configs[2].heavy_size + config.masker_configs[3].base_rate_sampling
    return budget <= sparsity_val + _BUDGET_TOLERANCE


@register_builder("vattention_oracle")
class VAttentionOracleConfigBuilder(BaseConfigBuilder):
    """Builder for VAttention Oracle TopK sparse attention configurations."""

    def build_configs(
        self,
        model_config: Dict[str, str],
        sparsity_objectives: List[int],
        memory_objectives: List[int],
        **kwargs
    ) -> Tuple[List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]],
               List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]]]:
        """Build one searchable Oracle TopK + adaptive-sampling config per objective.

        Uses:
            sparsity_objectives: List[int] - Sparsity objectives in percent;
                only 2, 5, 10, 15 and 20 have a search grid.
        Ignores:
            memory_objectives: List[int] - List of memory objectives
            model_config: Dict[str, str] - Model configuration
            **kwargs: accepted for interface compatibility, unused here.

        Returns:
            Tuple of (optimal_configs, to_optimize_configs). ``optimal_configs``
            is always empty; every configuration still has to be searched.

        Raises:
            ValueError: If a sparsity objective has no predefined search grid.
        """
        optimal_configs: List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]] = []
        to_optimize_configs: List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]] = []

        for sparsity_objective in sparsity_objectives:
            sparsity_val: float = float(sparsity_objective) / 100.0
            classes = [SinkMaskerConfig, LocalMaskerConfig, OracleTopKConfig, AdaptiveSamplingMaskerConfig]
            name: str = get_masker_list_name(classes, other_params={"builder": "vattention_oracle", "sparsity_obj": sparsity_objective})
            # The values below are starting points only; heavy_size,
            # base_rate_sampling, epsilon and delta get a search grid further down.
            config = ResearchAttentionConfig(masker_configs=[
                SinkMaskerConfig(sink_size=128),
                LocalMaskerConfig(window_size=128),
                OracleTopKConfig(heavy_size=0.05),
                AdaptiveSamplingMaskerConfig(
                    base_rate_sampling=0.05,
                    epsilon=0.05,
                    delta=0.05,
                    init_offset=128,
                    local_offset=128
                )
            ])

            grids = _SEARCH_GRIDS.get(sparsity_objective)
            if grids is None:
                raise ValueError(f"sparsity_objective not supported: {sparsity_objective}")

            # Adaptive sampling with oracle top k
            config.masker_configs[2].search_space = {
                "heavy_size": tune.grid_search(list(grids["heavy_size"])),
            }
            config.masker_configs[3].search_space = {
                "base_rate_sampling": tune.grid_search(list(grids["base_rate_sampling"])),
                "epsilon": tune.grid_search(list(grids["epsilon"])),
                "delta": tune.grid_search(list(grids["delta"])),
            }

            # Set validity constraint to use the correct sparsity value for comparison
            config.validity_constraint = partial(_validity_check, sparsity_val=sparsity_val)
            # Record the sparsity objective this configuration targets
            config.objective = sparsity_objective

            to_optimize_configs.append((name, config, classes))

        return optimal_configs, to_optimize_configs
sparse_attention_hub.sparse_attention.research_attention import ResearchAttentionConfig +from sparse_attention_hub.sparse_attention.research_attention.maskers.fixed.implementations import ( + LocalMaskerConfig, + PQCacheConfig, + SinkMaskerConfig, +) +from sparse_attention_hub.sparse_attention.research_attention.maskers.sampling.implementations import ( + AdaptiveSamplingMaskerConfig, +) + +from .base import BaseConfigBuilder +from .factory import register_builder +from .utility import get_masker_list_name + + +def _validity_check(config: ResearchAttentionConfig, sparsity_val: float) -> bool: + """Check if the config meets the sparsity constraint. + + Args: + config: ResearchAttentionConfig to validate. + sparsity_val: Target sparsity value as a float. + + Returns: + True if pqcache heavy_size + adaptive sampling base_rate_sampling <= sparsity_val, False otherwise. NOTE(review): the comparison is exact on floats, so a sum such as 0.05 + 0.1 (0.15000000000000002) is rejected for sparsity_val 0.15; consider a small tolerance. + """ + return (config.masker_configs[2].heavy_size + config.masker_configs[3].base_rate_sampling) <= sparsity_val + + +@register_builder("vattention_pqcache") +class VAttentionPQCacheConfigBuilder(BaseConfigBuilder): + """Builder for VAttention PQCache sparse attention configurations.""" + + def build_configs( + self, + model_config: Dict[str, str], + sparsity_objectives: List[int], + memory_objectives: List[int], + **kwargs + ) -> Tuple[List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]], + List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]]]: + """Get all VAttention PQCache attention configurations. + + Uses: + sparsity_objectives: List[int] - List of sparsity objectives to build the configurations.
+ Ignores: + memory_objectives: List[int] - List of memory objectives + model_config: Dict[str, str] - Model configuration + + Returns: + Tuple of (optimal_configs, to_optimize_configs) + """ + optimal_configs: List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]] = [] + to_optimize_configs: List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]] = [] + + for sparsity_objective in sparsity_objectives: + sparsity_val: float = float(sparsity_objective) / 100.0 + heavy_size: float = float(sparsity_objective) / 100.0 + classes = [SinkMaskerConfig, LocalMaskerConfig, PQCacheConfig, AdaptiveSamplingMaskerConfig] + name: str = get_masker_list_name(classes, other_params={"builder": "vattention_pqcache", "sparsity_obj": sparsity_objective}) + + config = ResearchAttentionConfig(masker_configs=[ + SinkMaskerConfig(sink_size=128), + LocalMaskerConfig(window_size=128), + PQCacheConfig( + heavy_size=heavy_size - (256.0 / 32768), + pq_group_factor=2, # Default value: head_dim=128 // pq_sub_dim=64 = 2 + pq_bits=6, # Placeholder; the grid search below tries pq_bits in [4, 8] + kmeans_iter=10, # Default value from search space + init_offset=128, # Matches sink_size + metric="euclidean", # Default value from search space + ), + AdaptiveSamplingMaskerConfig( + base_rate_sampling=0.05, # Middle value + epsilon=0.05, # Middle value + delta=0.05, # Middle value + init_offset=128, # Fixed (not part of the search space) + local_offset=128 # Fixed (not part of the search space) + ) + ]) + + # Set up search space for PQCache parameters (from pqcache builder) + # Note: pq_group_factor = head_dim // pq_sub_dim + # Assuming head_dim=128: pq_sub_dim=64 -> pq_group_factor=2, pq_sub_dim=32 -> pq_group_factor=4 + config.masker_configs[2].search_space = { + "pq_group_factor": tune.grid_search([2, 4]), # Corresponds to pq_sub_dim=[64, 32] for head_dim=128 + "pq_bits": tune.grid_search([4, 8]), + "kmeans_iter": tune.grid_search([10]), + "metric": tune.grid_search(["euclidean"]), + } + + # Set up search space for AdaptiveSamplingMaskerConfig (from
vattention_hashattention builder) + if sparsity_objective == 2: + # Adaptive sampling with PQCache + config.masker_configs[2].search_space["heavy_size"] = tune.grid_search([0.005, 0.01, 0.02 - (256.0 / 32768)]) + config.masker_configs[3].search_space = { + "base_rate_sampling": tune.grid_search([0, 0.005, 0.01]), + "epsilon": tune.grid_search([0.2, 0.4]), + "delta": tune.grid_search([0.2, 0.4]) + } + + elif sparsity_objective == 5: + # Adaptive sampling with PQCache + config.masker_configs[2].search_space["heavy_size"] = tune.grid_search([0.01, 0.025, 0.05]) + config.masker_configs[3].search_space = { + "base_rate_sampling": tune.grid_search([0, 0.01, 0.025]), + "epsilon": tune.grid_search([0.15, 0.25]), + "delta": tune.grid_search([0.15, 0.25]) + } + + elif sparsity_objective == 10: + # Adaptive sampling with PQCache + config.masker_configs[2].search_space["heavy_size"] = tune.grid_search([0.025, 0.05, 0.075]) + config.masker_configs[3].search_space = { + "base_rate_sampling": tune.grid_search([0.025, 0.05, 0.075]), + "epsilon": tune.grid_search([0.025, 0.05, 0.075]), + "delta": tune.grid_search([0.025, 0.05, 0.075]) + } + elif sparsity_objective == 15: + # Adaptive sampling with PQCache + config.masker_configs[2].search_space["heavy_size"] = tune.grid_search([0.05, 0.1, 0.15]) + config.masker_configs[3].search_space = { + "base_rate_sampling": tune.grid_search([0, 0.05, 0.1]), + "epsilon": tune.grid_search([0.01, 0.04, 0.1]), + "delta": tune.grid_search([0.01, 0.04, 0.1]) + } + + elif sparsity_objective == 20: + # Adaptive sampling with PQCache + config.masker_configs[2].search_space["heavy_size"] = tune.grid_search([0.05, 0.1, 0.15]) + config.masker_configs[3].search_space = { + "base_rate_sampling": tune.grid_search([0.05, 0.1, 0.15]), + "epsilon": tune.grid_search([0.01, 0.04, 0.1]), + "delta": tune.grid_search([0.01, 0.04, 0.1]) + } + else: + raise ValueError(f"sparsity_objective not supported: {sparsity_objective}") + + # Set validity constraint to use the 
correct sparsity value for comparison + config.validity_constraint = partial(_validity_check, sparsity_val=sparsity_val) + # Set objective function + config.objective = sparsity_objective + + to_optimize_configs.append((name, config, classes)) + + return optimal_configs, to_optimize_configs + + diff --git a/benchmark/raytune/optimizer_factory.py b/benchmark/raytune/optimizer_factory.py index 6e2f849a..3684bbae 100755 --- a/benchmark/raytune/optimizer_factory.py +++ b/benchmark/raytune/optimizer_factory.py @@ -73,7 +73,12 @@ def create_config_from_params(self, params: Dict[str, Any]) -> ResearchAttention setattr(masker_config_copy, key, value) masker_instances.append(masker_config_copy) - return ResearchAttentionConfig(masker_configs=masker_instances) + new_config = ResearchAttentionConfig(masker_configs=masker_instances) + if hasattr(self.research_attention_config, 'validity_constraint'): + new_config.validity_constraint = self.research_attention_config.validity_constraint + if hasattr(self.research_attention_config, 'objective'): + new_config.objective = self.research_attention_config.objective + return new_config def create_optimizer(research_attention_config: Optional[ResearchAttentionConfig] = None) -> SparseConfigOptimizer: """ diff --git a/benchmark/raytune/run_config_dir.py b/benchmark/raytune/run_config_dir.py index d47dc76e..e340e5c4 100755 --- a/benchmark/raytune/run_config_dir.py +++ b/benchmark/raytune/run_config_dir.py @@ -38,7 +38,7 @@ from sparse_attention_hub.adapters.huggingface import ModelAdapterHF from sparse_attention_hub.sparse_attention.research_attention import ResearchAttentionConfig from sparse_attention_hub.metric_logging.logger import MicroMetricLogger -from utility import deserialize_sparse_config +from config_builders.utility import deserialize_sparse_config @@ -296,10 +296,10 @@ def progress_reporter(total_tasks: int, result_queue: RayQueue) -> None: def main( configs_dir: str, - benchmark_results_dir: str = 
"./benchmark_vt_full_10pct", + benchmark_results_dir: str = "/data/apdesai/DO_NOT_DELETE/sparse_attention_hub", max_new_tokens: int = 1000, max_context_length: int = 100000, - max_requests: int = 1000, + max_requests: int = 100, actors_per_gpu: Optional[int] = None ): """Ray-based parallel benchmark runner with efficient resource management. @@ -383,7 +383,7 @@ def main( # Create adapter config adapter_config = { "adapter_name": "huggingface", - "model_kwargs": {"torch_dtype": torch.bfloat16}, + "model_kwargs": {"torch_dtype": torch.bfloat16, "attn_implementation": "flash_attention_2"}, "tokenizer_kwargs": {"padding_side": "left"} } diff --git a/benchmark/raytune/run_optimize_configs.py b/benchmark/raytune/run_optimize_configs.py index 98265255..9e5673a7 100755 --- a/benchmark/raytune/run_optimize_configs.py +++ b/benchmark/raytune/run_optimize_configs.py @@ -3,16 +3,9 @@ Hyperparameter search for optimal sparse attention configurations. """ -import fire -import json import logging -import math import os import sys -import time -import traceback -from dataclasses import asdict, dataclass, field -from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Tuple @@ -24,411 +17,110 @@ import torch import ray -from ray import tune -from benchmark.executor_config import AdapterConfig, BenchmarkConfig -from benchmark.benchmark_registry import create_benchmark_instance -from sparse_attention_hub.adapters.huggingface import ModelAdapterHF -from sparse_attention_hub.metric_logging.logger import MicroMetricLogger -from optimizer_factory import create_optimizer -from utility import ( - get_masker_list_name, - create_sparsity_objective, - OBJECTIVE_FUNCTIONS, - OptimalConfig, - get_all_masker_config_classes, - serialize_sparse_config, - deserialize_sparse_config, -) +from config_builders.utility import OBJECTIVE_FUNCTIONS, OptimalConfig +from config_builders.factory import build_all_configs # Import all masker configs from 
sparse_attention_hub.sparse_attention.research_attention import ResearchAttentionConfig -from sparse_attention_hub.sparse_attention.research_attention.maskers.fixed.implementations import ( - LocalMaskerConfig, - SinkMaskerConfig, - OracleTopKConfig, - OracleTopPMaskerConfig, - HashAttentionTopKMaskerConfig, - DoubleSparsityTopKMaskerConfig, -) -from sparse_attention_hub.sparse_attention.research_attention.maskers.sampling.implementations import ( - AdaptiveSamplingMaskerConfig, - RandomSamplingMaskerConfig, - MagicPigConfig, -) +# Import search manager +from search_manager import ConfigSearchManager + +# Import run configuration +from OPTIMIZATION_EXPERIMENT import ( + MODEL_CONFIGS, + MODELS, + TASKS, + SPARSITY_OBJECTIVES, + MEMORY_OBJECTIVES, + SEARCH_MAX_NEW_TOKENS, + SEARCH_MAX_CONTEXT_LENGTH, + SEARCH_MAX_REQUESTS, + FORCE_SEARCH, + OPTIMAL_CONFIGS_DIR, + RAY_RESULTS_DIR, + ACTORS_PER_GPU, + BUILDER_NAMES, +) -class BenchmarkHelper: - """Handles individual benchmark runs during config search.""" +def get_all_sparse_configs(model_config: Dict[str, str], + sparsity_objectives: List[int], + memory_objectives: List[int], + builder_names: List[str]) -> Tuple[List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]], + List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]]]: + """Get all sparse attention configurations. + Returns list of (name, full_config, masker_classes) tuples. 
- def __init__(self, config: dict): - self.config = config - self.base_result_dir = Path(config["search_result_dir"]) - self.adapter_config = AdapterConfig( - adapter_name="huggingface", - model_kwargs={"torch_dtype": torch.bfloat16}, - tokenizer_kwargs={"padding_side": "left"}, - ) - self.generation_kwargs = { - "max_new_tokens": config["search_max_new_tokens"], - "do_sample": False - } - self.request_kwargs = { - "max_context_length": config["search_max_context_length"], - "max_requests": config["search_max_requests"], - } + Note: The configs returned here are only used to determine which masker classes + to use. The actual parameter values will be determined by Ray Tune search. + + Args: + model_config: Model configuration dict (an entry of MODEL_CONFIGS) + sparsity_objectives: List of sparsity objectives passed to the config builders + memory_objectives: List of memory objectives for configs that need them + builder_names: List of builder names to use - # Get objective function - self.objective_name = config.get("objective_function", "default") - self.objective_function = OBJECTIVE_FUNCTIONS.get(self.objective_name, OBJECTIVE_FUNCTIONS["default"]) - logging.info(f"Using objective function: {self.objective_name}") - - def __call__(self, attention_config, task_name: str, model_name: str) -> Tuple[float, float, float]: - """Run benchmark and return (score, density, error) tuple.""" - try: - benchmark_name, subset_name = task_name.split("/", 1) if "/" in task_name else (task_name, None) - - # Create result directory for this specific run - result_dir = self.base_result_dir / f"{model_name}_{task_name}_{hash(str(attention_config)) % 1000000}" - result_dir.mkdir(parents=True, exist_ok=True) - - # Create model adapter - adapter = ModelAdapterHF( - model_name=model_name, - sparse_attention_config=attention_config, - model_kwargs=self.adapter_config.model_kwargs, - tokenizer_kwargs=self.adapter_config.tokenizer_kwargs - ) - - # Create benchmark instance - benchmark = create_benchmark_instance( -
benchmark_name=benchmark_name, - subsets=[subset_name] if subset_name else None - ) - print("The result directory is ", result_dir, flush=True) - # Setup micro metric logger - metric_logger = MicroMetricLogger() - metric_logger.configure_logging( - log_path=str(result_dir), - enabled_metrics=["research_attention_density", "research_attention_output_error"], - ) - - # Run benchmark directly - metrics = benchmark.run_benchmark( - adapter=adapter, - result_dir=str(result_dir), - generation_kwargs=self.generation_kwargs, - request_kwargs=self.request_kwargs - ) - - # Flush the metric logger to ensure all metrics are written - metric_logger.flush() - - # Extract micro metrics for sparse attention evaluation - micro_metrics = self._extract_micro_metrics(result_dir) - error, density = micro_metrics["attention_error"], micro_metrics["density"] - - # For dense configuration (density=1.0, error=0.0), use a simple score - if density == 1.0 and error == 0.0: - # Dense baseline: use benchmark accuracy metrics instead of sparse metrics - score = 100.0 # Small baseline score for dense - else: - # Use the selected objective function - score = self.objective_function(error, density) - # Also print to stdout so the test script can detect it - print(f"Objective: {self.objective_name}, Error: {error:.4f}, Density: {density:.4f}, Score: {score:.4f}") - logging.info(f"Objective: {self.objective_name}, Error: {error:.4f}, Density: {density:.4f}, Score: {score:.4f}") - - return score, density, error - - except Exception as e: - logging.error(f"Benchmark failed: {e}") - import traceback - traceback.print_exc() - - return 5.0, 1.0, 1.0 # Penalty score, worst-case density, and worst-case error + Returns: + Tuple of (optimal_configs, to_optimize_configs) + """ + # Use factory to build all configs + optimal_configs, to_optimize_configs = build_all_configs( + model_config = model_config, + sparsity_objectives=sparsity_objectives, + memory_objectives=memory_objectives, + 
builder_names=builder_names, + ) - def _extract_micro_metrics(self, result_dir: Path) -> dict: - """Extract attention error and density from micro metrics.""" - micro_metrics_file = result_dir / "micro_metrics.jsonl" - if not micro_metrics_file.exists(): - # For dense configuration, micro_metrics.jsonl won't exist since no sparse attention is used - # Return default values: 0 error (perfect) and 1.0 density (fully dense) - logging.info(f"micro_metrics.jsonl not found in {result_dir}, using dense defaults") - return {"attention_error": 0.0, "density": 1.0} - - errors, densities = [], [] - with open(micro_metrics_file, "r") as f: - for line in f: - try: - entry = json.loads(line.strip()) - metric, value = entry.get("metric"), entry.get("value") - if value is not None and not (isinstance(value, float) and math.isnan(value)): - if metric == "research_attention_output_error": - errors.append(float(value)) - elif metric == "research_attention_density": - densities.append(float(value)) - except (json.JSONDecodeError, ValueError, TypeError): - continue - - return { - "attention_error": sum(errors) / len(errors) if errors else 1.0, - "density": sum(densities) / len(densities) if densities else 1.0 - } + return optimal_configs, to_optimize_configs -class ConfigSearchManager: - """Manages Phase 1: Hyperparameter search for optimal configs.""" - - def __init__(self, base_config: dict): - self.config = base_config - # Add timestamp to the results directory - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - base_dir = Path(base_config["optimal_configs_dir"]) - self.results_dir = base_dir / f"run_{timestamp}" - self.results_dir.mkdir(parents=True, exist_ok=True) - self.timestamp = timestamp - print(f"Saving optimal configs to: {self.results_dir}") - - def search_optimal_config( - self, - model: str, - task: str, - masker_name: str, - masker_classes: Optional[List], - full_sparse_config: Optional[ResearchAttentionConfig] = None, - actors_per_gpu: int = 1 - ) -> 
OptimalConfig: - """Search for optimal hyperparameters for a single combination.""" - - config_file = self.results_dir / f"{model}_{task}_{masker_name}.json".replace("/", "_") - - # Check if already exists - if config_file.exists() and not self.config.get("force_search", False): - print(f" → Loading existing config") - return self._load_config(config_file) - - # Handle dense config (no optimization needed) - if masker_classes is None: - optimal = OptimalConfig( - model=model, - task=task, - masker_name=masker_name, - sparse_config=None, - masker_classes=None, - hyperparams={}, - score=0.0, - search_time=0.0, - num_trials=1 - ) - self._save_config(optimal, config_file) - return optimal - - # Run hyperparameter search - start_time = time.time() - - try: - # Create optimizer with template config for fixed parameters - optimizer = create_optimizer(full_sparse_config) - - # Show what we're searching - search_space = optimizer.create_search_space(task) - print(f" → Search space parameters:") - for param, space_obj in search_space.items(): - # Extract actual values from Ray Tune objects - if hasattr(space_obj, 'categories'): - values = space_obj.categories - print(f" - {param}: {values}") - else: - print(f" - {param}: {space_obj}") - - # Create objective function - def objective(trial_config): - runner = BenchmarkHelper(self.config) - attention_config = optimizer.create_config_from_params(trial_config) - score, density, error = runner(attention_config, task, model) - return {"combined_score": score, "density": density, "error": error} - - # ### run a sample objective to ensure there are no errors - print("="*10, "Running a short test objective to ensure there are no errors", flush=True) - sample_config = { - "AdaptiveSamplingMaskerConfig_base_rate_sampling": 0.1, - "AdaptiveSamplingMaskerConfig_epsilon": 0.25, - "AdaptiveSamplingMaskerConfig_delta": 0.25 - } - result = objective(sample_config) - print("="*10, "Successfully ran a short test objective", flush=True) - 
print(sample_config) - print(result) - print("="*100, flush=True) - - # Run Ray Tune - sanitized_name = f"{model}_{task}_{masker_name}".replace("/", "_") - analysis = tune.run( - objective, - config=search_space, - metric="combined_score", - mode="min", - resources_per_trial={"CPU": 1, "GPU": 1.0 / actors_per_gpu}, - storage_path=os.path.abspath(self.config["ray_results_dir"]), - name=sanitized_name, - verbose=1, # Show Ray Tune progress - stop={"training_iteration": 1}, # One evaluation per config - ) - - # Get best config - best_trial = analysis.get_best_trial("combined_score", "min", "last") - best_config = optimizer.create_config_from_params(best_trial.config) - - # Save detailed trial information for post-analysis - trials_info = [] - for trial in analysis.trials: - trial_info = { - "trial_id": trial.trial_id, - "config": trial.config, - "score": trial.last_result.get("combined_score", float('inf')) if trial.last_result else float('inf'), - "status": trial.status, - "start_time": trial.start_time.isoformat() if hasattr(trial, 'start_time') and trial.start_time else None, - "metric_history": trial.metric_analysis.get("combined_score", {}) if hasattr(trial, 'metric_analysis') else {} - } - trials_info.append(trial_info) - - # Save trial details to separate file - trials_file = self.results_dir / f"{model}_{task}_{masker_name}_trials.json".replace("/", "_") - with open(trials_file, "w") as f: - json.dump({ - "model": model, - "task": task, - "masker_name": masker_name, - "objective_function": self.config.get("objective_function", "default"), - "best_trial_id": best_trial.trial_id, - "trials": trials_info, - "analysis_dataframe_path": str(self.results_dir / f"{model}_{task}_{masker_name}_analysis.csv".replace("/", "_")) - }, f, indent=2) - - # Save Ray analysis dataframe for detailed analysis - df = analysis.dataframe() - df.to_csv(self.results_dir / f"{model}_{task}_{masker_name}_analysis.csv".replace("/", "_"), index=False) - - optimal = OptimalConfig( - 
model=model, - task=task, - masker_name=masker_name, - sparse_config=best_config, - masker_classes=masker_classes, - hyperparams=best_trial.config, - score=best_trial.last_result["combined_score"], - search_time=time.time() - start_time, - num_trials=len(analysis.trials) - ) - - self._save_config(optimal, config_file) - return optimal - - except Exception as e: - print(f" ✗ Search failed: {e}") - traceback.print_exc() - # Return failure config - optimal = OptimalConfig( - model=model, - task=task, - masker_name=masker_name, - sparse_config=full_sparse_config, - masker_classes=masker_classes, - hyperparams={}, - score=5.0, - search_time=time.time() - start_time, - num_trials=0 - ) - self._save_config(optimal, config_file) - return optimal + +def run_search() -> Dict[str, OptimalConfig]: + """Find optimal configurations for all combinations. - def _save_config(self, config: OptimalConfig, filepath: Path): - """Save configuration to JSON.""" - data = asdict(config) - - # Convert sparse config to serializable format - if config.sparse_config: - data["sparse_config"] = serialize_sparse_config(config.sparse_config) - - # Convert masker classes to strings - if config.masker_classes: - data["masker_classes"] = [cls.__name__ for cls in config.masker_classes] - - with open(filepath, "w") as f: - json.dump(data, f, indent=2) + This function orchestrates the search process across all model/task/config + combinations, using ConfigSearchManager to handle individual searches. + All configuration is loaded from OPTIMIZATION_EXPERIMENT.py. 
- def _load_config(self, filepath: Path) -> OptimalConfig: - """Load configuration from JSON.""" - with open(filepath, "r") as f: - data = json.load(f) - - # Reconstruct sparse config if present - if data.get("sparse_config"): - data["sparse_config"] = deserialize_sparse_config(data["sparse_config"]) - - # Reconstruct masker classes from strings - if data.get("masker_classes"): - # Dynamically discover all available masker config classes - class_map = get_all_masker_config_classes() - data["masker_classes"] = [class_map[name] for name in data["masker_classes"]] + Args: + None. Settings such as ACTORS_PER_GPU are imported from OPTIMIZATION_EXPERIMENT.py. - return OptimalConfig(**data) - -def run_search(config: dict, actors_per_gpu: int = 1) -> Dict[str, OptimalConfig]: - """Find optimal configurations for all combinations.""" - print("\n" + "="*80) - print("HYPERPARAMETER SEARCH") - print("="*80) - print(f"Models: {len(config['models'])}") - print(f"Tasks: {len(config['tasks'])}") - print(f"Optimal Configs: {len(config['optimal_configs'])}") - print(f"To Optimize Configs: {len(config['to_optimize_configs'])}") - print(f"Total Combinations to optimize: {len(config['models']) * len(config['tasks']) * len(config['to_optimize_configs'])}") - print(f"Samples per search: {config['num_samples']}") - print(f"Objective Function: {config['objective_function']}") - - # Display objective function details - if config['objective_function'].startswith('sparsity_'): - target = int(config['objective_function'].split('_')[1]) - print(f" → Targeting {target}% density (0.{target:02d} fraction)") - print(f" → Formula: 0.99 * error + 0.01 * density + penalty for exceeding target") - - print("\nSearch Configuration:") - print(f" → Max new tokens: {config['search_max_new_tokens']}") - print(f" → Max context length: {config['search_max_context_length']}") - print(f" → Max requests per trial: {config['search_max_requests']}") - print(f" → Timeout per trial: {config['search_timeout']}s") - - print("\nNote: For
each sparse config, Ray Tune will search different hyperparameter") - print("values (e.g., window_size, sink_size, sampling_rate) to find the best combination.") - print("="*80) - - manager = ConfigSearchManager(config) - optimal_configs = {} - - total = len(config["models"]) * len(config["tasks"]) * len(config["to_optimize_configs"]) + len(config["models"]) * len(config["tasks"]) * len(config["optimal_configs"]) - current = 0 + Returns: + Dictionary mapping config keys to OptimalConfig objects + """ - for model in config["models"]: - print(f"\nModel: {model}") - print("-" * 60) - - for task in config["tasks"]: - for masker_name, (masker_classes, full_config) in config["to_optimize_configs_map"].items(): - current += 1 - key = f"{model}_{task}_{masker_name}".replace("/", "_") - - print(f"\n[{current}/{total}] Task: {task} | Config: {masker_name}") - optimal = manager.search_optimal_config( - model, task, masker_name, masker_classes, full_config, actors_per_gpu - ) - optimal_configs[key] = optimal - - for masker_name, (masker_classes, full_config) in config["optimal_configs_map"].items(): - current += 1 + manager: ConfigSearchManager = ConfigSearchManager( + optimal_configs_dir=OPTIMAL_CONFIGS_DIR, + force_search=FORCE_SEARCH, + generation_kwargs={ + "max_new_tokens": SEARCH_MAX_NEW_TOKENS, + "do_sample": False + }, + request_kwargs={ + "max_context_length": SEARCH_MAX_CONTEXT_LENGTH, + "max_requests": SEARCH_MAX_REQUESTS + }, + ray_results_dir=RAY_RESULTS_DIR + ) + final_optimal_configs: Dict[str, OptimalConfig] = {} + + # first run all the optimal configs + for model in MODELS: + # Get model configuration + model_config: Dict[str, str] = MODEL_CONFIGS[model] + + # Get all sparse configs + optimal_configs, to_optimize_configs = get_all_sparse_configs( + model_config, + sparsity_objectives=SPARSITY_OBJECTIVES, + memory_objectives=MEMORY_OBJECTIVES, + builder_names=BUILDER_NAMES + ) + for task in TASKS: + for (masker_name, full_config, masker_classes) in 
optimal_configs: key = f"{model}_{task}_{masker_name}".replace("/", "_") optimal = OptimalConfig( - model=model, + model=model_config["model_name"], task=task, masker_name=masker_name, sparse_config=full_config, @@ -438,347 +130,33 @@ def run_search(config: dict, actors_per_gpu: int = 1) -> Dict[str, OptimalConfig search_time=0.0, num_trials=0 ) - manager._save_config(optimal, Path(manager.results_dir) / f"{key}.json") - optimal_configs[key] = optimal + manager._save_config(optimal, os.path.join(manager.results_dir, f"{key}.json")) + final_optimal_configs[key] = optimal - print(f"\n{'='*80}") - print(f"Search complete. Found {len(optimal_configs)} optimal configurations.") - print(f"Configs saved to: {manager.results_dir}") - print(f"Run identifier: {manager.timestamp}") - print(f"{'='*80}") - - return optimal_configs - -################################################################# CONFIGURE YOUR RUN HERE ################################################################# - -# Model configurations -# Weight files are loaded from SPARSE_ATTENTION_WEIGHTS_DIR environment variable -# Set it to the directory containing your HashAttention weight files -weights_dir = os.environ.get("SPARSE_ATTENTION_WEIGHTS_DIR", "./weights") -MODEL_CONFIGS = { - "llama": { - "weight_file": os.path.join(weights_dir, "llama3.1-8b-patch.64K.v1.hat_weights.pkl"), - "model_name": "meta-llama/Llama-3.1-8B-Instruct" - }, - "deepseek": { - "weight_file": os.path.join(weights_dir, "DeepSeek-R1-Distill-Llama-8B-patch-layers2-dim64-max-context-24K_hat_weights.pkl"), - "model_name": "deepseek-ai/DeepSeek-R1-Distill-Llama-8B" - }, - "mistral": { - "weight_file": os.path.join(weights_dir, "Mistral-7B-Instruct-v0.3.24K.20.500.hat_weights.pkl"), - "model_name": "mistralai/Mistral-7B-Instruct-v0.3" - } -} - -DEFAULT_MODEL = "llama" - -# Task configurations -DEBUG_TASKS = ["loogle/shortdep_qa"] - -RUN_TASKS = [ - "ruler32k/vt", -] - -def get_all_sparse_configs(weight_file: str = None, objective: str = 
"default") -> List[Tuple[str, Optional[ResearchAttentionConfig], Optional[List]]]: - """Get all sparse attention configurations. - Returns list of (name, full_config, masker_classes) tuples. - - Note: The configs returned here are only used to determine which masker classes - to use. The actual parameter values will be determined by Ray Tune search. - """ - assert weight_file is not None, "Weight file is required for HashAttention Masker" - optimal_configs = [] - to_optimize_configs = [] - - - # ############################## optimal configs ############################## - #1. Dense baseline - optimal_configs.append(("dense", None, None)) - - # 2. Oracle top k (already included above with adaptive, but also standalone) - for heavy_size in [0.1]: - classes = [SinkMaskerConfig, LocalMaskerConfig, OracleTopKConfig] - name = get_masker_list_name(classes, other_params={"heavy_size": heavy_size}) - config = ResearchAttentionConfig(masker_configs=[ - SinkMaskerConfig(sink_size=128), - LocalMaskerConfig(window_size=128), - OracleTopKConfig(heavy_size=heavy_size) - ]) - optimal_configs.append((name, config, classes)) - - #3. HashAttention top k - for heavy_size in [0.1]: - classes = [SinkMaskerConfig, LocalMaskerConfig, HashAttentionTopKMaskerConfig] - name = get_masker_list_name(classes, other_params={"heavy_size": heavy_size}) - config = ResearchAttentionConfig(masker_configs=[ - SinkMaskerConfig(sink_size=128), - LocalMaskerConfig(window_size=128), - HashAttentionTopKMaskerConfig( - heavy_size=heavy_size, - hat_bits=32, - hat_mlp_layers=3, - hat_mlp_hidden_size=128, - hat_mlp_activation="silu", - hat_weight_file=weight_file - ), - ]) - optimal_configs.append((name, config, classes)) - - # 4. 
Random sampling with sink and local - classes = [SinkMaskerConfig, LocalMaskerConfig, RandomSamplingMaskerConfig] - name = get_masker_list_name(classes) - config = ResearchAttentionConfig(masker_configs=[ - SinkMaskerConfig(sink_size=128), # Middle value from search space [4, 8, 16, 32, 64, 128] - LocalMaskerConfig(window_size=128), # Middle value from search space [32, 64, 128, 256] - RandomSamplingMaskerConfig(sampling_rate=0.095) # Middle value from search space [0.01, 0.05, 0.1, 0.2, 0.3, 0.5] - ]) - optimal_configs.append((name, config, classes)) - - ############################# to optimize configs ############################## - - - #1. Adaptive sampling with oracle top k - classes = [SinkMaskerConfig, LocalMaskerConfig, OracleTopKConfig, AdaptiveSamplingMaskerConfig] - name = get_masker_list_name(classes, other_params={"objective": objective}) - config = ResearchAttentionConfig(masker_configs=[ - SinkMaskerConfig(sink_size=128), - LocalMaskerConfig(window_size=128), - OracleTopKConfig(heavy_size=0.10), # Middle value from search space - AdaptiveSamplingMaskerConfig( - base_rate_sampling=0.1, # Middle value - epsilon=0.25, # Middle value - delta=0.25, # Middle value - init_offset=128, # Middle value - local_offset=128 # Middle value - ) - ]) - config.masker_configs[2].search_space = { - "heavy_size": tune.grid_search([0.01, 0.02]), - } - config.masker_configs[3].search_space = { - "base_rate_sampling": tune.grid_search([0, 0.01, 0.02]), - "epsilon": tune.grid_search([0.05]), - "delta": tune.grid_search([0.05]), - "init_offset": tune.grid_search([0.01]), - "local_offset": tune.grid_search([0.01]), - } - to_optimize_configs.append((name, config, classes)) - - # 2. 
Adaptive sampling with oracle top p - - classes = [SinkMaskerConfig, LocalMaskerConfig, OracleTopPMaskerConfig, AdaptiveSamplingMaskerConfig] - name = get_masker_list_name(classes) - config = ResearchAttentionConfig(masker_configs=[ - SinkMaskerConfig(sink_size=128), - LocalMaskerConfig(window_size=128), - OracleTopPMaskerConfig(top_p=0.10), # Middle value from search space - AdaptiveSamplingMaskerConfig( - base_rate_sampling=0.1, # Middle value - epsilon=0.25, # Middle value - delta=0.25, # Middle value - init_offset=128, # Middle value - local_offset=128 # Middle value - ) - ]) - to_optimize_configs.append((name, config, classes)) - - # #3. Adaptive sampling with HAT top k - classes = [SinkMaskerConfig, LocalMaskerConfig, HashAttentionTopKMaskerConfig, AdaptiveSamplingMaskerConfig] - name = get_masker_list_name(classes, other_params={"objective": objective}) - config = ResearchAttentionConfig(masker_configs=[ - SinkMaskerConfig(sink_size=128), - LocalMaskerConfig(window_size=128), - HashAttentionTopKMaskerConfig( - heavy_size=0.05, # Required parameter - hat_bits=32, # Required parameter - hat_mlp_layers=3, # Required parameter - hat_mlp_hidden_size=128, # Required parameter - hat_mlp_activation="silu", # Required parameter - hat_weight_file=weight_file # Weight file is required - ), - AdaptiveSamplingMaskerConfig( - base_rate_sampling=0.1, - epsilon=0.25, - delta=0.25, - init_offset=128, - local_offset=128 - ) - ]) - to_optimize_configs.append((name, config, classes)) - - - # # 4. Oracle top p - classes = [SinkMaskerConfig, LocalMaskerConfig, OracleTopPMaskerConfig] - name = get_masker_list_name(classes, other_params={"objective": objective}) - config = ResearchAttentionConfig(masker_configs=[ - SinkMaskerConfig(sink_size=128), - LocalMaskerConfig(window_size=128), - OracleTopPMaskerConfig(top_p=0.7) # Default middle value from search space - ]) - to_optimize_configs.append((name, config, classes)) - - - # # 5. 
MagicPig config - classes = [SinkMaskerConfig, LocalMaskerConfig, MagicPigConfig] - name = get_masker_list_name(classes) - config = ResearchAttentionConfig(masker_configs=[ - SinkMaskerConfig(sink_size=128), - LocalMaskerConfig(window_size=128), - MagicPigConfig( - lsh_l=8, # Default value from search space - lsh_k=8 # Default value from search space - ) - ]) - to_optimize_configs.append((name, config, classes)) - - - # 5. Double Sparsity Top K config - # sorted_channel_file is available in the author's repository - # https://github.com/andy-yang-1/DoubleSparse/tree/main/config - # TODO: fix the path via environment variable or something else - - for heavy_size in [0.1, 0.2]: - classes = [SinkMaskerConfig, LocalMaskerConfig, DoubleSparsityTopKMaskerConfig] - name = get_masker_list_name(classes, other_params={"heavy_size": heavy_size}) - - config = ResearchAttentionConfig(masker_configs=[ - SinkMaskerConfig(sink_size=128), - LocalMaskerConfig(window_size=128), - DoubleSparsityTopKMaskerConfig( - heavy_size=heavy_size, - group_factor=2, - label_bits=2, - sorted_channel_file="/home/ubuntu/DoubleSparse/config/meta-llama/Llama-3.1-8B-Instruct.json", - channel_selection="q_proj"), - ]) - optimal_configs.append((name, config, classes)) - - return optimal_configs, to_optimize_configs - - -def get_run_configuration( - objective: str, - debug: bool, - num_samples: int, - search_timeout: int, - search_max_new_tokens: int, - search_max_context_length: int, - search_max_requests: int, - force_search: bool, - optimal_configs_dir: str, - ray_results_dir: str -) -> dict: - """Build complete configuration from command-line arguments.""" - num_gpus = torch.cuda.device_count() - - # Get model configuration - model_config = MODEL_CONFIGS[DEFAULT_MODEL] - weight_file = model_config["weight_file"] - model_name = model_config["model_name"] - - if not os.path.exists(weight_file): - weight_file = "./hat_weights.pkl" - print(f"Warning: HashAttention weights not found, using {weight_file}") 
- - # Get all sparse configs - optimal_configs, to_optimize_configs = get_all_sparse_configs(weight_file, objective=objective) - - # Filter configs based on debug mode - if debug: - sparse_configs = to_optimize_configs[:3] # Just first 3 for debug - models = [model_name] - tasks = DEBUG_TASKS - num_samples = 8 - else: - models = [model_name] - tasks = RUN_TASKS - # num_samples is already passed as parameter - - # Build config maps - optimal_configs_map = {} - to_optimize_configs_map = {} - for name, full_config, classes in optimal_configs: - optimal_configs_map[name] = (classes, full_config) - for name, full_config, classes in to_optimize_configs: - to_optimize_configs_map[name] = (classes, full_config) - - return { - "models": models, - "tasks": tasks, - "optimal_configs": optimal_configs, - "to_optimize_configs": to_optimize_configs, - "optimal_configs_map": optimal_configs_map, - "to_optimize_configs_map": to_optimize_configs_map, - "gpu_ids": list(range(num_gpus)), - "num_samples": num_samples, - "objective_function": objective, - # Directories - "optimal_configs_dir": optimal_configs_dir, - "ray_results_dir": ray_results_dir, - "search_result_dir": os.path.join(ray_results_dir, "search_runs"), - - # Search params - "search_timeout": search_timeout, - "search_max_new_tokens": search_max_new_tokens, - "search_max_context_length": search_max_context_length, - "search_max_requests": search_max_requests, - "force_search": force_search, - } + for task in TASKS: + for (masker_name, full_config, masker_classes) in to_optimize_configs: + key: str = f"{model}_{task}_{masker_name}".replace("/", "_") + + optimal: OptimalConfig = manager.search_optimal_config( + model_config["model_name"], task, masker_name, masker_classes, full_config, ACTORS_PER_GPU + ) + final_optimal_configs[key] = optimal -######################################################### CONFIGURATION ENDS HERE #########################################################`` + return final_optimal_configs -def 
main( - objective: str, - num_samples: int, - search_max_new_tokens: int, - search_max_context_length: int, - search_max_requests: int, - debug: bool = False, - force_search: bool = False, - optimal_configs_dir: str = "./optimal_configs", - ray_results_dir: str = "./ray_results", - search_timeout: int = 900, - actors_per_gpu: int = 1, -): - """ - Hyperparameter search for sparse attention methods. +def main() -> None: + """Hyperparameter search for sparse attention methods. - Args: - objective: Objective function to use for optimization (required) - num_samples: Number of samples per hyperparameter search (required) - search_max_new_tokens: Max new tokens for search trials (required) - search_max_context_length: Max context length for search trials (required) - search_max_requests: Max requests per search trial (required) - debug: Debug mode with minimal configs (default: False) - force_search: Force re-run of search even if configs exist (default: False) - optimal_configs_dir: Directory for storing optimal configurations (default: "./optimal_configs") - ray_results_dir: Directory for Ray Tune results (default: "./ray_results") - search_timeout: Timeout per search trial in seconds (default: 900) - actors_per_gpu: Number of actors per GPU for resource allocation (default: 1) + All configuration is loaded from OPTIMIZATION_EXPERIMENT.py. Modify that file to change + search parameters instead of passing command-line arguments. """ - # Validate objective function - if objective not in OBJECTIVE_FUNCTIONS: - raise ValueError(f"Invalid objective function '{objective}'. 
class ConfigSearchManager:
    """Manage Phase 1: hyperparameter search for optimal sparse-attention configs.

    Runs one Ray Tune search per (model, task, masker) combination and caches
    each outcome as a JSON file under ``results_dir`` so later calls can reuse it.
    """

    # Score stored for a combination whose search raised. The failed outcome is
    # still written to disk, so it is not retried unless ``force_search`` is set.
    _FAILURE_SCORE: float = 5.0

    def __init__(
        self,
        optimal_configs_dir: str,
        force_search: bool,
        generation_kwargs: Dict[str, object],
        request_kwargs: Dict[str, object],
        ray_results_dir: str,
    ) -> None:
        """Create the results directory and store the search settings.

        Args:
            optimal_configs_dir: Parent directory for saved optimal configs. Results
                are written to a ``run_<timestamp>`` (or ``run_default``) subdirectory.
            force_search: Re-run the search even if a saved config already exists.
            generation_kwargs: Forwarded unchanged to ``BenchmarkHelper`` for every trial.
            request_kwargs: Forwarded unchanged to ``BenchmarkHelper`` for every trial.
            ray_results_dir: Passed to ``tune.run`` as ``storage_path``.
        """
        # With the timestamp disabled every run shares "run_default", which is
        # what makes the cached-config lookup in search_optimal_config effective.
        if USE_TIMESTAMP_FOR_RESULTS_DIR:
            run_suffix = datetime.now().strftime("%Y%m%d_%H%M%S")
        else:
            run_suffix = "default"
        self.results_dir: str = os.path.join(optimal_configs_dir, f"run_{run_suffix}")
        os.makedirs(self.results_dir, exist_ok=True)

        self.force_search: bool = force_search
        self.generation_kwargs: Dict[str, object] = generation_kwargs
        self.request_kwargs: Dict[str, object] = request_kwargs
        self.ray_results_dir: str = ray_results_dir
        print(f"Saving optimal configs to: {self.results_dir}")

    @staticmethod
    def _artifact_stem(model: str, task: str, masker_name: str) -> str:
        """Return the file-system-safe base name shared by all artifacts of one search."""
        return f"{model}_{task}_{masker_name}".replace("/", "_")

    @staticmethod
    def _format_start_time(start_time: object) -> Optional[str]:
        """Return *start_time* as an ISO-8601 string, or ``None`` when it is unset.

        Accepts both ``datetime``-like objects and POSIX timestamps.
        NOTE(review): the type Ray Tune stores on ``Trial.start_time`` is not
        visible from this file; handling both avoids an AttributeError either way.
        """
        if not start_time:
            return None
        if hasattr(start_time, "isoformat"):
            return start_time.isoformat()
        return datetime.fromtimestamp(float(start_time)).isoformat()

    @staticmethod
    def _print_search_space(search_space: Dict[str, object]) -> None:
        """Print every search-space parameter together with its candidate values."""
        print(f" → Search space parameters:")
        for param, space_obj in search_space.items():
            # Ray Tune categorical objects expose their values via `.categories`;
            # anything else is printed as-is.
            shown = space_obj.categories if hasattr(space_obj, "categories") else space_obj
            print(f" - {param}: {shown}")

    def _save_trial_details(
        self,
        analysis,
        best_trial,
        stem: str,
        model: str,
        task: str,
        masker_name: str,
        full_sparse_config: Optional["ResearchAttentionConfig"],
    ) -> None:
        """Write per-trial diagnostics (``*_trials.json`` and ``*_analysis.csv``).

        Args:
            analysis: Object returned by ``tune.run``.
            best_trial: Trial selected by ``analysis.get_best_trial``.
            stem: Sanitized base name from ``_artifact_stem``.
            model: Model name recorded in the JSON file.
            task: Task name recorded in the JSON file.
            masker_name: Masker configuration name recorded in the JSON file.
            full_sparse_config: Template config; its ``objective`` attribute, if
                present and truthy, is recorded as ``objective_function``.
        """
        csv_path = os.path.join(self.results_dir, f"{stem}_analysis.csv")
        trials_info: List[Dict[str, object]] = [
            {
                "trial_id": trial.trial_id,
                "config": trial.config,
                "score": (trial.last_result or {}).get("combined_score", float("inf")),
                "status": trial.status,
                "start_time": self._format_start_time(getattr(trial, "start_time", None)),
                "metric_history": getattr(trial, "metric_analysis", {}).get("combined_score", {}),
            }
            for trial in analysis.trials
        ]
        # NOTE(review): `objective` is not visibly defined on ResearchAttentionConfig
        # from here, so read it defensively instead of assuming the attribute exists.
        objective_name = getattr(full_sparse_config, "objective", None) or "None"

        # Serialize before opening the file so a failure cannot leave a truncated file.
        payload = json.dumps(
            {
                "model": model,
                "task": task,
                "masker_name": masker_name,
                "objective_function": objective_name,
                "best_trial_id": best_trial.trial_id,
                "trials": trials_info,
                "analysis_dataframe_path": str(csv_path),
            },
            indent=2,
        )
        trials_file = os.path.join(self.results_dir, f"{stem}_trials.json")
        with open(trials_file, "w", encoding="utf-8") as f:
            f.write(payload)

        analysis.dataframe().to_csv(csv_path, index=False)

    def search_optimal_config(
        self,
        model: str,
        task: str,
        masker_name: str,
        masker_classes: Optional[List],
        full_sparse_config: Optional["ResearchAttentionConfig"] = None,
        actors_per_gpu: int = 1,
    ) -> "OptimalConfig":
        """Search for optimal hyperparameters for a single combination.

        Args:
            model: Model name to use.
            task: Task name to benchmark.
            masker_name: Name of the masker configuration.
            masker_classes: List of masker classes (``None`` for dense configs,
                which are saved without running a search).
            full_sparse_config: Sparse attention config template; required
                whenever ``masker_classes`` is not ``None``.
            actors_per_gpu: Number of actors per GPU; each trial requests
                ``1 / actors_per_gpu`` of a GPU.

        Returns:
            The cached config if one exists and ``force_search`` is false, else
            the best config found. If the search raises, the error is printed and
            a config carrying the failure score and the unmodified template is
            saved and returned; no exception is propagated from the search.
        """
        stem = self._artifact_stem(model, task, masker_name)
        config_file = os.path.join(self.results_dir, f"{stem}.json")

        # Reuse a previously saved result unless a re-search was requested.
        if os.path.exists(config_file) and not self.force_search:
            print(f" → Loading existing config")
            return self._load_config(config_file)

        # Dense config: there is nothing to optimize.
        if masker_classes is None:
            optimal = OptimalConfig(
                model=model,
                task=task,
                masker_name=masker_name,
                sparse_config=None,
                masker_classes=None,
                hyperparams={},
                score=0.0,
                search_time=0.0,
                num_trials=1,
            )
            self._save_config(optimal, config_file)
            return optimal

        start_time = time.time()
        try:
            if full_sparse_config is None:
                raise ValueError("full_sparse_config is required when masker_classes is given")

            # The template config supplies the fixed (non-searched) parameters.
            optimizer = create_optimizer(full_sparse_config)
            search_space = optimizer.create_search_space(task)
            self._print_search_space(search_space)

            def objective(trial_config: Dict[str, object]) -> Dict[str, float]:
                runner = BenchmarkHelper(
                    base_result_dir=self.results_dir,
                    generation_kwargs=self.generation_kwargs,
                    request_kwargs=self.request_kwargs,
                )
                attention_config = optimizer.create_config_from_params(trial_config)
                score, density, error = runner(attention_config, task, model)
                return {"combined_score": score, "density": density, "error": error}

            analysis = tune.run(
                objective,
                config=search_space,
                metric="combined_score",
                mode="min",
                resources_per_trial={"CPU": 1, "GPU": 1.0 / actors_per_gpu},
                storage_path=self.ray_results_dir,
                name=stem,
                verbose=1,  # Show Ray Tune progress
                stop={"training_iteration": 1},  # One evaluation per config
            )

            best_trial = analysis.get_best_trial("combined_score", "min", "last")
            if best_trial is None:
                raise RuntimeError("no trial reported 'combined_score'")
            best_config = optimizer.create_config_from_params(best_trial.config)

            optimal = OptimalConfig(
                model=model,
                task=task,
                masker_name=masker_name,
                sparse_config=best_config,
                masker_classes=masker_classes,
                hyperparams=best_trial.config,
                score=best_trial.last_result["combined_score"],
                search_time=time.time() - start_time,
                num_trials=len(analysis.trials),
            )

            # Diagnostics are best-effort: failing to write them must not throw
            # away the result of a search that already completed.
            try:
                self._save_trial_details(
                    analysis, best_trial, stem, model, task, masker_name, full_sparse_config
                )
            except Exception as diag_err:
                print(f" ! Could not save trial details: {diag_err}")
                traceback.print_exc()

            self._save_config(optimal, config_file)
            return optimal

        except Exception as e:
            print(f" ✗ Search failed: {e}")
            traceback.print_exc()
            # Record the failure so the combination is still represented on disk.
            optimal = OptimalConfig(
                model=model,
                task=task,
                masker_name=masker_name,
                sparse_config=full_sparse_config,
                masker_classes=masker_classes,
                hyperparams={},
                score=self._FAILURE_SCORE,
                search_time=time.time() - start_time,
                num_trials=0,
            )
            self._save_config(optimal, config_file)
            return optimal

    def _save_config(self, config: "OptimalConfig", filepath: str) -> None:
        """Save configuration to JSON.

        Args:
            config: OptimalConfig (a dataclass instance) to save.
            filepath: Path (string or path-like) where the config is written.

        Raises:
            TypeError: If the config holds a value JSON cannot encode. Nothing is
                written in that case.
        """
        data: Dict[str, object] = asdict(config)

        # Convert sparse config to serializable format
        if config.sparse_config:
            data["sparse_config"] = serialize_sparse_config(config.sparse_config)

        # Convert masker classes to their names
        if config.masker_classes:
            data["masker_classes"] = [cls.__name__ for cls in config.masker_classes]

        # Encode first: a half-written cache file would break the next load.
        payload = json.dumps(data, indent=2)
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(payload)

    def _load_config(self, filepath: str) -> "OptimalConfig":
        """Load configuration from JSON.

        Args:
            filepath: Path (string or path-like) of the config file to load.

        Returns:
            OptimalConfig rebuilt from the file, with the sparse config
            deserialized and masker class names mapped back to classes.
        """
        with open(filepath, "r", encoding="utf-8") as f:
            data: Dict[str, object] = json.load(f)

        # Reconstruct sparse config if present
        if data.get("sparse_config"):
            data["sparse_config"] = deserialize_sparse_config(data["sparse_config"])

        # Reconstruct masker classes from the names written by _save_config
        if data.get("masker_classes"):
            class_map: Dict[str, type] = get_all_masker_config_classes()
            data["masker_classes"] = [class_map[name] for name in data["masker_classes"]]

        return OptimalConfig(**data)
sparse_attention_hub.sparse_attention.research_attention.maskers.fixed.implementations import ( - DoubleSparsityTopKMaskerConfig + SinkMaskerConfig, + LocalMaskerConfig, + QuestTopKMaskerConfig ) #from benchmark.longbench import LongBench @@ -38,20 +40,15 @@ from sparse_attention_hub.adapters import ModelAdapterHF def main(): - model_name = "meta-llama/Llama-3.1-8B-Instruct" + model_name = "Qwen/Qwen3-4B-Instruct-2507" device = 0 # sorted_channel_file is available in the author's repository # https://github.com/andy-yang-1/DoubleSparse/tree/main/config # TODO: is there a better way to use the paths in scripts? sparse_attention_config = ResearchAttentionConfig(masker_configs=[ - DoubleSparsityTopKMaskerConfig( - heavy_size=4096, - group_factor=2, - label_bits=2, - sorted_channel_file="/home/ubuntu/DoubleSparse/config/meta-llama/Llama-3.1-8B-Instruct.json", - channel_selection="q_proj" - ) + SinkMaskerConfig(sink_size=128), + LocalMaskerConfig(window_size=128) ]) print(" ✓ Loading model...") @@ -61,14 +58,14 @@ def main(): adapter = ModelAdapterHF( model_name=model_name, sparse_attention_config=sparse_attention_config, - model_kwargs= {"torch_dtype": torch.bfloat16, "attn_implementation": "flash_attention_3"}, + model_kwargs= {"torch_dtype": torch.bfloat16, "attn_implementation": "flash_attention_2"}, device=device ) #benchmark = LongBench(['passage_retrieval_en']) - benchmark = Ruler32K(['vt']) + benchmark = Ruler32K(['niah_multikey_2']) - result_dir = Path("./test_results.vt.4096.2.2.q_proj/") + result_dir = Path("./test_results.4B/") result_dir.mkdir(exist_ok=True) metric_logger = MicroMetricLogger() metric_logger.configure_logging( @@ -79,7 +76,7 @@ def main(): ], ) metric_logger.flush() - benchmark.run_benchmark(adapter, result_dir, request_kwargs={"max_requests": 10, "max_context_length": 1000000}, generation_kwargs={"max_new_tokens": 500}) + benchmark.run_benchmark(adapter, result_dir, request_kwargs={"max_requests": 100, "max_context_length": 1000000}, 
generation_kwargs={"max_new_tokens": 500}) if __name__ == "__main__": main()