diff --git a/README.md b/README.md index f277d20..4c0b399 100644 --- a/README.md +++ b/README.md @@ -701,7 +701,7 @@ In Kubernetes, the most commonly used mechanism for topology awareness are label The most prevalent example for this is the node label [topology.kubernetes.io/zone](https://kubernetes.io/docs/reference/labels-annotations-taints/#topologykubernetesiozone) which often refers to availability zones in cloud providers or similar things. The purpose of this tool is to feed information from Kubernetes into the HDFS rack awareness functionality. -In order to do this, it implements the Hadoop interface `org.apache.hadoop.net.DNSToSwitchMapping` which then allows this tool to be configured on the NameNode via the parameter `net.topology.node.switch.mapping.impl`. +In order to do this, `tech.stackable.hadoop.StackableTopologyProvider` implements the Hadoop interface `org.apache.hadoop.net.DNSToSwitchMapping` which then allows this tool to be configured on the NameNode via the parameter `net.topology.node.switch.mapping.impl`. The topology provider watches all HDFS pods deployed by Stackable and Kubernetes nodes and keeps an in memory cache of the current state of these objects. From this state store the tool can then calculate rack IDs for nodes that HDFS asks for without needing to talk to the api-server and incurring an extra network round-trip. diff --git a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java index 4d1e917..a2272b0 100644 --- a/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java +++ b/src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java @@ -1,16 +1,13 @@ package tech.stackable.hadoop; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; import io.fabric8.kubernetes.api.model.*; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.dsl.base.ResourceDefinitionContext; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.hadoop.net.DNSToSwitchMapping; import org.slf4j.Logger; @@ -18,334 +15,294 @@ /** * An implementation of the org.apache.hadoop.net.DNSToSwitchMapping that is used to create a - * topology out of datanodes. + * topology out of dataNodes. * - *

<p>This class is intended to be run as part of the NameNode process and will be used by the
- * namenode to retrieve topology strings for datanodes.
+ * <p>
This class is intended to be run as part of the NameNode process (in the same namespace) and + * will be used by the nameNode to retrieve topology strings for dataNodes. */ public class StackableTopologyProvider implements DNSToSwitchMapping { + private final Logger LOG = LoggerFactory.getLogger(StackableTopologyProvider.class); - public static final String VARNAME_LABELS = "TOPOLOGY_LABELS"; + // Environment variable names public static final String VARNAME_CACHE_EXPIRATION = "TOPOLOGY_CACHE_EXPIRATION_SECONDS"; - public static final String VARNAME_MAXLEVELS = "TOPOLOGY_MAX_LEVELS"; + + // Default values public static final String DEFAULT_RACK = "/defaultRack"; - private static final int MAX_LEVELS_DEFAULT = 2; private static final int CACHE_EXPIRY_DEFAULT_SECONDS = 5 * 60; - private final Logger LOG = LoggerFactory.getLogger(StackableTopologyProvider.class); + // Cache on first usage (not on start-up to avoid attempts before listeners are available) + private String listenerVersion; + private final KubernetesClient client; - private final Cache topologyKeyCache = - Caffeine.newBuilder().expireAfterWrite(getCacheExpiration(), TimeUnit.SECONDS).build(); - private final Cache nodeKeyCache = - Caffeine.newBuilder() - .expireAfterWrite(CACHE_EXPIRY_DEFAULT_SECONDS, TimeUnit.SECONDS) - .build(); - - private final Cache listenerKeyCache = - Caffeine.newBuilder() - .expireAfterWrite(CACHE_EXPIRY_DEFAULT_SECONDS, TimeUnit.SECONDS) - .build(); - private final Cache podKeyCache = - Caffeine.newBuilder() - .expireAfterWrite(CACHE_EXPIRY_DEFAULT_SECONDS, TimeUnit.SECONDS) - .build(); - // The list of labels that this provider uses to generate the topology information for any given - // datanode private final List labels; + // Caching layers + private final TopologyCache cache; + public StackableTopologyProvider() { + // By default, the client will operate within the current namespace this.client = new KubernetesClientBuilder().build(); + this.cache = new TopologyCache(getCacheExpiration(), CACHE_EXPIRY_DEFAULT_SECONDS); + this.labels = TopologyLabel.initializeTopologyLabels(); - // Read the labels to be used to build a topology from environment variables. Labels are - // configured in the EnvVar "TOPOLOGY_LABELS". They should be specified in the form - // "[node|pod]:" and separated by ";". So a valid configuration that reads topology - // information from the labels "kubernetes.io/zone" and "kubernetes.io/rack" on the k8s node - // that is running a datanode pod would look like this: - // "node:kubernetes.io/zone;node:kubernetes.io/rack" By default, there is an upper limit of 2 on - // the number of labels that are processed, because this is what Hadoop traditionally allows - - // this can be overridden via setting the EnvVar "MAX_TOPOLOGY_LEVELS". 
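// For illustration only (hypothetical label values, not part of this patch): with the example
// configuration above, TOPOLOGY_LABELS="node:kubernetes.io/zone;node:kubernetes.io/rack",
// a datanode whose Kubernetes node carries the labels kubernetes.io/zone=eu-central-1a and
// kubernetes.io/rack=r1 would be mapped to the rack id "/eu-central-1a/r1". A label that cannot
// be resolved contributes a "NotFound" segment, and with no labels configured at all every
// datanode falls back to DEFAULT_RACK ("/defaultRack").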
- String topologyConfig = System.getenv(VARNAME_LABELS); - if (topologyConfig != null && !topologyConfig.isEmpty()) { - String[] labelConfigs = topologyConfig.split(";"); - if (labelConfigs.length > getMaxLabels()) { - LOG.error( - "Found [{}] topology labels configured, but maximum allowed number is [{}]: " - + "please check your config or raise the number of allowed labels.", - labelConfigs.length, - getMaxLabels()); - throw new RuntimeException(); - } - // Create TopologyLabels from config strings - this.labels = - Arrays.stream(labelConfigs).map(TopologyLabel::new).collect(Collectors.toList()); - - // Check if any labelConfigs were invalid - if (this.labels.stream().anyMatch(label -> label.labelType == LabelType.Undefined)) { - LOG.error( - "Topologylabel contained invalid configuration for at least one label: " - + "double check your config! Labels should be specified in the " - + "format '[pod|node]:;...'"); - throw new RuntimeException(); - } + logInitializationStatus(); + } - } else { - LOG.error( - "Missing env var [{}] this is required for rack awareness to work.", VARNAME_LABELS); - throw new RuntimeException(); - } - if (this.labels.isEmpty()) { - LOG.info( - "No topology config found, defaulting value for all datanodes to [{}]", DEFAULT_RACK); - } else { - LOG.info( - "Topology config yields labels [{}]", - this.labels.stream().map(label -> label.name).collect(Collectors.toList())); - } + @Override + public void reloadCachedMappings() { + // TODO: According to the upstream comment we should rebuild all cache entries after + // invalidating them. This may mean trying to resolve ip addresses that do not exist + // any more and things like that though and require some more thought, so we will for + // now just invalidate the cache. + this.cache.invalidateAllTopologyKeys(); } - /*** - * Checks if a value for the maximum number of topology levels to allow has been configured in - * the environment variable specified in VARNAME_MAXLEVELS, - * returns the value of MAX_LEVELS_DEFAULT as default if nothing is set. - * - * @return The maximum number of topology levels to allow. - */ - private int getMaxLabels() { - String maxLevelsConfig = System.getenv(VARNAME_MAXLEVELS); - if (maxLevelsConfig != null && !maxLevelsConfig.isEmpty()) { - try { - int maxLevelsFromEnvVar = Integer.parseInt(maxLevelsConfig); - LOG.info( - "Found [{}] env var, changing allowed number of topology levels to [{}]", - VARNAME_MAXLEVELS, - maxLevelsFromEnvVar); - return maxLevelsFromEnvVar; - } catch (NumberFormatException e) { - LOG.warn( - "Unable to parse value of [{}]/[{}] as integer, using default value [{}]", - VARNAME_MAXLEVELS, - maxLevelsConfig, - MAX_LEVELS_DEFAULT); - } - } - return MAX_LEVELS_DEFAULT; + @Override + public void reloadCachedMappings(List names) { + // TODO: See comment above, the same applies here + cache.invalidateTopologyKeys(names); } - /*** - * Checks if a value for the cache expiration time has been configured in - * the environment variable specified in VARNAME_CACHE_EXPIRATION, - * returns the value of CACHE_EXPIRY_DEFAULT_SECONDS as default if nothing is set. - * - * @return The cache expiration time to use for the rack id cache. 
- */ private int getCacheExpiration() { - String cacheExpirationConfigSeconds = System.getenv(VARNAME_CACHE_EXPIRATION); - if (cacheExpirationConfigSeconds != null && !cacheExpirationConfigSeconds.isEmpty()) { - try { - int cacheExpirationFromEnvVar = Integer.parseInt(cacheExpirationConfigSeconds); - LOG.info( - "Found [{}] env var, changing cache time for topology entries to [{}]", - VARNAME_CACHE_EXPIRATION, - cacheExpirationFromEnvVar); - return cacheExpirationFromEnvVar; - } catch (NumberFormatException e) { - LOG.warn( - "Unable to parse value of [{}]/[{}] as integer, using default value [{}]", - VARNAME_CACHE_EXPIRATION, - cacheExpirationConfigSeconds, - CACHE_EXPIRY_DEFAULT_SECONDS); - } - } - return CACHE_EXPIRY_DEFAULT_SECONDS; + return TopologyUtils.parseIntFromEnv( + VARNAME_CACHE_EXPIRATION, CACHE_EXPIRY_DEFAULT_SECONDS, "cache expiration seconds"); } - /*** - * - * @param datanode the datanode whose IP mis to be resolved - * @param podLabels the pod labels used in the resolution - * @param nodeLabels the node labels used in the resolution - * - * @return the label looked up from the IP address - */ - private String getLabel( - String datanode, - Map> podLabels, - Map> nodeLabels) { - // The internal structures used by this mapper are all based on IP addresses. Depending on - // configuration and network setup it may (probably will) be possible that the namenode uses - // hostnames to resolve a datanode to a topology zone. To allow this, we resolve every input to - // an ip address below and use the ip to lookup labels. - // TODO: this might break with the listener operator, as `pod.status.podips` won't contain - // external addresses - // tracking this in https://github.com/stackabletech/hdfs-topology-provider/issues/2 - InetAddress address; - try { - address = InetAddress.getByName(datanode); - LOG.debug("Resolved [{}] to [{}]", datanode, address.getHostAddress()); - datanode = address.getHostAddress(); - } catch (UnknownHostException e) { - LOG.warn( - "failed to resolve address for [{}] - this should not happen, " - + "defaulting this node to [{}]", - datanode, - DEFAULT_RACK); - return DEFAULT_RACK; - } - StringBuilder resultBuilder = new StringBuilder(); - for (TopologyLabel label : this.labels) { - if (label.labelType == LabelType.Node) { - LOG.debug( - "Looking for node label [{}] of type [{}] in [{}]/[{}]", - label.name, - label.labelType, - nodeLabels.keySet(), - nodeLabels.values()); - resultBuilder - .append("/") - .append( - nodeLabels - .getOrDefault(datanode, new HashMap<>()) - .getOrDefault(label.name, "NotFound")); - } else if (label.labelType == LabelType.Pod) { - LOG.debug( - "Looking for pod label [{}] of type [{}] in [{}]/[{}]", - label.name, - label.labelType, - podLabels.keySet(), - podLabels.values()); - resultBuilder - .append("/") - .append( - podLabels - .getOrDefault(datanode, new HashMap<>()) - .getOrDefault(label.name, "NotFound")); - } + private void logInitializationStatus() { + if (labels.isEmpty()) { + LOG.info("No topology configuration - will use default rack: {}", DEFAULT_RACK); + } else { + List labelNames = + labels.stream().map(TopologyLabel::getName).collect(Collectors.toList()); + LOG.info("Initialized with topology labels: {}", labelNames); } - String result = resultBuilder.toString(); - LOG.debug("Returning label [{}]", result); - return result; + LOG.debug("Client namespace {}", client.getNamespace()); } @Override public List resolve(List names) { - LOG.info("Resolving for listeners/client-pods [{}]", names.toString()); + 
LOG.info("Resolving topology for: {}", names); - if (this.labels.isEmpty()) { - LOG.info( - "No topology labels defined, returning [{}] for hdfs nodes: [{}]", DEFAULT_RACK, names); - return names.stream().map(name -> DEFAULT_RACK).collect(Collectors.toList()); + if (labels.isEmpty()) { + return createDefaultRackList(names); } - // We need to check if we have cached values for all datanodes contained in this request. - // Unless we can answer everything from the cache we have to talk to k8s anyway and can just - // recalculate everything - List cachedValues = - names.stream().map(this.topologyKeyCache::getIfPresent).collect(Collectors.toList()); - LOG.debug("Cached topologyKeyCache values [{}]", cachedValues); - - if (cachedValues.contains(null)) { - // We cannot completely serve this request from the cache, since we need to talk to k8s anyway - // we'll simply refresh everything. - LOG.debug( - "Cache doesn't contain values for all requested pods: new values will be built for all nodes."); - } else { - // use same log level as the non-cached return statement - LOG.info("Answering from cached topology keys: [{}]", cachedValues); + // Try to serve from cache first + List cachedValues = tryResolveFromCache(names); + if (cachedValues != null) { + LOG.info("Returning cached topology: {}", cachedValues); return cachedValues; } - // The datanodes will be the cache keys. - List datanodes = + // Cache miss - perform full resolution + return performFullResolution(names); + } + + private List createDefaultRackList(List names) { + LOG.info( + "No topology labels defined, returning [{}] for hdfs nodes: [{}]", DEFAULT_RACK, names); + return names.stream().map(name -> DEFAULT_RACK).collect(Collectors.toList()); + } + + private List tryResolveFromCache(List names) { + // We need to check if we have cached values for all dataNodes contained in this request. + // Unless we can answer everything from the cache we will perform a full resolution. + List cached = names.stream().map(cache::getTopology).collect(Collectors.toList()); + LOG.debug("Cached topologyKeyCache values [{}]", cached); + + return cached.contains(null) ? 
null : cached; + } + + // ============================================================================ + // RESOLUTION WORKFLOW + // ============================================================================ + + private List performFullResolution(List names) { + LOG.debug("Performing full topology resolution for: {}", names); + + // Step 1: Gather all dataNodes + List dataNodes = fetchDataNodes(); + + // Step 2: Resolve listeners to actual datanode IPs + List resolvedNames = resolveListeners(names); + + // Step 3: Build label lookup maps + Map> podLabels = buildPodLabelMap(dataNodes); + Map> nodeLabels = buildNodeLabelMap(dataNodes); + + // Step 4: Build node-to-datanode map for O(1) colocated lookups + Map nodeToDatanodeIp = buildNodeToDatanodeMap(dataNodes); + + // Step 5: Resolve client pods to co-located dataNodes + List datanodeIps = + resolveClientPodsToDataNodes(resolvedNames, podLabels, nodeToDatanodeIp); + + // Step 6: Build topology strings and cache results + return buildAndCacheTopology(names, datanodeIps, podLabels, nodeLabels); + } + + private List buildAndCacheTopology( + List originalNames, + List datanodeIps, + Map> podLabels, + Map> nodeLabels) { + List result = new ArrayList<>(); + for (int i = 0; i < datanodeIps.size(); i++) { + String datanodeIp = datanodeIps.get(i); + String originalName = originalNames.get(i); + + String topology = buildTopologyString(datanodeIp, podLabels, nodeLabels); + result.add(topology); + + // Cache both the resolved IP and original name + cache.putTopology(datanodeIp, topology); + cache.putTopology(originalName, topology); + } + + LOG.info("Built topology: {}", result); + return result; + } + + // ============================================================================ + // DATANODE FETCHING + // ============================================================================ + + private List fetchDataNodes() { + List dataNodes = client .pods() .withLabel("app.kubernetes.io/component", "datanode") .withLabel("app.kubernetes.io/name", "hdfs") .list() .getItems(); + LOG.debug( - "Retrieved datanodes: [{}]", - datanodes.stream() - .map(datanode -> datanode.getMetadata().getName()) + "Retrieved dataNodes: [{}]", + dataNodes.stream() + .map(dataNode -> dataNode.getMetadata().getName()) .collect(Collectors.toList())); + return dataNodes; + } - List namesToDataNodeNames = dataNodesResolvedFromListenerOrOriginal(names); - LOG.debug("Now resolving: [{}]", namesToDataNodeNames); + // ============================================================================ + // LISTENER RESOLUTION + // ============================================================================ - // Build internal state that is later used to look up information. Basically this transposes pod - // and node lists into hashmaps where pod-IPs can be used to look up labels for the pods and - // nodes. This is not terribly memory efficient because it effectively duplicates a lot of data - // in memory. But since we cache lookups, this should hopefully only be done every once in a - // while and is not kept in memory for extended amounts of time. 
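// Shape of the lookup maps described above, with hypothetical values for illustration
// (every pod IP of a datanode becomes a key, so the label maps are duplicated per IP):
//   podLabels:  { "10.244.1.17" -> { "app.kubernetes.io/component" -> "datanode", ... } }
//   nodeLabels: { "10.244.1.17" -> { "kubernetes.io/zone" -> "eu-central-1a", ... } }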
- List result = new ArrayList<>(); + private String getListenerVersion() { + try { + var crd = + client + .apiextensions() + .v1() + .customResourceDefinitions() + .withName("listeners.listeners.stackable.tech") + .get(); + + if (crd != null && !crd.getSpec().getVersions().isEmpty()) { + // Select the version that is served and used for storage (the "stable" version) + for (var version : crd.getSpec().getVersions()) { + if (version.getServed() && version.getStorage()) { + LOG.debug("Returning served/stored version: {}", version.getName()); + return version.getName(); + } + } + // If no stable version found, return the first served version as a fallback + for (var version : crd.getSpec().getVersions()) { + if (version.getServed()) { + LOG.debug("Returning served/un-stored version: {}", version.getName()); + return version.getName(); + } + } + } + LOG.error("Unable to fetch CRD version for listeners. Returning default value."); + return "v1alpha1"; + } catch (KubernetesClientException e) { + LOG.error("Unable to fetch CRD version for listeners. Throwing exception.", e); + throw new RuntimeException("Unable to fetch CRD version for listeners"); + } + } - Map> nodeLabels = getNodeLabels(datanodes); - LOG.debug("Resolved node labels map [{}]/[{}]", nodeLabels.keySet(), nodeLabels.values()); + private List resolveListeners(List names) { + refreshListenerCacheIfNeeded(names); - Map> podLabels = getPodLabels(datanodes); - LOG.debug("Resolved pod labels map [{}]/[{}]", podLabels.keySet(), podLabels.values()); + return names.stream().map(this::resolveListenerToDatanode).collect(Collectors.toList()); + } - List podsResolvedToDataNodes = - resolveDataNodesFromCallingPods(namesToDataNodeNames, podLabels, datanodes); + private void refreshListenerCacheIfNeeded(List names) { + List missingNames = + names.stream().filter(name -> cache.getListener(name) == null).collect(Collectors.toList()); - // Iterate over all nodes to resolve and return the topology zones - for (int i = 0; i < podsResolvedToDataNodes.size(); i++) { - String builtLabel = getLabel(podsResolvedToDataNodes.get(i), podLabels, nodeLabels); - result.add(builtLabel); + if (missingNames.isEmpty()) { + LOG.debug("Listener cache contains all required entries"); + return; + } + // Listeners are typically few, so fetch all + LOG.debug("Fetching all listeners to populate cache"); + if (listenerVersion == null) { + listenerVersion = getListenerVersion(); + LOG.debug("Fetching all listeners in version {}", listenerVersion); + } + GenericKubernetesResourceList listeners = fetchListeners(listenerVersion); - // Cache the value for potential use in a later request - this.topologyKeyCache.put(podsResolvedToDataNodes.get(i), builtLabel); - // also cache the original name, in case that has resolved to a dataNode (so that - // the resolution step can be omitted next time this pod is encountered) - this.topologyKeyCache.put(names.get(i), builtLabel); + for (GenericKubernetesResource listener : listeners.getItems()) { + cacheListenerByNameAndAddresses(listener); + } + } + + private void cacheListenerByNameAndAddresses(GenericKubernetesResource listener) { + String name = listener.getMetadata().getName(); + LOG.debug("Caching listener by name: {}", name); + cache.putListener(name, listener); + + // Also cache by ingress addresses for quick lookup + for (String address : TopologyUtils.getIngressAddresses(listener)) { + LOG.debug("Caching listener by address: {}", address); + cache.putListener(address, listener); } - LOG.info("Returning resolved labels [{}]", 
result); - return result; } /** - * If the names include listeners then these must be resolved against the dataNode IPs and used - * subsequently. + * We don't know if the name refers to a listener (it could be any client pod) but we check to see + * if it can be resolved to a dataNode just in case. * - * @param names the collection of names to resolve - * @return a collection of either the name (for non-listener) or the dataNode IP to which this - * listener resolves + * @param name the name of the calling pod which should be resolved to a dataNode IP if it is a + * listener + * @return either the name (for non-listener) or the dataNode IP to which this listener resolves */ - private List dataNodesResolvedFromListenerOrOriginal(List names) { - List cachedListeners = - names.stream().map(this.listenerKeyCache::getIfPresent).collect(Collectors.toList()); - if (cachedListeners.contains(null)) { - LOG.debug( - "Refreshing listener cache as not all of [{}] present in [{}]", - names, - this.listenerKeyCache.asMap().keySet()); - - GenericKubernetesResourceList listeners = getListeners(); - - for (GenericKubernetesResource listener : listeners.getItems()) { - this.listenerKeyCache.put(listener.getMetadata().getName(), listener); - // also add the IPs - for (String ingressAddress : TopologyUtils.getIngressAddresses(listener)) { - this.listenerKeyCache.put(ingressAddress, listener); - } - } - } else { - LOG.debug("Listener cache contains [{}]", names); + private String resolveListenerToDatanode(String name) { + GenericKubernetesResource listener = cache.getListener(name); + if (listener == null) { + LOG.debug("Not a listener: {}", name); + return name; } - ConcurrentMap listeners = this.listenerKeyCache.asMap(); + // We found a listener, so we can resolve it directly + return resolveListenerEndpoint(listener); + } - List listenerToDataNodeNames = new ArrayList<>(); + private String resolveListenerEndpoint(GenericKubernetesResource listener) { + String listenerName = listener.getMetadata().getName(); + Endpoints endpoint = client.endpoints().withName(listenerName).get(); + LOG.debug("Matched ingressAddress [{}]", listenerName); - for (String name : names) { - String resolvedName = resolveDataNodesFromListeners(name, listeners); - listenerToDataNodeNames.add(resolvedName); + if (endpoint.getSubsets().isEmpty()) { + LOG.warn("Endpoint {} has no subsets - pod may be restarting", listenerName); + return listenerName; } - return listenerToDataNodeNames; + + EndpointAddress address = endpoint.getSubsets().get(0).getAddresses().get(0); + LOG.info( + "Resolved listener {} to IP {} on node {}", + listenerName, + address.getIp(), + address.getNodeName()); + + return address.getIp(); } - private GenericKubernetesResourceList getListeners() { + private GenericKubernetesResourceList fetchListeners(String listenerVersion) { ResourceDefinitionContext listenerCrd = new ResourceDefinitionContext.Builder() .withGroup("listeners.stackable.tech") - .withVersion("v1alpha1") + .withVersion(listenerVersion) .withPlural("listeners") .withNamespaced(true) .build(); @@ -353,168 +310,187 @@ private GenericKubernetesResourceList getListeners() { return client.genericKubernetesResources(listenerCrd).list(); } - /** - * We don't know if the name refers to a listener (it could be any client pod) but we check to see - * if it can be resolved to a dataNode just in case. 
- * - * @param name the name of the calling pod which should be resolved to a dataNode IP if it is a - * listener - * @param listeners the current listener collection - * @return either the name (for non-listener) or the dataNode IP to which this listener resolves - */ - private String resolveDataNodesFromListeners( - String name, ConcurrentMap listeners) { - LOG.debug("Attempting to resolve [{}]", name); - for (GenericKubernetesResource listener : listeners.values()) { - List ingressAddresses = TopologyUtils.getIngressAddresses(listener); - for (String ingressAddress : ingressAddresses) { - LOG.debug("Address [{}]", ingressAddress); - if (name.equalsIgnoreCase(ingressAddress)) { - LOG.debug("Matched ingressAddress [{}]/[{}]", name, listener.getMetadata().getName()); - - Endpoints ep = client.endpoints().withName(listener.getMetadata().getName()).get(); - // TODO: Assuming single address per datanode endpoint. - // When does/can an endpoint support multiple data nodes? On restart the address list will - // be empty for a moment or two. - if (ep.getSubsets().size() < 1) { - LOG.warn( - "Endpoint [{}] address not detected, pod maybe restarting...", - ep.getMetadata().getName()); - } else { - EndpointAddress address = ep.getSubsets().get(0).getAddresses().get(0); - LOG.info( - "Endpoint [{}], IP [{}], node [{}]", - ep.getMetadata().getName(), - address.getIp(), - address.getNodeName()); - return address.getIp(); - } - } - } + // ============================================================================ + // CLIENT POD RESOLUTION + // ============================================================================ + + private List resolveClientPodsToDataNodes( + List names, + Map> podLabels, + Map nodeToDatanodeIp) { + + refreshPodCacheIfNeeded(names); + + return names.stream() + .map(name -> resolveToDatanodeOrKeep(name, podLabels, nodeToDatanodeIp)) + .collect(Collectors.toList()); + } + + private void refreshPodCacheIfNeeded(List names) { + if (cache.hasAllPods(names)) { + LOG.debug("Pod cache contains all required entries"); + return; + } + + // Note: We fetch all pods here because: + // 1. Client pods (Spark, etc.) are queried by IP, not name + // 2. K8s doesn't support "get pod by IP" - we must list and filter + LOG.debug("Refreshing pod cache for client pod resolution"); + for (Pod pod : client.pods().list().getItems()) { + cachePodByNameAndIps(pod); } - LOG.info("Not a listener, returning [{}]", name); - return name; } - /** - * The list of names may be datanodes (as is the case when the topology is initialised) or - * non-datanodes, when the data is being written by a client tool, spark executors etc. In this - * case we want to identify the datanodes that are running on the same node as this client. The - * names may also be pod IPs or pod names. 
- * - * @param names list of client pods to resolve to datanodes - * @param podLabels map of podIPs and labels - * @param dns list of datanode names which will be used to match nodenames - * @return a list of pods resolved to co-located datanodes where possible - */ - private List resolveDataNodesFromCallingPods( - List names, Map> podLabels, List dns) { - List dataNodes = new ArrayList<>(); - - // if any of the names do not exist in the pod cache then refresh it - List cachedPods = - names.stream().map(this.podKeyCache::getIfPresent).collect(Collectors.toList()); - if (cachedPods.contains(null)) { - LOG.info( - "Refreshing pod cache as not all of [{}] present in [{}]", - names, - this.podKeyCache.asMap().keySet()); - for (Pod pod : client.pods().list().getItems()) { - this.podKeyCache.put(pod.getMetadata().getName(), pod); - // also add an entry for each IP - for (PodIP ip : pod.getStatus().getPodIPs()) { - this.podKeyCache.put(ip.getIp(), pod); - } - } - } else { - LOG.info("Pod cache contains [{}]", names); + private void cachePodByNameAndIps(Pod pod) { + String podName = pod.getMetadata().getName(); + LOG.debug("Refreshing pod cache: adding {}", podName); + cache.putPod(podName, pod); + + // Cache by all IPs - this is crucial for IP-based lookups + for (PodIP ip : pod.getStatus().getPodIPs()) { + cache.putPod(ip.getIp(), pod); } - ConcurrentMap pods = this.podKeyCache.asMap(); - - for (String name : names) { - // if we don't find a dataNode running on the same node as a non-dataNode pod, then - // we'll keep the original name to allow it to be resolved to NotFound in the calling routine. - String replacementDataNodeIp = name; - InetAddress address; - try { - // make sure we are looking up using the IP address - address = InetAddress.getByName(name); - String podIp = address.getHostAddress(); - if (podLabels.containsKey(podIp)) { - replacementDataNodeIp = podIp; - LOG.info("Added as found in the datanode map [{}]", podIp); - } else { - // we've received a call from a non-datanode pod - for (Pod pod : pods.values()) { - if (pod.getStatus().getPodIPs().contains(new PodIP(podIp))) { - String nodeName = pod.getSpec().getNodeName(); - for (Pod dn : dns) { - if (dn.getSpec().getNodeName().equals(nodeName)) { - LOG.debug( - "NodeName [{}] matches with [{}]?", dn.getSpec().getNodeName(), nodeName); - replacementDataNodeIp = dn.getStatus().getPodIP(); - break; - } - } - } - } - } - } catch (UnknownHostException e) { - LOG.warn("Error encountered while resolving host [{}]", e.getLocalizedMessage()); + } + + private String resolveToDatanodeOrKeep( + String name, + Map> podLabels, + Map nodeToDatanodeIp) { + + String ipAddress = resolveToIpAddress(name); + + // If it's already a datanode, return its IP + if (podLabels.containsKey(ipAddress)) { + LOG.info("Name is a datanode: {}", ipAddress); + return ipAddress; + } + + // Try to find co-located datanode + Pod clientPod = cache.getPod(ipAddress); + if (clientPod != null) { + String datanodeIp = findColocatedDatanode(clientPod, nodeToDatanodeIp); + if (datanodeIp != null) { + return datanodeIp; } - dataNodes.add(replacementDataNodeIp); } - LOG.info("Replacing names [{}] with IPs [{}]", names, dataNodes); - return dataNodes; + + // Keep original if we can't resolve + return ipAddress; + } + + private String findColocatedDatanode(Pod clientPod, Map nodeToDatanodeIp) { + String clientNodeName = clientPod.getSpec().getNodeName(); + + if (clientNodeName == null) { + LOG.warn("Client pod {} not yet assigned to node", clientPod.getMetadata().getName()); + 
return null; + } + + String datanodeIp = nodeToDatanodeIp.get(clientNodeName); + if (datanodeIp == null) { + LOG.debug("No datanode found on node {}", clientNodeName); + } + + return datanodeIp; + } + + private String resolveToIpAddress(String hostname) { + try { + InetAddress address = InetAddress.getByName(hostname); + String ip = address.getHostAddress(); + LOG.debug("Resolved {} to {}", hostname, ip); + return ip; + } catch (UnknownHostException e) { + LOG.warn("Failed to resolve address: {} - defaulting to {}", hostname, DEFAULT_RACK); + return hostname; + } } /** - * Given a list of datanodes, return a HashMap that maps pod ips onto Pod labels. The returned Map - * may contain more entries than the list that is given to this function, as an entry will be - * generated for every ip a pod has. + * Build a map from Kubernetes node name to datanode IP. This enables O(1) lookup when finding + * co-located dataNodes for client pods. * - * @param datanodes List of all retrieved pods. - * @return Map of ip addresses to all labels the pod that "owns" that ip has attached to itself + *
<p>
Note: If multiple dataNodes run on the same node, the last one wins. This is acceptable + * because all dataNodes on the same node have the same topology. */ - private Map> getPodLabels(List datanodes) { - Map> result = new HashMap<>(); - // Iterate over all pods and then all ips for every pod and add these to the mapping - for (Pod pod : datanodes) { - for (PodIP podIp : pod.getStatus().getPodIPs()) { - result.put(podIp.getIp(), pod.getMetadata().getLabels()); + private Map buildNodeToDatanodeMap(List dataNodes) { + Map nodeToDatanode = new HashMap<>(); + + for (Pod dataNode : dataNodes) { + String nodeName = dataNode.getSpec().getNodeName(); + String dataNodeIp = dataNode.getStatus().getPodIP(); + + if (nodeName != null && dataNodeIp != null) { + nodeToDatanode.put(nodeName, dataNodeIp); } } + + LOG.debug("Built node-to-datanode map with {} entries", nodeToDatanode.size()); + return nodeToDatanode; + } + + // ============================================================================ + // TOPOLOGY STRING BUILDING + // ============================================================================ + + private String buildTopologyString( + String ipAddress, + Map> podLabels, + Map> nodeLabels) { + StringBuilder topology = new StringBuilder(); + + for (TopologyLabel label : labels) { + String labelValue = extractLabelValue(ipAddress, label, podLabels, nodeLabels); + topology.append("/").append(labelValue); + } + + String result = topology.toString(); + LOG.debug("Returning label [{}]", result); return result; } + private String extractLabelValue( + String ipAddress, + TopologyLabel label, + Map> podLabels, + Map> nodeLabels) { + + Map> labelSource = label.isNodeLabel() ? nodeLabels : podLabels; + + String labelValue = + labelSource + .getOrDefault(ipAddress, Collections.emptyMap()) + .getOrDefault(label.getName(), "NotFound"); + + LOG.debug("Label {}.{} = {}", label.getType(), label.getName(), labelValue); + return labelValue; + } + + // ============================================================================ + // LABEL MAPS + // ============================================================================ + /** - * Given a list of datanodes this function will resolve which datanodes run on which node as well - * as all the ips assigned to a datanodes. It will then return a mapping of every ip address to - * the labels that are attached to the "physical" node running the datanodes that this ip belongs + * Given a list of dataNodes this function will resolve which dataNodes run on which node as well + * as all the ips assigned to a dataNodes. It will then return a mapping of every ip address to + * the labels that are attached to the "physical" node running the dataNodes that this ip belongs * to. 
* - * @param datanodes List of all in-scope datanodes (datanode pods in this namespace) + * @param dataNodes List of all in-scope dataNodes (datanode pods in this namespace) * @return Map of ip addresses to labels of the node running the pod that the ip address belongs * to */ - private Map> getNodeLabels(List datanodes) { + private Map> buildNodeLabelMap(List dataNodes) { Map> result = new HashMap<>(); - for (Pod pod : datanodes) { - // either retrieve the node from the internal cache or fetch it by name + for (Pod pod : dataNodes) { String nodeName = pod.getSpec().getNodeName(); - // nodeName may be null while pod is being provisioned so force a re-try + if (nodeName == null) { - LOG.warn( - "Pod [{}] not yet assigned to a k8s node, forcing re-try", pod.getMetadata().getName()); + LOG.warn("Pod [{}] not yet assigned to node, retrying", pod.getMetadata().getName()); return result; } - Node node = this.nodeKeyCache.getIfPresent(nodeName); - if (node == null) { - LOG.debug("Node not yet cached, fetching by name [{}]", nodeName); - node = client.nodes().withName(nodeName).get(); - this.nodeKeyCache.put(nodeName, node); - } + Node node = getOrFetchNode(nodeName); Map nodeLabels = node.getMetadata().getLabels(); LOG.debug("Labels for node [{}]:[{}]....", nodeName, nodeLabels); @@ -526,78 +502,35 @@ private Map> getNodeLabels(List datanodes) { return result; } - @Override - public void reloadCachedMappings() { - // TODO: According to the upstream comment we should rebuild all cache entries after - // invalidating them - // this may mean trying to resolve ip addresses that do not exist any more and things like that - // though and - // require some more thought, so we will for now just invalidate the cache. - this.topologyKeyCache.invalidateAll(); - } - - @Override - public void reloadCachedMappings(List names) { - // TODO: See comment above, the same applies here - for (String name : names) { - this.topologyKeyCache.invalidate(name); + private Node getOrFetchNode(String nodeName) { + Node node = cache.getNode(nodeName); + if (node == null) { + LOG.debug("Fetching node: {}", nodeName); + node = client.nodes().withName(nodeName).get(); + cache.putNode(nodeName, node); } + return node; } - private enum LabelType { - Node, - Pod, - Undefined - } + /** + * Given a list of dataNodes, return a HashMap that maps pod ips onto Pod labels. The returned Map + * may contain more entries than the list that is given to this function, as an entry will be + * generated for every ip a pod has. + * + * @param dataNodes List of all retrieved pods. + * @return Map of ip addresses to all labels the pod that "owns" that ip has attached to itself + */ + private Map> buildPodLabelMap(List dataNodes) { + Map> result = new HashMap<>(); + for (Pod pod : dataNodes) { + Map podLabels = pod.getMetadata().getLabels(); + LOG.debug("Labels for pod [{}]:[{}]....", pod.getMetadata().getName(), podLabels); - private class TopologyLabel { - private final LabelType labelType; - private String name = null; - - /** - * Create a new TopologyLabel from its string representation - * - * @param value A string in the form of "[node|pod]:" that is deserialized into a - * TopologyLabel. Invalid and empty strings are resolved into the type unspecified. 
- */ - private TopologyLabel(String value) { - // If this is null the env var was not set, we will return 'undefined' for this level - if (value == null || value.isEmpty()) { - this.labelType = LabelType.Undefined; - return; - } - String[] split = value.toLowerCase(Locale.ROOT).split(":", 2); - - // This should only fail if no : was present in the string - if (split.length != 2) { - this.labelType = LabelType.Undefined; - LOG.warn( - "Ignoring topology label [{}] - label definitions need to be in the form " - + "of \"[node|pod]:\"", - value); - return; - } - // Length has to be two, proceed with "normal" case - String type = split[0]; - this.name = split[1]; - - // Parse type of object labels should be retrieved from - switch (type) { - case "node": - this.labelType = LabelType.Node; - break; - - case "pod": - this.labelType = LabelType.Pod; - break; - - default: - LOG.warn( - "Encountered unsupported label type [{}] - this label definition will be ignored, " - + "supported types are [\"node\", \"pod\"]", - type); - this.labelType = LabelType.Undefined; + for (PodIP podIp : pod.getStatus().getPodIPs()) { + LOG.debug("...assigned to pod IP [{}]", podIp.getIp()); + result.put(podIp.getIp(), podLabels); } } + return result; } } diff --git a/src/main/java/tech/stackable/hadoop/TopologyCache.java b/src/main/java/tech/stackable/hadoop/TopologyCache.java new file mode 100644 index 0000000..15c520a --- /dev/null +++ b/src/main/java/tech/stackable/hadoop/TopologyCache.java @@ -0,0 +1,75 @@ +package tech.stackable.hadoop; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import io.fabric8.kubernetes.api.model.GenericKubernetesResource; +import io.fabric8.kubernetes.api.model.Node; +import io.fabric8.kubernetes.api.model.Pod; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** Manages all caching layers for the topology provider. 
*/ +public class TopologyCache { + private final Cache topology; + private final Cache nodes; + private final Cache listeners; + private final Cache pods; + + TopologyCache(int expirationSeconds, int defaultExpirationSeconds) { + this.topology = + Caffeine.newBuilder().expireAfterWrite(expirationSeconds, TimeUnit.SECONDS).build(); + + this.nodes = + Caffeine.newBuilder().expireAfterWrite(defaultExpirationSeconds, TimeUnit.SECONDS).build(); + + this.listeners = + Caffeine.newBuilder().expireAfterWrite(defaultExpirationSeconds, TimeUnit.SECONDS).build(); + + this.pods = + Caffeine.newBuilder().expireAfterWrite(defaultExpirationSeconds, TimeUnit.SECONDS).build(); + } + + String getTopology(String key) { + return topology.getIfPresent(key); + } + + void putTopology(String key, String value) { + topology.put(key, value); + } + + void invalidateAllTopologyKeys() { + topology.invalidateAll(); + } + + void invalidateTopologyKeys(List keys) { + keys.forEach(topology::invalidate); + } + + Node getNode(String name) { + return nodes.getIfPresent(name); + } + + void putNode(String name, Node node) { + nodes.put(name, node); + } + + GenericKubernetesResource getListener(String name) { + return listeners.getIfPresent(name); + } + + void putListener(String name, GenericKubernetesResource listener) { + listeners.put(name, listener); + } + + Pod getPod(String name) { + return pods.getIfPresent(name); + } + + void putPod(String name, Pod pod) { + pods.put(name, pod); + } + + boolean hasAllPods(List names) { + return names.stream().noneMatch(name -> pods.getIfPresent(name) == null); + } +} diff --git a/src/main/java/tech/stackable/hadoop/TopologyLabel.java b/src/main/java/tech/stackable/hadoop/TopologyLabel.java new file mode 100644 index 0000000..5ba2252 --- /dev/null +++ b/src/main/java/tech/stackable/hadoop/TopologyLabel.java @@ -0,0 +1,116 @@ +package tech.stackable.hadoop; + +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopologyLabel { + private static final Logger LOG = LoggerFactory.getLogger(TopologyLabel.class); + public static final String VARNAME_LABELS = "TOPOLOGY_LABELS"; + public static final String VARNAME_MAX_LEVELS = "TOPOLOGY_MAX_LEVELS"; + private static final int MAX_LEVELS_DEFAULT = 2; + + public enum Type { + NODE, + POD, + UNDEFINED + } + + private final Type type; + private final String name; + + TopologyLabel(String config) { + if (config == null || config.isEmpty()) { + this.type = Type.UNDEFINED; + this.name = null; + return; + } + + String[] parts = config.toLowerCase(Locale.ROOT).split(":", 2); + + if (parts.length != 2) { + LOG.warn("Invalid topology label format '{}' - expected '[node|pod]: