Skip to content

Commit ac3ea95

Browse files
author
Istemi Ekin Akkus
committed
minor fix for logging backups; fixes to mfnmetrics.py to use the workflow log index
1 parent 1eb9f5e commit ac3ea95

File tree

4 files changed

+21
-18
lines changed

4 files changed

+21
-18
lines changed

FunctionWorker/python/PublicationUtils.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -550,10 +550,6 @@ def publish_output_direct(self, key, value_output, has_error, error_type, timest
550550

551551
encoded_result = self.encode_output(result)
552552

553-
encapsulated_result = self.encapsulate_output(encoded_result, self._metadata)
554-
555-
dlc.putMapEntry(self._execution_info_map_name, "result_" + current_function_instance_id, encapsulated_result)
556-
557553
# publish a message to the 'exit' topic
558554
trigger = {}
559555
trigger["next"] = self._wf_exit
@@ -572,11 +568,8 @@ def publish_output_direct(self, key, value_output, has_error, error_type, timest
572568

573569
if self._should_checkpoint:
574570
timestamp_map["t_start_dlcbackup"] = time.time() * 1000.0
575-
dlc = self.get_backup_data_layer_client()
576-
577571
timestamp_map["t_start_resultmap"] = time.time() * 1000.0
578-
dlc.putMapEntry(self._execution_info_map_name, "result_" + current_function_instance_id, encapsulated_value_output)
579-
#self._logger.info("[__mfn_backup] [%s] [%s] %s", self._execution_info_map_name, "result_" + current_function_instance_id, encapsulated_value_output)
572+
self._logger.info("[__mfn_backup] [%s] [%s] %s", self._execution_info_map_name, "result_" + current_function_instance_id, encapsulated_value_output)
580573

581574
timestamp_map["t_start_storeoutput"] = time.time() * 1000.0
582575
# store self._sapi.transient_output into the data layer

LoggingService/elasticsearch/mfnmetrics.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@
2626
import re
2727

2828
NGINX_INDEX= 'mfnnx'
29-
COMPONENT_INDEXES= 'mfnfe,mfnqs,mfnwf,mfnnx'
29+
COMPONENT_INDEXES= 'mfnfe,mfnqs,mfnnx'
3030

31-
def get_metric_logs_from_all_components(eshost, esport=9200, uuids=None, proxies=None, debugReadFromFile=False):
31+
def get_metric_logs_from_all_components(workflow_id, eshost, esport=9200, uuids=None, proxies=None, debugReadFromFile=False):
3232
# Form elasticsearch query
3333
status, request = form_query(uuids)
3434
if status == False:
@@ -37,7 +37,7 @@ def get_metric_logs_from_all_components(eshost, esport=9200, uuids=None, proxies
3737

3838
# Execute the query
3939
if debugReadFromFile == False:
40-
status, output = query_elasticsearch(eshost, esport, request, proxies)
40+
status, output = query_elasticsearch(workflow_id, eshost, esport, request, proxies)
4141
if status == False:
4242
logging.debug("query_elasticsearch error: " + output)
4343
return False, output
@@ -61,7 +61,7 @@ def get_metric_logs_from_all_components(eshost, esport=9200, uuids=None, proxies
6161
status, output = handleFrontendLogline(hit)
6262
elif index == 'mfnqs':
6363
status, output = handleQueueServiceLogline(hit)
64-
elif index == 'mfnwf':
64+
elif index == 'mfnwf-' + workflow_id:
6565
status, output = handleWorkflowTraceLogline(hit)
6666
elif index == 'mfnnx':
6767
status, output = handleNginxLogline(hit)
@@ -731,12 +731,12 @@ def handleNginxLogline(hit):
731731
},
732732
'''
733733

734-
def query_elasticsearch(eshost, esport, request, proxies):
734+
def query_elasticsearch(wid, eshost, esport, request, proxies):
735735
url="http://"+eshost+":" + str(esport)
736736
logging.debug("Url: " + url)
737737

738738
try:
739-
r=requests.get(url+"/"+COMPONENT_INDEXES+'/_search', json=request, proxies=proxies)
739+
r=requests.get(url+"/"+COMPONENT_INDEXES+ ",mfnwf-" + wid + '/_search', json=request, proxies=proxies)
740740

741741
logging.debug('Http response code: ' + str(r.status_code))
742742
logging.debug('Http status reason: ' + r.reason)
@@ -778,6 +778,7 @@ def printlog(outlog):
778778

779779
def main():
780780
parser = argparse.ArgumentParser(description='Generate overhead metrics and timestamps for microfunctions workflow executions', prog='mfnmetrics.py')
781+
parser.add_argument('-wid', '--wid', metavar='WORKFLOW_ID', help='Workflow id to determine the index.')
781782
parser.add_argument('-eid', '--eid', nargs='+', metavar='EXECUTION_UUIDS', help='Generate metric for specific execution id(s).')
782783
parser.add_argument('-eidfile', '--eidfile', type=str, help="Read execution ids from the given file (with 1 uuid per line); overrides -eid option")
783784
parser.add_argument('-eshost', '--eshost', type=str, metavar='ELASTICSEARCH_HOST', default=socket.gethostname(), help='Elasticsearch host. Defaults to ' + socket.gethostname() + ':9200.')
@@ -789,6 +790,8 @@ def main():
789790

790791
args = parser.parse_args()
791792

793+
workflow_id = args.wid
794+
792795
eidfilename = args.eidfile
793796
if eidfilename is None:
794797
uuids = args.eid
@@ -820,6 +823,7 @@ def main():
820823
"https": https_proxy,
821824
}
822825

826+
logging.debug("Workflow id: " + str(workflow_id))
823827
logging.debug("Execution uuid(s): " + str(uuids))
824828
logging.debug("Elasticsearch host: " + str(eshost)+ ':9200')
825829
logging.debug("Proxies: " + str(proxies))
@@ -828,7 +832,7 @@ def main():
828832
if proxy == False:
829833
proxies = None
830834

831-
status, result = get_metric_logs_from_all_components(eshost, esport=9200, uuids=uuids, proxies=proxies, debugReadFromFile=debugReadFromFile)
835+
status, result = get_metric_logs_from_all_components(workflow_id, eshost, esport=9200, uuids=uuids, proxies=proxies, debugReadFromFile=debugReadFromFile)
832836
if status == True:
833837
printlog(result)
834838
else:

tests/mfn_test_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -498,7 +498,7 @@ def plot_latency_breakdown(self, num_last_executions=15):
498498
_, _ = run_command_return_output(cmd)
499499

500500
def parse_metrics(self, eid_filename, timestamps_filename):
501-
cmd = "python3 ../mfnmetrics.py -eidfile " + eid_filename
501+
cmd = "python3 ../mfnmetrics.py -eidfile " + eid_filename + " -wid " + self._workflow.id
502502
output, error = run_command_return_output(cmd)
503503
log_lines = combine_output(output, error)
504504
with open(timestamps_filename, "w") as f:

tests/performance_interaction_latency/test.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,18 +66,24 @@ def test_chain_response_latency(self):
6666
print("----------------")
6767
print("Checkpoints: False")
6868

69-
test.exec_tests(self._test_tuple_list, check_duration=True, print_report=False)
69+
test.exec_tests(self._test_tuple_list, check_duration=True, print_report=False, should_undeploy=False)
7070

7171
#test.plot_latency_breakdown(COUNT_EXECUTIONS)
7272

73+
test.undeploy_workflow()
74+
test.cleanup()
75+
7376
test = MFNTest(test_name='chain_checkpoints', workflow_filename='wf_chain_checkpoints.json')
7477
print("----------------")
7578
print("Checkpoints: True")
7679

77-
test.exec_tests(self._test_tuple_list, check_duration=True, print_report=False)
80+
test.exec_tests(self._test_tuple_list, check_duration=True, print_report=False, should_undeploy=False)
7881

7982
#test.plot_latency_breakdown(COUNT_EXECUTIONS)
8083

84+
test.undeploy_workflow()
85+
test.cleanup()
86+
8187
def _get_and_print_statistics(self, test, logs, checkpoints_on):
8288
log = logs["log"]
8389
log_lines = log.split("\n")

0 commit comments

Comments
 (0)