diff --git a/sdk/python/foundation-models/system/reinforcement-learning/environment/speculative-decoding-env/Dockerfile b/sdk/python/foundation-models/system/reinforcement-learning/environment/speculative-decoding-env/Dockerfile new file mode 100644 index 000000000..124a585fd --- /dev/null +++ b/sdk/python/foundation-models/system/reinforcement-learning/environment/speculative-decoding-env/Dockerfile @@ -0,0 +1,10 @@ +FROM lmsysorg/sglang:v0.5.2rc2-cu126 +ENV BASE_MODEL nvidia/Llama-3.1-8B-Instruct-FP8 +ENV DRAFT_MODEL lmsys/sglang-EAGLE3-LLaMA3.1-Instruct-8B +ENV SGLANG_ARGS "--tp-size 1 --max-running-requests 32 --mem-fraction-static 0.8 --enable-torch-compile --speculative-algorithm EAGLE3 --speculative-num-steps 3 --speculative-eagle-topk 2 --speculative-num-draft-tokens 4 --dtype float16 --attention-backend fa3 --host 0.0.0.0 --port 30000" +ENV SGL_HOST 0.0.0.0 +ENV SGL_PORT 30000 +ENV SGLANG_ALLOW_OVERWRITE_LONGER_CONTEXT_LEN 1 + +EXPOSE 30000 +ENTRYPOINT python3 -m sglang.launch_server --model-path $BASE_MODEL --speculative-draft-model-path $DRAFT_MODEL $SGLANG_ARGS \ No newline at end of file diff --git a/sdk/python/foundation-models/system/reinforcement-learning/images/metrics-base-target-spec-dec.png b/sdk/python/foundation-models/system/reinforcement-learning/images/metrics-base-target-spec-dec.png new file mode 100644 index 000000000..58073fe95 Binary files /dev/null and b/sdk/python/foundation-models/system/reinforcement-learning/images/metrics-base-target-spec-dec.png differ diff --git a/sdk/python/foundation-models/system/reinforcement-learning/reinforcement-learning.ipynb b/sdk/python/foundation-models/system/reinforcement-learning/reinforcement-learning.ipynb index 8eac40859..4a2163c7e 100644 --- a/sdk/python/foundation-models/system/reinforcement-learning/reinforcement-learning.ipynb +++ b/sdk/python/foundation-models/system/reinforcement-learning/reinforcement-learning.ipynb @@ -122,7 +122,7 @@ "source": [ "import matplotlib.pyplot as plt\n", "from scripts.utils import setup_workspace\n", - "from scripts.dataset import prepare_finqa_dataset\n", + "from scripts.dataset import prepare_finqa_dataset, prepare_sharegpt_dataset\n", "from scripts.run import get_run_metrics\n", "from scripts.reinforcement_learning import run_rl_training_pipeline\n", "from scripts.evaluation import run_evaluation_pipeline\n", @@ -130,8 +130,10 @@ " run_draft_model_pipeline,\n", " prepare_combined_model_for_deployment,\n", " deploy_speculative_decoding_endpoint,\n", + " deploy_base_model_endpoint,\n", + " run_evaluation_speculative_decoding,\n", ")\n", - "from scripts.deployment import create_managed_deployment, test_deployment" + "from scripts.deployment import test_deployment" ] }, { @@ -150,7 +152,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "

Prepare dataset for Finetuning. This would save train, test and valid dataset under data folder

" + "

Prepare the dataset for fine-tuning. This will save the train, test, and validation datasets under the data folder.

" ] }, { @@ -484,6 +486,15 @@ "

Reference: https://arxiv.org/abs/2503.01840

\n" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "draft_train_data_path = prepare_sharegpt_dataset()" + ] + }, { "cell_type": "code", "execution_count": null, @@ -498,7 +509,7 @@ " num_epochs=1, # Number of train epochs to be run by draft trainer.\n", " monitor=False, # Set to True to wait for completion.\n", " base_model_mlflow_path=\"azureml://registries/azureml-meta/models/Meta-Llama-3-8B-Instruct/versions/9\",\n", - " draft_train_data_path=\"./data_for_draft_model/train/sharegpt_train_small.jsonl\",\n", + " draft_train_data_path=draft_train_data_path,\n", ")" ] }, @@ -591,8 +602,7 @@ "endpoint_name = deploy_speculative_decoding_endpoint(\n", " ml_client=ml_client, # ML Client which specifies the workspace where endpoint gets deployed.\n", " combined_model=combined_model, # Reference from previous steps where combined model is created.\n", - " instance_type=\"octagepu\", # Instance type Kubernetes Cluster\n", - " compute_name=\"k8s-a100-compute\",\n", + " instance_type=\"Standard_NC40ads_H100_v5\", # Instance type\n", ")" ] }, @@ -631,10 +641,9 @@ "outputs": [], "source": [ "# Deploy managed online endpoint with base model\n", - "base_endpoint_name = create_managed_deployment( # Function to create endpoint for base model.\n", + "base_endpoint_name = deploy_base_model_endpoint( # Function to create endpoint for base model.\n", " ml_client=ml_client, # ML Client which specifies the workspace where endpoint gets deployed.\n", - " model_asset_id=\"meta-llama/Meta-Llama-3-8B-Instruct\", # Huggingface ID of the base model.\n", - " instance_type=\"Standard_ND96amsr_A100_v4\", # Compute SKU on which base model will be deployed.\n", + " instance_type=\"Standard_NC40ads_H100_v5\", # Compute SKU on which base model will be deployed.\n", ")" ] }, @@ -711,10 +720,12 @@ "# Run evaluation job to compare base model and speculative decoding endpoints' performance\n", "evaluation_job = run_evaluation_speculative_decoding(\n", " ml_client=ml_client,\n", + " registry_ml_client=registry_ml_client,\n", " base_endpoint_name=base_endpoint_name, # Base model endpoint from previous step.\n", " speculative_endpoint_name=endpoint_name, # Speculative endpoint from previous step.\n", - " base_model=\"meta-llama/Meta-Llama-3-8B-Instruct\", # HuggingFace repo ID of the model used in base endpoint, used for tokenization.\n", - " speculative_model=\"meta-llama/Meta-Llama-3-8B-Instruct\", # HuggingFace repo ID of the model used in speculative decoding endpoint, used for tokenization.\n", + " base_model_hf_id=\"meta-llama/Meta-Llama-3-8B-Instruct\", # HuggingFace repo ID of the model used in base endpoint, used for tokenization.\n", + " speculative_model_hf_id=\"meta-llama/Meta-Llama-3-8B-Instruct\", # HuggingFace repo ID of the model used in speculative decoding endpoint, used for tokenization.\n", + " compute_cluster=\"d13-v2\",\n", ")" ] }, @@ -735,7 +746,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "\"Performance" + "\"Performance" ] } ], diff --git a/sdk/python/foundation-models/system/reinforcement-learning/scripts/dataset.py b/sdk/python/foundation-models/system/reinforcement-learning/scripts/dataset.py index 13cd01942..3e5d744d7 100644 --- a/sdk/python/foundation-models/system/reinforcement-learning/scripts/dataset.py +++ b/sdk/python/foundation-models/system/reinforcement-learning/scripts/dataset.py @@ -6,6 +6,13 @@ from azure.ai.ml import MLClient from azure.ai.ml.entities import Data from azure.ai.ml.constants import AssetTypes 
+from typing import Optional +from json import JSONDecodeError +import requests +from tqdm import tqdm + + +SHAREGPT_URL = "https://huggingface.co/datasets/anon8231489123/ShareGPT_Vicuna_unfiltered/resolve/main/ShareGPT_V3_unfiltered_cleaned_split.json" def register_dataset(ml_client: MLClient, dataset_name: str, file_path: str): @@ -164,3 +171,100 @@ def map_fn(example: pd.Series, idx: int, split: str): return train_data.id, test_data.id, valid_data.id return train_dataset_path, test_dataset_path, valid_dataset_path + + +def _is_file_valid_json(path): + if not os.path.isfile(path): + return False + + try: + with open(path) as f: + json.load(f) + return True + except JSONDecodeError as e: + print( + f"{path} exists but json loading fails ({e=}), thus treat as invalid file" + ) + return False + + +def _download_and_cache_file(url: str, filename: Optional[str] = None): + """Read and cache a file from a url.""" + if filename is None: + filename = os.path.join("/tmp", url.split("/")[-1]) + + # Check if the cache file already exists + if _is_file_valid_json(filename): + return filename + + print(f"Downloading from {url} to {filename}") + + # Stream the response to show the progress bar + response = requests.get(url, stream=True) + response.raise_for_status() # Check for request errors + + # Total size of the file in bytes + total_size = int(response.headers.get("content-length", 0)) + chunk_size = 1024 # Download in chunks of 1KB + + # Use tqdm to display the progress bar + with open(filename, "wb") as f, tqdm( + desc=filename, + total=total_size, + unit="B", + unit_scale=True, + unit_divisor=1024, + ) as bar: + for chunk in response.iter_content(chunk_size=chunk_size): + f.write(chunk) + bar.update(len(chunk)) + + return filename + + +def prepare_sharegpt_dataset(dataset_path="./data/draft_model/sharegpt_train_processed.jsonl") -> str: + """Prepare the ShareGPT dataset for training the draft model.""" + # Download sharegpt if necessary + if not os.path.isfile(dataset_path): + temp_dataset_path = _download_and_cache_file(SHAREGPT_URL) + + # Load the dataset. + with open(temp_dataset_path) as f: + temp_dataset = json.load(f) + # Filter out the conversations with less than 2 turns. 
+ temp_dataset = [data for data in temp_dataset if len(data["conversations"]) >= 2] + + # Keep one conversation in one list + new_dataset = [] + for temp_data in temp_dataset: + if len(temp_data["conversations"]) % 2 != 0: + continue + if temp_data["conversations"][0]["from"] != "human": + continue + + new_conversations = [] + + for i in range(0, len(temp_data["conversations"]), 2): + new_conversations.extend([ + { + "role": "user", + "content": temp_data["conversations"][i]["value"], + }, + { + "role": "assistant", + "content": temp_data["conversations"][i + 1]["value"], + } + ]) + + new_data = {} + new_data["id"] = temp_data.get("id", "") + new_data["conversations"] = new_conversations + + new_dataset.append(new_data) + + os.makedirs(os.path.dirname(dataset_path), exist_ok=True) + with open(dataset_path, "w") as f: + for item in new_dataset: + f.write(json.dumps(item) + "\n") + + return dataset_path diff --git a/sdk/python/foundation-models/system/reinforcement-learning/scripts/deployment.py b/sdk/python/foundation-models/system/reinforcement-learning/scripts/deployment.py index 944d81a09..ee3981b68 100644 --- a/sdk/python/foundation-models/system/reinforcement-learning/scripts/deployment.py +++ b/sdk/python/foundation-models/system/reinforcement-learning/scripts/deployment.py @@ -36,6 +36,7 @@ def create_managed_deployment( ml_client: MLClient, model_asset_id: str, # Asset ID of the model to deploy instance_type: str, # Supported instance type for managed deployment + model_mount_path: Optional[str] = None, environment_asset_id: Optional[str] = None, # Asset ID of the serving engine to use endpoint_name: Optional[str] = None, endpoint_description: str = "Sample endpoint", @@ -65,6 +66,7 @@ def create_managed_deployment( name=deployment_name, endpoint_name=endpoint_name, model=model_asset_id, + model_mount_path=model_mount_path, instance_type=instance_type, instance_count=1, environment=environment_asset_id, @@ -151,7 +153,10 @@ def test_deployment(ml_client, endpoint_name): """Run a test request against a deployed endpoint and print the result.""" print("Testing endpoint...") # Retrieve endpoint URI and API key to authenticate test request - scoring_uri = ml_client.online_endpoints.get(endpoint_name).scoring_uri + scoring_uri = ( + ml_client.online_endpoints.get(endpoint_name).scoring_uri.replace("/score", "/") + + "v1/chat/completions" + ) if not scoring_uri: raise ValueError("Scoring URI not found for endpoint.") diff --git a/sdk/python/foundation-models/system/reinforcement-learning/scripts/speculative_decoding.py b/sdk/python/foundation-models/system/reinforcement-learning/scripts/speculative_decoding.py index b22c420ac..b8b349f3c 100644 --- a/sdk/python/foundation-models/system/reinforcement-learning/scripts/speculative_decoding.py +++ b/sdk/python/foundation-models/system/reinforcement-learning/scripts/speculative_decoding.py @@ -6,9 +6,9 @@ from huggingface_hub import snapshot_download from azure.ai.ml import dsl, Input, MLClient from azure.ai.ml.constants import AssetTypes -from azure.ai.ml.entities import Model +from azure.ai.ml.entities import Model, Environment, BuildContext from scripts.run import monitor_run -from scripts.deployment import create_kubernetes_deployment +from scripts.deployment import create_managed_deployment class DraftModelPipeline: @@ -253,8 +253,8 @@ def run_draft_model_pipeline( ml_client: MLClient, registry_ml_client: MLClient, compute_cluster: str, - base_model_mlflow_path: str, - 
draft_train_data_path="./data_for_draft_model/train/sharegpt_train_small.jsonl", + base_model_mlflow_path="azureml://registries/azureml-meta/models/Meta-Llama-3-8B-Instruct/versions/9", + draft_train_data_path="./data/draft_model/sharegpt_train_small.jsonl", num_epochs=1, monitor=False, ): @@ -349,33 +349,63 @@ def prepare_combined_model_for_deployment( # Define paths draft_model_dir = "./models/draft" base_model_dir = "./models/base" + base_model_sub_dir = "model_artifact/model" + + print("\nDownloading draft model...") + ml_client.jobs.download( + name=draft_job_name, + output_name="output_model", + download_path=draft_model_dir, + all=True, + ) - temp_download_dir = "./models/draft_temp" - temp_path = Path(temp_download_dir) - required_files = ["config.json", "model.safetensors", "training_state.pt"] - - for file_pattern in required_files: - files_found = list(temp_path.rglob(file_pattern)) - if files_found: - src_path = files_found[0] # Take the first match - dst_path = Path(draft_model_dir) / file_pattern - shutil.move(str(src_path), str(dst_path)) - print(f"Moved {file_pattern}") - else: - print(f"File not found: {file_pattern}") - - # Clean up temporary directory - if os.path.exists(temp_download_dir): - shutil.rmtree(temp_download_dir) - print(f"Cleaned up temporary directory") - else: - print(f"Draft model already exists: {draft_model_dir}") + # Move all files from subdirectories to the root directory + for root, dirs, files in os.walk(draft_model_dir): + for file in files: + if root != draft_model_dir: # Skip files already in root + source = os.path.join(root, file) + destination = os.path.join(draft_model_dir, file) + shutil.move(source, destination) + + # Remove empty subdirectories + for root, dirs, files in os.walk(draft_model_dir, topdown=False): + for dir_name in dirs: + dir_path = os.path.join(root, dir_name) + if not os.listdir(dir_path): # Only remove if empty + os.rmdir(dir_path) + + # Update draft model config with extended context settings + draft_config = json.load(open(f"{draft_model_dir}/config.json")) + draft_config = { + **draft_config, + "max_position_embeddings": 131072, + "rope_scaling": { + "factor": 8, + "high_freq_factor": 4, + "low_freq_factor": 1, + "original_max_position_embeddings": 8192, + "rope_type": "llama3", + }, + } + with open(f"{draft_model_dir}/config.json", "w") as f: + json.dump(draft_config, f, indent=4) # Download base model from HuggingFace if force or not os.path.exists(base_model_dir): print("\nDownloading base model...") snapshot_download(repo_id=base_model_hf_id, local_dir=base_model_dir) print(f"Base model downloaded to: {base_model_dir}") + + # Create the target subdirectory structure + target_dir = os.path.join(base_model_dir, base_model_sub_dir) + os.makedirs(target_dir, exist_ok=True) + + # Move all files and directories from base_model_dir to target_dir + for item in os.listdir(base_model_dir): + if item != "model_artifact": # Skip the subdirectory we just created + source_path = os.path.join(base_model_dir, item) + target_path = os.path.join(target_dir, item) + shutil.move(source_path, target_path) else: print(f"Base model already exists: {base_model_dir}") @@ -392,9 +422,8 @@ def prepare_combined_model_for_deployment( def deploy_speculative_decoding_endpoint( ml_client: MLClient, - combined_model, - instance_type, # In kubernetes we can be granular upto the gpu level and leave the rest of the node unused - compute_name, # Compute argument for KubernetesOnlineEndpoint + combined_model: Model, + instance_type: str, ): 
print("Deploying speculative decoding endpoint") @@ -405,24 +434,35 @@ def deploy_speculative_decoding_endpoint( "Speculative decoding endpoint with GRPO fine-tuned base model" ) endpoint_tags = {"model_type": "speculative_decoding", "algorithm": "grpo"} - environment = ml_client.environments.get("speculative-decoding-env", label="latest") - if environment is None or environment.id is None: - raise ValueError("Speculative decoding environment not found in registry") + environment = Environment( + build=BuildContext(path="./environment/speculative-decoding-env"), + name="speculative-decoding-env", + description="Environment for speculative decoding inference using sglang.", + inference_config={ + "liveness_route": {"port": 30000, "path": "/health"}, + "readiness_route": {"port": 30000, "path": "/health_generate"}, + "scoring_route": {"port": 30000, "path": "/"}, + }, + ) environment_variables = { # Environment variables configure the serving engine and model paths for the container "SPECULATIVE_DECODING_MODE": "true", # Used sglang framework for inference - "BASE_MODEL": f"{model_mount_path}/models/base", # Path for base model + "BASE_MODEL": f"{model_mount_path}/models/base/model_artifact/model", # Path for base model "DRAFT_MODEL": f"{model_mount_path}/models/draft", # Path for draft model "NUM_SPECULATIVE_TOKENS": "5", "SERVING_ENGINE": "sglang", # the serving engine to use + "SGLANG_ARGS": "--tp-size 1 --max-running-requests 32 --mem-fraction-static 0.7 --speculative-algorithm EAGLE3 --speculative-num-steps 3 --speculative-eagle-topk 2 --speculative-num-draft-tokens 4 --dtype float16 --decode-attention-backend fa3 --prefill-attention-backend fa3 --host 0.0.0.0 --port 30000 --enable-torch-compile --cuda-graph-max-bs 16", + "SGLANG_ALLOW_OVERWRITE_LONGER_CONTEXT_LEN": "1", + "SGL_HOST": "0.0.0.0", + "SGL_PORT": "30000", } - endpoint_name = create_kubernetes_deployment( + endpoint_name = create_managed_deployment( ml_client=ml_client, model_asset_id=combined_model.id, - environment_asset_id=environment.id, instance_type=instance_type, - compute_name=compute_name, + model_mount_path=model_mount_path, + environment_asset_id=environment, endpoint_name=endpoint_name, endpoint_description=endpoint_description, endpoint_tags=endpoint_tags, @@ -432,3 +472,146 @@ def deploy_speculative_decoding_endpoint( print(f"Speculative decoding endpoint deployed: {endpoint_name}") return endpoint_name + + +def deploy_base_model_endpoint( + ml_client: MLClient, + instance_type: str, + base_model="./models/base/model_artifact", +): + """Deploy base model endpoint.""" + print("Deploying base model endpoint") + + endpoint_name = f"base-model" + deployment_name = "base-deployment" + endpoint_description = "Base model endpoint" + endpoint_tags = {"model_type": "base"} + + model = Model( + path=base_model, + name=f"base-model", + ) + + environment_variables = { + "TASK_TYPE": "chat-completion", + } + + endpoint_name = create_managed_deployment( + ml_client=ml_client, + model_asset_id=model, + instance_type=instance_type, + environment_asset_id=Environment( + image="mcr.microsoft.com/azureml/curated/foundation-model-inference:85", + inference_config={ + "liveness_route": {"port": 8000, "path": "/ping"}, + "readiness_route": {"port": 8000, "path": "/health"}, + "scoring_route": {"port": 8000, "path": "/"}, + }, + ), + endpoint_name=endpoint_name, + endpoint_description=endpoint_description, + endpoint_tags=endpoint_tags, + deployment_name=deployment_name, + deployment_env_vars=environment_variables, + ) + + 
print(f"Base model endpoint deployed: {endpoint_name}") + return endpoint_name + + +def _prepare_evaluation_inputs( + ml_client, base_endpoint_name, speculative_endpoint_name +): + """Prepare necessary inputs for evaluation.""" + + print(f"Preparing evaluation inputs...") + from azure.ai.ml.entities import WorkspaceConnection, ApiKeyConfiguration + + def _get_endpoint_details(endpoint_name): + """Get endpoint details including scoring URI and API key.""" + api_key = ml_client.online_endpoints.get_keys(name=endpoint_name).primary_key + base_url = ml_client.online_endpoints.get( + name=endpoint_name + ).scoring_uri.replace("/score", "/") + connections_name = f"conn-{str(uuid.uuid4())[:8]}" + + ws_connection = WorkspaceConnection( + name=connections_name, + type="azure_sql_db", + target=base_url, + credentials=ApiKeyConfiguration(key=api_key), + ) + ml_client.connections.create_or_update(workspace_connection=ws_connection) + + return connections_name, base_url + + base_connection_name, base_scoring_uri = _get_endpoint_details(base_endpoint_name) + speculative_connection_name, speculative_scoring_uri = _get_endpoint_details( + speculative_endpoint_name + ) + return ( + base_connection_name, + base_scoring_uri, + speculative_connection_name, + speculative_scoring_uri, + ) + + +def run_evaluation_speculative_decoding( + ml_client: MLClient, + registry_ml_client: MLClient, + base_endpoint_name: str, + speculative_endpoint_name: str, + base_model_hf_id: str, + speculative_model_hf_id: str, + compute_cluster="h100-dedicated", + monitor=False, +): + """Run evaluation pipeline to compare speculative decoding and base model endpoints.""" + base_conn_name, base_score_uri, speculative_conn_name, speculative_score_uri = ( + _prepare_evaluation_inputs( + ml_client, base_endpoint_name, speculative_endpoint_name + ) + ) + + # Load component + evaluation_component_name = "component_endpoint_benchmarking" + print(f" ✓ Loading component: {evaluation_component_name}") + evaluation_component = registry_ml_client.components.get( + name=evaluation_component_name, label="latest" + ) + print( + f" ✓ Component loaded: {evaluation_component.name} v{evaluation_component.version}" + ) + + # Define pipeline + @dsl.pipeline(compute=compute_cluster) + def evaluation_pipeline(): + node = evaluation_component( + base_scoring_url=base_score_uri, + base_connection_name=base_conn_name, + target_scoring_url=speculative_score_uri, + target_connection_name=speculative_conn_name, + base_model=base_model_hf_id, + target_model=speculative_model_hf_id, + base_backend="vllm", + target_backend="sglang", + ) + return {"metrics": node.outputs.metrics} + + # Submit pipeline + evaluation_job = evaluation_pipeline() + print(" ✓ Submitting evaluation pipeline...") + evaluation_job = ml_client.jobs.create_or_update( + evaluation_job, experiment_name="speculative-decoding-evaluation" + ) + + print(f"Job submitted: {evaluation_job.name}") + print(f"Studio URL: {evaluation_job.studio_url}") + + # Monitor if requested + if monitor: + _, status = monitor_run(ml_client, evaluation_job, poll_interval=60) + return evaluation_job, status + + return evaluation_job, None