@@ -45,23 +45,22 @@ func TestClusterCatalogUnpacking(t *testing.T) {
4545 require .Equal (ct , * managerDeployment .Spec .Replicas , managerDeployment .Status .ReadyReplicas )
4646 }, time .Minute , time .Second )
4747
48- var managerPod corev1.Pod
49- t .Log ("Waiting for only one controller-manager pod to remain" )
48+ t .Log ("Waiting for controller-manager pods to match the desired replica count" )
5049 require .EventuallyWithT (t , func (ct * assert.CollectT ) {
5150 var managerPods corev1.PodList
5251 err := c .List (ctx , & managerPods , client .MatchingLabels (managerLabelSelector ))
5352 require .NoError (ct , err )
54- require .Len (ct , managerPods .Items , 1 )
55- managerPod = managerPods .Items [0 ]
53+ require .Len (ct , managerPods .Items , int (* managerDeployment .Spec .Replicas ))
5654 }, time .Minute , time .Second )
5755
5856 t .Log ("Waiting for acquired leader election" )
5957 leaderCtx , leaderCancel := context .WithTimeout (ctx , 3 * time .Minute )
6058 defer leaderCancel ()
61- leaderSubstrings := []string {"successfully acquired lease" }
62- leaderElected , err := watchPodLogsForSubstring (leaderCtx , & managerPod , leaderSubstrings ... )
59+
60+ // When there are multiple replicas, find the leader pod
61+ managerPod , err := findLeaderPod (leaderCtx , "catalogd" )
6362 require .NoError (t , err )
64- require .True (t , leaderElected )
63+ require .NotNil (t , managerPod )
6564
6665 t .Log ("Reading logs to make sure that ClusterCatalog was reconciled by catalogdv1" )
6766 logCtx , cancel := context .WithTimeout (ctx , time .Minute )
@@ -70,7 +69,7 @@ func TestClusterCatalogUnpacking(t *testing.T) {
7069 "reconcile ending" ,
7170 fmt .Sprintf (`ClusterCatalog=%q` , testClusterCatalogName ),
7271 }
73- found , err := watchPodLogsForSubstring (logCtx , & managerPod , substrings ... )
72+ found , err := watchPodLogsForSubstring (logCtx , managerPod , substrings ... )
7473 require .NoError (t , err )
7574 require .True (t , found )
7675
@@ -103,22 +102,30 @@ func TestClusterExtensionAfterOLMUpgrade(t *testing.T) {
103102
104103 // wait for catalogd deployment to finish
105104 t .Log ("Wait for catalogd deployment to be ready" )
106- catalogdManagerPod := waitForDeployment (t , ctx , "catalogd" )
105+ waitForDeployment (t , ctx , "catalogd" )
106+
107+ // Find the catalogd leader pod
108+ catalogdLeaderCtx , catalogdLeaderCancel := context .WithTimeout (ctx , 3 * time .Minute )
109+ defer catalogdLeaderCancel ()
110+ catalogdManagerPod , err := findLeaderPod (catalogdLeaderCtx , "catalogd" )
111+ require .NoError (t , err )
112+ require .NotNil (t , catalogdManagerPod )
107113
108114 // wait for operator-controller deployment to finish
109115 t .Log ("Wait for operator-controller deployment to be ready" )
110- managerPod := waitForDeployment (t , ctx , "operator-controller" )
116+ waitForDeployment (t , ctx , "operator-controller" )
111117
112118 t .Log ("Wait for acquired leader election" )
113119 // Average case is under 1 minute but in the worst case: (previous leader crashed)
114120 // we could have LeaseDuration (137s) + RetryPeriod (26s) +/- 163s
115121 leaderCtx , leaderCancel := context .WithTimeout (ctx , 3 * time .Minute )
116122 defer leaderCancel ()
117123
118- leaderSubstrings := []string {"successfully acquired lease" }
119- leaderElected , err := watchPodLogsForSubstring (leaderCtx , managerPod , leaderSubstrings ... )
124+ // When there are multiple replicas, find the leader pod
125+ var managerPod * corev1.Pod
126+ managerPod , err = findLeaderPod (leaderCtx , "operator-controller" )
120127 require .NoError (t , err )
121- require .True (t , leaderElected )
128+ require .NotNil (t , managerPod )
122129
123130 t .Log ("Reading logs to make sure that ClusterExtension was reconciled by operator-controller before we update it" )
124131 // Make sure that after we upgrade OLM itself we can still reconcile old objects without any changes
@@ -195,9 +202,8 @@ func TestClusterExtensionAfterOLMUpgrade(t *testing.T) {
195202
196203// waitForDeployment checks that the updated deployment with the given app.kubernetes.io/name label
197204// has reached the desired number of replicas and that the number pods matches that number
198- // i.e. no old pods remain. It will return a pointer to the first pod. This is only necessary
199- // to facilitate the mitigation put in place for https://github.com/operator-framework/operator-controller/issues/1626
200- func waitForDeployment (t * testing.T , ctx context.Context , controlPlaneLabel string ) * corev1.Pod {
205+ // i.e. no old pods remain.
206+ func waitForDeployment (t * testing.T , ctx context.Context , controlPlaneLabel string ) {
201207 deploymentLabelSelector := labels.Set {"app.kubernetes.io/name" : controlPlaneLabel }.AsSelector ()
202208
203209 t .Log ("Checking that the deployment is updated" )
@@ -217,13 +223,101 @@ func waitForDeployment(t *testing.T, ctx context.Context, controlPlaneLabel stri
217223 desiredNumReplicas = * managerDeployment .Spec .Replicas
218224 }, time .Minute , time .Second )
219225
220- var managerPods corev1.PodList
221226 t .Logf ("Ensure the number of remaining pods equal the desired number of replicas (%d)" , desiredNumReplicas )
222227 require .EventuallyWithT (t , func (ct * assert.CollectT ) {
228+ var managerPods corev1.PodList
223229 require .NoError (ct , c .List (ctx , & managerPods , client.MatchingLabelsSelector {Selector : deploymentLabelSelector }))
224- require .Len (ct , managerPods .Items , 1 )
230+ require .Len (ct , managerPods .Items , int ( desiredNumReplicas ) )
225231 }, time .Minute , time .Second )
226- return & managerPods .Items [0 ]
232+ }
233+
234+ // findLeaderPod finds the pod that has acquired the leader lease by checking logs of all pods
235+ func findLeaderPod (ctx context.Context , controlPlaneLabel string ) (* corev1.Pod , error ) {
236+ deploymentLabelSelector := labels.Set {"app.kubernetes.io/name" : controlPlaneLabel }.AsSelector ()
237+
238+ // If there's only one pod, it must be the leader
239+ // First, do a quick check to see if we can find the leader in existing logs
240+ var leaderPod * corev1.Pod
241+ leaderSubstrings := []string {"successfully acquired lease" }
242+
243+ // Retry logic: keep checking until we find a leader or context times out
244+ ticker := time .NewTicker (2 * time .Second )
245+ defer ticker .Stop ()
246+
247+ for {
248+ select {
249+ case <- ctx .Done ():
250+ return nil , fmt .Errorf ("timeout waiting for leader election: %w" , ctx .Err ())
251+ case <- ticker .C :
252+ var managerPods corev1.PodList
253+ if err := c .List (ctx , & managerPods , client.MatchingLabelsSelector {Selector : deploymentLabelSelector }); err != nil {
254+ continue
255+ }
256+
257+ if len (managerPods .Items ) == 0 {
258+ continue
259+ }
260+
261+ // If there's only one pod, it must be the leader
262+ if len (managerPods .Items ) == 1 {
263+ return & managerPods .Items [0 ], nil
264+ }
265+
266+ // Check each pod's existing logs (without following) for leader election message
267+ for i := range managerPods .Items {
268+ pod := & managerPods .Items [i ]
269+
270+ // Check existing logs only (don't follow)
271+ isLeader , err := checkPodLogsForSubstring (ctx , pod , leaderSubstrings ... )
272+ if err != nil {
273+ // If we can't read logs from this pod, try the next one
274+ continue
275+ }
276+ if isLeader {
277+ leaderPod = pod
278+ break
279+ }
280+ }
281+
282+ if leaderPod != nil {
283+ return leaderPod , nil
284+ }
285+ // No leader found yet, retry after ticker interval
286+ }
287+ }
288+ }
289+
290+ // checkPodLogsForSubstring checks existing pod logs (without following) for the given substrings
291+ func checkPodLogsForSubstring (ctx context.Context , pod * corev1.Pod , substrings ... string ) (bool , error ) {
292+ podLogOpts := corev1.PodLogOptions {
293+ Follow : false , // Don't follow, just read existing logs
294+ Container : container ,
295+ }
296+
297+ req := kclientset .CoreV1 ().Pods (pod .Namespace ).GetLogs (pod .Name , & podLogOpts )
298+ podLogs , err := req .Stream (ctx )
299+ if err != nil {
300+ return false , err
301+ }
302+ defer podLogs .Close ()
303+
304+ scanner := bufio .NewScanner (podLogs )
305+ for scanner .Scan () {
306+ line := scanner .Text ()
307+
308+ foundCount := 0
309+ for _ , substring := range substrings {
310+ if strings .Contains (line , substring ) {
311+ foundCount ++
312+ }
313+ }
314+ if foundCount == len (substrings ) {
315+ return true , nil
316+ }
317+ }
318+
319+ // If we didn't find the substrings, return false (not an error)
320+ return false , nil
227321}
228322
229323func watchPodLogsForSubstring (ctx context.Context , pod * corev1.Pod , substrings ... string ) (bool , error ) {
0 commit comments