Skip to content

Commit a5ea5f2

Browse files
committed
allow retrieving and passing session function metadata to avoid datalayer lookups
1 parent f95c344 commit a5ea5f2

File tree

2 files changed

+40
-4
lines changed

2 files changed

+40
-4
lines changed

FunctionWorker/python/MicroFunctionsAPI.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,14 +252,15 @@ def update_metadata(self, metadata_name, metadata_value, is_privileged_metadata=
252252

253253
# session related API calls
254254
# only valid if the workflow has at least one session function
255-
def send_to_running_function_in_session(self, rgid, message, send_now=False):
255+
def send_to_running_function_in_session(self, rgid, message, send_now=False, session_metadata=None):
256256
'''
257257
Send a message to a long-running session function instance identified with its id in this session.
258258
259259
Args:
260260
rgid (string): the running long-running session function instance's id.
261261
message (*): the message to be sent; can be any python data type (<type 'dict', 'list', 'str', 'int', 'float', or 'NoneType'>).
262262
send_now (boolean): whether the message should be sent immediately or at the end of current function's execution; default: False.
263+
session_metadata (string): (optional) the running long-running session function instance's metadata
263264
264265
Returns:
265266
None
@@ -280,7 +281,7 @@ def send_to_running_function_in_session(self, rgid, message, send_now=False):
280281

281282
if self._is_session_workflow:
282283
self._session_utils.send_to_running_function_in_session(
283-
rgid, message, send_now)
284+
rgid, message, send_now, session_metadata)
284285
else:
285286
self._logger.warning(
286287
"Cannot send a session update message in a workflow with no session functions.")
@@ -767,6 +768,32 @@ def get_all_session_function_ids(self):
767768
"Cannot get session function ids in a workflow with no session functions.")
768769
return rgidlist
769770

771+
def get_all_session_metadata(self):
772+
'''
773+
Retrieve a dict of metadata related to all ids of the session function instances in this session.
774+
775+
Args:
776+
None
777+
778+
Returns:
779+
Dict of id (string) -> metadata (string) of all the session function instances in this session.
780+
781+
Warns:
782+
When the calling function is not part of a workflow with at least one session function.
783+
784+
Note:
785+
The usage of this function is only possible with a KNIX-specific feature (i.e., session functions).
786+
Using a KNIX-specific feature might make the workflow description incompatible with other platforms.
787+
788+
'''
789+
session_metadata_dict = {}
790+
if self._is_session_workflow:
791+
session_metadata_dict = self._session_utils.get_all_session_metadata()
792+
else:
793+
self._logger.warning(
794+
"Cannot get session metadata in a workflow with no session functions.")
795+
return session_metadata_dict
796+
770797
def is_still_running(self):
771798
'''
772799
Retrieve the status of this session function instance.

FunctionWorker/python/SessionUtils.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,13 @@ def get_all_session_function_ids(self):
211211
rgidlist = list(rgidset)
212212
return rgidlist
213213

214+
def get_all_session_metadata(self):
215+
session_metadata_dict = self._global_data_layer_client.retrieveMap(self._map_name_session_functions)
216+
if session_metadata_dict != None and type(session_metadata_dict) == type({}):
217+
return session_metadata_dict
218+
else:
219+
return {}
220+
214221
def get_all_session_function_aliases(self):
215222
alias_map = {}
216223
alias_map = self._global_data_layer_client.retrieveMap(self._map_name_session_function_alias_id)
@@ -427,10 +434,12 @@ def is_session_function_running(self):
427434
# API to send a message to another session function
428435
# check the locally running functions, and send them the message locally if so
429436
# otherwise, send it to the EventGlobalPublisher's queue
430-
def send_to_running_function_in_session(self, session_function_id, message, send_now=False):
437+
def send_to_running_function_in_session(self, session_function_id, message, send_now=False, session_metadata=None):
431438
#self._logger.debug("[SessionUtils] Sending message to running function: " + str(session_function_id) + " now: " + str(send_now))
432439
# send the message to the specific running function id
433-
function_metadatastr = self._global_data_layer_client.getMapEntry(self._map_name_session_functions, session_function_id)
440+
function_metadatastr = session_metadata
441+
if function_metadatastr == None or function_metadatastr == '' or type(function_metadatastr) != type(''):
442+
function_metadatastr = self._global_data_layer_client.getMapEntry(self._map_name_session_functions, session_function_id)
434443
self._logger.debug(f"[SessionUtils] Sending message to running function: {str(session_function_id)}, now: {str(send_now)}, function_metadata: {str(function_metadatastr)}")
435444
try:
436445
#self._logger.debug("[SessionUtils] function metadata: " + function_metadatastr)

0 commit comments

Comments
 (0)