Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 80 additions & 28 deletions include/coro/condition_variable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include "coro/task.hpp"
#include "coro/event.hpp"
#include "coro/mutex.hpp"
#include "coro/when_any.hpp"
#include "coro/when_all.hpp"

#include <atomic>
#include <chrono>
Expand Down Expand Up @@ -39,7 +39,7 @@ class condition_variable

struct awaiter_base
{
awaiter_base(coro::condition_variable& cv, coro::scoped_lock& l);
awaiter_base(coro::condition_variable& cv, coro::scoped_lock& l, coro::scoped_lock notify_lock);
virtual ~awaiter_base() = default;

awaiter_base(const awaiter_base&) = delete;
Expand All @@ -55,6 +55,8 @@ class condition_variable
coro::condition_variable& m_condition_variable;
/// @brief The lock that the wait() was called with.
coro::scoped_lock& m_lock;
/// @brief Lock used for mutual exclusion between notifiers and waiters
coro::scoped_lock m_notify_lock;

/// @brief Each awaiter type defines its own notify behavior.
/// @return The status of if the waiter's notify result.
Expand All @@ -63,7 +65,7 @@ class condition_variable

struct awaiter : public awaiter_base
{
awaiter(coro::condition_variable& cv, coro::scoped_lock& l) noexcept;
awaiter(coro::condition_variable& cv, coro::scoped_lock& l, coro::scoped_lock notify_lock) noexcept;
~awaiter() override = default;

awaiter(const awaiter&) = delete;
Expand All @@ -83,6 +85,7 @@ class condition_variable
awaiter_with_predicate(
coro::condition_variable& cv,
coro::scoped_lock& l,
coro::scoped_lock notify_lock,
predicate_type p
) noexcept;
~awaiter_with_predicate() override = default;
Expand All @@ -109,6 +112,7 @@ class condition_variable
awaiter_with_predicate_stop_token(
coro::condition_variable& cv,
coro::scoped_lock& l,
coro::scoped_lock notify_lock,
predicate_type p,
std::stop_token stop_token
) noexcept;
Expand Down Expand Up @@ -183,6 +187,7 @@ class condition_variable
awaiter_with_wait_hook(
coro::condition_variable& cv,
coro::scoped_lock& l,
coro::scoped_lock notify_lock,
controller_data& data
) noexcept;
~awaiter_with_wait_hook() override = default;
Expand All @@ -199,11 +204,12 @@ class condition_variable
std::unique_ptr<io_executor_type>& executor,
coro::condition_variable& cv,
coro::scoped_lock& l,
coro::scoped_lock notify_lock,
const std::chrono::nanoseconds wait_for,
std::optional<predicate_type> predicate = std::nullopt,
std::optional<std::stop_token> stop_token = std::nullopt
) noexcept
: awaiter_base(cv, l),
: awaiter_base(cv, l, std::move(notify_lock)),
m_executor(executor),
m_wait_for(wait_for),
m_predicate(std::move(predicate)),
Expand Down Expand Up @@ -277,8 +283,9 @@ class condition_variable
{
controller_data data{m_status, m_predicate_result, std::move(m_predicate), std::move(m_stop_token)};
// We enqueue the hook_task since we can make it live until the notify occurs and will properly resume the actual coroutine only once.
awaiter_with_wait_hook hook_task{m_condition_variable, m_lock, data};
awaiter_with_wait_hook hook_task{m_condition_variable, m_lock, std::move(m_notify_lock), data};
detail::awaiter_list_push(m_condition_variable.m_awaiters, static_cast<awaiter_base*>(&hook_task));
hook_task.m_notify_lock.unlock();
m_lock.m_mutex->unlock(); // Unlock the actual lock now that we are setup, not the fake hook task.

co_await coro::when_all(make_on_notify_callback_task(data), make_timeout_task(data));
Expand Down Expand Up @@ -378,16 +385,16 @@ class condition_variable

/**
* @brief Notifies all waiters and resumes them on the given executor. Note that each waiter must be notified synchronously so
* this is useful if the task is long lived and can be immediately parallelized after the condition is ready. This does not
* need to be co_await'ed like `notify_all()` since this will execute the notify on the given executor.
* this is useful if the task is long lived and can be immediately parallelized after the condition is ready.
*
* @tparam executor_type The type of executor that the waiters will be resumed on.
* @param executor The executor that each waiter will be resumed on.
* @return void
* @return void coroutine to be awaited.
*/
template<coro::concepts::executor executor_type>
auto notify_all(std::unique_ptr<executor_type>& executor) -> void
auto notify_all(std::unique_ptr<executor_type>& executor) -> coro::task<void>
{
co_await m_notify_mutex.scoped_lock();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will spawn all the tasks but it doesn't guarantee that the notify tasks have completed and put non-ready tasks back onto the list. I think this is still race conditiony.

Perhaps we need to run the notify is ready / predicate synchronously here and then spawn the user task IIF the predicate is ready?

This is psuedo code and might have some restrictions I haven't thought of but I think splitting the predicate/notify and the user resume into a spawn would allow for the new lock to be held for the correct amount of time to guarantee any waiters that are not ready on the notify_all() will be properly placed back on the waiters list.

co_await m_notify_mutex.scoped_lock();
auto* waiter = detail::awaiter_list_pop_all(m_awaiters);

while (waiter != nullptr)
{
    auto* next = waiter->m_next;
    // the notify calls need to be inline with the new lock to guarantee non-ready waiters are placed
    // back onto the waiters list to not miss notifications
    await make_notify_all_executor_individual_task(waiter);
    waiter = next;
}

co_return;

.... within make_notify_all_executor_individual_task....

auto make_notify_all_executor_individual_task(awaiter_base* waiter) -> coro::task<void>
{
   // this on_notify() is a problem since it resumes internally and we cannot spawn.. so we might
   // need to rethink how this works, perhaps split it as on_notify() -> resume() where this switch
   // statement can call executor->spawn(waiter->resume()) ?

   // or we pass in the executor to on_notify() by pointer and if it isn't nullptr then it can spawn
   // otherwise it resumes inline -- that will hold the internal lock for the duration of the condvar lock
   // as well, which is maybe ok?
    switch (co_await waiter->on_notify())
    {
        case notify_status_t::not_ready:
            // Re-enqueue since the predicate isn't ready and return since the notify has been satisfied.
            detail::awaiter_list_push(m_awaiters, waiter);
            break;
        case notify_status_t::ready:
        case notify_status_t::awaiter_dead:
            // Don't re-enqueue any awaiters that are ready or dead.
            break;
    }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, good point. I suppose another would be to use a task_container and then yield until the tasks have all completed in this function. Then the lock is held for the notification and wakeups

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I think that would work.

I also think task_container needs a latch style wait though, it spins currently which is really inefficient. coro::when_all is a little heavier on managing the tasks but it doesn't spin wait.

auto* waiter = detail::awaiter_list_pop_all(m_awaiters);

while (waiter != nullptr)
Expand All @@ -400,31 +407,31 @@ class condition_variable
waiter = next;
}

return;
co_return;
}


/**
* @brief Waits until notified.
*
* @param lock A lock that must be locked by the caller.
* @return awaiter
* @return void
*/
[[nodiscard]] auto wait(
coro::scoped_lock& lock
) -> awaiter;
) -> coro::task<void>;

/**
* @brief Waits until notified but only wakes up if the predicate passes.
*
* @param lock A lock that must be locked by the caller.
* @param predicate The predicate to check whether the waiting can be completed.
* @return awaiter_with_predicate
* @return void
*/
[[nodiscard]] auto wait(
coro::scoped_lock& lock,
predicate_type predicate
) -> awaiter_with_predicate;
) -> coro::task<void>;

#ifndef EMSCRIPTEN
/**
Expand All @@ -433,13 +440,13 @@ class condition_variable
* @param lock A lock which must be locked by the caller.
* @param stop_token A stop token to register interruption for.
* @param predicate The predicate to check whether the waiting can be completed.
* @return awaiter_with_predicate_stop_token The final predicate call result.
* @return bool The final predicate call result.
*/
[[nodiscard]] auto wait(
coro::scoped_lock& lock,
std::stop_token stop_token,
predicate_type predicate
) -> awaiter_with_predicate_stop_token;
) -> coro::task<bool>;
#endif

#ifdef LIBCORO_FEATURE_NETWORKING
Expand All @@ -449,9 +456,15 @@ class condition_variable
std::unique_ptr<io_executor_type>& executor,
coro::scoped_lock& lock,
const std::chrono::duration<rep_type, period_type> wait_for
) -> awaiter_with_wait<io_executor_type, std::cv_status>
) -> coro::task<std::cv_status>
{
return awaiter_with_wait<io_executor_type, std::cv_status>{executor, *this, lock, std::chrono::duration_cast<std::chrono::nanoseconds>(wait_for)};
auto notify_lock = co_await m_notify_mutex.scoped_lock();
co_return co_await awaiter_with_wait<io_executor_type, std::cv_status>{
executor,
*this,
lock,
std::move(notify_lock),
std::chrono::duration_cast<std::chrono::nanoseconds>(wait_for)};
}

template<concepts::io_executor io_executor_type, class rep_type, class period_type>
Expand All @@ -460,9 +473,16 @@ class condition_variable
coro::scoped_lock& lock,
const std::chrono::duration<rep_type, period_type> wait_for,
predicate_type predicate
) -> awaiter_with_wait<io_executor_type, bool>
) -> coro::task<bool>
{
return awaiter_with_wait<io_executor_type, bool>{executor, *this, lock, std::chrono::duration_cast<std::chrono::nanoseconds>(wait_for), std::move(predicate)};
auto notify_lock = co_await m_notify_mutex.scoped_lock();
co_return co_await awaiter_with_wait<io_executor_type, bool>{
executor,
*this,
lock,
std::move(notify_lock),
std::chrono::duration_cast<std::chrono::nanoseconds>(wait_for),
std::move(predicate)};
}

template<concepts::io_executor io_executor_type, class rep_type, class period_type>
Expand All @@ -472,21 +492,35 @@ class condition_variable
std::stop_token stop_token,
const std::chrono::duration<rep_type, period_type> wait_for,
predicate_type predicate
) -> awaiter_with_wait<io_executor_type, bool>
) -> coro::task<bool>
{
return awaiter_with_wait<io_executor_type, bool>{executor, *this, lock, std::chrono::duration_cast<std::chrono::nanoseconds>(wait_for), std::move(predicate), std::move(stop_token)};
auto notify_lock = co_await m_notify_mutex.scoped_lock();
co_return co_await awaiter_with_wait<io_executor_type, bool>{
executor,
*this,
lock,
std::move(notify_lock),
std::chrono::duration_cast<std::chrono::nanoseconds>(wait_for),
std::move(predicate),
std::move(stop_token)};
}

template<concepts::io_executor io_executor_type, class clock_type, class duration_type>
auto wait_until(
std::unique_ptr<io_executor_type>& executor,
coro::scoped_lock& lock,
const std::chrono::time_point<clock_type, duration_type> wait_until_time
) -> awaiter_with_wait<io_executor_type, std::cv_status>
) -> coro::task<std::cv_status>
{
auto now = std::chrono::time_point<clock_type, duration_type>::clock::now();
auto wait_for = (now < wait_until_time) ? (wait_until_time - now) : std::chrono::nanoseconds{1};
return awaiter_with_wait<io_executor_type, std::cv_status>{executor, *this, lock, std::chrono::duration_cast<std::chrono::nanoseconds>(wait_for)};
auto notify_lock = co_await m_notify_mutex.scoped_lock();
co_return co_await awaiter_with_wait<io_executor_type, std::cv_status>{
executor,
*this,
lock,
std::move(notify_lock),
std::chrono::duration_cast<std::chrono::nanoseconds>(wait_for)};
}

template<concepts::io_executor io_executor_type, class clock_type, class duration_type>
Expand All @@ -495,11 +529,18 @@ class condition_variable
coro::scoped_lock& lock,
const std::chrono::time_point<clock_type, duration_type> wait_until_time,
predicate_type predicate
) -> awaiter_with_wait<io_executor_type, bool>
) -> coro::task<bool>
{
auto now = std::chrono::time_point<clock_type, duration_type>::clock::now();
auto wait_for = (now < wait_until_time) ? (wait_until_time - now) : std::chrono::nanoseconds{1};
return awaiter_with_wait<io_executor_type, bool>{executor, *this, lock, std::chrono::duration_cast<std::chrono::nanoseconds>(wait_for), std::move(predicate)};
auto notify_lock = co_await m_notify_mutex.scoped_lock();
co_return co_await awaiter_with_wait<io_executor_type, bool>{
executor,
*this,
lock,
std::move(notify_lock),
std::chrono::duration_cast<std::chrono::nanoseconds>(wait_for),
std::move(predicate)};
}

template<concepts::io_executor io_executor_type, class clock_type, class duration_type>
Expand All @@ -509,20 +550,31 @@ class condition_variable
std::stop_token stop_token,
const std::chrono::time_point<clock_type, duration_type> wait_until_time,
predicate_type predicate
) -> awaiter_with_wait<io_executor_type, bool>
) -> coro::task<bool>
{
auto now = std::chrono::time_point<clock_type, duration_type>::clock::now();
auto wait_for = (now < wait_until_time) ? (wait_until_time - now) : std::chrono::nanoseconds{1};
return awaiter_with_wait<io_executor_type, bool>{executor, *this, lock, std::chrono::duration_cast<std::chrono::nanoseconds>(wait_for), std::move(predicate), std::move(stop_token)};
auto notify_lock = co_await m_notify_mutex.scoped_lock();
co_return co_await awaiter_with_wait<io_executor_type, bool>{
executor,
*this,
lock,
std::move(notify_lock),
std::chrono::duration_cast<std::chrono::nanoseconds>(wait_for),
std::move(predicate),
std::move(stop_token)};
}
#endif

private:
/// @brief The list of waiters.
std::atomic<awaiter_base*> m_awaiters{nullptr};
/// @brief mutual exclusion for notification/arrival
coro::mutex m_notify_mutex;

auto make_notify_all_executor_individual_task(awaiter_base* waiter) -> coro::task<void>
{
// Precondition: m_notify_mutex is held.
switch (co_await waiter->on_notify())
{
case notify_status_t::not_ready:
Expand Down
Loading
Loading