Skip to content

Commit 7e0ce98

Browse files
authored
coro::semaphore uses coro::mutex during shutdown (#399)
* coro::semaphore uses coro::mutex during shutdown If there are multiple in-flight tasks acquiring a semaphore while another task is calling shutdown, it was possible for a race to occur between setting the shutdown flag and popping all awaiters to resume them and the in-flight acquire suspending itself and adding itself to the awaiter list. Consequently the in-progress suspending awaiter would get lost and never wake. Since both acquire and release already grab the internal mutex we can solve the problem by doing the same in shutdown such that flipping the shutdown switch and popping the awaiters is atomic. It now cannot be the case that an acquire mid-suspend can add itself to awaiter list after the shutdown flag is flipped. As a consequence, shutdown is now also a coroutine that must be awaited. * No need to grab the lock if we're already shutdown * Fix semaphore example
1 parent 276b19c commit 7e0ce98

File tree

4 files changed

+12
-6
lines changed

4 files changed

+12
-6
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -689,7 +689,7 @@ int main()
689689
if (result == coro::semaphore_acquire_result::acquired)
690690
{
691691
std::cout << task_num << ", ";
692-
semaphore.release();
692+
co_await semaphore.release();
693693
}
694694
else
695695
{

examples/coro_semaphore.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ int main()
1818
if (result == coro::semaphore_acquire_result::acquired)
1919
{
2020
std::cout << task_num << ", ";
21-
semaphore.release();
21+
co_await semaphore.release();
2222
}
2323
else
2424
{

include/coro/semaphore.hpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "coro/expected.hpp"
55
#include "coro/export.hpp"
66
#include "coro/mutex.hpp"
7+
#include "coro/sync_wait.hpp"
78

89
#include <atomic>
910
#include <coroutine>
@@ -88,7 +89,7 @@ class semaphore
8889
: m_counter(starting_value)
8990
{ }
9091

91-
~semaphore() { shutdown(); }
92+
~semaphore() { coro::sync_wait(shutdown()); }
9293

9394
semaphore(const semaphore&) = delete;
9495
semaphore(semaphore&&) = delete;
@@ -167,12 +168,17 @@ class semaphore
167168
* Stops the semaphore and will notify all release/acquire waiters to wake up in a failed state.
168169
* Once this is set it cannot be un-done and all future oprations on the semaphore will fail.
169170
*/
170-
auto shutdown() noexcept -> void
171+
[[nodiscard]] auto shutdown() noexcept -> coro::task<void>
171172
{
173+
if (is_shutdown()) {
174+
co_return;
175+
}
176+
auto lock = co_await m_mutex.scoped_lock();
172177
bool expected{false};
173178
if (m_shutdown.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed))
174179
{
175180
auto* waiter = detail::awaiter_list_pop_all(m_acquire_waiters);
181+
lock.unlock();
176182
while (waiter != nullptr)
177183
{
178184
auto* next = waiter->m_next;

test/test_semaphore.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ TEST_CASE("semaphore produce consume", "[semaphore]")
183183
}
184184

185185
std::cerr << "producer exiting\n";
186-
s.shutdown();
186+
co_await s.shutdown();
187187
co_return;
188188
};
189189

@@ -252,7 +252,7 @@ TEST_CASE("semaphore 1 producers and many consumers", "[semaphore]")
252252
}
253253

254254
std::cerr << "producer " << id << " exiting\n";
255-
s.shutdown();
255+
co_await s.shutdown();
256256
co_return;
257257
};
258258

0 commit comments

Comments
 (0)