From fe6d773821acca90e6446ca40df74c0239c055e7 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Wed, 10 Dec 2025 14:45:57 +0100 Subject: [PATCH 1/4] wip: patch 0.4.2 --- .../0001-refactor-topology-provider.patch | 1228 +++++++++++++++++ 1 file changed, 1228 insertions(+) create mode 100644 hadoop/hdfs-utils/stackable/patches/0.4.2/0001-refactor-topology-provider.patch diff --git a/hadoop/hdfs-utils/stackable/patches/0.4.2/0001-refactor-topology-provider.patch b/hadoop/hdfs-utils/stackable/patches/0.4.2/0001-refactor-topology-provider.patch new file mode 100644 index 000000000..185aec525 --- /dev/null +++ b/hadoop/hdfs-utils/stackable/patches/0.4.2/0001-refactor-topology-provider.patch @@ -0,0 +1,1228 @@ +From 8ad3983c91711e04629e608337b2d7e87fcfc243 Mon Sep 17 00:00:00 2001 +From: Andrew Kenworthy +Date: Tue, 9 Dec 2025 18:02:34 +0100 +Subject: refactor-topology-provider + +--- + .../hadoop/StackableTopologyProvider.java | 834 ++++++++---------- + .../tech/stackable/hadoop/TopologyCache.java | 88 ++ + .../tech/stackable/hadoop/TopologyLabel.java | 116 +++ + .../tech/stackable/hadoop/TopologyUtils.java | 21 + + 4 files changed, 584 insertions(+), 475 deletions(-) + create mode 100644 src/main/java/tech/stackable/hadoop/TopologyCache.java + create mode 100644 src/main/java/tech/stackable/hadoop/TopologyLabel.java + +diff --git a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java +index 4d1e917..b6f7877 100644 +--- a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java ++++ b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java +@@ -1,7 +1,5 @@ + package tech.stackable.hadoop; + +-import com.github.benmanes.caffeine.cache.Cache; +-import com.github.benmanes.caffeine.cache.Caffeine; + import io.fabric8.kubernetes.api.model.*; + import io.fabric8.kubernetes.client.KubernetesClient; + import io.fabric8.kubernetes.client.KubernetesClientBuilder; +@@ -9,8 +7,6 @@ import io.fabric8.kubernetes.client.dsl.base.ResourceDefinitionContext; + import java.net.InetAddress; + import java.net.UnknownHostException; + import java.util.*; +-import java.util.concurrent.ConcurrentMap; +-import java.util.concurrent.TimeUnit; + import java.util.stream.Collectors; + import org.apache.hadoop.net.DNSToSwitchMapping; + import org.slf4j.Logger; +@@ -18,330 +14,257 @@ import org.slf4j.LoggerFactory; + + /** + * An implementation of the org.apache.hadoop.net.DNSToSwitchMapping that is used to create a +- * topology out of datanodes. ++ * topology out of dataNodes. + * + *
<p>
This class is intended to be run as part of the NameNode process and will be used by the +- * namenode to retrieve topology strings for datanodes. ++ * nameNode to retrieve topology strings for dataNodes. + */ + public class StackableTopologyProvider implements DNSToSwitchMapping { +- +- public static final String VARNAME_LABELS = "TOPOLOGY_LABELS"; +- public static final String VARNAME_CACHE_EXPIRATION = "TOPOLOGY_CACHE_EXPIRATION_SECONDS"; +- public static final String VARNAME_MAXLEVELS = "TOPOLOGY_MAX_LEVELS"; +- public static final String DEFAULT_RACK = "/defaultRack"; +- private static final int MAX_LEVELS_DEFAULT = 2; +- private static final int CACHE_EXPIRY_DEFAULT_SECONDS = 5 * 60; + private final Logger LOG = LoggerFactory.getLogger(StackableTopologyProvider.class); +- private final KubernetesClient client; +- private final Cache topologyKeyCache = +- Caffeine.newBuilder().expireAfterWrite(getCacheExpiration(), TimeUnit.SECONDS).build(); +- private final Cache nodeKeyCache = +- Caffeine.newBuilder() +- .expireAfterWrite(CACHE_EXPIRY_DEFAULT_SECONDS, TimeUnit.SECONDS) +- .build(); + +- private final Cache listenerKeyCache = +- Caffeine.newBuilder() +- .expireAfterWrite(CACHE_EXPIRY_DEFAULT_SECONDS, TimeUnit.SECONDS) +- .build(); +- private final Cache podKeyCache = +- Caffeine.newBuilder() +- .expireAfterWrite(CACHE_EXPIRY_DEFAULT_SECONDS, TimeUnit.SECONDS) +- .build(); +- // The list of labels that this provider uses to generate the topology information for any given +- // datanode ++ // Environment variable names ++ public static final String VARNAME_CACHE_EXPIRATION = "TOPOLOGY_CACHE_EXPIRATION_SECONDS"; ++ ++ // Default values ++ public static final String DEFAULT_RACK = "/defaultRack"; ++ private static final int CACHE_EXPIRY_DEFAULT_SECONDS = 5 * 60; ++ ++ private final KubernetesClient client; + private final List labels; + ++ // Caching layers ++ private final TopologyCache cache; ++ + public StackableTopologyProvider() { + this.client = new KubernetesClientBuilder().build(); ++ this.cache = new TopologyCache(getCacheExpiration(), CACHE_EXPIRY_DEFAULT_SECONDS); ++ this.labels = TopologyLabel.initializeTopologyLabels(); + +- // Read the labels to be used to build a topology from environment variables. Labels are +- // configured in the EnvVar "TOPOLOGY_LABELS". They should be specified in the form +- // "[node|pod]:" and separated by ";". So a valid configuration that reads topology +- // information from the labels "kubernetes.io/zone" and "kubernetes.io/rack" on the k8s node +- // that is running a datanode pod would look like this: +- // "node:kubernetes.io/zone;node:kubernetes.io/rack" By default, there is an upper limit of 2 on +- // the number of labels that are processed, because this is what Hadoop traditionally allows - +- // this can be overridden via setting the EnvVar "MAX_TOPOLOGY_LEVELS". 
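[Illustrative sketch, not patch content: the comment above describes the "[node|pod]:<labelname>" format of TOPOLOGY_LABELS. Below is a minimal, self-contained Java example of how such a value splits into topology levels; the label names and values are hypothetical examples.]

  public class TopologyLabelsExample {
    public static void main(String[] args) {
      // Two levels: a node label and a pod label, separated by ";".
      String topologyConfig = "node:topology.kubernetes.io/zone;pod:app.kubernetes.io/role";
      for (String labelConfig : topologyConfig.split(";")) {
        // Split once into the label type ("node" or "pod") and the label name.
        String[] parts = labelConfig.split(":", 2);
        System.out.println("type=" + parts[0] + ", label=" + parts[1]);
      }
      // If the zone label resolves to "eu-west-1a" and the role label to "worker",
      // the datanode's topology string would be "/eu-west-1a/worker".
    }
  }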
+- String topologyConfig = System.getenv(VARNAME_LABELS); +- if (topologyConfig != null && !topologyConfig.isEmpty()) { +- String[] labelConfigs = topologyConfig.split(";"); +- if (labelConfigs.length > getMaxLabels()) { +- LOG.error( +- "Found [{}] topology labels configured, but maximum allowed number is [{}]: " +- + "please check your config or raise the number of allowed labels.", +- labelConfigs.length, +- getMaxLabels()); +- throw new RuntimeException(); +- } +- // Create TopologyLabels from config strings +- this.labels = +- Arrays.stream(labelConfigs).map(TopologyLabel::new).collect(Collectors.toList()); +- +- // Check if any labelConfigs were invalid +- if (this.labels.stream().anyMatch(label -> label.labelType == LabelType.Undefined)) { +- LOG.error( +- "Topologylabel contained invalid configuration for at least one label: " +- + "double check your config! Labels should be specified in the " +- + "format '[pod|node]:;...'"); +- throw new RuntimeException(); +- } +- +- } else { +- LOG.error( +- "Missing env var [{}] this is required for rack awareness to work.", VARNAME_LABELS); +- throw new RuntimeException(); +- } +- if (this.labels.isEmpty()) { +- LOG.info( +- "No topology config found, defaulting value for all datanodes to [{}]", DEFAULT_RACK); +- } else { +- LOG.info( +- "Topology config yields labels [{}]", +- this.labels.stream().map(label -> label.name).collect(Collectors.toList())); +- } ++ logInitializationStatus(); + } + +- /*** +- * Checks if a value for the maximum number of topology levels to allow has been configured in +- * the environment variable specified in VARNAME_MAXLEVELS, +- * returns the value of MAX_LEVELS_DEFAULT as default if nothing is set. +- * +- * @return The maximum number of topology levels to allow. +- */ +- private int getMaxLabels() { +- String maxLevelsConfig = System.getenv(VARNAME_MAXLEVELS); +- if (maxLevelsConfig != null && !maxLevelsConfig.isEmpty()) { +- try { +- int maxLevelsFromEnvVar = Integer.parseInt(maxLevelsConfig); +- LOG.info( +- "Found [{}] env var, changing allowed number of topology levels to [{}]", +- VARNAME_MAXLEVELS, +- maxLevelsFromEnvVar); +- return maxLevelsFromEnvVar; +- } catch (NumberFormatException e) { +- LOG.warn( +- "Unable to parse value of [{}]/[{}] as integer, using default value [{}]", +- VARNAME_MAXLEVELS, +- maxLevelsConfig, +- MAX_LEVELS_DEFAULT); +- } +- } +- return MAX_LEVELS_DEFAULT; ++ @Override ++ public void reloadCachedMappings() { ++ // TODO: According to the upstream comment we should rebuild all cache entries after ++ // invalidating them ++ // this may mean trying to resolve ip addresses that do not exist any more and things like that ++ // though and ++ // require some more thought, so we will for now just invalidate the cache. ++ this.cache.invalidateAll(); ++ } ++ ++ @Override ++ public void reloadCachedMappings(List names) { ++ // TODO: See comment above, the same applies here ++ cache.invalidateTopologyKeys(names); + } + +- /*** +- * Checks if a value for the cache expiration time has been configured in +- * the environment variable specified in VARNAME_CACHE_EXPIRATION, +- * returns the value of CACHE_EXPIRY_DEFAULT_SECONDS as default if nothing is set. +- * +- * @return The cache expiration time to use for the rack id cache. 
+- */ + private int getCacheExpiration() { +- String cacheExpirationConfigSeconds = System.getenv(VARNAME_CACHE_EXPIRATION); +- if (cacheExpirationConfigSeconds != null && !cacheExpirationConfigSeconds.isEmpty()) { +- try { +- int cacheExpirationFromEnvVar = Integer.parseInt(cacheExpirationConfigSeconds); +- LOG.info( +- "Found [{}] env var, changing cache time for topology entries to [{}]", +- VARNAME_CACHE_EXPIRATION, +- cacheExpirationFromEnvVar); +- return cacheExpirationFromEnvVar; +- } catch (NumberFormatException e) { +- LOG.warn( +- "Unable to parse value of [{}]/[{}] as integer, using default value [{}]", +- VARNAME_CACHE_EXPIRATION, +- cacheExpirationConfigSeconds, +- CACHE_EXPIRY_DEFAULT_SECONDS); +- } +- } +- return CACHE_EXPIRY_DEFAULT_SECONDS; ++ return TopologyUtils.parseIntFromEnv( ++ VARNAME_CACHE_EXPIRATION, CACHE_EXPIRY_DEFAULT_SECONDS, "cache expiration seconds"); + } + +- /*** +- * +- * @param datanode the datanode whose IP mis to be resolved +- * @param podLabels the pod labels used in the resolution +- * @param nodeLabels the node labels used in the resolution +- * +- * @return the label looked up from the IP address +- */ +- private String getLabel( +- String datanode, +- Map> podLabels, +- Map> nodeLabels) { +- // The internal structures used by this mapper are all based on IP addresses. Depending on +- // configuration and network setup it may (probably will) be possible that the namenode uses +- // hostnames to resolve a datanode to a topology zone. To allow this, we resolve every input to +- // an ip address below and use the ip to lookup labels. +- // TODO: this might break with the listener operator, as `pod.status.podips` won't contain +- // external addresses +- // tracking this in https://github.com/stackabletech/hdfs-topology-provider/issues/2 +- InetAddress address; +- try { +- address = InetAddress.getByName(datanode); +- LOG.debug("Resolved [{}] to [{}]", datanode, address.getHostAddress()); +- datanode = address.getHostAddress(); +- } catch (UnknownHostException e) { +- LOG.warn( +- "failed to resolve address for [{}] - this should not happen, " +- + "defaulting this node to [{}]", +- datanode, +- DEFAULT_RACK); +- return DEFAULT_RACK; ++ private void logInitializationStatus() { ++ if (labels.isEmpty()) { ++ LOG.info("No topology configuration - will use default rack: {}", DEFAULT_RACK); ++ } else { ++ List labelNames = ++ labels.stream().map(TopologyLabel::getName).collect(Collectors.toList()); ++ LOG.info("Initialized with topology labels: {}", labelNames); + } +- StringBuilder resultBuilder = new StringBuilder(); +- for (TopologyLabel label : this.labels) { +- if (label.labelType == LabelType.Node) { +- LOG.debug( +- "Looking for node label [{}] of type [{}] in [{}]/[{}]", +- label.name, +- label.labelType, +- nodeLabels.keySet(), +- nodeLabels.values()); +- resultBuilder +- .append("/") +- .append( +- nodeLabels +- .getOrDefault(datanode, new HashMap<>()) +- .getOrDefault(label.name, "NotFound")); +- } else if (label.labelType == LabelType.Pod) { +- LOG.debug( +- "Looking for pod label [{}] of type [{}] in [{}]/[{}]", +- label.name, +- label.labelType, +- podLabels.keySet(), +- podLabels.values()); +- resultBuilder +- .append("/") +- .append( +- podLabels +- .getOrDefault(datanode, new HashMap<>()) +- .getOrDefault(label.name, "NotFound")); +- } +- } +- String result = resultBuilder.toString(); +- LOG.debug("Returning label [{}]", result); +- return result; + } + + @Override + public List resolve(List names) { +- LOG.info("Resolving for 
listeners/client-pods [{}]", names.toString()); ++ LOG.info("Resolving topology for: {}", names); + +- if (this.labels.isEmpty()) { +- LOG.info( +- "No topology labels defined, returning [{}] for hdfs nodes: [{}]", DEFAULT_RACK, names); +- return names.stream().map(name -> DEFAULT_RACK).collect(Collectors.toList()); ++ if (labels.isEmpty()) { ++ return createDefaultRackList(names); + } + +- // We need to check if we have cached values for all datanodes contained in this request. +- // Unless we can answer everything from the cache we have to talk to k8s anyway and can just +- // recalculate everything +- List cachedValues = +- names.stream().map(this.topologyKeyCache::getIfPresent).collect(Collectors.toList()); +- LOG.debug("Cached topologyKeyCache values [{}]", cachedValues); +- +- if (cachedValues.contains(null)) { +- // We cannot completely serve this request from the cache, since we need to talk to k8s anyway +- // we'll simply refresh everything. +- LOG.debug( +- "Cache doesn't contain values for all requested pods: new values will be built for all nodes."); +- } else { +- // use same log level as the non-cached return statement +- LOG.info("Answering from cached topology keys: [{}]", cachedValues); ++ // Try to serve from cache first ++ List cachedValues = tryResolveFromCache(names); ++ if (cachedValues != null) { ++ LOG.info("Returning cached topology: {}", cachedValues); + return cachedValues; + } + +- // The datanodes will be the cache keys. +- List datanodes = ++ // Cache miss - perform full resolution ++ return performFullResolution(names); ++ } ++ ++ private List createDefaultRackList(List names) { ++ LOG.info( ++ "No topology labels defined, returning [{}] for hdfs nodes: [{}]", DEFAULT_RACK, names); ++ return names.stream().map(name -> DEFAULT_RACK).collect(Collectors.toList()); ++ } ++ ++ private List tryResolveFromCache(List names) { ++ // We need to check if we have cached values for all dataNodes contained in this request. ++ // Unless we can answer everything from the cache we have to talk to k8s anyway and can just ++ // recalculate everything ++ List cached = names.stream().map(cache::getTopology).collect(Collectors.toList()); ++ LOG.debug("Cached topologyKeyCache values [{}]", cached); ++ ++ return cached.contains(null) ? 
null : cached; ++ } ++ ++ // ============================================================================ ++ // RESOLUTION WORKFLOW ++ // ============================================================================ ++ ++ private List performFullResolution(List names) { ++ LOG.debug("Performing full topology resolution for: {}", names); ++ ++ // Step 1: Gather all dataNodes ++ List dataNodes = fetchDataNodes(); ++ ++ // Step 2: Resolve listeners to actual datanode IPs ++ List resolvedNames = resolveListeners(names); ++ ++ // Step 3: Build label lookup maps ++ Map> podLabels = buildPodLabelMap(dataNodes); ++ Map> nodeLabels = buildNodeLabelMap(dataNodes); ++ ++ // Step 4: Build node-to-datanode map for O(1) colocated lookups ++ Map nodeToDatanodeIp = buildNodeToDatanodeMap(dataNodes); ++ ++ // Step 5: Resolve client pods to co-located dataNodes ++ List datanodeIps = ++ resolveClientPodsToDataNodes(resolvedNames, podLabels, nodeToDatanodeIp); ++ ++ // Step 6: Build topology strings and cache results ++ return buildAndCacheTopology(names, datanodeIps, podLabels, nodeLabels); ++ } ++ ++ private List buildAndCacheTopology( ++ List originalNames, ++ List datanodeIps, ++ Map> podLabels, ++ Map> nodeLabels) { ++ List result = new ArrayList<>(); ++ for (int i = 0; i < datanodeIps.size(); i++) { ++ String datanodeIp = datanodeIps.get(i); ++ String originalName = originalNames.get(i); ++ ++ String topology = buildTopologyString(datanodeIp, podLabels, nodeLabels); ++ result.add(topology); ++ ++ // Cache both the resolved IP and original name ++ cache.putTopology(datanodeIp, topology); ++ cache.putTopology(originalName, topology); ++ } ++ ++ LOG.info("Built topology: {}", result); ++ return result; ++ } ++ ++ // ============================================================================ ++ // DATANODE FETCHING ++ // ============================================================================ ++ ++ private List fetchDataNodes() { ++ List dataNodes = + client + .pods() + .withLabel("app.kubernetes.io/component", "datanode") + .withLabel("app.kubernetes.io/name", "hdfs") + .list() + .getItems(); ++ + LOG.debug( +- "Retrieved datanodes: [{}]", +- datanodes.stream() +- .map(datanode -> datanode.getMetadata().getName()) ++ "Retrieved dataNodes: [{}]", ++ dataNodes.stream() ++ .map(dataNode -> dataNode.getMetadata().getName()) + .collect(Collectors.toList())); ++ return dataNodes; ++ } + +- List namesToDataNodeNames = dataNodesResolvedFromListenerOrOriginal(names); +- LOG.debug("Now resolving: [{}]", namesToDataNodeNames); ++ // ============================================================================ ++ // LISTENER RESOLUTION ++ // ============================================================================ + +- // Build internal state that is later used to look up information. Basically this transposes pod +- // and node lists into hashmaps where pod-IPs can be used to look up labels for the pods and +- // nodes. This is not terribly memory efficient because it effectively duplicates a lot of data +- // in memory. But since we cache lookups, this should hopefully only be done every once in a +- // while and is not kept in memory for extended amounts of time. 
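[Illustrative sketch, not patch content: the shape of the transposed, IP-keyed lookup maps described in the comment above. The IP address and label values are hypothetical examples.]

  import java.util.Map;

  public class LabelLookupExample {
    public static void main(String[] args) {
      // Pod IP -> labels of the k8s node hosting that pod.
      Map<String, Map<String, String>> nodeLabels =
          Map.of("10.244.1.17", Map.of("topology.kubernetes.io/zone", "eu-west-1a"));

      // Each configured label contributes one path segment, "NotFound" when absent.
      String segment =
          "/"
              + nodeLabels
                  .getOrDefault("10.244.1.17", Map.of())
                  .getOrDefault("topology.kubernetes.io/zone", "NotFound");
      System.out.println(segment); // prints "/eu-west-1a"
    }
  }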
+- List result = new ArrayList<>(); ++ private List resolveListeners(List names) { ++ refreshListenerCacheIfNeeded(names); + +- Map> nodeLabels = getNodeLabels(datanodes); +- LOG.debug("Resolved node labels map [{}]/[{}]", nodeLabels.keySet(), nodeLabels.values()); ++ return names.stream().map(this::resolveListenerToDatanode).collect(Collectors.toList()); ++ } + +- Map> podLabels = getPodLabels(datanodes); +- LOG.debug("Resolved pod labels map [{}]/[{}]", podLabels.keySet(), podLabels.values()); ++ private void refreshListenerCacheIfNeeded(List names) { ++ List missingNames = ++ names.stream().filter(name -> cache.getListener(name) == null).collect(Collectors.toList()); + +- List podsResolvedToDataNodes = +- resolveDataNodesFromCallingPods(namesToDataNodeNames, podLabels, datanodes); +- +- // Iterate over all nodes to resolve and return the topology zones +- for (int i = 0; i < podsResolvedToDataNodes.size(); i++) { +- String builtLabel = getLabel(podsResolvedToDataNodes.get(i), podLabels, nodeLabels); +- result.add(builtLabel); +- +- // Cache the value for potential use in a later request +- this.topologyKeyCache.put(podsResolvedToDataNodes.get(i), builtLabel); +- // also cache the original name, in case that has resolved to a dataNode (so that +- // the resolution step can be omitted next time this pod is encountered) +- this.topologyKeyCache.put(names.get(i), builtLabel); ++ if (missingNames.isEmpty()) { ++ LOG.debug("Listener cache contains all required entries"); ++ return; ++ } ++ ++ // Listeners are typically few, so fetch all ++ // (Individual listener fetches would require knowing the namespace) ++ LOG.debug("Fetching all listeners to populate cache"); ++ GenericKubernetesResourceList listeners = fetchListeners(); ++ ++ for (GenericKubernetesResource listener : listeners.getItems()) { ++ cacheListenerByNameAndAddresses(listener); ++ } ++ } ++ ++ private void cacheListenerByNameAndAddresses(GenericKubernetesResource listener) { ++ String name = listener.getMetadata().getName(); ++ cache.putListener(name, listener); ++ ++ // Also cache by ingress addresses for quick lookup ++ for (String address : TopologyUtils.getIngressAddresses(listener)) { ++ cache.putListener(address, listener); + } +- LOG.info("Returning resolved labels [{}]", result); +- return result; + } + + /** +- * If the names include listeners then these must be resolved against the dataNode IPs and used +- * subsequently. ++ * We don't know if the name refers to a listener (it could be any client pod) but we check to see ++ * if it can be resolved to a dataNode just in case. 
+ * +- * @param names the collection of names to resolve +- * @return a collection of either the name (for non-listener) or the dataNode IP to which this +- * listener resolves ++ * @param name the name of the calling pod which should be resolved to a dataNode IP if it is a ++ * listener ++ * @return either the name (for non-listener) or the dataNode IP to which this listener resolves + */ +- private List dataNodesResolvedFromListenerOrOriginal(List names) { +- List cachedListeners = +- names.stream().map(this.listenerKeyCache::getIfPresent).collect(Collectors.toList()); +- if (cachedListeners.contains(null)) { +- LOG.debug( +- "Refreshing listener cache as not all of [{}] present in [{}]", +- names, +- this.listenerKeyCache.asMap().keySet()); +- +- GenericKubernetesResourceList listeners = getListeners(); +- +- for (GenericKubernetesResource listener : listeners.getItems()) { +- this.listenerKeyCache.put(listener.getMetadata().getName(), listener); +- // also add the IPs +- for (String ingressAddress : TopologyUtils.getIngressAddresses(listener)) { +- this.listenerKeyCache.put(ingressAddress, listener); +- } ++ private String resolveListenerToDatanode(String name) { ++ GenericKubernetesResource listener = cache.getListener(name); ++ if (listener == null) { ++ LOG.debug("Not a listener: {}", name); ++ return name; ++ } ++ List ingressAddresses = TopologyUtils.getIngressAddresses(listener); ++ for (String ingressAddress : ingressAddresses) { ++ LOG.debug("Address [{}]", ingressAddress); ++ if (name.equalsIgnoreCase(ingressAddress)) { ++ return resolveListenerEndpoint(listener); + } +- } else { +- LOG.debug("Listener cache contains [{}]", names); + } +- ConcurrentMap listeners = this.listenerKeyCache.asMap(); +- +- List listenerToDataNodeNames = new ArrayList<>(); +- +- for (String name : names) { +- String resolvedName = resolveDataNodesFromListeners(name, listeners); +- listenerToDataNodeNames.add(resolvedName); +- } +- return listenerToDataNodeNames; ++ LOG.info("Not a listener, returning [{}]", name); ++ return name; + } + +- private GenericKubernetesResourceList getListeners() { ++ private String resolveListenerEndpoint(GenericKubernetesResource listener) { ++ String listenerName = listener.getMetadata().getName(); ++ Endpoints endpoint = client.endpoints().withName(listenerName).get(); ++ LOG.debug("Matched ingressAddress [{}]", listenerName); ++ ++ if (endpoint.getSubsets().isEmpty()) { ++ LOG.warn("Endpoint {} has no subsets - pod may be restarting", listenerName); ++ return listenerName; ++ } ++ ++ EndpointAddress address = endpoint.getSubsets().get(0).getAddresses().get(0); ++ LOG.info( ++ "Resolved listener {} to IP {} on node {}", ++ listenerName, ++ address.getIp(), ++ address.getNodeName()); ++ ++ return address.getIp(); ++ } ++ ++ private GenericKubernetesResourceList fetchListeners() { + ResourceDefinitionContext listenerCrd = + new ResourceDefinitionContext.Builder() + .withGroup("listeners.stackable.tech") +@@ -353,168 +276,174 @@ public class StackableTopologyProvider implements DNSToSwitchMapping { + return client.genericKubernetesResources(listenerCrd).list(); + } + +- /** +- * We don't know if the name refers to a listener (it could be any client pod) but we check to see +- * if it can be resolved to a dataNode just in case. 
+- * +- * @param name the name of the calling pod which should be resolved to a dataNode IP if it is a +- * listener +- * @param listeners the current listener collection +- * @return either the name (for non-listener) or the dataNode IP to which this listener resolves +- */ +- private String resolveDataNodesFromListeners( +- String name, ConcurrentMap listeners) { +- LOG.debug("Attempting to resolve [{}]", name); +- for (GenericKubernetesResource listener : listeners.values()) { +- List ingressAddresses = TopologyUtils.getIngressAddresses(listener); +- for (String ingressAddress : ingressAddresses) { +- LOG.debug("Address [{}]", ingressAddress); +- if (name.equalsIgnoreCase(ingressAddress)) { +- LOG.debug("Matched ingressAddress [{}]/[{}]", name, listener.getMetadata().getName()); ++ // ============================================================================ ++ // CLIENT POD RESOLUTION ++ // ============================================================================ + +- Endpoints ep = client.endpoints().withName(listener.getMetadata().getName()).get(); +- // TODO: Assuming single address per datanode endpoint. +- // When does/can an endpoint support multiple data nodes? On restart the address list will +- // be empty for a moment or two. +- if (ep.getSubsets().size() < 1) { +- LOG.warn( +- "Endpoint [{}] address not detected, pod maybe restarting...", +- ep.getMetadata().getName()); +- } else { +- EndpointAddress address = ep.getSubsets().get(0).getAddresses().get(0); +- LOG.info( +- "Endpoint [{}], IP [{}], node [{}]", +- ep.getMetadata().getName(), +- address.getIp(), +- address.getNodeName()); +- return address.getIp(); +- } +- } ++ private List resolveClientPodsToDataNodes( ++ List names, ++ Map> podLabels, ++ Map nodeToDatanodeIp) { ++ ++ refreshPodCacheIfNeeded(names); ++ ++ return names.stream() ++ .map(name -> resolveToDatanodeOrKeep(name, podLabels, nodeToDatanodeIp)) ++ .collect(Collectors.toList()); ++ } ++ ++ private void refreshPodCacheIfNeeded(List names) { ++ if (cache.hasAllPods(names)) { ++ LOG.debug("Pod cache contains all required entries"); ++ return; ++ } ++ ++ // Note: We fetch all pods here because: ++ // 1. Client pods (Spark, etc.) are queried by IP, not name ++ // 2. 
K8s doesn't support "get pod by IP" - we must list and filter ++ LOG.debug("Refreshing pod cache for client pod resolution"); ++ for (Pod pod : client.pods().list().getItems()) { ++ cachePodByNameAndIps(pod); ++ } ++ } ++ ++ private void cachePodByNameAndIps(Pod pod) { ++ String podName = pod.getMetadata().getName(); ++ cache.putPod(podName, pod); ++ ++ // Cache by all IPs - this is crucial for IP-based lookups ++ for (PodIP ip : pod.getStatus().getPodIPs()) { ++ cache.putPod(ip.getIp(), pod); ++ } ++ } ++ ++ private String resolveToDatanodeOrKeep( ++ String name, ++ Map> podLabels, ++ Map nodeToDatanodeIp) { ++ ++ String ipAddress = resolveToIpAddress(name); ++ ++ // If it's already a datanode, return its IP ++ if (podLabels.containsKey(ipAddress)) { ++ LOG.info("Name is a datanode: {}", ipAddress); ++ return ipAddress; ++ } ++ ++ // Try to find co-located datanode ++ Pod clientPod = cache.getPod(ipAddress); ++ if (clientPod != null) { ++ String datanodeIp = findColocatedDatanode(clientPod, nodeToDatanodeIp); ++ if (datanodeIp != null) { ++ return datanodeIp; + } + } +- LOG.info("Not a listener, returning [{}]", name); +- return name; ++ ++ // Keep original if we can't resolve ++ return ipAddress; ++ } ++ ++ private String findColocatedDatanode(Pod clientPod, Map nodeToDatanodeIp) { ++ return nodeToDatanodeIp.get(clientPod.getSpec().getNodeName()); ++ } ++ ++ private String resolveToIpAddress(String hostname) { ++ try { ++ InetAddress address = InetAddress.getByName(hostname); ++ String ip = address.getHostAddress(); ++ LOG.debug("Resolved {} to {}", hostname, ip); ++ return ip; ++ } catch (UnknownHostException e) { ++ LOG.warn("Failed to resolve address: {} - defaulting to {}", hostname, DEFAULT_RACK); ++ return hostname; ++ } + } + + /** +- * The list of names may be datanodes (as is the case when the topology is initialised) or +- * non-datanodes, when the data is being written by a client tool, spark executors etc. In this +- * case we want to identify the datanodes that are running on the same node as this client. The +- * names may also be pod IPs or pod names. ++ * Build a map from Kubernetes node name to datanode IP. This enables O(1) lookup when finding ++ * co-located dataNodes for client pods. + * +- * @param names list of client pods to resolve to datanodes +- * @param podLabels map of podIPs and labels +- * @param dns list of datanode names which will be used to match nodenames +- * @return a list of pods resolved to co-located datanodes where possible ++ *
<p>
Note: If multiple dataNodes run on the same node, the last one wins. This is acceptable ++ * because all dataNodes on the same node have the same topology. + */ +- private List resolveDataNodesFromCallingPods( +- List names, Map> podLabels, List dns) { +- List dataNodes = new ArrayList<>(); ++ private Map buildNodeToDatanodeMap(List dataNodes) { ++ Map nodeToDatanode = new HashMap<>(); + +- // if any of the names do not exist in the pod cache then refresh it +- List cachedPods = +- names.stream().map(this.podKeyCache::getIfPresent).collect(Collectors.toList()); +- if (cachedPods.contains(null)) { +- LOG.info( +- "Refreshing pod cache as not all of [{}] present in [{}]", +- names, +- this.podKeyCache.asMap().keySet()); +- for (Pod pod : client.pods().list().getItems()) { +- this.podKeyCache.put(pod.getMetadata().getName(), pod); +- // also add an entry for each IP +- for (PodIP ip : pod.getStatus().getPodIPs()) { +- this.podKeyCache.put(ip.getIp(), pod); +- } +- } +- } else { +- LOG.info("Pod cache contains [{}]", names); +- } +- ConcurrentMap pods = this.podKeyCache.asMap(); ++ for (Pod dataNode : dataNodes) { ++ String nodeName = dataNode.getSpec().getNodeName(); ++ String dataNodeIp = dataNode.getStatus().getPodIP(); + +- for (String name : names) { +- // if we don't find a dataNode running on the same node as a non-dataNode pod, then +- // we'll keep the original name to allow it to be resolved to NotFound in the calling routine. +- String replacementDataNodeIp = name; +- InetAddress address; +- try { +- // make sure we are looking up using the IP address +- address = InetAddress.getByName(name); +- String podIp = address.getHostAddress(); +- if (podLabels.containsKey(podIp)) { +- replacementDataNodeIp = podIp; +- LOG.info("Added as found in the datanode map [{}]", podIp); +- } else { +- // we've received a call from a non-datanode pod +- for (Pod pod : pods.values()) { +- if (pod.getStatus().getPodIPs().contains(new PodIP(podIp))) { +- String nodeName = pod.getSpec().getNodeName(); +- for (Pod dn : dns) { +- if (dn.getSpec().getNodeName().equals(nodeName)) { +- LOG.debug( +- "NodeName [{}] matches with [{}]?", dn.getSpec().getNodeName(), nodeName); +- replacementDataNodeIp = dn.getStatus().getPodIP(); +- break; +- } +- } +- } +- } +- } +- } catch (UnknownHostException e) { +- LOG.warn("Error encountered while resolving host [{}]", e.getLocalizedMessage()); ++ if (nodeName != null && dataNodeIp != null) { ++ nodeToDatanode.put(nodeName, dataNodeIp); + } +- dataNodes.add(replacementDataNodeIp); + } +- LOG.info("Replacing names [{}] with IPs [{}]", names, dataNodes); +- return dataNodes; ++ ++ LOG.debug("Built node-to-datanode map with {} entries", nodeToDatanode.size()); ++ return nodeToDatanode; + } + +- /** +- * Given a list of datanodes, return a HashMap that maps pod ips onto Pod labels. The returned Map +- * may contain more entries than the list that is given to this function, as an entry will be +- * generated for every ip a pod has. +- * +- * @param datanodes List of all retrieved pods. 
+- * @return Map of ip addresses to all labels the pod that "owns" that ip has attached to itself +- */ +- private Map> getPodLabels(List datanodes) { +- Map> result = new HashMap<>(); +- // Iterate over all pods and then all ips for every pod and add these to the mapping +- for (Pod pod : datanodes) { +- for (PodIP podIp : pod.getStatus().getPodIPs()) { +- result.put(podIp.getIp(), pod.getMetadata().getLabels()); +- } ++ // ============================================================================ ++ // TOPOLOGY STRING BUILDING ++ // ============================================================================ ++ ++ private String buildTopologyString( ++ String ipAddress, ++ Map> podLabels, ++ Map> nodeLabels) { ++ StringBuilder topology = new StringBuilder(); ++ ++ for (TopologyLabel label : labels) { ++ String labelValue = extractLabelValue(ipAddress, label, podLabels, nodeLabels); ++ topology.append("/").append(labelValue); + } ++ ++ String result = topology.toString(); ++ LOG.debug("Returning label [{}]", result); + return result; + } + ++ private String extractLabelValue( ++ String ipAddress, ++ TopologyLabel label, ++ Map> podLabels, ++ Map> nodeLabels) { ++ ++ Map> labelSource = label.isNodeLabel() ? nodeLabels : podLabels; ++ ++ String labelValue = ++ labelSource ++ .getOrDefault(ipAddress, Collections.emptyMap()) ++ .getOrDefault(label.getName(), "NotFound"); ++ ++ LOG.debug("Label {}.{} = {}", label.getType(), label.getName(), labelValue); ++ return labelValue; ++ } ++ ++ // ============================================================================ ++ // LABEL MAPS ++ // ============================================================================ ++ + /** +- * Given a list of datanodes this function will resolve which datanodes run on which node as well +- * as all the ips assigned to a datanodes. It will then return a mapping of every ip address to +- * the labels that are attached to the "physical" node running the datanodes that this ip belongs ++ * Given a list of dataNodes this function will resolve which dataNodes run on which node as well ++ * as all the ips assigned to a dataNodes. It will then return a mapping of every ip address to ++ * the labels that are attached to the "physical" node running the dataNodes that this ip belongs + * to. 
+ * +- * @param datanodes List of all in-scope datanodes (datanode pods in this namespace) ++ * @param dataNodes List of all in-scope dataNodes (datanode pods in this namespace) + * @return Map of ip addresses to labels of the node running the pod that the ip address belongs + * to + */ +- private Map> getNodeLabels(List datanodes) { ++ private Map> buildNodeLabelMap(List dataNodes) { + Map> result = new HashMap<>(); +- for (Pod pod : datanodes) { +- // either retrieve the node from the internal cache or fetch it by name ++ for (Pod pod : dataNodes) { + String nodeName = pod.getSpec().getNodeName(); +- // nodeName may be null while pod is being provisioned so force a re-try ++ + if (nodeName == null) { +- LOG.warn( +- "Pod [{}] not yet assigned to a k8s node, forcing re-try", pod.getMetadata().getName()); ++ LOG.warn("Pod [{}] not yet assigned to node, retrying", pod.getMetadata().getName()); + return result; + } +- Node node = this.nodeKeyCache.getIfPresent(nodeName); +- if (node == null) { +- LOG.debug("Node not yet cached, fetching by name [{}]", nodeName); +- node = client.nodes().withName(nodeName).get(); +- this.nodeKeyCache.put(nodeName, node); +- } + ++ Node node = getOrFetchNode(nodeName); + Map nodeLabels = node.getMetadata().getLabels(); + LOG.debug("Labels for node [{}]:[{}]....", nodeName, nodeLabels); + +@@ -526,78 +455,33 @@ public class StackableTopologyProvider implements DNSToSwitchMapping { + return result; + } + +- @Override +- public void reloadCachedMappings() { +- // TODO: According to the upstream comment we should rebuild all cache entries after +- // invalidating them +- // this may mean trying to resolve ip addresses that do not exist any more and things like that +- // though and +- // require some more thought, so we will for now just invalidate the cache. +- this.topologyKeyCache.invalidateAll(); +- } +- +- @Override +- public void reloadCachedMappings(List names) { +- // TODO: See comment above, the same applies here +- for (String name : names) { +- this.topologyKeyCache.invalidate(name); ++ private Node getOrFetchNode(String nodeName) { ++ Node node = cache.getNode(nodeName); ++ if (node == null) { ++ LOG.debug("Fetching node: {}", nodeName); ++ node = client.nodes().withName(nodeName).get(); ++ cache.putNode(nodeName, node); + } ++ return node; + } + +- private enum LabelType { +- Node, +- Pod, +- Undefined +- } ++ /** ++ * Given a list of dataNodes, return a HashMap that maps pod ips onto Pod labels. The returned Map ++ * may contain more entries than the list that is given to this function, as an entry will be ++ * generated for every ip a pod has. ++ * ++ * @param dataNodes List of all retrieved pods. ++ * @return Map of ip addresses to all labels the pod that "owns" that ip has attached to itself ++ */ ++ private Map> buildPodLabelMap(List dataNodes) { ++ Map> result = new HashMap<>(); ++ for (Pod pod : dataNodes) { ++ Map podLabels = pod.getMetadata().getLabels(); + +- private class TopologyLabel { +- private final LabelType labelType; +- private String name = null; +- +- /** +- * Create a new TopologyLabel from its string representation +- * +- * @param value A string in the form of "[node|pod]:" that is deserialized into a +- * TopologyLabel. Invalid and empty strings are resolved into the type unspecified. 
+- */ +- private TopologyLabel(String value) { +- // If this is null the env var was not set, we will return 'undefined' for this level +- if (value == null || value.isEmpty()) { +- this.labelType = LabelType.Undefined; +- return; +- } +- String[] split = value.toLowerCase(Locale.ROOT).split(":", 2); +- +- // This should only fail if no : was present in the string +- if (split.length != 2) { +- this.labelType = LabelType.Undefined; +- LOG.warn( +- "Ignoring topology label [{}] - label definitions need to be in the form " +- + "of \"[node|pod]:\"", +- value); +- return; +- } +- // Length has to be two, proceed with "normal" case +- String type = split[0]; +- this.name = split[1]; +- +- // Parse type of object labels should be retrieved from +- switch (type) { +- case "node": +- this.labelType = LabelType.Node; +- break; +- +- case "pod": +- this.labelType = LabelType.Pod; +- break; +- +- default: +- LOG.warn( +- "Encountered unsupported label type [{}] - this label definition will be ignored, " +- + "supported types are [\"node\", \"pod\"]", +- type); +- this.labelType = LabelType.Undefined; ++ for (PodIP podIp : pod.getStatus().getPodIPs()) { ++ result.put(podIp.getIp(), podLabels); + } + } ++ return result; + } + } +diff --git a/src/main/java/tech/stackable/hadoop/TopologyCache.java b/src/main/java/tech/stackable/hadoop/TopologyCache.java +new file mode 100644 +index 0000000..3944fd4 +--- /dev/null ++++ b/src/main/java/tech/stackable/hadoop/TopologyCache.java +@@ -0,0 +1,88 @@ ++package tech.stackable.hadoop; ++ ++import com.github.benmanes.caffeine.cache.Cache; ++import com.github.benmanes.caffeine.cache.Caffeine; ++import io.fabric8.kubernetes.api.model.GenericKubernetesResource; ++import io.fabric8.kubernetes.api.model.Node; ++import io.fabric8.kubernetes.api.model.Pod; ++import java.util.List; ++import java.util.concurrent.ConcurrentMap; ++import java.util.concurrent.TimeUnit; ++ ++/** Manages all caching layers for the topology provider. 
*/
++public class TopologyCache {
++  private final Cache<String, String> topology;
++  private final Cache<String, Node> nodes;
++  private final Cache<String, GenericKubernetesResource> listeners;
++  private final Cache<String, Pod> pods;
++
++  TopologyCache(int expirationSeconds, int defaultExpirationSeconds) {
++    this.topology =
++        Caffeine.newBuilder().expireAfterWrite(expirationSeconds, TimeUnit.SECONDS).build();
++
++    this.nodes =
++        Caffeine.newBuilder().expireAfterWrite(defaultExpirationSeconds, TimeUnit.SECONDS).build();
++
++    this.listeners =
++        Caffeine.newBuilder().expireAfterWrite(defaultExpirationSeconds, TimeUnit.SECONDS).build();
++
++    this.pods =
++        Caffeine.newBuilder().expireAfterWrite(defaultExpirationSeconds, TimeUnit.SECONDS).build();
++  }
++
++  String getTopology(String key) {
++    return topology.getIfPresent(key);
++  }
++
++  void putTopology(String key, String value) {
++    topology.put(key, value);
++  }
++
++  void invalidateAll() {
++    topology.invalidateAll();
++  }
++
++  void invalidateTopologyKeys(List<String> keys) {
++    keys.forEach(topology::invalidate);
++  }
++
++  Node getNode(String name) {
++    return nodes.getIfPresent(name);
++  }
++
++  void putNode(String name, Node node) {
++    nodes.put(name, node);
++  }
++
++  GenericKubernetesResource getListener(String name) {
++    return listeners.getIfPresent(name);
++  }
++
++  ConcurrentMap<String, GenericKubernetesResource> getListenerMap() {
++    return listeners.asMap();
++  }
++
++  void putListener(String name, GenericKubernetesResource listener) {
++    listeners.put(name, listener);
++  }
++
++  boolean hasAllListeners(List<String> names) {
++    return names.stream().noneMatch(name -> listeners.getIfPresent(name) == null);
++  }
++
++  Pod getPod(String name) {
++    return pods.getIfPresent(name);
++  }
++
++  ConcurrentMap<String, Pod> getPodMap() {
++    return pods.asMap();
++  }
++
++  void putPod(String name, Pod pod) {
++    pods.put(name, pod);
++  }
++
++  boolean hasAllPods(List<String> names) {
++    return names.stream().noneMatch(name -> pods.getIfPresent(name) == null);
++  }
++}
+diff --git a/src/main/java/tech/stackable/hadoop/TopologyLabel.java b/src/main/java/tech/stackable/hadoop/TopologyLabel.java
+new file mode 100644
+index 0000000..ba91b79
+--- /dev/null
++++ b/src/main/java/tech/stackable/hadoop/TopologyLabel.java
+@@ -0,0 +1,116 @@
++package tech.stackable.hadoop;
++
++import java.util.Arrays;
++import java.util.List;
++import java.util.Locale;
++import java.util.stream.Collectors;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++public class TopologyLabel {
++  private static final Logger LOG = LoggerFactory.getLogger(TopologyLabel.class);
++  public static final String VARNAME_LABELS = "TOPOLOGY_LABELS";
++  public static final String VARNAME_MAXLEVELS = "TOPOLOGY_MAX_LEVELS";
++  private static final int MAX_LEVELS_DEFAULT = 2;
++
++  public enum Type {
++    NODE,
++    POD,
++    UNDEFINED
++  }
++
++  private final Type type;
++  private final String name;
++
++  TopologyLabel(String config) {
++    if (config == null || config.isEmpty()) {
++      this.type = Type.UNDEFINED;
++      this.name = null;
++      return;
++    }
++
++    String[] parts = config.toLowerCase(Locale.ROOT).split(":", 2);
++
++    if (parts.length != 2) {
++      LOG.warn("Invalid topology label format '{}' - expected '[node|pod]:<labelname>'", config);
++      this.type = Type.UNDEFINED;
++      this.name = null;
++      return;
++    }
++
++    this.name = parts[1];
++
++    switch (parts[0]) {
++      case "node":
++        this.type = Type.NODE;
++        break;
++      case "pod":
++        this.type = Type.POD;
++        break;
++      default:
++        LOG.warn("Unsupported label type '{}' - must be 'node' or 'pod'", parts[0]);
++        this.type = Type.UNDEFINED;
++    }
++  }
++
++  boolean isNodeLabel() {
++    return type == Type.NODE;
++  }
++
++  boolean isUndefined() {
++    return type == Type.UNDEFINED;
++  }
++
++  String getName() {
++    return name;
++  }
++
++  Type getType() {
++    return type;
++  }
++
++  public static List<TopologyLabel> initializeTopologyLabels() {
++    // Read the labels to be used to build a topology from environment variables. Labels are
++    // configured in the EnvVar "TOPOLOGY_LABELS". They should be specified in the form
++    // "[node|pod]:<labelname>" and separated by ";". So a valid configuration that reads topology
++    // information from the labels "kubernetes.io/zone" and "kubernetes.io/rack" on the k8s node
++    // that is running a datanode pod would look like this:
++    // "node:kubernetes.io/zone;node:kubernetes.io/rack". By default, there is an upper limit of 2
++    // on the number of labels that are processed, because this is what Hadoop traditionally
++    // allows - this can be overridden via setting the EnvVar "TOPOLOGY_MAX_LEVELS".
++    String topologyConfig = System.getenv(VARNAME_LABELS);
++
++    if (topologyConfig == null || topologyConfig.isEmpty()) {
++      LOG.error(
++          "Missing env var [{}]; this is required for rack awareness to work.", VARNAME_LABELS);
++      throw new RuntimeException("TOPOLOGY_LABELS environment variable not set");
++    }
++
++    String[] labelConfigs = topologyConfig.split(";");
++
++    if (labelConfigs.length > getMaxLabels()) {
++      LOG.error(
++          "Found [{}] topology labels configured, but maximum allowed number is [{}]: "
++              + "please check your config or raise the number of allowed labels.",
++          labelConfigs.length,
++          getMaxLabels());
++      throw new RuntimeException("Too many topology labels configured");
++    }
++    // Create TopologyLabels from config strings
++    List<TopologyLabel> labels =
++        Arrays.stream(labelConfigs).map(TopologyLabel::new).collect(Collectors.toList());
++
++    if (labels.stream().anyMatch(TopologyLabel::isUndefined)) {
++      LOG.error(
++          "Invalid topology label configuration - labels must be in format '[pod|node]:<labelname>'");
++      throw new RuntimeException("Invalid topology label configuration");
++    }
++
++    return labels;
++  }
++
++  private static int getMaxLabels() {
++    return TopologyUtils.parseIntFromEnv(
++        VARNAME_MAXLEVELS, MAX_LEVELS_DEFAULT, "maximum topology levels");
++  }
++}
+diff --git a/src/main/java/tech/stackable/hadoop/TopologyUtils.java b/src/main/java/tech/stackable/hadoop/TopologyUtils.java
+index 686c4f2..be7cb51 100644
+--- a/src/main/java/tech/stackable/hadoop/TopologyUtils.java
++++ b/src/main/java/tech/stackable/hadoop/TopologyUtils.java
+@@ -4,8 +4,12 @@ import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.stream.Collectors;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
+ 
+ public class TopologyUtils {
++  private static final Logger LOG = LoggerFactory.getLogger(TopologyUtils.class);
++
+   private static final String ADDRESS = "address";
+   private static final String STATUS = "status";
+   private static final String INGRESS_ADDRESSES = "ingressAddresses";
+@@ -21,4 +25,21 @@
+         .map(ingress -> (String) ingress.get(ADDRESS))
+         .collect(Collectors.toList());
+   }
++
++  public static int parseIntFromEnv(String varName, int defaultValue, String description) {
++    String value = System.getenv(varName);
++    if (value == null || value.isEmpty()) {
++      return defaultValue;
++    }
++
++    try {
++      int parsed = Integer.parseInt(value);
++      LOG.info("Set {} to {} from environment variable {}", description, parsed, varName);
++      return parsed;
++    } catch
(NumberFormatException e) { ++ LOG.warn( ++ "Invalid integer value '{}' for {} - using default: {}", value, varName, defaultValue); ++ return defaultValue; ++ } ++ } + } From 62003240eba085d3b37dd0b163f8800ec02c79a7 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Wed, 10 Dec 2025 17:31:36 +0100 Subject: [PATCH 2/4] improve listener resolution --- .../0001-refactor-topology-provider.patch | 44 +++++++++++-------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/hadoop/hdfs-utils/stackable/patches/0.4.2/0001-refactor-topology-provider.patch b/hadoop/hdfs-utils/stackable/patches/0.4.2/0001-refactor-topology-provider.patch index 185aec525..1ff6df2e8 100644 --- a/hadoop/hdfs-utils/stackable/patches/0.4.2/0001-refactor-topology-provider.patch +++ b/hadoop/hdfs-utils/stackable/patches/0.4.2/0001-refactor-topology-provider.patch @@ -1,19 +1,19 @@ -From 8ad3983c91711e04629e608337b2d7e87fcfc243 Mon Sep 17 00:00:00 2001 +From 71e28fc959dbdfc1c314630ace3fc74425a95bd6 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Tue, 9 Dec 2025 18:02:34 +0100 Subject: refactor-topology-provider --- - .../hadoop/StackableTopologyProvider.java | 834 ++++++++---------- + .../hadoop/StackableTopologyProvider.java | 841 ++++++++---------- .../tech/stackable/hadoop/TopologyCache.java | 88 ++ .../tech/stackable/hadoop/TopologyLabel.java | 116 +++ .../tech/stackable/hadoop/TopologyUtils.java | 21 + - 4 files changed, 584 insertions(+), 475 deletions(-) + 4 files changed, 590 insertions(+), 476 deletions(-) create mode 100644 src/main/java/tech/stackable/hadoop/TopologyCache.java create mode 100644 src/main/java/tech/stackable/hadoop/TopologyLabel.java diff --git a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java -index 4d1e917..b6f7877 100644 +index 4d1e917..0885796 100644 --- a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java +++ b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java @@ -1,7 +1,5 @@ @@ -33,7 +33,7 @@ index 4d1e917..b6f7877 100644 import java.util.stream.Collectors; import org.apache.hadoop.net.DNSToSwitchMapping; import org.slf4j.Logger; -@@ -18,330 +14,257 @@ import org.slf4j.LoggerFactory; +@@ -18,330 +14,250 @@ import org.slf4j.LoggerFactory; /** * An implementation of the org.apache.hadoop.net.DNSToSwitchMapping that is used to create a @@ -510,20 +510,14 @@ index 4d1e917..b6f7877 100644 - for (String ingressAddress : TopologyUtils.getIngressAddresses(listener)) { - this.listenerKeyCache.put(ingressAddress, listener); - } +- } +- } else { +- LOG.debug("Listener cache contains [{}]", names); + private String resolveListenerToDatanode(String name) { + GenericKubernetesResource listener = cache.getListener(name); + if (listener == null) { + LOG.debug("Not a listener: {}", name); + return name; -+ } -+ List ingressAddresses = TopologyUtils.getIngressAddresses(listener); -+ for (String ingressAddress : ingressAddresses) { -+ LOG.debug("Address [{}]", ingressAddress); -+ if (name.equalsIgnoreCase(ingressAddress)) { -+ return resolveListenerEndpoint(listener); - } -- } else { -- LOG.debug("Listener cache contains [{}]", names); } - ConcurrentMap listeners = this.listenerKeyCache.asMap(); - @@ -534,8 +528,8 @@ index 4d1e917..b6f7877 100644 - listenerToDataNodeNames.add(resolvedName); - } - return listenerToDataNodeNames; -+ LOG.info("Not a listener, returning [{}]", name); -+ return name; ++ // We found a listener, so we can resolve it directly ++ return 
resolveListenerEndpoint(listener); } - private GenericKubernetesResourceList getListeners() { @@ -563,7 +557,7 @@ index 4d1e917..b6f7877 100644 ResourceDefinitionContext listenerCrd = new ResourceDefinitionContext.Builder() .withGroup("listeners.stackable.tech") -@@ -353,168 +276,174 @@ public class StackableTopologyProvider implements DNSToSwitchMapping { +@@ -353,168 +269,186 @@ public class StackableTopologyProvider implements DNSToSwitchMapping { return client.genericKubernetesResources(listenerCrd).list(); } @@ -673,7 +667,19 @@ index 4d1e917..b6f7877 100644 + } + + private String findColocatedDatanode(Pod clientPod, Map nodeToDatanodeIp) { -+ return nodeToDatanodeIp.get(clientPod.getSpec().getNodeName()); ++ String clientNodeName = clientPod.getSpec().getNodeName(); ++ ++ if (clientNodeName == null) { ++ LOG.warn("Client pod {} not yet assigned to node", clientPod.getMetadata().getName()); ++ return null; ++ } ++ ++ String datanodeIp = nodeToDatanodeIp.get(clientNodeName); ++ if (datanodeIp == null) { ++ LOG.debug("No datanode found on node {}", clientNodeName); ++ } ++ ++ return datanodeIp; + } + + private String resolveToIpAddress(String hostname) { @@ -870,7 +876,7 @@ index 4d1e917..b6f7877 100644 Map nodeLabels = node.getMetadata().getLabels(); LOG.debug("Labels for node [{}]:[{}]....", nodeName, nodeLabels); -@@ -526,78 +455,33 @@ public class StackableTopologyProvider implements DNSToSwitchMapping { +@@ -526,78 +460,33 @@ public class StackableTopologyProvider implements DNSToSwitchMapping { return result; } From 58d21ec831a01f0706fd595f28b8fdc775b1ec33 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Fri, 12 Dec 2025 13:04:20 +0100 Subject: [PATCH 3/4] replace patch with commit hash from hdfs-utils repo --- .../0001-refactor-topology-provider.patch | 1234 ----------------- .../stackable/patches/0.4.2/patchable.toml | 3 +- 2 files changed, 2 insertions(+), 1235 deletions(-) delete mode 100644 hadoop/hdfs-utils/stackable/patches/0.4.2/0001-refactor-topology-provider.patch diff --git a/hadoop/hdfs-utils/stackable/patches/0.4.2/0001-refactor-topology-provider.patch b/hadoop/hdfs-utils/stackable/patches/0.4.2/0001-refactor-topology-provider.patch deleted file mode 100644 index 1ff6df2e8..000000000 --- a/hadoop/hdfs-utils/stackable/patches/0.4.2/0001-refactor-topology-provider.patch +++ /dev/null @@ -1,1234 +0,0 @@ -From 71e28fc959dbdfc1c314630ace3fc74425a95bd6 Mon Sep 17 00:00:00 2001 -From: Andrew Kenworthy -Date: Tue, 9 Dec 2025 18:02:34 +0100 -Subject: refactor-topology-provider - ---- - .../hadoop/StackableTopologyProvider.java | 841 ++++++++---------- - .../tech/stackable/hadoop/TopologyCache.java | 88 ++ - .../tech/stackable/hadoop/TopologyLabel.java | 116 +++ - .../tech/stackable/hadoop/TopologyUtils.java | 21 + - 4 files changed, 590 insertions(+), 476 deletions(-) - create mode 100644 src/main/java/tech/stackable/hadoop/TopologyCache.java - create mode 100644 src/main/java/tech/stackable/hadoop/TopologyLabel.java - -diff --git a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java -index 4d1e917..0885796 100644 ---- a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java -+++ b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java -@@ -1,7 +1,5 @@ - package tech.stackable.hadoop; - --import com.github.benmanes.caffeine.cache.Cache; --import com.github.benmanes.caffeine.cache.Caffeine; - import io.fabric8.kubernetes.api.model.*; - import 
io.fabric8.kubernetes.client.KubernetesClient; - import io.fabric8.kubernetes.client.KubernetesClientBuilder; -@@ -9,8 +7,6 @@ import io.fabric8.kubernetes.client.dsl.base.ResourceDefinitionContext; - import java.net.InetAddress; - import java.net.UnknownHostException; - import java.util.*; --import java.util.concurrent.ConcurrentMap; --import java.util.concurrent.TimeUnit; - import java.util.stream.Collectors; - import org.apache.hadoop.net.DNSToSwitchMapping; - import org.slf4j.Logger; -@@ -18,330 +14,250 @@ import org.slf4j.LoggerFactory; - - /** - * An implementation of the org.apache.hadoop.net.DNSToSwitchMapping that is used to create a -- * topology out of datanodes. -+ * topology out of dataNodes. - * - *
<p>
This class is intended to be run as part of the NameNode process and will be used by the -- * namenode to retrieve topology strings for datanodes. -+ * nameNode to retrieve topology strings for dataNodes. - */ - public class StackableTopologyProvider implements DNSToSwitchMapping { -- -- public static final String VARNAME_LABELS = "TOPOLOGY_LABELS"; -- public static final String VARNAME_CACHE_EXPIRATION = "TOPOLOGY_CACHE_EXPIRATION_SECONDS"; -- public static final String VARNAME_MAXLEVELS = "TOPOLOGY_MAX_LEVELS"; -- public static final String DEFAULT_RACK = "/defaultRack"; -- private static final int MAX_LEVELS_DEFAULT = 2; -- private static final int CACHE_EXPIRY_DEFAULT_SECONDS = 5 * 60; - private final Logger LOG = LoggerFactory.getLogger(StackableTopologyProvider.class); -- private final KubernetesClient client; -- private final Cache topologyKeyCache = -- Caffeine.newBuilder().expireAfterWrite(getCacheExpiration(), TimeUnit.SECONDS).build(); -- private final Cache nodeKeyCache = -- Caffeine.newBuilder() -- .expireAfterWrite(CACHE_EXPIRY_DEFAULT_SECONDS, TimeUnit.SECONDS) -- .build(); - -- private final Cache listenerKeyCache = -- Caffeine.newBuilder() -- .expireAfterWrite(CACHE_EXPIRY_DEFAULT_SECONDS, TimeUnit.SECONDS) -- .build(); -- private final Cache podKeyCache = -- Caffeine.newBuilder() -- .expireAfterWrite(CACHE_EXPIRY_DEFAULT_SECONDS, TimeUnit.SECONDS) -- .build(); -- // The list of labels that this provider uses to generate the topology information for any given -- // datanode -+ // Environment variable names -+ public static final String VARNAME_CACHE_EXPIRATION = "TOPOLOGY_CACHE_EXPIRATION_SECONDS"; -+ -+ // Default values -+ public static final String DEFAULT_RACK = "/defaultRack"; -+ private static final int CACHE_EXPIRY_DEFAULT_SECONDS = 5 * 60; -+ -+ private final KubernetesClient client; - private final List labels; - -+ // Caching layers -+ private final TopologyCache cache; -+ - public StackableTopologyProvider() { - this.client = new KubernetesClientBuilder().build(); -+ this.cache = new TopologyCache(getCacheExpiration(), CACHE_EXPIRY_DEFAULT_SECONDS); -+ this.labels = TopologyLabel.initializeTopologyLabels(); - -- // Read the labels to be used to build a topology from environment variables. Labels are -- // configured in the EnvVar "TOPOLOGY_LABELS". They should be specified in the form -- // "[node|pod]:" and separated by ";". So a valid configuration that reads topology -- // information from the labels "kubernetes.io/zone" and "kubernetes.io/rack" on the k8s node -- // that is running a datanode pod would look like this: -- // "node:kubernetes.io/zone;node:kubernetes.io/rack" By default, there is an upper limit of 2 on -- // the number of labels that are processed, because this is what Hadoop traditionally allows - -- // this can be overridden via setting the EnvVar "MAX_TOPOLOGY_LEVELS". 
-- String topologyConfig = System.getenv(VARNAME_LABELS); -- if (topologyConfig != null && !topologyConfig.isEmpty()) { -- String[] labelConfigs = topologyConfig.split(";"); -- if (labelConfigs.length > getMaxLabels()) { -- LOG.error( -- "Found [{}] topology labels configured, but maximum allowed number is [{}]: " -- + "please check your config or raise the number of allowed labels.", -- labelConfigs.length, -- getMaxLabels()); -- throw new RuntimeException(); -- } -- // Create TopologyLabels from config strings -- this.labels = -- Arrays.stream(labelConfigs).map(TopologyLabel::new).collect(Collectors.toList()); -- -- // Check if any labelConfigs were invalid -- if (this.labels.stream().anyMatch(label -> label.labelType == LabelType.Undefined)) { -- LOG.error( -- "Topologylabel contained invalid configuration for at least one label: " -- + "double check your config! Labels should be specified in the " -- + "format '[pod|node]:;...'"); -- throw new RuntimeException(); -- } -- -- } else { -- LOG.error( -- "Missing env var [{}] this is required for rack awareness to work.", VARNAME_LABELS); -- throw new RuntimeException(); -- } -- if (this.labels.isEmpty()) { -- LOG.info( -- "No topology config found, defaulting value for all datanodes to [{}]", DEFAULT_RACK); -- } else { -- LOG.info( -- "Topology config yields labels [{}]", -- this.labels.stream().map(label -> label.name).collect(Collectors.toList())); -- } -+ logInitializationStatus(); - } - -- /*** -- * Checks if a value for the maximum number of topology levels to allow has been configured in -- * the environment variable specified in VARNAME_MAXLEVELS, -- * returns the value of MAX_LEVELS_DEFAULT as default if nothing is set. -- * -- * @return The maximum number of topology levels to allow. -- */ -- private int getMaxLabels() { -- String maxLevelsConfig = System.getenv(VARNAME_MAXLEVELS); -- if (maxLevelsConfig != null && !maxLevelsConfig.isEmpty()) { -- try { -- int maxLevelsFromEnvVar = Integer.parseInt(maxLevelsConfig); -- LOG.info( -- "Found [{}] env var, changing allowed number of topology levels to [{}]", -- VARNAME_MAXLEVELS, -- maxLevelsFromEnvVar); -- return maxLevelsFromEnvVar; -- } catch (NumberFormatException e) { -- LOG.warn( -- "Unable to parse value of [{}]/[{}] as integer, using default value [{}]", -- VARNAME_MAXLEVELS, -- maxLevelsConfig, -- MAX_LEVELS_DEFAULT); -- } -- } -- return MAX_LEVELS_DEFAULT; -+ @Override -+ public void reloadCachedMappings() { -+ // TODO: According to the upstream comment we should rebuild all cache entries after -+ // invalidating them -+ // this may mean trying to resolve ip addresses that do not exist any more and things like that -+ // though and -+ // require some more thought, so we will for now just invalidate the cache. -+ this.cache.invalidateAll(); -+ } -+ -+ @Override -+ public void reloadCachedMappings(List names) { -+ // TODO: See comment above, the same applies here -+ cache.invalidateTopologyKeys(names); - } - -- /*** -- * Checks if a value for the cache expiration time has been configured in -- * the environment variable specified in VARNAME_CACHE_EXPIRATION, -- * returns the value of CACHE_EXPIRY_DEFAULT_SECONDS as default if nothing is set. -- * -- * @return The cache expiration time to use for the rack id cache. 
-- */
- private int getCacheExpiration() {
-- String cacheExpirationConfigSeconds = System.getenv(VARNAME_CACHE_EXPIRATION);
-- if (cacheExpirationConfigSeconds != null && !cacheExpirationConfigSeconds.isEmpty()) {
-- try {
-- int cacheExpirationFromEnvVar = Integer.parseInt(cacheExpirationConfigSeconds);
-- LOG.info(
-- "Found [{}] env var, changing cache time for topology entries to [{}]",
-- VARNAME_CACHE_EXPIRATION,
-- cacheExpirationFromEnvVar);
-- return cacheExpirationFromEnvVar;
-- } catch (NumberFormatException e) {
-- LOG.warn(
-- "Unable to parse value of [{}]/[{}] as integer, using default value [{}]",
-- VARNAME_CACHE_EXPIRATION,
-- cacheExpirationConfigSeconds,
-- CACHE_EXPIRY_DEFAULT_SECONDS);
-- }
-- }
-- return CACHE_EXPIRY_DEFAULT_SECONDS;
-+ return TopologyUtils.parseIntFromEnv(
-+ VARNAME_CACHE_EXPIRATION, CACHE_EXPIRY_DEFAULT_SECONDS, "cache expiration seconds");
- }
-
-- /***
-- *
-- * @param datanode the datanode whose IP is to be resolved
-- * @param podLabels the pod labels used in the resolution
-- * @param nodeLabels the node labels used in the resolution
-- *
-- * @return the label looked up from the IP address
-- */
-- private String getLabel(
-- String datanode,
-- Map<String, Map<String, String>> podLabels,
-- Map<String, Map<String, String>> nodeLabels) {
-- // The internal structures used by this mapper are all based on IP addresses. Depending on
-- // configuration and network setup it may (probably will) be possible that the namenode uses
-- // hostnames to resolve a datanode to a topology zone. To allow this, we resolve every input to
-- // an ip address below and use the ip to lookup labels.
-- // TODO: this might break with the listener operator, as `pod.status.podips` won't contain
-- // external addresses
-- // tracking this in https://github.com/stackabletech/hdfs-topology-provider/issues/2
-- InetAddress address;
-- try {
-- address = InetAddress.getByName(datanode);
-- LOG.debug("Resolved [{}] to [{}]", datanode, address.getHostAddress());
-- datanode = address.getHostAddress();
-- } catch (UnknownHostException e) {
-- LOG.warn(
-- "failed to resolve address for [{}] - this should not happen, "
-- + "defaulting this node to [{}]",
-- datanode,
-- DEFAULT_RACK);
-- return DEFAULT_RACK;
-+ private void logInitializationStatus() {
-+ if (labels.isEmpty()) {
-+ LOG.info("No topology configuration - will use default rack: {}", DEFAULT_RACK);
-+ } else {
-+ List<String> labelNames =
-+ labels.stream().map(TopologyLabel::getName).collect(Collectors.toList());
-+ LOG.info("Initialized with topology labels: {}", labelNames);
- }
-- StringBuilder resultBuilder = new StringBuilder();
-- for (TopologyLabel label : this.labels) {
-- if (label.labelType == LabelType.Node) {
-- LOG.debug(
-- "Looking for node label [{}] of type [{}] in [{}]/[{}]",
-- label.name,
-- label.labelType,
-- nodeLabels.keySet(),
-- nodeLabels.values());
-- resultBuilder
-- .append("/")
-- .append(
-- nodeLabels
-- .getOrDefault(datanode, new HashMap<>())
-- .getOrDefault(label.name, "NotFound"));
-- } else if (label.labelType == LabelType.Pod) {
-- LOG.debug(
-- "Looking for pod label [{}] of type [{}] in [{}]/[{}]",
-- label.name,
-- label.labelType,
-- podLabels.keySet(),
-- podLabels.values());
-- resultBuilder
-- .append("/")
-- .append(
-- podLabels
-- .getOrDefault(datanode, new HashMap<>())
-- .getOrDefault(label.name, "NotFound"));
-- }
-- }
-- String result = resultBuilder.toString();
-- LOG.debug("Returning label [{}]", result);
-- return result;
- }
-
- @Override
- public List<String> resolve(List<String> names) {
-- LOG.info("Resolving for
listeners/client-pods [{}]", names.toString()); -+ LOG.info("Resolving topology for: {}", names); - -- if (this.labels.isEmpty()) { -- LOG.info( -- "No topology labels defined, returning [{}] for hdfs nodes: [{}]", DEFAULT_RACK, names); -- return names.stream().map(name -> DEFAULT_RACK).collect(Collectors.toList()); -+ if (labels.isEmpty()) { -+ return createDefaultRackList(names); - } - -- // We need to check if we have cached values for all datanodes contained in this request. -- // Unless we can answer everything from the cache we have to talk to k8s anyway and can just -- // recalculate everything -- List cachedValues = -- names.stream().map(this.topologyKeyCache::getIfPresent).collect(Collectors.toList()); -- LOG.debug("Cached topologyKeyCache values [{}]", cachedValues); -- -- if (cachedValues.contains(null)) { -- // We cannot completely serve this request from the cache, since we need to talk to k8s anyway -- // we'll simply refresh everything. -- LOG.debug( -- "Cache doesn't contain values for all requested pods: new values will be built for all nodes."); -- } else { -- // use same log level as the non-cached return statement -- LOG.info("Answering from cached topology keys: [{}]", cachedValues); -+ // Try to serve from cache first -+ List cachedValues = tryResolveFromCache(names); -+ if (cachedValues != null) { -+ LOG.info("Returning cached topology: {}", cachedValues); - return cachedValues; - } - -- // The datanodes will be the cache keys. -- List datanodes = -+ // Cache miss - perform full resolution -+ return performFullResolution(names); -+ } -+ -+ private List createDefaultRackList(List names) { -+ LOG.info( -+ "No topology labels defined, returning [{}] for hdfs nodes: [{}]", DEFAULT_RACK, names); -+ return names.stream().map(name -> DEFAULT_RACK).collect(Collectors.toList()); -+ } -+ -+ private List tryResolveFromCache(List names) { -+ // We need to check if we have cached values for all dataNodes contained in this request. -+ // Unless we can answer everything from the cache we have to talk to k8s anyway and can just -+ // recalculate everything -+ List cached = names.stream().map(cache::getTopology).collect(Collectors.toList()); -+ LOG.debug("Cached topologyKeyCache values [{}]", cached); -+ -+ return cached.contains(null) ? 
null : cached; -+ } -+ -+ // ============================================================================ -+ // RESOLUTION WORKFLOW -+ // ============================================================================ -+ -+ private List performFullResolution(List names) { -+ LOG.debug("Performing full topology resolution for: {}", names); -+ -+ // Step 1: Gather all dataNodes -+ List dataNodes = fetchDataNodes(); -+ -+ // Step 2: Resolve listeners to actual datanode IPs -+ List resolvedNames = resolveListeners(names); -+ -+ // Step 3: Build label lookup maps -+ Map> podLabels = buildPodLabelMap(dataNodes); -+ Map> nodeLabels = buildNodeLabelMap(dataNodes); -+ -+ // Step 4: Build node-to-datanode map for O(1) colocated lookups -+ Map nodeToDatanodeIp = buildNodeToDatanodeMap(dataNodes); -+ -+ // Step 5: Resolve client pods to co-located dataNodes -+ List datanodeIps = -+ resolveClientPodsToDataNodes(resolvedNames, podLabels, nodeToDatanodeIp); -+ -+ // Step 6: Build topology strings and cache results -+ return buildAndCacheTopology(names, datanodeIps, podLabels, nodeLabels); -+ } -+ -+ private List buildAndCacheTopology( -+ List originalNames, -+ List datanodeIps, -+ Map> podLabels, -+ Map> nodeLabels) { -+ List result = new ArrayList<>(); -+ for (int i = 0; i < datanodeIps.size(); i++) { -+ String datanodeIp = datanodeIps.get(i); -+ String originalName = originalNames.get(i); -+ -+ String topology = buildTopologyString(datanodeIp, podLabels, nodeLabels); -+ result.add(topology); -+ -+ // Cache both the resolved IP and original name -+ cache.putTopology(datanodeIp, topology); -+ cache.putTopology(originalName, topology); -+ } -+ -+ LOG.info("Built topology: {}", result); -+ return result; -+ } -+ -+ // ============================================================================ -+ // DATANODE FETCHING -+ // ============================================================================ -+ -+ private List fetchDataNodes() { -+ List dataNodes = - client - .pods() - .withLabel("app.kubernetes.io/component", "datanode") - .withLabel("app.kubernetes.io/name", "hdfs") - .list() - .getItems(); -+ - LOG.debug( -- "Retrieved datanodes: [{}]", -- datanodes.stream() -- .map(datanode -> datanode.getMetadata().getName()) -+ "Retrieved dataNodes: [{}]", -+ dataNodes.stream() -+ .map(dataNode -> dataNode.getMetadata().getName()) - .collect(Collectors.toList())); -+ return dataNodes; -+ } - -- List namesToDataNodeNames = dataNodesResolvedFromListenerOrOriginal(names); -- LOG.debug("Now resolving: [{}]", namesToDataNodeNames); -+ // ============================================================================ -+ // LISTENER RESOLUTION -+ // ============================================================================ - -- // Build internal state that is later used to look up information. Basically this transposes pod -- // and node lists into hashmaps where pod-IPs can be used to look up labels for the pods and -- // nodes. This is not terribly memory efficient because it effectively duplicates a lot of data -- // in memory. But since we cache lookups, this should hopefully only be done every once in a -- // while and is not kept in memory for extended amounts of time. 
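
The resolution workflow above ends by concatenating one path segment per configured label into a topology string such as "/eu-west-1a/rack-2", falling back to a placeholder for labels that cannot be found. A standalone sketch of that string construction, with invented label names and values:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class TopologyStringDemo {
      public static void main(String[] args) {
        // Labels actually present on the node/pod; the rack label is deliberately absent.
        Map<String, String> labels = new LinkedHashMap<>();
        labels.put("topology.kubernetes.io/zone", "eu-west-1a");

        StringBuilder topology = new StringBuilder();
        // One path segment per configured label, in configuration order.
        for (String labelName :
            new String[] {"topology.kubernetes.io/zone", "topology.kubernetes.io/rack"}) {
          topology.append("/").append(labels.getOrDefault(labelName, "NotFound"));
        }
        System.out.println(topology); // -> /eu-west-1a/NotFound
      }
    }
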
-- List result = new ArrayList<>(); -+ private List resolveListeners(List names) { -+ refreshListenerCacheIfNeeded(names); - -- Map> nodeLabels = getNodeLabels(datanodes); -- LOG.debug("Resolved node labels map [{}]/[{}]", nodeLabels.keySet(), nodeLabels.values()); -+ return names.stream().map(this::resolveListenerToDatanode).collect(Collectors.toList()); -+ } - -- Map> podLabels = getPodLabels(datanodes); -- LOG.debug("Resolved pod labels map [{}]/[{}]", podLabels.keySet(), podLabels.values()); -+ private void refreshListenerCacheIfNeeded(List names) { -+ List missingNames = -+ names.stream().filter(name -> cache.getListener(name) == null).collect(Collectors.toList()); - -- List podsResolvedToDataNodes = -- resolveDataNodesFromCallingPods(namesToDataNodeNames, podLabels, datanodes); -- -- // Iterate over all nodes to resolve and return the topology zones -- for (int i = 0; i < podsResolvedToDataNodes.size(); i++) { -- String builtLabel = getLabel(podsResolvedToDataNodes.get(i), podLabels, nodeLabels); -- result.add(builtLabel); -- -- // Cache the value for potential use in a later request -- this.topologyKeyCache.put(podsResolvedToDataNodes.get(i), builtLabel); -- // also cache the original name, in case that has resolved to a dataNode (so that -- // the resolution step can be omitted next time this pod is encountered) -- this.topologyKeyCache.put(names.get(i), builtLabel); -+ if (missingNames.isEmpty()) { -+ LOG.debug("Listener cache contains all required entries"); -+ return; -+ } -+ -+ // Listeners are typically few, so fetch all -+ // (Individual listener fetches would require knowing the namespace) -+ LOG.debug("Fetching all listeners to populate cache"); -+ GenericKubernetesResourceList listeners = fetchListeners(); -+ -+ for (GenericKubernetesResource listener : listeners.getItems()) { -+ cacheListenerByNameAndAddresses(listener); -+ } -+ } -+ -+ private void cacheListenerByNameAndAddresses(GenericKubernetesResource listener) { -+ String name = listener.getMetadata().getName(); -+ cache.putListener(name, listener); -+ -+ // Also cache by ingress addresses for quick lookup -+ for (String address : TopologyUtils.getIngressAddresses(listener)) { -+ cache.putListener(address, listener); - } -- LOG.info("Returning resolved labels [{}]", result); -- return result; - } - - /** -- * If the names include listeners then these must be resolved against the dataNode IPs and used -- * subsequently. -+ * We don't know if the name refers to a listener (it could be any client pod) but we check to see -+ * if it can be resolved to a dataNode just in case. 
- * -- * @param names the collection of names to resolve -- * @return a collection of either the name (for non-listener) or the dataNode IP to which this -- * listener resolves -+ * @param name the name of the calling pod which should be resolved to a dataNode IP if it is a -+ * listener -+ * @return either the name (for non-listener) or the dataNode IP to which this listener resolves - */ -- private List dataNodesResolvedFromListenerOrOriginal(List names) { -- List cachedListeners = -- names.stream().map(this.listenerKeyCache::getIfPresent).collect(Collectors.toList()); -- if (cachedListeners.contains(null)) { -- LOG.debug( -- "Refreshing listener cache as not all of [{}] present in [{}]", -- names, -- this.listenerKeyCache.asMap().keySet()); -- -- GenericKubernetesResourceList listeners = getListeners(); -- -- for (GenericKubernetesResource listener : listeners.getItems()) { -- this.listenerKeyCache.put(listener.getMetadata().getName(), listener); -- // also add the IPs -- for (String ingressAddress : TopologyUtils.getIngressAddresses(listener)) { -- this.listenerKeyCache.put(ingressAddress, listener); -- } -- } -- } else { -- LOG.debug("Listener cache contains [{}]", names); -+ private String resolveListenerToDatanode(String name) { -+ GenericKubernetesResource listener = cache.getListener(name); -+ if (listener == null) { -+ LOG.debug("Not a listener: {}", name); -+ return name; - } -- ConcurrentMap listeners = this.listenerKeyCache.asMap(); -- -- List listenerToDataNodeNames = new ArrayList<>(); -- -- for (String name : names) { -- String resolvedName = resolveDataNodesFromListeners(name, listeners); -- listenerToDataNodeNames.add(resolvedName); -- } -- return listenerToDataNodeNames; -+ // We found a listener, so we can resolve it directly -+ return resolveListenerEndpoint(listener); - } - -- private GenericKubernetesResourceList getListeners() { -+ private String resolveListenerEndpoint(GenericKubernetesResource listener) { -+ String listenerName = listener.getMetadata().getName(); -+ Endpoints endpoint = client.endpoints().withName(listenerName).get(); -+ LOG.debug("Matched ingressAddress [{}]", listenerName); -+ -+ if (endpoint.getSubsets().isEmpty()) { -+ LOG.warn("Endpoint {} has no subsets - pod may be restarting", listenerName); -+ return listenerName; -+ } -+ -+ EndpointAddress address = endpoint.getSubsets().get(0).getAddresses().get(0); -+ LOG.info( -+ "Resolved listener {} to IP {} on node {}", -+ listenerName, -+ address.getIp(), -+ address.getNodeName()); -+ -+ return address.getIp(); -+ } -+ -+ private GenericKubernetesResourceList fetchListeners() { - ResourceDefinitionContext listenerCrd = - new ResourceDefinitionContext.Builder() - .withGroup("listeners.stackable.tech") -@@ -353,168 +269,186 @@ public class StackableTopologyProvider implements DNSToSwitchMapping { - return client.genericKubernetesResources(listenerCrd).list(); - } - -- /** -- * We don't know if the name refers to a listener (it could be any client pod) but we check to see -- * if it can be resolved to a dataNode just in case. 
-- * -- * @param name the name of the calling pod which should be resolved to a dataNode IP if it is a -- * listener -- * @param listeners the current listener collection -- * @return either the name (for non-listener) or the dataNode IP to which this listener resolves -- */ -- private String resolveDataNodesFromListeners( -- String name, ConcurrentMap listeners) { -- LOG.debug("Attempting to resolve [{}]", name); -- for (GenericKubernetesResource listener : listeners.values()) { -- List ingressAddresses = TopologyUtils.getIngressAddresses(listener); -- for (String ingressAddress : ingressAddresses) { -- LOG.debug("Address [{}]", ingressAddress); -- if (name.equalsIgnoreCase(ingressAddress)) { -- LOG.debug("Matched ingressAddress [{}]/[{}]", name, listener.getMetadata().getName()); -+ // ============================================================================ -+ // CLIENT POD RESOLUTION -+ // ============================================================================ - -- Endpoints ep = client.endpoints().withName(listener.getMetadata().getName()).get(); -- // TODO: Assuming single address per datanode endpoint. -- // When does/can an endpoint support multiple data nodes? On restart the address list will -- // be empty for a moment or two. -- if (ep.getSubsets().size() < 1) { -- LOG.warn( -- "Endpoint [{}] address not detected, pod maybe restarting...", -- ep.getMetadata().getName()); -- } else { -- EndpointAddress address = ep.getSubsets().get(0).getAddresses().get(0); -- LOG.info( -- "Endpoint [{}], IP [{}], node [{}]", -- ep.getMetadata().getName(), -- address.getIp(), -- address.getNodeName()); -- return address.getIp(); -- } -- } -+ private List resolveClientPodsToDataNodes( -+ List names, -+ Map> podLabels, -+ Map nodeToDatanodeIp) { -+ -+ refreshPodCacheIfNeeded(names); -+ -+ return names.stream() -+ .map(name -> resolveToDatanodeOrKeep(name, podLabels, nodeToDatanodeIp)) -+ .collect(Collectors.toList()); -+ } -+ -+ private void refreshPodCacheIfNeeded(List names) { -+ if (cache.hasAllPods(names)) { -+ LOG.debug("Pod cache contains all required entries"); -+ return; -+ } -+ -+ // Note: We fetch all pods here because: -+ // 1. Client pods (Spark, etc.) are queried by IP, not name -+ // 2. 
K8s doesn't support "get pod by IP" - we must list and filter -+ LOG.debug("Refreshing pod cache for client pod resolution"); -+ for (Pod pod : client.pods().list().getItems()) { -+ cachePodByNameAndIps(pod); -+ } -+ } -+ -+ private void cachePodByNameAndIps(Pod pod) { -+ String podName = pod.getMetadata().getName(); -+ cache.putPod(podName, pod); -+ -+ // Cache by all IPs - this is crucial for IP-based lookups -+ for (PodIP ip : pod.getStatus().getPodIPs()) { -+ cache.putPod(ip.getIp(), pod); -+ } -+ } -+ -+ private String resolveToDatanodeOrKeep( -+ String name, -+ Map> podLabels, -+ Map nodeToDatanodeIp) { -+ -+ String ipAddress = resolveToIpAddress(name); -+ -+ // If it's already a datanode, return its IP -+ if (podLabels.containsKey(ipAddress)) { -+ LOG.info("Name is a datanode: {}", ipAddress); -+ return ipAddress; -+ } -+ -+ // Try to find co-located datanode -+ Pod clientPod = cache.getPod(ipAddress); -+ if (clientPod != null) { -+ String datanodeIp = findColocatedDatanode(clientPod, nodeToDatanodeIp); -+ if (datanodeIp != null) { -+ return datanodeIp; - } - } -- LOG.info("Not a listener, returning [{}]", name); -- return name; -+ -+ // Keep original if we can't resolve -+ return ipAddress; -+ } -+ -+ private String findColocatedDatanode(Pod clientPod, Map nodeToDatanodeIp) { -+ String clientNodeName = clientPod.getSpec().getNodeName(); -+ -+ if (clientNodeName == null) { -+ LOG.warn("Client pod {} not yet assigned to node", clientPod.getMetadata().getName()); -+ return null; -+ } -+ -+ String datanodeIp = nodeToDatanodeIp.get(clientNodeName); -+ if (datanodeIp == null) { -+ LOG.debug("No datanode found on node {}", clientNodeName); -+ } -+ -+ return datanodeIp; -+ } -+ -+ private String resolveToIpAddress(String hostname) { -+ try { -+ InetAddress address = InetAddress.getByName(hostname); -+ String ip = address.getHostAddress(); -+ LOG.debug("Resolved {} to {}", hostname, ip); -+ return ip; -+ } catch (UnknownHostException e) { -+ LOG.warn("Failed to resolve address: {} - defaulting to {}", hostname, DEFAULT_RACK); -+ return hostname; -+ } - } - - /** -- * The list of names may be datanodes (as is the case when the topology is initialised) or -- * non-datanodes, when the data is being written by a client tool, spark executors etc. In this -- * case we want to identify the datanodes that are running on the same node as this client. The -- * names may also be pod IPs or pod names. -+ * Build a map from Kubernetes node name to datanode IP. This enables O(1) lookup when finding -+ * co-located dataNodes for client pods. - * -- * @param names list of client pods to resolve to datanodes -- * @param podLabels map of podIPs and labels -- * @param dns list of datanode names which will be used to match nodenames -- * @return a list of pods resolved to co-located datanodes where possible -+ *

Note: If multiple dataNodes run on the same node, the last one wins. This is acceptable -+ * because all dataNodes on the same node have the same topology. - */ -- private List resolveDataNodesFromCallingPods( -- List names, Map> podLabels, List dns) { -- List dataNodes = new ArrayList<>(); -+ private Map buildNodeToDatanodeMap(List dataNodes) { -+ Map nodeToDatanode = new HashMap<>(); - -- // if any of the names do not exist in the pod cache then refresh it -- List cachedPods = -- names.stream().map(this.podKeyCache::getIfPresent).collect(Collectors.toList()); -- if (cachedPods.contains(null)) { -- LOG.info( -- "Refreshing pod cache as not all of [{}] present in [{}]", -- names, -- this.podKeyCache.asMap().keySet()); -- for (Pod pod : client.pods().list().getItems()) { -- this.podKeyCache.put(pod.getMetadata().getName(), pod); -- // also add an entry for each IP -- for (PodIP ip : pod.getStatus().getPodIPs()) { -- this.podKeyCache.put(ip.getIp(), pod); -- } -- } -- } else { -- LOG.info("Pod cache contains [{}]", names); -- } -- ConcurrentMap pods = this.podKeyCache.asMap(); -+ for (Pod dataNode : dataNodes) { -+ String nodeName = dataNode.getSpec().getNodeName(); -+ String dataNodeIp = dataNode.getStatus().getPodIP(); - -- for (String name : names) { -- // if we don't find a dataNode running on the same node as a non-dataNode pod, then -- // we'll keep the original name to allow it to be resolved to NotFound in the calling routine. -- String replacementDataNodeIp = name; -- InetAddress address; -- try { -- // make sure we are looking up using the IP address -- address = InetAddress.getByName(name); -- String podIp = address.getHostAddress(); -- if (podLabels.containsKey(podIp)) { -- replacementDataNodeIp = podIp; -- LOG.info("Added as found in the datanode map [{}]", podIp); -- } else { -- // we've received a call from a non-datanode pod -- for (Pod pod : pods.values()) { -- if (pod.getStatus().getPodIPs().contains(new PodIP(podIp))) { -- String nodeName = pod.getSpec().getNodeName(); -- for (Pod dn : dns) { -- if (dn.getSpec().getNodeName().equals(nodeName)) { -- LOG.debug( -- "NodeName [{}] matches with [{}]?", dn.getSpec().getNodeName(), nodeName); -- replacementDataNodeIp = dn.getStatus().getPodIP(); -- break; -- } -- } -- } -- } -- } -- } catch (UnknownHostException e) { -- LOG.warn("Error encountered while resolving host [{}]", e.getLocalizedMessage()); -+ if (nodeName != null && dataNodeIp != null) { -+ nodeToDatanode.put(nodeName, dataNodeIp); - } -- dataNodes.add(replacementDataNodeIp); - } -- LOG.info("Replacing names [{}] with IPs [{}]", names, dataNodes); -- return dataNodes; -+ -+ LOG.debug("Built node-to-datanode map with {} entries", nodeToDatanode.size()); -+ return nodeToDatanode; - } - -- /** -- * Given a list of datanodes, return a HashMap that maps pod ips onto Pod labels. The returned Map -- * may contain more entries than the list that is given to this function, as an entry will be -- * generated for every ip a pod has. -- * -- * @param datanodes List of all retrieved pods. 
-- * @return Map of ip addresses to all labels the pod that "owns" that ip has attached to itself -- */ -- private Map> getPodLabels(List datanodes) { -- Map> result = new HashMap<>(); -- // Iterate over all pods and then all ips for every pod and add these to the mapping -- for (Pod pod : datanodes) { -- for (PodIP podIp : pod.getStatus().getPodIPs()) { -- result.put(podIp.getIp(), pod.getMetadata().getLabels()); -- } -+ // ============================================================================ -+ // TOPOLOGY STRING BUILDING -+ // ============================================================================ -+ -+ private String buildTopologyString( -+ String ipAddress, -+ Map> podLabels, -+ Map> nodeLabels) { -+ StringBuilder topology = new StringBuilder(); -+ -+ for (TopologyLabel label : labels) { -+ String labelValue = extractLabelValue(ipAddress, label, podLabels, nodeLabels); -+ topology.append("/").append(labelValue); - } -+ -+ String result = topology.toString(); -+ LOG.debug("Returning label [{}]", result); - return result; - } - -+ private String extractLabelValue( -+ String ipAddress, -+ TopologyLabel label, -+ Map> podLabels, -+ Map> nodeLabels) { -+ -+ Map> labelSource = label.isNodeLabel() ? nodeLabels : podLabels; -+ -+ String labelValue = -+ labelSource -+ .getOrDefault(ipAddress, Collections.emptyMap()) -+ .getOrDefault(label.getName(), "NotFound"); -+ -+ LOG.debug("Label {}.{} = {}", label.getType(), label.getName(), labelValue); -+ return labelValue; -+ } -+ -+ // ============================================================================ -+ // LABEL MAPS -+ // ============================================================================ -+ - /** -- * Given a list of datanodes this function will resolve which datanodes run on which node as well -- * as all the ips assigned to a datanodes. It will then return a mapping of every ip address to -- * the labels that are attached to the "physical" node running the datanodes that this ip belongs -+ * Given a list of dataNodes this function will resolve which dataNodes run on which node as well -+ * as all the ips assigned to a dataNodes. It will then return a mapping of every ip address to -+ * the labels that are attached to the "physical" node running the dataNodes that this ip belongs - * to. 
- *
-- * @param datanodes List of all in-scope datanodes (datanode pods in this namespace)
-+ * @param dataNodes List of all in-scope dataNodes (datanode pods in this namespace)
- * @return Map of ip addresses to labels of the node running the pod that the ip address belongs
- * to
- */
-- private Map<String, Map<String, String>> getNodeLabels(List<Pod> datanodes) {
-+ private Map<String, Map<String, String>> buildNodeLabelMap(List<Pod> dataNodes) {
- Map<String, Map<String, String>> result = new HashMap<>();
-- for (Pod pod : datanodes) {
-- // either retrieve the node from the internal cache or fetch it by name
-+ for (Pod pod : dataNodes) {
- String nodeName = pod.getSpec().getNodeName();
-- // nodeName may be null while pod is being provisioned so force a re-try
-+
- if (nodeName == null) {
-- LOG.warn(
-- "Pod [{}] not yet assigned to a k8s node, forcing re-try", pod.getMetadata().getName());
-+ LOG.warn("Pod [{}] not yet assigned to node, retrying", pod.getMetadata().getName());
- return result;
- }
-- Node node = this.nodeKeyCache.getIfPresent(nodeName);
-- if (node == null) {
-- LOG.debug("Node not yet cached, fetching by name [{}]", nodeName);
-- node = client.nodes().withName(nodeName).get();
-- this.nodeKeyCache.put(nodeName, node);
-- }
-
-+ Node node = getOrFetchNode(nodeName);
- Map<String, String> nodeLabels = node.getMetadata().getLabels();
- LOG.debug("Labels for node [{}]:[{}]....", nodeName, nodeLabels);
-
-@@ -526,78 +460,33 @@ public class StackableTopologyProvider implements DNSToSwitchMapping {
- return result;
- }
-
-- @Override
-- public void reloadCachedMappings() {
-- // TODO: According to the upstream comment we should rebuild all cache entries after
-- // invalidating them
-- // this may mean trying to resolve ip addresses that do not exist any more and things like that
-- // though and
-- // require some more thought, so we will for now just invalidate the cache.
-- this.topologyKeyCache.invalidateAll();
-- }
--
-- @Override
-- public void reloadCachedMappings(List<String> names) {
-- // TODO: See comment above, the same applies here
-- for (String name : names) {
-- this.topologyKeyCache.invalidate(name);
-+ private Node getOrFetchNode(String nodeName) {
-+ Node node = cache.getNode(nodeName);
-+ if (node == null) {
-+ LOG.debug("Fetching node: {}", nodeName);
-+ node = client.nodes().withName(nodeName).get();
-+ cache.putNode(nodeName, node);
- }
-+ return node;
- }
-
-- private enum LabelType {
-- Node,
-- Pod,
-- Undefined
-- }
-+ /**
-+ * Given a list of dataNodes, return a HashMap that maps pod ips onto Pod labels. The returned Map
-+ * may contain more entries than the list that is given to this function, as an entry will be
-+ * generated for every ip a pod has.
-+ *
-+ * @param dataNodes List of all retrieved pods.
-+ * @return Map of ip addresses to all labels the pod that "owns" that ip has attached to itself
-+ */
-+ private Map<String, Map<String, String>> buildPodLabelMap(List<Pod> dataNodes) {
-+ Map<String, Map<String, String>> result = new HashMap<>();
-+ for (Pod pod : dataNodes) {
-+ Map<String, String> podLabels = pod.getMetadata().getLabels();
-
-- private class TopologyLabel {
-- private final LabelType labelType;
-- private String name = null;
--
-- /**
-- * Create a new TopologyLabel from its string representation
-- *
-- * @param value A string in the form of "[node|pod]:<labelname>" that is deserialized into a
-- * TopologyLabel. Invalid and empty strings are resolved into the type unspecified.
-- */ -- private TopologyLabel(String value) { -- // If this is null the env var was not set, we will return 'undefined' for this level -- if (value == null || value.isEmpty()) { -- this.labelType = LabelType.Undefined; -- return; -- } -- String[] split = value.toLowerCase(Locale.ROOT).split(":", 2); -- -- // This should only fail if no : was present in the string -- if (split.length != 2) { -- this.labelType = LabelType.Undefined; -- LOG.warn( -- "Ignoring topology label [{}] - label definitions need to be in the form " -- + "of \"[node|pod]:\"", -- value); -- return; -- } -- // Length has to be two, proceed with "normal" case -- String type = split[0]; -- this.name = split[1]; -- -- // Parse type of object labels should be retrieved from -- switch (type) { -- case "node": -- this.labelType = LabelType.Node; -- break; -- -- case "pod": -- this.labelType = LabelType.Pod; -- break; -- -- default: -- LOG.warn( -- "Encountered unsupported label type [{}] - this label definition will be ignored, " -- + "supported types are [\"node\", \"pod\"]", -- type); -- this.labelType = LabelType.Undefined; -+ for (PodIP podIp : pod.getStatus().getPodIPs()) { -+ result.put(podIp.getIp(), podLabels); - } - } -+ return result; - } - } -diff --git a/src/main/java/tech/stackable/hadoop/TopologyCache.java b/src/main/java/tech/stackable/hadoop/TopologyCache.java -new file mode 100644 -index 0000000..3944fd4 ---- /dev/null -+++ b/src/main/java/tech/stackable/hadoop/TopologyCache.java -@@ -0,0 +1,88 @@ -+package tech.stackable.hadoop; -+ -+import com.github.benmanes.caffeine.cache.Cache; -+import com.github.benmanes.caffeine.cache.Caffeine; -+import io.fabric8.kubernetes.api.model.GenericKubernetesResource; -+import io.fabric8.kubernetes.api.model.Node; -+import io.fabric8.kubernetes.api.model.Pod; -+import java.util.List; -+import java.util.concurrent.ConcurrentMap; -+import java.util.concurrent.TimeUnit; -+ -+/** Manages all caching layers for the topology provider. 
*/
-+public class TopologyCache {
-+ private final Cache<String, String> topology;
-+ private final Cache<String, Node> nodes;
-+ private final Cache<String, GenericKubernetesResource> listeners;
-+ private final Cache<String, Pod> pods;
-+
-+ TopologyCache(int expirationSeconds, int defaultExpirationSeconds) {
-+ this.topology =
-+ Caffeine.newBuilder().expireAfterWrite(expirationSeconds, TimeUnit.SECONDS).build();
-+
-+ this.nodes =
-+ Caffeine.newBuilder().expireAfterWrite(defaultExpirationSeconds, TimeUnit.SECONDS).build();
-+
-+ this.listeners =
-+ Caffeine.newBuilder().expireAfterWrite(defaultExpirationSeconds, TimeUnit.SECONDS).build();
-+
-+ this.pods =
-+ Caffeine.newBuilder().expireAfterWrite(defaultExpirationSeconds, TimeUnit.SECONDS).build();
-+ }
-+
-+ String getTopology(String key) {
-+ return topology.getIfPresent(key);
-+ }
-+
-+ void putTopology(String key, String value) {
-+ topology.put(key, value);
-+ }
-+
-+ void invalidateAll() {
-+ topology.invalidateAll();
-+ }
-+
-+ void invalidateTopologyKeys(List<String> keys) {
-+ keys.forEach(topology::invalidate);
-+ }
-+
-+ Node getNode(String name) {
-+ return nodes.getIfPresent(name);
-+ }
-+
-+ void putNode(String name, Node node) {
-+ nodes.put(name, node);
-+ }
-+
-+ GenericKubernetesResource getListener(String name) {
-+ return listeners.getIfPresent(name);
-+ }
-+
-+ ConcurrentMap<String, GenericKubernetesResource> getListenerMap() {
-+ return listeners.asMap();
-+ }
-+
-+ void putListener(String name, GenericKubernetesResource listener) {
-+ listeners.put(name, listener);
-+ }
-+
-+ boolean hasAllListeners(List<String> names) {
-+ return names.stream().noneMatch(name -> listeners.getIfPresent(name) == null);
-+ }
-+
-+ Pod getPod(String name) {
-+ return pods.getIfPresent(name);
-+ }
-+
-+ ConcurrentMap<String, Pod> getPodMap() {
-+ return pods.asMap();
-+ }
-+
-+ void putPod(String name, Pod pod) {
-+ pods.put(name, pod);
-+ }
-+
-+ boolean hasAllPods(List<String> names) {
-+ return names.stream().noneMatch(name -> pods.getIfPresent(name) == null);
-+ }
-+}
-diff --git a/src/main/java/tech/stackable/hadoop/TopologyLabel.java b/src/main/java/tech/stackable/hadoop/TopologyLabel.java
-new file mode 100644
-index 0000000..ba91b79
---- /dev/null
-+++ b/src/main/java/tech/stackable/hadoop/TopologyLabel.java
-@@ -0,0 +1,116 @@
-+package tech.stackable.hadoop;
-+
-+import java.util.Arrays;
-+import java.util.List;
-+import java.util.Locale;
-+import java.util.stream.Collectors;
-+import org.slf4j.Logger;
-+import org.slf4j.LoggerFactory;
-+
-+public class TopologyLabel {
-+ private static final Logger LOG = LoggerFactory.getLogger(TopologyLabel.class);
-+ public static final String VARNAME_LABELS = "TOPOLOGY_LABELS";
-+ public static final String VARNAME_MAXLEVELS = "TOPOLOGY_MAX_LEVELS";
-+ private static final int MAX_LEVELS_DEFAULT = 2;
-+
-+ public enum Type {
-+ NODE,
-+ POD,
-+ UNDEFINED
-+ }
-+
-+ private final Type type;
-+ private final String name;
-+
-+ TopologyLabel(String config) {
-+ if (config == null || config.isEmpty()) {
-+ this.type = Type.UNDEFINED;
-+ this.name = null;
-+ return;
-+ }
-+
-+ String[] parts = config.toLowerCase(Locale.ROOT).split(":", 2);
-+
-+ if (parts.length != 2) {
-+ LOG.warn("Invalid topology label format '{}' - expected '[node|pod]:<labelname>'", config);
-+ this.type = Type.UNDEFINED;
-+ this.name = null;
-+ return;
-+ }
-+
-+ this.name = parts[1];
-+
-+ switch (parts[0]) {
-+ case "node":
-+ this.type = Type.NODE;
-+ break;
-+ case "pod":
-+ this.type = Type.POD;
-+ break;
-+ default:
-+ LOG.warn("Unsupported label type '{}' - must be 'node' or 'pod'", parts[0]);
-+ this.type = Type.UNDEFINED;
-+ }
-+ }
-+
-+ boolean isNodeLabel() {
-+ return type == Type.NODE;
-+ }
-+
-+ boolean isUndefined() {
-+ return type == Type.UNDEFINED;
-+ }
-+
-+ String getName() {
-+ return name;
-+ }
-+
-+ Type getType() {
-+ return type;
-+ }
-+
-+ public static List<TopologyLabel> initializeTopologyLabels() {
-+ // Read the labels to be used to build a topology from environment variables. Labels are
-+ // configured in the EnvVar "TOPOLOGY_LABELS". They should be specified in the form
-+ // "[node|pod]:<labelname>" and separated by ";". So a valid configuration that reads topology
-+ // information from the labels "kubernetes.io/zone" and "kubernetes.io/rack" on the k8s node
-+ // that is running a datanode pod would look like this:
-+ // "node:kubernetes.io/zone;node:kubernetes.io/rack" By default, there is an upper limit of 2 on
-+ // the number of labels that are processed, because this is what Hadoop traditionally allows -
-+ // this can be overridden via setting the EnvVar "TOPOLOGY_MAX_LEVELS".
-+ String topologyConfig = System.getenv(VARNAME_LABELS);
-+
-+ if (topologyConfig == null || topologyConfig.isEmpty()) {
-+ LOG.error(
-+ "Missing env var [{}] this is required for rack awareness to work.", VARNAME_LABELS);
-+ throw new RuntimeException("TOPOLOGY_LABELS environment variable not set");
-+ }
-+
-+ String[] labelConfigs = topologyConfig.split(";");
-+
-+ if (labelConfigs.length > getMaxLabels()) {
-+ LOG.error(
-+ "Found [{}] topology labels configured, but maximum allowed number is [{}]: "
-+ + "please check your config or raise the number of allowed labels.",
-+ labelConfigs.length,
-+ getMaxLabels());
-+ throw new RuntimeException("Too many topology labels configured");
-+ }
-+ // Create TopologyLabels from config strings
-+ List<TopologyLabel> labels =
-+ Arrays.stream(labelConfigs).map(TopologyLabel::new).collect(Collectors.toList());
-+
-+ if (labels.stream().anyMatch(TopologyLabel::isUndefined)) {
-+ LOG.error(
-+ "Invalid topology label configuration - labels must be in format '[pod|node]:<labelname>'");
-+ throw new RuntimeException("Invalid topology label configuration");
-+ }
-+
-+ return labels;
-+ }
-+
-+ private static int getMaxLabels() {
-+ return TopologyUtils.parseIntFromEnv(
-+ VARNAME_MAXLEVELS, MAX_LEVELS_DEFAULT, "maximum topology levels");
-+ }
-+}
-diff --git a/src/main/java/tech/stackable/hadoop/TopologyUtils.java b/src/main/java/tech/stackable/hadoop/TopologyUtils.java
-index 686c4f2..be7cb51 100644
---- a/src/main/java/tech/stackable/hadoop/TopologyUtils.java
-+++ b/src/main/java/tech/stackable/hadoop/TopologyUtils.java
-@@ -4,8 +4,12 @@ import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
- import java.util.List;
- import java.util.Map;
- import java.util.stream.Collectors;
-+import org.slf4j.Logger;
-+import org.slf4j.LoggerFactory;
-
- public class TopologyUtils {
-+ private static final Logger LOG = LoggerFactory.getLogger(TopologyUtils.class);
-+
- private static final String ADDRESS = "address";
- private static final String STATUS = "status";
- private static final String INGRESS_ADDRESSES = "ingressAddresses";
-@@ -21,4 +25,21 @@
- .map(ingress -> (String) ingress.get(ADDRESS))
- .collect(Collectors.toList());
- }
-+
-+ public static int parseIntFromEnv(String varName, int defaultValue, String description) {
-+ String value = System.getenv(varName);
-+ if (value == null || value.isEmpty()) {
-+ return defaultValue;
-+ }
-+
-+ try {
-+ int parsed = Integer.parseInt(value);
-+ LOG.info("Set {} to {} from environment variable {}", description, parsed, varName);
-+ return parsed;
-+ } catch
(NumberFormatException e) { -+ LOG.warn( -+ "Invalid integer value '{}' for {} - using default: {}", value, varName, defaultValue); -+ return defaultValue; -+ } -+ } - } diff --git a/hadoop/hdfs-utils/stackable/patches/0.4.2/patchable.toml b/hadoop/hdfs-utils/stackable/patches/0.4.2/patchable.toml index 6dbd29dc4..cf6599dc1 100644 --- a/hadoop/hdfs-utils/stackable/patches/0.4.2/patchable.toml +++ b/hadoop/hdfs-utils/stackable/patches/0.4.2/patchable.toml @@ -1 +1,2 @@ -base = "921c42df13e2f7b72bf010cfa1e474243b186873" +# base = "921c42df13e2f7b72bf010cfa1e474243b186873" +base="0a53a65e8dbf00db41d36b977b36931051f36c2d" \ No newline at end of file From 8e90c04f3b848e9020aad1c6b43d0a528dfc5de2 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Fri, 12 Dec 2025 14:17:30 +0100 Subject: [PATCH 4/4] linting --- hadoop/hdfs-utils/stackable/patches/0.4.2/patchable.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop/hdfs-utils/stackable/patches/0.4.2/patchable.toml b/hadoop/hdfs-utils/stackable/patches/0.4.2/patchable.toml index cf6599dc1..0261a3aaa 100644 --- a/hadoop/hdfs-utils/stackable/patches/0.4.2/patchable.toml +++ b/hadoop/hdfs-utils/stackable/patches/0.4.2/patchable.toml @@ -1,2 +1,2 @@ # base = "921c42df13e2f7b72bf010cfa1e474243b186873" -base="0a53a65e8dbf00db41d36b977b36931051f36c2d" \ No newline at end of file +base="0a53a65e8dbf00db41d36b977b36931051f36c2d"
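
As context for the TopologyCache introduced by this series: every caching layer follows the same Caffeine expire-after-write pattern. A minimal sketch, assuming only the Caffeine dependency the provider already uses; the five-minute expiry mirrors CACHE_EXPIRY_DEFAULT_SECONDS, while the sample key and value are invented:

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import java.util.concurrent.TimeUnit;

    public class CacheDemo {
      public static void main(String[] args) {
        // Entries silently expire five minutes after they were written.
        Cache<String, String> topology =
            Caffeine.newBuilder().expireAfterWrite(5 * 60, TimeUnit.SECONDS).build();

        topology.put("10.0.0.12", "/eu-west-1a/rack-2");
        System.out.println(topology.getIfPresent("10.0.0.12")); // /eu-west-1a/rack-2

        // Explicit invalidation, as in reloadCachedMappings().
        topology.invalidateAll();
        System.out.println(topology.getIfPresent("10.0.0.12")); // null
      }
    }

Expired entries are evicted lazily on access, which is why explicit invalidation remains useful for forcing an immediate refresh of the topology mappings.
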