@@ -1920,6 +1920,29 @@ def measure_downtime() -> None:
19201920 assert not test_failed
19211921
19221922
1923+ def workflow_balancer (c : Composition , parser : WorkflowArgumentParser ) -> None :
1924+ parser .add_argument (
1925+ "--recreate-cluster" ,
1926+ action = argparse .BooleanOptionalAction ,
1927+ help = "Recreate cluster if it exists already" ,
1928+ )
1929+ parser .add_argument (
1930+ "--tag" ,
1931+ type = str ,
1932+ help = "Custom version tag to use" ,
1933+ )
1934+ parser .add_argument (
1935+ "--orchestratord-override" ,
1936+ default = True ,
1937+ action = argparse .BooleanOptionalAction ,
1938+ help = "Override orchestratord tag" ,
1939+ )
1940+ args = parser .parse_args ()
1941+ definition = setup (c , args )
1942+ init (definition )
1943+ run_balancer (definition , False )
1944+
1945+
19231946def workflow_default (c : Composition , parser : WorkflowArgumentParser ) -> None :
19241947 parser .add_argument (
19251948 "--recreate-cluster" ,
@@ -2238,6 +2261,8 @@ def setup(c: Composition, args) -> dict[str, Any]:
22382261 definition ["namespace" ] = materialize_setup [0 ]
22392262 definition ["secret" ] = materialize_setup [1 ]
22402263 definition ["materialize" ] = materialize_setup [2 ]
2264+ with open (MZ_ROOT / "misc" / "helm-charts" / "testing" / "balancer.yaml" ) as f :
2265+ definition ["balancer" ] = yaml .load (f , Loader = yaml .Loader )
22412266
22422267 get_version (args .tag )
22432268 if args .orchestratord_override :
@@ -2259,6 +2284,17 @@ def setup(c: Composition, args) -> dict[str, Any]:
22592284 definition ["materialize" ]["spec" ]["environmentdImageRef" ] = get_image (
22602285 c .compose ["services" ]["environmentd" ]["image" ], args .tag
22612286 )
2287+ definition ["balancer" ]["spec" ]["balancerdImageRef" ] = get_image (
2288+ c .compose ["services" ]["balancerd" ]["image" ], args .tag
2289+ )
2290+ # this is just a hack to get balancerd to start up, sending requests
2291+ # won't actually work
2292+ definition ["balancer" ]["spec" ]["staticRouting" ][
2293+ "environmentdNamespace"
2294+ ] = "materialize"
2295+ definition ["balancer" ]["spec" ]["staticRouting" ][
2296+ "environmentdServiceName"
2297+ ] = "postgres"
22622298 # kubectl get endpoints mzel5y3f42l6-cluster-u1-replica-u1-gen-1 -n materialize-environment -o json
22632299 # more than one address
22642300 return definition
@@ -2516,3 +2552,121 @@ def post_run_check(definition: dict[str, Any], expect_fail: bool) -> None:
25162552 ]
25172553 )
25182554 raise ValueError ("Never completed" )
2555+
2556+
2557+ def run_balancer (definition : dict [str , Any ], expect_fail : bool ) -> None :
2558+ defs = [
2559+ definition ["namespace" ],
2560+ definition ["balancer" ],
2561+ ]
2562+ try :
2563+ spawn .runv (
2564+ ["kubectl" , "apply" , "-f" , "-" ],
2565+ stdin = yaml .dump_all (defs ).encode (),
2566+ )
2567+ except subprocess .CalledProcessError as e :
2568+ print (f"Failed to apply: { e .stdout } \n STDERR:{ e .stderr } " )
2569+ raise
2570+ post_run_check_balancer (definition , expect_fail )
2571+
2572+
2573+ def post_run_check_balancer (definition : dict [str , Any ], expect_fail : bool ) -> None :
2574+ for i in range (900 ):
2575+ time .sleep (1 )
2576+ try :
2577+ data = json .loads (
2578+ spawn .capture (
2579+ [
2580+ "kubectl" ,
2581+ "get" ,
2582+ "balancers" ,
2583+ "-n" ,
2584+ "materialize-environment" ,
2585+ "-o" ,
2586+ "json" ,
2587+ ],
2588+ stderr = subprocess .DEVNULL ,
2589+ )
2590+ )
2591+ status = data ["items" ][0 ].get ("status" )
2592+ if not status :
2593+ continue
2594+ if expect_fail :
2595+ break
2596+ if not status ["conditions" ] or status ["conditions" ][0 ]["type" ] != "Ready" :
2597+ continue
2598+ if status ["conditions" ][0 ]["status" ] == "True" :
2599+ break
2600+ except subprocess .CalledProcessError :
2601+ pass
2602+ else :
2603+ spawn .runv (
2604+ [
2605+ "kubectl" ,
2606+ "get" ,
2607+ "balancers" ,
2608+ "-n" ,
2609+ "materialize-environment" ,
2610+ "-o" ,
2611+ "yaml" ,
2612+ ],
2613+ )
2614+ raise ValueError ("Never completed" )
2615+
2616+ for i in range (480 ):
2617+ print ("kubectl get balancer pods" )
2618+ try :
2619+ status = spawn .capture (
2620+ [
2621+ "kubectl" ,
2622+ "get" ,
2623+ "pods" ,
2624+ "-l" ,
2625+ "app=balancerd" ,
2626+ "-n" ,
2627+ "materialize-environment" ,
2628+ "-o" ,
2629+ "jsonpath={.items[0].status.phase}" ,
2630+ ],
2631+ stderr = subprocess .DEVNULL ,
2632+ )
2633+ if status in ["Running" , "Error" , "CrashLoopBackOff" ]:
2634+ assert not expect_fail
2635+ break
2636+ except subprocess .CalledProcessError :
2637+ if expect_fail :
2638+ try :
2639+ logs = spawn .capture (
2640+ [
2641+ "kubectl" ,
2642+ "logs" ,
2643+ "-l" ,
2644+ "app.kubernetes.io/instance=operator" ,
2645+ "-n" ,
2646+ "materialize" ,
2647+ ],
2648+ stderr = subprocess .DEVNULL ,
2649+ )
2650+ if (
2651+ f"ERROR k8s_controller::controller: Balancer reconciliation error. err=reconciler for object Balancer.v1alpha1.materialize.cloud/{ definition ['balancer' ]['metadata' ]['name' ]} .materialize-environment failed"
2652+ in logs
2653+ ):
2654+ break
2655+ except subprocess .CalledProcessError :
2656+ pass
2657+
2658+ time .sleep (1 )
2659+ else :
2660+ # Helps to debug
2661+ spawn .runv (
2662+ [
2663+ "kubectl" ,
2664+ "describe" ,
2665+ "pod" ,
2666+ "-l" ,
2667+ "app=balancerd" ,
2668+ "-n" ,
2669+ "materialize-environment" ,
2670+ ]
2671+ )
2672+ raise ValueError ("Never completed" )
0 commit comments