1- From 8ad3983c91711e04629e608337b2d7e87fcfc243 Mon Sep 17 00:00:00 2001
1+ From 71e28fc959dbdfc1c314630ace3fc74425a95bd6 Mon Sep 17 00:00:00 2001
22From: Andrew Kenworthy <andrew.kenworthy@stackable.tech>
33Date: Tue, 9 Dec 2025 18:02:34 +0100
44Subject: refactor-topology-provider
55
66---
7- .../hadoop/StackableTopologyProvider.java | 834 ++++++++----------
7+ .../hadoop/StackableTopologyProvider.java | 841 ++++++++----------
88 .../tech/stackable/hadoop/TopologyCache.java | 88 ++
99 .../tech/stackable/hadoop/TopologyLabel.java | 116 +++
1010 .../tech/stackable/hadoop/TopologyUtils.java | 21 +
11- 4 files changed, 584 insertions(+), 475 deletions(-)
11+ 4 files changed, 590 insertions(+), 476 deletions(-)
1212 create mode 100644 src/main/java/tech/stackable/hadoop/TopologyCache.java
1313 create mode 100644 src/main/java/tech/stackable/hadoop/TopologyLabel.java
1414
1515diff --git a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java
16- index 4d1e917..b6f7877 100644
16+ index 4d1e917..0885796 100644
1717--- a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java
1818+++ b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java
1919@@ -1,7 +1,5 @@
@@ -33,7 +33,7 @@ index 4d1e917..b6f7877 100644
3333 import java.util.stream.Collectors;
3434 import org.apache.hadoop.net.DNSToSwitchMapping;
3535 import org.slf4j.Logger;
36- @@ -18,330 +14,257 @@ import org.slf4j.LoggerFactory;
36+ @@ -18,330 +14,250 @@ import org.slf4j.LoggerFactory;
3737
3838 /**
3939 * An implementation of the org.apache.hadoop.net.DNSToSwitchMapping that is used to create a
@@ -510,20 +510,14 @@ index 4d1e917..b6f7877 100644
510510- for (String ingressAddress : TopologyUtils.getIngressAddresses(listener)) {
511511- this.listenerKeyCache.put(ingressAddress, listener);
512512- }
513+ - }
514+ - } else {
515+ - LOG.debug("Listener cache contains [{}]", names);
513516+ private String resolveListenerToDatanode(String name) {
514517+ GenericKubernetesResource listener = cache.getListener(name);
515518+ if (listener == null) {
516519+ LOG.debug("Not a listener: {}", name);
517520+ return name;
518- + }
519- + List<String> ingressAddresses = TopologyUtils.getIngressAddresses(listener);
520- + for (String ingressAddress : ingressAddresses) {
521- + LOG.debug("Address [{}]", ingressAddress);
522- + if (name.equalsIgnoreCase(ingressAddress)) {
523- + return resolveListenerEndpoint(listener);
524- }
525- - } else {
526- - LOG.debug("Listener cache contains [{}]", names);
527521 }
528522- ConcurrentMap<String, GenericKubernetesResource> listeners = this.listenerKeyCache.asMap();
529523-
@@ -534,8 +528,8 @@ index 4d1e917..b6f7877 100644
534528- listenerToDataNodeNames.add(resolvedName);
535529- }
536530- return listenerToDataNodeNames;
537- + LOG.info("Not a listener, returning [{}]", name);
538- + return name ;
531+ + // We found a listener, so we can resolve it directly
532+ + return resolveListenerEndpoint(listener) ;
539533 }
540534
541535- private GenericKubernetesResourceList getListeners() {
@@ -563,7 +557,7 @@ index 4d1e917..b6f7877 100644
563557 ResourceDefinitionContext listenerCrd =
564558 new ResourceDefinitionContext.Builder()
565559 .withGroup("listeners.stackable.tech")
566- @@ -353,168 +276,174 @@ public class StackableTopologyProvider implements DNSToSwitchMapping {
560+ @@ -353,168 +269,186 @@ public class StackableTopologyProvider implements DNSToSwitchMapping {
567561 return client.genericKubernetesResources(listenerCrd).list();
568562 }
569563
@@ -673,7 +667,19 @@ index 4d1e917..b6f7877 100644
673667+ }
674668+
675669+ private String findColocatedDatanode(Pod clientPod, Map<String, String> nodeToDatanodeIp) {
676- + return nodeToDatanodeIp.get(clientPod.getSpec().getNodeName());
670+ + String clientNodeName = clientPod.getSpec().getNodeName();
671+ +
672+ + if (clientNodeName == null) {
673+ + LOG.warn("Client pod {} not yet assigned to node", clientPod.getMetadata().getName());
674+ + return null;
675+ + }
676+ +
677+ + String datanodeIp = nodeToDatanodeIp.get(clientNodeName);
678+ + if (datanodeIp == null) {
679+ + LOG.debug("No datanode found on node {}", clientNodeName);
680+ + }
681+ +
682+ + return datanodeIp;
677683+ }
678684+
679685+ private String resolveToIpAddress(String hostname) {
@@ -870,7 +876,7 @@ index 4d1e917..b6f7877 100644
870876 Map<String, String> nodeLabels = node.getMetadata().getLabels();
871877 LOG.debug("Labels for node [{}]:[{}]....", nodeName, nodeLabels);
872878
873- @@ -526,78 +455 ,33 @@ public class StackableTopologyProvider implements DNSToSwitchMapping {
879+ @@ -526,78 +460 ,33 @@ public class StackableTopologyProvider implements DNSToSwitchMapping {
874880 return result;
875881 }
876882
0 commit comments