diff --git a/test/.gitignore b/test/.gitignore index 220d21ac1..8163f28a5 100644 --- a/test/.gitignore +++ b/test/.gitignore @@ -5,6 +5,7 @@ result_outputs/ results/ .cache/ backup/ +sites/Demo/* $null *__pycache__/ .* diff --git a/test/common/capture_utils.py b/test/common/capture_utils.py index b12b76637..077132cd5 100644 --- a/test/common/capture_utils.py +++ b/test/common/capture_utils.py @@ -1,4 +1,6 @@ +import dataclasses import functools +from collections.abc import Mapping from typing import Any, Dict, List from common.db_utils import write_to_db @@ -43,15 +45,64 @@ def post_process(table_name: str, **kwargs) -> List[Dict[str, Any]]: return [] +def _ensure_list(obj): + """ + Ensure the object is returned as a list. + """ + if isinstance(obj, list): + return obj + if isinstance(obj, (str, bytes, Mapping)): + return [obj] + if hasattr(obj, "__iter__") and not hasattr(obj, "__len__"): # 如 generator + return list(obj) + return [obj] + + +def _to_dict(obj: Any) -> Dict[str, Any]: + """ + Convert various object types to a dictionary for DB writing. + """ + if isinstance(obj, Mapping): + return dict(obj) + if dataclasses.is_dataclass(obj): + return dataclasses.asdict(obj) + if hasattr(obj, "_asdict"): # namedtuple + return obj._asdict() + if hasattr(obj, "__dict__"): + return vars(obj) + raise TypeError(f"Cannot convert {type(obj)} to dict for DB writing") + + +def proj_process(table_name: str, **kwargs) -> List[Dict[str, Any]]: + if "_proj" not in kwargs: + return [] + name = kwargs.get("_name", table_name) + raw_input = kwargs["_proj"] + raw_results = _ensure_list(raw_input) + + processed_results = [] + for result in raw_results: + try: + dict_result = _to_dict(result) + write_to_db(name, dict_result) + processed_results.append(dict_result) + except Exception as e: + raise ValueError(f"Failed to process item in _proj: {e}") from e + + return processed_results + + # ---------------- decorator ---------------- def export_vars(func): @functools.wraps(func) def wrapper(*args, **kwargs): result = func(*args, **kwargs) - # If the function returns a dict containing '_data' or 'data', post-process it + # If the function returns a dict containing '_data' or '_proj', post-process it if isinstance(result, dict): - if "_data" in result or "data" in result: + if "_data" in result: return post_process(func.__name__, **result) + if "_proj" in result: + return proj_process(func.__name__, **result) # Otherwise return unchanged return result @@ -65,33 +116,6 @@ def capture(): return {"name": "demo", "_data": {"accuracy": 0.1, "loss": 0.3}} -@export_vars -def capture_list(): - """All lists via '_name' + '_data'""" - return { - "_name": "demo", - "_data": { - "accuracy": [0.1, 0.2, 0.3], - "loss": [0.1, 0.2, 0.3], - }, - } - - -@export_vars -def capture_mix(): - """Mixed single + lists via '_name' + '_data'""" - return { - "_name": "demo", - "_data": { - "length": 10086, # single value - "accuracy": [0.1, 0.2, 0.3], # list - "loss": [0.1, 0.2, 0.3], # list - }, - } - - # quick test if __name__ == "__main__": print("capture(): ", capture()) - print("capture_list(): ", capture_list()) - print("capture_mix(): ", capture_mix()) diff --git a/test/common/db_utils.py b/test/common/db_utils.py index 089af43b2..646f91782 100644 --- a/test/common/db_utils.py +++ b/test/common/db_utils.py @@ -1,89 +1,120 @@ import json import logging import threading +from datetime import datetime from pathlib import Path from typing import Any, Dict, Optional -import peewee -from common.config_utils import config_utils as config_instance -from peewee import AutoField, Model, MySQLDatabase, TextField +# Lazy imports for database components +peewee = None +PostgresqlDatabase = None +Model = None +AutoField = None +DateTimeField = None +TextField = None logger = logging.getLogger("db_handler") logger.setLevel(logging.DEBUG) -# Avoid adding handlers multiple times if not logger.handlers: - logger.setLevel(logging.DEBUG) - -# Global DB instance and lock for thread-safe singleton -_db_instance: Optional[MySQLDatabase] = None + # Basic config only once + handler = logging.StreamHandler() + formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) + handler.setFormatter(formatter) + logger.addHandler(handler) + +# Global state +_db_instance = None _db_lock = threading.Lock() _test_build_id: Optional[str] = None _backup_path: Optional[Path] = None -_db_enabled: bool = False # from config +_db_enabled: bool = False + + +def _ensure_peewee_imported(): + """Import peewee components only when DB is enabled.""" + global peewee, PostgresqlDatabase, Model, AutoField, DateTimeField, TextField + if peewee is None: + import peewee + from peewee import AutoField as _AF + from peewee import DateTimeField as _DTF + from peewee import Model as _Model + from peewee import PostgresqlDatabase as _PGDB + from peewee import TextField as _TF + + PostgresqlDatabase = _PGDB + Model = _Model + AutoField = _AF + DateTimeField = _DTF + TextField = _TF -def _get_db() -> Optional[MySQLDatabase]: - """Return a singleton MySQLDatabase instance based on YAML configuration.""" +def _get_db(): + """Return a singleton PostgresqlDatabase instance if enabled.""" global _db_instance, _backup_path, _db_enabled - if _db_instance is None: - with _db_lock: - if _db_instance is None: - db_config = config_instance.get_config("database", {}) - _db_enabled = db_config.get("enabled", False) - - backup_str = db_config.get("backup", "results/") - _backup_path = Path(backup_str).resolve() - _backup_path.mkdir(parents=True, exist_ok=True) - logger.info(f"Backup directory set to: {_backup_path}") - - if not _db_enabled: - return None - - try: - _db_instance = MySQLDatabase( - db_config.get("name", "test_db"), - user=db_config.get("user", "root"), - password=db_config.get("password", ""), - host=db_config.get("host", "localhost"), - port=db_config.get("port", 3306), - charset=db_config.get("charset", "utf8mb4"), - ) - logger.info( - f"Database instance created for: {_db_instance.database}" - ) - except Exception as e: - logger.error(f"Failed to create database instance: {e}") - _db_instance = None + if _db_instance is not None: + return _db_instance + + with _db_lock: + if _db_instance is not None: + return _db_instance + + db_config = _get_db_config() + _db_enabled = db_config.get("enabled", False) + + backup_str = db_config.get("backup", "results/") + _backup_path = Path(backup_str).resolve() + _backup_path.mkdir(parents=True, exist_ok=True) + logger.info(f"Backup directory set to: {_backup_path}") + + if not _db_enabled: + return None + + # Only import peewee when enabled + _ensure_peewee_imported() + + try: + _db_instance = PostgresqlDatabase( + db_config.get("name", "test_db"), + user=db_config.get("user", "postgres"), + password=db_config.get("password", ""), + host=db_config.get("host", "localhost"), + port=db_config.get("port", 5432), + ) + logger.info( + f"PostgreSQL database instance created for: {_db_instance.database}" + ) + except Exception as e: + logger.error(f"Failed to create PostgreSQL database instance: {e}") + _db_instance = None return _db_instance +def _get_db_config(): + """Wrapper to get config without early peewee dependency.""" + from common.config_utils import config_utils as config_instance + + return config_instance.get_config("database", {}) + + def _set_test_build_id(build_id: Optional[str] = None) -> None: - """Set or generate a unique test build ID.""" global _test_build_id _test_build_id = build_id or "default_build_id" logger.debug(f"Test build ID set to: {_test_build_id}") def _get_test_build_id() -> str: - """Return the current test build ID, generating one if necessary.""" global _test_build_id if _test_build_id is None: _set_test_build_id() return _test_build_id -class BaseEntity(Model): - """Base PeeWee model class using the singleton database.""" - - class Meta: - database = _get_db() - - def _backup_to_file(table_name: str, data: Dict[str, Any]) -> None: - """Write data to a JSON Lines (.jsonl) file in the backup directory.""" if not _backup_path: logger.warning("Backup path is not set. Skipping backup.") return @@ -100,73 +131,89 @@ def _backup_to_file(table_name: str, data: Dict[str, Any]) -> None: def write_to_db(table_name: str, data: Dict[str, Any]) -> bool: - """ - Attempt to insert data into the specified database table. - If the table doesn't exist or an error occurs, back up to a JSONL file. - """ - db = _get_db() + # Always add build ID data["test_build_id"] = _get_test_build_id() - # Skip DB entirely if disabled - if not _db_enabled or db is None: + # Early exit if DB disabled + db_config = _get_db_config() + if not db_config.get("enabled", False): _backup_to_file(table_name, data) return False - try: - if not db.table_exists(table_name): - logger.warning(f"Table '{table_name}' does not exist. Writing to backup.") - _backup_to_file(table_name, data) - return False - - # Get existing columns and filter data - columns = db.get_columns(table_name) - col_names = {col.name for col in columns} - filtered_data = {k: v for k, v in data.items() if k in col_names} - - # Build dynamic model for insertion - fields = {"id": AutoField()} - for col in columns: - if col.name != "id": - fields[col.name] = TextField(null=True) - - DynamicEntity = type( - f"{table_name.capitalize()}DynamicModel", - (BaseEntity,), - { - "Meta": type("Meta", (), {"database": db, "table_name": table_name}), - **fields, - }, - ) + # Load DB and peewee only when needed + db = _get_db() + if db is None: + _backup_to_file(table_name, data) + return False + + _ensure_peewee_imported() + try: + # Check if table exists + table_exists = db.table_exists(table_name) + + # Get or create dynamic model + columns = db.get_columns(table_name) if table_exists else [] + col_names = {col.name for col in columns} if table_exists else set() + + # Ensure required fields are present + all_fields = {"id", "created_at", "test_build_id"} + all_fields.update(data.keys()) + + # Build field definitions + fields = { + "id": AutoField(), + "created_at": DateTimeField(default=datetime.utcnow), + "test_build_id": TextField(null=True), + } + + # Add TextField for all other data keys (including future ones) + for key in data.keys(): + if key not in fields: + fields[key] = TextField(null=True) + + # Define dynamic model + Meta = type("Meta", (), {"database": db, "table_name": table_name}) + + attrs = {"Meta": Meta, **fields} + DynamicModel = type(f"{table_name.capitalize()}DynamicModel", (Model,), attrs) + + # Create table if not exists + if not table_exists: + db.create_tables([DynamicModel], safe=True) + logger.info( + f"Table '{table_name}' created with id, created_at, test_build_id, and dynamic fields." + ) + + # Prepare data for insert (only include fields that exist in model) + model_fields = set(fields.keys()) + filtered_data = {k: v for k, v in data.items() if k in model_fields} + + # Insert with db.atomic(): - DynamicEntity.insert(filtered_data).execute() + DynamicModel.insert(filtered_data).execute() + logger.info(f"Successfully inserted data into table '{table_name}'.") return True - except peewee.PeeweeException as e: - logger.error( - f"Database write error for table '{table_name}': {e}", exc_info=True - ) except Exception as e: - logger.critical( - f"Unexpected error during DB write for '{table_name}': {e}", exc_info=True + logger.error( + f"Error during DB write for table '{table_name}': {e}", exc_info=True ) - - # Fallback to backup on any failure - _backup_to_file(table_name, data) - return False + _backup_to_file(table_name, data) + return False def database_connection(build_id: str) -> None: - """Test database connection and set the build ID.""" logger.info(f"Setting test build ID: {build_id}") _set_test_build_id(build_id) - db = _get_db() - if not _db_enabled: + db_config = _get_db_config() + if not db_config.get("enabled", False): logger.info("Database connection skipped because enabled=false.") return + db = _get_db() if db is None: logger.error("No database instance available.") return @@ -174,9 +221,9 @@ def database_connection(build_id: str) -> None: logger.info(f"Attempting connection to database: {db.database}") try: db.connect(reuse_if_open=True) - logger.info("Database connection successful.") + logger.info("PostgreSQL connection successful.") except Exception as e: - logger.error(f"Database connection failed: {e}", exc_info=True) + logger.error(f"PostgreSQL connection failed: {e}", exc_info=True) finally: if not db.is_closed(): db.close() diff --git a/test/common/envPreCheck/run_env_preCheck.py b/test/common/envPreCheck/run_env_preCheck.py index f99b87952..daae41032 100644 --- a/test/common/envPreCheck/run_env_preCheck.py +++ b/test/common/envPreCheck/run_env_preCheck.py @@ -9,7 +9,6 @@ from pathlib import Path from typing import Any, Dict, List, Optional, Tuple -import ucmstore import yaml CODE_ROOT = Path(__file__).resolve().parent @@ -1192,6 +1191,8 @@ def setup_uc(block_size): Raises: RuntimeError: if ucmstore.Setup returns a non-zero value. """ + import ucmstore + param = ucmstore.SetupParam(STORAGE_BACKENDS, block_size, True) ret = ucmstore.Setup(param) if ret != 0: @@ -1232,6 +1233,8 @@ def embed(hashes, block_layer_size, block_layer): Raises: RuntimeError: If any UC operation fails. """ + import ucmstore + with StdoutInterceptor() as cap: # Allocate blocks in UC ret = ucmstore.AllocBatch(hashes) @@ -1296,6 +1299,8 @@ def fetch(hashes, block_layer_size, block_layer): Raises: RuntimeError: If any UC operation fails. """ + import ucmstore + with StdoutInterceptor() as cap: block_number = len(hashes) results = ucmstore.LookupBatch(hashes) diff --git a/test/config.yaml b/test/config.yaml index 3d2f809c0..0aa8a5b21 100644 --- a/test/config.yaml +++ b/test/config.yaml @@ -3,7 +3,7 @@ reports: use_timestamp: true directory_prefix: "pytest" html: # pytest-html - enabled: true + enabled: false filename: "report.html" title: "UCM Pytest Test Report" @@ -11,11 +11,10 @@ database: backup: "results/" enabled: true host: "127.0.0.1" - port: 3306 - name: "ucm_pytest" - user: "root" + port: 5432 + name: "ucm_test" + user: "postgres" password: "123456" - charset: "utf8mb4" models: ip_ports: "" @@ -24,62 +23,25 @@ models: payload: '' enable_clear_hbm: false - # LLM Connection Configuration llm_connection: - model: "qwen3" - server_url: "http://141.111.32.70:9382" - tokenizer_path: "/home/models/QwQ-32B" - stream: true # stream output - ignore_eos: true # Ignore the returned terminator - timeout: 180 # request time out - -reports: - base_dir: "results/reports" - use_timestamp: true - directory_prefix: "pytest" - html: # pytest-html - enabled: true - filename: "report.html" - title: "UCM Pytest Test Report" - -database: - backup: "results/" - enabled: true - host: "127.0.0.1" - port: 3306 - name: "ucm_pytest" - user: "root" - password: "123456" - charset: "utf8mb4" - -models: - ip_ports: "" + model: "" + server_url: "" tokenizer_path: "" - served_model_name: "" - payload: '' - enable_clear_hbm: false - - -# LLM Connection Configuration -llm_connection: - model: "qwen3" - server_url: "http://141.111.32.70:9382" - tokenizer_path: "/home/models/QwQ-32B" stream: true # stream output ignore_eos: true # Ignore the returned terminator timeout: 180 # request time out # Environment Pre-Check Configuration Env_preCheck: - master_ip: 141.111.32.70 - worker_ip: 141.111.32.72 - ascend_rt_visible_devices: "0,1" - node_num: 2 - model_path: "/home/models/QwQ-32B" - hf_model_name: "QwQ-32B" - middle_page: "Qwen" + master_ip: 192.168.0.1 + worker_ip: + ascend_rt_visible_devices: "" + node_num: + model_path: "" + hf_model_name: "" + middle_page: "" expected_embed_bandwidth: 10 expected_fetch_bandwidth: 10 kvCache_block_number: 1024 - storage_backends: ["/home/menglong/data"] \ No newline at end of file + storage_backends: [""] \ No newline at end of file diff --git a/test/conftest.py b/test/conftest.py index 150257952..a1b8af404 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -3,7 +3,6 @@ import datetime as dt import platform as pf import sys -from functools import wraps from pathlib import Path import pytest diff --git a/test/pytest.ini b/test/pytest.ini index 16643b39f..3a0ed9a91 100644 --- a/test/pytest.ini +++ b/test/pytest.ini @@ -6,7 +6,6 @@ python_functions = test_* addopts = -ra - --strict-markers --capture=no filterwarnings = ignore::pytest.PytestReturnNotNoneWarning diff --git a/test/requirements.txt b/test/requirements.txt index 07635b247..38866e802 100644 --- a/test/requirements.txt +++ b/test/requirements.txt @@ -1,6 +1,12 @@ +#pytest pytest>=7.0.0 pytest-html>=3.1.1 PyYAML>=6.0 -# MySQL +#database peewee>=3.14.5 -pymysql>=1.0.2 \ No newline at end of file +psycopg2-binary>=2.8 +#llmperf +requests>=2.10.0 +pandas>=2.3.0 +pydantic>=2.12.0 +transformers>=4.0.0 \ No newline at end of file