@@ -28,6 +28,7 @@ public class StackableTopologyProvider implements DNSToSwitchMapping {
2828 // Default values
2929 public static final String DEFAULT_RACK = "/defaultRack" ;
3030 private static final int CACHE_EXPIRY_DEFAULT_SECONDS = 5 * 60 ;
31+ private final String LISTENER_VERSION ;
3132
3233 private final KubernetesClient client ;
3334 private final List <TopologyLabel > labels ;
@@ -40,6 +41,7 @@ public StackableTopologyProvider() {
4041 this .client = new KubernetesClientBuilder ().build ();
4142 this .cache = new TopologyCache (getCacheExpiration (), CACHE_EXPIRY_DEFAULT_SECONDS );
4243 this .labels = TopologyLabel .initializeTopologyLabels ();
44+ LISTENER_VERSION = getListenerVersion ();
4345
4446 logInitializationStatus ();
4547 }
@@ -184,6 +186,28 @@ private List<Pod> fetchDataNodes() {
184186 // LISTENER RESOLUTION
185187 // ============================================================================
186188
189+ private String getListenerVersion () {
190+ var crd = client .apiextensions ().v1 ().customResourceDefinitions ().withName ("listeners" ).get ();
191+
192+ if (crd != null && !crd .getSpec ().getVersions ().isEmpty ()) {
193+ // Select the version that is served and used for storage (the "stable" version)
194+ for (var version : crd .getSpec ().getVersions ()) {
195+ if (version .getServed () && version .getStorage ()) {
196+ return version .getName (); // Prefer the stable version
197+ }
198+ }
199+
200+ // If no stable version found, return the first served version as a fallback
201+ for (var version : crd .getSpec ().getVersions ()) {
202+ if (version .getServed ()) {
203+ return version .getName (); // Just pick the first served version if no stable one
204+ }
205+ }
206+ }
207+ LOG .error ("Unable to fetch CRD version for listeners" );
208+ throw new RuntimeException ("Unable to fetch CRD version for listeners" );
209+ }
210+
187211 private List <String > resolveListeners (List <String > names ) {
188212 refreshListenerCacheIfNeeded (names );
189213
@@ -258,10 +282,10 @@ private String resolveListenerEndpoint(GenericKubernetesResource listener) {
258282 }
259283
260284 private GenericKubernetesResourceList fetchListeners () {
261- // no version is specified here as we are not always going to be on v1alpha1
262285 ResourceDefinitionContext listenerCrd =
263286 new ResourceDefinitionContext .Builder ()
264287 .withGroup ("listeners.stackable.tech" )
288+ .withVersion (LISTENER_VERSION )
265289 .withPlural ("listeners" )
266290 .withNamespaced (true )
267291 .build ();
0 commit comments