diff --git a/ucm/store/CMakeLists.txt b/ucm/store/CMakeLists.txt index 34deb2bbb..51c245386 100644 --- a/ucm/store/CMakeLists.txt +++ b/ucm/store/CMakeLists.txt @@ -5,6 +5,7 @@ add_subdirectory(infra) add_subdirectory(device) add_subdirectory(nfsstore) add_subdirectory(pcstore) +add_subdirectory(ds3fsstore) add_subdirectory(dramstore) add_subdirectory(localstore) add_subdirectory(mooncakestore) diff --git a/ucm/store/ds3fsstore/CMakeLists.txt b/ucm/store/ds3fsstore/CMakeLists.txt new file mode 100644 index 000000000..0aab961ab --- /dev/null +++ b/ucm/store/ds3fsstore/CMakeLists.txt @@ -0,0 +1,29 @@ +find_path(HF3FS_USRBIO_INCLUDE_DIR NAMES hf3fs_usrbio.h PATHS /usr/local/3fs/src/lib/api NO_DEFAULT_PATH) +find_library(HF3FS_USRBIO_LIBRARY NAMES hf3fs_api_shared PATHS /usr/local/3fs/src/lib/rs/hf3fs-usrbio-sys/lib NO_DEFAULT_PATH) + +if(HF3FS_USRBIO_INCLUDE_DIR AND HF3FS_USRBIO_LIBRARY) + file(GLOB_RECURSE UCMSTORE_DS3FS_CC_SOURCE_FILES "./cc/*.cc") + add_library(ds3fsstore STATIC ${UCMSTORE_DS3FS_CC_SOURCE_FILES}) + target_include_directories(ds3fsstore PUBLIC + ${CMAKE_CURRENT_SOURCE_DIR}/cc/api + ${CMAKE_CURRENT_SOURCE_DIR}/cc/domain + ) + + target_include_directories(ds3fsstore PRIVATE ${HF3FS_USRBIO_INCLUDE_DIR}) + target_link_libraries(ds3fsstore PRIVATE ${HF3FS_USRBIO_LIBRARY}) + target_link_libraries(ds3fsstore PUBLIC trans storeinfra) + + file(GLOB_RECURSE UCMSTORE_DS3FS_CPY_SOURCE_FILES "./cpy/*.cc") + pybind11_add_module(ucmds3fsstore ${UCMSTORE_DS3FS_CPY_SOURCE_FILES}) + target_link_libraries(ucmds3fsstore PRIVATE ds3fsstore) + set_target_properties(ucmds3fsstore PROPERTIES LIBRARY_OUTPUT_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) +else() + message(STATUS "ds3fsstore: Skipping build - required HF3FS dependencies not found") + if(NOT HF3FS_USRBIO_INCLUDE_DIR) + message(STATUS " - Missing: hf3fs_usrbio.h ") + endif() + if(NOT HF3FS_USRBIO_LIBRARY) + message(STATUS " - Missing: hf3fs_api_shared library ") + endif() + message(STATUS " Please ensure HF3FS 
dependencies are installed or build paths are correct") +endif() \ No newline at end of file diff --git a/ucm/store/ds3fsstore/__init__.py b/ucm/store/ds3fsstore/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/ucm/store/ds3fsstore/cc/api/ds3fsstore.cc b/ucm/store/ds3fsstore/cc/api/ds3fsstore.cc new file mode 100644 index 000000000..d5e392960 --- /dev/null +++ b/ucm/store/ds3fsstore/cc/api/ds3fsstore.cc @@ -0,0 +1,115 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ * */ +#include "ds3fsstore.h" +#include +#include "logger/logger.h" +#include "space/space_manager.h" +#include "status/status.h" +#include "trans/trans_manager.h" + +namespace UC { + +class Ds3FsStoreImpl : public Ds3FsStore { +public: + int32_t Setup(const Config& config) + { + auto status = this->spaceMgr_.Setup(config.storageBackends, config.kvcacheBlockSize); + if (status.Failure()) { return status.Underlying(); } + if (config.transferEnable) { + status = this->transMgr_.Setup( + config.transferLocalRankSize, config.transferDeviceId, config.transferStreamNumber, + config.kvcacheBlockSize, config.transferIoSize, config.transferIoDirect, + config.transferBufferNumber, this->spaceMgr_.GetSpaceLayout(), + config.transferTimeoutMs, config.mountPoint); + if (status.Failure()) { return status.Underlying(); } + } + this->ShowConfig(config); + return Status::OK().Underlying(); + } + int32_t Alloc(const std::string& block) override { return Status::OK().Underlying(); } + bool Lookup(const std::string& block) override { return this->spaceMgr_.LookupBlock(block); } + void Commit(const std::string& block, const bool success) override {} + std::list Alloc(const std::list& blocks) override + { + std::list results; + for (const auto& block : blocks) { results.emplace_back(this->Alloc(block)); } + return results; + } + std::list Lookup(const std::list& blocks) override + { + std::list founds; + for (const auto& block : blocks) { founds.emplace_back(this->Lookup(block)); } + return founds; + } + void Commit(const std::list& blocks, const bool success) override {} + size_t Submit(TransTask&& task) override + { + auto taskId = TransTask::invalid; + auto status = this->transMgr_.Submit(std::move(task), taskId); + if (status.Failure()) { taskId = TransTask::invalid; } + return taskId; + } + int32_t Wait(const size_t task) override { return this->transMgr_.Wait(task).Underlying(); } + int32_t Check(const size_t task, bool& finish) override + { + return this->transMgr_.Check(task, 
finish).Underlying(); + } + +private: + void ShowConfig(const Config& config) + { + std::string buildType = UCM_BUILD_TYPE; + if (buildType.empty()) { buildType = "Release"; } + UC_INFO("Ds3FsStore-{}({}).", UCM_COMMIT_ID, buildType); + UC_INFO("Set UC::StorageBackends to {}.", config.storageBackends); + UC_INFO("Set UC::BlockSize to {}.", config.kvcacheBlockSize); + UC_INFO("Set UC::TransferEnable to {}.", config.transferEnable); + UC_INFO("Set UC::MountPoint to {}.", config.mountPoint); + if (!config.transferEnable) { return; } + UC_INFO("Set UC::IoSize to {}.", config.transferIoSize); + UC_INFO("Set UC::IoDirect to {}.", config.transferIoDirect); + UC_INFO("Set UC::LocalRankSize to {}.", config.transferLocalRankSize); + UC_INFO("Set UC::DeviceId to {}.", config.transferDeviceId); + UC_INFO("Set UC::StreamNumber to {}.", config.transferStreamNumber); + UC_INFO("Set UC::BufferNumber to {}.", config.transferBufferNumber); + UC_INFO("Set UC::TimeoutMs to {}.", config.transferTimeoutMs); + } + +private: + SpaceManager spaceMgr_; + TransManager transMgr_; +}; + +int32_t Ds3FsStore::Setup(const Config& config) +{ + auto impl = new (std::nothrow) Ds3FsStoreImpl(); + if (!impl) { + UC_ERROR("Out of memory."); + return Status::OutOfMemory().Underlying(); + } + this->impl_ = impl; + return impl->Setup(config); +} + +} // namespace UC diff --git a/ucm/store/ds3fsstore/cc/api/ds3fsstore.h b/ucm/store/ds3fsstore/cc/api/ds3fsstore.h new file mode 100644 index 000000000..094b7e5e5 --- /dev/null +++ b/ucm/store/ds3fsstore/cc/api/ds3fsstore.h @@ -0,0 +1,92 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. 
+ * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ * */ +#ifndef UNIFIEDCACHE_DS3FSSTORE_H +#define UNIFIEDCACHE_DS3FSSTORE_H + +#include "trans/trans_task.h" +#include "ucmstore.h" + +namespace UC { + +class Ds3FsStore : CCStore { +public: + struct Config { + std::vector storageBackends; + size_t kvcacheBlockSize; + bool transferEnable; + std::string mountPoint{"/3fs/stage"}; + size_t transferIoSize{262144}; + bool transferIoDirect{false}; + size_t transferLocalRankSize{1}; + int32_t transferDeviceId{-1}; + size_t transferStreamNumber{8}; + size_t transferBufferNumber{4096}; + size_t transferTimeoutMs{30000}; + + Config(const std::vector& storageBackends, const size_t kvcacheBlockSize, + const bool transferEnable) + : storageBackends{storageBackends}, kvcacheBlockSize{kvcacheBlockSize}, + transferEnable{transferEnable} + { + } + }; + +public: + ~Ds3FsStore() override + { + if (this->impl_) { delete this->impl_; } + } + int32_t Setup(const Config& config); + int32_t Alloc(const std::string& block) override { return this->impl_->Alloc(block); } + bool Lookup(const std::string& block) override { return this->impl_->Lookup(block); } + void Commit(const std::string& block, const bool success) override + { + this->impl_->Commit(block, success); + } + std::list Alloc(const std::list& blocks) override + { + return this->impl_->Alloc(blocks); + } + std::list Lookup(const std::list& blocks) override + { + return this->impl_->Lookup(blocks); + } + void Commit(const std::list& blocks, const bool success) override + { + this->impl_->Commit(blocks, success); + } + size_t Submit(TransTask&& task) override { return this->impl_->Submit(std::move(task)); } + int32_t Wait(const size_t task) override { return this->impl_->Wait(task); } + int32_t Check(const size_t task, bool& finish) override + { + return this->impl_->Check(task, finish); + } + +private: + Ds3FsStore* impl_{nullptr}; +}; + +} // namespace UC + +#endif diff --git a/ucm/store/ds3fsstore/cc/domain/space/space_layout.cc 
b/ucm/store/ds3fsstore/cc/domain/space/space_layout.cc new file mode 100644 index 000000000..06cee30d0 --- /dev/null +++ b/ucm/store/ds3fsstore/cc/domain/space/space_layout.cc @@ -0,0 +1,132 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ * */ +#include "space_layout.h" +#include +#include +#include "file/file.h" +#include "logger/logger.h" + +namespace UC { + +Status SpaceLayout::Setup(const std::vector& storageBackends) +{ + if (storageBackends.empty()) { + UC_ERROR("Empty backend list."); + return Status::InvalidParam(); + } + auto status = Status::OK(); + for (auto& path : storageBackends) { + if ((status = this->AddStorageBackend(path)).Failure()) { return status; } + } + return status; +} + +std::string SpaceLayout::DataFilePath(const std::string& blockId, bool activated) const +{ + const auto& backend = StorageBackend(blockId); + const auto& dir = activated ? TempFileRoot() : DataFileRoot(); + uint64_t front, back; + ShardBlockId(blockId, front, back); + return fmt::format("{}{}{:016x}{:016x}", backend, dir, front, back); +} + +Status SpaceLayout::Commit(const std::string& blockId, bool success) const +{ + const auto& activated = this->DataFilePath(blockId, true); + const auto& archived = this->DataFilePath(blockId, false); + if (success) { return File::Rename(activated, archived); } + File::Remove(activated); + return Status::OK(); +} + +std::vector SpaceLayout::RelativeRoots() const +{ + return {DataFileRoot(), TempFileRoot()}; +} + +Status SpaceLayout::AddStorageBackend(const std::string& path) +{ + auto normalizedPath = path; + if (normalizedPath.back() != '/') { normalizedPath += '/'; } + auto status = Status::OK(); + if (this->storageBackends_.empty()) { + status = this->AddFirstStorageBackend(normalizedPath); + } else { + status = this->AddSecondaryStorageBackend(normalizedPath); + } + if (status.Failure()) { + UC_ERROR("Failed({}) to add storage backend({}).", status, normalizedPath); + } + return status; +} + +Status SpaceLayout::AddFirstStorageBackend(const std::string& path) +{ + for (const auto& root : this->RelativeRoots()) { + auto dir = File::Make(path + root); + if (!dir) { return Status::OutOfMemory(); } + auto status = dir->MkDir(); + if (status == Status::DuplicateKey()) 
{ status = Status::OK(); } + if (status.Failure()) { return status; } + } + this->storageBackends_.emplace_back(path); + return Status::OK(); +} + +Status SpaceLayout::AddSecondaryStorageBackend(const std::string& path) +{ + auto iter = std::find(this->storageBackends_.begin(), this->storageBackends_.end(), path); + if (iter != this->storageBackends_.end()) { return Status::OK(); } + constexpr auto accessMode = IFile::AccessMode::READ | IFile::AccessMode::WRITE; + for (const auto& root : this->RelativeRoots()) { + auto dir = File::Make(path + root); + if (!dir) { return Status::OutOfMemory(); } + if (dir->Access(accessMode).Failure()) { return Status::InvalidParam(); } + } + this->storageBackends_.emplace_back(path); + return Status::OK(); +} + +std::string SpaceLayout::StorageBackend(const std::string& blockId) const +{ + static std::hash hasher; + return this->storageBackends_[hasher(blockId) % this->storageBackends_.size()]; +} + +std::string SpaceLayout::DataFileRoot() const { return "data/"; } + +std::string SpaceLayout::TempFileRoot() const { return "temp/"; } + +void SpaceLayout::ShardBlockId(const std::string& blockId, uint64_t& front, uint64_t& back) const +{ + constexpr size_t blockIdSize = 16; + constexpr size_t nU64PerBlock = blockIdSize / sizeof(uint64_t); + using BlockId = std::array; + static_assert(sizeof(BlockId) == blockIdSize); + auto id = static_cast(static_cast(blockId.data())); + front = id->front(); + back = id->back(); +} + +} // namespace UC diff --git a/ucm/store/ds3fsstore/cc/domain/space/space_layout.h b/ucm/store/ds3fsstore/cc/domain/space/space_layout.h new file mode 100644 index 000000000..a9a7a4d79 --- /dev/null +++ b/ucm/store/ds3fsstore/cc/domain/space/space_layout.h @@ -0,0 +1,55 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. 
+ * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ * */ +#ifndef UNIFIEDCACHE_SPACE_LAYOUT_H +#define UNIFIEDCACHE_SPACE_LAYOUT_H + +#include +#include +#include "status/status.h" + +namespace UC { + +class SpaceLayout { +public: + Status Setup(const std::vector& storageBackends); + std::string DataFilePath(const std::string& blockId, bool activated) const; + Status Commit(const std::string& blockId, bool success) const; + +private: + std::vector RelativeRoots() const; + Status AddStorageBackend(const std::string& path); + Status AddFirstStorageBackend(const std::string& path); + Status AddSecondaryStorageBackend(const std::string& path); + std::string StorageBackend(const std::string& blockId) const; + std::string DataFileRoot() const; + std::string TempFileRoot() const; + void ShardBlockId(const std::string& blockId, uint64_t& front, uint64_t& back) const; + +private: + std::vector storageBackends_; +}; + +} // namespace UC + +#endif diff --git a/ucm/store/ds3fsstore/cc/domain/space/space_manager.cc b/ucm/store/ds3fsstore/cc/domain/space/space_manager.cc new file mode 100644 index 000000000..556646f46 --- /dev/null +++ b/ucm/store/ds3fsstore/cc/domain/space/space_manager.cc @@ -0,0 +1,86 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#include "space_manager.h" +#include +#include "file/file.h" +#include "logger/logger.h" + +namespace UC { + +Status SpaceManager::Setup(const std::vector& storageBackends, const size_t blockSize) +{ + if (blockSize == 0) { + UC_ERROR("Invalid block size({}).", blockSize); + return Status::InvalidParam(); + } + auto status = this->layout_.Setup(storageBackends); + if (status.Failure()) { return status; } + this->blockSize_ = blockSize; + return Status::OK(); +} + +Status SpaceManager::NewBlock(const std::string& blockId) +{ + const auto& activated = this->layout_.DataFilePath(blockId, true); + const auto& archived = this->layout_.DataFilePath(blockId, false); + if (File::Access(archived, IFile::AccessMode::EXIST).Success()) { + return Status::DuplicateKey(); + } + auto file = File::Make(activated); + if (!file) { return Status::OutOfMemory(); } + auto mode = IFile::OpenFlag::CREATE | IFile::OpenFlag::EXCL | IFile::OpenFlag::READ_WRITE; + auto s = file->Open(mode); + if (s.Failure()) { + if (s != Status::DuplicateKey()) { return s; } + mode = IFile::OpenFlag::READ_WRITE; + if ((s = file->Open(mode)).Failure()) { return s; } + IFile::FileStat st; + if ((s = file->Stat(st)).Failure()) { return s; } + const auto now = std::chrono::system_clock::now(); + const auto mtime = std::chrono::system_clock::from_time_t(st.st_mtime); + constexpr auto reuseBlockAge = std::chrono::seconds(300); + if (now - mtime <= reuseBlockAge) { return Status::DuplicateKey(); } + } + return 
file->Truncate(this->blockSize_); +} + +Status SpaceManager::CommitBlock(const std::string& blockId, bool success) +{ + return this->layout_.Commit(blockId, success); +} + +bool SpaceManager::LookupBlock(const std::string& blockId) const +{ + const auto& path = this->layout_.DataFilePath(blockId, false); + constexpr auto mode = + IFile::AccessMode::EXIST | IFile::AccessMode::READ | IFile::AccessMode::WRITE; + auto s = File::Access(path, mode); + if (s.Failure()) { + if (s != Status::NotFound()) { UC_ERROR("Failed({}) to access file({}).", s, path); } + return false; + } + return true; +} + +} // namespace UC diff --git a/ucm/store/ds3fsstore/cc/domain/space/space_manager.h b/ucm/store/ds3fsstore/cc/domain/space/space_manager.h new file mode 100644 index 000000000..4656c952f --- /dev/null +++ b/ucm/store/ds3fsstore/cc/domain/space/space_manager.h @@ -0,0 +1,46 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#ifndef UNIFIEDCACHE_SPACE_MANAGER_H +#define UNIFIEDCACHE_SPACE_MANAGER_H + +#include "space_layout.h" + +namespace UC { + +class SpaceManager { +public: + Status Setup(const std::vector& storageBackends, const size_t blockSize); + Status NewBlock(const std::string& blockId); + Status CommitBlock(const std::string& blockId, bool success); + bool LookupBlock(const std::string& blockId) const; + const SpaceLayout* GetSpaceLayout() const { return &this->layout_; } + +private: + SpaceLayout layout_; + size_t blockSize_; +}; + +} // namespace UC + +#endif diff --git a/ucm/store/ds3fsstore/cc/domain/trans/handle_recorder.h b/ucm/store/ds3fsstore/cc/domain/trans/handle_recorder.h new file mode 100644 index 000000000..a36291172 --- /dev/null +++ b/ucm/store/ds3fsstore/cc/domain/trans/handle_recorder.h @@ -0,0 +1,98 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#ifndef UC_INFRA_HANDLE_POOL_H +#define UC_INFRA_HANDLE_POOL_H + +#include +#include "hashmap.h" +#include "status/status.h" + +namespace UC { + +template +class HandlePool { +private: + struct PoolEntry { + HandleType handle; + uint64_t refCount; + }; + using PoolMap = HashMap, 10>; + PoolMap pool_; + +public: + HandlePool() = default; + HandlePool(const HandlePool&) = delete; + HandlePool& operator=(const HandlePool&) = delete; + + static HandlePool& Instance() + { + static HandlePool instance; + return instance; + } + + Status Get(const KeyType& key, HandleType& handle, + std::function instantiate) + { + auto result = pool_.GetOrCreate(key, [&instantiate](PoolEntry& entry) -> bool { + HandleType h{}; + + auto status = instantiate(h); + if (status.Failure()) { return false; } + + entry.handle = h; + entry.refCount = 1; + return true; + }); + + if (!result.has_value()) { return Status::Error(); } + + auto& entry = result.value().get(); + entry.refCount++; + handle = entry.handle; + return Status::OK(); + } + + void Put(const KeyType& key, std::function cleanup) + { + pool_.Upsert(key, [&cleanup](PoolEntry& entry) -> bool { + entry.refCount--; + if (entry.refCount > 0) { return false; } + cleanup(entry.handle); + return true; + }); + } + + void ClearAll(std::function cleanup) + { + pool_.ForEach([&cleanup](const KeyType& key, PoolEntry& entry) { + (void)key; + cleanup(entry.handle); + }); + pool_.Clear(); + } +}; + +} // namespace UC + +#endif diff --git 
a/ucm/store/ds3fsstore/cc/domain/trans/hashmap.h b/ucm/store/ds3fsstore/cc/domain/trans/hashmap.h new file mode 100644 index 000000000..a46fec23b --- /dev/null +++ b/ucm/store/ds3fsstore/cc/domain/trans/hashmap.h @@ -0,0 +1,172 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
// ucm/store/ds3fsstore/cc/domain/trans/hashmap.h
#ifndef UNIFIEDCACHE_HASHMAP_H
#define UNIFIEDCACHE_HASHMAP_H

#include <array>
#include <atomic>
#include <cstddef>
#include <functional>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <utility>
#include <vector>

namespace UC {

/**
 * Sharded open-addressing hash map with linear probing.
 *
 * The low ShardBits bits of the hash select a shard, the remaining bits the
 * home slot inside it. Each shard has its own std::shared_mutex.
 *
 * Slot states:
 *   - key empty              : never used, terminates a probe chain;
 *   - key set, value empty   : tombstone left by a deletion, keeps the chain
 *                              intact and is recycled by later insertions;
 *   - key set, value set     : live entry.
 * Deleting by simply emptying the slot would cut the chain and hide every
 * entry stored behind it, which is why tombstones are used.
 *
 * References handed out by GetOrCreate() point into the shard's slot vector;
 * they are invalidated when that shard is rehashed or cleared.
 */
template <typename Key, typename Value, typename Hash = std::hash<Key>, size_t ShardBits = 10>
class HashMap {
    static_assert(ShardBits <= 10, "ShardBits too large");
    static constexpr size_t Shards = size_t{1} << ShardBits;
    static constexpr size_t InitialCapacity = 8;

    using Slot = std::pair<std::optional<Key>, std::optional<Value>>;

    struct alignas(64) Shard {
        mutable std::shared_mutex mtx;
        std::vector<Slot> slots;
        size_t used = 0; // slots holding a key: live entries plus tombstones
    };

    std::array<Shard, Shards> shards_;
    Hash hash_;
    std::atomic<size_t> size_{0};

    static size_t ShardIndex(size_t h) noexcept { return h & (Shards - 1); }

    static size_t HomeIdx(size_t h, size_t cap) noexcept { return (h >> ShardBits) & (cap - 1); }

    static size_t ProbeIdx(size_t idx, size_t cap) noexcept { return (idx + 1) & (cap - 1); }

    static bool IsEmpty(const std::optional<Key>& slot) noexcept { return !slot.has_value(); }

    static bool IsLive(const Slot& slot) noexcept
    {
        return slot.first.has_value() && slot.second.has_value();
    }

    /**
     * Rebuild a shard, dropping tombstones. The capacity doubles when at least
     * half of the old slots are live; otherwise the same capacity is reused.
     * Caller must hold the shard's exclusive lock.
     */
    void RehashShard(Shard& s)
    {
        std::vector<Slot> old = std::move(s.slots);
        size_t live = 0;
        for (const auto& slot : old) {
            if (IsLive(slot)) { ++live; }
        }
        size_t newCap = InitialCapacity;
        if (!old.empty()) { newCap = (live * 2 >= old.size()) ? old.size() * 2 : old.size(); }
        s.slots = std::vector<Slot>(newCap);
        s.used = 0;

        for (auto& slot : old) {
            if (!IsLive(slot)) { continue; }
            size_t idx = HomeIdx(hash_(*slot.first), newCap);
            while (!IsEmpty(s.slots[idx].first)) { idx = ProbeIdx(idx, newCap); }
            s.slots[idx].first.emplace(std::move(*slot.first));
            s.slots[idx].second.emplace(std::move(*slot.second));
            ++s.used;
        }
    }

public:
    HashMap() = default;

    /**
     * Return the value stored under `key`, creating it when absent.
     *
     * @param creator fills in a fresh value; returning false aborts the
     *                insertion. It runs while the shard lock is held.
     * @return reference to the stored value, or an empty optional when the
     *         creator refused.
     */
    std::optional<std::reference_wrapper<Value>> GetOrCreate(const Key& key,
                                                             std::function<bool(Value&)> creator)
    {
        const size_t h = hash_(key);
        auto& shard = shards_[ShardIndex(h)];
        std::unique_lock lg(shard.mtx);

        if (shard.slots.empty() || shard.used * 4 >= shard.slots.size() * 3) {
            RehashShard(shard);
        }

        // Loop instead of recursing: the shard mutex is not recursive, so
        // re-entering this function while holding it would deadlock.
        for (;;) {
            const size_t cap = shard.slots.size();
            size_t idx = HomeIdx(h, cap);
            Slot* target = nullptr; // first tombstone, else the terminating empty slot
            bool targetIsFresh = false;

            for (size_t probes = 0; probes < cap; ++probes) {
                auto& slot = shard.slots[idx];
                if (IsEmpty(slot.first)) {
                    if (!target) {
                        target = &slot;
                        targetIsFresh = true;
                    }
                    break;
                }
                if (!slot.second.has_value()) {
                    if (!target) { target = &slot; }
                } else if (*slot.first == key) {
                    return std::ref(*slot.second);
                }
                idx = ProbeIdx(idx, cap);
            }

            if (!target) {
                // Every slot is live: grow and probe again.
                RehashShard(shard);
                continue;
            }

            Value newValue{};
            if (!creator(newValue)) { return std::nullopt; }
            target->first.emplace(key);
            target->second.emplace(std::move(newValue));
            if (targetIsFresh) { ++shard.used; }
            ++size_;
            return std::ref(*target->second);
        }
    }

    /**
     * Apply `updater` to the value stored under `key`; when it returns true
     * the entry is removed. Absent keys are ignored (nothing is inserted).
     */
    void Upsert(const Key& key, std::function<bool(Value&)> updater)
    {
        const size_t h = hash_(key);
        auto& shard = shards_[ShardIndex(h)];
        std::unique_lock lg(shard.mtx);

        const size_t cap = shard.slots.size();
        if (cap == 0) { return; }

        size_t idx = HomeIdx(h, cap);
        for (size_t probes = 0; probes < cap; ++probes) {
            auto& slot = shard.slots[idx];
            if (IsEmpty(slot.first)) { return; }
            if (slot.second.has_value() && *slot.first == key) {
                if (updater(*slot.second)) {
                    // Keep the key as a tombstone so entries that probed past
                    // this slot stay reachable.
                    slot.second.reset();
                    --size_;
                }
                return;
            }
            idx = ProbeIdx(idx, cap);
        }
    }

    /// Call `visitor` for every live entry, one shard at a time under its shared lock.
    void ForEach(std::function<void(const Key&, Value&)> visitor)
    {
        for (auto& shard : shards_) {
            std::shared_lock lg(shard.mtx);
            for (auto& slot : shard.slots) {
                if (IsLive(slot)) { visitor(*slot.first, *slot.second); }
            }
        }
    }

    /// Remove every entry from every shard.
    void Clear()
    {
        for (auto& shard : shards_) {
            std::unique_lock lg(shard.mtx);
            shard.slots.clear();
            shard.used = 0;
        }
        size_.store(0);
    }
};

} // namespace UC

#endif
+ * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ * */
+#include "share_buffer.h"
+#include <pthread.h>
+#include <unistd.h>
+#include <algorithm>
+#include <atomic>
+#include <cerrno>
+#include <chrono>
+#include <cstring>
+#include <thread>
+#include "file/file.h"
+#include "logger/logger.h"
+#include "trans/buffer.h"
+
+namespace UC {
+
+static constexpr int32_t SHARE_BUFFER_MAGIC = (('S' << 16) | ('b' << 8) | 1);
+
+/**
+ * Process-shared, robust mutex stored inside the shared-memory header. The destructor is
+ * deleted because instances only ever live in mapped memory.
+ */
+struct ShareMutex {
+    pthread_mutex_t mutex;
+    ~ShareMutex() = delete;
+    void Init()
+    {
+        pthread_mutexattr_t attr;
+        pthread_mutexattr_init(&attr);
+        pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
+        pthread_mutexattr_setrobust(&attr, PTHREAD_MUTEX_ROBUST);
+        pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ADAPTIVE_NP);
+        pthread_mutex_init(&mutex, &attr);
+        pthread_mutexattr_destroy(&attr);
+    }
+    void Lock()
+    {
+        // A robust mutex returns EOWNERDEAD when its previous owner died while holding it. The
+        // lock is acquired in that case, but it must be marked consistent before being
+        // unlocked, otherwise it becomes permanently unusable for every sharer.
+        if (pthread_mutex_lock(&mutex) == EOWNERDEAD) { pthread_mutex_consistent(&mutex); }
+    }
+    void Unlock() { pthread_mutex_unlock(&mutex); }
+};
+
+/** Process-shared spin lock guarding a single block header. */
+struct ShareLock {
+    pthread_spinlock_t lock;
+    ~ShareLock() = delete;
+    void Init() { pthread_spin_init(&lock, PTHREAD_PROCESS_SHARED); }
+    void Lock() { pthread_spin_lock(&lock); }
+    void Unlock() { pthread_spin_unlock(&lock); }
+};
+
+/**
+ * Copies at most the first 16 bytes of `block` into two 64-bit words. Shorter ids are
+ * zero-padded, so the string is never read past its end and no aligned access is assumed.
+ */
+static void PackBlockId(const std::string& block, uint64_t& lo, uint64_t& hi)
+{
+    uint64_t words[2] = {0, 0};
+    std::memcpy(words, block.data(), std::min(block.size(), sizeof(words)));
+    lo = words[0];
+    hi = words[1];
+}
+
+/** 128-bit block identity; an all-zero value marks a slot that has never been used. */
+struct ShareBlockId {
+    uint64_t lo{0};
+    uint64_t hi{0};
+    void Set(const std::string& block) { PackBlockId(block, lo, hi); }
+    void Reset() { lo = hi = 0; }
+    bool Used() const { return lo != 0 || hi != 0; }
+    bool operator==(const std::string& block) const
+    {
+        uint64_t otherLo = 0;
+        uint64_t otherHi = 0;
+        PackBlockId(block, otherLo, otherHi);
+        return lo == otherLo && hi == otherHi;
+    }
+};
+
+enum class ShareBlockStatus { INIT, LOADING, LOADED, FAILURE };
+
+/** Per-block bookkeeping; `offset` is the distance from this header to the block's data. */
+struct ShareBlockHeader {
+    ShareBlockId id;
+    ShareLock mutex;
+    int32_t ref;
+    ShareBlockStatus status;
+    size_t offset;
+    void* Data() { return reinterpret_cast<char*>(this) + offset; }
+};
+
+/** Layout at the start of the shared-memory segment, followed by `blockNumber` block headers. */
+struct ShareBufferHeader {
+    ShareMutex mutex;
+    std::atomic<int32_t> magic;
+    int32_t ref;
+    size_t blockSize;
+    size_t blockNumber;
+    ShareBlockHeader headers[0];
+};
+
+inline std::string GenShareBufferName(const size_t blockSize, const size_t blockNumber,
+                                      const bool ioDirect, const size_t nSharer)
+{
+    return fmt::format("uc.buf-{}-{}-{}-{:04x}", blockSize, blockNumber, ioDirect, nSharer);
+}
+
+/**
+ * Records the geometry, then creates the shared-memory segment exclusively; when it already
+ * exists (ShmOpen reports DuplicateKey) the existing segment is attached instead.
+ * @return Error for a zero block size/number, otherwise the status of the create/attach path.
+ */
+Status ShareBuffer::Setup(const size_t blockSize, const size_t blockNumber, const bool ioDirect,
+                          const size_t nSharer)
+{
+    this->blockSize_ = blockSize;
+    this->blockNumber_ = blockNumber;
+    this->ioDirect_ = ioDirect;
+    this->nSharer_ = nSharer;
+    this->addr_ = nullptr;
+    // AcquireBlock() computes `hash % blockNumber_`; an empty buffer cannot be used.
+    if (blockSize == 0 || blockNumber == 0) { return Status::Error(); }
+    this->shmName_ = GenShareBufferName(blockSize, blockNumber, ioDirect, nSharer);
+    auto file = File::Make(this->shmName_);
+    if (!file) { return Status::OutOfMemory(); }
+    auto flags = IFile::OpenFlag::CREATE | IFile::OpenFlag::EXCL | IFile::OpenFlag::READ_WRITE;
+    auto s = file->ShmOpen(flags);
+    if (s.Success()) { return this->InitShmBuffer(file.get()); }
+    if (s == Status::DuplicateKey()) { return this->LoadShmBuffer(file.get()); }
+    return s;
+}
+
+/** Drops this sharer's reference, unmaps the segment and unlinks it when the count hits zero. */
+ShareBuffer::~ShareBuffer()
+{
+    if (!this->addr_) { return; }
+    auto bufferHeader = static_cast<ShareBufferHeader*>(this->addr_);
+    bufferHeader->mutex.Lock();
+    auto ref = (--bufferHeader->ref);
+    bufferHeader->mutex.Unlock();
+    void* dataAddr = static_cast<char*>(this->addr_) + this->DataOffset();
+    Trans::Buffer::UnregisterHostBuffer(dataAddr);
+    const auto shmSize = this->ShmSize();
+    File::MUnmap(this->addr_, shmSize);
+    if (ref == 0) { File::ShmUnlink(this->shmName_); }
+}
+
+/**
+ * Acquires the slot for `block` and wraps it in a Reader whose deleter releases the slot.
+ * @return nullptr (after releasing the slot) if constructing the Reader throws.
+ */
+std::shared_ptr<ShareBuffer::Reader> ShareBuffer::MakeReader(const std::string& block,
+                                                             const std::string& path)
+{
+    auto index = this->AcquireBlock(block);
+    try {
+        void* addr = this->BlockAt(index);
+        return std::shared_ptr<Reader>(
+            new Reader{block, path, blockSize_, ioDirect_, nSharer_, addr},
+            [this, index](auto) { this->ReleaseBlock(index); });
+    } catch (...) {
+        this->ReleaseBlock(index);
+        UC_ERROR("Failed to create reader.");
+        return nullptr;
+    }
+}
+
+/** Size of the header area (buffer header plus block headers) rounded up to a whole page. */
+size_t ShareBuffer::DataOffset() const
+{
+    static const auto pageSize = static_cast<size_t>(sysconf(_SC_PAGESIZE));
+    auto headerSize = sizeof(ShareBufferHeader) + sizeof(ShareBlockHeader) * this->blockNumber_;
+    return (headerSize + pageSize - 1) & ~(pageSize - 1);
+}
+
+size_t ShareBuffer::ShmSize() const
+{
+    return this->DataOffset() + this->blockSize_ * this->blockNumber_;
+}
+
+/**
+ * Creator path: sizes and maps the segment, initialises all headers, publishes the magic
+ * value last so that attachers only see a fully initialised header, then registers the data
+ * area as a host buffer.
+ */
+Status ShareBuffer::InitShmBuffer(IFile* file)
+{
+    const auto shmSize = this->ShmSize();
+    auto s = file->Truncate(shmSize);
+    if (s.Failure()) { return s; }
+    s = file->MMap(this->addr_, shmSize, true, true, true);
+    if (s.Failure()) { return s; }
+    auto bufferHeader = static_cast<ShareBufferHeader*>(this->addr_);
+    bufferHeader->magic = 1;
+    bufferHeader->mutex.Init();
+    bufferHeader->ref = this->nSharer_;
+    bufferHeader->blockSize = this->blockSize_;
+    bufferHeader->blockNumber = this->blockNumber_;
+    const auto dataOffset = this->DataOffset();
+    for (size_t i = 0; i < this->blockNumber_; i++) {
+        auto& blockHeader = bufferHeader->headers[i];
+        blockHeader.id.Reset();
+        blockHeader.mutex.Init();
+        blockHeader.ref = 0;
+        blockHeader.status = ShareBlockStatus::INIT;
+        // Offsets are relative to the block header itself (see ShareBlockHeader::Data()).
+        const auto headerOffset = sizeof(ShareBufferHeader) + sizeof(ShareBlockHeader) * i;
+        blockHeader.offset = dataOffset + this->blockSize_ * i - headerOffset;
+    }
+    bufferHeader->magic = SHARE_BUFFER_MAGIC;
+    void* dataAddr = static_cast<char*>(this->addr_) + dataOffset;
+    auto dataSize = shmSize - dataOffset;
+    auto status = Trans::Buffer::RegisterHostBuffer(dataAddr, dataSize);
+    if (status.Success()) { return Status::OK(); }
+    UC_ERROR("Failed({}) to regitster host buffer({}).", status.ToString(), dataSize);
+    return Status::Error();
+}
+
+/**
+ * Attacher path: opens and maps the existing segment, polls every 100 ms until the creator
+ * has published the magic value (Retry once the attempts are exhausted), then registers the
+ * data area as a host buffer.
+ */
+Status ShareBuffer::LoadShmBuffer(IFile* file)
+{
+    auto s = file->ShmOpen(IFile::OpenFlag::READ_WRITE);
+    if (s.Failure()) { return s; }
+    const auto shmSize = this->ShmSize();
+    s = file->Truncate(shmSize);
+    if (s.Failure()) { return s; }
+    s = file->MMap(this->addr_, shmSize, true, true, true);
+    if (s.Failure()) { return s; }
+    auto bufferHeader = static_cast<ShareBufferHeader*>(this->addr_);
+    constexpr auto retryInterval = std::chrono::milliseconds(100);
+    constexpr auto maxTryTime = 100;
+    auto tryTime = 0;
+    do {
+        if (bufferHeader->magic == SHARE_BUFFER_MAGIC) { break; }
+        if (tryTime > maxTryTime) {
+            UC_ERROR("Shm file({}) not ready.", file->Path());
+            return Status::Retry();
+        }
+        std::this_thread::sleep_for(retryInterval);
+        tryTime++;
+    } while (true);
+    const auto dataOffset = this->DataOffset();
+    void* dataAddr = static_cast<char*>(this->addr_) + dataOffset;
+    auto dataSize = shmSize - dataOffset;
+    auto status = Trans::Buffer::RegisterHostBuffer(dataAddr, dataSize);
+    if (status.Success()) { return Status::OK(); }
+    UC_ERROR("Failed({}) to regitster host buffer({}).", status.ToString(), dataSize);
+    return Status::Error();
+}
+
+/**
+ * Finds the slot for `block` by linear probing from its hash position, under the buffer
+ * mutex. Preference order: a slot already holding `block`, else the first slot seen whose
+ * reference count has dropped to zero, else a never-used slot. A slot taken over with
+ * ref <= 0 is re-labelled, given `nSharer_` references and reset to INIT.
+ * @return index of the acquired slot.
+ */
+size_t ShareBuffer::AcquireBlock(const std::string& block)
+{
+    static std::hash<std::string> hasher{};
+    auto pos = hasher(block) % this->blockNumber_;
+    auto bufferHeader = static_cast<ShareBufferHeader*>(this->addr_);
+    auto reusedIdx = this->blockNumber_;  // sentinel: no reclaimable slot seen yet
+    bufferHeader->mutex.Lock();
+    for (size_t probed = 1;; probed++) {
+        auto& candidate = bufferHeader->headers[pos];
+        if (!candidate.id.Used()) {
+            if (reusedIdx == this->blockNumber_) { reusedIdx = pos; }
+            break;
+        }
+        if (candidate.id == block) {
+            reusedIdx = pos;
+            break;
+        }
+        if (candidate.ref <= 0 && reusedIdx == this->blockNumber_) { reusedIdx = pos; }
+        pos = (pos + 1) % this->blockNumber_;
+        if (probed < this->blockNumber_) { continue; }
+        // Every slot has been probed once. Ids are never reset on release, so once the buffer
+        // has filled up there is no unused slot left to stop at: take the reclaimable slot
+        // found during this pass instead of probing forever.
+        if (reusedIdx != this->blockNumber_) { break; }
+        UC_WARN("Buffer({}) used out.", this->blockNumber_);
+        probed = 0;
+    }
+    auto blockHeader = bufferHeader->headers + reusedIdx;
+    blockHeader->mutex.Lock();
+    if (blockHeader->ref <= 0) {
+        blockHeader->id.Set(block);
+        blockHeader->ref = this->nSharer_;
+        blockHeader->status = ShareBlockStatus::INIT;
+    }
+    blockHeader->mutex.Unlock();
+    bufferHeader->mutex.Unlock();
+    return reusedIdx;
+}
+
+/** Drops one reference on the slot at `index` under that slot's spin lock. */
+void ShareBuffer::ReleaseBlock(const size_t index)
+{
+    auto bufferHeader = static_cast<ShareBufferHeader*>(this->addr_);
+    auto& blockHeader = bufferHeader->headers[index];
+    blockHeader.mutex.Lock();
+    blockHeader.ref--;
+    blockHeader.mutex.Unlock();
+}
+
+/** Returns the address of the block header (not the data) for the slot at `index`. */
+void* ShareBuffer::BlockAt(const size_t index)
+{
+    auto bufferHeader = static_cast<ShareBufferHeader*>(this->addr_);
+    return bufferHeader->headers + index;
+}
+
+/**
+ * Reports whether the block's data can be read. The caller that moves the status from INIT
+ * to LOADING (under the block lock) reads the file into the block and records LOADED or
+ * FAILURE; every other caller gets Retry until one of those states is reached.
+ */
+Status ShareBuffer::Reader::Ready4Read()
+{
+    auto header = static_cast<ShareBlockHeader*>(this->addr_);
+    if (header->status == ShareBlockStatus::LOADED) { return Status::OK(); }
+    if (header->status == ShareBlockStatus::FAILURE) { return Status::Error(); }
+    if (header->status == ShareBlockStatus::LOADING) { return Status::Retry(); }
+    auto loading = false;
+    header->mutex.Lock();
+    if (header->status == ShareBlockStatus::INIT) {
+        header->status = ShareBlockStatus::LOADING;
+        loading = true;
+    }
+    header->mutex.Unlock();
+    if (!loading) { return Status::Retry(); }
+    auto s = File::Read(this->path_, 0, this->length_, this->GetData(), this->ioDirect_);
+    if (s.Success()) {
+        header->status = ShareBlockStatus::LOADED;
+        return Status::OK();
+    }
+    header->status = ShareBlockStatus::FAILURE;
+    return s;
+}
+
+/** Address of the block's data area, derived from the header's stored offset. */
+uintptr_t ShareBuffer::Reader::GetData()
+{
+    auto header = static_cast<ShareBlockHeader*>(this->addr_);
+    return reinterpret_cast<uintptr_t>(header->Data());
+}
+
+} // namespace UC
diff --git a/ucm/store/ds3fsstore/cc/domain/trans/share_buffer.h b/ucm/store/ds3fsstore/cc/domain/trans/share_buffer.h
new file mode 100644
index 000000000..3fce7a87c
--- /dev/null
+++ b/ucm/store/ds3fsstore/cc/domain/trans/share_buffer.h
@@ -0,0 +1,85 @@
+/**
+ * MIT License
+ *
+ * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+ * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ * */
+#ifndef UNIFIEDCACHE_SHARE_BUFFER_H
+#define UNIFIEDCACHE_SHARE_BUFFER_H
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include "file/ifile.h"
+#include "status/status.h"
+
+namespace UC {
+
+/**
+ * Block cache placed in a named shared-memory segment so that several sharers can reuse one
+ * copy of each loaded block. Setup() must succeed before MakeReader() is used.
+ */
+class ShareBuffer {
+public:
+    /** Handle on one acquired block; only ShareBuffer can create it (see MakeReader()). */
+    class Reader {
+        std::string block_;
+        std::string path_;
+        size_t length_;
+        bool ioDirect_;
+        size_t nSharer_;
+        void* addr_;
+
+    public:
+        Status Ready4Read();
+        uintptr_t GetData();
+
+    private:
+        Reader(const std::string& block, const std::string& path, const size_t length,
+               const bool ioDirect, const size_t nSharer, void* addr)
+            : block_{block}, path_{path}, length_{length}, ioDirect_{ioDirect}, nSharer_{nSharer},
+              addr_{addr}
+        {
+        }
+        friend class ShareBuffer;
+    };
+
+public:
+    ShareBuffer() = default;
+    // The destructor decrements the shared reference count and unmaps the segment, so a copy
+    // would do both twice for a single attachment.
+    ShareBuffer(const ShareBuffer&) = delete;
+    ShareBuffer& operator=(const ShareBuffer&) = delete;
+    Status Setup(const size_t blockSize, const size_t blockNumber, const bool ioDirect,
+                 const size_t nSharer);
+    ~ShareBuffer();
+    std::shared_ptr<Reader> MakeReader(const std::string& block, const std::string& path);
+
+private:
+    size_t DataOffset() const;
+    size_t ShmSize() const;
+    Status InitShmBuffer(IFile* file);
+    Status LoadShmBuffer(IFile* file);
+    size_t AcquireBlock(const std::string& block);
+    void ReleaseBlock(const size_t index);
+    void* BlockAt(const size_t index);
+
+private:
+    // Default values keep the destructor well-defined when Setup() was never called: it only
+    // touches the segment when addr_ is non-null.
+    size_t blockSize_{0};
+    size_t blockNumber_{0};
+    bool ioDirect_{false};
+    size_t nSharer_{0};
+    std::string shmName_;
+    void* addr_{nullptr};
+};
+
+} // namespace UC
+
+#endif
diff --git a/ucm/store/ds3fsstore/cc/domain/trans/trans_manager.cc b/ucm/store/ds3fsstore/cc/domain/trans/trans_manager.cc
new file mode 100644
index 000000000..b5fd1e671
--- /dev/null
+++ b/ucm/store/ds3fsstore/cc/domain/trans/trans_manager.cc
@@ -0,0 +1,118 @@
+/**
+ * MIT License
+ *
+ * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+ * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ * */
+#include "trans_manager.h"
+#include "logger/logger.h"
+
+namespace UC {
+
+/**
+ * Sets up the plain queue and, when more than one rank is configured, the share queue as
+ * well. Both report failed tasks into this manager's failure set.
+ * @return the first failing status from either queue, otherwise OK.
+ */
+Status TransManager::Setup(const size_t rankSize, const int32_t deviceId, const size_t streamNumber,
+                           const size_t blockSize, const size_t ioSize, const bool ioDirect,
+                           const size_t bufferNumber, const SpaceLayout* layout,
+                           const size_t timeoutMs, const std::string& mountPoint)
+{
+    auto s = Status::OK();
+    if (rankSize > 1) {
+        s = this->shareQueue_.Setup(rankSize, deviceId, streamNumber, blockSize, ioSize, ioDirect,
+                                    bufferNumber, layout, &this->failureSet_);
+        if (s.Failure()) { return s; }
+    }
+    s = this->queue_.Setup(deviceId, streamNumber, blockSize, ioSize, ioDirect, bufferNumber,
+                           layout, &this->failureSet_, mountPoint);
+    if (s.Failure()) { return s; }
+    this->rankSize_ = rankSize;
+    this->timeoutMs_ = timeoutMs;
+    return Status::OK();
+}
+
+/**
+ * Registers `task` under its id and dispatches it: LOAD tasks go to the share queue when
+ * more than one rank is configured, everything else goes to the plain queue.
+ * @param taskId receives the task's id, also on failure.
+ * @return OutOfMemory when the task objects cannot be allocated or the id is already
+ *         registered, otherwise OK.
+ */
+Status TransManager::Submit(TransTask task, size_t& taskId) noexcept
+{
+    taskId = task.id;
+    const auto taskStr = task.Str();
+    const auto blockNumber = task.GroupNumber();
+    TaskPtr taskPtr = nullptr;
+    WaiterPtr waiterPtr = nullptr;
+    try {
+        taskPtr = std::make_shared<TransTask>(std::move(task));
+        waiterPtr = std::make_shared<TaskWaiter>(blockNumber, taskPtr->startTp);
+    } catch (const std::exception& e) {
+        UC_ERROR("Failed({}) to submit task({}).", e.what(), taskStr);
+        return Status::OutOfMemory();
+    }
+    {
+        std::lock_guard<std::mutex> lg(mutex_);
+        const auto inserted = tasks_.emplace(taskId, std::make_pair(taskPtr, waiterPtr)).second;
+        if (!inserted) {
+            UC_ERROR("Failed to submit task({}).", taskStr);
+            return Status::OutOfMemory();
+        }
+    }
+    // Dispatch through the local shared pointers rather than the map iterator: once the lock
+    // is released a concurrent Wait() may erase the entry and a concurrent Submit() may
+    // rehash the map, either of which invalidates the iterator.
+    if (this->rankSize_ > 1 && taskPtr->type == TransTask::Type::LOAD) {
+        this->shareQueue_.Dispatch(taskPtr, waiterPtr);
+        return Status::OK();
+    }
+    this->queue_.Dispatch(taskPtr, waiterPtr);
+    return Status::OK();
+}
+
+/**
+ * Removes the task from the registry and blocks until it has finished. After `timeoutMs_`
+ * the task is marked failed and the call keeps waiting for the task to finish.
+ * @return NotFound for an unknown id, Error when the task is in the failure set, else OK.
+ */
+Status TransManager::Wait(const size_t taskId) noexcept
+{
+    TaskPtr task = nullptr;
+    WaiterPtr waiter = nullptr;
+    {
+        std::lock_guard<std::mutex> lg(mutex_);
+        auto iter = tasks_.find(taskId);
+        if (iter == tasks_.end()) {
+            UC_ERROR("Not found task by id({}).", taskId);
+            return Status::NotFound();
+        }
+        task = iter->second.first;
+        waiter = iter->second.second;
+        tasks_.erase(iter);
+    }
+    if (!waiter->Wait(timeoutMs_)) {
+        UC_ERROR("Task({}) timeout({}).", task->Str(), timeoutMs_);
+        failureSet_.Insert(taskId);
+        waiter->Wait();
+    }
+    auto failure = failureSet_.Contains(taskId);
+    if (failure) {
+        failureSet_.Remove(taskId);
+        UC_ERROR("Task({}) failed.", task->Str());
+        return Status::Error();
+    }
+    return Status::OK();
+}
+
+/**
+ * Non-blocking completion query for a task that is still registered.
+ * @param finish set from the task's waiter; left untouched when the id is unknown.
+ * @return NotFound for an unknown id (including one already consumed by Wait()), else OK.
+ */
+Status TransManager::Check(const size_t taskId, bool& finish) noexcept
+{
+    std::lock_guard<std::mutex> lg(mutex_);
+    auto iter = tasks_.find(taskId);
+    if (iter == tasks_.end()) {
+        UC_ERROR("Not found task by id({}).", taskId);
+        return Status::NotFound();
+    }
+    finish = iter->second.second->Finish();
+    return Status::OK();
+}
+
+} // namespace UC
diff --git a/ucm/store/ds3fsstore/cc/domain/trans/trans_manager.h b/ucm/store/ds3fsstore/cc/domain/trans/trans_manager.h
new file mode 100644
index 000000000..2c7165857
--- /dev/null
+++ b/ucm/store/ds3fsstore/cc/domain/trans/trans_manager.h
@@ -0,0 +1,57 @@
+/**
+ * MIT License
+ *
+ * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ * */
+#ifndef UNIFIEDCACHE_TRANS_MANAGER_H
+#define UNIFIEDCACHE_TRANS_MANAGER_H
+
+#include <memory>
+#include <mutex>
+#include <unordered_map>
+#include <utility>
+#include "trans_queue.h"
+#include "trans_share_queue.h"
+
+namespace UC {
+
+/**
+ * Front end for transfer tasks: registers each submitted task with a waiter, routes it to the
+ * share queue or the plain queue, and lets callers wait for or poll its completion.
+ */
+class TransManager {
+public:
+    Status Setup(const size_t rankSize, const int32_t deviceId, const size_t streamNumber,
+                 const size_t blockSize, const size_t ioSize, const bool ioDirect,
+                 const size_t bufferNumber, const SpaceLayout* layout, const size_t timeoutMs,
+                 const std::string& mountPoint);
+    Status Submit(TransTask task, size_t& taskId) noexcept;
+    Status Wait(const size_t taskId) noexcept;
+    Status Check(const size_t taskId, bool& finish) noexcept;
+
+private:
+    using TaskPtr = std::shared_ptr<TransTask>;
+    using WaiterPtr = std::shared_ptr<TaskWaiter>;
+    using TaskPair = std::pair<TaskPtr, WaiterPtr>;
+    TransShareQueue shareQueue_;
+    TransQueue queue_;
+    // Zero-initialised so that Submit()/Wait() read defined values even before Setup().
+    size_t rankSize_{0};
+    size_t timeoutMs_{0};
+    std::mutex mutex_;
+    std::unordered_map<size_t, TaskPair> tasks_;
+    TaskSet failureSet_;
+};
+
+} // namespace UC
+
+#endif
diff --git a/ucm/store/ds3fsstore/cc/domain/trans/trans_queue.cc b/ucm/store/ds3fsstore/cc/domain/trans/trans_queue.cc
new file mode 100644
index 000000000..26daa0a38
--- /dev/null
+++ b/ucm/store/ds3fsstore/cc/domain/trans/trans_queue.cc
@@ -0,0 +1,382 @@
+#include "trans_queue.h"
+#include <fcntl.h>
+#include <hf3fs_usrbio.h>
+#include <unistd.h>
+#include <cstring>
+#include <stdexcept>
+#include "file/file.h"
+#include "logger/logger.h"
+#include "trans/device.h"
+
+namespace UC {
+
+// Per-thread usrbio resources, created lazily by GetThreadUsrbioResources(). Stored untyped
+// because the resource type is a nested type of TransQueue.
+// NOTE(review): nothing visible here destroys these when a worker thread exits - confirm.
+thread_local void* g_threadUsrbioResources = nullptr;
+
+/** Unregisters and closes every pooled file handle, then frees the queue-level usrbio state. */
+TransQueue::~TransQueue()
+{
+    // NOTE(review): OpenFile() stores the hf3fs_reg_fd() result in regFd and only accepts
+    // results <= 0, so this `>= 0` test can only match a result of exactly 0 - confirm against
+    // the usrbio API which value is meant to be passed to the unregister call.
+    fdHandlePool_.ClearAll([](const FdHandle& handle) {
+        if (handle.regFd >= 0) { hf3fs_unreg_fd(handle.regFd); }
+        if (handle.fd >= 0) { close(handle.fd); }
+    });
+    CleanupUsrbio();
+}
+
+/**
+ * Device stage. LOAD: copies the host buffer to the device shards and completes the block.
+ * DUMP: copies the shards to the host buffer and forwards the block to the file stage, which
+ * then owns completion. A task whose owner is already in the failure set completes as failed.
+ */
+void TransQueue::DeviceWorker(BlockTask&& task)
+{
+    if (this->failureSet_->Contains(task.owner)) {
+        task.done(false);
+        return;
+    }
+
+    const auto number = task.shards.size();
+    const auto size = this->ioSize_;
+    auto done = task.done;
+    auto devPtrs = reinterpret_cast<void**>(task.shards.data());
+    auto hostPtr = task.buffer.get();
+    auto s = Status::OK();
+
+    if (task.type == TransTask::Type::LOAD) {
+        s = stream_->HostToDevice(hostPtr, devPtrs, size, number);
+    } else {
+        s = stream_->DeviceToHost(devPtrs, hostPtr, size, number);
+        if (s.Success()) {
+            // FileWorker() calls task.done() after the write; signalling here as well would
+            // complete the same block twice.
+            this->filePool_.Push(std::move(task));
+            return;
+        }
+    }
+
+    if (s.Failure()) { this->failureSet_->Insert(task.owner); }
+
+    done(s.Success());
+}
+
+/**
+ * Opens `path` (create/truncate for writes, read-only otherwise), checks that it lives on
+ * hf3fs and registers the descriptor for usrbio.
+ * @param fdHandle receives the descriptor and the registration result on success.
+ * @return Error when any step fails; the descriptor is closed in that case.
+ */
+Status TransQueue::OpenFile(const std::string& path, bool isWrite, FdHandle& fdHandle)
+{
+    const int openFlags = isWrite ? (O_WRONLY | O_CREAT | O_TRUNC) : O_RDONLY;
+    const int openMode = isWrite ? 0644 : 0;
+
+    const int fd = open(path.c_str(), openFlags, openMode);
+    if (fd < 0) { return Status::Error(); }
+
+    if (!hf3fs_is_hf3fs(fd)) {
+        close(fd);
+        return Status::Error();
+    }
+
+    // A positive result is treated as failure here.
+    // NOTE(review): confirm this return convention against the usrbio API reference.
+    const int regRes = hf3fs_reg_fd(fd, 0);
+    if (regRes > 0) {
+        close(fd);
+        return Status::Error();
+    }
+
+    fdHandle.fd = fd;
+    fdHandle.regFd = regRes;
+    return Status::OK();
+}
+
+/**
+ * Writes one block: copies the host buffer into the thread's write iov, submits a single I/O
+ * at file offset 0, waits for its completion and commits the block in the layout.
+ * @return Error when the block does not fit the iov or any usrbio call reports failure.
+ */
+Status TransQueue::DoWrite(const BlockTask& task, const FdHandle& fdHandle, UsrbioResources& usrbio)
+{
+    const void* hostPtr = task.buffer.get();
+    const auto length = this->ioSize_ * task.shards.size();
+    const int regFd = fdHandle.regFd;
+
+    if (hostPtr != nullptr && length > 0) {
+        if (length > usrbio.writeIov.size) { return Status::Error(); }
+        memcpy(usrbio.writeIov.base, hostPtr, length);
+    }
+
+    const auto userData = reinterpret_cast<const void*>(static_cast<uintptr_t>(task.owner));
+    const int prepRes = hf3fs_prep_io(&usrbio.writeIor, &usrbio.writeIov, false,
+                                      usrbio.writeIov.base, regFd, 0, length, userData);
+    if (prepRes < 0) { return Status::Error(); }
+
+    const int submitRes = hf3fs_submit_ios(&usrbio.writeIor);
+    if (submitRes < 0) { return Status::Error(); }
+
+    struct hf3fs_cqe cqe;
+    const int waitRes = hf3fs_wait_for_ios(&usrbio.writeIor, &cqe, 1, 1, nullptr);
+    if (waitRes <= 0) { return Status::Error(); }
+    if (cqe.result < 0) { return Status::Error(); }
+
+    this->layout_->Commit(task.block, true);
+    return Status::OK();
+}
+
+/**
+ * Reads one block: submits a single read of `ioSize_ * shards` bytes from file offset 0 into
+ * the thread's read iov, waits for it and copies the bytes that were read to the host buffer.
+ * NOTE(review): a completion shorter than the requested length is accepted as success.
+ * @return Error when the block does not fit the iov or any usrbio call reports failure.
+ */
+Status TransQueue::DoRead(const BlockTask& task, const FdHandle& fdHandle, UsrbioResources& usrbio)
+{
+    void* hostPtr = task.buffer.get();
+    const auto length = this->ioSize_ * task.shards.size();
+    const int regFd = fdHandle.regFd;
+
+    if (length > usrbio.readIov.size) { return Status::Error(); }
+
+    const auto userData = reinterpret_cast<const void*>(static_cast<uintptr_t>(task.owner));
+    const int prepRes = hf3fs_prep_io(&usrbio.readIor, &usrbio.readIov, true, usrbio.readIov.base,
+                                      regFd, 0, length, userData);
+    if (prepRes < 0) { return Status::Error(); }
+
+    const int submitRes = hf3fs_submit_ios(&usrbio.readIor);
+    if (submitRes < 0) { return Status::Error(); }
+
+    struct hf3fs_cqe cqe;
+    const int waitRes = hf3fs_wait_for_ios(&usrbio.readIor, &cqe, 1, 1, nullptr);
+    if (waitRes <= 0) { return Status::Error(); }
+    if (cqe.result < 0) { return Status::Error(); }
+
+    if (hostPtr != nullptr && cqe.result > 0) { memcpy(hostPtr, usrbio.readIov.base, cqe.result); }
+
+    return Status::OK();
+}
+
+/**
+ * File stage. DUMP blocks are written and completed here; LOAD blocks are read and then
+ * forwarded to the device stage. Any failure marks the owning task failed and completes the
+ * block as failed.
+ */
+void TransQueue::FileWorker(BlockTask&& task)
+{
+    if (this->failureSet_->Contains(task.owner)) {
+        task.done(false);
+        return;
+    }
+
+    // Resource creation reports failure by throwing; it must not escape the worker thread.
+    UsrbioResources* usrbio = nullptr;
+    try {
+        usrbio = &GetThreadUsrbioResources();
+    } catch (const std::exception& e) {
+        UC_ERROR("Failed({}) to prepare usrbio resources.", e.what());
+        this->failureSet_->Insert(task.owner);
+        task.done(false);
+        return;
+    }
+
+    const bool isDump = task.type == TransTask::Type::DUMP;
+    const auto& path = this->layout_->DataFilePath(task.block, isDump);
+
+    FdHandle fdHandle{};
+    auto status =
+        fdHandlePool_.Get(path, fdHandle, [&path, isDump, this](FdHandle& handle) -> Status {
+            return OpenFile(path, isDump, handle);
+        });
+
+    if (status.Failure()) {
+        this->failureSet_->Insert(task.owner);
+        task.done(false);
+        return;
+    }
+
+    Status result = isDump ? DoWrite(task, fdHandle, *usrbio) : DoRead(task, fdHandle, *usrbio);
+
+    if (result.Failure()) {
+        this->failureSet_->Insert(task.owner);
+        task.done(false);
+        return;
+    }
+
+    if (isDump) {
+        task.done(true);
+    } else {
+        this->devPool_.Push(std::move(task));
+    }
+}
+
+/**
+ * Returns the calling thread's usrbio resources, creating the read/write iovs and iors on
+ * first use. Everything created so far is destroyed again when a later step fails.
+ * @throws std::runtime_error when any of the four resources cannot be created.
+ */
+TransQueue::UsrbioResources& TransQueue::GetThreadUsrbioResources()
+{
+    auto& usrbioPtr = g_threadUsrbioResources;
+    if (usrbioPtr == nullptr) {
+        usrbioPtr = new UsrbioResources();
+        auto usrbio_ = static_cast<UsrbioResources*>(usrbioPtr);
+
+        int res = hf3fs_iovcreate(&usrbio_->readIov, this->mountPoint_.c_str(), IOV_SIZE, 0, -1);
+        if (res < 0) {
+            delete usrbio_;
+            usrbioPtr = nullptr;
+            throw std::runtime_error("Failed to create read Iov");
+        }
+
+        res = hf3fs_iovcreate(&usrbio_->writeIov, this->mountPoint_.c_str(), IOV_SIZE, 0, -1);
+        if (res < 0) {
+            hf3fs_iovdestroy(&usrbio_->readIov);
+            delete usrbio_;
+            usrbioPtr = nullptr;
+            throw std::runtime_error("Failed to create write Iov");
+        }
+
+        res = hf3fs_iorcreate4(&usrbio_->readIor, this->mountPoint_.c_str(), IOR_ENTRIES, true,
+                               IO_DEPTH, 0, -1, 0);
+        if (res < 0) {
+            hf3fs_iovdestroy(&usrbio_->readIov);
+            hf3fs_iovdestroy(&usrbio_->writeIov);
+            delete usrbio_;
+            usrbioPtr = nullptr;
+            throw std::runtime_error("Failed to create read Ior");
+        }
+
+        res = hf3fs_iorcreate4(&usrbio_->writeIor, this->mountPoint_.c_str(), IOR_ENTRIES, false,
+                               IO_DEPTH, 0, -1, 0);
+        if (res < 0) {
+            hf3fs_iovdestroy(&usrbio_->readIov);
+            hf3fs_iovdestroy(&usrbio_->writeIov);
+            hf3fs_iordestroy(&usrbio_->readIor);
+            delete usrbio_;
+            usrbioPtr = nullptr;
+            throw std::runtime_error("Failed to create write Ior");
+        }
+
+        usrbio_->initialized = true;
+    }
+
+    return *static_cast<UsrbioResources*>(usrbioPtr);
+}
+
+/**
+ * Creates the queue-level iovs and iors in the member `usrbio_`, undoing earlier steps when a
+ * later one fails.
+ * NOTE(review): FileWorker() uses the per-thread resources, not this member - confirm that
+ * the member is still needed.
+ */
+Status TransQueue::InitUsrbio()
+{
+    int res = hf3fs_iovcreate(&usrbio_.readIov, this->mountPoint_.c_str(), IOV_SIZE, 0, -1);
+    if (res < 0) { return Status::Error(); }
+
+    res = hf3fs_iovcreate(&usrbio_.writeIov, this->mountPoint_.c_str(), IOV_SIZE, 0, -1);
+    if (res < 0) {
+        hf3fs_iovdestroy(&usrbio_.readIov);
+        return Status::Error();
+    }
+
+    res = hf3fs_iorcreate4(&usrbio_.readIor, this->mountPoint_.c_str(), IOR_ENTRIES, true, IO_DEPTH,
+                           0, -1, 0);
+    if (res < 0) {
+        hf3fs_iovdestroy(&usrbio_.readIov);
+        hf3fs_iovdestroy(&usrbio_.writeIov);
+        return Status::Error();
+    }
+
+    res = hf3fs_iorcreate4(&usrbio_.writeIor, this->mountPoint_.c_str(), IOR_ENTRIES, false,
+                           IO_DEPTH, 0, -1, 0);
+    if (res < 0) {
+        hf3fs_iovdestroy(&usrbio_.readIov);
+        hf3fs_iovdestroy(&usrbio_.writeIov);
+        hf3fs_iordestroy(&usrbio_.readIor);
+        return Status::Error();
+    }
+
+    usrbio_.initialized = true;
+    return Status::OK();
+}
+
+/** Destroys the queue-level iors and iovs if InitUsrbio() completed; otherwise a no-op. */
+Status TransQueue::CleanupUsrbio()
+{
+    if (!usrbio_.initialized) { return Status::OK(); }
+
+    hf3fs_iordestroy(&usrbio_.readIor);
+    hf3fs_iordestroy(&usrbio_.writeIor);
+    hf3fs_iovdestroy(&usrbio_.readIov);
+    hf3fs_iovdestroy(&usrbio_.writeIov);
+
+    usrbio_.initialized = false;
+    return Status::OK();
+}
+
+/**
+ * Prepares the device, its host buffers and stream, stores the queue configuration, starts
+ * the device worker and `streamNumber` file workers, and creates the queue-level usrbio
+ * resources.
+ * @return Error when the device, buffers or worker pools cannot be set up, or the status of
+ *         InitUsrbio().
+ */
+Status TransQueue::Setup(const int32_t deviceId, const size_t streamNumber, const size_t blockSize,
+                         const size_t ioSize, const bool ioDirect, const size_t bufferNumber,
+                         const SpaceLayout* layout, TaskSet* failureSet_,
+                         const std::string& mountPoint)
+{
+    Trans::Device device;
+    auto ts = device.Setup(deviceId);
+    if (ts.Failure()) { return Status::Error(); }
+
+    buffer_ = device.MakeBuffer();
+    stream_ = device.MakeStream();
+    if (!buffer_ || !stream_) { return Status::Error(); }
+
+    ts = buffer_->MakeHostBuffers(blockSize, bufferNumber);
+    if (ts.Failure()) { return Status::Error(); }
+
+    // Everything the workers read is assigned before any worker thread is started.
+    this->layout_ = layout;
+    this->mountPoint_ = mountPoint;
+    this->ioSize_ = ioSize;
+    this->ioDirect_ = ioDirect;
+    this->failureSet_ = failureSet_;
+
+    auto success =
+        this->devPool_.SetWorkerFn([this](auto t, auto) { this->DeviceWorker(std::move(t)); })
+            .Run();
+    if (!success) { return Status::Error(); }
+
+    success = this->filePool_.SetWorkerFn([this](auto t, auto) { this->FileWorker(std::move(t)); })
+                  .SetNWorker(streamNumber)
+                  .Run();
+    if (!success) { return Status::Error(); }
+
+    ts = InitUsrbio();
+    if (ts.Failure()) { return ts; }
+
+    return Status::OK();
+}
+
+/**
+ * Splits `task` into one BlockTask per group and queues them. DUMP tasks take the batched
+ * path in DispatchDump(); the remaining (LOAD) blocks start at the file stage.
+ */
+void TransQueue::Dispatch(TaskPtr task, WaiterPtr waiter)
+{
+    if (task->type == TransTask::Type::DUMP) {
+        this->DispatchDump(task, waiter);
+        return;
+    }
+
+    task->ForEachGroup(
+        [task, waiter, this](const std::string& block, std::vector<uintptr_t>& shards) {
+            BlockTask blockTask;
+            blockTask.owner = task->id;
+            blockTask.block = block;
+            blockTask.type = task->type;
+            auto bufferSize = this->ioSize_ * shards.size();
+            std::swap(blockTask.shards, shards);
+            blockTask.buffer = buffer_->GetHostBuffer(bufferSize);
+
+            blockTask.done = [task, waiter, ioSize = this->ioSize_](bool success) {
+                if (!success) {
+                    waiter->Done(nullptr);
+                } else {
+                    waiter->Done([task, ioSize] { UC_DEBUG("{}", task->Epilog(ioSize)); });
+                }
+            };
+
+            if (task->type == TransTask::Type::DUMP) {
+                this->devPool_.Push(std::move(blockTask));
+            } else {
+                this->filePool_.Push(std::move(blockTask));
+            }
+        });
+}
+
+/**
+ * DUMP path: issues an asynchronous device-to-host copy for every block, synchronises the
+ * stream once, then hands all blocks to the file stage. When synchronisation fails the task
+ * is marked failed and every block is completed as failed.
+ */
+void TransQueue::DispatchDump(TaskPtr task, WaiterPtr waiter)
+{
+    std::vector<BlockTask> blocks;
+    blocks.reserve(task->GroupNumber());
+
+    task->ForEachGroup(
+        [task, waiter, &blocks, this](const std::string& block, std::vector<uintptr_t>& shards) {
+            BlockTask blockTask;
+            blockTask.owner = task->id;
+            blockTask.block = block;
+            blockTask.type = task->type;
+            auto bufferSize = this->ioSize_ * shards.size();
+            blockTask.buffer = buffer_->GetHostBuffer(bufferSize);
+            std::swap(blockTask.shards, shards);
+
+            blockTask.done = [task, waiter, ioSize = this->ioSize_](bool success) {
+                if (!success) {
+                    waiter->Done(nullptr);
+                } else {
+                    waiter->Done([task, ioSize] { UC_DEBUG("{}", task->Epilog(ioSize)); });
+                }
+            };
+
+            auto device = reinterpret_cast<void**>(blockTask.shards.data());
+            auto host = blockTask.buffer.get();
+
+            stream_->DeviceToHostAsync(device, host, this->ioSize_, blockTask.shards.size());
+            blocks.push_back(std::move(blockTask));
+        });
+
+    auto s = stream_->Synchronized();
+    if (s.Failure()) {
+        this->failureSet_->Insert(task->id);
+        // The waiter is created with one count per block (see TransManager::Submit), so each
+        // block has to be completed; signalling once would leave the waiter pending.
+        for (auto& block : blocks) { block.done(false); }
+        return;
+    }
+
+    for (auto&& block : blocks) { this->filePool_.Push(std::move(block)); }
+}
+
+} // namespace UC
diff --git a/ucm/store/ds3fsstore/cc/domain/trans/trans_queue.h b/ucm/store/ds3fsstore/cc/domain/trans/trans_queue.h
new file mode 100644
index 000000000..8f21bd63b
--- /dev/null
+++ b/ucm/store/ds3fsstore/cc/domain/trans/trans_queue.h
@@ -0,0 +1,81 @@
+#ifndef UNIFIEDCACHE_TRANS_QUEUE_USRBIO_H
+#define UNIFIEDCACHE_TRANS_QUEUE_USRBIO_H
+
+#include 
+#include "handle_recorder.h"
+#include "space/space_layout.h"
+#include "task/task_set.h"
+#include "task/task_waiter.h"
+#include "thread/thread_pool.h"
+#include "trans/buffer.h"
+#include "trans/stream.h"
+#include "trans_task.h"
+
+namespace UC {
+
+class TransQueue {
+    using TaskPtr = std::shared_ptr;
+    using WaiterPtr = std::shared_ptr;
+
+    struct BlockTask {
+        size_t owner;
+        std::string block;
+        TransTask::Type type;
+        std::vector shards;
+        std::shared_ptr buffer;
+        std::function done;
+    };
+
+    struct FdHandle {
+ int fd; + int regFd; + }; + + static constexpr size_t IOV_SIZE = 1UL << 30; + static constexpr int IOR_ENTRIES = 64; + static constexpr int IO_DEPTH = 0; + + struct UsrbioResources { + struct hf3fs_iov readIov; + struct hf3fs_iov writeIov; + struct hf3fs_ior readIor; + struct hf3fs_ior writeIor; + bool initialized{false}; + }; + + void DeviceWorker(BlockTask&& task); + void FileWorker(BlockTask&& task); + UsrbioResources& GetThreadUsrbioResources(); + Status InitUsrbio(); + Status CleanupUsrbio(); + Status OpenFile(const std::string& path, bool isWrite, FdHandle& fdHandle); + Status DoWrite(const BlockTask& task, const FdHandle& fdHandle, UsrbioResources& usrbio); + Status DoRead(const BlockTask& task, const FdHandle& fdHandle, UsrbioResources& usrbio); + +public: + ~TransQueue(); + Status Setup(const int32_t deviceId, const size_t streamNumber, const size_t blockSize, + const size_t ioSize, const bool ioDirect, const size_t bufferNumber, + const SpaceLayout* layout, TaskSet* failureSet_, const std::string& mountPoint); + + void Dispatch(TaskPtr task, WaiterPtr waiter); + void DispatchDump(TaskPtr task, WaiterPtr waiter); + +private: + std::unique_ptr buffer_{nullptr}; + std::unique_ptr stream_{nullptr}; + const SpaceLayout* layout_; + std::string mountPoint_; + size_t ioSize_; + bool ioDirect_; + ThreadPool devPool_; + ThreadPool filePool_; + TaskSet* failureSet_; + + UsrbioResources usrbio_; + HandlePool& fdHandlePool_{HandlePool::Instance()}; +}; + +} // namespace UC + +#endif diff --git a/ucm/store/ds3fsstore/cc/domain/trans/trans_share_queue.cc b/ucm/store/ds3fsstore/cc/domain/trans/trans_share_queue.cc new file mode 100644 index 000000000..c43d16a85 --- /dev/null +++ b/ucm/store/ds3fsstore/cc/domain/trans/trans_share_queue.cc @@ -0,0 +1,169 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. 
+ * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ * */
+#include "trans_share_queue.h"
+#include "logger/logger.h"
+#include "trans/device.h"
+
+namespace UC {
+
+/** Requests stop under the mutex, wakes every worker and joins all worker threads. */
+TransShareQueue::~TransShareQueue()
+{
+    {
+        std::lock_guard lg(this->mutex_);
+        this->stop_ = true;
+        this->cv_.notify_all();
+    }
+    for (auto& w : this->threads_) {
+        if (w.joinable()) { w.join(); }
+    }
+}
+
+/**
+ * Sets up the shared buffer and starts `streamNumber` worker threads.
+ *
+ * Each worker reports its start-up result through a promise that lives on this function's
+ * stack, so every future is awaited before returning, including after a failure.
+ *
+ * @return The shared-buffer error if its setup fails, otherwise the first failing worker
+ *         status, otherwise success.
+ */
+Status TransShareQueue::Setup(const size_t nSharer, const int32_t deviceId,
+                              const size_t streamNumber, const size_t blockSize,
+                              const size_t ioSize, const bool ioDirect, const size_t bufferNumber,
+                              const SpaceLayout* layout, TaskSet* failureSet)
+{
+    this->deviceId_ = deviceId;
+    this->streamNumber_ = streamNumber;
+    this->ioSize_ = ioSize;
+    this->layout_ = layout;
+    this->failureSet_ = failureSet;
+    auto status = this->buffer_.Setup(blockSize, bufferNumber, ioDirect, nSharer);
+    if (status.Failure()) { return status; }
+    std::list<std::promise<Status>> start(streamNumber);
+    std::list<std::future<Status>> fut;
+    for (auto& s : start) {
+        fut.push_back(s.get_future());
+        this->threads_.emplace_back([this, &s] { this->WorkerLoop(s); });
+    }
+    // Wait for all workers: leaving early would destroy `start` while a thread that has not
+    // reported yet still holds a reference to its promise.
+    for (auto& f : fut) {
+        auto started = f.get();
+        if (status.Success() && started.Failure()) { status = started; }
+    }
+    return status;
+}
+
+/**
+ * Queues one BlockTask per block group of `task` on the wait list and wakes the workers.
+ *
+ * Runs entirely under mutex_. Each block's done callback calls waiter->Done() once.
+ */
+void TransShareQueue::Dispatch(TaskPtr task, WaiterPtr waiter)
+{
+    std::lock_guard lg(this->mutex_);
+    task->ForEachGroup(
+        [task, waiter, this](const std::string& block, std::vector<uintptr_t>& shards) {
+            BlockTask blockTask;
+            blockTask.reader =
+                this->buffer_.MakeReader(block, this->layout_->DataFilePath(block, false));
+            blockTask.owner = task->id;
+            std::swap(blockTask.shards, shards);
+            blockTask.done = [task, waiter, ioSize = this->ioSize_](bool success) {
+                if (!success) {
+                    waiter->Done(nullptr);
+                } else {
+                    waiter->Done([task, ioSize] { UC_DEBUG("{}", task->Epilog(ioSize)); });
+                }
+            };
+            this->wait_.push_back(std::move(blockTask));
+        });
+    this->cv_.notify_all();
+}
+
+/**
+ * Worker thread body: binds the device, creates a stream, reports the start-up result through
+ * `status`, then calls Worker() until stop_ is set.
+ */
+void TransShareQueue::WorkerLoop(std::promise<Status>& status)
+{
+    Trans::Device device;
+    auto s = device.Setup(deviceId_);
+    if (s.Failure()) {
+        UC_ERROR("Failed({}) to set context on device({}).", s.ToString(), deviceId_);
+        status.set_value(Status::Error());
+        return;
+    }
+    auto stream = device.MakeStream();
+    if (!stream) {
+        UC_ERROR("Failed to create stream on device({}).", deviceId_);
+        status.set_value(Status::Error());
+        return;
+    }
+    status.set_value(Status::OK());
+    while (!stop_) { Worker(*stream); }
+}
+
+/**
+ * Performs one scheduling step.
+ *
+ * Sleeps while both lists are empty. Otherwise it first finishes the first in-flight task on
+ * load_ whose reader no longer answers Retry. Failing that, and while fewer than streamNumber_
+ * tasks are in flight, it starts the next task from wait_. The mutex is released before any
+ * task is handled.
+ */
+void TransShareQueue::Worker(Trans::Stream& stream)
+{
+    std::unique_lock ul{this->mutex_};
+    if (this->load_.empty() && this->wait_.empty()) {
+        this->cv_.wait(
+            ul, [this] { return this->stop_ || !this->load_.empty() || !this->wait_.empty(); });
+    }
+    if (this->stop_) { return; }
+    for (auto iter = this->load_.begin(); iter != this->load_.end(); ++iter) {
+        auto s = iter->reader->Ready4Read();
+        if (s != Status::Retry()) {
+            auto task = std::move(*iter);
+            this->load_.erase(iter);
+            ul.unlock();
+            this->HandleReadyTask(s, task, stream);
+            return;
+        }
+    }
+    if (this->load_.size() >= this->streamNumber_) { return; }
+    if (this->wait_.empty()) { return; }
+    auto task = std::move(this->wait_.front());
+    this->wait_.pop_front();
+    ul.unlock();
+    this->HandleLoadTask(task, stream);
+}
+
+/**
+ * Finishes a task whose reader has produced a final status.
+ *
+ * On success the host data is copied to the device shards and the stream is synchronized before
+ * the task is reported as done, so completion is only signalled once the copy has finished.
+ * Any failure records the owner in the failure set.
+ *
+ * @param s      Status returned by the reader.
+ * @param task   Task to finish.
+ * @param stream Stream of the calling worker.
+ */
+void TransShareQueue::HandleReadyTask(Status s, BlockTask& task, Trans::Stream& stream)
+{
+    if (this->failureSet_->Contains(task.owner)) {
+        task.done(false);
+        return;
+    }
+    if (s.Success()) {
+        auto host = (void*)task.reader->GetData();
+        auto device = (void**)task.shards.data();
+        auto status = stream.HostToDeviceAsync(host, device, this->ioSize_, task.shards.size());
+        if (status.Failure()) [[unlikely]] {
+            UC_ERROR("Failed({}) to copy data from host to device.", status.ToString());
+            s = Status::Error();
+        } else {
+            // The copy above is asynchronous; wait for it before signalling completion.
+            auto synced = stream.Synchronized();
+            if (synced.Failure()) [[unlikely]] {
+                UC_ERROR("Failed({}) to copy data from host to device.", synced.ToString());
+                s = Status::Error();
+            }
+        }
+    }
+    if (s.Failure()) { this->failureSet_->Insert(task.owner); }
+    task.done(s.Success());
+}
+
+/**
+ * Starts a task taken from the wait list: if its reader answers Retry the task is parked on
+ * load_ for a later Worker() pass, otherwise it is finished immediately.
+ */
+void TransShareQueue::HandleLoadTask(BlockTask& task, Trans::Stream& stream)
+{
+    if (this->failureSet_->Contains(task.owner)) {
+        task.done(false);
+        return;
+    }
+    auto s = task.reader->Ready4Read();
+    if (s == Status::Retry()) {
+        std::lock_guard lg{this->mutex_};
+        this->load_.push_back(std::move(task));
+        this->cv_.notify_one();
+        return;
+    }
+    this->HandleReadyTask(s, task, stream);
+}
+
+} // namespace UC
diff --git a/ucm/store/ds3fsstore/cc/domain/trans/trans_share_queue.h b/ucm/store/ds3fsstore/cc/domain/trans/trans_share_queue.h
new file mode 100644
index 000000000..7c40b0542
--- /dev/null
+++ b/ucm/store/ds3fsstore/cc/domain/trans/trans_share_queue.h
@@ -0,0 +1,78 @@
+/**
+ * MIT License
+ *
+ * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ * */
+#ifndef UNIFIEDCACHE_TRANS_SHARE_QUEUE_H
+#define UNIFIEDCACHE_TRANS_SHARE_QUEUE_H
+
+#include <atomic>
+#include <condition_variable>
+#include <cstddef>
+#include <cstdint>
+#include <functional>
+#include <future>
+#include <list>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <vector>
+#include "share_buffer.h"
+#include "space/space_layout.h"
+#include "task/task_set.h"
+#include "task/task_waiter.h"
+#include "trans/stream.h"
+#include "trans_task.h"
+
+namespace UC {
+
+/**
+ * Load queue that reads blocks through a ShareBuffer and copies them to the device from a set
+ * of worker threads, each with its own stream.
+ */
+class TransShareQueue {
+    using TaskPtr = std::shared_ptr<TransTask>;
+    using WaiterPtr = std::shared_ptr<TaskWaiter>;
+    // One block of a task: its buffer reader, the owning task id, the device shard addresses and
+    // the completion hook.
+    struct BlockTask {
+        // NOTE(review): element type assumed to be ShareBuffer::Reader (the result of
+        // ShareBuffer::MakeReader); confirm against share_buffer.h.
+        std::shared_ptr<ShareBuffer::Reader> reader;
+        size_t owner;
+        std::vector<uintptr_t> shards;
+        std::function<void(bool)> done;
+    };
+    int32_t deviceId_{0};
+    size_t streamNumber_{0};
+    size_t ioSize_{0};
+    ShareBuffer buffer_;
+    const SpaceLayout* layout_{nullptr};
+    TaskSet* failureSet_{nullptr};
+    std::atomic_bool stop_{false};
+    std::mutex mutex_;            // guards load_ and wait_
+    std::condition_variable cv_;  // signalled on new work and on stop
+    std::list<BlockTask> load_;   // tasks whose reader answered Retry
+    std::list<BlockTask> wait_;   // tasks not started yet
+    std::list<std::thread> threads_;
+
+public:
+    ~TransShareQueue();
+    Status Setup(const size_t nSharer, const int32_t deviceId, const size_t streamNumber,
+                 const size_t blockSize, const size_t ioSize, const bool ioDirect,
+                 const size_t bufferNumber, const SpaceLayout* layout, TaskSet* failureSet);
+    void Dispatch(TaskPtr task, WaiterPtr waiter);
+
+private:
+    void WorkerLoop(std::promise<Status>& status);
+    void Worker(Trans::Stream& stream);
+    void HandleReadyTask(Status s, BlockTask& task, Trans::Stream& stream);
+    void HandleLoadTask(BlockTask& task, Trans::Stream& stream);
+};
+
+} // namespace UC
+
+#endif
diff --git a/ucm/store/ds3fsstore/cc/domain/trans/trans_task.h b/ucm/store/ds3fsstore/cc/domain/trans/trans_task.h
new file mode 100644
index 000000000..8fcb48fba
--- /dev/null
+++ b/ucm/store/ds3fsstore/cc/domain/trans/trans_task.h
@@ -0,0 +1,89 @@
+/**
+ * MIT License
+ *
+ * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+ * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ * */
+#ifndef UNIFIEDCACHE_TRANS_TASK_H
+#define UNIFIEDCACHE_TRANS_TASK_H
+
+#include <fmt/format.h>
+#include <atomic>
+#include <chrono>
+#include <cstddef>
+#include <cstdint>
+#include <functional>
+#include <limits>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+namespace UC {
+
+/**
+ * A transfer request: a set of addresses grouped by block id, tagged with a process-wide unique
+ * id, a direction and the time it was created.
+ */
+class TransTask {
+    // Returns the next task id; ids start at `invalid + 1`.
+    static size_t NextId() noexcept
+    {
+        static std::atomic<size_t> id{invalid + 1};
+        return id.fetch_add(1, std::memory_order_relaxed);
+    }
+    // Current steady-clock time in seconds.
+    static double NowTp() noexcept
+    {
+        auto now = std::chrono::steady_clock::now().time_since_epoch();
+        return std::chrono::duration<double>(now).count();
+    }
+
+public:
+    enum class Type { DUMP, LOAD };
+    size_t id;
+    Type type;
+    double startTp{0};
+    // Id value that NextId() never hands out first.
+    static constexpr auto invalid = std::numeric_limits<size_t>::min();
+    /**
+     * @param type  Transfer direction.
+     * @param brief Short label included in Str() and Epilog().
+     */
+    TransTask(Type&& type, std::string&& brief)
+        : id{NextId()}, type{std::move(type)}, startTp{NowTp()}, brief_{std::move(brief)}
+    {
+    }
+    /** Adds `address` to the group of `block` and counts it. */
+    void Append(const std::string& block, const uintptr_t address)
+    {
+        grouped_[block].push_back(address);
+        number_++;
+    }
+    /** Formats the task as "id,brief,number". */
+    auto Str() const noexcept { return fmt::format("{},{},{}", id, brief_, number_); }
+    /** Number of distinct blocks appended so far. */
+    size_t GroupNumber() const { return grouped_.size(); }
+    /**
+     * Calls `fn(block, addresses)` for every group. The address vector is passed by mutable
+     * reference, so the callback may take its contents.
+     */
+    void ForEachGroup(const std::function<void(const std::string&, std::vector<uintptr_t>&)>& fn)
+    {
+        for (auto& [block, shards] : grouped_) { fn(block, shards); }
+    }
+    /**
+     * Formats a completion message with the elapsed time since construction and the resulting
+     * bandwidth, assuming `ioSize` bytes per appended address.
+     */
+    auto Epilog(const size_t ioSize) const noexcept
+    {
+        auto total = ioSize * number_;
+        auto costs = NowTp() - startTp;
+        auto bw = double(total) / costs / 1e9;
+        return fmt::format("Task({},{},{},{}) finished, costs={:.06f}s, bw={:.06f}GB/s.", id,
+                           brief_, number_, total, costs, bw);
+    }
+
+private:
+    std::string brief_;
+    size_t number_{0};
+    std::unordered_map<std::string, std::vector<uintptr_t>> grouped_;
+};
+
+} // namespace UC
+
+#endif
diff --git a/ucm/store/ds3fsstore/cpy/ds3fsstore.py.cc b/ucm/store/ds3fsstore/cpy/ds3fsstore.py.cc
new file mode 100644
index 000000000..a251d7795
--- /dev/null
+++ b/ucm/store/ds3fsstore/cpy/ds3fsstore.py.cc
@@ -0,0 +1,119 @@
+/**
+ * MIT License
+ *
+ * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+ * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ * */
+#include "ds3fsstore.h"
+#include <pybind11/pybind11.h>
+#include <pybind11/stl.h>
+
+namespace py = pybind11;
+
+namespace UC {
+
+/** Python-facing wrapper around Ds3FsStore that adds list-based batch helpers. */
+class Ds3FsStorePy : public Ds3FsStore {
+public:
+    /** Returns this object as an opaque pointer. */
+    void* CCStoreImpl() { return this; }
+    /** Calls Alloc() for every block id and returns the results in input order. */
+    py::list AllocBatch(const py::list& blocks)
+    {
+        py::list results;
+        for (auto& block : blocks) { results.append(this->Alloc(block.cast<std::string>())); }
+        return results;
+    }
+    /** Calls Lookup() for every block id and returns the results in input order. */
+    py::list LookupBatch(const py::list& blocks)
+    {
+        py::list founds;
+        for (auto& block : blocks) { founds.append(this->Lookup(block.cast<std::string>())); }
+        return founds;
+    }
+    /** Calls Commit(block, success) for every block id. */
+    void CommitBatch(const py::list& blocks, const bool success)
+    {
+        for (auto& block : blocks) { this->Commit(block.cast<std::string>(), success); }
+    }
+    /** Calls Check() and returns (return value, finish flag) as a tuple. */
+    py::tuple CheckPy(const size_t task)
+    {
+        auto finish = false;
+        auto ret = this->Check(task, finish);
+        return py::make_tuple(ret, finish);
+    }
+    /** Submits a LOAD task built from parallel lists of block ids and addresses. */
+    size_t LoadToDevice(const py::list& blockIds, const py::list& addresses)
+    {
+        return this->SubmitPy(blockIds, addresses, TransTask::Type::LOAD, "DS3FS::S2D");
+    }
+    /** Submits a DUMP task built from parallel lists of block ids and addresses. */
+    size_t DumpFromDevice(const py::list& blockIds, const py::list& addresses)
+    {
+        return this->SubmitPy(blockIds, addresses, TransTask::Type::DUMP, "DS3FS::D2S");
+    }
+
+private:
+    /**
+     * Builds a TransTask from the two parallel lists and submits it.
+     *
+     * @throws py::value_error (ValueError in Python) when the lists differ in length; pairing
+     *         them up to the shorter one would silently drop part of the transfer.
+     * @return The value returned by Submit().
+     */
+    size_t SubmitPy(const py::list& blockIds, const py::list& addresses, TransTask::Type&& type,
+                    std::string&& brief)
+    {
+        if (blockIds.size() != addresses.size()) {
+            throw py::value_error("blockIds and addresses must have the same length");
+        }
+        TransTask task{std::move(type), std::move(brief)};
+        auto blockId = blockIds.begin();
+        auto address = addresses.begin();
+        while ((blockId != blockIds.end()) && (address != addresses.end())) {
+            task.Append(blockId->cast<std::string>(), address->cast<uintptr_t>());
+            blockId++;
+            address++;
+        }
+        return this->Submit(std::move(task));
+    }
+};
+
+} // namespace UC
+
+// Python module `ucmds3fsstore`: exposes build metadata, Ds3FsStore and its nested Config.
+// NOTE(review): the argument types given to py::overload_cast below are assumed from how
+// Alloc/Lookup/Commit are called in Ds3FsStorePy; confirm against ds3fsstore.h.
+PYBIND11_MODULE(ucmds3fsstore, module)
+{
+    module.attr("project") = UCM_PROJECT_NAME;
+    module.attr("version") = UCM_PROJECT_VERSION;
+    module.attr("commit_id") = UCM_COMMIT_ID;
+    module.attr("build_type") = UCM_BUILD_TYPE;
+    auto store = py::class_<UC::Ds3FsStorePy>(module, "Ds3FsStore");
+    auto config = py::class_<UC::Ds3FsStorePy::Config>(store, "Config");
+    config.def(py::init<const std::vector<std::string>&, const size_t, const bool>(),
+               py::arg("storageBackends"), py::arg("kvcacheBlockSize"), py::arg("transferEnable"));
+    config.def_readwrite("storageBackends", &UC::Ds3FsStorePy::Config::storageBackends);
+    config.def_readwrite("kvcacheBlockSize", &UC::Ds3FsStorePy::Config::kvcacheBlockSize);
+    config.def_readwrite("transferEnable", &UC::Ds3FsStorePy::Config::transferEnable);
+    config.def_readwrite("mountPoint", &UC::Ds3FsStorePy::Config::mountPoint);
+    config.def_readwrite("transferIoDirect", &UC::Ds3FsStorePy::Config::transferIoDirect);
+    config.def_readwrite("transferLocalRankSize", &UC::Ds3FsStorePy::Config::transferLocalRankSize);
+    config.def_readwrite("transferDeviceId", &UC::Ds3FsStorePy::Config::transferDeviceId);
+    config.def_readwrite("transferStreamNumber", &UC::Ds3FsStorePy::Config::transferStreamNumber);
+    config.def_readwrite("transferIoSize", &UC::Ds3FsStorePy::Config::transferIoSize);
+    config.def_readwrite("transferBufferNumber", &UC::Ds3FsStorePy::Config::transferBufferNumber);
+    config.def_readwrite("transferTimeoutMs", &UC::Ds3FsStorePy::Config::transferTimeoutMs);
+    store.def(py::init<>());
+    store.def("CCStoreImpl", &UC::Ds3FsStorePy::CCStoreImpl);
+    store.def("Setup", &UC::Ds3FsStorePy::Setup);
+    store.def("Alloc", py::overload_cast<const std::string&>(&UC::Ds3FsStorePy::Alloc));
+    store.def("AllocBatch", &UC::Ds3FsStorePy::AllocBatch);
+    store.def("Lookup", py::overload_cast<const std::string&>(&UC::Ds3FsStorePy::Lookup));
+    store.def("LookupBatch", &UC::Ds3FsStorePy::LookupBatch);
+    store.def("LoadToDevice", &UC::Ds3FsStorePy::LoadToDevice);
+    store.def("DumpFromDevice", &UC::Ds3FsStorePy::DumpFromDevice);
+    store.def("Wait", &UC::Ds3FsStorePy::Wait);
+    store.def("Check", &UC::Ds3FsStorePy::CheckPy);
+    store.def("Commit",
+              py::overload_cast<const std::string&, const bool>(&UC::Ds3FsStorePy::Commit));
+    store.def("CommitBatch", &UC::Ds3FsStorePy::CommitBatch);
+}
diff --git a/ucm/store/ds3fsstore/ds3fsstore_connector.py b/ucm/store/ds3fsstore/ds3fsstore_connector.py
new file mode 100644
index 000000000..9f98f81be
--- /dev/null
+++ 
b/ucm/store/ds3fsstore/ds3fsstore_connector.py
@@ -0,0 +1,93 @@
+# -*- coding: utf-8 -*-
+
+from dataclasses import dataclass
+from typing import Dict, List, Tuple
+
+import torch
+
+from ucm.store.ds3fsstore import ucmds3fsstore
+from ucm.store.ucmstore import Task, UcmKVStoreBase
+
+
+@dataclass
+class NfsTask(Task):
+    """Handle for a submitted transfer; carries the id returned by the native store."""
+
+    task_id: int
+
+
+class UcmDs3FsStore(UcmKVStoreBase):
+    """KV-cache store connector that delegates to the native ``ucmds3fsstore`` module."""
+
+    def __init__(self, config: Dict):
+        """Create and set up the native store.
+
+        Reads ``storage_backends`` (colon-separated), ``kv_block_size`` and ``role`` from
+        ``config``; for the ``worker`` role it also reads ``device`` and ``io_size``.
+        ``3fs_mount_point`` is optional.
+
+        Raises:
+            RuntimeError: if the native ``Setup`` call returns a non-zero code.
+        """
+        super().__init__(config)
+        self.store = ucmds3fsstore.Ds3FsStore()
+        backends = [entry for entry in config["storage_backends"].split(":") if entry]
+        kv_block_size = int(config["kv_block_size"])
+        is_worker = config["role"] == "worker"
+        param = ucmds3fsstore.Ds3FsStore.Config(backends, kv_block_size, is_worker)
+        if is_worker:
+            param.transferDeviceId = config["device"]
+            param.transferIoSize = config["io_size"]
+            param.transferIoDirect = True
+            param.transferStreamNumber = 8
+            param.transferBufferNumber = 4096
+        # NOTE(review): the original indentation of this check was lost; it is assumed to sit
+        # outside the worker-only branch - confirm.
+        if "3fs_mount_point" in config:
+            param.mountPoint = config["3fs_mount_point"]
+        ret = self.store.Setup(param)
+        if ret != 0:
+            msg = f"Failed to initialize ucmds3fsstore, errcode: {ret}."
+            raise RuntimeError(msg)
+
+    def cc_store(self) -> int:
+        """Return the native store handle from ``CCStoreImpl``."""
+        return self.store.CCStoreImpl()
+
+    def create(self, block_ids: List[str]) -> List[int]:
+        """Allocate every block id; returns one result per id."""
+        return self.store.AllocBatch(block_ids)
+
+    def lookup(self, block_ids: List[str]) -> List[bool]:
+        """Look up every block id; returns one flag per id."""
+        return self.store.LookupBatch(block_ids)
+
+    def prefetch(self, block_ids: List[str]) -> None:
+        """No-op in this connector."""
+        pass
+
+    def load(
+        self, block_ids: List[str], offset: List[int], dst_tensor: List[torch.Tensor]
+    ) -> Task:
+        """Submit a load into ``dst_tensor``; ``offset`` is not used by this connector."""
+        addresses = [tensor.data_ptr() for tensor in dst_tensor]
+        return NfsTask(task_id=self.store.LoadToDevice(block_ids, addresses))
+
+    def dump(
+        self, block_ids: List[str], offset: List[int], src_tensor: List[torch.Tensor]
+    ) -> Task:
+        """Submit a dump from ``src_tensor``; ``offset`` is not used by this connector."""
+        addresses = [tensor.data_ptr() for tensor in src_tensor]
+        return NfsTask(task_id=self.store.DumpFromDevice(block_ids, addresses))
+
+    def fetch_data(
+        self,
+        block_ids: List[str],
+        offset: List[int],
+        dst_addr: List[int],
+        size: List[int],
+    ) -> Task:
+        """Not implemented: does nothing and returns ``None``."""
+        pass
+
+    def dump_data(
+        self,
+        block_ids: List[str],
+        offset: List[int],
+        src_addr: List[int],
+        size: List[int],
+    ) -> Task:
+        """Not implemented: does nothing and returns ``None``."""
+        pass
+
+    def wait(self, task: Task) -> int:
+        """Wait on the task and return the native result code."""
+        return self.store.Wait(task.task_id)
+
+    def commit(self, block_ids: List[str], is_success: bool = True) -> None:
+        """Commit every block id with the given success flag."""
+        self.store.CommitBatch(block_ids, is_success)
+
+    def check(self, task: Task) -> Tuple[int, bool]:
+        """Return the native ``Check`` result for the task as ``(code, finished)``."""
+        return self.store.Check(task.task_id)
diff --git a/ucm/store/test/e2e/ds3fsstore_embed_and_fetch.py b/ucm/store/test/e2e/ds3fsstore_embed_and_fetch.py
new file mode 100644
index 000000000..88726a43a
--- /dev/null
+++ b/ucm/store/test/e2e/ds3fsstore_embed_and_fetch.py
@@ -0,0 +1,159 @@
+# -*- coding: utf-8 -*-
+#
+# MIT License
+#
+# Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
+#
+import os
+import secrets
+import time
+from typing import List
+
+import torch
+
+from ucm.store.ds3fsstore.ds3fsstore_connector import UcmDs3FsStore
+from ucm.store.ucmstore import UcmKVStoreBase
+
+
+def setup_store(
+    storage_backends, block_size, device_id, io_size, mount_point
+) -> UcmKVStoreBase:
+    """Build a worker-role UcmDs3FsStore from the given settings."""
+    config = {
+        "storage_backends": storage_backends,
+        "kv_block_size": block_size,
+        "role": "worker",
+        "device": device_id,
+        "io_size": io_size,
+        "3fs_mount_point": mount_point,
+    }
+    return UcmDs3FsStore(config)
+
+
+def make_buffers(
+    block_number, device_id, batch_size, block_dim, block_len, block_layer
+):
+    """Return ``block_number`` random hex ids and ``batch_size`` rows of random CUDA tensors.
+
+    Each row holds ``block_layer`` bfloat16 tensors of shape ``[block_dim, block_len]``.
+    """
+    hashes = [secrets.token_hex(16) for _ in range(block_number)]
+    device = "cuda:{}".format(device_id)
+    tensors = []
+    for _ in range(batch_size):
+        row = []
+        for _ in range(block_layer):
+            row.append(
+                torch.rand([block_dim, block_len], dtype=torch.bfloat16, device=device)
+            )
+        tensors.append(row)
+    return hashes, tensors
+
+
+def _flatten_layers(hashes: List[str], tensors: List[List[torch.Tensor]]):
+    """Pair each hash with every layer of its block; returns (block_ids, offsets, layers).
+
+    The offset of a layer is the summed storage size of the layers before it in the same block.
+    """
+    block_ids = []
+    offsets = []
+    layers = []
+    for hash_id, block in zip(hashes, tensors):
+        offset = 0
+        for layer in block:
+            block_ids.append(hash_id)
+            offsets.append(offset)
+            layers.append(layer)
+            offset += layer.untyped_storage().size()
+    return block_ids, offsets, layers
+
+
+def embed(store: UcmKVStoreBase, hashes: List[str], tensors: List[List[torch.Tensor]]):
+    """Create the blocks, dump the tensors into them, wait, and commit."""
+    results = store.create(hashes)
+    assert sum(results) == 0
+    block_ids, offsets, layers = _flatten_layers(hashes, tensors)
+    task = store.dump(block_ids, offsets, layers)
+    assert task.task_id > 0
+    ret = store.wait(task)
+    assert ret == 0
+    store.commit(hashes, True)
+
+
+def fetch(store: UcmKVStoreBase, hashes: List[str], tensors: List[List[torch.Tensor]]):
+    """Check that every block exists, then load the blocks into ``tensors`` and wait."""
+    for found in store.lookup(hashes):
+        assert found
+    block_ids, offsets, layers = _flatten_layers(hashes, tensors)
+    task = store.load(block_ids, offsets, layers)
+    assert task.task_id > 0
+    ret = store.wait(task)
+    assert ret == 0
+
+
+def cmp_and_print_diff(a, b, rtol=0.0, atol=0.0):
+    """Compare two tensor grids; print the differing values and fail on the first mismatch."""
+    for r, (row_a, row_b) in enumerate(zip(a, b)):
+        for c, (ta, tb) in enumerate(zip(row_a, row_b)):
+            if torch.allclose(ta, tb, rtol=rtol, atol=atol):
+                continue
+            mask = ~torch.isclose(ta, tb, rtol=rtol, atol=atol)
+            diff_a = ta[mask].cpu()
+            diff_b = tb[mask].cpu()
+            print(f"DIFF at [{r}][{c}] total {mask.sum().item()} element(s)")
+            print("  a val:", diff_a.flatten())
+            print("  b val:", diff_b.flatten())
+            assert False
+
+
+def store_all_hashes(hashes):
+    """Write the hashes, one per line, to kvcache_block_hashes.txt next to this script."""
+    file_path = os.path.join(os.path.dirname(__file__), "kvcache_block_hashes.txt")
+    with open(file_path, "w", encoding="utf-8") as file:
+        for hs in hashes:
+            file.write(hs + "\n")
+
+
+def main():
+    """Dump and reload every batch of blocks, compare the tensors, then save the hashes."""
+    storage_backends = "."
+    mount_point = "."
+    block_number = 4096
+    device_id = 1
+    block_dim = 576
+    block_len = 64
+    block_elem_size = 2
+    block_layer = 61
+    io_size = block_dim * block_len * block_elem_size
+    block_size = io_size * block_layer
+    batch_size = 64
+    store = setup_store(storage_backends, block_size, device_id, io_size, mount_point)
+    hashes, tensors = make_buffers(
+        block_number, device_id, batch_size, block_dim, block_len, block_layer
+    )
+    total_batches = (block_number + batch_size - 1) // batch_size
+    for batch in range(total_batches):
+        start = batch_size * batch
+        end = min(start + batch_size, block_number)
+        batch_hashes = hashes[start:end]
+        tensors2 = [[torch.empty_like(t) for t in row] for row in tensors]
+        embed(store, batch_hashes, tensors)
+        time.sleep(1)
+        fetch(store, batch_hashes, tensors2)
+        cmp_and_print_diff(tensors, tensors2)
+    store_all_hashes(hashes)
+
+
+if __name__ == "__main__":
+    os.environ["UC_LOGGER_LEVEL"] = "debug"
+    main()