@@ -55,34 +55,37 @@ class _ShutdownStatus(Enum):
5555F = t .TypeVar ('F' , bound = t .Callable [..., t .Any ])
5656
5757
58- def in_pending_state (method : F ) -> F :
59- """Sets the kernel to a pending state by
60- creating a fresh Future for the KernelManager's `ready`
61- attribute. Once the method is finished, set the Future's results.
62- """
58+ def in_pending_state (prefix : str = '' ) -> t .Callable [[F ], F ]:
59+ def decorator (method : F ) -> F :
60+ """Sets the kernel to a pending state by
61+ creating a fresh Future for the KernelManager's `ready`
62+ attribute. Once the method is finished, set the Future's results.
63+ """
6364
64- @t .no_type_check
65- @functools .wraps (method )
66- async def wrapper (self , * args , ** kwargs ):
67- # Create a future for the decorated method
68- try :
69- self . _ready = Future ( )
70- except RuntimeError :
71- # No event loop running, use concurrent future
72- self . _ready = CFuture ( )
73- try :
74- # call wrapped method, await, and set the result or exception.
75- out = await method (self , * args , ** kwargs )
76- # Add a small sleep to ensure tests can capture the state before done
77- await asyncio .sleep (0.01 )
78- self . _ready .set_result (None )
79- return out
80- except Exception as e :
81- self . _ready .set_exception (e )
82- self .log .exception (self . _ready .exception ())
83- raise e
65+ @t .no_type_check
66+ @functools .wraps (method )
67+ async def wrapper (self , * args , ** kwargs ):
68+ # Create a future for the decorated method
69+ name = f" { prefix } _ready"
70+ future = getattr ( self , name )
71+ if not future or future . done () :
72+ future = self . _future_factory ()
73+ setattr ( self , name , future )
74+ try :
75+ # call wrapped method, await, and set the result or exception.
76+ out = await method (self , * args , ** kwargs )
77+ # Add a small sleep to ensure tests can capture the state before done
78+ await asyncio .sleep (0.01 )
79+ future .set_result (None )
80+ return out
81+ except Exception as e :
82+ future .set_exception (e )
83+ self .log .exception (future .exception ())
84+ raise e
8485
85- return t .cast (F , wrapper )
86+ return t .cast (F , wrapper )
87+
88+ return decorator
8689
8790
8891class KernelManager (ConnectionFileMixin ):
@@ -91,18 +94,14 @@ class KernelManager(ConnectionFileMixin):
9194 This version starts kernels with Popen.
9295 """
9396
94- _ready : t .Union [Future , CFuture ]
97+ _ready : t .Optional [t .Union [Future , CFuture ]]
98+ _shutdown_ready : t .Optional [CFuture ]
9599
96100 def __init__ (self , * args , ** kwargs ):
97101 super ().__init__ (** kwargs )
98102 self ._shutdown_status = _ShutdownStatus .Unset
99- # Create a place holder future.
100- try :
101- asyncio .get_running_loop ()
102- self ._ready = Future ()
103- except RuntimeError :
104- # No event loop running, use concurrent future
105- self ._ready = CFuture ()
103+ self ._ready = None
104+ self ._shutdown_ready = None
106105
107106 _created_context : Bool = Bool (False )
108107
@@ -120,6 +119,8 @@ def _context_default(self) -> zmq.Context:
120119 )
121120 client_factory : Type = Type (klass = "jupyter_client.KernelClient" )
122121
122+ _future_factory : t .Type [CFuture ] = CFuture
123+
123124 @default ("client_factory" ) # type:ignore[misc]
124125 def _client_factory_default (self ) -> Type :
125126 return import_item (self .client_class )
@@ -185,9 +186,20 @@ def _default_cache_ports(self) -> bool:
185186 return self .transport == "tcp"
186187
187188 @property
188- def ready (self ) -> t .Union [CFuture , Future ]:
189- """A future that resolves when the kernel process has started for the first time"""
190- return self ._ready
189+ def ready (self ) -> CFuture :
190+ """A future that resolves when the kernel process has started."""
191+ if not self ._ready :
192+ self ._ready = self ._future_factory ()
193+ assert self ._ready is not None
194+ return self ._ready # type:ignore[return-value]
195+
196+ @property
197+ def shutdown_ready (self ) -> CFuture :
198+ """A future that resolves when the kernel process has shut down."""
199+ if not self ._shutdown_ready :
200+ self ._shutdown_ready = self ._future_factory ()
201+ assert self ._shutdown_ready is not None
202+ return self ._shutdown_ready
191203
192204 @property
193205 def ipykernel (self ) -> bool :
@@ -369,7 +381,7 @@ async def _async_post_start_kernel(self, **kw: t.Any) -> None:
369381
370382 post_start_kernel = run_sync (_async_post_start_kernel )
371383
372- @in_pending_state
384+ @in_pending_state ()
373385 async def _async_start_kernel (self , ** kw : t .Any ) -> None :
374386 """Starts a kernel on this host in a separate process.
375387
@@ -462,7 +474,7 @@ async def _async_cleanup_resources(self, restart: bool = False) -> None:
462474
463475 cleanup_resources = run_sync (_async_cleanup_resources )
464476
465- @in_pending_state
477+ @in_pending_state ( '_shutdown' )
466478 async def _async_shutdown_kernel (self , now : bool = False , restart : bool = False ) -> None :
467479 """Attempts to stop the kernel process cleanly.
468480
@@ -481,6 +493,8 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False)
481493 Will this kernel be restarted after it is shutdown. When this
482494 is True, connection files will not be cleaned up.
483495 """
496+ # Reset the start ready future.
497+ self ._ready = self ._future_factory ()
484498 self .shutting_down = True # Used by restarter to prevent race condition
485499 # Stop monitoring for restarting while we shutdown.
486500 self .stop_restarter ()
@@ -643,6 +657,7 @@ class AsyncKernelManager(KernelManager):
643657 )
644658 client_factory : Type = Type (klass = "jupyter_client.asynchronous.AsyncKernelClient" )
645659
660+ _future_factory : t .Type [Future ] = Future # type:ignore[assignment]
646661 _launch_kernel = KernelManager ._async_launch_kernel
647662 start_kernel = KernelManager ._async_start_kernel
648663 pre_start_kernel = KernelManager ._async_pre_start_kernel
0 commit comments