Skip to content

Commit d12a88e

Browse files
author
Istemi Ekin Akkus
committed
log backups instead of storing them in the data layer; fixes #85
1 parent 1fd941e commit d12a88e

File tree

2 files changed

+7
-35
lines changed

2 files changed

+7
-35
lines changed

FunctionWorker/python/PublicationUtils.py

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -453,14 +453,7 @@ def _publish_output(self, key, trigger, lqcpub, timestamp_map=None):
453453
# and the backup will be overwritten
454454
# if one or more nexts were generated when publishing
455455
# at the end of execution, they will have been appended to our list
456-
# in memory and we will store the backup once for the entire list
457-
def _store_trigger_backups(self, dlc, input_backup_map, current_function_instance_id, store_next_backup_list=False):
458-
if self._execution_info_map_name is not None:
459-
for input_backup_key in input_backup_map:
460-
dlc.putMapEntry(self._execution_info_map_name, input_backup_key, input_backup_map[input_backup_key])
461-
if store_next_backup_list:
462-
dlc.putMapEntry(self._execution_info_map_name, "next_" + current_function_instance_id, json.dumps(self._next_backup_list))
463-
456+
# in memory and we will log the backup once for the entire list
464457
def _log_trigger_backups(self, input_backup_map, current_function_instance_id, store_next_backup_list=False):
465458
if self._execution_info_map_name is not None:
466459
for input_backup_key in input_backup_map:
@@ -469,6 +462,7 @@ def _log_trigger_backups(self, input_backup_map, current_function_instance_id, s
469462
self._logger.info("[__mfn_backup] [%s] [%s] %s", self._execution_info_map_name, "next_" + current_function_instance_id, json.dumps(self._next_backup_list))
470463

471464
def _send_message_to_recovery_manager(self, key, message_type, topic, func_exec_id, has_error, error_type, lqcpub):
465+
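# TODO: messaging to the recovery manager is disabled for now; the early return below skips the rest of this method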
472466
return
473467
message_rec = {}
474468
message_rec["messageType"] = message_type
@@ -484,8 +478,8 @@ def _send_message_to_recovery_manager(self, key, message_type, topic, func_exec_
484478
# message via global publisher to pub manager's queue for backups
485479
self._send_local_queue_message(lqcpub, self._pub_topic_global, key, outputstr)
486480

487-
# need to store backups of inputs and send message to recovery manager
488-
def send_to_function_now(self, key, trigger, lqcpub=None, dlc=None, backup_to_log=False):
481+
# need to log backups of inputs and send message to recovery manager
482+
def send_to_function_now(self, key, trigger, lqcpub=None):
489483
trigger["value"] = self.encode_output(trigger["value"])
490484

491485
# get a local queue client
@@ -513,12 +507,7 @@ def send_to_function_now(self, key, trigger, lqcpub=None, dlc=None, backup_to_lo
513507
self._next_backup_list.append(next_function_instance_id)
514508
any_next = True
515509

516-
if backup_to_log:
517-
self._log_trigger_backups(input_backup_map, current_function_instance_id, store_next_backup_list=any_next)
518-
else:
519-
if dlc is None:
520-
dlc = self.get_backup_data_layer_client()
521-
self._store_trigger_backups(dlc, input_backup_map, current_function_instance_id, store_next_backup_list=any_next)
510+
self._log_trigger_backups(input_backup_map, current_function_instance_id, store_next_backup_list=any_next)
522511

523512
for next_func_exec_id in starting_next:
524513
next_func_topic = starting_next[next_func_exec_id]
@@ -654,8 +643,7 @@ def publish_output_direct(self, key, value_output, has_error, error_type, timest
654643
if self._should_checkpoint:
655644
timestamp_map["t_start_backtrigger"] = time.time() * 1000.0
656645
# backups for next of successfully completed function execution instances
657-
self._store_trigger_backups(dlc, input_backup_map, current_function_instance_id, store_next_backup_list=any_next)
658-
#self._log_trigger_backups(input_backup_map, current_function_instance_id, store_next_backup_list=any_next)
646+
self._log_trigger_backups(input_backup_map, current_function_instance_id, store_next_backup_list=any_next)
659647

660648
for next_func_exec_id in starting_next:
661649
next_func_topic = starting_next[next_func_exec_id]

FunctionWorker/python/SessionHelperThread.py

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,6 @@ def __init__(self, helper_params, logger, pubutils, sessutils, queueservice, dat
4646
self._session_function_id = helper_params["session_function_id"]
4747
self._session_id = helper_params["session_id"]
4848

49-
# initialize only if needed
50-
# need a separate backup data layer client from the publication utils; otherwise, we run into concurrent modification
51-
# problems from Thrift
52-
# locality = -1 means that the writes happen to the local data layer first and then asynchronously to the global data layer
53-
# will only initialize if heartbeats are enabled
54-
self._backup_data_layer_client = None
55-
5649
# set up heartbeat parameters
5750
self._heartbeat_enabled = False
5851
self._heartbeat_method = None
@@ -105,8 +98,6 @@ def _init_heartbeat_parameters(self, heartbeat_params):
10598
# enable function related heartbeat
10699
self._heartbeat_function = heartbeat_params["heartbeat_function"]
107100
#self._logger.debug("[SessionHelperThread] New heartbeat function: " + str(self._heartbeat_function))
108-
if self._backup_data_layer_client is None:
109-
self._backup_data_layer_client = DataLayerClient(locality=-1, for_mfn=True, sid=self._sandboxid, connect=self._datalayer)
110101
if self._local_queue_client_heartbeat is None:
111102
self._local_queue_client_heartbeat = LocalQueueClient(connect=self._queue_service)
112103

@@ -134,9 +125,6 @@ def _init_heartbeat_parameters(self, heartbeat_params):
134125
self._local_queue_client_heartbeat.shutdown()
135126
self._local_queue_client_heartbeat = None
136127
self._heartbeat_function = None
137-
if self._backup_data_layer_client is not None:
138-
self._backup_data_layer_client.shutdown()
139-
self._backup_data_layer_client = None
140128

141129

142130
else:
@@ -310,7 +298,7 @@ def _send_heartbeat_to_function(self, hb_message):
310298
trigger_hb = {}
311299
trigger_hb["next"] = self._heartbeat_function
312300
trigger_hb["value"] = hb_message
313-
self._publication_utils.send_to_function_now("-1l", trigger_hb, self._local_queue_client_heartbeat, self._backup_data_layer_client)
301+
self._publication_utils.send_to_function_now("-1l", trigger_hb, self._local_queue_client_heartbeat)
314302

315303
def _send_heartbeat_to_data_layer(self, hb_message):
316304
self._data_layer_client_heartbeat.put(self._heartbeat_data_layer_key, json.dumps(hb_message))
@@ -328,10 +316,6 @@ def _cleanup(self):
328316
self._local_queue_client_heartbeat.shutdown()
329317
self._local_queue_client_heartbeat = None
330318

331-
if self._backup_data_layer_client is not None:
332-
self._backup_data_layer_client.shutdown()
333-
self._backup_data_layer_client = None
334-
335319
# remove/unregister the topic
336320
self._local_queue_client.removeTopic(self._local_topic_communication)
337321

0 commit comments

Comments
 (0)