From b83e70b2996b027d1be2686c7257ca9828a72619 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Tue, 9 Dec 2025 17:52:06 +0100 Subject: [PATCH 01/14] wip: refactoring --- .../hadoop/StackableTopologyProvider.java | 818 ++++++++---------- .../tech/stackable/hadoop/TopologyCache.java | 88 ++ .../tech/stackable/hadoop/TopologyLabel.java | 116 +++ .../tech/stackable/hadoop/TopologyUtils.java | 21 + 4 files changed, 576 insertions(+), 467 deletions(-) create mode 100644 src/main/java/tech/stackable/hadoop/TopologyCache.java create mode 100644 src/main/java/tech/stackable/hadoop/TopologyLabel.java diff --git a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java index 4d1e917..b6f7877 100644 --- a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java +++ b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java @@ -1,7 +1,5 @@ package tech.stackable.hadoop; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; import io.fabric8.kubernetes.api.model.*; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientBuilder; @@ -9,8 +7,6 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.hadoop.net.DNSToSwitchMapping; import org.slf4j.Logger; @@ -18,330 +14,257 @@ /** * An implementation of the org.apache.hadoop.net.DNSToSwitchMapping that is used to create a - * topology out of datanodes. + * topology out of dataNodes. * *
<p>
This class is intended to be run as part of the NameNode process and will be used by the - * namenode to retrieve topology strings for datanodes. + * nameNode to retrieve topology strings for dataNodes. */ public class StackableTopologyProvider implements DNSToSwitchMapping { + private final Logger LOG = LoggerFactory.getLogger(StackableTopologyProvider.class); - public static final String VARNAME_LABELS = "TOPOLOGY_LABELS"; + // Environment variable names public static final String VARNAME_CACHE_EXPIRATION = "TOPOLOGY_CACHE_EXPIRATION_SECONDS"; - public static final String VARNAME_MAXLEVELS = "TOPOLOGY_MAX_LEVELS"; + + // Default values public static final String DEFAULT_RACK = "/defaultRack"; - private static final int MAX_LEVELS_DEFAULT = 2; private static final int CACHE_EXPIRY_DEFAULT_SECONDS = 5 * 60; - private final Logger LOG = LoggerFactory.getLogger(StackableTopologyProvider.class); + private final KubernetesClient client; - private final Cache topologyKeyCache = - Caffeine.newBuilder().expireAfterWrite(getCacheExpiration(), TimeUnit.SECONDS).build(); - private final Cache nodeKeyCache = - Caffeine.newBuilder() - .expireAfterWrite(CACHE_EXPIRY_DEFAULT_SECONDS, TimeUnit.SECONDS) - .build(); - - private final Cache listenerKeyCache = - Caffeine.newBuilder() - .expireAfterWrite(CACHE_EXPIRY_DEFAULT_SECONDS, TimeUnit.SECONDS) - .build(); - private final Cache podKeyCache = - Caffeine.newBuilder() - .expireAfterWrite(CACHE_EXPIRY_DEFAULT_SECONDS, TimeUnit.SECONDS) - .build(); - // The list of labels that this provider uses to generate the topology information for any given - // datanode private final List labels; + // Caching layers + private final TopologyCache cache; + public StackableTopologyProvider() { this.client = new KubernetesClientBuilder().build(); + this.cache = new TopologyCache(getCacheExpiration(), CACHE_EXPIRY_DEFAULT_SECONDS); + this.labels = TopologyLabel.initializeTopologyLabels(); - // Read the labels to be used to build a topology from environment variables. Labels are - // configured in the EnvVar "TOPOLOGY_LABELS". They should be specified in the form - // "[node|pod]:" and separated by ";". So a valid configuration that reads topology - // information from the labels "kubernetes.io/zone" and "kubernetes.io/rack" on the k8s node - // that is running a datanode pod would look like this: - // "node:kubernetes.io/zone;node:kubernetes.io/rack" By default, there is an upper limit of 2 on - // the number of labels that are processed, because this is what Hadoop traditionally allows - - // this can be overridden via setting the EnvVar "MAX_TOPOLOGY_LEVELS". - String topologyConfig = System.getenv(VARNAME_LABELS); - if (topologyConfig != null && !topologyConfig.isEmpty()) { - String[] labelConfigs = topologyConfig.split(";"); - if (labelConfigs.length > getMaxLabels()) { - LOG.error( - "Found [{}] topology labels configured, but maximum allowed number is [{}]: " - + "please check your config or raise the number of allowed labels.", - labelConfigs.length, - getMaxLabels()); - throw new RuntimeException(); - } - // Create TopologyLabels from config strings - this.labels = - Arrays.stream(labelConfigs).map(TopologyLabel::new).collect(Collectors.toList()); - - // Check if any labelConfigs were invalid - if (this.labels.stream().anyMatch(label -> label.labelType == LabelType.Undefined)) { - LOG.error( - "Topologylabel contained invalid configuration for at least one label: " - + "double check your config! 
Labels should be specified in the " - + "format '[pod|node]:;...'"); - throw new RuntimeException(); - } + logInitializationStatus(); + } - } else { - LOG.error( - "Missing env var [{}] this is required for rack awareness to work.", VARNAME_LABELS); - throw new RuntimeException(); - } - if (this.labels.isEmpty()) { - LOG.info( - "No topology config found, defaulting value for all datanodes to [{}]", DEFAULT_RACK); - } else { - LOG.info( - "Topology config yields labels [{}]", - this.labels.stream().map(label -> label.name).collect(Collectors.toList())); - } + @Override + public void reloadCachedMappings() { + // TODO: According to the upstream comment we should rebuild all cache entries after + // invalidating them + // this may mean trying to resolve ip addresses that do not exist any more and things like that + // though and + // require some more thought, so we will for now just invalidate the cache. + this.cache.invalidateAll(); } - /*** - * Checks if a value for the maximum number of topology levels to allow has been configured in - * the environment variable specified in VARNAME_MAXLEVELS, - * returns the value of MAX_LEVELS_DEFAULT as default if nothing is set. - * - * @return The maximum number of topology levels to allow. - */ - private int getMaxLabels() { - String maxLevelsConfig = System.getenv(VARNAME_MAXLEVELS); - if (maxLevelsConfig != null && !maxLevelsConfig.isEmpty()) { - try { - int maxLevelsFromEnvVar = Integer.parseInt(maxLevelsConfig); - LOG.info( - "Found [{}] env var, changing allowed number of topology levels to [{}]", - VARNAME_MAXLEVELS, - maxLevelsFromEnvVar); - return maxLevelsFromEnvVar; - } catch (NumberFormatException e) { - LOG.warn( - "Unable to parse value of [{}]/[{}] as integer, using default value [{}]", - VARNAME_MAXLEVELS, - maxLevelsConfig, - MAX_LEVELS_DEFAULT); - } - } - return MAX_LEVELS_DEFAULT; + @Override + public void reloadCachedMappings(List names) { + // TODO: See comment above, the same applies here + cache.invalidateTopologyKeys(names); } - /*** - * Checks if a value for the cache expiration time has been configured in - * the environment variable specified in VARNAME_CACHE_EXPIRATION, - * returns the value of CACHE_EXPIRY_DEFAULT_SECONDS as default if nothing is set. - * - * @return The cache expiration time to use for the rack id cache. 
- */ private int getCacheExpiration() { - String cacheExpirationConfigSeconds = System.getenv(VARNAME_CACHE_EXPIRATION); - if (cacheExpirationConfigSeconds != null && !cacheExpirationConfigSeconds.isEmpty()) { - try { - int cacheExpirationFromEnvVar = Integer.parseInt(cacheExpirationConfigSeconds); - LOG.info( - "Found [{}] env var, changing cache time for topology entries to [{}]", - VARNAME_CACHE_EXPIRATION, - cacheExpirationFromEnvVar); - return cacheExpirationFromEnvVar; - } catch (NumberFormatException e) { - LOG.warn( - "Unable to parse value of [{}]/[{}] as integer, using default value [{}]", - VARNAME_CACHE_EXPIRATION, - cacheExpirationConfigSeconds, - CACHE_EXPIRY_DEFAULT_SECONDS); - } - } - return CACHE_EXPIRY_DEFAULT_SECONDS; + return TopologyUtils.parseIntFromEnv( + VARNAME_CACHE_EXPIRATION, CACHE_EXPIRY_DEFAULT_SECONDS, "cache expiration seconds"); } - /*** - * - * @param datanode the datanode whose IP mis to be resolved - * @param podLabels the pod labels used in the resolution - * @param nodeLabels the node labels used in the resolution - * - * @return the label looked up from the IP address - */ - private String getLabel( - String datanode, - Map> podLabels, - Map> nodeLabels) { - // The internal structures used by this mapper are all based on IP addresses. Depending on - // configuration and network setup it may (probably will) be possible that the namenode uses - // hostnames to resolve a datanode to a topology zone. To allow this, we resolve every input to - // an ip address below and use the ip to lookup labels. - // TODO: this might break with the listener operator, as `pod.status.podips` won't contain - // external addresses - // tracking this in https://github.com/stackabletech/hdfs-topology-provider/issues/2 - InetAddress address; - try { - address = InetAddress.getByName(datanode); - LOG.debug("Resolved [{}] to [{}]", datanode, address.getHostAddress()); - datanode = address.getHostAddress(); - } catch (UnknownHostException e) { - LOG.warn( - "failed to resolve address for [{}] - this should not happen, " - + "defaulting this node to [{}]", - datanode, - DEFAULT_RACK); - return DEFAULT_RACK; - } - StringBuilder resultBuilder = new StringBuilder(); - for (TopologyLabel label : this.labels) { - if (label.labelType == LabelType.Node) { - LOG.debug( - "Looking for node label [{}] of type [{}] in [{}]/[{}]", - label.name, - label.labelType, - nodeLabels.keySet(), - nodeLabels.values()); - resultBuilder - .append("/") - .append( - nodeLabels - .getOrDefault(datanode, new HashMap<>()) - .getOrDefault(label.name, "NotFound")); - } else if (label.labelType == LabelType.Pod) { - LOG.debug( - "Looking for pod label [{}] of type [{}] in [{}]/[{}]", - label.name, - label.labelType, - podLabels.keySet(), - podLabels.values()); - resultBuilder - .append("/") - .append( - podLabels - .getOrDefault(datanode, new HashMap<>()) - .getOrDefault(label.name, "NotFound")); - } + private void logInitializationStatus() { + if (labels.isEmpty()) { + LOG.info("No topology configuration - will use default rack: {}", DEFAULT_RACK); + } else { + List labelNames = + labels.stream().map(TopologyLabel::getName).collect(Collectors.toList()); + LOG.info("Initialized with topology labels: {}", labelNames); } - String result = resultBuilder.toString(); - LOG.debug("Returning label [{}]", result); - return result; } @Override public List resolve(List names) { - LOG.info("Resolving for listeners/client-pods [{}]", names.toString()); + LOG.info("Resolving topology for: {}", names); - if 
(this.labels.isEmpty()) { - LOG.info( - "No topology labels defined, returning [{}] for hdfs nodes: [{}]", DEFAULT_RACK, names); - return names.stream().map(name -> DEFAULT_RACK).collect(Collectors.toList()); + if (labels.isEmpty()) { + return createDefaultRackList(names); } - // We need to check if we have cached values for all datanodes contained in this request. + // Try to serve from cache first + List cachedValues = tryResolveFromCache(names); + if (cachedValues != null) { + LOG.info("Returning cached topology: {}", cachedValues); + return cachedValues; + } + + // Cache miss - perform full resolution + return performFullResolution(names); + } + + private List createDefaultRackList(List names) { + LOG.info( + "No topology labels defined, returning [{}] for hdfs nodes: [{}]", DEFAULT_RACK, names); + return names.stream().map(name -> DEFAULT_RACK).collect(Collectors.toList()); + } + + private List tryResolveFromCache(List names) { + // We need to check if we have cached values for all dataNodes contained in this request. // Unless we can answer everything from the cache we have to talk to k8s anyway and can just // recalculate everything - List cachedValues = - names.stream().map(this.topologyKeyCache::getIfPresent).collect(Collectors.toList()); - LOG.debug("Cached topologyKeyCache values [{}]", cachedValues); - - if (cachedValues.contains(null)) { - // We cannot completely serve this request from the cache, since we need to talk to k8s anyway - // we'll simply refresh everything. - LOG.debug( - "Cache doesn't contain values for all requested pods: new values will be built for all nodes."); - } else { - // use same log level as the non-cached return statement - LOG.info("Answering from cached topology keys: [{}]", cachedValues); - return cachedValues; + List cached = names.stream().map(cache::getTopology).collect(Collectors.toList()); + LOG.debug("Cached topologyKeyCache values [{}]", cached); + + return cached.contains(null) ? 
null : cached; + } + + // ============================================================================ + // RESOLUTION WORKFLOW + // ============================================================================ + + private List performFullResolution(List names) { + LOG.debug("Performing full topology resolution for: {}", names); + + // Step 1: Gather all dataNodes + List dataNodes = fetchDataNodes(); + + // Step 2: Resolve listeners to actual datanode IPs + List resolvedNames = resolveListeners(names); + + // Step 3: Build label lookup maps + Map> podLabels = buildPodLabelMap(dataNodes); + Map> nodeLabels = buildNodeLabelMap(dataNodes); + + // Step 4: Build node-to-datanode map for O(1) colocated lookups + Map nodeToDatanodeIp = buildNodeToDatanodeMap(dataNodes); + + // Step 5: Resolve client pods to co-located dataNodes + List datanodeIps = + resolveClientPodsToDataNodes(resolvedNames, podLabels, nodeToDatanodeIp); + + // Step 6: Build topology strings and cache results + return buildAndCacheTopology(names, datanodeIps, podLabels, nodeLabels); + } + + private List buildAndCacheTopology( + List originalNames, + List datanodeIps, + Map> podLabels, + Map> nodeLabels) { + List result = new ArrayList<>(); + for (int i = 0; i < datanodeIps.size(); i++) { + String datanodeIp = datanodeIps.get(i); + String originalName = originalNames.get(i); + + String topology = buildTopologyString(datanodeIp, podLabels, nodeLabels); + result.add(topology); + + // Cache both the resolved IP and original name + cache.putTopology(datanodeIp, topology); + cache.putTopology(originalName, topology); } - // The datanodes will be the cache keys. - List datanodes = + LOG.info("Built topology: {}", result); + return result; + } + + // ============================================================================ + // DATANODE FETCHING + // ============================================================================ + + private List fetchDataNodes() { + List dataNodes = client .pods() .withLabel("app.kubernetes.io/component", "datanode") .withLabel("app.kubernetes.io/name", "hdfs") .list() .getItems(); + LOG.debug( - "Retrieved datanodes: [{}]", - datanodes.stream() - .map(datanode -> datanode.getMetadata().getName()) + "Retrieved dataNodes: [{}]", + dataNodes.stream() + .map(dataNode -> dataNode.getMetadata().getName()) .collect(Collectors.toList())); + return dataNodes; + } - List namesToDataNodeNames = dataNodesResolvedFromListenerOrOriginal(names); - LOG.debug("Now resolving: [{}]", namesToDataNodeNames); + // ============================================================================ + // LISTENER RESOLUTION + // ============================================================================ - // Build internal state that is later used to look up information. Basically this transposes pod - // and node lists into hashmaps where pod-IPs can be used to look up labels for the pods and - // nodes. This is not terribly memory efficient because it effectively duplicates a lot of data - // in memory. But since we cache lookups, this should hopefully only be done every once in a - // while and is not kept in memory for extended amounts of time. 
- List result = new ArrayList<>(); + private List resolveListeners(List names) { + refreshListenerCacheIfNeeded(names); + + return names.stream().map(this::resolveListenerToDatanode).collect(Collectors.toList()); + } + + private void refreshListenerCacheIfNeeded(List names) { + List missingNames = + names.stream().filter(name -> cache.getListener(name) == null).collect(Collectors.toList()); - Map> nodeLabels = getNodeLabels(datanodes); - LOG.debug("Resolved node labels map [{}]/[{}]", nodeLabels.keySet(), nodeLabels.values()); + if (missingNames.isEmpty()) { + LOG.debug("Listener cache contains all required entries"); + return; + } - Map> podLabels = getPodLabels(datanodes); - LOG.debug("Resolved pod labels map [{}]/[{}]", podLabels.keySet(), podLabels.values()); + // Listeners are typically few, so fetch all + // (Individual listener fetches would require knowing the namespace) + LOG.debug("Fetching all listeners to populate cache"); + GenericKubernetesResourceList listeners = fetchListeners(); - List podsResolvedToDataNodes = - resolveDataNodesFromCallingPods(namesToDataNodeNames, podLabels, datanodes); + for (GenericKubernetesResource listener : listeners.getItems()) { + cacheListenerByNameAndAddresses(listener); + } + } - // Iterate over all nodes to resolve and return the topology zones - for (int i = 0; i < podsResolvedToDataNodes.size(); i++) { - String builtLabel = getLabel(podsResolvedToDataNodes.get(i), podLabels, nodeLabels); - result.add(builtLabel); + private void cacheListenerByNameAndAddresses(GenericKubernetesResource listener) { + String name = listener.getMetadata().getName(); + cache.putListener(name, listener); - // Cache the value for potential use in a later request - this.topologyKeyCache.put(podsResolvedToDataNodes.get(i), builtLabel); - // also cache the original name, in case that has resolved to a dataNode (so that - // the resolution step can be omitted next time this pod is encountered) - this.topologyKeyCache.put(names.get(i), builtLabel); + // Also cache by ingress addresses for quick lookup + for (String address : TopologyUtils.getIngressAddresses(listener)) { + cache.putListener(address, listener); } - LOG.info("Returning resolved labels [{}]", result); - return result; } /** - * If the names include listeners then these must be resolved against the dataNode IPs and used - * subsequently. + * We don't know if the name refers to a listener (it could be any client pod) but we check to see + * if it can be resolved to a dataNode just in case. 
* - * @param names the collection of names to resolve - * @return a collection of either the name (for non-listener) or the dataNode IP to which this - * listener resolves + * @param name the name of the calling pod which should be resolved to a dataNode IP if it is a + * listener + * @return either the name (for non-listener) or the dataNode IP to which this listener resolves */ - private List dataNodesResolvedFromListenerOrOriginal(List names) { - List cachedListeners = - names.stream().map(this.listenerKeyCache::getIfPresent).collect(Collectors.toList()); - if (cachedListeners.contains(null)) { - LOG.debug( - "Refreshing listener cache as not all of [{}] present in [{}]", - names, - this.listenerKeyCache.asMap().keySet()); - - GenericKubernetesResourceList listeners = getListeners(); - - for (GenericKubernetesResource listener : listeners.getItems()) { - this.listenerKeyCache.put(listener.getMetadata().getName(), listener); - // also add the IPs - for (String ingressAddress : TopologyUtils.getIngressAddresses(listener)) { - this.listenerKeyCache.put(ingressAddress, listener); - } + private String resolveListenerToDatanode(String name) { + GenericKubernetesResource listener = cache.getListener(name); + if (listener == null) { + LOG.debug("Not a listener: {}", name); + return name; + } + List ingressAddresses = TopologyUtils.getIngressAddresses(listener); + for (String ingressAddress : ingressAddresses) { + LOG.debug("Address [{}]", ingressAddress); + if (name.equalsIgnoreCase(ingressAddress)) { + return resolveListenerEndpoint(listener); } - } else { - LOG.debug("Listener cache contains [{}]", names); } - ConcurrentMap listeners = this.listenerKeyCache.asMap(); + LOG.info("Not a listener, returning [{}]", name); + return name; + } - List listenerToDataNodeNames = new ArrayList<>(); + private String resolveListenerEndpoint(GenericKubernetesResource listener) { + String listenerName = listener.getMetadata().getName(); + Endpoints endpoint = client.endpoints().withName(listenerName).get(); + LOG.debug("Matched ingressAddress [{}]", listenerName); - for (String name : names) { - String resolvedName = resolveDataNodesFromListeners(name, listeners); - listenerToDataNodeNames.add(resolvedName); + if (endpoint.getSubsets().isEmpty()) { + LOG.warn("Endpoint {} has no subsets - pod may be restarting", listenerName); + return listenerName; } - return listenerToDataNodeNames; + + EndpointAddress address = endpoint.getSubsets().get(0).getAddresses().get(0); + LOG.info( + "Resolved listener {} to IP {} on node {}", + listenerName, + address.getIp(), + address.getNodeName()); + + return address.getIp(); } - private GenericKubernetesResourceList getListeners() { + private GenericKubernetesResourceList fetchListeners() { ResourceDefinitionContext listenerCrd = new ResourceDefinitionContext.Builder() .withGroup("listeners.stackable.tech") @@ -353,168 +276,174 @@ private GenericKubernetesResourceList getListeners() { return client.genericKubernetesResources(listenerCrd).list(); } - /** - * We don't know if the name refers to a listener (it could be any client pod) but we check to see - * if it can be resolved to a dataNode just in case. 
- * - * @param name the name of the calling pod which should be resolved to a dataNode IP if it is a - * listener - * @param listeners the current listener collection - * @return either the name (for non-listener) or the dataNode IP to which this listener resolves - */ - private String resolveDataNodesFromListeners( - String name, ConcurrentMap listeners) { - LOG.debug("Attempting to resolve [{}]", name); - for (GenericKubernetesResource listener : listeners.values()) { - List ingressAddresses = TopologyUtils.getIngressAddresses(listener); - for (String ingressAddress : ingressAddresses) { - LOG.debug("Address [{}]", ingressAddress); - if (name.equalsIgnoreCase(ingressAddress)) { - LOG.debug("Matched ingressAddress [{}]/[{}]", name, listener.getMetadata().getName()); - - Endpoints ep = client.endpoints().withName(listener.getMetadata().getName()).get(); - // TODO: Assuming single address per datanode endpoint. - // When does/can an endpoint support multiple data nodes? On restart the address list will - // be empty for a moment or two. - if (ep.getSubsets().size() < 1) { - LOG.warn( - "Endpoint [{}] address not detected, pod maybe restarting...", - ep.getMetadata().getName()); - } else { - EndpointAddress address = ep.getSubsets().get(0).getAddresses().get(0); - LOG.info( - "Endpoint [{}], IP [{}], node [{}]", - ep.getMetadata().getName(), - address.getIp(), - address.getNodeName()); - return address.getIp(); - } - } - } + // ============================================================================ + // CLIENT POD RESOLUTION + // ============================================================================ + + private List resolveClientPodsToDataNodes( + List names, + Map> podLabels, + Map nodeToDatanodeIp) { + + refreshPodCacheIfNeeded(names); + + return names.stream() + .map(name -> resolveToDatanodeOrKeep(name, podLabels, nodeToDatanodeIp)) + .collect(Collectors.toList()); + } + + private void refreshPodCacheIfNeeded(List names) { + if (cache.hasAllPods(names)) { + LOG.debug("Pod cache contains all required entries"); + return; + } + + // Note: We fetch all pods here because: + // 1. Client pods (Spark, etc.) are queried by IP, not name + // 2. K8s doesn't support "get pod by IP" - we must list and filter + LOG.debug("Refreshing pod cache for client pod resolution"); + for (Pod pod : client.pods().list().getItems()) { + cachePodByNameAndIps(pod); } - LOG.info("Not a listener, returning [{}]", name); - return name; } - /** - * The list of names may be datanodes (as is the case when the topology is initialised) or - * non-datanodes, when the data is being written by a client tool, spark executors etc. In this - * case we want to identify the datanodes that are running on the same node as this client. The - * names may also be pod IPs or pod names. 
- * - * @param names list of client pods to resolve to datanodes - * @param podLabels map of podIPs and labels - * @param dns list of datanode names which will be used to match nodenames - * @return a list of pods resolved to co-located datanodes where possible - */ - private List resolveDataNodesFromCallingPods( - List names, Map> podLabels, List dns) { - List dataNodes = new ArrayList<>(); - - // if any of the names do not exist in the pod cache then refresh it - List cachedPods = - names.stream().map(this.podKeyCache::getIfPresent).collect(Collectors.toList()); - if (cachedPods.contains(null)) { - LOG.info( - "Refreshing pod cache as not all of [{}] present in [{}]", - names, - this.podKeyCache.asMap().keySet()); - for (Pod pod : client.pods().list().getItems()) { - this.podKeyCache.put(pod.getMetadata().getName(), pod); - // also add an entry for each IP - for (PodIP ip : pod.getStatus().getPodIPs()) { - this.podKeyCache.put(ip.getIp(), pod); - } - } - } else { - LOG.info("Pod cache contains [{}]", names); + private void cachePodByNameAndIps(Pod pod) { + String podName = pod.getMetadata().getName(); + cache.putPod(podName, pod); + + // Cache by all IPs - this is crucial for IP-based lookups + for (PodIP ip : pod.getStatus().getPodIPs()) { + cache.putPod(ip.getIp(), pod); + } + } + + private String resolveToDatanodeOrKeep( + String name, + Map> podLabels, + Map nodeToDatanodeIp) { + + String ipAddress = resolveToIpAddress(name); + + // If it's already a datanode, return its IP + if (podLabels.containsKey(ipAddress)) { + LOG.info("Name is a datanode: {}", ipAddress); + return ipAddress; } - ConcurrentMap pods = this.podKeyCache.asMap(); - - for (String name : names) { - // if we don't find a dataNode running on the same node as a non-dataNode pod, then - // we'll keep the original name to allow it to be resolved to NotFound in the calling routine. 
- String replacementDataNodeIp = name; - InetAddress address; - try { - // make sure we are looking up using the IP address - address = InetAddress.getByName(name); - String podIp = address.getHostAddress(); - if (podLabels.containsKey(podIp)) { - replacementDataNodeIp = podIp; - LOG.info("Added as found in the datanode map [{}]", podIp); - } else { - // we've received a call from a non-datanode pod - for (Pod pod : pods.values()) { - if (pod.getStatus().getPodIPs().contains(new PodIP(podIp))) { - String nodeName = pod.getSpec().getNodeName(); - for (Pod dn : dns) { - if (dn.getSpec().getNodeName().equals(nodeName)) { - LOG.debug( - "NodeName [{}] matches with [{}]?", dn.getSpec().getNodeName(), nodeName); - replacementDataNodeIp = dn.getStatus().getPodIP(); - break; - } - } - } - } - } - } catch (UnknownHostException e) { - LOG.warn("Error encountered while resolving host [{}]", e.getLocalizedMessage()); + + // Try to find co-located datanode + Pod clientPod = cache.getPod(ipAddress); + if (clientPod != null) { + String datanodeIp = findColocatedDatanode(clientPod, nodeToDatanodeIp); + if (datanodeIp != null) { + return datanodeIp; } - dataNodes.add(replacementDataNodeIp); } - LOG.info("Replacing names [{}] with IPs [{}]", names, dataNodes); - return dataNodes; + + // Keep original if we can't resolve + return ipAddress; + } + + private String findColocatedDatanode(Pod clientPod, Map nodeToDatanodeIp) { + return nodeToDatanodeIp.get(clientPod.getSpec().getNodeName()); + } + + private String resolveToIpAddress(String hostname) { + try { + InetAddress address = InetAddress.getByName(hostname); + String ip = address.getHostAddress(); + LOG.debug("Resolved {} to {}", hostname, ip); + return ip; + } catch (UnknownHostException e) { + LOG.warn("Failed to resolve address: {} - defaulting to {}", hostname, DEFAULT_RACK); + return hostname; + } } /** - * Given a list of datanodes, return a HashMap that maps pod ips onto Pod labels. The returned Map - * may contain more entries than the list that is given to this function, as an entry will be - * generated for every ip a pod has. + * Build a map from Kubernetes node name to datanode IP. This enables O(1) lookup when finding + * co-located dataNodes for client pods. * - * @param datanodes List of all retrieved pods. - * @return Map of ip addresses to all labels the pod that "owns" that ip has attached to itself + *
<p>
Note: If multiple dataNodes run on the same node, the last one wins. This is acceptable + * because all dataNodes on the same node have the same topology. */ - private Map> getPodLabels(List datanodes) { - Map> result = new HashMap<>(); - // Iterate over all pods and then all ips for every pod and add these to the mapping - for (Pod pod : datanodes) { - for (PodIP podIp : pod.getStatus().getPodIPs()) { - result.put(podIp.getIp(), pod.getMetadata().getLabels()); + private Map buildNodeToDatanodeMap(List dataNodes) { + Map nodeToDatanode = new HashMap<>(); + + for (Pod dataNode : dataNodes) { + String nodeName = dataNode.getSpec().getNodeName(); + String dataNodeIp = dataNode.getStatus().getPodIP(); + + if (nodeName != null && dataNodeIp != null) { + nodeToDatanode.put(nodeName, dataNodeIp); } } + + LOG.debug("Built node-to-datanode map with {} entries", nodeToDatanode.size()); + return nodeToDatanode; + } + + // ============================================================================ + // TOPOLOGY STRING BUILDING + // ============================================================================ + + private String buildTopologyString( + String ipAddress, + Map> podLabels, + Map> nodeLabels) { + StringBuilder topology = new StringBuilder(); + + for (TopologyLabel label : labels) { + String labelValue = extractLabelValue(ipAddress, label, podLabels, nodeLabels); + topology.append("/").append(labelValue); + } + + String result = topology.toString(); + LOG.debug("Returning label [{}]", result); return result; } + private String extractLabelValue( + String ipAddress, + TopologyLabel label, + Map> podLabels, + Map> nodeLabels) { + + Map> labelSource = label.isNodeLabel() ? nodeLabels : podLabels; + + String labelValue = + labelSource + .getOrDefault(ipAddress, Collections.emptyMap()) + .getOrDefault(label.getName(), "NotFound"); + + LOG.debug("Label {}.{} = {}", label.getType(), label.getName(), labelValue); + return labelValue; + } + + // ============================================================================ + // LABEL MAPS + // ============================================================================ + /** - * Given a list of datanodes this function will resolve which datanodes run on which node as well - * as all the ips assigned to a datanodes. It will then return a mapping of every ip address to - * the labels that are attached to the "physical" node running the datanodes that this ip belongs + * Given a list of dataNodes this function will resolve which dataNodes run on which node as well + * as all the ips assigned to a dataNodes. It will then return a mapping of every ip address to + * the labels that are attached to the "physical" node running the dataNodes that this ip belongs * to. 
* - * @param datanodes List of all in-scope datanodes (datanode pods in this namespace) + * @param dataNodes List of all in-scope dataNodes (datanode pods in this namespace) * @return Map of ip addresses to labels of the node running the pod that the ip address belongs * to */ - private Map> getNodeLabels(List datanodes) { + private Map> buildNodeLabelMap(List dataNodes) { Map> result = new HashMap<>(); - for (Pod pod : datanodes) { - // either retrieve the node from the internal cache or fetch it by name + for (Pod pod : dataNodes) { String nodeName = pod.getSpec().getNodeName(); - // nodeName may be null while pod is being provisioned so force a re-try + if (nodeName == null) { - LOG.warn( - "Pod [{}] not yet assigned to a k8s node, forcing re-try", pod.getMetadata().getName()); + LOG.warn("Pod [{}] not yet assigned to node, retrying", pod.getMetadata().getName()); return result; } - Node node = this.nodeKeyCache.getIfPresent(nodeName); - if (node == null) { - LOG.debug("Node not yet cached, fetching by name [{}]", nodeName); - node = client.nodes().withName(nodeName).get(); - this.nodeKeyCache.put(nodeName, node); - } + Node node = getOrFetchNode(nodeName); Map nodeLabels = node.getMetadata().getLabels(); LOG.debug("Labels for node [{}]:[{}]....", nodeName, nodeLabels); @@ -526,78 +455,33 @@ private Map> getNodeLabels(List datanodes) { return result; } - @Override - public void reloadCachedMappings() { - // TODO: According to the upstream comment we should rebuild all cache entries after - // invalidating them - // this may mean trying to resolve ip addresses that do not exist any more and things like that - // though and - // require some more thought, so we will for now just invalidate the cache. - this.topologyKeyCache.invalidateAll(); - } - - @Override - public void reloadCachedMappings(List names) { - // TODO: See comment above, the same applies here - for (String name : names) { - this.topologyKeyCache.invalidate(name); + private Node getOrFetchNode(String nodeName) { + Node node = cache.getNode(nodeName); + if (node == null) { + LOG.debug("Fetching node: {}", nodeName); + node = client.nodes().withName(nodeName).get(); + cache.putNode(nodeName, node); } + return node; } - private enum LabelType { - Node, - Pod, - Undefined - } + /** + * Given a list of dataNodes, return a HashMap that maps pod ips onto Pod labels. The returned Map + * may contain more entries than the list that is given to this function, as an entry will be + * generated for every ip a pod has. + * + * @param dataNodes List of all retrieved pods. + * @return Map of ip addresses to all labels the pod that "owns" that ip has attached to itself + */ + private Map> buildPodLabelMap(List dataNodes) { + Map> result = new HashMap<>(); + for (Pod pod : dataNodes) { + Map podLabels = pod.getMetadata().getLabels(); - private class TopologyLabel { - private final LabelType labelType; - private String name = null; - - /** - * Create a new TopologyLabel from its string representation - * - * @param value A string in the form of "[node|pod]:" that is deserialized into a - * TopologyLabel. Invalid and empty strings are resolved into the type unspecified. 
- */ - private TopologyLabel(String value) { - // If this is null the env var was not set, we will return 'undefined' for this level - if (value == null || value.isEmpty()) { - this.labelType = LabelType.Undefined; - return; - } - String[] split = value.toLowerCase(Locale.ROOT).split(":", 2); - - // This should only fail if no : was present in the string - if (split.length != 2) { - this.labelType = LabelType.Undefined; - LOG.warn( - "Ignoring topology label [{}] - label definitions need to be in the form " - + "of \"[node|pod]:\"", - value); - return; - } - // Length has to be two, proceed with "normal" case - String type = split[0]; - this.name = split[1]; - - // Parse type of object labels should be retrieved from - switch (type) { - case "node": - this.labelType = LabelType.Node; - break; - - case "pod": - this.labelType = LabelType.Pod; - break; - - default: - LOG.warn( - "Encountered unsupported label type [{}] - this label definition will be ignored, " - + "supported types are [\"node\", \"pod\"]", - type); - this.labelType = LabelType.Undefined; + for (PodIP podIp : pod.getStatus().getPodIPs()) { + result.put(podIp.getIp(), podLabels); } } + return result; } } diff --git a/src/main/java/tech/stackable/hadoop/TopologyCache.java b/src/main/java/tech/stackable/hadoop/TopologyCache.java new file mode 100644 index 0000000..3944fd4 --- /dev/null +++ b/src/main/java/tech/stackable/hadoop/TopologyCache.java @@ -0,0 +1,88 @@ +package tech.stackable.hadoop; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import io.fabric8.kubernetes.api.model.GenericKubernetesResource; +import io.fabric8.kubernetes.api.model.Node; +import io.fabric8.kubernetes.api.model.Pod; +import java.util.List; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +/** Manages all caching layers for the topology provider. 
*/ +public class TopologyCache { + private final Cache topology; + private final Cache nodes; + private final Cache listeners; + private final Cache pods; + + TopologyCache(int expirationSeconds, int defaultExpirationSeconds) { + this.topology = + Caffeine.newBuilder().expireAfterWrite(expirationSeconds, TimeUnit.SECONDS).build(); + + this.nodes = + Caffeine.newBuilder().expireAfterWrite(defaultExpirationSeconds, TimeUnit.SECONDS).build(); + + this.listeners = + Caffeine.newBuilder().expireAfterWrite(defaultExpirationSeconds, TimeUnit.SECONDS).build(); + + this.pods = + Caffeine.newBuilder().expireAfterWrite(defaultExpirationSeconds, TimeUnit.SECONDS).build(); + } + + String getTopology(String key) { + return topology.getIfPresent(key); + } + + void putTopology(String key, String value) { + topology.put(key, value); + } + + void invalidateAll() { + topology.invalidateAll(); + } + + void invalidateTopologyKeys(List keys) { + keys.forEach(topology::invalidate); + } + + Node getNode(String name) { + return nodes.getIfPresent(name); + } + + void putNode(String name, Node node) { + nodes.put(name, node); + } + + GenericKubernetesResource getListener(String name) { + return listeners.getIfPresent(name); + } + + ConcurrentMap getListenerMap() { + return listeners.asMap(); + } + + void putListener(String name, GenericKubernetesResource listener) { + listeners.put(name, listener); + } + + boolean hasAllListeners(List names) { + return names.stream().noneMatch(name -> listeners.getIfPresent(name) == null); + } + + Pod getPod(String name) { + return pods.getIfPresent(name); + } + + ConcurrentMap getPodMap() { + return pods.asMap(); + } + + void putPod(String name, Pod pod) { + pods.put(name, pod); + } + + boolean hasAllPods(List names) { + return names.stream().noneMatch(name -> pods.getIfPresent(name) == null); + } +} diff --git a/src/main/java/tech/stackable/hadoop/TopologyLabel.java b/src/main/java/tech/stackable/hadoop/TopologyLabel.java new file mode 100644 index 0000000..ba91b79 --- /dev/null +++ b/src/main/java/tech/stackable/hadoop/TopologyLabel.java @@ -0,0 +1,116 @@ +package tech.stackable.hadoop; + +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopologyLabel { + private static final Logger LOG = LoggerFactory.getLogger(TopologyLabel.class); + public static final String VARNAME_LABELS = "TOPOLOGY_LABELS"; + public static final String VARNAME_MAXLEVELS = "TOPOLOGY_MAX_LEVELS"; + private static final int MAX_LEVELS_DEFAULT = 2; + + public enum Type { + NODE, + POD, + UNDEFINED + } + + private final Type type; + private final String name; + + TopologyLabel(String config) { + if (config == null || config.isEmpty()) { + this.type = Type.UNDEFINED; + this.name = null; + return; + } + + String[] parts = config.toLowerCase(Locale.ROOT).split(":", 2); + + if (parts.length != 2) { + LOG.warn("Invalid topology label format '{}' - expected '[node|pod]:'", config); + this.type = Type.UNDEFINED; + this.name = null; + return; + } + + this.name = parts[1]; + + switch (parts[0]) { + case "node": + this.type = Type.NODE; + break; + case "pod": + this.type = Type.POD; + break; + default: + LOG.warn("Unsupported label type '{}' - must be 'node' or 'pod'", parts[0]); + this.type = Type.UNDEFINED; + } + } + + boolean isNodeLabel() { + return type == Type.NODE; + } + + boolean isUndefined() { + return type == Type.UNDEFINED; + } + + String getName() { + return name; + } + + 
Type getType() { + return type; + } + + public static List initializeTopologyLabels() { + // Read the labels to be used to build a topology from environment variables. Labels are + // configured in the EnvVar "TOPOLOGY_LABELS". They should be specified in the form + // "[node|pod]:" and separated by ";". So a valid configuration that reads topology + // information from the labels "kubernetes.io/zone" and "kubernetes.io/rack" on the k8s node + // that is running a datanode pod would look like this: + // "node:kubernetes.io/zone;node:kubernetes.io/rack" By default, there is an upper limit of 2 on + // the number of labels that are processed, because this is what Hadoop traditionally allows - + // this can be overridden via setting the EnvVar "MAX_TOPOLOGY_LEVELS". + String topologyConfig = System.getenv(VARNAME_LABELS); + + if (topologyConfig == null || topologyConfig.isEmpty()) { + LOG.error( + "Missing env var [{}] this is required for rack awareness to work.", VARNAME_LABELS); + throw new RuntimeException("TOPOLOGY_LABELS environment variable not set"); + } + + String[] labelConfigs = topologyConfig.split(";"); + + if (labelConfigs.length > getMaxLabels()) { + LOG.error( + "Found [{}] topology labels configured, but maximum allowed number is [{}]: " + + "please check your config or raise the number of allowed labels.", + labelConfigs.length, + getMaxLabels()); + throw new RuntimeException("Too many topology labels configured"); + } + // Create TopologyLabels from config strings + List labels = + Arrays.stream(labelConfigs).map(TopologyLabel::new).collect(Collectors.toList()); + + if (labels.stream().anyMatch(TopologyLabel::isUndefined)) { + LOG.error( + "Invalid topology label configuration - labels must be in format '[pod|node]:'"); + throw new RuntimeException("Invalid topology label configuration"); + } + + return labels; + } + + private static int getMaxLabels() { + return TopologyUtils.parseIntFromEnv( + VARNAME_MAXLEVELS, MAX_LEVELS_DEFAULT, "maximum topology levels"); + } +} diff --git a/src/main/java/tech/stackable/hadoop/TopologyUtils.java b/src/main/java/tech/stackable/hadoop/TopologyUtils.java index 686c4f2..be7cb51 100644 --- a/src/main/java/tech/stackable/hadoop/TopologyUtils.java +++ b/src/main/java/tech/stackable/hadoop/TopologyUtils.java @@ -4,8 +4,12 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TopologyUtils { + private static final Logger LOG = LoggerFactory.getLogger(TopologyUtils.class); + private static final String ADDRESS = "address"; private static final String STATUS = "status"; private static final String INGRESS_ADDRESSES = "ingressAddresses"; @@ -21,4 +25,21 @@ public static List getIngressAddresses(GenericKubernetesResource listene .map(ingress -> (String) ingress.get(ADDRESS)) .collect(Collectors.toList()); } + + public static int parseIntFromEnv(String varName, int defaultValue, String description) { + String value = System.getenv(varName); + if (value == null || value.isEmpty()) { + return defaultValue; + } + + try { + int parsed = Integer.parseInt(value); + LOG.info("Set {} to {} from environment variable {}", description, parsed, varName); + return parsed; + } catch (NumberFormatException e) { + LOG.warn( + "Invalid integer value '{}' for {} - using default: {}", value, varName, defaultValue); + return defaultValue; + } + } } From a3cf6e8a3309aa60cda0ce5910d644277a48f39b Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Wed, 10 Dec 2025 
16:37:03 +0100 Subject: [PATCH 02/14] refer to class in readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index f277d20..4c0b399 100644 --- a/README.md +++ b/README.md @@ -701,7 +701,7 @@ In Kubernetes, the most commonly used mechanism for topology awareness are label The most prevalent example for this is the node label [topology.kubernetes.io/zone](https://kubernetes.io/docs/reference/labels-annotations-taints/#topologykubernetesiozone) which often refers to availability zones in cloud providers or similar things. The purpose of this tool is to feed information from Kubernetes into the HDFS rack awareness functionality. -In order to do this, it implements the Hadoop interface `org.apache.hadoop.net.DNSToSwitchMapping` which then allows this tool to be configured on the NameNode via the parameter `net.topology.node.switch.mapping.impl`. +In order to do this, `tech.stackable.hadoop.StackableTopologyProvider` implements the Hadoop interface `org.apache.hadoop.net.DNSToSwitchMapping` which then allows this tool to be configured on the NameNode via the parameter `net.topology.node.switch.mapping.impl`. The topology provider watches all HDFS pods deployed by Stackable and Kubernetes nodes and keeps an in memory cache of the current state of these objects. From this state store the tool can then calculate rack IDs for nodes that HDFS asks for without needing to talk to the api-server and incurring an extra network round-trip. From 201717b357a2202bd8a0f52defe7b7af6e835bc8 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Wed, 10 Dec 2025 17:08:44 +0100 Subject: [PATCH 03/14] improved listener resolution --- .../hadoop/StackableTopologyProvider.java | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java index b6f7877..0885796 100644 --- a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java +++ b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java @@ -233,15 +233,8 @@ private String resolveListenerToDatanode(String name) { LOG.debug("Not a listener: {}", name); return name; } - List ingressAddresses = TopologyUtils.getIngressAddresses(listener); - for (String ingressAddress : ingressAddresses) { - LOG.debug("Address [{}]", ingressAddress); - if (name.equalsIgnoreCase(ingressAddress)) { - return resolveListenerEndpoint(listener); - } - } - LOG.info("Not a listener, returning [{}]", name); - return name; + // We found a listener, so we can resolve it directly + return resolveListenerEndpoint(listener); } private String resolveListenerEndpoint(GenericKubernetesResource listener) { @@ -344,7 +337,19 @@ private String resolveToDatanodeOrKeep( } private String findColocatedDatanode(Pod clientPod, Map nodeToDatanodeIp) { - return nodeToDatanodeIp.get(clientPod.getSpec().getNodeName()); + String clientNodeName = clientPod.getSpec().getNodeName(); + + if (clientNodeName == null) { + LOG.warn("Client pod {} not yet assigned to node", clientPod.getMetadata().getName()); + return null; + } + + String datanodeIp = nodeToDatanodeIp.get(clientNodeName); + if (datanodeIp == null) { + LOG.debug("No datanode found on node {}", clientNodeName); + } + + return datanodeIp; } private String resolveToIpAddress(String hostname) { From a55e6bed2cd4b772a7ed2239e10417f9e26f4849 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Thu, 11 Dec 2025 13:31:42 +0100 
Subject: [PATCH 04/14] added logging --- .../java/tech/stackable/hadoop/StackableTopologyProvider.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java index 0885796..ecc3e71 100644 --- a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java +++ b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java @@ -72,6 +72,7 @@ private void logInitializationStatus() { labels.stream().map(TopologyLabel::getName).collect(Collectors.toList()); LOG.info("Initialized with topology labels: {}", labelNames); } + LOG.debug("Client namespaces {} and config {}", client.namespaces(), client.configMaps()); } @Override @@ -302,6 +303,7 @@ private void refreshPodCacheIfNeeded(List names) { private void cachePodByNameAndIps(Pod pod) { String podName = pod.getMetadata().getName(); + LOG.debug("Refreshing pod cache: adding {}", podName); cache.putPod(podName, pod); // Cache by all IPs - this is crucial for IP-based lookups From 563edb156351e90163daeda663e1e33917ae9f50 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Thu, 11 Dec 2025 15:19:56 +0100 Subject: [PATCH 05/14] more logging changes --- .../tech/stackable/hadoop/StackableTopologyProvider.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java index ecc3e71..89962c7 100644 --- a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java +++ b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java @@ -36,6 +36,7 @@ public class StackableTopologyProvider implements DNSToSwitchMapping { private final TopologyCache cache; public StackableTopologyProvider() { + // By default, the client will operate within the current namespace this.client = new KubernetesClientBuilder().build(); this.cache = new TopologyCache(getCacheExpiration(), CACHE_EXPIRY_DEFAULT_SECONDS); this.labels = TopologyLabel.initializeTopologyLabels(); @@ -72,7 +73,10 @@ private void logInitializationStatus() { labels.stream().map(TopologyLabel::getName).collect(Collectors.toList()); LOG.info("Initialized with topology labels: {}", labelNames); } - LOG.debug("Client namespaces {} and config {}", client.namespaces(), client.configMaps()); + LOG.debug( + "Client namespaces {} and configuration {}", + client.getNamespace(), + client.getConfiguration()); } @Override From e96fbd7830e1f8f87bc3e95777ef12a3be70d90e Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Thu, 11 Dec 2025 15:55:01 +0100 Subject: [PATCH 06/14] code cleanup --- .../hadoop/StackableTopologyProvider.java | 25 ++++++++----------- .../tech/stackable/hadoop/TopologyCache.java | 15 +---------- .../tech/stackable/hadoop/TopologyLabel.java | 8 +++--- .../tech/stackable/hadoop/TopologyUtils.java | 1 + 4 files changed, 17 insertions(+), 32 deletions(-) diff --git a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java index 89962c7..a83550e 100644 --- a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java +++ b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java @@ -16,8 +16,8 @@ * An implementation of the org.apache.hadoop.net.DNSToSwitchMapping that is used to create a * topology out of dataNodes. * - *
<p>
This class is intended to be run as part of the NameNode process and will be used by the - * nameNode to retrieve topology strings for dataNodes. + *
<p>
This class is intended to be run as part of the NameNode process (in the same namespace) and + * will be used by the nameNode to retrieve topology strings for dataNodes. */ public class StackableTopologyProvider implements DNSToSwitchMapping { private final Logger LOG = LoggerFactory.getLogger(StackableTopologyProvider.class); @@ -47,11 +47,10 @@ public StackableTopologyProvider() { @Override public void reloadCachedMappings() { // TODO: According to the upstream comment we should rebuild all cache entries after - // invalidating them - // this may mean trying to resolve ip addresses that do not exist any more and things like that - // though and - // require some more thought, so we will for now just invalidate the cache. - this.cache.invalidateAll(); + // invalidating them. This may mean trying to resolve ip addresses that do not exist + // any more and things like that though and require some more thought, so we will for + // now just invalidate the cache. + this.cache.invalidateAllTopologyKeys(); } @Override @@ -73,10 +72,7 @@ private void logInitializationStatus() { labels.stream().map(TopologyLabel::getName).collect(Collectors.toList()); LOG.info("Initialized with topology labels: {}", labelNames); } - LOG.debug( - "Client namespaces {} and configuration {}", - client.getNamespace(), - client.getConfiguration()); + LOG.debug("Client namespace {}", client.getNamespace()); } @Override @@ -106,8 +102,7 @@ private List createDefaultRackList(List names) { private List tryResolveFromCache(List names) { // We need to check if we have cached values for all dataNodes contained in this request. - // Unless we can answer everything from the cache we have to talk to k8s anyway and can just - // recalculate everything + // Unless we can answer everything from the cache we will perform a full resolution. List cached = names.stream().map(cache::getTopology).collect(Collectors.toList()); LOG.debug("Cached topologyKeyCache values [{}]", cached); @@ -263,10 +258,10 @@ private String resolveListenerEndpoint(GenericKubernetesResource listener) { } private GenericKubernetesResourceList fetchListeners() { + // no version is specified here as we are not always going to be on v1alpha1 ResourceDefinitionContext listenerCrd = new ResourceDefinitionContext.Builder() .withGroup("listeners.stackable.tech") - .withVersion("v1alpha1") .withPlural("listeners") .withNamespaced(true) .build(); @@ -488,8 +483,10 @@ private Map> buildPodLabelMap(List dataNodes) { Map> result = new HashMap<>(); for (Pod pod : dataNodes) { Map podLabels = pod.getMetadata().getLabels(); + LOG.debug("Labels for pod [{}]:[{}]....", pod.getMetadata().getName(), podLabels); for (PodIP podIp : pod.getStatus().getPodIPs()) { + LOG.debug("...assigned to pod IP [{}]", podIp.getIp()); result.put(podIp.getIp(), podLabels); } } diff --git a/src/main/java/tech/stackable/hadoop/TopologyCache.java b/src/main/java/tech/stackable/hadoop/TopologyCache.java index 3944fd4..15c520a 100644 --- a/src/main/java/tech/stackable/hadoop/TopologyCache.java +++ b/src/main/java/tech/stackable/hadoop/TopologyCache.java @@ -6,7 +6,6 @@ import io.fabric8.kubernetes.api.model.Node; import io.fabric8.kubernetes.api.model.Pod; import java.util.List; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; /** Manages all caching layers for the topology provider. 
*/ @@ -38,7 +37,7 @@ void putTopology(String key, String value) { topology.put(key, value); } - void invalidateAll() { + void invalidateAllTopologyKeys() { topology.invalidateAll(); } @@ -58,26 +57,14 @@ GenericKubernetesResource getListener(String name) { return listeners.getIfPresent(name); } - ConcurrentMap getListenerMap() { - return listeners.asMap(); - } - void putListener(String name, GenericKubernetesResource listener) { listeners.put(name, listener); } - boolean hasAllListeners(List names) { - return names.stream().noneMatch(name -> listeners.getIfPresent(name) == null); - } - Pod getPod(String name) { return pods.getIfPresent(name); } - ConcurrentMap getPodMap() { - return pods.asMap(); - } - void putPod(String name, Pod pod) { pods.put(name, pod); } diff --git a/src/main/java/tech/stackable/hadoop/TopologyLabel.java b/src/main/java/tech/stackable/hadoop/TopologyLabel.java index ba91b79..5ba2252 100644 --- a/src/main/java/tech/stackable/hadoop/TopologyLabel.java +++ b/src/main/java/tech/stackable/hadoop/TopologyLabel.java @@ -10,7 +10,7 @@ public class TopologyLabel { private static final Logger LOG = LoggerFactory.getLogger(TopologyLabel.class); public static final String VARNAME_LABELS = "TOPOLOGY_LABELS"; - public static final String VARNAME_MAXLEVELS = "TOPOLOGY_MAX_LEVELS"; + public static final String VARNAME_MAX_LEVELS = "TOPOLOGY_MAX_LEVELS"; private static final int MAX_LEVELS_DEFAULT = 2; public enum Type { @@ -32,7 +32,7 @@ public enum Type { String[] parts = config.toLowerCase(Locale.ROOT).split(":", 2); if (parts.length != 2) { - LOG.warn("Invalid topology label format '{}' - expected '[node|pod]:'", config); + LOG.warn("Invalid topology label format '{}' - expected '[node|pod]: