Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
From ed80d426e85c7b741d865f866092e89b61742c10 Mon Sep 17 00:00:00 2001
From: Andrew Kenworthy <andrew.kenworthy@stackable.tech>
Date: Fri, 10 Oct 2025 15:28:56 +0200
Subject: replace process groups root with root ID

---
.../org/apache/nifi/util/NiFiProperties.java | 3 ++
.../nifi/flow/FlowInitializationCallback.java | 9 ++++
.../FileAccessPolicyProvider.java | 43 +++++++++++++++++++
.../FileAuthorizerInitializer.java | 25 +++++++++++
.../nifi/controller/StandardFlowService.java | 17 ++++++++
5 files changed, 97 insertions(+)
create mode 100644 nifi-framework-api/src/main/java/org/apache/nifi/flow/FlowInitializationCallback.java
create mode 100644 nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/main/java/org/apache/nifi/authorization/FileAuthorizerInitializer.java

diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index 4bd2f4f810..24d31960b7 100644
--- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -336,6 +336,9 @@ public class NiFiProperties extends ApplicationProperties {
// performance tracking defaults
public static final int DEFAULT_TRACK_PERFORMANCE_PERCENTAGE = 0;

+ // root process group replacement
+ public static final String ROOT_PROCESS_GROUP_PLACEHOLDER ="nifi.process.group.root.placeholder";
+
// defaults
public static final Boolean DEFAULT_AUTO_RESUME_STATE = true;
public static final String DEFAULT_AUTHORIZER_CONFIGURATION_FILE = "conf/authorizers.xml";
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/flow/FlowInitializationCallback.java b/nifi-framework-api/src/main/java/org/apache/nifi/flow/FlowInitializationCallback.java
new file mode 100644
index 0000000000..3039c97497
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/flow/FlowInitializationCallback.java
@@ -0,0 +1,9 @@
+package org.apache.nifi.flow;
+
+/**
+ * Simple callback interface invoked when the root process group has been
+ * loaded and the flow is fully initialized for the first time.
+ */
+public interface FlowInitializationCallback {
+ void onRootGroupLoaded();
+}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/main/java/org/apache/nifi/authorization/FileAccessPolicyProvider.java b/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/main/java/org/apache/nifi/authorization/FileAccessPolicyProvider.java
index 5363bb5619..a03a18d444 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/main/java/org/apache/nifi/authorization/FileAccessPolicyProvider.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/main/java/org/apache/nifi/authorization/FileAccessPolicyProvider.java
@@ -29,6 +29,7 @@ import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.authorization.util.IdentityMapping;
import org.apache.nifi.authorization.util.IdentityMappingUtil;
import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.controller.StandardFlowService;
import org.apache.nifi.util.FlowInfo;
import org.apache.nifi.util.FlowParser;
import org.apache.nifi.util.NiFiProperties;
@@ -77,6 +78,8 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

+import static org.apache.nifi.util.NiFiProperties.ROOT_PROCESS_GROUP_PLACEHOLDER;
+
public class FileAccessPolicyProvider implements ConfigurableAccessPolicyProvider {

private static final Logger logger = LoggerFactory.getLogger(FileAccessPolicyProvider.class);
@@ -133,6 +136,9 @@ public class FileAccessPolicyProvider implements ConfigurableAccessPolicyProvide
public void initialize(AccessPolicyProviderInitializationContext initializationContext) throws AuthorizerCreationException {
userGroupProviderLookup = initializationContext.getUserGroupProviderLookup();

+ // Register flow initialization hook
+ StandardFlowService.registerInitializationCallback(new FileAuthorizerInitializer(this));
+
try {
final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
authorizationsSchema = schemaFactory.newSchema(FileAccessPolicyProvider.class.getResource(AUTHORIZATIONS_XSD));
@@ -744,6 +750,43 @@ public class FileAccessPolicyProvider implements ConfigurableAccessPolicyProvide
}
}

+ /**
+ * Replaces process group root references with the process group ID.
+ * Relevant when a static authorizations file is provided, which can
+ * then use "root" as a placeholder.
+ */
+ public void replaceWithRootGroupId() throws JAXBException {
+ String placeholder = this.properties.getProperty(ROOT_PROCESS_GROUP_PLACEHOLDER, "");
+
+ if (StringUtils.isNotBlank(placeholder)) {
+ if (rootGroupId == null) {
+ logger.info("Parsing flow as rootGroupId is not yet defined");
+ parseFlow();
+ }
+ if (rootGroupId != null) {
+ logger.info("Parsing root group with {}", rootGroupId);
+ Authorizations authorizations = this.authorizationsHolder.get().getAuthorizations();
+ boolean authorizationsChanged = false;
+ for (Policy policy: authorizations.getPolicies().getPolicy()) {
+ String resource = policy.getResource();
+ String processGroupRoot = ResourceType.ProcessGroup.getValue() + "/" + placeholder;
+ if (resource.endsWith(processGroupRoot)) {
+ int pos = resource.indexOf(processGroupRoot);
+ policy.setResource(resource.substring(0, pos) + ResourceType.ProcessGroup.getValue() + "/" + rootGroupId);
+ authorizationsChanged = true;
+ }
+ }
+ if (authorizationsChanged) {
+ saveAndRefreshHolder(authorizations);
+ }
+ } else {
+ // this is not expected as this is called from the flow service
+ // once it has been configured
+ logger.info("rootGroupId still not established!");
+ }
+ }
+ }
+
/**
* Creates and adds an access policy for the given resource, group identity, and actions to the specified authorizations.
*
diff --git a/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/main/java/org/apache/nifi/authorization/FileAuthorizerInitializer.java b/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/main/java/org/apache/nifi/authorization/FileAuthorizerInitializer.java
new file mode 100644
index 0000000000..f67328ef84
--- /dev/null
+++ b/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/main/java/org/apache/nifi/authorization/FileAuthorizerInitializer.java
@@ -0,0 +1,25 @@
+package org.apache.nifi.authorization;
+
+import org.apache.nifi.flow.FlowInitializationCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class FileAuthorizerInitializer implements FlowInitializationCallback {
+ private static final Logger logger = LoggerFactory.getLogger(FileAuthorizerInitializer.class);
+private FileAccessPolicyProvider fileAccessPolicyProvider;
+
+ public FileAuthorizerInitializer(FileAccessPolicyProvider fileAccessPolicyProvider) {
+ this.fileAccessPolicyProvider = fileAccessPolicyProvider;
+ }
+
+ @Override
+ public void onRootGroupLoaded() {
+ try {
+ logger.info("Flow initialized; ensuring root group ID is recorded in authorizations.xml");
+ this.fileAccessPolicyProvider.replaceWithRootGroupId();
+ } catch (Exception e) {
+ logger.warn("Unable to update authorizations.xml with root group ID", e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 09f4d38f77..b0137c8302 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -55,6 +55,7 @@ import org.apache.nifi.controller.serialization.FlowSynchronizationException;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.BulletinFactory;
+import org.apache.nifi.flow.FlowInitializationCallback;
import org.apache.nifi.groups.BundleUpdateStrategy;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
@@ -148,6 +149,13 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
private static final String CONNECTION_EXCEPTION_MSG_PREFIX = "Failed to connect node to cluster";
private static final Logger logger = LoggerFactory.getLogger(StandardFlowService.class);

+ // Static callback registration for post-initialization hooks
+ private static volatile FlowInitializationCallback initializationCallback;
+
+ public static void registerInitializationCallback(FlowInitializationCallback callback) {
+ initializationCallback = callback;
+ }
+
public static StandardFlowService createStandaloneInstance(
final FlowController controller,
final NiFiProperties nifiProperties,
@@ -933,6 +941,15 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
// start the processors as indicated by the dataflow
controller.onFlowInitialized(autoResumeState);

+ // this should be done once the flow has been initialized
+ if (initializationCallback != null) {
+ try {
+ initializationCallback.onRootGroupLoaded();
+ } catch (Exception e) {
+ logger.warn("Error invoking FlowInitializationCallback", e);
+ }
+ }
+
loadSnippets(dataFlow.getSnippets());

controller.startHeartbeating();
Loading