Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
349 changes: 174 additions & 175 deletions ucm/integration/vllm/ucm_connector.py

Large diffs are not rendered by default.

3 changes: 0 additions & 3 deletions ucm/store/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@ def create_connector(cls, connector_name: str, config: dict) -> UcmKVStoreBase:
UcmConnectorFactory.register_connector(
"UcmNfsStore", "ucm.store.nfsstore.nfsstore_connector", "UcmNfsStore"
)
UcmConnectorFactory.register_connector(
"UcmPcStore", "ucm.store.pcstore.pcstore_connector", "UcmPcStore"
)
UcmConnectorFactory.register_connector(
"UcmMooncakeStore",
"ucm.store.mooncakestore.mooncake_connector",
Expand Down
62 changes: 62 additions & 0 deletions ucm/store/factory_v1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#
# MIT License
#
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#

import importlib
from typing import Callable

from ucm.logger import init_logger
from ucm.store.ucmstore_v1 import UcmKVStoreBaseV1

logger = init_logger(__name__)


class UcmConnectorFactoryV1:
    """Factory for v1 KV-store connectors, resolved lazily by name.

    Connector classes are registered as a (module path, class name) pair and
    imported only on first instantiation, so optional backends do not have to
    be importable at registration time.
    """

    # Maps connector name -> zero-arg loader returning the connector class.
    _registry: dict[str, Callable[[], type[UcmKVStoreBaseV1]]] = {}

    @classmethod
    def register_connector(cls, name: str, module_path: str, class_name: str) -> None:
        """Register a connector with a lazy-loading module and class name.

        Raises:
            ValueError: if ``name`` is already registered.
        """
        if name in cls._registry:
            raise ValueError(f"Connector '{name}' is already registered.")

        def loader() -> type[UcmKVStoreBaseV1]:
            # Import is deferred until the connector is actually created.
            module = importlib.import_module(module_path)
            return getattr(module, class_name)

        cls._registry[name] = loader

    @classmethod
    def create_connector(cls, connector_name: str, config: dict) -> UcmKVStoreBaseV1:
        """Instantiate the connector registered under ``connector_name``.

        Raises:
            ValueError: if ``connector_name`` is not registered.
            TypeError: if the registered class is not a UcmKVStoreBaseV1 subclass.
        """
        if connector_name not in cls._registry:
            raise ValueError(f"Unsupported connector type: {connector_name}")
        connector_cls = cls._registry[connector_name]()
        # Validate with an explicit raise instead of `assert`, which is
        # stripped when Python runs with -O and would skip this check.
        if not issubclass(connector_cls, UcmKVStoreBaseV1):
            raise TypeError(
                f"Connector '{connector_name}' resolved to {connector_cls!r}, "
                "which is not a UcmKVStoreBaseV1 subclass."
            )
        logger.info("Creating connector with name: %s", connector_name)
        return connector_cls(config)


# Register the UcmPcStore backend with the v1 factory; the connector module
# is imported lazily, only when create_connector("UcmPcStore", ...) is called.
UcmConnectorFactoryV1.register_connector(
    "UcmPcStore", "ucm.store.pcstore.pcstore_connector", "UcmPcStore"
)
11 changes: 8 additions & 3 deletions ucm/store/pcstore/cc/domain/trans/trans_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,14 @@ Status TransQueue::Setup(const int32_t deviceId, const size_t streamNumber, cons
UC_ERROR("Failed({}) to make host buffer({},{}).", ts.ToString(), blockSize, bufferNumber);
return Status::Error();
}
auto success =
this->devPool_.SetWorkerFn([this](auto t, auto) { this->DeviceWorker(std::move(t)); })
.Run();
auto success = this->devPool_
.SetWorkerInitFn([deviceId](auto&) {
Trans::Device device;
auto ts = device.Setup(deviceId);
return ts.Success();
})
.SetWorkerFn([this](auto t, auto) { this->DeviceWorker(std::move(t)); })
.Run();
if (!success) { return Status::Error(); }
success = this->filePool_.SetWorkerFn([this](auto t, auto) { this->FileWorker(std::move(t)); })
.SetNWorker(streamNumber)
Expand Down
6 changes: 3 additions & 3 deletions ucm/store/pcstore/cc/domain/trans/trans_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ class TransTask {
: id{NextId()}, type{std::move(type)}, startTp{NowTp()}, brief_{std::move(brief)}
{
}
void Append(const std::string& block, const uintptr_t address)
void Append(const std::string& block, const std::vector<uintptr_t>& addresses)
{
grouped_[block].push_back(address);
number_++;
grouped_[block] = addresses;
number_ += addresses.size();
}
auto Str() const noexcept { return fmt::format("{},{},{}", id, brief_, number_); }
size_t GroupNumber() const { return grouped_.size(); }
Expand Down
4 changes: 3 additions & 1 deletion ucm/store/pcstore/cpy/pcstore.py.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ class PcStorePy : public PcStore {
auto blockId = blockIds.begin();
auto address = addresses.begin();
while ((blockId != blockIds.end()) && (address != addresses.end())) {
task.Append(blockId->cast<std::string>(), address->cast<uintptr_t>());
std::string id = blockId->cast<py::bytes>();
std::vector<uintptr_t> addrs = address->cast<std::vector<uintptr_t>>();
task.Append(id, addrs);
blockId++;
address++;
}
Expand Down
70 changes: 36 additions & 34 deletions ucm/store/pcstore/pcstore_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,22 @@
import torch

from ucm.store.pcstore import ucmpcstore
from ucm.store.ucmstore import Task, UcmKVStoreBase
from ucm.store.ucmstore_v1 import Task, UcmKVStoreBaseV1


@dataclass
class NfsTask(Task):
class PcTask(Task):
task_id: int


class UcmPcStore(UcmKVStoreBase):
class UcmPcStore(UcmKVStoreBaseV1):
def __init__(self, config: Dict):
super().__init__(config)
self.store = ucmpcstore.PcStore()
storage_backends = [
path for path in config["storage_backends"].split(":") if path
]
block_size = int(config["kv_block_size"])
block_size = config.get("kv_block_size", 33554432)
transfer_enable = True if config["role"] == "worker" else False
param = ucmpcstore.PcStore.Config(storage_backends, block_size, transfer_enable)
if transfer_enable:
Expand All @@ -52,8 +52,8 @@ def __init__(self, config: Dict):
param.transferIoDirect = config.get("use_direct", False)
param.transferStreamNumber = config.get("stream_number", 8)
param.transferBufferNumber = config.get("buffer_number", 4096)
param.transferLocalRankSize = config.get("local_rank_size", 8)
param.transferScatterGatherEnable = config.get("use_scatter_gatter", False)
param.transferLocalRankSize = config.get("local_rank_size", 1)
ret = self.store.Setup(param)
if ret != 0:
msg = f"Failed to initialize ucmpcstore, errcode: {ret}."
Expand All @@ -62,52 +62,54 @@ def __init__(self, config: Dict):
def cc_store(self) -> int:
return self.store.CCStoreImpl()

def create(self, block_ids: List[str]) -> List[int]:
return self.store.AllocBatch(block_ids)

def lookup(self, block_ids: List[str]) -> List[bool]:
def lookup(self, block_ids: List[bytes]) -> List[bool]:
return self.store.LookupBatch(block_ids)

def prefetch(self, block_ids: List[str]) -> None:
def prefetch(self, block_ids: List[bytes]) -> None:
pass

def load(
self, block_ids: List[str], offset: List[int], dst_tensor: List[torch.Tensor]
self,
block_ids: List[bytes],
shard_index: List[int],
dst_tensor: List[List[torch.Tensor]],
) -> Task:
dst_tensor_ptr = [t.data_ptr() for t in dst_tensor]
task_id = self.store.LoadToDevice(block_ids, dst_tensor_ptr)
return NfsTask(task_id=task_id)
dst_tensor_ptrs = [[t.data_ptr() for t in tensors] for tensors in dst_tensor]
task_id = self.store.LoadToDevice(block_ids, dst_tensor_ptrs)
return PcTask(task_id=task_id)

def dump(
self, block_ids: List[str], offset: List[int], src_tensor: List[torch.Tensor]
self,
block_ids: List[bytes],
shard_index: List[int],
src_tensor: List[List[torch.Tensor]],
) -> Task:
src_tensor_ptr = [t.data_ptr() for t in src_tensor]
task_id = self.store.DumpFromDevice(block_ids, src_tensor_ptr)
return NfsTask(task_id=task_id)
src_tensor_ptrs = [[t.data_ptr() for t in tensors] for tensors in src_tensor]
task_id = self.store.DumpFromDevice(block_ids, src_tensor_ptrs)
return PcTask(task_id=task_id)

def fetch_data(
self,
block_ids: List[str],
offset: List[int],
dst_addr: List[int],
size: List[int],
block_ids: List[bytes],
shard_index: List[int],
dst_addr: List[List[int]],
) -> Task:
pass
task_id = self.store.LoadToDevice(block_ids, dst_addr)
return task_id

def dump_data(
self,
block_ids: List[str],
offset: List[int],
src_addr: List[int],
size: List[int],
block_ids: List[bytes],
shard_index: List[int],
src_addr: List[List[int]],
) -> Task:
pass

def wait(self, task: Task) -> int:
return self.store.Wait(task.task_id)
task_id = self.store.DumpFromDevice(block_ids, src_addr)
return task_id

def commit(self, block_ids: List[str], is_success: bool = True) -> None:
self.store.CommitBatch(block_ids, is_success)
def wait(self, task: Task) -> None:
ret = self.store.Wait(task.task_id)
if ret != 0:
raise RuntimeError(f"Wait failed for task {task.task_id}, return={ret}")

def check(self, task: Task) -> Tuple[int, bool]:
def check(self, task: Task) -> bool:
return self.store.Check(task.task_id)
51 changes: 16 additions & 35 deletions ucm/store/test/e2e/pcstore_embed.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
import torch

from ucm.store.pcstore.pcstore_connector import UcmPcStore
from ucm.store.ucmstore import UcmKVStoreBase
from ucm.store.ucmstore_v1 import UcmKVStoreBaseV1


def setup_store(storage_backends, block_size, device_id, io_size) -> UcmKVStoreBase:
def setup_store(storage_backends, block_size, device_id, io_size) -> UcmKVStoreBaseV1:
config = {}
config["storage_backends"] = storage_backends
config["kv_block_size"] = block_size
Expand All @@ -46,7 +46,7 @@ def setup_store(storage_backends, block_size, device_id, io_size) -> UcmKVStoreB
def make_buffers(
block_number, device_id, batch_size, block_dim, block_len, block_layer
):
hashes = [secrets.token_hex(16) for _ in range(block_number)]
hashes = [secrets.token_bytes(16) for _ in range(block_number)]
tensors = [
[
torch.rand(
Expand All @@ -61,44 +61,25 @@ def make_buffers(
return hashes, tensors


def embed(store: UcmKVStoreBase, hashes: List[str], tensors: List[List[torch.Tensor]]):
results = store.create(hashes)
assert sum(results) == 0
block_ids = []
offsets = []
layers = []
for hash_id, block in zip(hashes, tensors):
offset = 0
for layer in block:
block_ids.append(hash_id)
offsets.append(offset)
layers.append(layer)
offset += layer.untyped_storage().size()
task = store.dump(block_ids, offsets, layers)
def embed(
store: UcmKVStoreBaseV1, hashes: List[bytes], tensors: List[List[torch.Tensor]]
):
shard_index = [0] * len(hashes)
task = store.dump(hashes, shard_index, tensors)
assert task.task_id > 0
ret = store.wait(task)
assert ret == 0
store.commit(hashes, True)
store.wait(task)


def fetch(store: UcmKVStoreBase, hashes: List[str], tensors: List[List[torch.Tensor]]):
def fetch(
store: UcmKVStoreBaseV1, hashes: List[bytes], tensors: List[List[torch.Tensor]]
):
founds = store.lookup(hashes)
for found in founds:
assert found
block_ids = []
offsets = []
layers = []
for hash_id, block in zip(hashes, tensors):
offset = 0
for layer in block:
block_ids.append(hash_id)
offsets.append(offset)
layers.append(layer)
offset += layer.untyped_storage().size()
task = store.load(block_ids, offsets, layers)
shard_index = [0] * len(hashes)
task = store.load(hashes, shard_index, tensors)
assert task.task_id > 0
ret = store.wait(task)
assert ret == 0
store.wait(task)


def cmp_and_print_diff(a, b, rtol=0.0, atol=0.0):
Expand All @@ -120,7 +101,7 @@ def store_all_hashes(hashes):
file_path = os.path.join(current_directory, kvcache_block_hashes_file)
with open(file_path, "w", encoding="utf-8") as file:
for hs in hashes:
file.write(hs + "\n")
file.write(hs.hex() + "\n")


def main():
Expand Down
26 changes: 9 additions & 17 deletions ucm/store/test/e2e/pcstore_fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import torch

from ucm.store.pcstore.pcstore_connector import UcmPcStore
from ucm.store.ucmstore import UcmKVStoreBase
from ucm.store.ucmstore_v1 import UcmKVStoreBaseV1


def setup_store(storage_backends, block_size, device_id, io_size) -> UcmKVStoreBase:
def setup_store(storage_backends, block_size, device_id, io_size) -> UcmKVStoreBaseV1:
config = {}
config["storage_backends"] = storage_backends
config["kv_block_size"] = block_size
Expand All @@ -48,7 +48,7 @@ def get_hashes(batch_size, batch_number):
file_path = os.path.join(current_directory, kvcache_block_hashes_file)
with open(file_path, "r", encoding="utf-8") as file:
lines = file.readlines()
total = [line.strip() for line in lines]
total = [bytes.fromhex(line.strip()) for line in lines]
hashes = []
for _ in range(batch_number):
hashes.extend(random.sample(total, batch_size))
Expand All @@ -70,24 +70,16 @@ def make_buffers(device_id, batch_size, block_dim, block_len, block_layer):
return tensors


def fetch(store: UcmKVStoreBase, hashes: List[str], tensors: List[List[torch.Tensor]]):
def fetch(
store: UcmKVStoreBaseV1, hashes: List[bytes], tensors: List[List[torch.Tensor]]
):
founds = store.lookup(hashes)
for found in founds:
assert found
block_ids = []
offsets = []
layers = []
for hash_id, block in zip(hashes, tensors):
offset = 0
for layer in block:
block_ids.append(hash_id)
offsets.append(offset)
layers.append(layer)
offset += layer.untyped_storage().size()
task = store.load(block_ids, offsets, layers)
shard_index = [0] * len(hashes)
task = store.load(hashes, shard_index, tensors)
assert task.task_id > 0
ret = store.wait(task)
assert ret == 0
store.wait(task)


def main():
Expand Down
Loading