@@ -55,37 +55,34 @@ class _ShutdownStatus(Enum):
5555F = t .TypeVar ('F' , bound = t .Callable [..., t .Any ])
5656
5757
58- def in_pending_state (prefix : str = '' ) -> t .Callable [[F ], F ]:
59- def decorator (method : F ) -> F :
60- """Sets the kernel to a pending state by
61- creating a fresh Future for the KernelManager's `ready`
62- attribute. Once the method is finished, set the Future's results.
63- """
64-
65- @t .no_type_check
66- @functools .wraps (method )
67- async def wrapper (self , * args , ** kwargs ):
68- # Create a future for the decorated method
69- name = f"{ prefix } _ready"
70- future = getattr (self , name )
71- if not future or future .done ():
72- future = self ._future_factory ()
73- setattr (self , name , future )
74- try :
75- # call wrapped method, await, and set the result or exception.
76- out = await method (self , * args , ** kwargs )
77- # Add a small sleep to ensure tests can capture the state before done
78- await asyncio .sleep (0.01 )
79- future .set_result (None )
80- return out
81- except Exception as e :
82- future .set_exception (e )
83- self .log .exception (future .exception ())
84- raise e
58+ def in_pending_state (method : F ) -> F :
59+ """Sets the kernel to a pending state by
60+ creating a fresh Future for the KernelManager's `ready`
61+ attribute. Once the method is finished, set the Future's results.
62+ """
8563
86- return t .cast (F , wrapper )
64+ @t .no_type_check
65+ @functools .wraps (method )
66+ async def wrapper (self , * args , ** kwargs ):
67+ # Create a future for the decorated method
68+ try :
69+ self ._ready = Future ()
70+ except RuntimeError :
71+ # No event loop running, use concurrent future
72+ self ._ready = CFuture ()
73+ try :
74+ # call wrapped method, await, and set the result or exception.
75+ out = await method (self , * args , ** kwargs )
76+ # Add a small sleep to ensure tests can capture the state before done
77+ await asyncio .sleep (0.01 )
78+ self ._ready .set_result (None )
79+ return out
80+ except Exception as e :
81+ self ._ready .set_exception (e )
82+ self .log .exception (self ._ready .exception ())
83+ raise e
8784
88- return decorator
85+ return t . cast ( F , wrapper )
8986
9087
9188class KernelManager (ConnectionFileMixin ):
@@ -94,14 +91,18 @@ class KernelManager(ConnectionFileMixin):
9491 This version starts kernels with Popen.
9592 """
9693
97- _ready : t .Optional [t .Union [Future , CFuture ]]
98- _shutdown_ready : t .Optional [CFuture ]
94+ _ready : t .Union [Future , CFuture ]
9995
10096 def __init__ (self , * args , ** kwargs ):
10197 super ().__init__ (** kwargs )
10298 self ._shutdown_status = _ShutdownStatus .Unset
103- self ._ready = None
104- self ._shutdown_ready = None
99+ # Create a place holder future.
100+ try :
101+ asyncio .get_running_loop ()
102+ self ._ready = Future ()
103+ except RuntimeError :
104+ # No event loop running, use concurrent future
105+ self ._ready = CFuture ()
105106
106107 _created_context : Bool = Bool (False )
107108
@@ -119,8 +120,6 @@ def _context_default(self) -> zmq.Context:
119120 )
120121 client_factory : Type = Type (klass = "jupyter_client.KernelClient" )
121122
122- _future_factory : t .Type [CFuture ] = CFuture
123-
124123 @default ("client_factory" ) # type:ignore[misc]
125124 def _client_factory_default (self ) -> Type :
126125 return import_item (self .client_class )
@@ -186,20 +185,9 @@ def _default_cache_ports(self) -> bool:
186185 return self .transport == "tcp"
187186
188187 @property
189- def ready (self ) -> CFuture :
190- """A future that resolves when the kernel process has started."""
191- if not self ._ready :
192- self ._ready = self ._future_factory ()
193- assert self ._ready is not None
194- return self ._ready # type:ignore[return-value]
195-
196- @property
197- def shutdown_ready (self ) -> CFuture :
198- """A future that resolves when the kernel process has shut down."""
199- if not self ._shutdown_ready :
200- self ._shutdown_ready = self ._future_factory ()
201- assert self ._shutdown_ready is not None
202- return self ._shutdown_ready
188+ def ready (self ) -> t .Union [CFuture , Future ]:
189+ """A future that resolves when the kernel process has started for the first time"""
190+ return self ._ready
203191
204192 @property
205193 def ipykernel (self ) -> bool :
@@ -381,7 +369,7 @@ async def _async_post_start_kernel(self, **kw: t.Any) -> None:
381369
382370 post_start_kernel = run_sync (_async_post_start_kernel )
383371
384- @in_pending_state ()
372+ @in_pending_state
385373 async def _async_start_kernel (self , ** kw : t .Any ) -> None :
386374 """Starts a kernel on this host in a separate process.
387375
@@ -474,7 +462,7 @@ async def _async_cleanup_resources(self, restart: bool = False) -> None:
474462
475463 cleanup_resources = run_sync (_async_cleanup_resources )
476464
477- @in_pending_state ( '_shutdown' )
465+ @in_pending_state
478466 async def _async_shutdown_kernel (self , now : bool = False , restart : bool = False ) -> None :
479467 """Attempts to stop the kernel process cleanly.
480468
@@ -493,8 +481,6 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False)
493481 Will this kernel be restarted after it is shutdown. When this
494482 is True, connection files will not be cleaned up.
495483 """
496- # Reset the start ready future.
497- self ._ready = self ._future_factory ()
498484 self .shutting_down = True # Used by restarter to prevent race condition
499485 # Stop monitoring for restarting while we shutdown.
500486 self .stop_restarter ()
@@ -657,7 +643,6 @@ class AsyncKernelManager(KernelManager):
657643 )
658644 client_factory : Type = Type (klass = "jupyter_client.asynchronous.AsyncKernelClient" )
659645
660- _future_factory : t .Type [Future ] = Future # type:ignore[assignment]
661646 _launch_kernel = KernelManager ._async_launch_kernel
662647 start_kernel = KernelManager ._async_start_kernel
663648 pre_start_kernel = KernelManager ._async_pre_start_kernel
0 commit comments