 
 #include <assert.h>
 #include <functional>
-#include <optional>
+#include <kj/function.h>
 #include <map>
 #include <memory>
+#include <optional>
 #include <sstream>
 #include <string>
 
@@ -129,6 +130,28 @@ std::string LongThreadName(const char* exe_name);
 
 //! Event loop implementation.
 //!
+//! The Cap'n Proto threading model is very simple: all I/O operations are
+//! asynchronous and must be performed on a single thread. This includes:
+//!
+//! - Code starting an asynchronous operation (calling a function that returns a
+//!   promise object)
+//! - Code notifying that an asynchronous operation is complete (code using a
+//!   fulfiller object)
+//! - Code handling a completed operation (code chaining or waiting for a promise)
+//!
+//! All of this code needs to access shared state, and there is no mutex that
+//! can be acquired to lock this state, because Cap'n Proto assumes it will
+//! only be accessed from one thread. So all of this code needs to actually
+//! run on one thread, and the EventLoop::loop() method is the entry point for
+//! that thread. ProxyClient and ProxyServer objects that use other threads
+//! and need to perform I/O operations post to this thread using the
+//! EventLoop::post() and EventLoop::sync() methods.
+//!
+//! Specifically, because ProxyClient methods can be called from arbitrary
+//! threads, and ProxyServer methods can run on arbitrary threads, ProxyClient
+//! methods use the EventLoop thread to send requests, and ProxyServer methods
+//! use it to return results.
+//!
 //! Based on https://groups.google.com/d/msg/capnproto/TuQFF1eH2-M/g81sHaTAAQAJ
 class EventLoop
 {
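
The post()/loop() handshake this comment describes can be pictured with a stripped-down model. The sketch below is illustrative only: `MiniLoop` and its members are hypothetical stand-ins, not the real `EventLoop`. One thread runs `loop()` and executes whatever callers hand to `post()`, which blocks until the function has run on the loop thread.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>

// Hypothetical stand-in for EventLoop, reduced to the post()/loop()
// handshake. One thread runs loop(); any other thread may call post(),
// which blocks until fn has executed on the loop thread.
class MiniLoop
{
public:
    void loop()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        while (!m_done) {
            m_cv.wait(lock, [&] { return m_post_fn != nullptr || m_done; });
            if (m_post_fn) {
                (*m_post_fn)();    // run posted work on the loop thread
                m_post_fn = nullptr;
                m_cv.notify_all(); // unblock the waiting poster
            }
        }
    }

    void post(std::function<void()> fn)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [&] { return m_post_fn == nullptr; }); // wait for a free slot
        m_post_fn = &fn;
        m_cv.notify_all();
        m_cv.wait(lock, [&] { return m_post_fn == nullptr; }); // wait for completion
    }

    void stop()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_done = true;
        m_cv.notify_all();
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::function<void()>* m_post_fn = nullptr; // mirrors EventLoop::m_post_fn
    bool m_done = false;
};
```

Unlike the real EventLoop, this sketch runs the callback with the mutex held, so callbacks must not call post() recursively; the real implementation also multiplexes Cap'n Proto I/O and wakes the loop through a pipe (m_wait_fd) rather than relying on the condition variable alone.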
@@ -144,17 +167,21 @@ class EventLoop
 
     //! Run function on event loop thread. Does not return until function completes.
     //! Must be called while the loop() function is active.
-    void post(const std::function<void()>& fn);
+    void post(kj::Function<void()> fn);
 
     //! Wrapper around EventLoop::post that takes advantage of the fact that
     //! the callable will not go out of scope, avoiding the requirement that
     //! it be copyable.
     template <typename Callable>
     void sync(Callable&& callable)
     {
-        post(std::ref(callable));
+        post(std::forward<Callable>(callable));
     }
 
+    //! Register cleanup function to run on asynchronous worker thread without
+    //! blocking the event loop thread.
+    void addAsyncCleanup(std::function<void()> fn);
+
     //! Start asynchronous worker thread if necessary. This is only done if
     //! there are ProxyServerBase::m_impl objects that need to be destroyed
     //! asynchronously, without tying up the event loop thread. This can happen
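
Switching post() from `const std::function<void()>&` to `kj::Function<void()>` matters because std::function requires copyable callables, while kj::Function only requires movable ones. A small standalone illustration of the difference (hypothetical snippet, not from the library):

```cpp
#include <kj/function.h>
#include <memory>

int main()
{
    auto owned = std::make_unique<int>(42);

    // A lambda capturing a move-only unique_ptr is itself move-only.
    // kj::Function accepts it; a std::function<void()> would not compile.
    kj::Function<void()> fn = [p = std::move(owned)]() mutable { *p += 1; };
    fn();

    return 0;
}
```

The sync() change follows from the same property: kj::Function constructed from an lvalue functor holds a reference rather than a copy, so `post(std::forward<Callable>(callable))` replaces the old `std::ref` wrapper, which is safe here because, as the comment notes, the callable outlives the call.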
@@ -166,13 +193,10 @@ class EventLoop
     //! is important that ProxyServer::m_impl destructors do not run on the
     //! event loop thread because they may need it to do I/O if they perform
     //! other IPC calls.
-    void startAsyncThread(std::unique_lock<std::mutex>& lock);
+    void startAsyncThread() MP_REQUIRES(m_mutex);
 
-    //! Add/remove remote client reference counts.
-    void addClient(std::unique_lock<std::mutex>& lock);
-    bool removeClient(std::unique_lock<std::mutex>& lock);
     //! Check if loop should exit.
-    bool done(std::unique_lock<std::mutex>& lock) const;
+    bool done() const MP_REQUIRES(m_mutex);
 
     Logger log()
     {
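
The MP_REQUIRES and MP_GUARDED_BY annotations replace the old convention of passing a `std::unique_lock&` argument to prove the caller holds the mutex; Clang's `-Wthread-safety` analysis now checks this at compile time instead. A plausible expansion, assuming the macros follow the standard Clang attribute pattern (the real definitions live elsewhere in the library and may differ):

```cpp
// Illustrative definitions; only Clang implements -Wthread-safety, so the
// macros expand to nothing on other compilers.
#if defined(__clang__)
#define MP_GUARDED_BY(x) __attribute__((guarded_by(x)))
#define MP_REQUIRES(...) __attribute__((requires_capability(__VA_ARGS__)))
#else
#define MP_GUARDED_BY(x)
#define MP_REQUIRES(...)
#endif
```

With these in place, calling startAsyncThread() or done() without holding m_mutex becomes a compiler warning rather than a latent race.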
@@ -195,10 +219,10 @@ class EventLoop
     std::thread m_async_thread;
 
     //! Callback function to run on event loop thread during post() or sync() call.
-    const std::function<void()>* m_post_fn = nullptr;
+    kj::Function<void()>* m_post_fn MP_GUARDED_BY(m_mutex) = nullptr;
 
     //! Callback functions to run on async thread.
-    CleanupList m_async_fns;
+    std::optional<CleanupList> m_async_fns MP_GUARDED_BY(m_mutex);
 
     //! Pipe read handle used to wake up the event loop thread.
     int m_wait_fd = -1;
@@ -208,11 +232,11 @@ class EventLoop
 
     //! Number of clients holding references to ProxyServerBase objects that
     //! reference this event loop.
-    int m_num_clients = 0;
+    int m_num_clients MP_GUARDED_BY(m_mutex) = 0;
 
     //! Mutex and condition variable used to post tasks to event loop and async
     //! thread.
-    std::mutex m_mutex;
+    Mutex m_mutex;
     std::condition_variable m_cv;
 
     //! Capnp IO context.
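
Swapping std::mutex for a library `Mutex` type is what lets MP_GUARDED_BY(m_mutex) participate in the analysis: the guard must be an annotated "capability" type. A minimal wrapper in that spirit is sketched below; this is an assumption about the real class, not a copy of it.

```cpp
#include <mutex>

// Illustrative annotated wrapper. The capability/acquire/release attributes
// are Clang's; the MP_* macros in the diff presumably map onto these.
class __attribute__((capability("mutex"))) Mutex
{
public:
    void lock() __attribute__((acquire_capability())) { m_mutex.lock(); }
    void unlock() __attribute__((release_capability())) { m_mutex.unlock(); }

    std::mutex m_mutex; // left accessible so condition variables can wait on it
};
```

Exposing the underlying std::mutex (or an equivalent accessor) is needed because std::condition_variable waits on a std::unique_lock<std::mutex>, not on the wrapper itself.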
@@ -263,11 +287,9 @@ struct Waiter
             // in the case where a capnp response is sent and a brand new
             // request is immediately received.
             while (m_fn) {
-                auto fn = std::move(m_fn);
-                m_fn = nullptr;
-                lock.unlock();
-                fn();
-                lock.lock();
+                auto fn = std::move(*m_fn);
+                m_fn.reset();
+                Unlock(lock, fn);
             }
             const bool done = pred();
             return done;
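
The new code moves the callable out of the optional, clears the slot, then runs it with the lock dropped. `Unlock` is a small helper; a sketch of its likely shape, assuming it restores the lock even if the callback throws (illustrative, not the library's exact implementation):

```cpp
// Temporarily release a lock while running a callback, reacquiring it on
// exit even when the callback throws.
template <typename Lock, typename Callback>
void Unlock(Lock& lock, Callback&& callback)
{
    lock.unlock();
    try {
        callback();
    } catch (...) {
        lock.lock();
        throw;
    }
    lock.lock();
}
```

Because m_fn is moved out and reset before the lock is released, another thread can safely install a new callable while the old one runs; the surrounding `while (m_fn)` loop then picks it up on the next iteration.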
@@ -276,7 +298,7 @@ struct Waiter
 
     std::mutex m_mutex;
     std::condition_variable m_cv;
-    std::function<void()> m_fn;
+    std::optional<kj::Function<void()>> m_fn;
 };
 
 //! Object holding network & rpc state associated with either an incoming server
@@ -290,21 +312,13 @@ class Connection
     Connection(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream_)
         : m_loop(loop), m_stream(kj::mv(stream_)),
           m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
-          m_rpc_system(::capnp::makeRpcClient(m_network))
-    {
-        std::unique_lock<std::mutex> lock(m_loop.m_mutex);
-        m_loop.addClient(lock);
-    }
+          m_rpc_system(::capnp::makeRpcClient(m_network)) {}
     Connection(EventLoop& loop,
                kj::Own<kj::AsyncIoStream>&& stream_,
                const std::function<::capnp::Capability::Client(Connection&)>& make_client)
         : m_loop(loop), m_stream(kj::mv(stream_)),
           m_network(*m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
-          m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this)))
-    {
-        std::unique_lock<std::mutex> lock(m_loop.m_mutex);
-        m_loop.addClient(lock);
-    }
+          m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this))) {}
 
     //! Run cleanup functions. Must be called from the event loop thread. First
     //! calls synchronous cleanup functions while blocked (to free capnp
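
The constructor bodies shrink to `{}` because the manual addClient/removeClient bookkeeping is gone: m_loop is now an EventLoopRef (see the member change below), which adjusts the client count in its constructor and destructor. A sketch of that RAII idea, assuming the Mutex wrapper sketched earlier and access to EventLoop's members (the real class will differ in detail, e.g. it likely offers an explicit reset()):

```cpp
// Hypothetical sketch of the RAII handle that replaces the removed
// addClient()/removeClient() calls. Construction bumps the loop's client
// count; destruction drops it and wakes the loop so done() is rechecked.
class EventLoopRef
{
public:
    explicit EventLoopRef(EventLoop& loop) : m_loop(&loop)
    {
        std::unique_lock<std::mutex> lock(m_loop->m_mutex.m_mutex);
        ++m_loop->m_num_clients;
    }
    EventLoopRef(EventLoopRef&& other) noexcept : m_loop(other.m_loop) { other.m_loop = nullptr; }
    EventLoopRef(const EventLoopRef&) = delete;
    EventLoopRef& operator=(const EventLoopRef&) = delete;
    ~EventLoopRef()
    {
        if (m_loop) {
            std::unique_lock<std::mutex> lock(m_loop->m_mutex.m_mutex);
            --m_loop->m_num_clients;
            m_loop->m_cv.notify_all(); // let loop() recheck done()
        }
    }

    EventLoop& operator*() const { return *m_loop; }
    EventLoop* operator->() const { return m_loop; }

private:
    EventLoop* m_loop;
};
```

This also explains the mechanical changes further down: `m_loop.m_task_set` becomes `m_loop->m_task_set`, and LoggingErrorHandler is constructed from `*m_loop`.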
@@ -319,10 +333,6 @@ class Connection
     CleanupIt addSyncCleanup(std::function<void()> fn);
     void removeSyncCleanup(CleanupIt it);
 
-    //! Register asynchronous cleanup function to run on worker thread when
-    //! disconnect() is called.
-    void addAsyncCleanup(std::function<void()> fn);
-
     //! Add disconnect handler.
     template <typename F>
     void onDisconnect(F&& f)
@@ -333,12 +343,12 @@ class Connection
         // to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
         // error in cases where f deletes this Connection object.
         m_on_disconnect.add(m_network.onDisconnect().then(
-            [f = std::forward<F>(f), this]() mutable { m_loop.m_task_set->add(kj::evalLater(kj::mv(f))); }));
+            [f = std::forward<F>(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
     }
 
-    EventLoop& m_loop;
+    EventLoopRef m_loop;
     kj::Own<kj::AsyncIoStream> m_stream;
-    LoggingErrorHandler m_error_handler{m_loop};
+    LoggingErrorHandler m_error_handler{*m_loop};
     kj::TaskSet m_on_disconnect{m_error_handler};
     ::capnp::TwoPartyVatNetwork m_network;
     std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
@@ -351,11 +361,10 @@ class Connection
     //! ThreadMap.makeThread) used to service requests to clients.
     ::capnp::CapabilityServerSet<Thread> m_threads;
 
-    //! Cleanup functions to run if connection is broken unexpectedly.
-    //! Lists will be empty if all ProxyClient and ProxyServer objects are
-    //! destroyed cleanly before the connection is destroyed.
+    //! Cleanup functions to run if connection is broken unexpectedly. The
+    //! list will be empty if all ProxyClient objects are destroyed cleanly
+    //! before the connection is destroyed.
     CleanupList m_sync_cleanup_fns;
-    CleanupList m_async_cleanup_fns;
 };
 
 //! Vat id for server side of connection. Required argument to RpcSystem::bootstrap()
@@ -381,21 +390,12 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
     : m_client(std::move(client)), m_context(connection)
 
 {
-    {
-        std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
-        m_context.connection->m_loop.addClient(lock);
-    }
-
     // Handler for the connection getting destroyed before this client object.
     auto cleanup_it = m_context.connection->addSyncCleanup([this]() {
         // Release client capability by move-assigning to temporary.
         {
             typename Interface::Client(std::move(m_client));
         }
-        {
-            std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
-            m_context.connection->m_loop.removeClient(lock);
-        }
         m_context.connection = nullptr;
     });
 
@@ -423,16 +423,11 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
     Sub::destroy(*this);
 
     // FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
-    m_context.connection->m_loop.sync([&]() {
+    m_context.loop->sync([&]() {
         // Release client capability by move-assigning to temporary.
         {
             typename Interface::Client(std::move(m_client));
         }
-        {
-            std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
-            m_context.connection->m_loop.removeClient(lock);
-        }
-
         if (destroy_connection) {
             delete m_context.connection;
             m_context.connection = nullptr;
@@ -454,12 +449,20 @@ ProxyServerBase<Interface, Impl>::ProxyServerBase(std::shared_ptr<Impl> impl, Co
     : m_impl(std::move(impl)), m_context(&connection)
 {
     assert(m_impl);
-    std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
-    m_context.connection->m_loop.addClient(lock);
 }
 
 //! ProxyServer destructor, called from the EventLoop thread by Cap'n Proto
 //! garbage collection code after there are no more references to this object.
+//! This will typically happen when the corresponding ProxyClient object on the
+//! other side of the connection is destroyed. It can also happen earlier if the
+//! connection is broken or destroyed. In the latter case this destructor will
+//! typically be called inside the m_rpc_system.reset() call in the ~Connection
+//! destructor while the Connection object still exists. However, because
+//! ProxyServer objects are refcounted, and the Connection object could be
+//! destroyed while asynchronous IPC calls are still in-flight, it's possible
+//! for this destructor to be called after the Connection object no longer
+//! exists, so it is NOT valid to dereference the m_context.connection pointer
+//! from this function.
 template <typename Interface, typename Impl>
 ProxyServerBase<Interface, Impl>::~ProxyServerBase()
 {
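
This is the payoff of the EventLoopRef change: the destructor can rely on m_context.loop without touching the possibly-dangling connection pointer. The layout below is an assumption about ProxyContext inferred from the fields this diff uses (connection, loop, cleanup_fns), not a verbatim copy of the real struct:

```cpp
// Assumed shape of ProxyContext after this change. The raw Connection* can
// dangle once the connection is torn down, but the EventLoopRef keeps the
// event loop itself alive and safe to use from destructors.
struct ProxyContext
{
    Connection* connection; // not safe to dereference after disconnect
    EventLoopRef loop;      // refcounted handle, valid for the object's lifetime
    CleanupList cleanup_fns;
};
```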
@@ -483,14 +486,12 @@ ProxyServerBase<Interface, Impl>::~ProxyServerBase()
         // connection is broken). Probably some refactoring of the destructor
         // and invokeDestroy function is possible to make this cleaner and more
         // consistent.
-        m_context.connection->addAsyncCleanup([impl=std::move(m_impl), fns=std::move(m_context.cleanup_fns)]() mutable {
+        m_context.loop->addAsyncCleanup([impl=std::move(m_impl), fns=std::move(m_context.cleanup_fns)]() mutable {
             impl.reset();
             CleanupRun(fns);
         });
     }
     assert(m_context.cleanup_fns.empty());
-    std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
-    m_context.connection->m_loop.removeClient(lock);
 }
 
 //! If the capnp interface defined a special "destroy" method, as described the