1616 * An implementation of the org.apache.hadoop.net.DNSToSwitchMapping that is used to create a
1717 * topology out of dataNodes.
1818 *
19- * <p>This class is intended to be run as part of the NameNode process and will be used by the
20- * nameNode to retrieve topology strings for dataNodes.
19+ * <p>This class is intended to be run as part of the NameNode process (in the same namespace) and
20+ * will be used by the nameNode to retrieve topology strings for dataNodes.
2121 */
2222public class StackableTopologyProvider implements DNSToSwitchMapping {
2323 private final Logger LOG = LoggerFactory .getLogger (StackableTopologyProvider .class );
@@ -47,11 +47,10 @@ public StackableTopologyProvider() {
4747 @ Override
4848 public void reloadCachedMappings () {
4949 // TODO: According to the upstream comment we should rebuild all cache entries after
50- // invalidating them
51- // this may mean trying to resolve ip addresses that do not exist any more and things like that
52- // though and
53- // require some more thought, so we will for now just invalidate the cache.
54- this .cache .invalidateAll ();
50+ // invalidating them. This may mean trying to resolve ip addresses that do not exist
51+ // any more and things like that though and require some more thought, so we will for
52+ // now just invalidate the cache.
53+ this .cache .invalidateAllTopologyKeys ();
5554 }
5655
5756 @ Override
@@ -73,10 +72,7 @@ private void logInitializationStatus() {
7372 labels .stream ().map (TopologyLabel ::getName ).collect (Collectors .toList ());
7473 LOG .info ("Initialized with topology labels: {}" , labelNames );
7574 }
76- LOG .debug (
77- "Client namespaces {} and configuration {}" ,
78- client .getNamespace (),
79- client .getConfiguration ());
75+ LOG .debug ("Client namespace {}" , client .getNamespace ());
8076 }
8177
8278 @ Override
@@ -106,8 +102,7 @@ private List<String> createDefaultRackList(List<String> names) {
106102
107103 private List <String > tryResolveFromCache (List <String > names ) {
108104 // We need to check if we have cached values for all dataNodes contained in this request.
109- // Unless we can answer everything from the cache we have to talk to k8s anyway and can just
110- // recalculate everything
105+ // Unless we can answer everything from the cache we will perform a full resolution.
111106 List <String > cached = names .stream ().map (cache ::getTopology ).collect (Collectors .toList ());
112107 LOG .debug ("Cached topologyKeyCache values [{}]" , cached );
113108
@@ -263,10 +258,10 @@ private String resolveListenerEndpoint(GenericKubernetesResource listener) {
263258 }
264259
265260 private GenericKubernetesResourceList fetchListeners () {
261+ // no version is specified here as we are not always going to be on v1alpha1
266262 ResourceDefinitionContext listenerCrd =
267263 new ResourceDefinitionContext .Builder ()
268264 .withGroup ("listeners.stackable.tech" )
269- .withVersion ("v1alpha1" )
270265 .withPlural ("listeners" )
271266 .withNamespaced (true )
272267 .build ();
@@ -488,8 +483,10 @@ private Map<String, Map<String, String>> buildPodLabelMap(List<Pod> dataNodes) {
488483 Map <String , Map <String , String >> result = new HashMap <>();
489484 for (Pod pod : dataNodes ) {
490485 Map <String , String > podLabels = pod .getMetadata ().getLabels ();
486+ LOG .debug ("Labels for pod [{}]:[{}]...." , pod .getMetadata ().getName (), podLabels );
491487
492488 for (PodIP podIp : pod .getStatus ().getPodIPs ()) {
489+ LOG .debug ("...assigned to pod IP [{}]" , podIp .getIp ());
493490 result .put (podIp .getIp (), podLabels );
494491 }
495492 }
0 commit comments