Skip to content

Commit ae8f34e

Browse files
authored
Fix deadlock in ring_buffer and queue shutdown_drain (#402)
* Allow timeout version of io_scheduler::schedule with void tasks If the return_type is void, we cannot get the (incomplete) void type out of the result. Instead use constexpr dispatch to default-construct the expected (void) return value in these cases. * IWYU in queue.hpp and ring_buffer.hpp * Fix deadlock in ring_buffer and queue shutdown_drain If a ring_buffer (queue) produces an element such that the buffer is non-empty and then calls shutdown_drain we enter a draining loop where we yield until the ring buffer is empty. If, meanwhile, the consumer side of the buffer decides to forcibly shutdown without consuming the produced value (for example because it has received an exception that should be propagated and therefore wants to wake and shutdown the producer) then we never exit the yield-until-empty loop. To fix this only loop while the buffer is both non-empty and we are still in the draining state. The produced value(s) that are stored in the buffer are now never processed which I think is the right semantics since the consumer has explicitly said "I can't do anything with this". * Use thread_pool rather than io_scheduler to avoid strange link errors - Fixes #401
1 parent 7e0ce98 commit ae8f34e

File tree

5 files changed

+93
-11
lines changed

5 files changed

+93
-11
lines changed

include/coro/io_scheduler.hpp

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include "coro/io_notifier.hpp"
88
#include "coro/poll.hpp"
99
#include "coro/thread_pool.hpp"
10+
#include <type_traits>
1011
#include <unistd.h>
1112

1213
#ifdef LIBCORO_FEATURE_NETWORKING
@@ -17,7 +18,6 @@
1718
#include <functional>
1819
#include <map>
1920
#include <memory>
20-
#include <optional>
2121
#include <stop_token>
2222
#include <thread>
2323
#include <vector>
@@ -225,13 +225,28 @@ class io_scheduler
225225
auto timeout_ms = std::max(std::chrono::duration_cast<std::chrono::milliseconds>(timeout), 0ms);
226226
if (timeout_ms == 0ms)
227227
{
228-
co_return coro::expected<return_type, timeout_status>(co_await schedule(std::move(task)));
228+
if constexpr (std::is_void_v<return_type>)
229+
{
230+
co_await schedule(std::move(task));
231+
co_return coro::expected<return_type, timeout_status>();
232+
}
233+
else
234+
{
235+
co_return coro::expected<return_type, timeout_status>(co_await schedule(std::move(task)));
236+
}
229237
}
230238

231239
auto result = co_await when_any(std::move(task), make_timeout_task(timeout_ms));
232240
if (!std::holds_alternative<timeout_status>(result))
233241
{
234-
co_return coro::expected<return_type, timeout_status>(std::move(std::get<0>(result)));
242+
if constexpr (std::is_void_v<return_type>)
243+
{
244+
co_return coro::expected<return_type, timeout_status>();
245+
}
246+
else
247+
{
248+
co_return coro::expected<return_type, timeout_status>(std::move(std::get<0>(result)));
249+
}
235250
}
236251
else
237252
{
@@ -261,13 +276,27 @@ class io_scheduler
261276
auto timeout_ms = std::max(std::chrono::duration_cast<std::chrono::milliseconds>(timeout), 0ms);
262277
if (timeout_ms == 0ms)
263278
{
264-
co_return coro::expected<return_type, timeout_status>(co_await schedule(std::move(task)));
279+
if constexpr (std::is_void_v<return_type>)
280+
{
281+
co_return coro::expected<return_type, timeout_status>();
282+
}
283+
else
284+
{
285+
co_return coro::expected<return_type, timeout_status>(co_await schedule(std::move(task)));
286+
}
265287
}
266288

267289
auto result = co_await when_any(std::move(stop_source), std::move(task), make_timeout_task(timeout_ms));
268290
if (!std::holds_alternative<timeout_status>(result))
269291
{
270-
co_return coro::expected<return_type, timeout_status>(std::move(std::get<0>(result)));
292+
if constexpr (std::is_void_v<return_type>)
293+
{
294+
co_return coro::expected<return_type, timeout_status>();
295+
}
296+
else
297+
{
298+
co_return coro::expected<return_type, timeout_status>(std::move(std::get<0>(result)));
299+
}
271300
}
272301
else
273302
{
@@ -282,7 +311,7 @@ class io_scheduler
282311
* Given zero or negative amount of time this behaves identical to schedule().
283312
*/
284313
template<class rep_type, class period_type>
285-
[[nodiscard]] auto schedule_after(std::chrono::duration<rep_type, period_type> amount) -> coro::task<void>
314+
[[nodiscard]] auto schedule_after(std::chrono::duration<rep_type, period_type> amount) -> coro::task<void>
286315
{
287316
return yield_for_internal(std::chrono::duration_cast<std::chrono::nanoseconds>(amount));
288317
}

include/coro/queue.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22

33
#include "coro/concepts/executor.hpp"
44
#include "coro/expected.hpp"
5+
#include "coro/mutex.hpp"
56
#include "coro/sync_wait.hpp"
67
#include "coro/task.hpp"
78

9+
#include <atomic>
810
#include <queue>
911

1012
namespace coro
@@ -377,7 +379,7 @@ class queue
377379
}
378380
lk.unlock();
379381

380-
while (!empty())
382+
while (!empty() && m_running_state.load(std::memory_order::acquire) == running_state_t::draining)
381383
{
382384
co_await e->yield();
383385
}

include/coro/ring_buffer.hpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
#pragma once
22

3+
#include "coro/concepts/executor.hpp"
4+
#include "coro/detail/awaiter_list.hpp"
35
#include "coro/expected.hpp"
46
#include "coro/mutex.hpp"
7+
#include "coro/sync_wait.hpp"
58
#include "coro/task.hpp"
69

710
#include <array>
811
#include <atomic>
912
#include <coroutine>
10-
#include <mutex>
13+
#include <memory>
1114
#include <optional>
1215

1316
namespace coro
@@ -301,7 +304,7 @@ class ring_buffer
301304
produce_waiters = next;
302305
}
303306

304-
while (!empty())
307+
while (!empty() && m_running_state.load(std::memory_order::acquire) == running_state_t::draining)
305308
{
306309
co_await e->yield();
307310
}

test/test_queue.cpp

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,30 @@ TEST_CASE("queue.try_pop", "[queue]")
237237
REQUIRE(expected.error() == coro::queue_consume_result::stopped);
238238
}
239239

240+
TEST_CASE("queue shutdown_drain non-empty consumer shutdown", "[queue]")
241+
{
242+
std::cerr << "BEGIN queue issue-401\n";
243+
244+
coro::queue<int> q{};
245+
auto exec = coro::thread_pool::make_unique(coro::thread_pool::options{.thread_count = 1});
246+
247+
const auto producer = [](coro::queue<int>& q, std::unique_ptr<coro::thread_pool>& exec) -> coro::task<void>
248+
{
249+
auto r = co_await q.push(1);
250+
REQUIRE(r == coro::queue_produce_result::produced);
251+
co_await q.shutdown_drain(exec);
252+
REQUIRE(q.is_shutdown());
253+
};
254+
const auto consumer = [](coro::queue<int>& q) -> coro::task<void>
255+
{
256+
co_await q.shutdown();
257+
REQUIRE(q.is_shutdown());
258+
};
259+
260+
std::ignore = coro::sync_wait(coro::when_all(exec->schedule(producer(q, exec)), exec->schedule(consumer(q))));
261+
std::cerr << "END queue issue-401\n";
262+
}
263+
240264
TEST_CASE("~queue", "[queue]")
241265
{
242266
std::cerr << "[~queue]\n\n";

test/test_ring_buffer.cpp

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@
22

33
#include <coro/coro.hpp>
44

5-
#include <chrono>
65
#include <iostream>
7-
#include <thread>
86

97
TEST_CASE("ring_buffer", "[ring_buffer]")
108
{
@@ -413,6 +411,32 @@ TEST_CASE("ring_buffer issue-242 basic type", "[ring_buffer]")
413411
std::cerr << "END ring_buffer issue-242 basic type\n";
414412
}
415413

414+
TEST_CASE("ring_buffer shutdown_drain non-empty consumer shutdown", "[ring_buffer]")
415+
{
416+
std::cerr << "BEGIN ring_buffer issue-401\n";
417+
418+
coro::ring_buffer<int, 1> buffer;
419+
auto exec = coro::thread_pool::make_unique(coro::thread_pool::options{.thread_count = 1});
420+
421+
const auto producer = [](coro::ring_buffer<int, 1>& buffer,
422+
std::unique_ptr<coro::thread_pool>& exec) -> coro::task<void>
423+
{
424+
auto r = co_await buffer.produce(1);
425+
REQUIRE(r == coro::ring_buffer_result::produce::produced);
426+
co_await buffer.shutdown_drain(exec);
427+
REQUIRE(buffer.is_shutdown());
428+
};
429+
const auto consumer = [](coro::ring_buffer<int, 1>& buffer) -> coro::task<void>
430+
{
431+
co_await buffer.shutdown();
432+
REQUIRE(buffer.is_shutdown());
433+
};
434+
435+
std::ignore =
436+
coro::sync_wait(coro::when_all(exec->schedule(producer(buffer, exec)), exec->schedule(consumer(buffer))));
437+
std::cerr << "END ring_buffer issue-401\n";
438+
}
439+
416440
TEST_CASE("~ring_buffer", "[ring_buffer]")
417441
{
418442
std::cerr << "[~ring_buffer]\n\n";

0 commit comments

Comments
 (0)