Skip to content

Commit 63a3d33

Browse files
committed
Adjust io_scheduler to use moodycamel::ConcurrentQueue
Large benchmark gains on io_scheduler tcp/tls.
1 parent a651ef3 commit 63a3d33

File tree

5 files changed

+34
-27
lines changed

5 files changed

+34
-27
lines changed

include/coro/io_scheduler.hpp

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include "coro/detail/vendor/cameron314/concurrentqueue/concurrentqueue.h"
34
#include "coro/detail/poll_info.hpp"
45
#include "coro/detail/timer_handle.hpp"
56
#include "coro/expected.hpp"
@@ -145,10 +146,7 @@ class io_scheduler
145146
if (m_scheduler.m_opts.execution_strategy == execution_strategy_t::process_tasks_inline)
146147
{
147148
m_scheduler.m_size.fetch_add(1, std::memory_order::release);
148-
{
149-
std::scoped_lock lk{m_scheduler.m_scheduled_tasks_mutex};
150-
m_scheduler.m_scheduled_tasks.emplace_back(awaiting_coroutine);
151-
}
149+
m_scheduler.m_scheduled_tasks.enqueue(awaiting_coroutine);
152150

153151
// Trigger the event to wake-up the scheduler if this event isn't currently triggered.
154152
bool expected{false};
@@ -393,10 +391,7 @@ class io_scheduler
393391
if (m_opts.execution_strategy == execution_strategy_t::process_tasks_inline)
394392
{
395393
m_size.fetch_add(1, std::memory_order::release);
396-
{
397-
std::scoped_lock lk{m_scheduled_tasks_mutex};
398-
m_scheduled_tasks.emplace_back(handle);
399-
}
394+
m_scheduled_tasks.enqueue(handle);
400395

401396
bool expected{false};
402397
if (m_schedule_fd_triggered.compare_exchange_strong(
@@ -481,8 +476,7 @@ class io_scheduler
481476
static auto event_to_poll_status(uint32_t events) -> poll_status;
482477

483478
auto process_scheduled_execute_inline() -> void;
484-
std::mutex m_scheduled_tasks_mutex{};
485-
std::vector<std::coroutine_handle<>> m_scheduled_tasks{};
479+
moodycamel::ConcurrentQueue<std::coroutine_handle<>> m_scheduled_tasks;
486480

487481
static constexpr const int m_shutdown_object{0};
488482
static constexpr const void* m_shutdown_ptr = &m_shutdown_object;

include/coro/thread_pool.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,13 +239,13 @@ class thread_pool
239239
/// The background executor threads.
240240
std::vector<std::thread> m_threads;
241241
/// Local executor worker thread queues.
242-
std::vector<moodycamel::BlockingConcurrentQueue<std::coroutine_handle<>>> m_local_queues;
242+
std::vector<moodycamel::ConcurrentQueue<std::coroutine_handle<>>> m_local_queues;
243243
/// Global queue.
244244
moodycamel::BlockingConcurrentQueue<std::coroutine_handle<>> m_global_queue;
245245

246246
std::vector<std::unique_ptr<executor_state>> m_executor_state;
247247

248-
auto try_steal_work(std::size_t my_idx, std::array<std::coroutine_handle<>, 4>& handles) -> bool;
248+
auto try_steal_work(std::size_t my_idx, std::array<std::coroutine_handle<>, 2>& handles) -> bool;
249249

250250
/**
251251
* Each background thread runs from this function.

src/io_scheduler.cpp

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -290,26 +290,40 @@ auto io_scheduler::process_events_execute(std::chrono::milliseconds timeout) ->
290290

291291
auto io_scheduler::process_scheduled_execute_inline() -> void
292292
{
293-
std::vector<std::coroutine_handle<>> tasks{};
293+
// Clear the schedule eventfd if this is a scheduled task.
294+
int control = 0;
295+
::read(m_schedule_fd[1], reinterpret_cast<void*>(&control), sizeof(control));
296+
297+
// Clear the in memory flag to reduce eventfd_* calls on scheduling.
298+
m_schedule_fd_triggered.exchange(false, std::memory_order::release);
299+
300+
constexpr std::size_t MAX{32};
301+
std::size_t processed{0};
302+
while (true)
294303
{
295-
// Acquire the entire list, and then reset it.
296-
std::scoped_lock lk{m_scheduled_tasks_mutex};
297-
tasks.swap(m_scheduled_tasks);
304+
std::array<std::coroutine_handle<>, MAX> tasks{nullptr};
305+
if (!m_scheduled_tasks.try_dequeue_bulk(tasks.data(), MAX))
306+
{
307+
break;;
308+
}
298309

299-
// Clear the schedule eventfd if this is a scheduled task.
300-
int control = 0;
301-
::read(m_schedule_fd[1], reinterpret_cast<void*>(&control), sizeof(control));
310+
for (std::size_t i = 0; i < MAX; ++i)
311+
{
312+
auto& handle = tasks[i];
313+
if (handle == nullptr)
314+
{
315+
break;
316+
}
302317

303-
// Clear the in memory flag to reduce eventfd_* calls on scheduling.
304-
m_schedule_fd_triggered.exchange(false, std::memory_order::release);
318+
handle.resume();
319+
++processed;
320+
}
305321
}
306322

307-
// This set of handles can be safely resumed now since they do not have a corresponding timeout event.
308-
for (auto& task : tasks)
323+
if (processed > 0)
309324
{
310-
task.resume();
325+
m_size.fetch_sub(processed, std::memory_order::release);
311326
}
312-
m_size.fetch_sub(tasks.size(), std::memory_order::release);
313327
}
314328

315329
auto io_scheduler::process_event_execute(detail::poll_info* pi, poll_status status) -> void

src/thread_pool.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
namespace coro
77
{
8-
static constexpr std::size_t MAX_HANDLES{4};
8+
static constexpr std::size_t MAX_HANDLES{2};
99

1010
thread_pool::schedule_operation::schedule_operation(thread_pool& tp) noexcept : m_thread_pool(tp)
1111
{

test/bench.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,6 @@ TEST_CASE("benchmark thread_pool{1} counter task", "[benchmark]")
141141

142142
auto make_task = [](std::unique_ptr<coro::thread_pool>& tp, std::atomic<uint64_t>& c) -> coro::task<void>
143143
{
144-
co_await tp->schedule();
145144
c.fetch_add(1, std::memory_order::relaxed);
146145
co_return;
147146
};

0 commit comments

Comments (0)