@@ -47,6 +47,8 @@ def _default_kernel_manager_class(self):
4747
4848 _kernel_connections = Dict ()
4949
50+ _kernel_ports = Dict ()
51+
5052 _culler_callback = None
5153
5254 _initialized_culler = False
@@ -183,6 +185,7 @@ async def start_kernel(self, kernel_id=None, path=None, **kwargs):
183185 kwargs ['cwd' ] = self .cwd_for_path (path )
184186 kernel_id = await ensure_async (self .pinned_superclass .start_kernel (self , ** kwargs ))
185187 self ._kernel_connections [kernel_id ] = 0
188+ self ._kernel_ports [kernel_id ] = self ._kernels [kernel_id ].ports
186189 self .start_watching_activity (kernel_id )
187190 self .log .info ("Kernel started: %s" % kernel_id )
188191 self .log .debug ("Kernel args: %r" % kwargs )
@@ -208,6 +211,40 @@ async def start_kernel(self, kernel_id=None, path=None, **kwargs):
208211
209212 return kernel_id
210213
214+ def ports_changed (self , kernel_id ):
215+ """Used by ZMQChannelsHandler to determine how to coordinate nudge and replays.
216+
217+ Ports are captured when starting a kernel (via MappingKernelManager). Ports
218+ are considered changed (following restarts) if the referenced KernelManager
219+ is using a set of ports different from those captured at startup. If changes
220+ are detected, the captured set is updated and a value of True is returned.
221+
222+ NOTE: Use is exclusive to ZMQChannelsHandler because this object is a singleton
223+ instance while ZMQChannelsHandler instances are per WebSocket connection that
224+ can vary per kernel lifetime.
225+ """
226+ changed_ports = self ._get_changed_ports (kernel_id )
227+ if changed_ports :
228+ # If changed, update captured ports and return True, else return False.
229+ self .log .debug (f"Port change detected for kernel: { kernel_id } " )
230+ self ._kernel_ports [kernel_id ] = changed_ports
231+ return True
232+ return False
233+
234+ def _get_changed_ports (self , kernel_id ):
235+ """Internal method to test if a kernel's ports have changed and, if so, return their values.
236+
237+ This method does NOT update the captured ports for the kernel as that can only be done
238+ by ZMQChannelsHandler, but instead returns the new list of ports if they are different
239+ than those captured at startup. This enables the ability to conditionally restart
240+ activity monitoring immediately following a kernel's restart (if ports have changed).
241+ """
242+ # Get current ports and return comparison with ports captured at startup.
243+ km = self .get_kernel (kernel_id )
244+ if km .ports != self ._kernel_ports [kernel_id ]:
245+ return km .ports
246+ return None
247+
211248 def start_buffering (self , kernel_id , session_key , channels ):
212249 """Start buffering messages for a kernel
213250
@@ -300,10 +337,7 @@ def stop_buffering(self, kernel_id):
300337 def shutdown_kernel (self , kernel_id , now = False , restart = False ):
301338 """Shutdown a kernel by kernel_id"""
302339 self ._check_kernel_id (kernel_id )
303- kernel = self ._kernels [kernel_id ]
304- if kernel ._activity_stream :
305- kernel ._activity_stream .close ()
306- kernel ._activity_stream = None
340+ self .stop_watching_activity (kernel_id )
307341 self .stop_buffering (kernel_id )
308342 self ._kernel_connections .pop (kernel_id , None )
309343
@@ -319,6 +353,7 @@ def shutdown_kernel(self, kernel_id, now=False, restart=False):
319353 # method is synchronous. However, we'll keep the relative call orders the same from
320354 # a maintenance perspective.
321355 self ._kernel_connections .pop (kernel_id , None )
356+ self ._kernel_ports .pop (kernel_id , None )
322357
323358 async def restart_kernel (self , kernel_id , now = False ):
324359 """Restart a kernel by kernel_id"""
@@ -359,6 +394,10 @@ def on_restart_failed():
359394 channel .on_recv (on_reply )
360395 loop = IOLoop .current ()
361396 timeout = loop .add_timeout (loop .time () + self .kernel_info_timeout , on_timeout )
397+ # Re-establish activity watching if ports have changed...
398+ if self ._get_changed_ports (kernel_id ) is not None :
399+ self .stop_watching_activity (kernel_id )
400+ self .start_watching_activity (kernel_id )
362401 return future
363402
364403 def notify_connect (self , kernel_id ):
@@ -440,6 +479,13 @@ def record_activity(msg_list):
440479
441480 kernel ._activity_stream .on_recv (record_activity )
442481
482+ def stop_watching_activity (self , kernel_id ):
483+ """Stop watching IOPub messages on a kernel for activity."""
484+ kernel = self ._kernels [kernel_id ]
485+ if kernel ._activity_stream :
486+ kernel ._activity_stream .close ()
487+ kernel ._activity_stream = None
488+
443489 def initialize_culler (self ):
444490 """Start idle culler if 'cull_idle_timeout' is greater than zero.
445491
@@ -511,10 +557,7 @@ def __init__(self, **kwargs):
511557 async def shutdown_kernel (self , kernel_id , now = False , restart = False ):
512558 """Shutdown a kernel by kernel_id"""
513559 self ._check_kernel_id (kernel_id )
514- kernel = self ._kernels [kernel_id ]
515- if kernel ._activity_stream :
516- kernel ._activity_stream .close ()
517- kernel ._activity_stream = None
560+ self .stop_watching_activity (kernel_id )
518561 self .stop_buffering (kernel_id )
519562
520563 # Decrease the metric of number of kernels
@@ -526,4 +569,5 @@ async def shutdown_kernel(self, kernel_id, now=False, restart=False):
526569 # Finish shutting down the kernel before clearing state to avoid a race condition.
527570 ret = await self .pinned_superclass .shutdown_kernel (self , kernel_id , now = now , restart = restart )
528571 self ._kernel_connections .pop (kernel_id , None )
572+ self ._kernel_ports .pop (kernel_id , None )
529573 return ret
0 commit comments