@@ -1749,6 +1749,29 @@ def measure_downtime() -> None:
17491749 assert not test_failed
17501750
17511751
def workflow_balancer(c: Composition, parser: WorkflowArgumentParser) -> None:
    """Workflow that applies the Balancer definition and waits for it to come up.

    Registers the ``--recreate-cluster``, ``--tag`` and
    ``--orchestratord-override`` flags, builds the definition with ``setup``,
    passes it to ``init`` and finally hands it to ``run_balancer`` with
    ``expect_fail`` set to ``False``.
    """
    # Gives each boolean flag a matching ``--no-...`` form.
    toggle = argparse.BooleanOptionalAction

    parser.add_argument(
        "--recreate-cluster",
        action=toggle,
        help="Recreate cluster if it exists already",
    )
    parser.add_argument("--tag", type=str, help="Custom version tag to use")
    parser.add_argument(
        "--orchestratord-override",
        default=True,
        action=toggle,
        help="Override orchestratord tag",
    )

    definition = setup(c, parser.parse_args())
    init(definition)
    run_balancer(definition, expect_fail=False)
1773+
1774+
17521775def workflow_default (c : Composition , parser : WorkflowArgumentParser ) -> None :
17531776 parser .add_argument (
17541777 "--recreate-cluster" ,
@@ -2067,6 +2090,8 @@ def setup(c: Composition, args) -> dict[str, Any]:
20672090 definition ["namespace" ] = materialize_setup [0 ]
20682091 definition ["secret" ] = materialize_setup [1 ]
20692092 definition ["materialize" ] = materialize_setup [2 ]
2093+ with open (MZ_ROOT / "misc" / "helm-charts" / "testing" / "balancer.yaml" ) as f :
2094+ definition ["balancer" ] = yaml .load (f , Loader = yaml .Loader )
20702095
20712096 get_version (args .tag )
20722097 if args .orchestratord_override :
@@ -2088,6 +2113,17 @@ def setup(c: Composition, args) -> dict[str, Any]:
20882113 definition ["materialize" ]["spec" ]["environmentdImageRef" ] = get_image (
20892114 c .compose ["services" ]["environmentd" ]["image" ], args .tag
20902115 )
2116+ definition ["balancer" ]["spec" ]["balancerdImageRef" ] = get_image (
2117+ c .compose ["services" ]["balancerd" ]["image" ], args .tag
2118+ )
2119+ # this is just a hack to get balancerd to start up, sending requests
2120+ # won't actually work
2121+ definition ["balancer" ]["spec" ]["staticRouting" ][
2122+ "environmentdNamespace"
2123+ ] = "materialize"
2124+ definition ["balancer" ]["spec" ]["staticRouting" ][
2125+ "environmentdServiceName"
2126+ ] = "postgres"
20912127 # kubectl get endpoints mzel5y3f42l6-cluster-u1-replica-u1-gen-1 -n materialize-environment -o json
20922128 # more than one address
20932129 return definition
@@ -2343,3 +2379,121 @@ def post_run_check(definition: dict[str, Any], expect_fail: bool) -> None:
23432379 ]
23442380 )
23452381 raise ValueError ("Never completed" )
2382+
2383+
def run_balancer(definition: dict[str, Any], expect_fail: bool) -> None:
    """Apply the namespace and balancer manifests, then wait for the outcome.

    The two documents are serialised as one multi-document YAML stream and
    piped to ``kubectl apply -f -`` on stdin. If kubectl exits non-zero, the
    captured output fields of the exception are printed and the exception is
    re-raised. On success, ``post_run_check_balancer`` is called with the same
    ``definition`` and ``expect_fail``.
    """
    manifest = yaml.dump_all(
        [definition["namespace"], definition["balancer"]]
    ).encode()

    try:
        spawn.runv(["kubectl", "apply", "-f", "-"], stdin=manifest)
    except subprocess.CalledProcessError as err:
        print(f"Failed to apply: {err.stdout} \n STDERR:{err.stderr} ")
        raise

    post_run_check_balancer(definition, expect_fail)
2398+
2399+
def _operator_logged_balancer_failure(definition: dict[str, Any]) -> bool:
    """Return True if the operator logs contain this balancer's reconcile error.

    Reads the logs of pods labelled ``app.kubernetes.io/instance=operator`` in
    the ``materialize`` namespace. A failing ``kubectl logs`` call is treated
    as "not logged yet" so the caller can simply retry.
    """
    try:
        logs = spawn.capture(
            [
                "kubectl",
                "logs",
                "-l",
                "app.kubernetes.io/instance=operator",
                "-n",
                "materialize",
            ],
            stderr=subprocess.DEVNULL,
        )
    except subprocess.CalledProcessError:
        return False
    name = definition["balancer"]["metadata"]["name"]
    # The object reference is rendered as `<name>.<namespace>` with no space
    # in between, so the needle must not contain one either.
    return (
        "ERROR k8s_controller::controller: Balancer reconciliation error. "
        "err=reconciler for object "
        f"Balancer.v1alpha1.materialize.cloud/{name}.materialize-environment failed"
        in logs
    )


def post_run_check_balancer(definition: dict[str, Any], expect_fail: bool) -> None:
    """Wait until the applied Balancer and its balancerd pod have settled.

    Phase 1 polls ``kubectl get balancers`` once per second, up to 900 times,
    until the first listed balancer has a status and, unless ``expect_fail``
    is set, its first condition is ``Ready`` with status ``"True"``.

    Phase 2 polls the phase of the first ``app=balancerd`` pod, up to 480
    times. When the pod lookup fails and ``expect_fail`` is set, the operator
    logs are searched for this balancer's reconciliation error instead.

    Args:
        definition: Definition dict; ``definition["balancer"]["metadata"]["name"]``
            is read when checking the operator logs.
        expect_fail: Whether the balancer is expected to fail to reconcile.

    Raises:
        ValueError: If either phase exhausts its attempts. Diagnostic kubectl
            output is printed first.
        AssertionError: If a balancerd pod reaches one of the accepted phases
            although ``expect_fail`` is set.
    """
    namespace = "materialize-environment"

    for _ in range(900):
        time.sleep(1)
        try:
            data = json.loads(
                spawn.capture(
                    ["kubectl", "get", "balancers", "-n", namespace, "-o", "json"],
                    stderr=subprocess.DEVNULL,
                )
            )
        except subprocess.CalledProcessError:
            continue

        # An empty list means the resource is not visible yet; keep polling
        # instead of failing on items[0].
        items = data.get("items") or []
        if not items:
            continue
        status = items[0].get("status")
        if not status:
            continue
        if expect_fail:
            break
        # A status may be published before any condition is; treat a missing
        # "conditions" key like an empty list.
        conditions = status.get("conditions")
        if not conditions or conditions[0]["type"] != "Ready":
            continue
        if conditions[0]["status"] == "True":
            break
    else:
        spawn.runv(["kubectl", "get", "balancers", "-n", namespace, "-o", "yaml"])
        raise ValueError("Never completed")

    for _ in range(480):
        print("kubectl get balancer pods")
        try:
            phase = spawn.capture(
                [
                    "kubectl",
                    "get",
                    "pods",
                    "-l",
                    "app=balancerd",
                    "-n",
                    namespace,
                    "-o",
                    "jsonpath={.items[0].status.phase}",
                ],
                stderr=subprocess.DEVNULL,
            )
            # NOTE(review): "Error" and "CrashLoopBackOff" look like container
            # states rather than pod phases -- confirm they can ever match.
            if phase in ["Running", "Error", "CrashLoopBackOff"]:
                assert not expect_fail
                break
        except subprocess.CalledProcessError:
            # The lookup fails while no balancerd pod exists. When a failure
            # is expected, the operator log is the success signal.
            if expect_fail and _operator_logged_balancer_failure(definition):
                break

        time.sleep(1)
    else:
        # Helps to debug
        spawn.runv(
            ["kubectl", "describe", "pod", "-l", "app=balancerd", "-n", namespace]
        )
        raise ValueError("Never completed")
0 commit comments