From a761ed1e4a1831b7da58981cce691138cfcdac02 Mon Sep 17 00:00:00 2001
From: Mark Payne
+ * A method designed to be overridden by subclasses that need to ensure that any + * blockages to FlowFile drainage are removed. The default implementation is a no-op. + * Typical use cases include notifying Processors that block until a certain amount of data is queued up, + * or until certain conditions are met, that they should immediately allow data to flow through. + *
+ */ + protected void ensureDrainageUnblocked() throws InvocationFailedException { + } + + @Override + public List+ * A Connector is a component that encapsulates and manages a NiFi flow, in such a way that the flow + * can be treated as a single component. The Connector is responsible for managing the lifecycle of the flow, + * including starting and stopping the flow, as well as validating that the flow is correctly configured. + * The Connector exposes a single holistic configuration that is encapsulates the configuration of the + * sources, sinks, transformations, routing logic, and any other components that make up the flow. + *
+ * + *+ * Importantly, a Connector represents a higher-level abstraction and is capable of manipulating the associated + * dataflow, including adding, removing, and configuring components within the flow. This allows a single entity to + * be provided such that configuring properties can result in a flow being dynamically reconfigured (e.g., using a + * different Controller Service implementation). + *
+ * + *+ * When a flow definition is created and shared in NiFi, it can be easily instantiated in another NiFi instance. + * The new instance can then be configured via Parameters. However, if more complex configuration is required, such as + * choosing which Controller Service to use, or enabling or disabling a particular transformation step, the user must + * understand how to manipulate the flow directly. Connectors provide the ability to encapsulate such complexity behind + * a higher-level abstraction, allowing users to configure the flow via a guided configuration experience. + *
+ * + *+ * The Connector API makes use of a {@link FlowContext} abstraction in order to provide effectively two separate instances + * of a flow: the Active flow that can be stopped and started in order to process and move data, and a Working flow that + * can be used to verify configuration in order to ensure that when the configuration is applied to the Active flow, it will + * function as desired. + *
+ * + * Implementation Note: This API is currently experimental, as it is under very active development. As such, + * it is subject to change without notice between minor releases. + */ +public interface Connector { + + /** + * Initializes the Connector instance, providing it the necessary context that it needs to operate. + * @param context the context for initialization + */ + void initialize(ConnectorInitializationContext context); + + /** + * Provides the initial version of the flow that this Connector manages. The Active Flow Context will be + * updated to reflect this flow when the Connector is first added to the NiFi instance but not when the Connector + * is reinitialized upon restart of NiFi. + * + * @return the initial version of the flow + */ + VersionedExternalFlow getInitialFlow(); + + // FIXME: Consider adding two subclasses to FlowContext: ActiveFlowContext and WorkingFlowContext + // They would have no methods, but would serve as markers to make it more clear which context is being used + /** + * Starts the Connector instance. + * @throws FlowUpdateException if there is an error starting the Connector + * @param activeFlowContext the active flow context + */ + void start(FlowContext activeFlowContext) throws FlowUpdateException; + + /** + * Stops the Connector instance. + * @throws FlowUpdateException if there is an error stopping the Connector + * @param activeFlowContext the active flow context + */ + void stop(FlowContext activeFlowContext) throws FlowUpdateException; + + /** + * Validates that the Connector is valid according to its current configuration. Validity of a Connector may be + * defined simply as the all components being valid, or it may encompass more complex validation logic, such + * as ensuring that a Source Processor is able to connect to a remote system, or that a Sink Processor + * is able to write to a remote system. + * + * @param activeFlowContext the active flow context + * @param validationContext the context for validation + * + * @return a list of ValidationResults, each of which may indicate a check that was performed and any associated explanations + * as to why the Connector is valid or invalid. + */ + List+ * The ConnectorInitializationContext provides context about how the connector is being run. + * This includes the identifier and name of Connector as well as access to the crucial components that + * it may need to interact with in order to perform its tasks. + *
+ */ +public interface ConnectorInitializationContext { + + /** + * Returns the identifier of the Connector. + * @return the identifier of the Connector + */ + String getIdentifier(); + + /** + * Returns the name of the Connector. + * @return the name of the Connector + */ + String getName(); + + /** + * Returns the ComponentLog that can be used for logging. Use of the ComponentLog is preferred + * over directly constructing a Logger because it integrates with NiFi's logging system to create bulletins + * as well as delegating to the underlying logging framework. + * + * @return the ComponentLog for logging + */ + ComponentLog getLogger(); + + /** + *+ * Updates the Connector's flow to the given VersionedExternalFlow. This may be a long-running process, as it involves + * several steps, to include: + *
+ *+ * Depending on the changes required in order to update the flow to the provided VersionedProcessGroup, this + * could also result in stopping source processors and waiting for queues to drain, etc. + *
+ * + *+ * This method will block until the update is complete. Note that this could result in the associated flow becoming + * invalid if not properly configured. Otherwise, if the Connector is running, any newly added components will also + * be started. + *
+ * + * @param flowContext the context of the flow to be updated + * @param versionedExternalFlow the new representation of the flow + */ + void updateFlow(FlowContext flowContext, VersionedExternalFlow versionedExternalFlow) throws FlowUpdateException; + +} diff --git a/src/main/java/org/apache/nifi/components/connector/ConnectorParameter.java b/src/main/java/org/apache/nifi/components/connector/ConnectorParameter.java new file mode 100644 index 0000000..ec2a985 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/ConnectorParameter.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +public interface ConnectorParameter { + + String getName(); + + String getDescription(); + + String getDefaultValue(); + + boolean isSensitive(); + +} diff --git a/src/main/java/org/apache/nifi/components/connector/ConnectorParameterContext.java b/src/main/java/org/apache/nifi/components/connector/ConnectorParameterContext.java new file mode 100644 index 0000000..7ef3884 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/ConnectorParameterContext.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +import java.util.Map; + +public interface ConnectorParameterContext { + + Mapnull if not set
+ * @throws NumberFormatException if not able to parse
+ */
+ Integer asInteger();
+
+ /**
+ * @return a Long representation of the property value, or null
+ * if not set
+ * @throws NumberFormatException if not able to parse
+ */
+ Long asLong();
+
+ /**
+ * @return a Boolean representation of the property value, or
+ * null if not set
+ */
+ Boolean asBoolean();
+
+ /**
+ * @return a Float representation of the property value, or
+ * null if not set
+ * @throws NumberFormatException if not able to parse
+ */
+ Float asFloat();
+
+ /**
+ * @return a Double representation of the property value, of
+ * null if not set
+ * @throws NumberFormatException if not able to parse
+ */
+ Double asDouble();
+
+ /**
+ * @param timeUnit specifies the TimeUnit to convert the time duration into
+ * @return a Long value representing the value of the configured time period
+ * in terms of the specified TimeUnit; if the property is not set, returns
+ * null
+ */
+ Long asTimePeriod(TimeUnit timeUnit);
+
+ /**
+ * Returns the value as a Duration
+ *
+ * @return a Duration representing the value, or null if the value is unset
+ */
+ Duration asDuration();
+
+ /**
+ *
+ * @param dataUnit specifies the DataUnit to convert the data size into
+ * @return a Long value representing the value of the configured data size
+ * in terms of the specified DataUnit; if hte property is not set, returns
+ * null
+ */
+ Double asDataSize(DataUnit dataUnit);
+
+ /**
+ * Returns the value as a comma-separated list of values. Leading and trailing
+ * whitespace is trimmed from each value.
+ * @return the property value as a List of Strings
+ */
+ Listtrue if the user has configured a value, or if the
+ * {@link ConnectorPropertyDescriptor} for the associated property has a default
+ * value, false otherwise
+ */
+ boolean isSet();
+
+}
diff --git a/src/main/java/org/apache/nifi/components/connector/ConnectorValidationContext.java b/src/main/java/org/apache/nifi/components/connector/ConnectorValidationContext.java
new file mode 100644
index 0000000..bf67b81
--- /dev/null
+++ b/src/main/java/org/apache/nifi/components/connector/ConnectorValidationContext.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.connector;
+
+import org.apache.nifi.components.DescribedValue;
+import org.apache.nifi.components.ValidationContext;
+
+import java.util.List;
+
+public interface ConnectorValidationContext {
+
+ ValidationContext createValidationContext(String stepName, String groupName);
+
+ List+ * Annotation that can be added to a method in a Processor or ControllerService in order + * to expose the method to connectors for invocation. The method must be public and + * not static. The method may return a value. However, the value that is returned will + * be converted into a JSON object and that JSON object will be returned to the caller. + *
+ * + *+ * The following example shows a method that is exposed to connectors: + *
+ * +
+ * {@code
+ * @ConnectorMethod(
+ * name = "echo",
+ * description = "Returns the provided text after concatenating it the specified number of times.",
+ * allowedStates = {ComponentState.STOPPED, ComponentState.STOPPING, ComponentState.STARTING, ComponentState.RUNNING},
+ * arguments = {
+ * @MethodArgument(name = "text", type = String.class, description = "The text to echo", required = true),
+ * @MethodArgument(name = "iterations", type = int.class, description = "The number of iterations to echo the text", required = false)
+ * }
+ * )
+ * public String echo(Map arguments) {
+ * final StringBuilder sb = new StringBuilder();
+ * final String text = (String) arguments.get("text");
+ * final int iterations = (int) arguments.getOrDefault("iterations", 2);
+ * for (int i = 0; i < iterations; i++) {
+ * sb.append(text);
+ *
+ * if (i < (iterations - 1)) {
+ * sb.append("\n");
+ * }
+ * }
+ *
+ * return sb.toString();
+ * }
+ * }
+ *
+ */
+@Documented
+@Target({ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+@Inherited
+public @interface ConnectorMethod {
+ /**
+ * The name of the method as it will be exposed to connectors.
+ * @return the method name
+ */
+ String name();
+
+ /**
+ * A description of the method
+ * @return the method description
+ */
+ String description() default "";
+
+ /**
+ * The states in which the component that defines the method is allowed to be in
+ * when the method is invoked. If the Processor or ControllerService is not in one of these states,
+ * any attempt to invoke the method will result in an error. The default states include all but PROCESSOR_DISABLED.
+ *
+ * @return the states in which the component that defines the method is allowed to be in when the method is invoked
+ */
+ ComponentState[] allowedStates() default {
+ ComponentState.STOPPED,
+ ComponentState.STOPPING,
+ ComponentState.STARTING,
+ ComponentState.RUNNING
+ };
+
+ /**
+ * The arguments that the method accepts. Each argument is described by a MethodArgument annotation.
+ * If no arguments are required, this can be left empty.
+ *
+ * @return the method arguments
+ */
+ MethodArgument[] arguments() default {};
+}
diff --git a/src/main/java/org/apache/nifi/components/connector/components/ControllerServiceFacade.java b/src/main/java/org/apache/nifi/components/connector/components/ControllerServiceFacade.java
new file mode 100644
index 0000000..b0eae11
--- /dev/null
+++ b/src/main/java/org/apache/nifi/components/connector/components/ControllerServiceFacade.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.connector.components;
+
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.connector.InvocationFailedException;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedExternalFlow;
+import org.apache.nifi.flow.VersionedParameterContext;
+
+import java.util.List;
+import java.util.Map;
+
+public interface ControllerServiceFacade {
+
+ VersionedControllerService getDefinition();
+
+ ControllerServiceLifecycle getLifecycle();
+
+ List
@@ -133,7 +134,7 @@ public interface Connector {
* @return a list of ConfigVerificationResults, each of which may indicate a check that was performed and any associated explanation
* as to why the configuration step verification succeeded, failed, or was skipped.
*/
- List