 import redis
 import os
 from pathlib import Path
+import redistimeseries
+from docker.models.containers import Container
 
 from redisbench_admin.run.common import (
     get_start_time_vars,
@@ -79,6 +81,13 @@ def main():
     topologies_map = get_topologies(topologies_files[0])
     testsuites_folder = os.path.abspath(args.test_suites_folder)
     logging.info("Using test-suites folder dir {}".format(testsuites_folder))
+    testsuite_spec_files = get_benchmark_specs(testsuites_folder)
+    logging.info(
+        "There are a total of {} test-suites in folder {}".format(
+            len(testsuite_spec_files), testsuites_folder
+        )
+    )
+
     logging.info(
         "Using redis available at: {}:{} to read the event streams".format(
             GH_REDIS_SERVER_HOST, GH_REDIS_SERVER_PORT
@@ -108,7 +117,7 @@ def main():
         )
     )
     try:
-        rts = redis.StrictRedis(
+        rts = redistimeseries.client.Client(
             host=args.datasink_redistimeseries_host,
             port=args.datasink_redistimeseries_port,
             decode_responses=True,
@@ -127,11 +136,36 @@ def main():
         exit(1)
 
     logging.info("checking build spec requirements")
+    build_runners_consumer_group_create(conn)
+    stream_id = None
+    docker_client = docker.from_env()
+    home = str(Path.home())
+    # TODO: confirm we do have enough cores to run the spec
+    # availabe_cpus = args.cpu_count
+    datasink_push_results_redistimeseries = args.datasink_push_results_redistimeseries
+    logging.info("Entering blocking read waiting for work.")
+    if stream_id is None:
+        stream_id = args.consumer_start_id
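+    # loop forever, blocking until new work arrives and handling one stream entry per read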
+    while True:
+        _, stream_id, _ = self_contained_coordinator_blocking_read(
+            conn,
+            datasink_push_results_redistimeseries,
+            docker_client,
+            home,
+            stream_id,
+            rts,
+            testsuite_spec_files,
+            topologies_map,
+        )
+
+
+def build_runners_consumer_group_create(conn, id="$"):
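+    # id="$" starts the consumer group at the stream's current last entry, so only build events added afterwards are delivered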
     try:
         conn.xgroup_create(
             STREAM_KEYNAME_NEW_BUILD_EVENTS,
             STREAM_GH_NEW_BUILD_RUNNERS_CG,
             mkstream=True,
+            id=id,
         )
         logging.info(
             "Created consumer group named {} to distribute work.".format(
@@ -144,36 +178,44 @@ def main():
                 STREAM_GH_NEW_BUILD_RUNNERS_CG
             )
         )
-    previous_id = None
-    docker_client = docker.from_env()
-    home = str(Path.home())
-    # TODO: confirm we do have enough cores to run the spec
-    # availabe_cpus = args.cpu_count
-    datasink_push_results_redistimeseries = args.datasink_push_results_redistimeseries
 
-    while True:
-        logging.info("Entering blocking read waiting for work.")
-        if previous_id is None:
-            previous_id = args.consumer_start_id
-        newTestInfo = conn.xreadgroup(
-            STREAM_GH_NEW_BUILD_RUNNERS_CG,
-            "{}-self-contained-proc#{}".format(STREAM_GH_NEW_BUILD_RUNNERS_CG, "1"),
-            {STREAM_KEYNAME_NEW_BUILD_EVENTS: previous_id},
-            count=1,
-            block=0,
-        )
-        if len(newTestInfo[0]) < 2 or len(newTestInfo[0][1]) < 1:
-            previous_id = ">"
-            continue
-        previous_id = process_self_contained_coordinator_stream(
+
+def self_contained_coordinator_blocking_read(
+    conn,
+    datasink_push_results_redistimeseries,
+    docker_client,
+    home,
+    stream_id,
+    rts,
+    testsuite_spec_files,
+    topologies_map,
+):
+    num_process_streams = 0
+    overall_result = False
+    consumer_name = "{}-self-contained-proc#{}".format(
+        STREAM_GH_NEW_BUILD_RUNNERS_CG, "1"
+    )
+    newTestInfo = conn.xreadgroup(
+        STREAM_GH_NEW_BUILD_RUNNERS_CG,
+        consumer_name,
+        {STREAM_KEYNAME_NEW_BUILD_EVENTS: stream_id},
+        count=1,
+        block=0,
+    )
+    if len(newTestInfo[0]) < 2 or len(newTestInfo[0][1]) < 1:
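+        # nothing new was read for the given id; switch to ">" so the next read only returns entries never delivered to this consumer group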
+        stream_id = ">"
+    else:
+        stream_id, overall_result = process_self_contained_coordinator_stream(
             datasink_push_results_redistimeseries,
             docker_client,
             home,
             newTestInfo,
             rts,
-            testsuites_folder,
+            testsuite_spec_files,
             topologies_map,
         )
+        num_process_streams = num_process_streams + 1
+    return overall_result, stream_id, num_process_streams
 
 
 def process_self_contained_coordinator_stream(
@@ -182,12 +224,14 @@ def process_self_contained_coordinator_stream(
     home,
     newTestInfo,
     rts,
-    testsuites_folder,
+    testsuite_spec_files,
     topologies_map,
 ):
     stream_id, testDetails = newTestInfo[0][1][0]
     stream_id = stream_id.decode()
     logging.info("Received work. Stream id {}.".format(stream_id))
+    overall_result = False
+
     if b"git_hash" in testDetails:
         (
             build_artifacts,
@@ -197,9 +241,8 @@ def process_self_contained_coordinator_stream(
             run_image,
         ) = extract_build_info_from_streamdata(testDetails)
 
-        files = get_benchmark_specs(testsuites_folder)
-
-        for test_file in files:
+        overall_result = True
+        for test_file in testsuite_spec_files:
             redis_containers = []
             client_containers = []
 
@@ -213,6 +256,7 @@ def process_self_contained_coordinator_stream(
                     _,
                 ) = extract_redis_dbconfig_parameters(benchmark_config, "dbconfig")
                 for topology_spec_name in benchmark_config["redis-topologies"]:
+                    test_result = False
                     try:
                         current_cpu_pos = 0
                         ceil_db_cpu_limit = extract_db_cpu_limit(
@@ -383,13 +427,13 @@ def process_self_contained_coordinator_stream(
                         with open(local_benchmark_output_filename, "r") as json_file:
                             results_dict = json.load(json_file)
                         logging.info("Final JSON result {}".format(results_dict))
-
+                        dataset_load_duration_seconds = 0
                         timeseries_test_sucess_flow(
                             datasink_push_results_redistimeseries,
                             git_version,
                             benchmark_config,
                             benchmark_duration_seconds,
-                            None,
+                            dataset_load_duration_seconds,
                             None,
                             topology_spec_name,
                             None,
@@ -404,6 +448,7 @@ def process_self_contained_coordinator_stream(
                             tf_triggering_env,
                             tsname_project_total_success,
                         )
+                        test_result = True
 
                     except:
                         logging.critical(
@@ -414,18 +459,38 @@ def process_self_contained_coordinator_stream(
                         print("-" * 60)
                         traceback.print_exc(file=sys.stdout)
                         print("-" * 60)
+                        test_result = False
                     # tear-down
                     logging.info("Tearing down setup")
                     for container in redis_containers:
-                        container.stop()
-                    for container in client_containers:
-                        if type(container) != bytes:
+                        try:
                             container.stop()
+                        except docker.errors.NotFound:
+                            logging.info(
+                                "When trying to stop DB container with id {} and image {} it was already stopped".format(
+                                    container.id, container.image
+                                )
+                            )
+                            pass
+
+                    for container in client_containers:
+                        if type(container) == Container:
+                            try:
+                                container.stop()
+                            except docker.errors.NotFound:
+                                logging.info(
+                                    "When trying to stop Client container with id {} and image {} it was already stopped".format(
+                                        container.id, container.image
+                                    )
+                                )
+                                pass
                     shutil.rmtree(temporary_dir, ignore_errors=True)
 
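+                    # overall result stays True only if every topology of every test suite succeeded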
+                    overall_result &= test_result
+
     else:
         logging.error("Missing commit information within received message.")
-    return stream_id
+    return stream_id, overall_result
 
 
 def get_benchmark_specs(testsuites_folder):