Skip to content

Commit 09769f9

Browse files
committed
[yugabyte#26818] DocDB: Pull RWQueue from libcds and use it in rpc::ThreadPool
Summary: **libcds** implements lock-free queues in C++ (which lacks garbage collection) using hazard pointers. However, using its lock-free containers requires attaching each thread to the libcds manager, adding unnecessary friction—especially when working with `std::async`. **libcds** also provides **RWQueue**, a non-lock-free but highly optimized concurrent queue. In benchmarks like `LockfreeTest.QueuePerformance`, it often outperforms lock-free alternatives. Pulled RWQueue's implementation into our codebase. Also introduced `TrivialSpinlock`, which outperforms `simple_spinlock` when used with RWQueue. The results for RpcStubTest.TestRpcPerformance with this diff on n2-standard-64: ``` Total: 258.347ms, calls per second: 193538 (5.000us per call, NOT latency), slow calls: 0% Total: 277.650ms, calls per second: 180082 (5.000us per call, NOT latency), slow calls: 0% Total: 344.553ms, calls per second: 145115 (6.000us per call, NOT latency), slow calls: 0% Total: 289.964ms, calls per second: 172435 (5.000us per call, NOT latency), slow calls: 0% ``` master: ``` Total: 273.951ms, calls per second: 182514 (5.000us per call, NOT latency), slow calls: 0% Total: 341.373ms, calls per second: 146467 (6.000us per call, NOT latency), slow calls: 0% Total: 247.243ms, calls per second: 202230 (4.000us per call, NOT latency), slow calls: 0% Total: 344.846ms, calls per second: 144992 (6.000us per call, NOT latency), slow calls: 0% ``` Jira: DB-16208 Test Plan: Jenkins Reviewers: hsunder Reviewed By: hsunder Subscribers: esheng, ybase Tags: #jenkins-ready Differential Revision: https://phorge.dev.yugabyte.com/D43194
1 parent 9e2a92c commit 09769f9

File tree

5 files changed

+223
-15
lines changed

5 files changed

+223
-15
lines changed

src/yb/rpc/thread_pool.cc

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@
2121

2222
#include <boost/intrusive/list.hpp>
2323

24-
#include <cds/container/basket_queue.h>
25-
#include <cds/gc/dhp.h>
26-
24+
#include "yb/util/concurrent_queue.h"
2725
#include "yb/util/flags.h"
2826
#include "yb/util/lockfree.h"
2927
#include "yb/util/scope_exit.h"
@@ -43,7 +41,7 @@ namespace {
4341

4442
class Worker;
4543

46-
using TaskQueue = cds::container::BasketQueue<cds::gc::DHP, ThreadPoolTask*>;
44+
using TaskQueue = RWQueue<ThreadPoolTask*>;
4745
using WaitingWorkers = LockFreeStack<Worker>;
4846

4947
struct ThreadPoolShare {

src/yb/util/concurrent_queue.h

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
// Copyright (c) YugabyteDB, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4+
// in compliance with the License. You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software distributed under the License
9+
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10+
// or implied. See the License for the specific language governing permissions and limitations
11+
// under the License.
12+
//
13+
14+
#pragma once
15+
16+
#include <atomic>
#include <mutex>
#include <optional>
#include <utility>

#include "yb/util/locks.h"
17+
18+
namespace yb {
19+
20+
// RWQueue implementation from [1998] Maged Michael, Michael Scott
21+
// "Simple, fast, and practical non-blocking and blocking concurrent queue algorithms"
22+
template <typename T>
23+
class RWQueue {
24+
public:
25+
using value_type = T;
26+
27+
RWQueue() {
28+
head_.node = tail_.node = new Node;
29+
}
30+
31+
~RWQueue() {
32+
clear();
33+
DCHECK_EQ(head_.node, tail_.node);
34+
delete head_.node;
35+
}
36+
37+
template <class... Args>
38+
void Push(Args&&... value) {
39+
auto node = new Node(std::forward<Args>(value)...);
40+
std::lock_guard lock(tail_.mutex);
41+
tail_.node->next.store(node, std::memory_order_release);
42+
tail_.node = node;
43+
}
44+
45+
template <class... Args>
46+
bool push(Args&&... value) {
47+
Push(std::forward<Args>(value)...);
48+
return true;
49+
}
50+
51+
bool Pop(value_type& value) {
52+
return DoPop(value);
53+
}
54+
55+
bool pop(value_type& value) {
56+
return Pop(value);
57+
}
58+
59+
std::optional<value_type> Pop() {
60+
std::optional<value_type> result;
61+
DoPop(result);
62+
return result;
63+
}
64+
65+
std::optional<value_type> pop() {
66+
return Pop();
67+
}
68+
69+
void Clear() {
70+
Node* head;
71+
Node* tail;
72+
{
73+
std::lock_guard lock_head(head_.mutex);
74+
std::lock_guard lock_tail(tail_.mutex);
75+
head = head_.node;
76+
tail = tail_.node;
77+
head_.node = tail;
78+
}
79+
while (head != tail) {
80+
auto* next = head->next.load(std::memory_order_relaxed);
81+
if (!next) {
82+
break;
83+
}
84+
delete head;
85+
head = next;
86+
}
87+
}
88+
89+
void clear() {
90+
Clear();
91+
}
92+
93+
bool empty() const {
94+
std::lock_guard lock(head_.mutex);
95+
return head_.node->next.load(std::memory_order_relaxed) == nullptr;
96+
}
97+
98+
private:
99+
template <class Out>
100+
bool DoPop(Out& value) {
101+
Node* node;
102+
{
103+
std::lock_guard lock(head_.mutex);
104+
node = head_.node;
105+
auto new_head = node->next.load(std::memory_order_acquire);
106+
if (!new_head) {
107+
return false;
108+
}
109+
value = std::move(new_head->value);
110+
head_.node = new_head;
111+
}
112+
delete node;
113+
return true;
114+
}
115+
116+
struct Node {
117+
std::atomic<Node*> next{nullptr};
118+
value_type value;
119+
120+
template <typename... Args>
121+
explicit Node(Args&&... args)
122+
: value(std::forward<Args>(args)...) {}
123+
};
124+
125+
struct EndType {
126+
mutable TrivialSpinlock mutex;
127+
Node* node;
128+
};
129+
130+
alignas(CACHELINE_SIZE) EndType head_;
131+
alignas(CACHELINE_SIZE) EndType tail_;
132+
};
133+
134+
} // namespace yb

src/yb/util/lockfree-test.cc

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
//
1515

1616
#include <atomic>
17+
#include <regex>
1718
#include <string>
1819
#include <thread>
1920

@@ -27,6 +28,8 @@
2728
#include <cds/gc/dhp.h>
2829
#include <gtest/gtest.h>
2930

31+
#include "yb/util/concurrent_queue.h"
32+
#include "yb/util/flags.h"
3033
#include "yb/util/lockfree.h"
3134
#include "yb/util/logging.h"
3235
#include "yb/util/monotime.h"
@@ -35,6 +38,9 @@
3538
#include "yb/util/thread.h"
3639
#include "yb/util/tsan_util.h"
3740

41+
DEFINE_test_flag(string, queue_name_regex, "",
42+
"Regex to filter queue by name in LockfreeTest.QueuePerformance test");
43+
3844
using namespace std::literals;
3945

4046
namespace yb {
@@ -44,13 +50,14 @@ struct TestEntry : public MPSCQueueEntry<TestEntry> {
4450
size_t index;
4551
};
4652

47-
TEST(LockfreeTest, MPSCQueueSimple) {
53+
template<class Queue, class NoneValue>
54+
void TestQueueSimple(const NoneValue& none_value) {
4855
const size_t kTotalEntries = 10;
4956
std::vector<TestEntry> entries(kTotalEntries);
5057
for (size_t i = 0; i != entries.size(); ++i) {
5158
entries[i].index = i;
5259
}
53-
MPSCQueue<TestEntry> queue;
60+
Queue queue;
5461

5562
// Push pop 1 entry
5663
queue.Push(&entries[0]);
@@ -65,6 +72,13 @@ TEST(LockfreeTest, MPSCQueueSimple) {
6572
ASSERT_EQ(&entry, queue.Pop());
6673
}
6774

75+
for (auto& entry : entries) {
76+
queue.Push(&entry);
77+
}
78+
79+
queue.Clear();
80+
ASSERT_EQ(none_value, queue.Pop());
81+
6882
// Mixed push and pop
6983
queue.Push(&entries[0]);
7084
queue.Push(&entries[1]);
@@ -82,12 +96,20 @@ TEST(LockfreeTest, MPSCQueueSimple) {
8296
ASSERT_EQ(&entries[5], queue.Pop());
8397
ASSERT_EQ(&entries[6], queue.Pop());
8498
ASSERT_EQ(&entries[7], queue.Pop());
85-
ASSERT_EQ(nullptr, queue.Pop());
99+
ASSERT_EQ(none_value, queue.Pop());
86100
queue.Push(&entries[8]);
87101
queue.Push(&entries[9]);
88102
ASSERT_EQ(&entries[8], queue.Pop());
89103
ASSERT_EQ(&entries[9], queue.Pop());
90-
ASSERT_EQ(nullptr, queue.Pop());
104+
ASSERT_EQ(none_value, queue.Pop());
105+
}
106+
107+
TEST(LockfreeTest, MPSCQueueSimple) {
108+
TestQueueSimple<MPSCQueue<TestEntry>>(nullptr);
109+
}
110+
111+
TEST(LockfreeTest, RWQueueSimple) {
112+
TestQueueSimple<RWQueue<TestEntry*>>(std::nullopt);
91113
}
92114

93115
TEST(LockfreeTest, MPSCQueueConcurrent) {
@@ -281,6 +303,7 @@ class QueuePerformanceHelper {
281303
cds::container::optimistic_queue::make_traits<OptAllocator>::type>>(
282304
"OptimisticQueue/BlockAllocator/DHP");
283305
TestQueue<cds::container::RWQueue<ptrdiff_t>>("RWQueue");
306+
TestQueue<RWQueue<ptrdiff_t>>("YBRWQueue");
284307
// On GCC11, segmented queue seems to call sized delete with a different size than it allocates
285308
// with, which causes a segfault in tcmalloc.
286309
// See issue https://github.com/khizmax/libcds/issues/181.
@@ -368,7 +391,7 @@ class QueuePerformanceHelper {
368391
start_latch.Wait();
369392
auto start = MonoTime::Now();
370393

371-
bool wait_result = finish_latch.WaitUntil(start + 10s);
394+
bool wait_result = finish_latch.WaitUntil(start + 30s);
372395
auto stop = MonoTime::Now();
373396
auto passed = stop - start;
374397

@@ -395,14 +418,14 @@ class QueuePerformanceHelper {
395418
}
396419
}
397420

398-
template <class T>
399-
void TestQueue(const std::string& name) {
400-
T queue;
401-
DoTestQueue(name, &queue);
402-
}
403-
404421
template <class T, class... Args>
405422
void TestQueue(const std::string& name, Args&&... args) {
423+
if (!name.empty() && !FLAGS_TEST_queue_name_regex.empty()) {
424+
std::regex regex(FLAGS_TEST_queue_name_regex, std::regex::egrep);
425+
if (!regex_match(name, regex)) {
426+
return;
427+
}
428+
}
406429
T queue(std::forward<Args>(args)...);
407430
DoTestQueue(name, &queue);
408431
}

src/yb/util/lockfree.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include "yb/gutil/dynamic_annotations.h"
2323
#include "yb/gutil/macros.h"
24+
2425
#include "yb/util/atomic.h" // For IsAcceptableAtomicImpl
2526

2627
namespace yb {
@@ -112,6 +113,11 @@ class MPSCQueue {
112113
return push_head_.load(std::memory_order_acquire) == nullptr;
113114
}
114115

116+
// Resets both pop_head_ and push_head_ to nullptr.
// Nothing is deleted here: entries that were still linked are simply dropped from the queue
// (compare Drain(), which pops and deletes every entry).
// NOTE(review): pop_head_ is written with a plain assignment, so presumably Clear() may only be
// called from the consumer side - confirm against the rest of MPSCQueue.
void Clear() {
117+
pop_head_ = nullptr;
118+
push_head_.store(nullptr, std::memory_order_release);
119+
}
120+
115121
void Drain() {
116122
while (auto* entry = Pop()) {
117123
delete entry;

src/yb/util/locks.h

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,4 +329,51 @@ class SemaphoreLock {
329329
Semaphore& semaphore_;
330330
};
331331

332+
// simple_spinlock is something intermediate between spinlock and mutex, because it could fallback
333+
// waiting on futex. Also, it collects stats, etc.
334+
// TrivialSpinlock does not have such overhead and is more performant. For instance while testing
335+
// RWQueue in conjunction with it, the execution time was 1.5 times lower.
336+
class CAPABILITY("mutex") TrivialSpinlock {
337+
public:
338+
TrivialSpinlock() = default;
339+
340+
~TrivialSpinlock() {
341+
DCHECK(!is_locked());
342+
}
343+
344+
void lock() ACQUIRE() {
345+
size_t lock_counter = 16;
346+
while (!try_lock()) {
347+
while (is_locked()) {
348+
// Max of 32752 pauses before we fallback to yield.
349+
if (lock_counter <= 16 * 1024) {
350+
for (size_t n = 0; n < lock_counter; ++n) {
351+
base::subtle::PauseCPU();
352+
}
353+
lock_counter *= 2;
354+
} else {
355+
std::this_thread::yield();
356+
}
357+
}
358+
}
359+
}
360+
361+
void unlock() RELEASE() {
362+
lockword_.store(false, std::memory_order_release);
363+
}
364+
365+
bool try_lock() TRY_ACQUIRE(true) {
366+
return !lockword_.exchange(true, std::memory_order_acquire);
367+
}
368+
369+
bool is_locked() {
370+
return lockword_.load(std::memory_order_relaxed);
371+
}
372+
373+
private:
374+
std::atomic<bool> lockword_{false};
375+
376+
DISALLOW_COPY_AND_ASSIGN(TrivialSpinlock);
377+
};
378+
332379
} // namespace yb

0 commit comments

Comments
 (0)