Skip to content

Commit 101ea31

Browse files
committed
add queue depth config
1 parent 3447102 commit 101ea31

File tree

11 files changed

+51
-17
lines changed

11 files changed

+51
-17
lines changed

ucm/shared/infra/template/spsc_ring_queue.h

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,28 +24,40 @@
2424
#ifndef UNIFIEDCACHE_INFRA_SPSC_RING_QUEUE_H
2525
#define UNIFIEDCACHE_INFRA_SPSC_RING_QUEUE_H
2626

27-
#include <array>
2827
#include <atomic>
28+
#include <climits>
2929
#include <cstddef>
3030
#include <functional>
31+
#include <memory>
3132
#include <thread>
3233

3334
namespace UC {
3435

35-
template <typename T, size_t N>
36+
template <typename T>
3637
class SpscRingQueue {
3738
alignas(64) std::atomic<size_t> head_ = 0;
3839
alignas(64) std::atomic<size_t> tail_ = 0;
39-
std::array<T, N> buffer_;
40+
bool pow2_{false};
41+
size_t mask_{0};
42+
size_t capacity_{0};
43+
std::unique_ptr<T[]> buffer_;
44+
45+
size_t Mod(size_t n) { return pow2_ ? (n & mask_) : (n % capacity_); }
4046

4147
public:
42-
static_assert(N > 0 && (N & (N - 1)) == 0, "N must be a power of 2 for fast modulo");
48+
void Setup(size_t capacity)
49+
{
50+
capacity_ = capacity;
51+
mask_ = capacity_ - 1;
52+
pow2_ = (capacity_ & mask_) == 0;
53+
buffer_ = std::make_unique<T[]>(capacity_);
54+
}
4355

4456
void Push(T&& value)
4557
{
4658
while (true) {
4759
const size_t currentHead = head_.load(std::memory_order_relaxed);
48-
const size_t nextHead = (currentHead + 1) & (N - 1);
60+
const size_t nextHead = Mod(currentHead + 1);
4961
if (nextHead != tail_.load(std::memory_order_acquire)) {
5062
buffer_[currentHead] = std::move(value);
5163
head_.store(nextHead, std::memory_order_release);
@@ -58,7 +70,7 @@ class SpscRingQueue {
5870
bool TryPush(T&& value)
5971
{
6072
const size_t currentHead = head_.load(std::memory_order_relaxed);
61-
const size_t nextHead = (currentHead + 1) & (N - 1);
73+
const size_t nextHead = Mod(currentHead + 1);
6274
const size_t currentTail = tail_.load(std::memory_order_acquire);
6375
if (nextHead == currentTail) { return false; }
6476
buffer_[currentHead] = std::move(value);
@@ -72,7 +84,7 @@ class SpscRingQueue {
7284
const size_t currentTail = tail_.load(std::memory_order_relaxed);
7385
if (currentTail == currentHead) { return false; }
7486
value = std::move(buffer_[currentTail]);
75-
tail_.store((currentTail + 1) & (N - 1), std::memory_order_release);
87+
tail_.store(Mod(currentTail + 1), std::memory_order_release);
7688
return true;
7789
}
7890

ucm/shared/test/case/infra/spsc_ring_queue_test.cc

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ class UCSpscRingQueueTest : public testing::Test {};
2828

2929
TEST_F(UCSpscRingQueueTest, Basic)
3030
{
31-
UC::SpscRingQueue<size_t, 16> queue;
31+
UC::SpscRingQueue<size_t> queue;
32+
queue.Setup(16);
3233
size_t data;
3334
ASSERT_FALSE(queue.TryPop(data));
3435
ASSERT_TRUE(queue.TryPush(1023));
@@ -39,7 +40,8 @@ TEST_F(UCSpscRingQueueTest, Basic)
3940

4041
TEST_F(UCSpscRingQueueTest, FIFO)
4142
{
42-
UC::SpscRingQueue<size_t, 16> queue;
43+
UC::SpscRingQueue<size_t> queue;
44+
queue.Setup(16);
4345
constexpr size_t nElem = 10;
4446
for (size_t i = 0; i < nElem; i++) { ASSERT_TRUE(queue.TryPush(std::move(i))); }
4547
for (size_t i = 0; i < nElem; i++) {
@@ -53,8 +55,9 @@ TEST_F(UCSpscRingQueueTest, FIFO)
5355

5456
TEST_F(UCSpscRingQueueTest, Full)
5557
{
56-
constexpr size_t N = 16;
57-
UC::SpscRingQueue<size_t, N> queue;
58+
constexpr size_t N = 10;
59+
UC::SpscRingQueue<size_t> queue;
60+
queue.Setup(N);
5861
constexpr size_t nElem = N - 1;
5962
for (size_t i = 0; i < nElem; i++) { ASSERT_TRUE(queue.TryPush(std::move(i))); }
6063
ASSERT_FALSE(queue.TryPush(999));
@@ -75,7 +78,8 @@ TEST_F(UCSpscRingQueueTest, MoveOnly)
7578
MoveOnly(MoveOnly&&) = default;
7679
MoveOnly& operator=(MoveOnly&&) = default;
7780
};
78-
UC::SpscRingQueue<MoveOnly, 16> queue;
81+
UC::SpscRingQueue<MoveOnly> queue;
82+
queue.Setup(9);
7983
EXPECT_TRUE(queue.TryPush(MoveOnly(42)));
8084
MoveOnly obj;
8185
EXPECT_TRUE(queue.TryPop(obj));

ucm/store/cache/cc/cache_store.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ class CacheStoreImpl {
7979
if (config.bufferSize % config.shardSize != 0) {
8080
return Status::InvalidParam("invalid buffer size({})", config.bufferSize);
8181
}
82+
if (config.waitingQueueDepth <= 1 || config.runningQueueDepth <= 1) {
83+
return Status::InvalidParam("invalid queue depth({},{})", config.waitingQueueDepth,
84+
config.runningQueueDepth);
85+
}
8286
return Status::OK();
8387
}
8488
void ShowConfig(const Config& config)
@@ -95,6 +99,8 @@ class CacheStoreImpl {
9599
UC_INFO("Set {}::BlockSize to {}.", ns, config.blockSize);
96100
UC_INFO("Set {}::BufferSize to {}.", ns, config.bufferSize);
97101
UC_INFO("Set {}::ShareBufferEnable to {}.", ns, config.shareBufferEnable);
102+
UC_INFO("Set {}::WaitingQueueDepth to {}.", ns, config.waitingQueueDepth);
103+
UC_INFO("Set {}::RunningQueueDepth to {}.", ns, config.runningQueueDepth);
98104
UC_INFO("Set {}::TransferTimeoutMs to {}.", ns, config.transferTimeoutMs);
99105
}
100106
};

ucm/store/cache/cc/dump_queue.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ Status DumpQueue::Setup(const Config& config, TaskIdSet* failureSet, TransBuffer
3939
failureSet_ = failureSet;
4040
buffer_ = buffer;
4141
backend_ = static_cast<Store*>((void*)config.backend);
42+
waiting_.Setup(config.waitingQueueDepth);
43+
dumping_.Setup(config.runningQueueDepth);
4244
dumper_ = std::thread{&DumpQueue::BackendDumpStage, this};
4345
std::promise<Status> started;
4446
auto fut = started.get_future();

ucm/store/cache/cc/dump_queue.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ class DumpQueue {
5252
TaskIdSet* failureSet_{nullptr};
5353
TransBuffer* buffer_{nullptr};
5454
Store* backend_{nullptr};
55-
SpscRingQueue<TaskPair, 1024> waiting_;
56-
SpscRingQueue<ShardTask, 32768> dumping_;
55+
SpscRingQueue<TaskPair> waiting_;
56+
SpscRingQueue<ShardTask> dumping_;
5757
std::thread dispatcher_;
5858
std::thread dumper_;
5959

ucm/store/cache/cc/global_config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ struct Config {
3939
size_t blockSize{0};
4040
size_t bufferSize{0};
4141
bool shareBufferEnable{false};
42+
size_t waitingQueueDepth{1024};
43+
size_t runningQueueDepth{32768};
4244
size_t transferTimeoutMs{30000};
4345
};
4446

ucm/store/cache/cc/load_queue.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ Status LoadQueue::Setup(const Config& config, TaskIdSet* failureSet, TransBuffer
3939
failureSet_ = failureSet;
4040
buffer_ = buffer;
4141
backend_ = static_cast<Store*>((void*)config.backend);
42+
waiting_.Setup(config.waitingQueueDepth);
43+
running_.Setup(config.runningQueueDepth);
4244
dispatcher_ = std::thread{&LoadQueue::DispatchStage, this};
4345
std::promise<Status> started;
4446
auto fut = started.get_future();

ucm/store/cache/cc/load_queue.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ class LoadQueue {
5555
TaskIdSet* failureSet_{nullptr};
5656
TransBuffer* buffer_{nullptr};
5757
Store* backend_{nullptr};
58-
SpscRingQueue<TaskPair, 1024> waiting_;
59-
SpscRingQueue<ShardTask, 32768> running_;
58+
SpscRingQueue<TaskPair> waiting_;
59+
SpscRingQueue<ShardTask> running_;
6060
std::thread dispatcher_;
6161
std::thread transfer_;
6262

ucm/store/cache/cc/trans_buffer.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ class LocalBufferStrategy : public BufferStrategy {
116116
return Status::OutOfMemory();
117117
}
118118
for (size_t i = 0; i < nHashTableBucket; i++) { header_.buckets[i] = invalidIndex; }
119-
for (size_t i = 0; i < nNode; i++) { meta_.get()[i].Init(); }
119+
for (size_t i = 0; i < nNode; i++) { meta_[i].Init(); }
120120
header_.freeHead = 0;
121121
header_.nodeSize = nodeSize;
122122
header_.nNode = nNode;

ucm/store/cache/connector.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ def __init__(self, config: Dict[str, object]) -> None:
5050
"block_size": "blockSize",
5151
"buffer_size": "bufferSize",
5252
"share_buffer_enable": "shareBufferEnable",
53+
"waiting_queue_depth": "waitingQueueDepth",
54+
"running_queue_depth": "runningQueueDepth",
5355
"transfer_timeout_ms": "transferTimeoutMs",
5456
}
5557
self.store = ucmcachestore.CacheStore()
@@ -148,6 +150,8 @@ def check(self, task: Task) -> bool:
148150
config["block_size"] = 32768 * 4 * 64
149151
config["buffer_size"] = 32768 * 4 * 64 * 2048
150152
config["share_buffer_enable"] = False
153+
config["waiting_queue_depth"] = 128
154+
config["running_queue_depth"] = 1024
151155
config["transfer_timeout_ms"] = 30000
152156
store = UcmCacheStore(config)
153157
block_ids = [secrets.token_bytes(16) for _ in range(1024)]

0 commit comments

Comments
 (0)