From fc33b9a60a10c533081500fe5e728a8d2329baa8 Mon Sep 17 00:00:00 2001 From: juncaipeng <13006307475@163.com> Date: Thu, 20 Nov 2025 12:52:24 +0000 Subject: [PATCH 1/2] Refine splitwise deployment --- examples/splitwise/README.md | 36 ++++ examples/splitwise/start_mixed.sh | 34 ++-- examples/splitwise/start_v0_tp1.sh | 32 ++-- examples/splitwise/start_v0_tp2.sh | 111 ------------- examples/splitwise/start_v1_tp1.sh | 35 ++-- examples/splitwise/start_v1_tp2.sh | 110 ------------- examples/splitwise/stop.sh | 7 +- examples/splitwise/utils.sh | 24 +++ fastdeploy/engine/common_engine.py | 102 ++++++------ fastdeploy/engine/engine.py | 6 +- .../engine/sched/resource_manager_v1.py | 104 +++++++----- fastdeploy/router/__init__.py | 4 + fastdeploy/router/launch.py | 33 +--- fastdeploy/router/router.py | 53 +++++- fastdeploy/splitwise/splitwise_connector.py | 155 +++++++++--------- 15 files changed, 361 insertions(+), 485 deletions(-) create mode 100644 examples/splitwise/README.md delete mode 100644 examples/splitwise/start_v0_tp2.sh delete mode 100644 examples/splitwise/start_v1_tp2.sh create mode 100644 examples/splitwise/utils.sh diff --git a/examples/splitwise/README.md b/examples/splitwise/README.md new file mode 100644 index 00000000000..2b8d88ca061 --- /dev/null +++ b/examples/splitwise/README.md @@ -0,0 +1,36 @@ +# Run the Examples on NVIDIA CUDA GPU + +## Prepare the Environment +Refer to [NVIDIA CUDA GPU Installation](https://paddlepaddle.github.io/FastDeploy/get_started/installation/nvidia_gpu/) to pull the docker image, such as: +``` +docker pull ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-cuda-12.6:2.3.0 +``` + +In the docker container, the [NVIDIA MLNX_OFED](https://network.nvidia.com/products/infiniband-drivers/linux/mlnx_ofed/) and [Redis](https://redis.io/) are pre-installed. 
+ +## Build and install FastDeploy + +``` +git clone https://github.com/PaddlePaddle/FastDeploy +cd FastDeploy + +export ENABLE_FD_RDMA=1 + +# Argument 1: Whether to build wheel package (1 for yes, 0 for compile only) +# Argument 2: Python interpreter path +# Argument 3: Whether to compile CPU inference operators +# Argument 4: Target GPU architectures +bash build.sh 1 python false [80,90] +``` + +## Run the Examples + +Run the shell scripts in this directory, ```bash start_v0_tp1.sh``` or ```bash start_v1_tp1.sh``` + +Note that, there are two methods for splitwise deployment: +* v0: using splitwise_scheduler or dp_scheduler, in which the requests are scheduled in the engine. +* v1: using router, in which the requests are scheduled in the router. + +# Run the Examples on Kunlunxin XPU + +Coming soon... diff --git a/examples/splitwise/start_mixed.sh b/examples/splitwise/start_mixed.sh index c36027ac26a..37699caeb2b 100644 --- a/examples/splitwise/start_mixed.sh +++ b/examples/splitwise/start_mixed.sh @@ -3,33 +3,28 @@ set -e # Test mixed server + router -wait_for_health() { - local server_port=$1 - while true; do - status_code=$(curl -s -o /dev/null -w "%{http_code}" "http://0.0.0.0:${server_port}/health" || echo "000") - if [ "$status_code" -eq 200 ]; then - break - else - echo "Service not ready. Retrying in 2s..." - sleep 2 - fi - done -} - # prepare environment -MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle" - +export MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle" export FD_DEBUG=1 -export ENABLE_V1_KVCACHE_SCHEDULER=0 -export KVCACHE_GDRCOPY_FLUSH_ENABLE=1 unset http_proxy && unset https_proxy rm -rf log_* +source ./utils.sh S1_PORT=52400 S2_PORT=52500 ROUTER_PORT=52600 +ports=( + $S1_PORT $((S1_PORT + 1)) $((S1_PORT + 2)) $((S1_PORT + 3)) + $S2_PORT $((S2_PORT + 1)) $((S2_PORT + 2)) $((S2_PORT + 3)) + $ROUTER_PORT +) +check_ports "${ports[@]}" || { + echo "❌ Some ports are in use. Please release them." 
+ exit 1 +} + # start router export FD_LOG_DIR="log_router" mkdir -p ${FD_LOG_DIR} @@ -37,7 +32,6 @@ mkdir -p ${FD_LOG_DIR} nohup python -m fastdeploy.router.launch \ --port ${ROUTER_PORT} \ 2>&1 >${FD_LOG_DIR}/nohup & -sleep 1 # start modelserver 0 export CUDA_VISIBLE_DEVICES=0 @@ -53,7 +47,6 @@ nohup python -m fastdeploy.entrypoints.openai.api_server \ --max-model-len 32768 \ --router "0.0.0.0:${ROUTER_PORT}" \ 2>&1 >${FD_LOG_DIR}/nohup & -sleep 1 wait_for_health ${S1_PORT} @@ -76,6 +69,7 @@ wait_for_health ${S2_PORT} # send request sleep 10 # make sure server is registered to router +echo "send request..." curl -X POST "http://0.0.0.0:${ROUTER_PORT}/v1/chat/completions" \ -H "Content-Type: application/json" \ -d '{ @@ -83,5 +77,5 @@ curl -X POST "http://0.0.0.0:${ROUTER_PORT}/v1/chat/completions" \ {"role": "user", "content": "hello"} ], "max_tokens": 20, - "stream": true + "stream": false }' diff --git a/examples/splitwise/start_v0_tp1.sh b/examples/splitwise/start_v0_tp1.sh index 42f585a5a71..c91bcf9d302 100644 --- a/examples/splitwise/start_v0_tp1.sh +++ b/examples/splitwise/start_v0_tp1.sh @@ -6,22 +6,8 @@ set -e # v0: using splitwise_scheduler or dp_scheduler # v1: using local_scheduler + router -wait_for_health() { - local server_port=$1 - while true; do - status_code=$(curl -s -o /dev/null -w "%{http_code}" "http://0.0.0.0:${server_port}/health" || echo "000") - if [ "$status_code" -eq 200 ]; then - break - else - echo "Service not ready. Retrying in 2s..." 
- sleep 2 - fi - done -} - # prepare environment -MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle" - +export MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle" export FD_DEBUG=1 export ENABLE_V1_KVCACHE_SCHEDULER=1 export KVCACHE_GDRCOPY_FLUSH_ENABLE=1 @@ -37,10 +23,21 @@ fi unset http_proxy && unset https_proxy rm -rf log_* +source ./utils.sh P_PORT=52400 D_PORT=52500 -REDIS_PORT=56388 +REDIS_PORT="${REDIS_PORT:-56388}" + +ports=( + $P_PORT $((P_PORT + 1)) $((P_PORT + 2)) $((P_PORT + 3)) $((P_PORT + 4)) $((P_PORT + 5)) + $D_PORT $((D_PORT + 1)) $((D_PORT + 2)) $((D_PORT + 3)) $((D_PORT + 4)) $((D_PORT + 5)) + $REDIS_PORT +) +check_ports "${ports[@]}" || { + echo "❌ Some ports are in use. Please release them." + exit 1 +} # start redis if ! redis-cli -p ${REDIS_PORT} ping &>/dev/null; then @@ -104,6 +101,7 @@ wait_for_health ${D_PORT} # send request sleep 10 # make sure server is registered to router +echo "send request..." curl -X POST "http://0.0.0.0:${D_PORT}/v1/chat/completions" \ -H "Content-Type: application/json" \ -d '{ @@ -111,5 +109,5 @@ curl -X POST "http://0.0.0.0:${D_PORT}/v1/chat/completions" \ {"role": "user", "content": "hello"} ], "max_tokens": 20, - "stream": true + "stream": false }' diff --git a/examples/splitwise/start_v0_tp2.sh b/examples/splitwise/start_v0_tp2.sh deleted file mode 100644 index cb2015ec4ac..00000000000 --- a/examples/splitwise/start_v0_tp2.sh +++ /dev/null @@ -1,111 +0,0 @@ -#!/bin/bash -set -e - -# Test splitwise deployment -# There are two methods for splitwise deployment: -# v0: using splitwise_scheduler or dp_scheduler -# v1: using local_scheduler + router - -wait_for_health() { - local server_port=$1 - while true; do - status_code=$(curl -s -o /dev/null -w "%{http_code}" "http://0.0.0.0:${server_port}/health" || echo "000") - if [ "$status_code" -eq 200 ]; then - break - else - echo "Service not ready. Retrying in 2s..." 
- sleep 2 - fi - done -} - -# prepare environment -MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle" - -export FD_DEBUG=1 -export ENABLE_V1_KVCACHE_SCHEDULER=0 -export KVCACHE_GDRCOPY_FLUSH_ENABLE=1 - -SCRIPT_PATH=$(readlink -f "$0") -SCRIPT_DIR=$(dirname "$SCRIPT_PATH") -export $(bash ${SCRIPT_DIR}/../../scripts/get_rdma_nics.sh gpu) -echo "KVCACHE_RDMA_NICS:${KVCACHE_RDMA_NICS}" -if [ -z "${KVCACHE_RDMA_NICS}" ]; then - echo "KVCACHE_RDMA_NICS is empty, please check the output of get_rdma_nics.sh" - exit 1 -fi - -unset http_proxy && unset https_proxy -rm -rf log_* - -# start redis -if ! redis-cli ping &>/dev/null; then - echo "Redis is not running. Starting redis-server..." - redis-server --daemonize yes - sleep 1 -else - echo "Redis is already running." -fi -sleep 1 - -# start prefill -export CUDA_VISIBLE_DEVICES=0,1 -export FD_LOG_DIR="log_prefill" -mkdir -p ${FD_LOG_DIR} - -nohup python -m fastdeploy.entrypoints.openai.api_server \ - --model ${MODEL_NAME} \ - --port 8100 \ - --metrics-port 8101 \ - --engine-worker-queue-port 8102 \ - --cache-queue-port 8103 \ - --max-model-len 32768 \ - --tensor-parallel-size 2 \ - --splitwise-role "prefill" \ - --cache-transfer-protocol "rdma,ipc" \ - --pd-comm-port 8104 \ - --rdma-comm-ports 8105,8106 \ - --scheduler-name "splitwise" \ - --scheduler-host "127.0.0.1" \ - --scheduler-port 6379 \ - --scheduler-ttl 9000 \ - 2>&1 >${FD_LOG_DIR}/nohup & -# wait_for_health 8100 - -# start decode -export CUDA_VISIBLE_DEVICES=2,3 -export FD_LOG_DIR="log_decode" -mkdir -p ${FD_LOG_DIR} - -nohup python -m fastdeploy.entrypoints.openai.api_server \ - --model ${MODEL_NAME} \ - --port 9000 \ - --metrics-port 9001 \ - --engine-worker-queue-port 9002 \ - --cache-queue-port 9003 \ - --max-model-len 32768 \ - --tensor-parallel-size 2 \ - --splitwise-role "decode" \ - --cache-transfer-protocol "rdma,ipc" \ - --pd-comm-port 9004 \ - --rdma-comm-ports 9005,9006 \ - --scheduler-name "splitwise" \ - --scheduler-host "127.0.0.1" \ - --scheduler-port 
6379 \ - --scheduler-ttl 9000 \ - 2>&1 >${FD_LOG_DIR}/nohup & -wait_for_health 9000 - - -# send request -sleep 10 # make sure server is registered to router -port=9000 -curl -X POST "http://0.0.0.0:${port}/v1/chat/completions" \ --H "Content-Type: application/json" \ --d '{ - "messages": [ - {"role": "user", "content": "hello"} - ], - "max_tokens": 20, - "stream": true -}' diff --git a/examples/splitwise/start_v1_tp1.sh b/examples/splitwise/start_v1_tp1.sh index 31eca8ab77f..51f594f9e61 100644 --- a/examples/splitwise/start_v1_tp1.sh +++ b/examples/splitwise/start_v1_tp1.sh @@ -6,22 +6,8 @@ set -e # v0: using splitwise_scheduler or dp_scheduler # v1: using local_scheduler + router -wait_for_health() { - local server_port=$1 - while true; do - status_code=$(curl -s -o /dev/null -w "%{http_code}" "http://0.0.0.0:${server_port}/health" || echo "000") - if [ "$status_code" -eq 200 ]; then - break - else - echo "Service not ready. Retrying in 2s..." - sleep 2 - fi - done -} - # prepare environment -MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle" - +export MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle" export FD_DEBUG=1 export ENABLE_V1_KVCACHE_SCHEDULER=1 export KVCACHE_GDRCOPY_FLUSH_ENABLE=1 @@ -37,10 +23,21 @@ fi unset http_proxy && unset https_proxy rm -rf log_* +source ./utils.sh P_PORT=52400 D_PORT=52500 -ROUTER_PORT=52600 +ROUTER_PORT=52700 + +ports=( + $P_PORT $((P_PORT + 1)) $((P_PORT + 2)) $((P_PORT + 3)) $((P_PORT + 4)) $((P_PORT + 5)) + $D_PORT $((D_PORT + 1)) $((D_PORT + 2)) $((D_PORT + 3)) $((D_PORT + 4)) $((D_PORT + 5)) + $ROUTER_PORT +) +check_ports "${ports[@]}" || { + echo "❌ Some ports are in use. Please release them." 
+ exit 1 +} # start router export FD_LOG_DIR="log_router" @@ -50,7 +47,6 @@ nohup python -m fastdeploy.router.launch \ --port ${ROUTER_PORT} \ --splitwise \ 2>&1 >${FD_LOG_DIR}/nohup & -sleep 1 # start prefill export CUDA_VISIBLE_DEVICES=0 @@ -97,12 +93,13 @@ wait_for_health ${D_PORT} # send request sleep 10 # make sure server is registered to router +echo "send request..." curl -X POST "http://0.0.0.0:${ROUTER_PORT}/v1/chat/completions" \ -H "Content-Type: application/json" \ -d '{ "messages": [ {"role": "user", "content": "hello"} ], - "max_tokens": 20, - "stream": true + "max_tokens": 100, + "stream": false }' diff --git a/examples/splitwise/start_v1_tp2.sh b/examples/splitwise/start_v1_tp2.sh deleted file mode 100644 index c58a8a9cead..00000000000 --- a/examples/splitwise/start_v1_tp2.sh +++ /dev/null @@ -1,110 +0,0 @@ -#!/bin/bash -set -e - -# Test splitwise deployment -# There are two methods for splitwise deployment: -# v0: using splitwise_scheduler or dp_scheduler -# v1: using local_scheduler + router - -wait_for_health() { - local server_port=$1 - while true; do - status_code=$(curl -s -o /dev/null -w "%{http_code}" "http://0.0.0.0:${server_port}/health" || echo "000") - if [ "$status_code" -eq 200 ]; then - break - else - echo "Service not ready. Retrying in 2s..." 
- sleep 2 - fi - done -} - -# prepare environment -MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle" - -export FD_DEBUG=1 -export ENABLE_V1_KVCACHE_SCHEDULER=0 -export KVCACHE_GDRCOPY_FLUSH_ENABLE=1 - -SCRIPT_PATH=$(readlink -f "$0") -SCRIPT_DIR=$(dirname "$SCRIPT_PATH") -export $(bash ${SCRIPT_DIR}/../../scripts/get_rdma_nics.sh gpu) -echo "KVCACHE_RDMA_NICS:${KVCACHE_RDMA_NICS}" -if [ -z "${KVCACHE_RDMA_NICS}" ]; then - echo "KVCACHE_RDMA_NICS is empty, please check the output of get_rdma_nics.sh" - exit 1 -fi - -unset http_proxy && unset https_proxy -rm -rf log_* - -# start router -export FD_LOG_DIR="log_router" -mkdir -p ${FD_LOG_DIR} - -echo "start router" -router_port=9000 -nohup python -m fastdeploy.router.launch \ - --port ${router_port} \ - --splitwise \ - 2>&1 >${FD_LOG_DIR}/nohup & -sleep 1 - -# start prefill -export CUDA_VISIBLE_DEVICES=0,1 -export FD_LOG_DIR="log_prefill" -mkdir -p ${FD_LOG_DIR} - -echo "start prefill" -nohup python -m fastdeploy.entrypoints.openai.api_server \ - --model ${MODEL_NAME} \ - --port 8100 \ - --metrics-port 8101 \ - --engine-worker-queue-port 8102 \ - --cache-queue-port 8103 \ - --tensor-parallel-size 2 \ - --max-model-len 32768 \ - --splitwise-role "prefill" \ - --pd-comm-port 8104 \ - --rdma-comm-ports 8105,8106 \ - --router "0.0.0.0:${router_port}" \ - 2>&1 >${FD_LOG_DIR}/nohup & - -# wait_for_health 8100 - -# start decode -export CUDA_VISIBLE_DEVICES=2,3 -export FD_LOG_DIR="log_decode" -mkdir -p ${FD_LOG_DIR} - -echo "start decode" -nohup python -m fastdeploy.entrypoints.openai.api_server \ - --model ${MODEL_NAME} \ - --port 8200 \ - --metrics-port 8201 \ - --engine-worker-queue-port 8202 \ - --cache-queue-port 8203 \ - --max-model-len 32768 \ - --tensor-parallel-size 2 \ - --splitwise-role "decode" \ - --pd-comm-port 8204 \ - --rdma-comm-ports 8205,8206 \ - --router "0.0.0.0:${router_port}" \ - 2>&1 >${FD_LOG_DIR}/nohup & - -wait_for_health 8200 - - - -# send request -sleep 10 # make sure server is registered to router 
-port=9000 -curl -X POST "http://0.0.0.0:${port}/v1/chat/completions" \ --H "Content-Type: application/json" \ --d '{ - "messages": [ - {"role": "user", "content": "hello"} - ], - "max_tokens": 20, - "stream": true -}' diff --git a/examples/splitwise/stop.sh b/examples/splitwise/stop.sh index 5b0f13c5d95..943efa12c58 100644 --- a/examples/splitwise/stop.sh +++ b/examples/splitwise/stop.sh @@ -1,7 +1,6 @@ pkill -9 -f python pkill -9 -f fastdeploy -pkill -f -9 gunicorn +pkill -9 -f gunicorn +pkill -9 -f redis-server -if redis-cli ping >/dev/null 2>&1; then - redis-cli shutdown -fi +sleep 1 diff --git a/examples/splitwise/utils.sh b/examples/splitwise/utils.sh new file mode 100644 index 00000000000..b02784ac046 --- /dev/null +++ b/examples/splitwise/utils.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +check_ports() { + for port in "$@"; do + if ss -tuln | grep -q ":$port "; then + echo "❌ Port $port is already in use" + return 1 + fi + done + return 0 +} + +wait_for_health() { + local server_port=$1 + while true; do + status_code=$(curl -s -o /dev/null -w "%{http_code}" "http://0.0.0.0:${server_port}/health" || echo "000") + if [ "$status_code" -eq 200 ]; then + break + else + echo "Service not ready. Retrying in 4s..." 
+ sleep 4 + fi + done +} diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index 9f0db935cf5..f268eb84af0 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -23,7 +23,7 @@ import traceback import weakref from concurrent.futures import ThreadPoolExecutor -from typing import Dict, List, Optional, Tuple, Union +from typing import Dict, List, Optional, Tuple import numpy as np import paddle @@ -324,18 +324,18 @@ def start_worker_queue_service(self, start_queue): local_data_parallel_id=self.cfg.parallel_config.local_data_parallel_id, ) - def insert_tasks(self, tasks: Union[List[Request], List[RequestOutput]], current_id=-1): + def insert_tasks(self, tasks: List[Request], current_id=-1): """ - Insert tasks to engine. + Allocate resource and insert tasks to engine. + Used in v0_kvcache_scheduler. """ + if not isinstance(tasks, list): + tasks = [tasks] for task in tasks: start_span_request("DEQUEUE", task, trace.SpanKind.CONSUMER) self.resource_manager.check_and_free_block_tables() - if not isinstance(tasks, list): - tasks = [tasks] - need_delete_tasks = [] for task in tasks: if self.cfg.scheduler_config.splitwise_role != "mixed": @@ -388,7 +388,11 @@ def insert_tasks(self, tasks: Union[List[Request], List[RequestOutput]], current is_prefill = True self.token_processor.number_of_input_tokens += tasks[i].prompt_token_ids_len - self.split_connector.send_cache_infos(tasks, current_id) + if self.cfg.scheduler_config.splitwise_role == "prefill": + self.split_connector.send_cache_info_to_messager(tasks, current_id) + elif self.cfg.scheduler_config.splitwise_role == "decode": + self.split_connector.send_cache_info_to_prefill(tasks) + if not is_decode: self.llm_logger.info(f"Tasks are sent to engine, req_ids={req_ids}") for task in tasks: @@ -406,7 +410,8 @@ def insert_tasks(self, tasks: Union[List[Request], List[RequestOutput]], current def _insert_prefilled_requests(self, request_outputs: 
List[RequestOutput]): """ - insert prefilled requests into engine worker queue. + Decode insert prefilled requests into engine worker queue. + Used in v1_kvcache_scheduler. Args: request_outputs: a list of RequestOutput sent by prefill instance """ @@ -640,8 +645,9 @@ def _schedule_request_to_worker(self): time.sleep(0.001) continue if self.cfg.scheduler_config.splitwise_role == "decode": - # Decode will instert the request sent by prefill to engine, - # so the task sent by client will be ignored + # TODO: refine scheduler to remove this limitation + # Decode will process and schedule the request sent by prefill to engine, + # so the same request sent by the decode api server will be ignored continue llm_logger.debug(f"get tasks from scheduler: {tasks}") @@ -692,8 +698,9 @@ def _fetch_request(): trace_print(LoggingEventName.REQUEST_QUEUE_END, task.request_id, getattr(task, "user", "")) if self.cfg.scheduler_config.splitwise_role == "decode": - # Decode will instert the request sent by prefill to engine, - # so the task sent by client will be ignored + # TODO: refine scheduler to remove this limitation + # Decode will process and schedule the request sent by prefill to engine, + # so the same request sent by the decode api server will be ignored is_fetching = False return @@ -744,11 +751,11 @@ def _fetch_request(): for tmp_task in need_delete_tasks: tasks.remove(tmp_task) # release resource in P - self.resource_manager.prerelease_resource(tmp_task) + self.resource_manager.pre_recycle_resource(tmp_task.request_id) if self.cfg.scheduler_config.splitwise_role == "prefill": # to send cache info to cache messager if tasks: - self.split_connector.send_cache_infos(tasks, 0) + self.split_connector.send_cache_info_to_messager(tasks, 0) # ensure cache tasks has sent to cache_messager need_check_req_ids = [task.request_id for task in tasks] while need_check_req_ids: @@ -1002,7 +1009,7 @@ def _zmq_send_generated_tokens(self): else: new_contents.append(content) if 
len(new_contents): - llm_logger.debug(f"Send response for request id: {request_id}") + llm_logger.debug(f"Send response for request id: {request_id}, {new_contents}") self.send_response_server.send_response(request_id, new_contents) except Exception as e: llm_logger.error(f"Unexcepted error happend: {e}, {traceback.format_exc()!s}") @@ -1041,7 +1048,7 @@ def _process_allocate_resource_requests(): if envs.ENABLE_V1_KVCACHE_SCHEDULER: if self.resource_manager.preallocate_resource_in_d(task): self.llm_logger.info(f"Resource available, processing task {task.request_id}") - self.split_connector.send_cache_infos([task], -1) + self.split_connector.send_cache_info_to_prefill([task]) processed_indices.append(idx) is_success = True else: @@ -1054,7 +1061,7 @@ def _process_allocate_resource_requests(): if not is_success: if not self.enable_decode_cache_task: task.error_msg = "Not enough resources" - self.split_connector.send_cache_infos([task], -1) + self.split_connector.send_cache_info_to_prefill([task]) processed_indices.append(idx) else: self.llm_logger.debug(f"Still waiting for resources {task.request_id}") @@ -1067,13 +1074,16 @@ def _process_prefilled_requests(): nonlocal prefilled_request_ouputs ready_request_outputs = [] waiting_request_outputs = [] - # Waiting for the api_server and scheduler in decode to - # receive the request sent by the client - for task in prefilled_request_ouputs: - if not hasattr(self.scheduler, "has_request") or self.scheduler.has_request(task.request_id): - ready_request_outputs.append(task) - else: - waiting_request_outputs.append(task) + + for req_output in prefilled_request_ouputs: + if hasattr(self.scheduler, "has_request") and not self.scheduler.has_request(req_output.request_id): + # ensure the api_server and scheduler in decode have + # received the request sent by the client + waiting_request_outputs.append(req_output) + continue + + ready_request_outputs.append(req_output) + self.llm_logger.debug(f"there are enough resource for 
prefilled request: {req_output.request_id}") prefilled_request_ouputs = waiting_request_outputs if self.cfg.splitwise_version == "v1": @@ -1083,35 +1093,27 @@ def _process_prefilled_requests(): if not envs.ENABLE_V1_KVCACHE_SCHEDULER: self._insert_prefilled_requests(ready_request_outputs) else: - for task in ready_request_outputs: - if envs.FD_ENABLE_INTERNAL_ADAPTER: - if ( - not task.outputs.token_ids - ): # first token is eos in Prefill, just recycle resource and continue - cur_req = self.resource_manager.requests[task.request_id] - self.resource_manager.stop_flags[cur_req.idx] = True - self.resource_manager.tasks_list[cur_req.idx] = None - self.resource_manager._free_blocks(cur_req) - if cur_req.request_id in self.token_processor.tokens_counter: - del self.token_processor.tokens_counter[task.request_id] - self.llm_logger.warning(f"{task.request_id} need not decode after first token") - del self.resource_manager.requests[task.request_id] - del self.resource_manager.req_dict[task.request_id] - continue - if task.error_code != 200: - cur_req = self.resource_manager.requests[task.request_id] - self.resource_manager.stop_flags[cur_req.idx] = True - self.resource_manager.tasks_list[cur_req.idx] = None - self.resource_manager._free_blocks(cur_req) - if cur_req.request_id in self.token_processor.tokens_counter: - del self.token_processor.tokens_counter[task.request_id] - self.scheduler.put_results([task]) + for req_output in ready_request_outputs: + request_id = req_output.request_id + if envs.FD_ENABLE_INTERNAL_ADAPTER and not req_output.outputs.token_ids: + # first token is eos in Prefill, just recycle resource and continue + self.llm_logger.warning(f"{request_id} need not decode after first token") + self.resource_manager.pre_recycle_resource(request_id) + if request_id in self.token_processor.tokens_counter: + del self.token_processor.tokens_counter[request_id] + continue + if req_output.error_code != 200: self.llm_logger.warning( - f"{task.request_id} prefill 
failed with msg:{task.error_msg}, recycle resource." + f"{request_id} prefill failed with msg:{req_output.error_msg}, recycle resource." ) + self.resource_manager.pre_recycle_resource(request_id) + if request_id in self.token_processor.tokens_counter: + del self.token_processor.tokens_counter[request_id] + self.scheduler.put_results([req_output]) continue - self.token_processor.tokens_counter[task.request_id] = 1 - self.resource_manager.insert_task_for_decoding(task) + self.token_processor.tokens_counter[request_id] = 1 + self.resource_manager.add_prefilled_request(req_output) + self.llm_logger.debug(f"add prefilled request success, {request_id}") def decode_loop(): while self.running: diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index 40e02445bdc..0b3cc9c55f0 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -403,8 +403,10 @@ def _exit_sub_services(self): llm_logger.info("Engine shut down, exiting sub services...") if hasattr(self, "cache_manager_processes"): - self.engine.resource_manager.cache_manager.shm_cache_task_flag_broadcast.clear() - self.engine.resource_manager.cache_manager.cache_ready_signal.clear() + if hasattr(self.engine.resource_manager.cache_manager, "shm_cache_task_flag_broadcast"): + self.engine.resource_manager.cache_manager.shm_cache_task_flag_broadcast.clear() + if hasattr(self.engine.resource_manager.cache_manager, "cache_ready_signal"): + self.engine.resource_manager.cache_manager.cache_ready_signal.clear() for p in self.cache_manager_processes: llm_logger.info(f"Killing cache manager process {p.pid}") try: diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py index 2d6641ed91c..7bb03594518 100644 --- a/fastdeploy/engine/sched/resource_manager_v1.py +++ b/fastdeploy/engine/sched/resource_manager_v1.py @@ -166,6 +166,7 @@ def __init__(self, max_num_seqs, config, tensor_parallel_size, splitwise_role, l # Priority queues for requests. 
self.waiting: deque[Request] = deque() self.running: list[Request] = [] + self.preallocated_reqs: dict[str, Request] = {} self.enable_max_prefill = envs.FD_ENABLE_MAX_PREFILL self.finish_execution_pool = ThreadPoolExecutor(max_workers=1) self.lock = threading.Lock() @@ -906,18 +907,20 @@ def add_request(self, request: Request) -> None: self.waiting.append(request) self.requests[request.request_id] = request - def prerelease_resource(self, request: Request): + def pre_recycle_resource(self, request_id: str): """ - Release resource in P or D before finished due to unexpected error. + Recycle resource in P or D before finished due to unexpected error. """ with self.lock: - self.tasks_list[request.idx] = None - self.stop_flags[request.idx] = True - if request.request_id in self.requests: - del self.requests[request.request_id] - if request.request_id in self.req_dict: - del self.req_dict[request.request_id] - self._free_blocks(request) + if request_id not in self.requests: + return + req = self.requests[request_id] + self.tasks_list[req.idx] = None + self.stop_flags[req.idx] = True + self._free_blocks(req) + del self.requests[request_id] + if request_id in self.req_dict: + del self.req_dict[request_id] def add_request_in_p(self, requests: list[Request]): with self.lock: @@ -951,7 +954,7 @@ def preallocate_resource_in_p(self, request: Request): if not success: self._free_blocks(request) return False - # consider for mtp, plus enc_dec_block_num + need_extra_prefill_blocks = need_prealloc_prefill_blocks - request.cache_info[0] if self.cache_manager.can_allocate_gpu_blocks(need_extra_prefill_blocks): request.block_tables.extend(self.cache_manager.allocate_gpu_blocks(need_extra_prefill_blocks)) @@ -987,49 +990,62 @@ def preallocate_resource_in_d(self, request: Request): If can not, return False """ assert self.config.scheduler_config.splitwise_role == "decode", "Only D instance can call this method" + if request.reasoning_max_tokens is not None: + 
request.reasoning_max_tokens -= 1 + request.need_prefill_tokens = len(request.prompt_token_ids) + need_prealloc_prefill_blocks = ( + request.need_prefill_tokens + self.config.cache_config.block_size - 1 + ) // self.config.cache_config.block_size + self.config.cache_config.enc_dec_block_num + with self.lock: if len(self.waiting) > 0: return False if self.available_batch() == 0: return False - if request.reasoning_max_tokens is not None: - request.reasoning_max_tokens -= 1 - request.need_prefill_tokens = len(request.prompt_token_ids) - need_prealloc_prefill_blocks = ( - request.need_prefill_tokens + self.config.cache_config.block_size - 1 - ) // self.config.cache_config.block_size + self.config.cache_config.enc_dec_block_num # consider for mtp, plus enc_dec_block_num - if self.cache_manager.can_allocate_gpu_blocks(need_prealloc_prefill_blocks): - request.block_tables.extend(self.cache_manager.allocate_gpu_blocks(need_prealloc_prefill_blocks)) - request.num_computed_tokens = request.need_prefill_tokens - request.disaggregate_info["block_tables"] = request.block_tables - allocated_position = self.get_available_position() - request.idx = allocated_position - self.tasks_list[request.idx] = request - self.stop_flags[request.idx] = False - self.requests[request.request_id] = request - self.req_dict[request.request_id] = allocated_position - return True - return False + if not self.cache_manager.can_allocate_gpu_blocks(need_prealloc_prefill_blocks): + return False - def insert_task_for_decoding(self, request_output_in_p: RequestOutput): + request.block_tables.extend(self.cache_manager.allocate_gpu_blocks(need_prealloc_prefill_blocks)) + request.num_computed_tokens = request.need_prefill_tokens + request.disaggregate_info["block_tables"] = request.block_tables + allocated_position = self.get_available_position() + request.idx = allocated_position + self.tasks_list[request.idx] = request + self.stop_flags[request.idx] = False + self.requests[request.request_id] = request + 
self.req_dict[request.request_id] = allocated_position + return True + + def has_resource_for_prefilled_req(self, request_id: str): """ - In P/D aggregated deployment, D should continue to decode after recieving first token and cache from P. + Check whether there are enough slots and GPU resources for the prefilled request, + whose cache is saved in the CPU buffer. """ assert self.config.scheduler_config.splitwise_role == "decode", "Only D instance can call this method" - with self.lock: - request = self.requests[request_output_in_p.request_id] - request.output_token_ids.append(request_output_in_p.outputs.token_ids[0]) - request.num_cached_tokens = request_output_in_p.num_cached_tokens - if ( - self.config.speculative_config.method in ["mtp"] - and self.config.scheduler_config.splitwise_role == "decode" - ): - request.draft_token_ids = copy.deepcopy(request_output_in_p.outputs.draft_token_ids) - # update request.need_prefill_tokens - request.need_prefill_tokens = len(request.prompt_token_ids) + 1 - request.inference_start_time = time.time() - request.schedule_start_time = time.time() - self.running.append(request) + assert request_id in self.preallocated_reqs, "request_id must be in preallocate" + need_blocks_num = len(self.preallocated_reqs[request_id].disaggregate_info["block_tables"]) + return self.available_batch() > 0 and self.cache_manager.can_allocate_gpu_blocks(need_blocks_num) + + def add_prefilled_request(self, request_output: RequestOutput): + """ + In P/D disaggregated deployment, D should continue to decode after receiving the first token and cache from P. + NOTE: GPU resources should be checked in advance to ensure they are sufficient for the prefilled request. 
+ """ + assert self.config.scheduler_config.splitwise_role == "decode", "Only D instance can call this method" + request = self.requests[request_output.request_id] + + # update request and insert to running + request.output_token_ids.append(request_output.outputs.token_ids[0]) + request.num_cached_tokens = request_output.num_cached_tokens + if ( + self.config.speculative_config.method in ["mtp"] + and self.config.scheduler_config.splitwise_role == "decode" + ): + request.draft_token_ids = copy.deepcopy(request_output.outputs.draft_token_ids) + request.need_prefill_tokens = len(request.prompt_token_ids) + 1 + request.inference_start_time = time.time() + request.schedule_start_time = time.time() + self.running.append(request) def _free_blocks(self, request: Request): if self.config.cache_config.enable_prefix_caching: diff --git a/fastdeploy/router/__init__.py b/fastdeploy/router/__init__.py index 31be300c18e..029a4b4fed0 100644 --- a/fastdeploy/router/__init__.py +++ b/fastdeploy/router/__init__.py @@ -13,3 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """ + +from .router import RouterArgs, launch_router + +__all__ = ["RouterArgs", "launch_router"] diff --git a/fastdeploy/router/launch.py b/fastdeploy/router/launch.py index 421baa65e82..46c3c03a14f 100644 --- a/fastdeploy/router/launch.py +++ b/fastdeploy/router/launch.py @@ -14,41 +14,18 @@ # limitations under the License. 
""" -import argparse - -from fastdeploy.router.router import start_router +from fastdeploy.router.router import RouterArgs, launch_router +from fastdeploy.utils import FlexibleArgumentParser from fastdeploy.utils import router_logger as logger def main() -> None: - parser = argparse.ArgumentParser(description="Router for splitwise deployment testing") - parser.add_argument( - "--host", - type=str, - default="0.0.0.0", - help="Host address to bind the router server.", - ) - parser.add_argument( - "--port", - type=int, - default="9000", - help="Port number to bind the router server", - ) - parser.add_argument( - "--splitwise", - action="store_true", - help="Router uses splitwise deployment", - ) - parser.add_argument( - "--request-timeout-secs", - type=int, - default=1800, - help="Request timeout in seconds", - ) + parser = FlexibleArgumentParser() + parser = RouterArgs.add_cli_args(parser) args = parser.parse_args() try: - start_router(args) + launch_router(args) except Exception as e: logger.error(f"Error starting router: {e}") raise e diff --git a/fastdeploy/router/router.py b/fastdeploy/router/router.py index 3ff4cd37125..fb5522abbe6 100644 --- a/fastdeploy/router/router.py +++ b/fastdeploy/router/router.py @@ -6,6 +6,7 @@ import asyncio import random +from dataclasses import dataclass from itertools import chain from uuid import uuid4 @@ -19,11 +20,60 @@ InstanceRole, check_service_health_async, ) +from fastdeploy.utils import FlexibleArgumentParser from fastdeploy.utils import router_logger as logger app = FastAPI() +@dataclass +class RouterArgs: + host: str = "0.0.0.0" + """ + Host address to bind the router server + """ + port: str = "9000" + """ + Port to bind the router server. 
+ """ + splitwise: bool = False + """ + Router uses splitwise deployment + """ + request_timeout_secs: int = 1800 + """ + Request timeout in seconds + """ + + @staticmethod + def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: + parser.add_argument( + "--host", + type=str, + default=RouterArgs.host, + help="Host address to bind the router server.", + ) + parser.add_argument( + "--port", + type=int, + default=RouterArgs.port, + help="Port number to bind the router server", + ) + parser.add_argument( + "--splitwise", + action="store_true", + default=RouterArgs.splitwise, + help="Router uses splitwise deployment", + ) + parser.add_argument( + "--request-timeout-secs", + type=int, + default=RouterArgs.request_timeout_secs, + help="Request timeout in seconds", + ) + return parser + + class Router: """ Router class that handles requests from client and @@ -306,8 +356,9 @@ async def health_generate(): return Response(status_code=200) -def start_router(router_args): +def launch_router(router_args: RouterArgs): app.state.router_args = router_args + print(f"Starting router with args: {router_args}") @app.on_event("startup") async def startup_event(): diff --git a/fastdeploy/splitwise/splitwise_connector.py b/fastdeploy/splitwise/splitwise_connector.py index 8daab42ddf1..5952c3ef829 100644 --- a/fastdeploy/splitwise/splitwise_connector.py +++ b/fastdeploy/splitwise/splitwise_connector.py @@ -53,7 +53,6 @@ def __init__(self, cfg, worker_queue, resource_manager): self.engine_worker_queue = worker_queue self.resource_manager = resource_manager self.connect_innode_instances = {} - self.temp_cache_info = dict() self.current_request_ids = dict() self.idx = self.cfg.parallel_config.local_data_parallel_id self.enable_decode_cache_task = envs.FD_ENABLE_CACHE_TASK == "1" @@ -291,98 +290,96 @@ def check_decode_allocated(self, task): self.logger.error(f"Receive_decode_allocated error: {msg}") return False, msg - def send_cache_infos(self, tasks: List[Request], 
current_id): + def send_cache_info_to_messager(self, tasks: List[Request], current_id): """ - Send cache information to specific port. + Prefill sends the request with allocated block ids to cache messager by engine worker queue. - Parameters: - tasks (list): List of tasks. - current_id (int): Current id to indicate the prefill number. - - Returns: - bool: Whether it is in decode status. + args: + tasks (list): List of tasks. + current_id (int): Current id to indicate the prefill number. """ - is_decode = False - temp_cache_info = dict() + cache_info = [] for i in range(len(tasks)): - if tasks[i].disaggregate_info is None: + dsg_info = tasks[i].disaggregate_info + if dsg_info is None: continue - self.logger.info(f"{tasks[i].disaggregate_info}") - if tasks[i].disaggregate_info["role"] == "decode": - if tasks[i].disaggregate_info["transfer_protocol"] == "ipc": - cache_info = { - "request_id": tasks[i].request_id, - "device_ids": self.cfg.parallel_config.device_ids.split(","), - "transfer_protocol": "ipc", - "dest_block_ids": tasks[i].disaggregate_info["block_tables"], - } - if tasks[i].disaggregate_info["cache_info"]["ipc"]["port"] not in temp_cache_info: - temp_cache_info[tasks[i].disaggregate_info["cache_info"]["ipc"]["port"]] = [] - temp_cache_info[tasks[i].disaggregate_info["cache_info"]["ipc"]["port"]].append(cache_info) - else: - addr = ( - f"{tasks[i].disaggregate_info['cache_info']['rdma']['ip']}:" - + f"{tasks[i].disaggregate_info['cache_info']['rdma']['port']}" - ) - if tasks[i].get("error_msg", None) is not None: - cache_info = { - "request_id": tasks[i].request_id, - "error_msg": tasks[i].get("error_msg"), - } - else: - cache_info = { - "request_id": tasks[i].request_id, - "device_ids": self.cfg.parallel_config.device_ids.split(","), - "ip": self.cfg.host_ip, - "rdma_ports": self.cfg.disaggregate_info["cache_info"]["rdma"]["rdma_port"], - "transfer_protocol": "rdma", - "dest_block_ids": tasks[i].disaggregate_info["block_tables"], - } - if addr not in 
temp_cache_info: - temp_cache_info[addr] = [] - - temp_cache_info[addr].append(cache_info) - is_decode = True + if envs.ENABLE_V1_KVCACHE_SCHEDULER: + info = { + "request_id": tasks[i].request_id, + "src_block_ids": tasks[i].block_tables, + "current_id": tasks[i].idx, + "need_prefill_tokens": tasks[i].need_prefill_tokens, + } else: - addr = "prefill" if current_id == -1: - current_id = tasks[i].disaggregate_info["cache_info"]["ipc"]["current_id"] - if envs.ENABLE_V1_KVCACHE_SCHEDULER: - cache_info = { + current_id = dsg_info["cache_info"]["ipc"]["current_id"] + info = { + "request_id": tasks[i].request_id, + "src_block_ids": tasks[i].block_tables, + "current_id": current_id, + } + cache_info.append(info) + + self.logger.debug(f"send_cache_info_to_messager, {cache_info}") + self.engine_worker_queue.put_cache_info(cache_info) + + def send_cache_info_to_prefill(self, tasks: List[Request]): + """ + Decode sends the request with allocated block ids to prefill. + + args: + tasks (list): List of tasks. 
+ """ + cache_info = dict() + for i in range(len(tasks)): + dsg_info = tasks[i].disaggregate_info + if dsg_info is None: + self.logger.debug(f"skip send_cache_infos_to_prefill, {tasks[i].request_id}") + continue + self.logger.debug(f"send_cache_infos_to_prefill, {dsg_info}") + + if dsg_info["transfer_protocol"] == "ipc": + info = { + "request_id": tasks[i].request_id, + "device_ids": self.cfg.parallel_config.device_ids.split(","), + "transfer_protocol": "ipc", + "dest_block_ids": dsg_info["block_tables"], + } + if dsg_info["cache_info"]["ipc"]["port"] not in cache_info: + cache_info[dsg_info["cache_info"]["ipc"]["port"]] = [] + cache_info[dsg_info["cache_info"]["ipc"]["port"]].append(info) + else: + if tasks[i].get("error_msg", None) is not None: + info = { "request_id": tasks[i].request_id, - "src_block_ids": tasks[i].block_tables, - "current_id": tasks[i].idx, - "need_prefill_tokens": tasks[i].need_prefill_tokens, + "error_msg": tasks[i].get("error_msg"), } else: - cache_info = { + info = { "request_id": tasks[i].request_id, - "src_block_ids": tasks[i].block_tables, - "current_id": current_id, + "device_ids": self.cfg.parallel_config.device_ids.split(","), + "ip": self.cfg.host_ip, + "rdma_ports": self.cfg.disaggregate_info["cache_info"]["rdma"]["rdma_port"], + "transfer_protocol": "rdma", + "dest_block_ids": dsg_info["block_tables"], } - if addr not in temp_cache_info: - temp_cache_info[addr] = [] - temp_cache_info[addr].append(cache_info) - - if not is_decode and len(temp_cache_info): - for k, v in temp_cache_info.items(): - self.logger.debug(f"send cache info to cachemessager, {v}") - self.engine_worker_queue.put_cache_info(v) - else: - self.logger.debug(f"send cache info to coupled instance, {temp_cache_info}") - if len(temp_cache_info): - for k, v in temp_cache_info.items(): - self.logger.info(f"{k} {v}") - if ":" in str(k): - self._send_message(k, "cache_sync", v) - else: - if k not in self.connect_innode_instances: - self.create_connection(k) - 
self.connect_innode_instances[k].put_cache_info(v) - - return is_decode + addr = f"{dsg_info['cache_info']['rdma']['ip']}:" + f"{dsg_info['cache_info']['rdma']['port']}" + if addr not in cache_info: + cache_info[addr] = [] + cache_info[addr].append(info) + + self.logger.debug(f"send cache info to prefill, {cache_info}") + if len(cache_info): + for k, v in cache_info.items(): + self.logger.info(f"{k} {v}") + if ":" in str(k): + self._send_message(k, "cache_sync", v) + else: + if k not in self.connect_innode_instances: + self.create_connection(k) + self.connect_innode_instances[k].put_cache_info(v) def _serialize_message(self, msg_type: str, payload) -> bytes: # TODO 压缩 From 380e47a1edbd3851f9f6acc8b77efedbee382cdb Mon Sep 17 00:00:00 2001 From: juncaipeng <13006307475@163.com> Date: Thu, 20 Nov 2025 13:14:32 +0000 Subject: [PATCH 2/2] up --- fastdeploy/engine/sched/resource_manager_v1.py | 3 +++ fastdeploy/router/router.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py index 7bb03594518..1d799bfed67 100644 --- a/fastdeploy/engine/sched/resource_manager_v1.py +++ b/fastdeploy/engine/sched/resource_manager_v1.py @@ -1032,6 +1032,9 @@ def add_prefilled_request(self, request_output: RequestOutput): NOTE: GPU resources should be checked in advance to ensure they are sufficient for the prefilled request. 
""" assert self.config.scheduler_config.splitwise_role == "decode", "Only D instance can call this method" + if request_output.request_id not in self.requests: + self.logger.error(f"Request {request_output.request_id} not found in requests") + return request = self.requests[request_output.request_id] # update request and insert to running diff --git a/fastdeploy/router/router.py b/fastdeploy/router/router.py index fb5522abbe6..48bac8633c7 100644 --- a/fastdeploy/router/router.py +++ b/fastdeploy/router/router.py @@ -55,7 +55,7 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: ) parser.add_argument( "--port", - type=int, + type=str, default=RouterArgs.port, help="Port number to bind the router server", )