
Conversation

@wence- (Contributor) commented Nov 24, 2025

To avoid lost wakeups, use an internal mutex to protect modification of the awaiter list.

Now, while we are notifying, if another notification task arrives it must wait for our modification of the awaiter list to complete. Hence waiters that were not ready, and were pushed back onto the list, have a chance to be woken up by the next notification.

I came back to the issue raised in #398 because we have a use case now where condvars make the most sense and we legitimately might have multiple simultaneous notification calls.

Since modification of the awaiter list (which holds the suspended awaiters waiting for a notification) crosses coroutine boundaries, I don't think we can protect it with a std::mutex. Hence I use a coro::mutex (internally managed by the condvar).
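One consequence of using a coro::mutex internally is that notify_all() itself becomes awaitable (see the signature change in the diff hunk below). A minimal caller-side sketch, assuming the executor-taking overload shown in this PR's diff and using hypothetical names throughout:

// Hypothetical caller only: `producer`, `cv`, and `tp` are made-up names; the
// point is that notify_all() now returns coro::task<void> and must be awaited,
// because it first acquires the condvar's internal coro::mutex.
auto producer(coro::condition_variable& cv,
              std::unique_ptr<coro::thread_pool>& tp) -> coro::task<void>
{
    // ... make the guarded condition true ...
    co_await cv.notify_all(tp);
    co_return;
}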

- Closes jbaldwin#398
The review comments below refer to this hunk of the PR diff:

-auto notify_all(std::unique_ptr<executor_type>& executor) -> void
+auto notify_all(std::unique_ptr<executor_type>& executor) -> coro::task<void>
 {
     co_await m_notify_mutex.scoped_lock();
@jbaldwin (Owner):

This will spawn all the tasks, but it doesn't guarantee that the notify tasks have completed and put non-ready waiters back onto the list. I think this is still racy.

Perhaps we need to run the notify / predicate check synchronously here and then spawn the user task only if the predicate is ready?

This is pseudocode and might have some restrictions I haven't thought of, but I think splitting the predicate/notify check from the user resume (which gets spawned) would allow the new lock to be held for the right amount of time, guaranteeing that any waiters that are not ready during notify_all() are properly placed back on the waiters list.

auto lock = co_await m_notify_mutex.scoped_lock();
auto* waiter = detail::awaiter_list_pop_all(m_awaiters);

while (waiter != nullptr)
{
    auto* next = waiter->m_next;
    // the notify calls need to be inline with the new lock to guarantee non-ready waiters are placed
    // back onto the waiters list to not miss notifications
    co_await make_notify_all_executor_individual_task(waiter);
    waiter = next;
}

co_return;

.... within make_notify_all_executor_individual_task....

auto make_notify_all_executor_individual_task(awaiter_base* waiter) -> coro::task<void>
{
   // this on_notify() is a problem since it resumes internally and we cannot spawn, so we might
   // need to rethink how this works; perhaps split it as on_notify() -> resume(), where this switch
   // statement can call executor->spawn(waiter->resume())?

   // or we pass the executor to on_notify() by pointer: if it isn't nullptr it can spawn,
   // otherwise it resumes inline -- that will hold the internal lock for the duration of the condvar lock
   // as well, which is maybe ok?
    switch (co_await waiter->on_notify())
    {
        case notify_status_t::not_ready:
            // Re-enqueue since the predicate isn't ready and return since the notify has been satisfied.
            detail::awaiter_list_push(m_awaiters, waiter);
            break;
        case notify_status_t::ready:
        case notify_status_t::awaiter_dead:
            // Don't re-enqueue any awaiters that are ready or dead.
            break;
    }
}

@wence- (Contributor, Author):

Ah, good point. I suppose another option would be to use a task_container and then yield until the tasks have all completed in this function. Then the lock is held for the notification and the wakeups.
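Roughly (a pseudocode-level sketch of that idea; m_notify_tasks is a hypothetical coro::task_container member, and the other names are taken from the snippets in this thread):

auto notify_all(std::unique_ptr<executor_type>& executor) -> coro::task<void>
{
    // Serialize notifiers and keep the lock until every notify task has run.
    auto lock = co_await m_notify_mutex.scoped_lock();

    auto* waiter = detail::awaiter_list_pop_all(m_awaiters);
    while (waiter != nullptr)
    {
        auto* next = waiter->m_next;
        // Each task checks the waiter's predicate and either resumes it or
        // pushes it back onto m_awaiters.
        m_notify_tasks.start(make_notify_all_executor_individual_task(waiter));
        waiter = next;
    }

    // Only release m_notify_mutex once all notify tasks have completed, so
    // any re-enqueued waiters are visible to the next notifier.
    co_await m_notify_tasks.yield_until_empty();
    co_return;
}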

@jbaldwin (Owner):

👍 I think that would work.

I also think task_container needs a latch-style wait though; it currently spins, which is really inefficient. coro::when_all is a little heavier on managing the tasks, but it doesn't spin-wait.

@jbaldwin (Owner):

@wence- I've played around with renaming to task_group and making it more ergonomic, in that you have to give it the full set of tasks upfront. There were definitely some race conditions in yield_until_empty() if you keep adding tasks into it. I think giving it the full set of tasks upfront makes sense? This way it's basically a dynamic-lifetime task tracker backed by a coro::latch, so the co_await task_group.wait() method is super efficient now.
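For illustration only (this is not the #418 code, just a minimal sketch of the latch-backed idea using coro::latch's count_down()/co_await interface; tiny_task_group and wrap() are made-up names):

#include <coro/coro.hpp>
#include <cstddef>

class tiny_task_group
{
public:
    // The full set of tasks is known upfront, so the latch can be sized once.
    explicit tiny_task_group(std::ptrdiff_t task_count) : m_remaining(task_count) {}

    // Wrap a user task so it counts the latch down when it finishes.
    auto wrap(coro::task<void> user_task) -> coro::task<void>
    {
        co_await user_task;
        m_remaining.count_down();
        co_return;
    }

    // Suspends until every wrapped task has completed; no spin/poll loop.
    auto wait() -> coro::task<void>
    {
        co_await m_remaining;
        co_return;
    }

private:
    coro::latch m_remaining;
};

The point being that wait() parks the awaiting coroutine on the latch instead of repeatedly polling an "is it empty yet?" check.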

Let me know your thoughts and if this works for the use case you are considering (I think it should for the cond var change).

task_group PR: #418

@jbaldwin (Owner):

#419

Here's a stab at getting it to work with the upgraded task_group; I think this should solve the wait()/notify() race conditions.

@wence- (Contributor, Author) commented Nov 26, 2025

> #419
>
> Here's a stab at getting it to work with the upgraded task_group; I think this should solve the wait()/notify() race conditions.

Thanks. It will take me a few days to find the time to do some proper testing, but from a quick glance that looks like it solves the underlying issues to me.

@wence- (Contributor, Author) commented Nov 26, 2025

One thing I would note with the task_group change: I was using a task_container morally like this:

struct ThingThatReceivesCallbacks {
  coro::task<void> async_notify(coro::latch& latch, args) {
      do_stuff_with(args);
      latch.count_down();
  }

  // This is called from a background thread
  void bridge_from_sync_code(...) {
     task_container_.start(async_notify(latch_, ...));
  }

  coro::task<void> wait_for_all_notifications(...) {
     co_await latch_;
     // This is so the coroutine unwinding doesn't jump into our dtor
     // before the final `async_notify` task completes.
     co_await task_container_.yield_until_empty();
  }
};

I think with the new task_group I can't do this, because I don't have the tasks up front; I only know how many there are.

I think I can instead do:

struct ThingThatReceivesCallbacks {
  coro::task<void> async_notify(coro::latch& latch, args) {
      do_stuff_with(args);
      latch.count_down();
  }

  // This is called from a background thread
  void bridge_from_sync_code(std::unique_ptr<coro::thread_pool>& executor) {
     executor->spawn(async_notify(latch_, ...));
  }

  coro::task<void> wait_for_all_notifications(std::unique_ptr<coro::thread_pool>& executor, ...) {
     co_await latch_;
     // This is so the coroutine unwinding doesn't jump into our dtor
     // before the final `async_notify` task completes.
     co_await executor->yield();
  }
};

I think that's equivalent, because yielding once is guaranteed to put me at the back of the task queue, which ensures the final async_notify completes before I go out of scope.

@jbaldwin (Owner) commented Nov 26, 2025

Yeah, I think that's correct for your change. You're basically already using the latch as a group, so that should work, just scheduling into the executor.

I found that the task group has a real race condition if it allows tasks to be added incrementally while you wait for it to be empty, which is why I changed it to take everything upfront. Technically it was a problem before with yield_until_empty() too, but that must have been slow enough that the tests never triggered it.

@jbaldwin (Owner) commented Dec 7, 2025

I re-added the task_group::start(task) method and managed to figure out how to do the tests without the race condition, so if you want to re-update this PR with the updated task_group we can move it forward again.
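For readers following along, a usage-level sketch of how start() and wait() compose; the group's construction is omitted because it isn't shown in this thread, and drain_notifications/notify_tasks are hypothetical names:

#include <coro/coro.hpp>
#include <utility>
#include <vector>

// Sketch only: `task_group_type` stands in for the task_group from #418/#419;
// only start() and wait(), the methods discussed in this thread, are used.
template<typename task_group_type>
auto drain_notifications(task_group_type& group,
                         std::vector<coro::task<void>> notify_tasks) -> coro::task<void>
{
    for (auto& t : notify_tasks)
    {
        // Hand each task to the group; its lifetime is now tracked by the group.
        group.start(std::move(t));
    }

    // Latch-backed wait: suspends until every started task has completed.
    co_await group.wait();
    co_return;
}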

Sorry for the turbulence in the API; I think we've landed on something good now, though.

edit: I had forgotten that I made a modification on top of this PR; if that is the preferred approach I can rebase and get that version merged.


Development

Successfully merging this pull request may close these issues:

deadlock/missing wake in condition_variable notify_all?