1616
1717import json
1818import time
19+ import queue
1920from collections import deque
2021
2122from DataLayerClient import DataLayerClient
2728
2829class SessionHelperThread (threading .Thread ):
2930
30- def __init__ (self , helper_params , logger , pubutils , sessutils , queue , datalayer ):
31+ def __init__ (self , helper_params , logger , pubutils , sessutils , queueservice , datalayer ):
3132
3233 self ._logger = logger
3334
@@ -37,7 +38,7 @@ def __init__(self, helper_params, logger, pubutils, sessutils, queue, datalayer)
3738
3839 self ._session_utils = sessutils
3940
40- self ._queue = queue
41+ self ._queue_service = queueservice
4142 self ._datalayer = datalayer
4243
4344 self ._sandboxid = helper_params ["sandboxid" ]
@@ -49,7 +50,8 @@ def __init__(self, helper_params, logger, pubutils, sessutils, queue, datalayer)
4950 # need a separate backup data layer client from the publication utils; otherwise, we run into concurrent modification
5051 # problems from Thrift
5152 # locality = -1 means that the writes happen to the local data layer first and then asynchronously to the global data layer
52- self ._backup_data_layer_client = DataLayerClient (locality = - 1 , for_mfn = True , sid = self ._sandboxid , connect = self ._datalayer )
53+ # will only initialize if heartbeats are enabled
54+ self ._backup_data_layer_client = None
5355
5456 # set up heartbeat parameters
5557 self ._heartbeat_enabled = False
@@ -74,12 +76,10 @@ def __init__(self, helper_params, logger, pubutils, sessutils, queue, datalayer)
7476 # we can send regular heartbeats
7577 self ._local_poll_timeout = py3utils .ensure_long (10000 )
7678
77- # use a deque to keep the list of messages
78- # updating the list and retrieving the list would be done by two threads
79- # this should be safe without lock because of the global interpreter lock in python
80- self ._message_queue = deque ()
79+ # use a queue to keep the incoming update messages for blocking and/or blocking get_update_messages() requests
80+ self ._message_queue = queue .Queue ()
8181
82- self ._local_queue_client = LocalQueueClient (connect = self ._queue )
82+ self ._local_queue_client = LocalQueueClient (connect = self ._queue_service )
8383
8484 self ._special_messages = {}
8585 self ._special_messages ["--stop" ] = True
@@ -102,10 +102,15 @@ def _init_heartbeat_parameters(self, heartbeat_params):
102102
103103 if self ._heartbeat_method == "function" :
104104 if "heartbeat_function" in heartbeat_params :
105+ # enable function related heartbeat
105106 self ._heartbeat_function = heartbeat_params ["heartbeat_function" ]
106107 #self._logger.debug("[SessionHelperThread] New heartbeat function: " + str(self._heartbeat_function))
108+ if self ._backup_data_layer_client is None :
109+ self ._backup_data_layer_client = DataLayerClient (locality = - 1 , for_mfn = True , sid = self ._sandboxid , connect = self ._datalayer )
107110 if self ._local_queue_client_heartbeat is None :
108- self ._local_queue_client_heartbeat = LocalQueueClient (connect = self ._queue )
111+ self ._local_queue_client_heartbeat = LocalQueueClient (connect = self ._queue_service )
112+
113+ # disable data layer related heartbeat
109114 if self ._data_layer_client_heartbeat is not None :
110115 self ._data_layer_client_heartbeat .delete (self ._heartbeat_data_layer_key )
111116 self ._heartbeat_data_layer_key = None
@@ -118,13 +123,22 @@ def _init_heartbeat_parameters(self, heartbeat_params):
118123 # OR keep a new map for heartbeats of the session functions
119124 # so that the checker can retrieve the keys and their values (e.g., timestamps)
120125 # if a session function misses a heartbeat, the checker function reports to policy handler
126+
127+ # enable data layer related heartbeat
121128 self ._heartbeat_data_layer_key = "heartbeat_" + self ._session_id + "_" + self ._session_function_id
122129 if self ._data_layer_client_heartbeat is None :
123130 self ._data_layer_client_heartbeat = DataLayerClient (locality = 1 , for_mfn = True , sid = self ._sandboxid , connect = self ._datalayer )
131+
132+ # disable function related heartbeat
124133 if self ._local_queue_client_heartbeat is not None :
125134 self ._local_queue_client_heartbeat .shutdown ()
126135 self ._local_queue_client_heartbeat = None
127136 self ._heartbeat_function = None
137+ if self ._backup_data_layer_client is not None :
138+ self ._backup_data_layer_client .shutdown ()
139+ self ._backup_data_layer_client = None
140+
141+
128142 else :
129143 raise MicroFunctionsSessionAPIException ("Unsupported heartbeat method for session function." )
130144
@@ -223,19 +237,19 @@ def _process_message(self, lqm):
223237 # _XXX_: we are encoding/decoding the delivered message; should not actually execute this code
224238 # it is here for not envisioned corner case (i.e., let the user code deal with it)
225239 if not is_json :
226- self ._queue_message (msg )
240+ self ._store_message (msg )
227241 self ._publication_utils .set_metadata (metadata )
228242 else :
229243 # the message is json encoded, but it doesn't guarantee that it is a special message
230244 if "action" in msg and msg ["action" ] in self ._special_messages :
231245 self ._handle_special_message (msg )
232246 else :
233- self ._queue_message (msg )
247+ self ._store_message (msg )
234248 self ._publication_utils .set_metadata (metadata )
235249
236250
237- def _queue_message (self , msg ):
238- self ._message_queue .append (msg )
251+ def _store_message (self , msg ):
252+ self ._message_queue .put (msg )
239253
240254 def _handle_special_message (self , msg ):
241255 action = msg ["action" ]
@@ -247,17 +261,16 @@ def _handle_special_message(self, msg):
247261 elif action == "--update-heartbeat" :
248262 self ._init_heartbeat_parameters (msg ["heartbeat_parameters" ])
249263
250- def get_messages (self , count = 1 ):
264+ def get_messages (self , count = 1 , block = False ):
251265 messages = []
252266
253- # this check ensures that we never try to pop() from an empty deque
254- num_messages = len (self ._message_queue )
255- if num_messages < count :
256- count = num_messages
257-
258267 for i in range (count ):
259- msg = self ._message_queue .popleft ()
260- messages .append (msg )
268+ try :
269+ msg = self ._message_queue .get (block = block )
270+ messages .append (msg )
271+ self ._message_queue .task_done ()
272+ except Exception as exc :
273+ pass
261274
262275 #self._logger.debug("returning messages: " + str(messages))
263276 return messages
@@ -317,19 +330,20 @@ def _cleanup(self):
317330 self ._local_queue_client_heartbeat .shutdown ()
318331 self ._local_queue_client_heartbeat = None
319332
333+ if self ._backup_data_layer_client is not None :
334+ self ._backup_data_layer_client .shutdown ()
335+ self ._backup_data_layer_client = None
336+
320337 self ._local_queue_client .shutdown ()
321338 self ._local_queue_client = None
322339
323- self ._backup_data_layer_client .shutdown ()
324- self ._backup_data_layer_client = None
325-
326340 def shutdown (self ):
327341 self ._is_running = False
328342 # remove/unregister the topic here
329343 # if done in _cleanup(), it may be the case that the parent process exits
330344 # and we never get to execute that part in this thread
331345 # need a new lqc, because if using the actual self._local_queue_client,
332346 # it will corrupt the protocol stack, if it is still waiting on new messages
333- lqc = LocalQueueClient (connect = self ._queue )
347+ lqc = LocalQueueClient (connect = self ._queue_service )
348+ lqc .removeTopic (self ._local_topic_communication )
334349 lqc .shutdown ()
335-
0 commit comments