@@ -233,15 +233,8 @@ private String resolveListenerToDatanode(String name) {
233233 LOG .debug ("Not a listener: {}" , name );
234234 return name ;
235235 }
236- List <String > ingressAddresses = TopologyUtils .getIngressAddresses (listener );
237- for (String ingressAddress : ingressAddresses ) {
238- LOG .debug ("Address [{}]" , ingressAddress );
239- if (name .equalsIgnoreCase (ingressAddress )) {
240- return resolveListenerEndpoint (listener );
241- }
242- }
243- LOG .info ("Not a listener, returning [{}]" , name );
244- return name ;
236+ // We found a listener, so we can resolve it directly
237+ return resolveListenerEndpoint (listener );
245238 }
246239
247240 private String resolveListenerEndpoint (GenericKubernetesResource listener ) {
@@ -344,7 +337,19 @@ private String resolveToDatanodeOrKeep(
344337 }
345338
346339 private String findColocatedDatanode (Pod clientPod , Map <String , String > nodeToDatanodeIp ) {
347- return nodeToDatanodeIp .get (clientPod .getSpec ().getNodeName ());
340+ String clientNodeName = clientPod .getSpec ().getNodeName ();
341+
342+ if (clientNodeName == null ) {
343+ LOG .warn ("Client pod {} not yet assigned to node" , clientPod .getMetadata ().getName ());
344+ return null ;
345+ }
346+
347+ String datanodeIp = nodeToDatanodeIp .get (clientNodeName );
348+ if (datanodeIp == null ) {
349+ LOG .debug ("No datanode found on node {}" , clientNodeName );
350+ }
351+
352+ return datanodeIp ;
348353 }
349354
350355 private String resolveToIpAddress (String hostname ) {
0 commit comments