Skip to content

Commit 276b19c

Browse files
authored
Executor types remove circular refs (#400)
* Executor types remove circular refs The executors are circular ref'ing with the std::shared_ptr being copied into worker threads for coro::thread_pool and coro::io_scheduler (spawn mode). I've determined that using std::shared_ptr is not the way to go since it gives an indeterminate destruction time, which can cause the coro::thread_pool or coro::io_scheduler to be destroyed on one of the threads running coroutines, possibly joining to itself and causing a SIGABRT. This should have been expected and I missed it and didn't consider the consequences and thought that using std::shared_ptr would make sure all the task constructs and the executors would keep each other alive. It did, since the executors circular referenced themselves, but without that circular reference the destruction is indeterminate as pointed out and cannot be reliably controlled. I think this means the best course of action is to revert to using std::unique_ptr for all executor types and force the user to make sure it lives for the correct livetime, and gets destroyed at the appropriate time on the correct thread.
1 parent 749e5b4 commit 276b19c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+511
-503
lines changed

README.md

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,9 @@ int main()
8383
// execution to another thread. We'll pass the thread pool as a parameter so
8484
// the task can be scheduled.
8585
// Note that you will need to guarantee the thread pool outlives the coroutine.
86-
auto tp = coro::thread_pool::make_shared(coro::thread_pool::options{.thread_count = 1});
86+
auto tp = coro::thread_pool::make_unique(coro::thread_pool::options{.thread_count = 1});
8787

88-
auto make_task_offload = [](std::shared_ptr<coro::thread_pool> tp, uint64_t x) -> coro::task<uint64_t>
88+
auto make_task_offload = [](std::unique_ptr<coro::thread_pool>& tp, uint64_t x) -> coro::task<uint64_t>
8989
{
9090
co_await tp->schedule(); // Schedules execution on the thread pool.
9191
co_return x + x; // This will execute on the thread pool.
@@ -115,9 +115,9 @@ The `when_all` construct can be used within coroutines to await a set of tasks,
115115
int main()
116116
{
117117
// Create a thread pool to execute all the tasks in parallel.
118-
auto tp = coro::thread_pool::make_shared(coro::thread_pool::options{.thread_count = 4});
118+
auto tp = coro::thread_pool::make_unique(coro::thread_pool::options{.thread_count = 4});
119119
// Create the task we want to invoke multiple times and execute in parallel on the thread pool.
120-
auto twice = [](std::shared_ptr<coro::thread_pool> tp, uint64_t x) -> coro::task<uint64_t>
120+
auto twice = [](std::unique_ptr<coro::thread_pool>& tp, uint64_t x) -> coro::task<uint64_t>
121121
{
122122
co_await tp->schedule(); // Schedule onto the thread pool.
123123
co_return x + x; // Executed on the thread pool.
@@ -146,7 +146,7 @@ int main()
146146
}
147147

148148
// Use var args instead of a container as input to coro::when_all.
149-
auto square = [](std::shared_ptr<coro::thread_pool> tp, double x) -> coro::task<double>
149+
auto square = [](std::unique_ptr<coro::thread_pool>& tp, double x) -> coro::task<double>
150150
{
151151
co_await tp->schedule();
152152
co_return x* x;
@@ -185,10 +185,10 @@ int main()
185185
{
186186
// Create a scheduler to execute all tasks in parallel and also so we can
187187
// suspend a task to act like a timeout event.
188-
auto scheduler = coro::io_scheduler::make_shared();
188+
auto scheduler = coro::io_scheduler::make_unique();
189189

190190
// This task will behave like a long running task and will produce a valid result.
191-
auto make_long_running_task = [](std::shared_ptr<coro::io_scheduler> scheduler,
191+
auto make_long_running_task = [](std::unique_ptr<coro::io_scheduler>& scheduler,
192192
std::chrono::milliseconds execution_time) -> coro::task<int64_t>
193193
{
194194
// Schedule the task to execute in parallel.
@@ -199,7 +199,7 @@ int main()
199199
co_return 1;
200200
};
201201

202-
auto make_timeout_task = [](std::shared_ptr<coro::io_scheduler> scheduler) -> coro::task<int64_t>
202+
auto make_timeout_task = [](std::unique_ptr<coro::io_scheduler>& scheduler) -> coro::task<int64_t>
203203
{
204204
// Schedule a timer to be fired so we know the task timed out.
205205
co_await scheduler->schedule_after(std::chrono::milliseconds{100});
@@ -436,7 +436,7 @@ int main()
436436
// Complete worker tasks faster on a thread pool, using the io_scheduler version so the worker
437437
// tasks can yield for a specific amount of time to mimic difficult work. The pool is only
438438
// setup with a single thread to showcase yield_for().
439-
auto tp = coro::io_scheduler::make_shared(
439+
auto tp = coro::io_scheduler::make_unique(
440440
coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}});
441441

442442
// This task will wait until the given latch setters have completed.
@@ -457,7 +457,7 @@ int main()
457457

458458
// This task does 'work' and counts down on the latch when completed. The final child task to
459459
// complete will end up resuming the latch task when the latch's count reaches zero.
460-
auto make_worker_task = [](std::shared_ptr<coro::io_scheduler> tp, coro::latch& l, int64_t i) -> coro::task<void>
460+
auto make_worker_task = [](std::unique_ptr<coro::io_scheduler>& tp, coro::latch& l, int64_t i) -> coro::task<void>
461461
{
462462
// Schedule the worker task onto the thread pool.
463463
co_await tp->schedule();
@@ -517,12 +517,12 @@ The suspend waiter queue is LIFO, however the worker that current holds the mute
517517

518518
int main()
519519
{
520-
auto tp = coro::thread_pool::make_shared(coro::thread_pool::options{.thread_count = 4});
520+
auto tp = coro::thread_pool::make_unique(coro::thread_pool::options{.thread_count = 4});
521521
std::vector<uint64_t> output{};
522522
coro::mutex mutex{};
523523

524524
auto make_critical_section_task =
525-
[](std::shared_ptr<coro::thread_pool> tp, coro::mutex& mutex, std::vector<uint64_t>& output, uint64_t i) -> coro::task<void>
525+
[](std::unique_ptr<coro::thread_pool>& tp, coro::mutex& mutex, std::vector<uint64_t>& output, uint64_t i) -> coro::task<void>
526526
{
527527
co_await tp->schedule();
528528
// To acquire a mutex lock co_await its scoped_lock() function. Upon acquiring the lock the
@@ -580,10 +580,10 @@ int main()
580580
// to also show the interleaving of coroutines acquiring the shared lock in shared and
581581
// exclusive mode as they resume and suspend in a linear manner. Ideally the thread pool
582582
// executor would have more than 1 thread to resume all shared waiters in parallel.
583-
auto tp = coro::thread_pool::make_shared(coro::thread_pool::options{.thread_count = 1});
583+
auto tp = coro::thread_pool::make_unique(coro::thread_pool::options{.thread_count = 1});
584584
coro::shared_mutex<coro::thread_pool> mutex{tp};
585585

586-
auto make_shared_task = [](std::shared_ptr<coro::thread_pool> tp,
586+
auto make_shared_task = [](std::unique_ptr<coro::thread_pool>& tp,
587587
coro::shared_mutex<coro::thread_pool>& mutex,
588588
uint64_t i) -> coro::task<void>
589589
{
@@ -593,7 +593,7 @@ int main()
593593
// Note that to have a scoped shared lock a task must be passed in for the shared scoped lock to callback,
594594
// this is required since coro::shared_mutex.unlock() and unlock_shared() are coroutines and cannot be
595595
// awaited in a RAII destructor like coro::mutex.
596-
co_await mutex.scoped_lock_shared([](std::shared_ptr<coro::thread_pool> tp, uint64_t i) -> coro::task<void>
596+
co_await mutex.scoped_lock_shared([](std::unique_ptr<coro::thread_pool>& tp, uint64_t i) -> coro::task<void>
597597
{
598598
std::cerr << "shared task " << i << " lock_shared() acquired\n";
599599
/// Immediately yield so the other shared tasks also acquire in shared state
@@ -604,11 +604,11 @@ int main()
604604
co_return;
605605
};
606606

607-
auto make_exclusive_task = [](std::shared_ptr<coro::thread_pool> tp,
607+
auto make_exclusive_task = [](std::unique_ptr<coro::thread_pool>& tp,
608608
coro::shared_mutex<coro::thread_pool>& mutex) -> coro::task<void>
609609
{
610610
co_await tp->schedule();
611-
co_await mutex.scoped_lock([](std::shared_ptr<coro::thread_pool> tp) -> coro::task<void>
611+
co_await mutex.scoped_lock([](std::unique_ptr<coro::thread_pool>& tp) -> coro::task<void>
612612
{
613613
std::cerr << "exclusive task lock()\n";
614614
std::cerr << "exclusive task lock() acquired\n";
@@ -675,11 +675,11 @@ The `coro::semaphore` is a thread safe async tool to protect a limited number of
675675
int main()
676676
{
677677
// Have more threads/tasks than the semaphore will allow for at any given point in time.
678-
auto tp = coro::thread_pool::make_shared(coro::thread_pool::options{.thread_count = 8});
678+
auto tp = coro::thread_pool::make_unique(coro::thread_pool::options{.thread_count = 8});
679679
coro::semaphore<2> semaphore{2};
680680

681681
auto make_rate_limited_task =
682-
[](std::shared_ptr<coro::thread_pool> tp, coro::semaphore<2>& semaphore, uint64_t task_num) -> coro::task<void>
682+
[](std::unique_ptr<coro::thread_pool>& tp, coro::semaphore<2>& semaphore, uint64_t task_num) -> coro::task<void>
683683
{
684684
co_await tp->schedule();
685685

@@ -726,14 +726,14 @@ int main()
726726
{
727727
const size_t iterations = 100;
728728
const size_t consumers = 4;
729-
auto tp = coro::thread_pool::make_shared(coro::thread_pool::options{.thread_count = 4});
729+
auto tp = coro::thread_pool::make_unique(coro::thread_pool::options{.thread_count = 4});
730730
coro::ring_buffer<uint64_t, 16> rb{};
731731
coro::mutex m{};
732732

733733
std::vector<coro::task<void>> tasks{};
734734

735735
auto make_producer_task =
736-
[](std::shared_ptr<coro::thread_pool> tp, coro::ring_buffer<uint64_t, 16>& rb, coro::mutex& m) -> coro::task<void>
736+
[](std::unique_ptr<coro::thread_pool>& tp, coro::ring_buffer<uint64_t, 16>& rb, coro::mutex& m) -> coro::task<void>
737737
{
738738
co_await tp->schedule();
739739

@@ -753,7 +753,7 @@ int main()
753753
};
754754

755755
auto make_consumer_task =
756-
[](std::shared_ptr<coro::thread_pool> tp, coro::ring_buffer<uint64_t, 16>& rb, coro::mutex& m, size_t id) -> coro::task<void>
756+
[](std::unique_ptr<coro::thread_pool>& tp, coro::ring_buffer<uint64_t, 16>& rb, coro::mutex& m, size_t id) -> coro::task<void>
757757
{
758758
co_await tp->schedule();
759759

@@ -816,13 +816,13 @@ int main()
816816
const size_t producers_count = 5;
817817
const size_t consumers_count = 2;
818818

819-
auto tp = coro::thread_pool::make_shared();
819+
auto tp = coro::thread_pool::make_unique();
820820
coro::queue<uint64_t> q{};
821821
coro::latch producers_done{producers_count};
822822
coro::mutex m{}; /// Just for making the console prints look nice.
823823

824824
auto make_producer_task =
825-
[](std::shared_ptr<coro::thread_pool> tp, coro::queue<uint64_t>& q, coro::latch& pd) -> coro::task<void>
825+
[](std::unique_ptr<coro::thread_pool>& tp, coro::queue<uint64_t>& q, coro::latch& pd) -> coro::task<void>
826826
{
827827
co_await tp->schedule();
828828

@@ -835,7 +835,7 @@ int main()
835835
co_return;
836836
};
837837

838-
auto make_shutdown_task = [](std::shared_ptr<coro::thread_pool> tp, coro::queue<uint64_t>& q, coro::latch& pd) -> coro::task<void>
838+
auto make_shutdown_task = [](std::unique_ptr<coro::thread_pool>& tp, coro::queue<uint64_t>& q, coro::latch& pd) -> coro::task<void>
839839
{
840840
// This task will wait for all the producers to complete and then for the
841841
// entire queue to be drained before shutting it down.
@@ -845,7 +845,7 @@ int main()
845845
co_return;
846846
};
847847

848-
auto make_consumer_task = [](std::shared_ptr<coro::thread_pool> tp, coro::queue<uint64_t>& q, coro::mutex& m) -> coro::task<void>
848+
auto make_consumer_task = [](std::unique_ptr<coro::thread_pool>& tp, coro::queue<uint64_t>& q, coro::mutex& m) -> coro::task<void>
849849
{
850850
co_await tp->schedule();
851851

@@ -919,13 +919,13 @@ NOTE: It is important to *not* hold the `coro::scoped_lock` when calling `notify
919919

920920
int main()
921921
{
922-
auto scheduler = coro::io_scheduler::make_shared();
922+
auto scheduler = coro::io_scheduler::make_unique();
923923
coro::condition_variable cv{};
924924
coro::mutex m{};
925925
std::atomic<uint64_t> condition{0};
926926
std::stop_source ss{};
927927

928-
auto make_waiter_task = [](std::shared_ptr<coro::io_scheduler> scheduler, coro::condition_variable& cv, coro::mutex& m, std::stop_source& ss, std::atomic<uint64_t>& condition, int64_t id) -> coro::task<void>
928+
auto make_waiter_task = [](std::unique_ptr<coro::io_scheduler>& scheduler, coro::condition_variable& cv, coro::mutex& m, std::stop_source& ss, std::atomic<uint64_t>& condition, int64_t id) -> coro::task<void>
929929
{
930930
co_await scheduler->schedule();
931931
while (true)
@@ -960,7 +960,7 @@ int main()
960960
}
961961
};
962962

963-
auto make_notifier_task = [](std::shared_ptr<coro::io_scheduler> scheduler, coro::condition_variable& cv, coro::mutex& m, std::stop_source& ss, std::atomic<uint64_t>& condition) -> coro::task<void>
963+
auto make_notifier_task = [](std::unique_ptr<coro::io_scheduler>& scheduler, coro::condition_variable& cv, coro::mutex& m, std::stop_source& ss, std::atomic<uint64_t>& condition) -> coro::task<void>
964964
{
965965
// To make this example more deterministic the notifier will wait between each notify event to showcase
966966
// how exactly the condition variable will behave with the condition in certain states and the notify_one or notify_all.
@@ -1062,7 +1062,7 @@ ss.request_stop() # request to stop, wakeup all waiters an
10621062

10631063
int main()
10641064
{
1065-
auto tp = coro::thread_pool::make_shared(coro::thread_pool::options{
1065+
auto tp = coro::thread_pool::make_unique(coro::thread_pool::options{
10661066
// By default all thread pools will create its thread count with the
10671067
// std::thread::hardware_concurrency() as the number of worker threads in the pool,
10681068
// but this can be changed via this thread_count option. This example will use 4.
@@ -1077,9 +1077,9 @@ int main()
10771077
.on_thread_stop_functor = [](std::size_t worker_idx) -> void
10781078
{ std::cout << "thread pool worker " << worker_idx << " is shutting down.\n"; }});
10791079

1080-
auto primary_task = [](std::shared_ptr<coro::thread_pool> tp) -> coro::task<uint64_t>
1080+
auto primary_task = [](std::unique_ptr<coro::thread_pool>& tp) -> coro::task<uint64_t>
10811081
{
1082-
auto offload_task = [](std::shared_ptr<coro::thread_pool> tp, uint64_t child_idx) -> coro::task<uint64_t>
1082+
auto offload_task = [](std::unique_ptr<coro::thread_pool>& tp, uint64_t child_idx) -> coro::task<uint64_t>
10831083
{
10841084
// Start by scheduling this offload worker task onto the thread pool.
10851085
co_await tp->schedule();
@@ -1192,7 +1192,7 @@ The example provided here shows an i/o scheduler that spins up a basic `coro::ne
11921192

11931193
int main()
11941194
{
1195-
auto scheduler = coro::io_scheduler::make_shared(coro::io_scheduler::options{
1195+
auto scheduler = coro::io_scheduler::make_unique(coro::io_scheduler::options{
11961196
// The scheduler will spawn a dedicated event processing thread. This is the default, but
11971197
// it is possible to use 'manual' and call 'process_events()' to drive the scheduler yourself.
11981198
.thread_strategy = coro::io_scheduler::thread_strategy_t::spawn,
@@ -1215,7 +1215,7 @@ int main()
12151215
},
12161216
.execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_on_thread_pool});
12171217

1218-
auto make_server_task = [](std::shared_ptr<coro::io_scheduler> scheduler) -> coro::task<void>
1218+
auto make_server_task = [](std::unique_ptr<coro::io_scheduler>& scheduler) -> coro::task<void>
12191219
{
12201220
// Start by creating a tcp server, we'll do this before putting it into the scheduler so
12211221
// it is immediately available for the client to connect since this will create a socket,
@@ -1303,7 +1303,7 @@ int main()
13031303
co_return;
13041304
};
13051305

1306-
auto make_client_task = [](std::shared_ptr<coro::io_scheduler> scheduler) -> coro::task<void>
1306+
auto make_client_task = [](std::unique_ptr<coro::io_scheduler>& scheduler) -> coro::task<void>
13071307
{
13081308
// Immediately schedule onto the scheduler.
13091309
co_await scheduler->schedule();

examples/coro_condition_variable.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@
33

44
int main()
55
{
6-
auto scheduler = coro::io_scheduler::make_shared();
6+
auto scheduler = coro::io_scheduler::make_unique();
77
coro::condition_variable cv{};
88
coro::mutex m{};
99
std::atomic<uint64_t> condition{0};
1010
std::stop_source ss{};
1111

12-
auto make_waiter_task = [](std::shared_ptr<coro::io_scheduler> scheduler, coro::condition_variable& cv, coro::mutex& m, std::stop_source& ss, std::atomic<uint64_t>& condition, int64_t id) -> coro::task<void>
12+
auto make_waiter_task = [](std::unique_ptr<coro::io_scheduler>& scheduler, coro::condition_variable& cv, coro::mutex& m, std::stop_source& ss, std::atomic<uint64_t>& condition, int64_t id) -> coro::task<void>
1313
{
1414
co_await scheduler->schedule();
1515
while (true)
@@ -44,7 +44,7 @@ int main()
4444
}
4545
};
4646

47-
auto make_notifier_task = [](std::shared_ptr<coro::io_scheduler> scheduler, coro::condition_variable& cv, coro::mutex& m, std::stop_source& ss, std::atomic<uint64_t>& condition) -> coro::task<void>
47+
auto make_notifier_task = [](std::unique_ptr<coro::io_scheduler>& scheduler, coro::condition_variable& cv, coro::mutex& m, std::stop_source& ss, std::atomic<uint64_t>& condition) -> coro::task<void>
4848
{
4949
// To make this example more deterministic the notifier will wait between each notify event to showcase
5050
// how exactly the condition variable will behave with the condition in certain states and the notify_one or notify_all.

examples/coro_http_200_ok_server.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
auto main() -> int
44
{
5-
auto make_http_200_ok_server = [](std::shared_ptr<coro::io_scheduler> scheduler) -> coro::task<void>
5+
auto make_http_200_ok_server = [](std::unique_ptr<coro::io_scheduler>& scheduler) -> coro::task<void>
66
{
77
auto make_on_connection_task = [](coro::net::tcp::client client) -> coro::task<void>
88
{
@@ -67,7 +67,7 @@ Connection: keep-alive
6767
std::vector<coro::task<void>> workers{};
6868
for (size_t i = 0; i < std::thread::hardware_concurrency(); ++i)
6969
{
70-
auto scheduler = coro::io_scheduler::make_shared(
70+
auto scheduler = coro::io_scheduler::make_unique(
7171
coro::io_scheduler::options{
7272
.execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_inline});
7373

examples/coro_io_scheduler.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
int main()
55
{
6-
auto scheduler = coro::io_scheduler::make_shared(coro::io_scheduler::options{
6+
auto scheduler = coro::io_scheduler::make_unique(coro::io_scheduler::options{
77
// The scheduler will spawn a dedicated event processing thread. This is the default, but
88
// it is possible to use 'manual' and call 'process_events()' to drive the scheduler yourself.
99
.thread_strategy = coro::io_scheduler::thread_strategy_t::spawn,
@@ -26,7 +26,7 @@ int main()
2626
},
2727
.execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_on_thread_pool});
2828

29-
auto make_server_task = [](std::shared_ptr<coro::io_scheduler> scheduler) -> coro::task<void>
29+
auto make_server_task = [](std::unique_ptr<coro::io_scheduler>& scheduler) -> coro::task<void>
3030
{
3131
// Start by creating a tcp server, we'll do this before putting it into the scheduler so
3232
// it is immediately available for the client to connect since this will create a socket,
@@ -114,7 +114,7 @@ int main()
114114
co_return;
115115
};
116116

117-
auto make_client_task = [](std::shared_ptr<coro::io_scheduler> scheduler) -> coro::task<void>
117+
auto make_client_task = [](std::unique_ptr<coro::io_scheduler>& scheduler) -> coro::task<void>
118118
{
119119
// Immediately schedule onto the scheduler.
120120
co_await scheduler->schedule();

examples/coro_latch.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ int main()
66
// Complete worker tasks faster on a thread pool, using the io_scheduler version so the worker
77
// tasks can yield for a specific amount of time to mimic difficult work. The pool is only
88
// setup with a single thread to showcase yield_for().
9-
auto tp = coro::io_scheduler::make_shared(
9+
auto tp = coro::io_scheduler::make_unique(
1010
coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}});
1111

1212
// This task will wait until the given latch setters have completed.
@@ -27,7 +27,7 @@ int main()
2727

2828
// This task does 'work' and counts down on the latch when completed. The final child task to
2929
// complete will end up resuming the latch task when the latch's count reaches zero.
30-
auto make_worker_task = [](std::shared_ptr<coro::io_scheduler> tp, coro::latch& l, int64_t i) -> coro::task<void>
30+
auto make_worker_task = [](std::unique_ptr<coro::io_scheduler>& tp, coro::latch& l, int64_t i) -> coro::task<void>
3131
{
3232
// Schedule the worker task onto the thread pool.
3333
co_await tp->schedule();

0 commit comments

Comments
 (0)