diff --git a/checkstyle.xml b/checkstyle.xml index 7c87426..0f93902 100644 --- a/checkstyle.xml +++ b/checkstyle.xml @@ -60,7 +60,6 @@ - diff --git a/src/main/java/org/apache/nifi/action/Component.java b/src/main/java/org/apache/nifi/action/Component.java index 9a65185..085e37f 100644 --- a/src/main/java/org/apache/nifi/action/Component.java +++ b/src/main/java/org/apache/nifi/action/Component.java @@ -38,5 +38,7 @@ public enum Component { AccessPolicy, User, UserGroup, - Label; + Label, + Connector; + } diff --git a/src/main/java/org/apache/nifi/annotation/lifecycle/OnAdded.java b/src/main/java/org/apache/nifi/annotation/lifecycle/OnAdded.java index 4933a38..7656f87 100644 --- a/src/main/java/org/apache/nifi/annotation/lifecycle/OnAdded.java +++ b/src/main/java/org/apache/nifi/annotation/lifecycle/OnAdded.java @@ -29,9 +29,10 @@ * {@link org.apache.nifi.controller.ControllerService ControllerService}, * {@link org.apache.nifi.registry.flow.FlowRegistryClient FlowRegistryClient}, * {@link org.apache.nifi.parameter.ParameterProvider ParameterProvider}, - * {@link org.apache.nifi.flowanalysis.FlowAnalysisRule FlowAnalysisRule}, or - * {@link org.apache.nifi.reporting.ReportingTask ReportingTask} implementation - * can use to indicate a method should be called whenever the component is added + * {@link org.apache.nifi.flowanalysis.FlowAnalysisRule FlowAnalysisRule}, + * {@link org.apache.nifi.reporting.ReportingTask ReportingTask}, or + * {@link org.apache.nifi.components.connector.Connector Connector} + * implementation can use to indicate a method should be called whenever the component is added * to the flow. This method will be called once for the entire life of a * component instance. *

diff --git a/src/main/java/org/apache/nifi/components/ConfigVerificationResult.java b/src/main/java/org/apache/nifi/components/ConfigVerificationResult.java index 9f2ab77..6c39db2 100644 --- a/src/main/java/org/apache/nifi/components/ConfigVerificationResult.java +++ b/src/main/java/org/apache/nifi/components/ConfigVerificationResult.java @@ -20,11 +20,13 @@ public class ConfigVerificationResult { private final Outcome outcome; private final String verificationStepName; + private final String subject; private final String explanation; private ConfigVerificationResult(final Builder builder) { outcome = builder.outcome; verificationStepName = builder.verificationStepName; + subject = builder.subject; explanation = builder.explanation; } @@ -36,6 +38,10 @@ public String getVerificationStepName() { return verificationStepName; } + public String getSubject() { + return subject; + } + public String getExplanation() { return explanation; } @@ -44,6 +50,7 @@ public String getExplanation() { public String toString() { return "ConfigVerificationResult[" + "outcome=" + outcome + + ", subject=" + (subject == null ? "null" : "'" + subject + "'") + ", verificationStepName='" + verificationStepName + "'" + ", explanation='" + explanation + "']"; } @@ -51,6 +58,7 @@ public String toString() { public static class Builder { private Outcome outcome = Outcome.SKIPPED; private String verificationStepName = "Unknown Step Name"; + private String subject; private String explanation; public Builder outcome(final Outcome outcome) { @@ -63,6 +71,11 @@ public Builder verificationStepName(final String verificationStepName) { return this; } + public Builder subject(final String subject) { + this.subject = subject; + return this; + } + public Builder explanation(final String explanation) { this.explanation = explanation; return this; diff --git a/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java b/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java new file mode 100644 index 0000000..6db3acc --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/AbstractConnector.java @@ -0,0 +1,588 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.connector.components.ConnectionFacade; +import org.apache.nifi.components.connector.components.ControllerServiceFacade; +import org.apache.nifi.components.connector.components.ControllerServiceReferenceHierarchy; +import org.apache.nifi.components.connector.components.ControllerServiceReferenceScope; +import org.apache.nifi.components.connector.components.FlowContext; +import org.apache.nifi.components.connector.components.ProcessGroupFacade; +import org.apache.nifi.components.connector.components.ProcessGroupLifecycle; +import org.apache.nifi.components.connector.components.ProcessorFacade; +import org.apache.nifi.components.connector.components.ProcessorState; +import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.flow.VersionedConnection; +import org.apache.nifi.logging.ComponentLog; + +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +public abstract class AbstractConnector implements Connector { + private volatile ConnectorInitializationContext initializationContext; + private volatile ComponentLog logger; + private volatile CompletableFuture prepareUpdateFuture; + + protected abstract void onStepConfigured(final String stepName, final FlowContext workingContext) throws FlowUpdateException; + + + @Override + public final void initialize(final ConnectorInitializationContext context) { + this.initializationContext = context; + this.logger = context.getLogger(); + + init(); + } + + /** + * No-op method for subclasses to override to perform any initialization logic + */ + protected void init() { + } + + protected final ComponentLog getLogger() { + return logger; + } + + protected final ConnectorInitializationContext getInitializationContext() { + if (initializationContext == null) { + throw new IllegalStateException("Connector has not been initialized"); + } + + return initializationContext; + } + + @Override + public void start(final FlowContext context) throws FlowUpdateException { + final ProcessGroupLifecycle lifecycle = context.getRootGroup().getLifecycle(); + + try { + lifecycle.enableControllerServices(ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY, ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get(); + } catch (final Exception e) { + throw new FlowUpdateException("Failed to enable Controller Services", e); + } + + lifecycle.startProcessors(); + } + + @Override + public void stop(final FlowContext context) throws FlowUpdateException { + try { + stopAsync(context).get(); + } catch (final Exception e) { + throw new FlowUpdateException("Failed to stop Connector", e); + } + } + + private CompletableFuture stopAsync(final FlowContext context) { + final ProcessGroupFacade rootGroup = context.getRootGroup(); + final ProcessGroupLifecycle lifecycle = rootGroup.getLifecycle(); + + final CompletableFuture stopProcessorsFuture = lifecycle.stopProcessors() + .orTimeout(1, TimeUnit.MINUTES) + .exceptionally(throwable -> { + if (throwable instanceof TimeoutException || throwable.getCause() instanceof TimeoutException) { + final List running = findProcessors(rootGroup, processor -> + processor.getLifecycle().getState() != ProcessorState.STOPPED && processor.getLifecycle().getState() != ProcessorState.DISABLED); + + if (!running.isEmpty()) { + getLogger().warn("After waiting 60 seconds for all Processors to stop, {} are still running. Terminating now.", running.size()); + running.forEach(processor -> processor.getLifecycle().terminate()); + } + + // Continue with the chain after handling timeout + return null; + } else { + throw new RuntimeException("Failed to stop all Processors", throwable); + } + }); + + return stopProcessorsFuture.thenRun(() -> { + try { + lifecycle.disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get(); + } catch (final Exception e) { + throw new RuntimeException("Failed to complete disabling of all Controller Services", e); + } + }); + } + + @Override + public void prepareForUpdate(final FlowContext workingContext, final FlowContext activeContext) throws FlowUpdateException { + final CompletableFuture future = stopAsync(activeContext); + prepareUpdateFuture = future; + + try { + future.get(); + } catch (final Exception e) { + throw new FlowUpdateException("Failed to prepare Connector for update", e); + } + } + + /** + * Drains all FlowFiles from the Connector instance. + * + * @param flowContext the FlowContext to use for drainage + * @return a CompletableFuture that will be completed when drainage is complete + */ + protected CompletableFuture drainFlowFiles(final FlowContext flowContext) { + final CompletableFuture result = new CompletableFuture<>(); + final QueueSize initialQueueSize = flowContext.getRootGroup().getQueueSize(); + if (initialQueueSize.getObjectCount() == 0) { + getLogger().debug("No FlowFiles to drain from Connector"); + result.complete(null); + return result; + } + + getLogger().info("Draining {} FlowFiles ({} bytes) from Connector", + initialQueueSize.getObjectCount(), NumberFormat.getNumberInstance().format(initialQueueSize.getByteCount())); + + final CompletableFuture stopProcessorsFuture = stopSourceProcessors(flowContext); + + final CompletableFuture startNonSourceFuture = stopProcessorsFuture.thenRun(() -> { + if (result.isDone()) { + return; + } + + final CompletableFuture enableServices = flowContext.getRootGroup().getLifecycle().enableControllerServices( + ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY, + ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS); + + try { + // Wait for all referenced services to be enabled. + enableServices.get(); + + if (!result.isDone()) { + getLogger().info("Starting all non-source processors to facilitate drainage of FlowFiles"); + startNonSourceProcessors(flowContext).get(); + } + } catch (final Exception e) { + try { + flowContext.getRootGroup().getLifecycle().disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get(); + } catch (final Exception e1) { + e.addSuppressed(e1); + } + + result.completeExceptionally(new RuntimeException("Failed to start non-source processors while draining FlowFiles", e.getCause())); + } + }); + + startNonSourceFuture.thenRun(() -> { + try { + ensureDrainageUnblocked(); + } catch (final Exception e) { + getLogger().warn("Failed to ensure drainage is unblocked when draining FlowFiles", e); + } + + Exception failureReason = null; + int iterations = 0; + while (!isGroupDrained(flowContext.getRootGroup())) { + if (result.isDone()) { + getLogger().info("Drainage has been cancelled; will no longer wait for FlowFiles to drain"); + break; + } + + // Log the current queue size every 10 seconds (20 iterations of 500ms) so that it's clear + // whether or not progress is being made. + if (iterations++ % 20 == 0) { + final QueueSize queueSize = flowContext.getRootGroup().getQueueSize(); + getLogger().info("Waiting for {} FlowFiles ({} bytes) to drain", + queueSize.getObjectCount(), NumberFormat.getNumberInstance().format(queueSize.getByteCount())); + } + + try { + Thread.sleep(500); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + failureReason = e; + break; + } + } + + // Log completion unless the result was completed exceptionally or cancelled. + if (!result.isDone()) { + getLogger().info("All {} FlowFiles have drained from Connector", initialQueueSize.getObjectCount()); + } + + try { + stop(flowContext); + } catch (final Exception e) { + getLogger().warn("Failed to stop source Processors after draining FlowFiles", e); + if (failureReason == null) { + failureReason = e; + } else { + failureReason.addSuppressed(e); + } + } + + if (failureReason != null && !result.isDone()) { + result.completeExceptionally(new RuntimeException("Interrupted while waiting for " + AbstractConnector.this + " to drain", failureReason)); + } + + if (!result.isDone()) { + result.complete(null); + } + }); + + return result; + } + + /** + *

+ * A method designed to be overridden by subclasses that need to ensure that any + * blockages to FlowFile drainage are removed. The default implementation is a no-op. + * Typical use cases include notifying Processors that block until a certain amount of data is queued up, + * or until certain conditions are met, that they should immediately allow data to flow through. + *

+ */ + protected void ensureDrainageUnblocked() throws InvocationFailedException { + } + + @Override + public List verify(final FlowContext flowContext) { + final List results = new ArrayList<>(); + + final List configSteps = getConfigurationSteps(flowContext); + for (final ConfigurationStep configStep : configSteps) { + final List stepResults = verifyConfigurationStep(configStep.getName(), Map.of(), flowContext); + results.addAll(stepResults); + } + + return results; + } + + @Override + public List validate(final FlowContext flowContext, final ConnectorValidationContext validationContext) { + final ConnectorConfigurationContext configContext = flowContext.getConfigurationContext(); + final List results = new ArrayList<>(); + final List configurationSteps = getConfigurationSteps(flowContext); + + for (final ConfigurationStep configurationStep : configurationSteps) { + results.addAll(validateConfigurationStep(configurationStep, configContext, validationContext)); + } + + // only run customValidate if regular validation is successful. This allows Processor developers to not have to check + // if values are null or invalid so that they can focus only on the interaction between the properties, etc. + if (results.isEmpty()) { + final Collection customResults = customValidate(configContext); + if (customResults != null) { + for (final ValidationResult result : customResults) { + if (!result.isValid()) { + results.add(result); + } + } + } + } + + return results; + } + + + protected List validateComponents(final FlowContext context, final ProcessGroupFacade group, final ConnectorValidationContext validationContext) { + final List validationResults = new ArrayList<>(); + validateComponents(context, group, validationContext, validationResults); + return validationResults; + } + + private void validateComponents(final FlowContext context, final ProcessGroupFacade group, final ConnectorValidationContext validationContext, + final List validationResults) { + + for (final ProcessorFacade processor : group.getProcessors()) { + final List processorResults = processor.validate(); + for (final ValidationResult result : processorResults) { + if (result.isValid()) { + continue; + } + + validationResults.add(new ValidationResult.Builder() + .valid(false) + .subject(result.getSubject()) + .input(result.getInput()) + .explanation("Processor [%s] is invalid: %s".formatted(processor.getDefinition().getName(), result.getExplanation())) + .build()); + } + } + + final Set referencedServices = group.getControllerServices( + ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY, + ControllerServiceReferenceHierarchy.DIRECT_SERVICES_ONLY); + + for (final ControllerServiceFacade service : referencedServices) { + final List serviceResults = service.validate(); + for (final ValidationResult result : serviceResults) { + if (result.isValid()) { + continue; + } + + validationResults.add(new ValidationResult.Builder() + .valid(false) + .subject(result.getSubject()) + .input(result.getInput()) + .explanation("Controller Service [%s] is invalid: %s".formatted(service.getDefinition().getName(), result.getExplanation())) + .build()); + } + } + + for (final ProcessGroupFacade childGroup : group.getProcessGroups()) { + validateComponents(context, childGroup, validationContext, validationResults); + } + } + + protected boolean isGroupDrained(final ProcessGroupFacade group) { + return group.getQueueSize().getObjectCount() == 0; + } + + protected CompletableFuture stopSourceProcessors(final FlowContext context) { + final List sourceProcessors = getSourceProcessors(context.getRootGroup()); + + final List> stopFutures = new ArrayList<>(); + for (final ProcessorFacade sourceProcessor : sourceProcessors) { + final CompletableFuture stopFuture = sourceProcessor.getLifecycle().stop(); + stopFutures.add(stopFuture); + } + + return CompletableFuture.allOf(stopFutures.toArray(new CompletableFuture[0])); + } + + protected CompletableFuture startNonSourceProcessors(final FlowContext flowContext) { + final List nonSourceProcessors = getNonSourceProcessors(flowContext.getRootGroup()); + + final List> startFutures = new ArrayList<>(); + for (final ProcessorFacade nonSourceProcessor : nonSourceProcessors) { + final CompletableFuture startFuture = nonSourceProcessor.getLifecycle().start(); + startFutures.add(startFuture); + } + + return CompletableFuture.allOf(startFutures.toArray(new CompletableFuture[0])); + } + + protected List getSourceProcessors(final ProcessGroupFacade group) { + final Set nonSourceIds = getNonSourceProcessorIds(group); + + return findProcessors(group, processor -> !nonSourceIds.contains(processor.getDefinition().getIdentifier())); + } + + protected List getNonSourceProcessors(final ProcessGroupFacade group) { + final Set nonSourceIds = getNonSourceProcessorIds(group); + + return findProcessors(group, processor -> nonSourceIds.contains(processor.getDefinition().getIdentifier())); + } + + protected Set getNonSourceProcessorIds(final ProcessGroupFacade group) { + final Set destinationIds = new HashSet<>(); + forEachConnection(group, conn -> { + final VersionedConnection definition = conn.getDefinition(); + final String sourceId = definition.getSource().getId(); + final String destinationId = definition.getDestination().getId(); + if (!Objects.equals(sourceId, destinationId)) { + destinationIds.add(destinationId); + } + }); + + return destinationIds; + } + + protected List findProcessors(final ProcessGroupFacade group, final Predicate filter) { + final List matching = new ArrayList<>(); + findProcessors(group, filter, matching); + return matching; + } + + private void findProcessors(final ProcessGroupFacade group, final Predicate filter, final List found) { + for (final ProcessorFacade processor : group.getProcessors()) { + if (filter.test(processor)) { + found.add(processor); + } + } + + for (final ProcessGroupFacade childGroup : group.getProcessGroups()) { + findProcessors(childGroup, filter, found); + } + } + + private void forEachConnection(final ProcessGroupFacade group, final Consumer connectionConsumer) { + for (final ConnectionFacade connection : group.getConnections()) { + connectionConsumer.accept(connection); + } + + for (final ProcessGroupFacade childGroup : group.getProcessGroups()) { + forEachConnection(childGroup, connectionConsumer); + } + } + + + @Override + public List validateConfigurationStep(final ConfigurationStep configurationStep, final ConnectorConfigurationContext configurationContext, + final ConnectorValidationContext validationContext) { + + final String stepName = configurationStep.getName(); + final List results = new ArrayList<>(); + + final List propertyGroups = configurationStep.getPropertyGroups(); + for (final ConnectorPropertyGroup propertyGroup : propertyGroups) { + final List descriptors = propertyGroup.getProperties(); + final Map descriptorMap = descriptors.stream() + .collect(Collectors.toMap(ConnectorPropertyDescriptor::getName, Function.identity())); + + final Function propertyValueLookup = name -> configurationContext.getProperty(stepName, name); + + for (final ConnectorPropertyDescriptor descriptor : descriptors) { + final boolean dependencySatisfied = isDependencySatisfied(descriptor, descriptorMap::get, propertyValueLookup); + + // If the property descriptor's dependency is not satisfied, the property does not need to be considered, as it's not relevant to the + if (!dependencySatisfied) { + continue; + } + + final ConnectorPropertyValue propertyValue = configurationContext.getProperty(stepName, descriptor.getName()); + if (propertyValue == null || !propertyValue.isSet()) { + if (descriptor.isRequired()) { + final ValidationResult invalidResult = new ValidationResult.Builder() + .valid(false) + .input(null) + .subject(descriptor.getName()) + .explanation(descriptor.getName() + " is required") + .build(); + results.add(invalidResult); + } + + continue; + } + + final ValidationResult result = descriptor.validate(stepName, propertyGroup.getName(), propertyValue.getValue(), validationContext); + if (!result.isValid()) { + results.add(result); + } + } + } + + return results; + } + + private boolean isDependencySatisfied(final ConnectorPropertyDescriptor propertyDescriptor, final Function propertyDescriptorLookup, + final Function propertyValueLookup) { + + return isDependencySatisfied(propertyDescriptor, propertyDescriptorLookup, propertyValueLookup, new HashSet<>()); + } + + private boolean isDependencySatisfied(final ConnectorPropertyDescriptor propertyDescriptor, final Function propertyDescriptorLookup, + final Function propertyValueLookup, final Set propertiesSeen) { + + final Set dependencies = propertyDescriptor.getDependencies(); + if (dependencies.isEmpty()) { + return true; + } + + final boolean added = propertiesSeen.add(propertyDescriptor.getName()); + if (!added) { + return false; + } + + try { + for (final ConnectorPropertyDependency dependency : dependencies) { + final String dependencyName = dependency.getPropertyName(); + + // Check if the property being depended upon has its dependencies satisfied. + final ConnectorPropertyDescriptor dependencyDescriptor = propertyDescriptorLookup.apply(dependencyName); + if (dependencyDescriptor == null) { + return false; + } + + final ConnectorPropertyValue propertyValue = propertyValueLookup.apply(dependencyDescriptor.getName()); + final String dependencyValue = propertyValue == null ? dependencyDescriptor.getDefaultValue() : propertyValue.getValue(); + if (dependencyValue == null) { + return false; + } + + final boolean transitiveDependencySatisfied = isDependencySatisfied(dependencyDescriptor, propertyDescriptorLookup, propertyValueLookup, propertiesSeen); + if (!transitiveDependencySatisfied) { + return false; + } + + // Check if the property being depended upon is set to one of the values that satisfies this dependency. + // If the dependency has no dependent values, then any non-null value satisfies the dependency. + // The value is already known to be non-null due to the check above. + final Set dependentValues = dependency.getDependentValues(); + if (dependentValues != null && !dependentValues.contains(dependencyValue)) { + return false; + } + } + + return true; + } finally { + propertiesSeen.remove(propertyDescriptor.getName()); + } + } + + @Override + public final void onConfigurationStepConfigured(final String stepName, final FlowContext workingContext) throws FlowUpdateException { + onStepConfigured(stepName, workingContext); + } + + @Override + public void abortUpdate(final FlowContext workingContext, final Throwable throwable) { + if (prepareUpdateFuture != null && !prepareUpdateFuture.isDone()) { + prepareUpdateFuture.completeExceptionally(throwable); + } + } + + @Override + public List fetchAllowableValues(final String stepName, final String propertyName, final FlowContext flowContext) { + throw new UnsupportedOperationException("Property %s in Configuration Step %s does not support fetching Allowable Values.".formatted(propertyName, stepName)); + } + + @Override + public List fetchAllowableValues(final String stepName, final String propertyName, final FlowContext flowContext, final String filter) { + final List allowableValues = fetchAllowableValues(stepName, propertyName, flowContext); + if (filter == null || filter.isEmpty()) { + return allowableValues; + } else { + return allowableValues.stream() + .filter(value -> value.getValue().toLowerCase().contains(filter.toLowerCase()) || value.getValue().toUpperCase().contains(filter.toUpperCase())) + .toList(); + } + } + + + /** + * No-op implementation that allows concrete subclasses to perform validation of property configuration + * + * @param context the context that should be used for validation + * @return a collection of validation results indicating any problems with the configuration. + */ + protected Collection customValidate(final ConnectorConfigurationContext context) { + return Collections.emptyList(); + } + +} diff --git a/src/main/java/org/apache/nifi/components/connector/AssetReference.java b/src/main/java/org/apache/nifi/components/connector/AssetReference.java new file mode 100644 index 0000000..13abf73 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/AssetReference.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +import java.util.Objects; + +/** + * A ConnectorValueReference implementation representing a reference to an asset. + */ +public final class AssetReference implements ConnectorValueReference { + + private final String assetIdentifier; + + public AssetReference(final String assetIdentifier) { + this.assetIdentifier = assetIdentifier; + } + + /** + * Returns the asset identifier. + * + * @return the asset identifier + */ + public String getAssetIdentifier() { + return assetIdentifier; + } + + @Override + public ConnectorValueType getValueType() { + return ConnectorValueType.ASSET_REFERENCE; + } + + @Override + public boolean equals(final Object object) { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + final AssetReference that = (AssetReference) object; + return Objects.equals(assetIdentifier, that.assetIdentifier); + } + + @Override + public int hashCode() { + return Objects.hashCode(assetIdentifier); + } + + @Override + public String toString() { + return "AssetReference[assetId=" + assetIdentifier + "]"; + } +} diff --git a/src/main/java/org/apache/nifi/components/connector/ConfigurationStep.java b/src/main/java/org/apache/nifi/components/connector/ConfigurationStep.java new file mode 100644 index 0000000..585e74c --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/ConfigurationStep.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public final class ConfigurationStep { + private final String name; + private final String description; + private final List propertyGroups; + private final String documentation; + + private ConfigurationStep(final Builder builder) { + this.name = builder.name; + this.description = builder.description; + this.propertyGroups = Collections.unmodifiableList(builder.propertyGroups); + this.documentation = builder.documentation; + } + + public String getName() { + return name; + } + + public String getDescription() { + return description; + } + + public List getPropertyGroups() { + return propertyGroups; + } + + /** + * @return the configuration step documentation in markdown + */ + public String getDocumentation() { + return documentation; + } + + public static final class Builder { + private String name; + private String description; + private List propertyGroups = Collections.emptyList(); + private String documentation; + + public Builder name(String name) { + this.name = name; + return this; + } + + public Builder description(String description) { + this.description = description; + return this; + } + + public Builder propertyGroups(final List propertyGroups) { + this.propertyGroups = new ArrayList<>(propertyGroups); + return this; + } + + /** + * @param documentation the documentation for this configuration step in markdown + * @return this builder + */ + public Builder documentation(String documentation) { + this.documentation = documentation; + return this; + } + + public ConfigurationStep build() { + if (name == null) { + throw new IllegalStateException("Configuration Step's name must be provided"); + } + + // Ensure that all Property Descriptor names are unique + final Set propertyNames = new HashSet<>(); + for (final ConnectorPropertyGroup propertyGroup : propertyGroups) { + for (final ConnectorPropertyDescriptor descriptor : propertyGroup.getProperties()) { + if (!propertyNames.add(descriptor.getName())) { + throw new IllegalStateException("All Property Descriptor names must be unique within a Configuration Step. Duplicate name found: " + descriptor.getName()); + } + } + } + + return new ConfigurationStep(this); + } + } +} diff --git a/src/main/java/org/apache/nifi/components/connector/Connector.java b/src/main/java/org/apache/nifi/components/connector/Connector.java new file mode 100644 index 0000000..5de886a --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/Connector.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.connector.components.FlowContext; +import org.apache.nifi.flow.VersionedExternalFlow; + +import java.util.List; +import java.util.Map; + +/** + *

+ * A Connector is a component that encapsulates and manages a NiFi flow, in such a way that the flow + * can be treated as a single component. The Connector is responsible for managing the lifecycle of the flow, + * including starting and stopping the flow, as well as validating that the flow is correctly configured. + * The Connector exposes a single holistic configuration that is encapsulates the configuration of the + * sources, sinks, transformations, routing logic, and any other components that make up the flow. + *

+ * + *

+ * Importantly, a Connector represents a higher-level abstraction and is capable of manipulating the associated + * dataflow, including adding, removing, and configuring components within the flow. This allows a single entity to + * be provided such that configuring properties can result in a flow being dynamically reconfigured (e.g., using a + * different Controller Service implementation). + *

+ * + *

+ * When a flow definition is created and shared in NiFi, it can be easily instantiated in another NiFi instance. + * The new instance can then be configured via Parameters. However, if more complex configuration is required, such as + * choosing which Controller Service to use, or enabling or disabling a particular transformation step, the user must + * understand how to manipulate the flow directly. Connectors provide the ability to encapsulate such complexity behind + * a higher-level abstraction, allowing users to configure the flow via a guided configuration experience. + *

+ * + *

+ * The Connector API makes use of a {@link FlowContext} abstraction in order to provide effectively two separate instances + * of a flow: the Active flow that can be stopped and started in order to process and move data, and a Working flow that + * can be used to verify configuration in order to ensure that when the configuration is applied to the Active flow, it will + * function as desired. + *

+ * + * Implementation Note: This API is currently experimental, as it is under very active development. As such, + * it is subject to change without notice between minor releases. + */ +public interface Connector { + + /** + * Initializes the Connector instance, providing it the necessary context that it needs to operate. + * @param context the context for initialization + */ + void initialize(ConnectorInitializationContext context); + + /** + * Provides the initial version of the flow that this Connector manages. The Active Flow Context will be + * updated to reflect this flow when the Connector is first added to the NiFi instance but not when the Connector + * is reinitialized upon restart of NiFi. + * + * @return the initial version of the flow + */ + VersionedExternalFlow getInitialFlow(); + + // FIXME: Consider adding two subclasses to FlowContext: ActiveFlowContext and WorkingFlowContext + // They would have no methods, but would serve as markers to make it more clear which context is being used + /** + * Starts the Connector instance. + * @throws FlowUpdateException if there is an error starting the Connector + * @param activeFlowContext the active flow context + */ + void start(FlowContext activeFlowContext) throws FlowUpdateException; + + /** + * Stops the Connector instance. + * @throws FlowUpdateException if there is an error stopping the Connector + * @param activeFlowContext the active flow context + */ + void stop(FlowContext activeFlowContext) throws FlowUpdateException; + + /** + * Validates that the Connector is valid according to its current configuration. Validity of a Connector may be + * defined simply as the all components being valid, or it may encompass more complex validation logic, such + * as ensuring that a Source Processor is able to connect to a remote system, or that a Sink Processor + * is able to write to a remote system. + * + * @param activeFlowContext the active flow context + * @param validationContext the context for validation + * + * @return a list of ValidationResults, each of which may indicate a check that was performed and any associated explanations + * as to why the Connector is valid or invalid. + */ + List validate(FlowContext activeFlowContext, ConnectorValidationContext validationContext); + + /** + * Validates the configuration for a specific configuration step. This allows the Connector to indicate any + * issues with syntactic configuration issues but is not as comprehensive as the overall validation provided + * by {@link #validate(FlowContext, ConnectorValidationContext)} due to the fact that it does not have access + * to the full configuration of the Connector. This provides immediate feedback to users + * as they are configuring each step. + * + * @param configurationStep the configuration step being validated + * @param configurationContext the context for the configuration + * @param validationContext the context for validation + * @return a list of ValidationResults, each of which may indicate a check that was performed and any associated explanations + * as to why the configuration step is valid or invalid. + */ + // TODO: Should look at making verifyConfigurationStep / validateConfigurationStep more consistent in arguments. + List validateConfigurationStep(ConfigurationStep configurationStep, ConnectorConfigurationContext configurationContext, ConnectorValidationContext validationContext); + + /** + * Verifies the configuration for a specific configuration step. This allows the Connector to perform + * more comprehensive verification of the configuration for a step than does validation, such as attempting to connect to + * remote systems, sample data and ensure that it can be parsed correctly, etc. + * + * @param stepName the name of the configuration step being verified + * @param propertyValueOverrides any overrides to the currently configured property values that should be used for verification + * @param flowContext the flow context that is being used for the verification + * @return a list of ConfigVerificationResults, each of which may indicate a check that was performed and any associated explanation + * as to why the configuration step verification succeeded, failed, or was skipped. + */ + List verifyConfigurationStep(String stepName, Map propertyValueOverrides, FlowContext flowContext); + + /** + * Verifies the overall configuration of the Connector based on the configuration that has already been provided for the given Flow Context. + * + * @param flowContext the flow context that houses the configuration being used to drive the verification + * @return a list of ConfigVerificationResults, each of which may indicate a check that was performed and any associated explanation + * as to why the configuration verification succeeded, failed, or was skipped. + */ + List verify(FlowContext flowContext); + + /** + * Returns the list of configuration steps that define the configuration of this Connector. Each step + * represents a logical grouping of properties that should be configured together. The order of the steps + * in the list represents the order in which the steps should be configured. + * + * @param flowContext the flow context that houses the configuration being used to drive the available configuration steps + * @return the list of configuration steps + */ + List getConfigurationSteps(FlowContext flowContext); + + /** + * Called whenever a specific configuration step has been configured. This allows the Connector to perform any necessary + * actions specific to that step, such as updating parameter values, updating the flow, etc. + * + * @param stepName the name of the step + * @param workingFlowContext the working flow context that is being used for the update + */ + void onConfigurationStepConfigured(String stepName, FlowContext workingFlowContext) throws FlowUpdateException; + + /** + * Called before any updates to the Connector's configuration are applied. This allows the Connector to perform any necessary + * preparation work before the configuration is changed, such as stopping the flow, draining queues, etc. + * + * @param workingFlowContext the working flow context that has been created for the update + * @param activeFlowContext the active flow context that is currently in use + */ + void prepareForUpdate(FlowContext workingFlowContext, FlowContext activeFlowContext) throws FlowUpdateException; + + /** + * Called if the update preparation (i.e., {@link #prepareForUpdate(FlowContext, FlowContext)}) fails or is cancelled. + * This allows the Connector to perform any necessary + * cleanup work after a failed preparation, such as cancelling any in-progress operations, etc. + * + * @param workingFlowContext the working flow context that was being used for the update preparation + * @param cause the cause for the update preparation to be aborted + */ + void abortUpdate(FlowContext workingFlowContext, Throwable cause); + + /** + * Applies the configuration of the working FlowContext to the active flow. Once the active FlowContext has been updated, + * the existing working FlowContext is destroyed, along with any components that are part of the flow and any FlowFiles that + * might be queued up as part of the flow. A new working FlowContext is then created that reflects the newly updated active flow. + * + * @param workingFlowContext the working flow context that represents the updated configuration + * @param activeFlowContext the flow context that represents the active flow + */ + void applyUpdate(FlowContext workingFlowContext, FlowContext activeFlowContext) throws FlowUpdateException; + + List fetchAllowableValues(String stepName, String propertyName, FlowContext flowContext); + + List fetchAllowableValues(String stepName, String propertyName, FlowContext flowContext, String filter); +} diff --git a/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationContext.java b/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationContext.java new file mode 100644 index 0000000..a64d533 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationContext.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +import java.util.Map; +import java.util.Set; + +public interface ConnectorConfigurationContext extends Cloneable { + + /** + * Returns the property value for the given property name in the specified configuration step. + * @param configurationStepName the name of the configuration step + * @param propertyName the name of the property + * @return the property value for the given property name in the specified configuration step + */ + ConnectorPropertyValue getProperty(String configurationStepName, String propertyName); + + /** + * Returns the property value for the given property descriptor in the specified configuration step. + * @param configurationStep the configuration step + * @param propertyDescriptor the property descriptor + * @return the property value for the given property descriptor in the specified configuration step + */ + ConnectorPropertyValue getProperty(ConfigurationStep configurationStep, ConnectorPropertyDescriptor propertyDescriptor); + + /** + * Returns a set of all property names for the specified configuration step. + * @param configurationStepName the name of the configuration step + * @return a set of all property names for the specified configuration step + */ + Set getPropertyNames(String configurationStepName); + + /** + * Returns a set of all property names for the specified configuration step. + * @param configurationStep the configuration step + * @return a set of all property names for the specified configuration step + */ + Set getPropertyNames(ConfigurationStep configurationStep); + + /** + * Returns a view of this configuration context scoped to the provided step name. + * @param stepName the name of the configuration step + * @return a StepConfigurationContext scoped to the provided step name + */ + StepConfigurationContext scopedToStep(String stepName); + + /** + * Returns a view of this configuration context scoped to the provided configuration step. + * @param configurationStep the configuration step + * @return a StepConfigurationContext scoped to the provided configuration step + */ + StepConfigurationContext scopedToStep(ConfigurationStep configurationStep); + + /** + * Creates a new ConnectorConfigurationContext based on this context's values but with the + * values for the given step overridden. If the provided map of values does not override all properties + * for the step, the remaining properties will retain their existing values. Said another way, this is a + * "partial override" for the specified step. If the provided map contains a null value for a property, + * the returned context will have that property removed. + * + * @param stepName the name of the configuration step for which the overrides should be applied + * @param propertyValues a map of property name to property value containing the overrides + * @return a new ConnectorConfigurationContext with the overrides applied + */ + ConnectorConfigurationContext createWithOverrides(String stepName, Map propertyValues); + + /** + * Creates a deep copy of this ConnectorConfigurationContext. + * @return a deep copy of this ConnectorConfigurationContext + */ + ConnectorConfigurationContext clone(); +} diff --git a/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationManager.java b/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationManager.java new file mode 100644 index 0000000..98d5904 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationManager.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +import java.util.Map; + +public interface ConnectorConfigurationManager { + + void setProperties(Map properties); + +} diff --git a/src/main/java/org/apache/nifi/components/connector/ConnectorInitializationContext.java b/src/main/java/org/apache/nifi/components/connector/ConnectorInitializationContext.java new file mode 100644 index 0000000..3626a47 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/ConnectorInitializationContext.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +import org.apache.nifi.components.connector.components.FlowContext; +import org.apache.nifi.flow.VersionedExternalFlow; +import org.apache.nifi.logging.ComponentLog; + +/** + *

+ * The ConnectorInitializationContext provides context about how the connector is being run. + * This includes the identifier and name of Connector as well as access to the crucial components that + * it may need to interact with in order to perform its tasks. + *

+ */ +public interface ConnectorInitializationContext { + + /** + * Returns the identifier of the Connector. + * @return the identifier of the Connector + */ + String getIdentifier(); + + /** + * Returns the name of the Connector. + * @return the name of the Connector + */ + String getName(); + + /** + * Returns the ComponentLog that can be used for logging. Use of the ComponentLog is preferred + * over directly constructing a Logger because it integrates with NiFi's logging system to create bulletins + * as well as delegating to the underlying logging framework. + * + * @return the ComponentLog for logging + */ + ComponentLog getLogger(); + + /** + *

+ * Updates the Connector's flow to the given VersionedExternalFlow. This may be a long-running process, as it involves + * several steps, to include: + *

+ *
    + *
  • Identifying which elements in the flow have changed
  • + *
  • Stopping affected Processors and Controller Services, waiting for them to stop fully
  • + *
  • Applying necessary changes, to include changing component configuration, adding, and removing components
  • + *
  • Restarting all components
  • + *
+ * + * + *

+ * Depending on the changes required in order to update the flow to the provided VersionedProcessGroup, this + * could also result in stopping source processors and waiting for queues to drain, etc. + *

+ * + *

+ * This method will block until the update is complete. Note that this could result in the associated flow becoming + * invalid if not properly configured. Otherwise, if the Connector is running, any newly added components will also + * be started. + *

+ * + * @param flowContext the context of the flow to be updated + * @param versionedExternalFlow the new representation of the flow + */ + void updateFlow(FlowContext flowContext, VersionedExternalFlow versionedExternalFlow) throws FlowUpdateException; + +} diff --git a/src/main/java/org/apache/nifi/components/connector/ConnectorParameter.java b/src/main/java/org/apache/nifi/components/connector/ConnectorParameter.java new file mode 100644 index 0000000..ec2a985 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/ConnectorParameter.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +public interface ConnectorParameter { + + String getName(); + + String getDescription(); + + String getDefaultValue(); + + boolean isSensitive(); + +} diff --git a/src/main/java/org/apache/nifi/components/connector/ConnectorParameterContext.java b/src/main/java/org/apache/nifi/components/connector/ConnectorParameterContext.java new file mode 100644 index 0000000..7ef3884 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/ConnectorParameterContext.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +import java.util.Map; + +public interface ConnectorParameterContext { + + Map getParameters(); + +} diff --git a/src/main/java/org/apache/nifi/components/connector/ConnectorPropertyDependency.java b/src/main/java/org/apache/nifi/components/connector/ConnectorPropertyDependency.java new file mode 100644 index 0000000..0f2d1e2 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/ConnectorPropertyDependency.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +import java.util.Set; + +public final class ConnectorPropertyDependency { + private final String propertyName; + private final Set dependentValues; + + public ConnectorPropertyDependency(final String propertyName, final Set dependentValues) { + this.propertyName = propertyName; + this.dependentValues = Set.copyOf(dependentValues); + } + + public ConnectorPropertyDependency(final String propertyName) { + this.propertyName = propertyName; + this.dependentValues = null; + } + + public String getPropertyName() { + return propertyName; + } + + public Set getDependentValues() { + return dependentValues; + } +} diff --git a/src/main/java/org/apache/nifi/components/connector/ConnectorPropertyDescriptor.java b/src/main/java/org/apache/nifi/components/connector/ConnectorPropertyDescriptor.java new file mode 100644 index 0000000..209039b --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/ConnectorPropertyDescriptor.java @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.DescribedValue; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public final class ConnectorPropertyDescriptor { + private static final Pattern INTEGER_PATTERN = Pattern.compile("^-?\\d+$"); + private static final Pattern DOUBLE_PATTERN = Pattern.compile("^-?\\d+(\\.\\d+)?$"); + private static final Pattern BOOLEAN_PATTERN = Pattern.compile("^(?i)(true|false)$"); + + private final String name; + private final String description; + private final String defaultValue; + private final boolean required; + private final PropertyType type; + private final List allowableValues; + private final boolean allowableValuesFetchable; + private final List validators; + private final Set dependencies; + + private ConnectorPropertyDescriptor(final Builder builder) { + this.name = builder.name; + this.description = builder.description; + this.defaultValue = builder.defaultValue; + this.required = builder.required; + this.type = builder.type; + this.allowableValues = builder.allowableValues == null ? null : Collections.unmodifiableList(builder.allowableValues); + this.allowableValuesFetchable = builder.allowableValuesFetchable; + this.validators = List.copyOf(builder.validators); + this.dependencies = Collections.unmodifiableSet(builder.dependencies); + } + + public String getName() { + return name; + } + + public String getDescription() { + return description; + } + + public String getDefaultValue() { + return defaultValue; + } + + public boolean isRequired() { + return required; + } + + public PropertyType getType() { + return type; + } + + public List getAllowableValues() { + return allowableValues; + } + + public boolean isAllowableValuesFetchable() { + return allowableValuesFetchable; + } + + public Set getDependencies() { + return dependencies; + } + + public List getValidators() { + return validators; + } + + public ValidationResult validate(final String stepName, final String groupName, final String value, final ConnectorValidationContext validationContext) { + final List fetchedAllowableValues; + if (isAllowableValuesFetchable()) { + try { + fetchedAllowableValues = validationContext.fetchAllowableValues(stepName, getName()); + } catch (final Exception e) { + return new ValidationResult.Builder() + .subject(name) + .input(value) + .valid(false) + .explanation("Failed to fetch allowable values: " + e.getMessage()) + .build(); + } + } else { + fetchedAllowableValues = null; + } + + if (type != PropertyType.STRING_LIST) { + return validateIndividual(stepName, groupName, value, validationContext, fetchedAllowableValues); + } + + if (required && value == null) { + return new ValidationResult.Builder() + .subject(name) + .input(value) + .valid(false) + .explanation("Property is required but no value was specified") + .build(); + } + + final String[] values = value.split(","); + for (final String individualValue : values) { + final ValidationResult result = validateIndividual(stepName, groupName, individualValue.trim(), validationContext, fetchedAllowableValues); + if (!result.isValid()) { + return result; + } + } + + return new ValidationResult.Builder() + .subject(name) + .input(value) + .valid(true) + .build(); + } + + private ValidationResult validateIndividual(final String stepName, final String groupName, final String value, + final ConnectorValidationContext validationContext, final List fetchedAllowableValues) { + + if (!isValueAllowed(value, allowableValues) || !isValueAllowed(value, fetchedAllowableValues)) { + return new ValidationResult.Builder() + .subject(name) + .input(value) + .valid(false) + .explanation("Value is not one of the allowable values") + .build(); + } + + final ValidationResult invalidResult = validateType(value); + if (invalidResult != null) { + return invalidResult; + } + + for (final Validator validator : validators) { + final ValidationResult result = validator.validate(name, value, validationContext.createValidationContext(stepName, groupName)); + if (!result.isValid()) { + return result; + } + } + + return new ValidationResult.Builder() + .subject(name) + .input(value) + .valid(true) + .build(); + } + + private boolean isValueAllowed(final String value, final List allowableValues) { + if (allowableValues == null || allowableValues.isEmpty()) { + // If no allowable values are explicitly specified, consider all values to be allowable + return true; + } + if (value == null) { + return false; + } + + for (final DescribedValue describedValue : allowableValues) { + if (value.equalsIgnoreCase(describedValue.getValue())) { + return true; + } + } + + return false; + } + + + private ValidationResult validateType(final String value) { + final String explanation = switch (type) { + case SECRET, STRING, STRING_LIST -> null; + case BOOLEAN -> BOOLEAN_PATTERN.matcher(value).matches() ? null : "Value must be true or false"; + case INTEGER -> INTEGER_PATTERN.matcher(value).matches() ? null : "Value must be an integer"; + case DOUBLE, FLOAT -> DOUBLE_PATTERN.matcher(value).matches() ? null : "Value must be a floating point number"; + }; + + if (explanation == null) { + return null; + } + + return new ValidationResult.Builder() + .subject(name) + .input(value) + .valid(false) + .explanation(explanation) + .build(); + } + + @Override + public boolean equals(final Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + final ConnectorPropertyDescriptor that = (ConnectorPropertyDescriptor) o; + return Objects.equals(name, that.name); + } + + @Override + public int hashCode() { + return Objects.hashCode(name); + } + + @Override + public String toString() { + return "ConnectorPropertyDescriptor[name=" + name + "]"; + } + + public static final class Builder { + private String name; + private String description; + private String defaultValue = null; + private boolean required = false; + private PropertyType type = PropertyType.STRING; + private List allowableValues = null; + private boolean allowableValuesFetchable = false; + private final List validators = new ArrayList<>(); + private final Set dependencies = new HashSet<>(); + + public Builder from(final ConnectorPropertyDescriptor other) { + this.name = other.name; + this.description = other.description; + this.defaultValue = other.defaultValue; + this.required = other.required; + this.type = other.type; + this.allowableValues = other.allowableValues == null ? null : new ArrayList<>(other.allowableValues); + this.allowableValuesFetchable = other.allowableValuesFetchable; + this.validators.clear(); + this.validators.addAll(other.validators); + this.dependencies.clear(); + this.dependencies.addAll(other.dependencies); + return this; + } + + public Builder name(final String name) { + this.name = name; + return this; + } + + public Builder description(final String description) { + this.description = description; + return this; + } + + public Builder defaultValue(final String defaultValue) { + this.defaultValue = defaultValue; + return this; + } + + public Builder defaultValue(final DescribedValue defaultValue) { + return defaultValue(defaultValue == null ? null : defaultValue.getValue()); + } + + public Builder required(final boolean required) { + this.required = required; + return this; + } + + public Builder type(final PropertyType type) { + this.type = type; + return this; + } + + public Builder allowableValuesFetchable(final boolean fetchable) { + this.allowableValuesFetchable = fetchable; + return this; + } + + public Builder allowableValues(final DescribedValue... values) { + this.allowableValues = Arrays.stream(values) + .map(Builder::describedValue) + .toList(); + + return this; + } + + public > Builder allowableValues(final E[] values) { + if (values == null || values.length == 0) { + this.allowableValues = null; + } else { + this.allowableValues = Arrays.stream(values) + .map(enumValue -> enumValue instanceof DescribedValue describedValue + ? AllowableValue.fromDescribedValue(describedValue) : new AllowableValue(enumValue.name())) + .map(av -> (DescribedValue) av) + .toList(); + } + + return this; + } + + public > Builder allowableValues(final EnumSet enumValues) { //NOPMD + if (enumValues == null || enumValues.isEmpty()) { + this.allowableValues = null; + } else { + this.allowableValues = enumValues.stream() + .map(enumValue -> enumValue instanceof DescribedValue describedValue + ? AllowableValue.fromDescribedValue(describedValue) : new AllowableValue(enumValue.name())) + .map(av -> (DescribedValue) av) + .toList(); + } + + return this; + } + + public Builder allowableValues(final String... allowableValues) { + if (allowableValues == null || allowableValues.length == 0) { + this.allowableValues = null; + } else { + this.allowableValues = Arrays.stream(allowableValues) + .map(Builder::describedValue) + .toList(); + } + + return this; + } + + public Builder allowableValues(final List allowableValues) { + if (allowableValues == null || allowableValues.isEmpty()) { + this.allowableValues = null; + } else { + this.allowableValues = allowableValues.stream() + .map(Builder::describedValue) + .toList(); + } + + return this; + } + + /** + * Adds a validator for this property + * + * @param validator the validator to add + * @return this Builder for method chaining + */ + public Builder addValidator(final Validator validator) { + if (validator != null) { + this.validators.add(validator); + } + return this; + } + + /** + * Removes all validators for this property + * + * @return this Builder for method chaining + */ + public Builder clearValidators() { + this.validators.clear(); + return this; + } + + /** + * Sets the validators for this property, replacing any previously added validators + * + * @param validators the validators to set + * @return this Builder for method chaining + */ + public Builder validators(final Validator... validators) { + this.validators.clear(); + + if (validators != null) { + for (final Validator validator : validators) { + if (validator != null) { + this.validators.add(validator); + } + } + } + + return this; + } + + public Builder dependsOn(final ConnectorPropertyDescriptor descriptor, final List dependentValues) { + if (dependentValues == null || dependentValues.isEmpty()) { + dependencies.add(new ConnectorPropertyDependency(descriptor.getName())); + } else { + final Set dependentValueSet = dependentValues.stream() + .map(DescribedValue::getValue) + .collect(Collectors.toSet()); + + dependencies.add(new ConnectorPropertyDependency(descriptor.getName(), dependentValueSet)); + } + + return this; + } + + public Builder dependsOn(final ConnectorPropertyDescriptor descriptor, final DescribedValue firstDependentValue, final DescribedValue... additionalDependentValues) { + final List dependentValues = new ArrayList<>(); + dependentValues.add(firstDependentValue); + dependentValues.addAll(Arrays.asList(additionalDependentValues)); + return dependsOn(descriptor, dependentValues); + } + + public Builder dependsOn(final ConnectorPropertyDescriptor descriptor, final String... dependentValues) { + final List describedValues = Arrays.stream(dependentValues) + .map(Builder::describedValue) + .toList(); + + return dependsOn(descriptor, describedValues); + } + + private static DescribedValue describedValue(final String value) { + if (value == null) { + return null; + } + + // Otherwise, return a generic DescribedValue with no display name or description + return new AllowableValue(value); + } + + private static DescribedValue describedValue(final DescribedValue describedValue) { + if (describedValue == null) { + return null; + } + + return new AllowableValue(describedValue.getValue(), describedValue.getDisplayName(), describedValue.getDescription()); + } + + public ConnectorPropertyDescriptor build() { + if (name == null || name.isEmpty()) { + throw new IllegalStateException("Property name must be specified"); + } + if (allowableValues != null && allowableValuesFetchable) { + throw new IllegalStateException("Property cannot have both fetchable allowable values and a static list of allowable values"); + } + + return new ConnectorPropertyDescriptor(this); + } + } +} diff --git a/src/main/java/org/apache/nifi/components/connector/ConnectorPropertyGroup.java b/src/main/java/org/apache/nifi/components/connector/ConnectorPropertyGroup.java new file mode 100644 index 0000000..d0f3538 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/ConnectorPropertyGroup.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +public final class ConnectorPropertyGroup { + private final String name; + private final String description; + private final List properties; + + private ConnectorPropertyGroup(final Builder builder) { + this.name = builder.name; + this.description = builder.description; + this.properties = List.copyOf(builder.properties); + } + + /** + * Returns the name of the property sub-group. + * + * @return the name of the sub-group + */ + public String getName() { + return name; + } + + /** + * Returns the description of the property sub-group. + * + * @return the description of the sub-group + */ + public String getDescription() { + return description; + } + + /** + * Returns the properties defined in this sub-group. + * + * @return the properties in this sub-group + */ + public List getProperties() { + return properties; + } + + /** + * Creates a new Builder for constructing ConnectorPropertySubGroup instances. + * + * @return a new Builder + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Creates a new Builder initialized with values from an existing ConnectorPropertySubGroup. + * + * @param subGroup the sub-group to copy values from + * @return a new Builder with copied values + */ + public static Builder builder(final ConnectorPropertyGroup subGroup) { + return new Builder() + .name(subGroup.getName()) + .description(subGroup.getDescription()) + .properties(subGroup.getProperties()); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ConnectorPropertyGroup that = (ConnectorPropertyGroup) o; + return Objects.equals(name, that.name) && + Objects.equals(description, that.description) && + Objects.equals(properties, that.properties); + } + + @Override + public int hashCode() { + return Objects.hash(name, description, properties); + } + + @Override + public String toString() { + return "ConnectorPropertyGroup[" + + "name='" + name + '\'' + + ", description='" + description + '\'' + + ", properties=" + properties + + "]"; + } + + public static class Builder { + private String name; + private String description; + private final List properties = new ArrayList<>(); + + /** + * Sets the name of the property sub-group. + * + * @param name the name of the sub-group + * @return this Builder for method chaining + */ + public Builder name(final String name) { + this.name = name; + return this; + } + + /** + * Sets the description of the property sub-group. + * + * @param description the description of the sub-group + * @return this Builder for method chaining + */ + public Builder description(final String description) { + this.description = description; + return this; + } + + /** + * Adds a property to the sub-group. + * + * @param property the property to add + * @return this Builder for method chaining + */ + public Builder addProperty(final ConnectorPropertyDescriptor property) { + if (property != null) { + this.properties.add(property); + } + return this; + } + + /** + * Sets the properties for the sub-group, replacing any previously added properties. + * + * @param properties the properties to set + * @return this Builder for method chaining + */ + public Builder properties(final List properties) { + this.properties.clear(); + if (properties != null) { + this.properties.addAll(properties); + } + return this; + } + + /** + * Adds multiple properties to the sub-group. + * + * @param properties the properties to add + * @return this Builder for method chaining + */ + public Builder addProperties(final List properties) { + if (properties != null) { + this.properties.addAll(properties); + } + return this; + } + + /** + * Builds and returns a new ConnectorPropertySubGroup instance. + * + * @return a new ConnectorPropertySubGroup + * @throws IllegalStateException if required fields are not set + */ + public ConnectorPropertyGroup build() { + if (description != null && (name == null || name.isBlank())) { + throw new IllegalStateException("Property Group's name must be provided if a description is set"); + } + + // Ensure that all Property Descriptor names are unique within this group + final Set propertyNames = new HashSet<>(); + for (final ConnectorPropertyDescriptor property : properties) { + if (!propertyNames.add(property.getName())) { + throw new IllegalStateException("All Property Descriptor names must be unique within a Property Group. Duplicate name found: " + property.getName()); + } + } + + return new ConnectorPropertyGroup(this); + } + } +} diff --git a/src/main/java/org/apache/nifi/components/connector/ConnectorPropertyValue.java b/src/main/java/org/apache/nifi/components/connector/ConnectorPropertyValue.java new file mode 100644 index 0000000..71d6121 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/ConnectorPropertyValue.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +import org.apache.nifi.processor.DataUnit; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public interface ConnectorPropertyValue { + + /** + * @return the raw property value as a string + */ + String getValue(); + + /** + * @return an integer representation of the property value, or + * null if not set + * @throws NumberFormatException if not able to parse + */ + Integer asInteger(); + + /** + * @return a Long representation of the property value, or null + * if not set + * @throws NumberFormatException if not able to parse + */ + Long asLong(); + + /** + * @return a Boolean representation of the property value, or + * null if not set + */ + Boolean asBoolean(); + + /** + * @return a Float representation of the property value, or + * null if not set + * @throws NumberFormatException if not able to parse + */ + Float asFloat(); + + /** + * @return a Double representation of the property value, of + * null if not set + * @throws NumberFormatException if not able to parse + */ + Double asDouble(); + + /** + * @param timeUnit specifies the TimeUnit to convert the time duration into + * @return a Long value representing the value of the configured time period + * in terms of the specified TimeUnit; if the property is not set, returns + * null + */ + Long asTimePeriod(TimeUnit timeUnit); + + /** + * Returns the value as a Duration + * + * @return a Duration representing the value, or null if the value is unset + */ + Duration asDuration(); + + /** + * + * @param dataUnit specifies the DataUnit to convert the data size into + * @return a Long value representing the value of the configured data size + * in terms of the specified DataUnit; if hte property is not set, returns + * null + */ + Double asDataSize(DataUnit dataUnit); + + /** + * Returns the value as a comma-separated list of values. Leading and trailing + * whitespace is trimmed from each value. + * @return the property value as a List of Strings + */ + List asList(); + + /** + * @return true if the user has configured a value, or if the + * {@link ConnectorPropertyDescriptor} for the associated property has a default + * value, false otherwise + */ + boolean isSet(); + +} diff --git a/src/main/java/org/apache/nifi/components/connector/ConnectorValidationContext.java b/src/main/java/org/apache/nifi/components/connector/ConnectorValidationContext.java new file mode 100644 index 0000000..842bb33 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/ConnectorValidationContext.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +import org.apache.nifi.components.DescribedValue; +import org.apache.nifi.components.ValidationContext; + +import java.util.List; + +public interface ConnectorValidationContext { + + ValidationContext createValidationContext(String stepName, String groupName); + + List fetchAllowableValues(String stepName, String propertyName); + +} diff --git a/src/main/java/org/apache/nifi/components/connector/ConnectorValueReference.java b/src/main/java/org/apache/nifi/components/connector/ConnectorValueReference.java new file mode 100644 index 0000000..fe01441 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/ConnectorValueReference.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +/** + * Represents a value reference for a connector property. A value can be a string literal, + * a reference to an asset, or a reference to a secret. + */ +public sealed interface ConnectorValueReference permits StringLiteralValue, AssetReference, SecretReference { + + /** + * Returns the type of value reference. + * + * @return the value type + */ + ConnectorValueType getValueType(); +} diff --git a/src/main/java/org/apache/nifi/components/connector/ConnectorValueType.java b/src/main/java/org/apache/nifi/components/connector/ConnectorValueType.java new file mode 100644 index 0000000..2bf2eb4 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/ConnectorValueType.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +public enum ConnectorValueType { + /** + * The value is a string literal. + */ + STRING_LITERAL, + + /** + * The value identifies an Asset + */ + ASSET_REFERENCE, + + /** + * The value identifies a Secret. + */ + SECRET_REFERENCE +} diff --git a/src/main/java/org/apache/nifi/components/connector/FlowUpdateException.java b/src/main/java/org/apache/nifi/components/connector/FlowUpdateException.java new file mode 100644 index 0000000..3d2c4e5 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/FlowUpdateException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +public class FlowUpdateException extends Exception { + public FlowUpdateException(final String message) { + super(message); + } + + public FlowUpdateException(final String message, final Throwable cause) { + super(message, cause); + } + + public FlowUpdateException(final Throwable cause) { + super(cause); + } +} diff --git a/src/main/java/org/apache/nifi/components/connector/InvocationFailedException.java b/src/main/java/org/apache/nifi/components/connector/InvocationFailedException.java new file mode 100644 index 0000000..276df7f --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/InvocationFailedException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +public class InvocationFailedException extends Exception { + + public InvocationFailedException(final String message) { + super(message); + } + + public InvocationFailedException(final String message, final Throwable cause) { + super(message, cause); + } + + public InvocationFailedException(final Throwable cause) { + super(cause); + } + +} diff --git a/src/main/java/org/apache/nifi/components/connector/PropertyType.java b/src/main/java/org/apache/nifi/components/connector/PropertyType.java new file mode 100644 index 0000000..f01a91e --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/PropertyType.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +public enum PropertyType { + STRING, + INTEGER, + BOOLEAN, + FLOAT, + DOUBLE, + STRING_LIST, + SECRET +} diff --git a/src/main/java/org/apache/nifi/components/connector/Secret.java b/src/main/java/org/apache/nifi/components/connector/Secret.java new file mode 100644 index 0000000..5b50ea4 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/Secret.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +public interface Secret { + + String getProviderName(); + + String getGroupName(); + + String getName(); + + String getDescription(); + + String getValue(); + +} diff --git a/src/main/java/org/apache/nifi/components/connector/SecretReference.java b/src/main/java/org/apache/nifi/components/connector/SecretReference.java new file mode 100644 index 0000000..8b5bd11 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/SecretReference.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +import java.util.Objects; + +/** + * A ConnectorValueReference implementation representing a reference to a secret. + */ +public final class SecretReference implements ConnectorValueReference { + + private final String providerId; + private final String providerName; + private final String secretName; + + public SecretReference(final String providerId, final String providerName, final String secretName) { + this.providerId = providerId; + this.providerName = providerName; + this.secretName = secretName; + } + + /** + * Returns the identifier of the secret provider. + * + * @return the provider identifier + */ + public String getProviderId() { + return providerId; + } + + /** + * Returns the name of the secret provider. + * + * @return the provider name + */ + public String getProviderName() { + return providerName; + } + + /** + * Returns the secret name. + * + * @return the secret name + */ + public String getSecretName() { + return secretName; + } + + @Override + public ConnectorValueType getValueType() { + return ConnectorValueType.SECRET_REFERENCE; + } + + @Override + public boolean equals(final Object object) { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + final SecretReference that = (SecretReference) object; + return Objects.equals(providerId, that.providerId) && Objects.equals(providerName, that.providerName) + && Objects.equals(secretName, that.secretName); + } + + @Override + public int hashCode() { + return Objects.hash(providerId, providerName, secretName); + } + + @Override + public String toString() { + return "SecretReference[providerId=" + providerId + ", providerName=" + providerName + ", secretName=" + secretName + "]"; + } +} diff --git a/src/main/java/org/apache/nifi/components/connector/StepConfiguration.java b/src/main/java/org/apache/nifi/components/connector/StepConfiguration.java new file mode 100644 index 0000000..ad31a78 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/StepConfiguration.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +import java.util.Map; +import java.util.Objects; + +public class StepConfiguration { + private final Map propertyValues; + + public StepConfiguration(final Map propertyValues) { + this.propertyValues = propertyValues; + } + + public Map getPropertyValues() { + return propertyValues; + } + + public ConnectorValueReference getPropertyValue(final String propertyName) { + return propertyValues.get(propertyName); + } + + @Override + public String toString() { + return "StepConfiguration[" + + "propertyValues=" + propertyValues + + "]"; + } + + @Override + public boolean equals(final Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + final StepConfiguration that = (StepConfiguration) o; + return Objects.equals(propertyValues, that.propertyValues); + } + + @Override + public int hashCode() { + return Objects.hashCode(propertyValues); + } +} diff --git a/src/main/java/org/apache/nifi/components/connector/StepConfigurationContext.java b/src/main/java/org/apache/nifi/components/connector/StepConfigurationContext.java new file mode 100644 index 0000000..23220e1 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/StepConfigurationContext.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +import java.util.Map; + +/** + * A view of a ConnectorConfigurationContext scoped to a specific configuration step. + * Any changes to the underlying ConnectorConfigurationContext will be reflected in this context. + */ +public interface StepConfigurationContext { + + /** + * Returns the value of the property with the given name. + * @param propertyName the name of the property + * @return the value of the property with the given name + */ + ConnectorPropertyValue getProperty(String propertyName); + + /** + * Returns the value of the property specified by the given descriptor. + * @param propertyDescriptor the property descriptor + * @return the value of the property specified by the given descriptor + */ + ConnectorPropertyValue getProperty(ConnectorPropertyDescriptor propertyDescriptor); + + /** + * Creates a new ConnectorConfigurationContext based on this context's values but with the provided property overrides applied. + * @param propertyValues a map of property name to property value containing the overrides + * @return a new ConnectorConfigurationContext with the overrides applied + */ + StepConfigurationContext createWithOverrides(Map propertyValues); + + /** + * Returns a map of all property names to their corresponding values + * @return a map of all property names to their corresponding values + */ + Map getProperties(); +} diff --git a/src/main/java/org/apache/nifi/components/connector/StringLiteralValue.java b/src/main/java/org/apache/nifi/components/connector/StringLiteralValue.java new file mode 100644 index 0000000..8a9de1d --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/StringLiteralValue.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +import java.util.Objects; + +/** + * A ConnectorValueReference implementation representing a string literal value. + */ +public final class StringLiteralValue implements ConnectorValueReference { + + /** + * An empty value reference, represented as a StringLiteralValue with a null value. + */ + public static final StringLiteralValue EMPTY = new StringLiteralValue(null); + + private final String value; + + public StringLiteralValue(final String value) { + this.value = value; + } + + /** + * Returns the string literal value. + * + * @return the string literal value + */ + public String getValue() { + return value; + } + + @Override + public ConnectorValueType getValueType() { + return ConnectorValueType.STRING_LITERAL; + } + + @Override + public boolean equals(final Object object) { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + final StringLiteralValue that = (StringLiteralValue) object; + return Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + return Objects.hashCode(value); + } + + @Override + public String toString() { + return "StringLiteralValue[value=" + value + "]"; + } +} diff --git a/src/main/java/org/apache/nifi/components/connector/components/ComponentState.java b/src/main/java/org/apache/nifi/components/connector/components/ComponentState.java new file mode 100644 index 0000000..b5220cc --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/components/ComponentState.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector.components; + +public enum ComponentState { + /** + * State in which a Processor is disabled. Note that Controller Services' notions of Disabled and Enabled + * are different than Processors' notions of Disabled and Enabled. For Controller Services, + * Component States of STOPPED, RUNNING are mapped to Controller Services' DISABLED, ENABLED states respectively. + */ + PROCESSOR_DISABLED, + + /** + * Processor is stopped or Controller Service is disabled + */ + STOPPED, + + /** + * Processor is starting or Controller Service is enabling + */ + STARTING, + + /** + * Processor is running or Controller Service is enabled + */ + RUNNING, + + /** + * Processor is stopping or Controller Service is disabling + */ + STOPPING; +} diff --git a/src/main/java/org/apache/nifi/components/connector/components/ConnectionFacade.java b/src/main/java/org/apache/nifi/components/connector/components/ConnectionFacade.java new file mode 100644 index 0000000..58cbac2 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/components/ConnectionFacade.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector.components; + +import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.flow.VersionedConnection; + +public interface ConnectionFacade { + + VersionedConnection getDefinition(); + + /** + * Returns the size of the Connection's queue. + * @return the size of the Connection's queue + */ + QueueSize getQueueSize(); + + /** + * Purges all data from the connection. + */ + void purge(); + +} diff --git a/src/main/java/org/apache/nifi/components/connector/components/ConnectorMethod.java b/src/main/java/org/apache/nifi/components/connector/components/ConnectorMethod.java new file mode 100644 index 0000000..e58e0b4 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/components/ConnectorMethod.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector.components; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + *

+ * Annotation that can be added to a method in a Processor or ControllerService in order + * to expose the method to connectors for invocation. The method must be public and + * not static. The method may return a value. However, the value that is returned will + * be converted into a JSON object and that JSON object will be returned to the caller. + *

+ * + *

+ * The following example shows a method that is exposed to connectors: + *

+ * +
+ * {@code
+ * @ConnectorMethod(
+ *     name = "echo",
+ *     description = "Returns the provided text after concatenating it the specified number of times.",
+ *     allowedStates = {ComponentState.STOPPED, ComponentState.STOPPING, ComponentState.STARTING, ComponentState.RUNNING},
+ *     arguments = {
+ *         @MethodArgument(name = "text", type = String.class, description = "The text to echo", required = true),
+ *         @MethodArgument(name = "iterations", type = int.class, description = "The number of iterations to echo the text", required = false)
+ *     }
+ * )
+ * public String echo(Map arguments) {
+ *     final StringBuilder sb = new StringBuilder();
+ *     final String text = (String) arguments.get("text");
+ *     final int iterations = (int) arguments.getOrDefault("iterations", 2);
+ *     for (int i = 0; i < iterations; i++) {
+ *       sb.append(text);
+ *
+ *       if (i < (iterations - 1)) {
+ *         sb.append("\n");
+ *       }
+ *     }
+ *
+ *     return sb.toString();
+ * }
+ * }
+ * 
+ */ +@Documented +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface ConnectorMethod { + /** + * The name of the method as it will be exposed to connectors. + * @return the method name + */ + String name(); + + /** + * A description of the method + * @return the method description + */ + String description() default ""; + + /** + * The states in which the component that defines the method is allowed to be in + * when the method is invoked. If the Processor or ControllerService is not in one of these states, + * any attempt to invoke the method will result in an error. The default states include all but PROCESSOR_DISABLED. + * + * @return the states in which the component that defines the method is allowed to be in when the method is invoked + */ + ComponentState[] allowedStates() default { + ComponentState.STOPPED, + ComponentState.STOPPING, + ComponentState.STARTING, + ComponentState.RUNNING + }; + + /** + * The arguments that the method accepts. Each argument is described by a MethodArgument annotation. + * If no arguments are required, this can be left empty. + * + * @return the method arguments + */ + MethodArgument[] arguments() default {}; +} diff --git a/src/main/java/org/apache/nifi/components/connector/components/ControllerServiceFacade.java b/src/main/java/org/apache/nifi/components/connector/components/ControllerServiceFacade.java new file mode 100644 index 0000000..b0eae11 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/components/ControllerServiceFacade.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector.components; + +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.connector.InvocationFailedException; +import org.apache.nifi.flow.VersionedControllerService; +import org.apache.nifi.flow.VersionedExternalFlow; +import org.apache.nifi.flow.VersionedParameterContext; + +import java.util.List; +import java.util.Map; + +public interface ControllerServiceFacade { + + VersionedControllerService getDefinition(); + + ControllerServiceLifecycle getLifecycle(); + + List validate(); + + List validate(Map propertyValues); + + List verify(Map propertyValues, Map variables); + + List verify(Map propertyValues, VersionedParameterContext parameterContext, Map variables); + + List verify(VersionedExternalFlow versionedExternalFlow, Map variables); + + Object invokeConnectorMethod(String methodName, Map arguments) throws InvocationFailedException; + + T invokeConnectorMethod(String methodName, Map arguments, Class returnType) throws InvocationFailedException; + +} diff --git a/src/main/java/org/apache/nifi/components/connector/components/ControllerServiceLifecycle.java b/src/main/java/org/apache/nifi/components/connector/components/ControllerServiceLifecycle.java new file mode 100644 index 0000000..230391a --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/components/ControllerServiceLifecycle.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector.components; + +import java.util.concurrent.CompletableFuture; + +public interface ControllerServiceLifecycle { + + ControllerServiceState getState(); + + CompletableFuture enable(); + + CompletableFuture disable(); + +} diff --git a/src/main/java/org/apache/nifi/components/connector/components/ControllerServiceReferenceHierarchy.java b/src/main/java/org/apache/nifi/components/connector/components/ControllerServiceReferenceHierarchy.java new file mode 100644 index 0000000..d9b2b80 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/components/ControllerServiceReferenceHierarchy.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector.components; + +public enum ControllerServiceReferenceHierarchy { + + /** + * Interact only with Controller Services that are directly within the Process Group. + */ + DIRECT_SERVICES_ONLY, + + /** + * Interact with Controller Services within the Process Group and all child Process Groups, recursively. + */ + INCLUDE_CHILD_GROUPS; +} diff --git a/src/main/java/org/apache/nifi/components/connector/components/ControllerServiceReferenceScope.java b/src/main/java/org/apache/nifi/components/connector/components/ControllerServiceReferenceScope.java new file mode 100644 index 0000000..22edd2f --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/components/ControllerServiceReferenceScope.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector.components; + +public enum ControllerServiceReferenceScope { + + /** + * Enable all Controller Services in this Process Group and all child Process Groups. + */ + INCLUDE_ALL, + + /** + * Enable only those Controller Services that are directly referenced by Processors in this group and + * Controller Services that are referenced by those Controller Services, recursively. + */ + INCLUDE_REFERENCED_SERVICES_ONLY; +} diff --git a/src/main/java/org/apache/nifi/components/connector/components/ControllerServiceState.java b/src/main/java/org/apache/nifi/components/connector/components/ControllerServiceState.java new file mode 100644 index 0000000..25d3484 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/components/ControllerServiceState.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector.components; + +public enum ControllerServiceState { + DISABLED, + + DISABLING, + + ENABLED, + + ENABLING; + +} diff --git a/src/main/java/org/apache/nifi/components/connector/components/FlowContext.java b/src/main/java/org/apache/nifi/components/connector/components/FlowContext.java new file mode 100644 index 0000000..214d2d7 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/components/FlowContext.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector.components; + +import org.apache.nifi.components.connector.ConnectorConfigurationContext; +import org.apache.nifi.flow.Bundle; + +public interface FlowContext { + + ProcessGroupFacade getRootGroup(); + + ParameterContextFacade getParameterContext(); + + ConnectorConfigurationContext getConfigurationContext(); + + FlowContextType getType(); + + /** + * Returns the bundle that indicates the version/coordinates that were used to create the + * configuration represented by this FlowContext. + * + * @return the bundle + */ + Bundle getBundle(); +} diff --git a/src/main/java/org/apache/nifi/components/connector/components/FlowContextType.java b/src/main/java/org/apache/nifi/components/connector/components/FlowContextType.java new file mode 100644 index 0000000..c668a13 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/components/FlowContextType.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector.components; + +public enum FlowContextType { + + ACTIVE, + + WORKING; +} diff --git a/src/main/java/org/apache/nifi/components/connector/components/MethodArgument.java b/src/main/java/org/apache/nifi/components/connector/components/MethodArgument.java new file mode 100644 index 0000000..5e6a4af --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/components/MethodArgument.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector.components; + +/** + * Annotation that can be provided as part of a {@link ConnectorMethod} definition to describe the arguments + * that the method accepts. This annotation is used to provide metadata about the method's arguments + * to facilitate dynamic invocation and documentation generation. + */ +public @interface MethodArgument { + String name(); + + Class type(); + + String description() default ""; + + boolean required() default true; +} diff --git a/src/main/java/org/apache/nifi/components/connector/components/ParameterContextFacade.java b/src/main/java/org/apache/nifi/components/connector/components/ParameterContextFacade.java new file mode 100644 index 0000000..eb0b02d --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/components/ParameterContextFacade.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector.components; + +import org.apache.nifi.asset.Asset; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.Set; + +public interface ParameterContextFacade { + + /** + * Updates the parameters in the Parameter Context with the given collection of ParameterValue objects. + * If a parameter does not already exist, it will be created. If it does exist, its value will be updated. + * If any parameter already exists but is not included in the given collection, it will remain unchanged. + * + * @param parameterValues the collection of ParameterValue objects to set or update in the Parameter Context + * @throws IllegalArgumentException if the sensitivity of a parameter does not match the existing parameter's sensitivity + */ + void updateParameters(Collection parameterValues); + + /** + * Gets the value of a parameter from the Parameter Context. + * @param parameterName the name of the parameter to retrieve + * @return the value of the parameter, or null if it is not set + */ + String getValue(String parameterName); + + /** + * Returns the names of all parameters that have been set in the Parameter Context. + * @return the names of all parameters that have been set in the Parameter Context. + */ + Set getDefinedParameterNames(); + + /** + * Checks if a parameter is marked as sensitive in the Parameter Context. + * @param parameterName the name of the parameter to check + * @return true if the parameter is marked as sensitive, false if it is not sensitive or is not known + */ + boolean isSensitive(String parameterName); + + /** + * Creates an asset whose contents are provided by the given InputStream. The asset may then be associated with a parameter + * by creating a ParameterValue that references the asset and updating parameters via updateParameters(Collection). + * + * @param inputStream the InputStream containing the asset contents + * @throws IOException if an error occurs while reading from the InputStream or storing the asset + * @return the asset that was created + */ + Asset createAsset(InputStream inputStream) throws IOException; +} diff --git a/src/main/java/org/apache/nifi/components/connector/components/ParameterValue.java b/src/main/java/org/apache/nifi/components/connector/components/ParameterValue.java new file mode 100644 index 0000000..6a77467 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/components/ParameterValue.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector.components; + +import org.apache.nifi.asset.Asset; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class ParameterValue { + private final String name; + private final String value; + private final boolean sensitive; + private final List assets; + + private ParameterValue(final Builder builder) { + this.name = builder.name; + this.value = builder.value; + this.sensitive = builder.sensitive; + this.assets = Collections.unmodifiableList(builder.referencedAssets); + } + + public String getName() { + return name; + } + + public String getValue() { + return value; + } + + public boolean isSensitive() { + return sensitive; + } + + public List getAssets() { + return assets; + } + + @Override + public String toString() { + return "ParameterValue{" + + "name=" + name + + ", value=" + (sensitive ? "****" : value) + + ", sensitive=" + sensitive + + ", assets=" + assets + + '}'; + } + + public static class Builder { + private String name; + private String value; + private boolean sensitive; + private final List referencedAssets = new ArrayList<>(); + + public Builder name(final String name) { + this.name = name; + return this; + } + + public Builder value(final String value) { + this.value = value; + return this; + } + + public Builder sensitive(final boolean sensitive) { + this.sensitive = sensitive; + return this; + } + + public Builder addReferencedAsset(final Asset asset) { + this.referencedAssets.add(asset); + return this; + } + + public ParameterValue build() { + if (name == null || name.isEmpty()) { + throw new IllegalStateException("Parameter name must be provided"); + } + if (value != null && !referencedAssets.isEmpty()) { + throw new IllegalStateException("Parameter cannot have both a value and referenced assets"); + } + return new ParameterValue(this); + } + } +} diff --git a/src/main/java/org/apache/nifi/components/connector/components/ProcessGroupFacade.java b/src/main/java/org/apache/nifi/components/connector/components/ProcessGroupFacade.java new file mode 100644 index 0000000..79655fd --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/components/ProcessGroupFacade.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector.components; + +import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.flow.VersionedProcessGroup; + +import java.util.Set; + +public interface ProcessGroupFacade { + + VersionedProcessGroup getDefinition(); + + ProcessorFacade getProcessor(String id); + + Set getProcessors(); + + ControllerServiceFacade getControllerService(String id); + + Set getControllerServices(); + + Set getControllerServices(ControllerServiceReferenceScope referenceScope, ControllerServiceReferenceHierarchy hierarchy); + + ConnectionFacade getConnection(String id); + + Set getConnections(); + + ProcessGroupFacade getProcessGroup(String id); + + Set getProcessGroups(); + + QueueSize getQueueSize(); + + boolean isFlowEmpty(); + + StatelessGroupLifecycle getStatelessLifecycle(); + + ProcessGroupLifecycle getLifecycle(); + +} diff --git a/src/main/java/org/apache/nifi/components/connector/components/ProcessGroupLifecycle.java b/src/main/java/org/apache/nifi/components/connector/components/ProcessGroupLifecycle.java new file mode 100644 index 0000000..963643c --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/components/ProcessGroupLifecycle.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector.components; + +import java.util.Collection; +import java.util.concurrent.CompletableFuture; + +public interface ProcessGroupLifecycle { + + CompletableFuture enableControllerServices(ControllerServiceReferenceScope scope, ControllerServiceReferenceHierarchy hierarchy); + + CompletableFuture enableControllerServices(Collection serviceIdentifiers); + + CompletableFuture disableControllerServices(ControllerServiceReferenceHierarchy hierarchy); + + CompletableFuture disableControllerServices(Collection serviceIdentifiers); + + CompletableFuture startProcessors(); + + CompletableFuture start(ControllerServiceReferenceScope serviceReferenceScope); + + CompletableFuture stop(); + + CompletableFuture stopProcessors(); + +} diff --git a/src/main/java/org/apache/nifi/components/connector/components/ProcessorFacade.java b/src/main/java/org/apache/nifi/components/connector/components/ProcessorFacade.java new file mode 100644 index 0000000..bfa32b8 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/components/ProcessorFacade.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector.components; + +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.connector.InvocationFailedException; +import org.apache.nifi.flow.VersionedExternalFlow; +import org.apache.nifi.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedProcessor; + +import java.util.List; +import java.util.Map; + +public interface ProcessorFacade { + + VersionedProcessor getDefinition(); + + ProcessorLifecycle getLifecycle(); + + List validate(); + + List validate(Map propertyValues); + + List verify(Map propertyValues, Map attributes); + + List verify(Map propertyValues, VersionedParameterContext parameterContext, Map attributes); + + List verify(VersionedExternalFlow versionedExternalFlow, Map attributes); + + Object invokeConnectorMethod(String methodName, Map arguments) throws InvocationFailedException; + + T invokeConnectorMethod(String methodName, Map arguments, Class returnType) throws InvocationFailedException; +} diff --git a/src/main/java/org/apache/nifi/components/connector/components/ProcessorLifecycle.java b/src/main/java/org/apache/nifi/components/connector/components/ProcessorLifecycle.java new file mode 100644 index 0000000..92ef6f7 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/components/ProcessorLifecycle.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector.components; + +import java.util.concurrent.CompletableFuture; + +public interface ProcessorLifecycle { + + ProcessorState getState(); + + int getActiveThreadCount(); + + void terminate(); + + CompletableFuture stop(); + + CompletableFuture start(); + + void disable(); + + void enable(); + +} diff --git a/src/main/java/org/apache/nifi/components/connector/components/ProcessorState.java b/src/main/java/org/apache/nifi/components/connector/components/ProcessorState.java new file mode 100644 index 0000000..b0dee6e --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/components/ProcessorState.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector.components; + +public enum ProcessorState { + DISABLED, + + STOPPING, + + STOPPED, + + RUNNING, + + STARTING; +} diff --git a/src/main/java/org/apache/nifi/components/connector/components/StatelessGroupLifecycle.java b/src/main/java/org/apache/nifi/components/connector/components/StatelessGroupLifecycle.java new file mode 100644 index 0000000..d1b606a --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/components/StatelessGroupLifecycle.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector.components; + +import java.util.concurrent.CompletableFuture; + +public interface StatelessGroupLifecycle { + + CompletableFuture start(); + + CompletableFuture stop(); + + CompletableFuture terminate(); + +} diff --git a/src/main/java/org/apache/nifi/components/connector/components/StatelessGroupState.java b/src/main/java/org/apache/nifi/components/connector/components/StatelessGroupState.java new file mode 100644 index 0000000..0762056 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/components/StatelessGroupState.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector.components; + +public enum StatelessGroupState { + + STARTING, + + RUNNING, + + STOPPING, + + STOPPED, + + DISABLED; + +} diff --git a/src/main/java/org/apache/nifi/flow/ComponentType.java b/src/main/java/org/apache/nifi/flow/ComponentType.java index 23b2ef3..25f25f3 100644 --- a/src/main/java/org/apache/nifi/flow/ComponentType.java +++ b/src/main/java/org/apache/nifi/flow/ComponentType.java @@ -34,7 +34,8 @@ public enum ComponentType { FLOW_ANALYSIS_RULE("Flow Analysis Rule"), PARAMETER_CONTEXT("Parameter Context"), PARAMETER_PROVIDER("Parameter Provider"), - FLOW_REGISTRY_CLIENT("Flow Registry Client"); + FLOW_REGISTRY_CLIENT("Flow Registry Client"), + CONNECTOR("Connector"); private final String typeName; diff --git a/src/main/java/org/apache/nifi/flow/VersionedConfigurationStep.java b/src/main/java/org/apache/nifi/flow/VersionedConfigurationStep.java new file mode 100644 index 0000000..f2c6bac --- /dev/null +++ b/src/main/java/org/apache/nifi/flow/VersionedConfigurationStep.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.flow; + +import java.util.Map; + +public class VersionedConfigurationStep { + private String name; + private Map properties; + + public String getName() { + return name; + } + + public void setName(final String name) { + this.name = name; + } + + public Map getProperties() { + return properties; + } + + public void setProperties(final Map properties) { + this.properties = properties; + } +} diff --git a/src/main/java/org/apache/nifi/flow/VersionedConnector.java b/src/main/java/org/apache/nifi/flow/VersionedConnector.java new file mode 100644 index 0000000..96dad34 --- /dev/null +++ b/src/main/java/org/apache/nifi/flow/VersionedConnector.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.flow; + +import java.util.List; + +public class VersionedConnector { + private String instanceIdentifier; + private String name; + private ScheduledState scheduledState; + private List activeFlowConfiguration; + private List workingFlowConfiguration; + private String type; + private Bundle bundle; + + public String getInstanceIdentifier() { + return instanceIdentifier; + } + + public void setInstanceIdentifier(final String instanceIdentifier) { + this.instanceIdentifier = instanceIdentifier; + } + + public String getName() { + return name; + } + + public void setName(final String name) { + this.name = name; + } + + public ScheduledState getScheduledState() { + return scheduledState; + } + + public void setScheduledState(final ScheduledState scheduledState) { + this.scheduledState = scheduledState; + } + + public List getActiveFlowConfiguration() { + return activeFlowConfiguration; + } + + public void setActiveFlowConfiguration(final List configurationSteps) { + this.activeFlowConfiguration = configurationSteps; + } + + public List getWorkingFlowConfiguration() { + return workingFlowConfiguration; + } + + public void setWorkingFlowConfiguration(final List workingFlowConfiguration) { + this.workingFlowConfiguration = workingFlowConfiguration; + } + + public String getType() { + return type; + } + + public void setType(final String type) { + this.type = type; + } + + public Bundle getBundle() { + return bundle; + } + + public void setBundle(final Bundle bundle) { + this.bundle = bundle; + } +} diff --git a/src/main/java/org/apache/nifi/flow/VersionedConnectorPropertyGroup.java b/src/main/java/org/apache/nifi/flow/VersionedConnectorPropertyGroup.java new file mode 100644 index 0000000..dcf1722 --- /dev/null +++ b/src/main/java/org/apache/nifi/flow/VersionedConnectorPropertyGroup.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.flow; + +import java.util.Map; + +public class VersionedConnectorPropertyGroup { + private String name; + private Map properties; + + public String getName() { + return name; + } + + public void setName(final String name) { + this.name = name; + } + + public Map getProperties() { + return properties; + } + + public void setProperties(final Map properties) { + this.properties = properties; + } +} diff --git a/src/main/java/org/apache/nifi/flow/VersionedConnectorValueReference.java b/src/main/java/org/apache/nifi/flow/VersionedConnectorValueReference.java new file mode 100644 index 0000000..4748508 --- /dev/null +++ b/src/main/java/org/apache/nifi/flow/VersionedConnectorValueReference.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.flow; + +import java.util.Objects; + +/** + * Represents a property value reference for a Connector in a versioned flow. + * This class is used for serialization/deserialization of connector property values + * that may reference different types of values (literals, assets, secrets). + */ +public class VersionedConnectorValueReference { + private String valueType; + private String value; + private String assetId; + private String providerId; + private String providerName; + private String secretName; + + public String getValueType() { + return valueType; + } + + public void setValueType(final String valueType) { + this.valueType = valueType; + } + + public String getValue() { + return value; + } + + public void setValue(final String value) { + this.value = value; + } + + public String getAssetId() { + return assetId; + } + + public void setAssetId(final String assetId) { + this.assetId = assetId; + } + + public String getProviderId() { + return providerId; + } + + public void setProviderId(final String providerId) { + this.providerId = providerId; + } + + public String getProviderName() { + return providerName; + } + + public void setProviderName(final String providerName) { + this.providerName = providerName; + } + + public String getSecretName() { + return secretName; + } + + public void setSecretName(final String secretName) { + this.secretName = secretName; + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof VersionedConnectorValueReference other)) { + return false; + } + return Objects.equals(valueType, other.valueType) + && Objects.equals(value, other.value) + && Objects.equals(assetId, other.assetId) + && Objects.equals(providerId, other.providerId) + && Objects.equals(secretName, other.secretName); + } + + @Override + public int hashCode() { + return Objects.hash(valueType, value, assetId, providerId, secretName); + } + + @Override + public String toString() { + return "VersionedConnectorValueReference[valueType=" + valueType + ", value=" + value + + ", assetId=" + assetId + ", providerId=" + providerId + ", secretName=" + secretName + "]"; + } +} diff --git a/src/main/java/org/apache/nifi/parameter/ParameterProvider.java b/src/main/java/org/apache/nifi/parameter/ParameterProvider.java index 7179bfb..c542e61 100644 --- a/src/main/java/org/apache/nifi/parameter/ParameterProvider.java +++ b/src/main/java/org/apache/nifi/parameter/ParameterProvider.java @@ -22,7 +22,10 @@ import org.apache.nifi.reporting.InitializationException; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; /** * Defines a provider that is responsible for fetching from an external source Parameters with @@ -84,4 +87,56 @@ public interface ParameterProvider extends ConfigurableComponent { * @throws IOException if there is an I/O problem while fetching the Parameters */ List fetchParameters(ConfigurationContext context) throws IOException; + + /** + * Fetches named groups of parameters from an external source, filtering to only include the specified parameter names. + * It is up to the implementation to determine how a fully qualified parameter name maps to a group and parameter name + * and to optimize the fetching accordingly. The default implementation fetches all parameters and filters them, assuming + * that the fully qualified parameter name is of the form "GroupName.ParameterName". + * + * @param context The ConfigurationContextfor the provider + * @param fullyQualifiedParameterNames the fully qualified names of the parameters to fetch + * @return A list of fetched Parameter groups containing only the specified parameters + * @throws IOException if there is an I/O problem while fetching the Parameters + */ + default List fetchParameters(ConfigurationContext context, List fullyQualifiedParameterNames) throws IOException { + final List allGroups = fetchParameters(context); + final List filteredGroups = new ArrayList<>(); + + for (final ParameterGroup group : allGroups) { + // Determine which parameter names are desired from this group + final List desiredParameterNames = new ArrayList<>(); + final String prefix = group.getGroupName() + "."; + for (final String fullyQualifiedParameterName : fullyQualifiedParameterNames) { + if (fullyQualifiedParameterName.startsWith(prefix)) { + final String secretName = fullyQualifiedParameterName.substring(prefix.length()); + desiredParameterNames.add(secretName); + } + } + + // If no parameters are desired from this group, skip it + if (desiredParameterNames.isEmpty()) { + continue; + } + + // Create a HashSet for quick lookup + final Set parameterNameSet = new HashSet<>(desiredParameterNames); + final List filteredParameters = new ArrayList<>(); + for (final Parameter parameter : group.getParameters()) { + if (!parameterNameSet.contains(parameter.getDescriptor().getName())) { + continue; + } + + filteredParameters.add(parameter); + } + + // If we found any desired parameters, add them to the result + if (!filteredParameters.isEmpty()) { + filteredGroups.add(new ParameterGroup(group.getGroupName(), filteredParameters)); + } + } + + // Return the filtered groups + return filteredGroups; + } } diff --git a/src/main/java/org/apache/nifi/reporting/ComponentType.java b/src/main/java/org/apache/nifi/reporting/ComponentType.java index 95cec81..df02961 100644 --- a/src/main/java/org/apache/nifi/reporting/ComponentType.java +++ b/src/main/java/org/apache/nifi/reporting/ComponentType.java @@ -74,5 +74,10 @@ public enum ComponentType { /** * Bulletin is associated with a Flow Registry Client */ - FLOW_REGISTRY_CLIENT; + FLOW_REGISTRY_CLIENT, + + /** + * Bulletin is associated with a Connector + */ + CONNECTOR; } diff --git a/src/main/java/org/apache/nifi/web/NiFiConnectorWebContext.java b/src/main/java/org/apache/nifi/web/NiFiConnectorWebContext.java new file mode 100644 index 0000000..4fdfeb1 --- /dev/null +++ b/src/main/java/org/apache/nifi/web/NiFiConnectorWebContext.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web; + +import org.apache.nifi.components.connector.components.FlowContext; + +/** + * NiFi web context providing access to Connector instances for + * connector custom UIs. + */ +public interface NiFiConnectorWebContext { + + /** + * Returns the Connector Web Context for the given connector ID. + * The returned Connector can be cast to a connector-specific interface + * if the custom UI's classloader has visibility to that interface + * (typically through the NAR classloader hierarchy). Active and + * working flow context are provided for invoking connector methods + * on components within each of those flow contexts. + * + * @param the expected type of the Connector + * @param connectorId the ID of the connector to retrieve + * @return the ConnectorWebContext instance + * @throws IllegalArgumentException if the connector does not exist + */ + ConnectorWebContext getConnectorWebContext(String connectorId) throws IllegalArgumentException; + + /** + * Hold the context needed to work with the Connector within a custom ui + * + * @param the expected type of the Connector + * @param connector the Connector instance + * @param workingFlowContext the working {@link FlowContext} for the connector instance + * @param activeFlowContext the active {@link FlowContext} for the connector instance + */ + record ConnectorWebContext(T connector, FlowContext workingFlowContext, FlowContext activeFlowContext) { + } +} diff --git a/src/test/java/org/apache/nifi/components/connector/TestAbstractConnector.java b/src/test/java/org/apache/nifi/components/connector/TestAbstractConnector.java new file mode 100644 index 0000000..c4d96bf --- /dev/null +++ b/src/test/java/org/apache/nifi/components/connector/TestAbstractConnector.java @@ -0,0 +1,526 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.DescribedValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.components.connector.components.FlowContext; +import org.apache.nifi.components.connector.components.ProcessGroupFacade; +import org.apache.nifi.flow.VersionedExternalFlow; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +public class TestAbstractConnector { + + @Mock + private FlowContext flowContext; + + @Mock + private ConnectorConfigurationContext configurationContext; + + @Mock + private ConnectorPropertyValue mockPropertyValue; + + private TestableAbstractConnector connector; + private ConnectorValidationContext validationContext; + + + @BeforeEach + void setUp() { + connector = new TestableAbstractConnector(); + validationContext = new TestConnectorValidationContext(); + when(flowContext.getConfigurationContext()).thenReturn(configurationContext); + final ProcessGroupFacade rootGroupFacade = mock(ProcessGroupFacade.class); + when(rootGroupFacade.getProcessors()).thenReturn(Collections.emptySet()); + when(rootGroupFacade.getProcessGroups()).thenReturn(Collections.emptySet()); + when(rootGroupFacade.getControllerServices()).thenReturn(Collections.emptySet()); + when(rootGroupFacade.getConnections()).thenReturn(Collections.emptySet()); + when(flowContext.getRootGroup()).thenReturn(rootGroupFacade); + } + + @Test + void testValidateWithEmptyConfigurationSteps() { + connector.setConfigurationSteps(Collections.emptyList()); + + final List results = connector.validate(flowContext, validationContext); + + assertTrue(results.isEmpty()); + assertTrue(connector.isCustomValidateCalled()); + } + + @Test + void testValidateWithRequiredPropertyMissing() { + final ConnectorPropertyDescriptor requiredProperty = new ConnectorPropertyDescriptor.Builder() + .name("Required Property") + .description("A required property") + .required(true) + .build(); + + final ConnectorPropertyGroup propertyGroup = ConnectorPropertyGroup.builder() + .name("Test Group") + .addProperty(requiredProperty) + .build(); + + final ConfigurationStep configStep = new ConfigurationStep.Builder() + .name("Test Step") + .propertyGroups(List.of(propertyGroup)) + .build(); + + connector.setConfigurationSteps(List.of(configStep)); + when(configurationContext.getProperty("Test Step", "Required Property")).thenReturn(null); + + final List results = connector.validate(flowContext, validationContext); + + assertEquals(1, results.size()); + final ValidationResult result = results.getFirst(); + assertFalse(result.isValid()); + assertEquals("Required Property", result.getSubject()); + assertNull(result.getInput()); + assertEquals("Required Property is required", result.getExplanation()); + assertFalse(connector.isCustomValidateCalled()); + } + + @Test + void testValidateWithOptionalPropertyMissing() { + final ConnectorPropertyDescriptor optionalProperty = new ConnectorPropertyDescriptor.Builder() + .name("Optional Property") + .description("An optional property") + .required(false) + .build(); + + final ConnectorPropertyGroup propertyGroup = ConnectorPropertyGroup.builder() + .name("Test Group") + .addProperty(optionalProperty) + .build(); + + final ConfigurationStep configStep = new ConfigurationStep.Builder() + .name("Test Step") + .propertyGroups(List.of(propertyGroup)) + .build(); + + connector.setConfigurationSteps(List.of(configStep)); + when(configurationContext.getProperty("Test Step", "Optional Property")).thenReturn(null); + + final List results = connector.validate(flowContext, validationContext); + + assertTrue(results.isEmpty()); + assertTrue(connector.isCustomValidateCalled()); + } + + @Test + void testValidateWithInvalidPropertyValue() { + final ConnectorPropertyDescriptor propertyWithValidator = new ConnectorPropertyDescriptor.Builder() + .name("Validated Property") + .description("A property with validation") + .required(true) + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + + final ConnectorPropertyGroup propertyGroup = ConnectorPropertyGroup.builder() + .name("Test Group") + .addProperty(propertyWithValidator) + .build(); + + final ConfigurationStep configStep = new ConfigurationStep.Builder() + .name("Test Step") + .propertyGroups(List.of(propertyGroup)) + .build(); + + connector.setConfigurationSteps(List.of(configStep)); + when(mockPropertyValue.getValue()).thenReturn(""); + when(configurationContext.getProperty("Test Step", "Validated Property")).thenReturn(mockPropertyValue); + + final List results = connector.validate(flowContext, validationContext); + + assertEquals(1, results.size()); + final ValidationResult result = results.getFirst(); + assertFalse(result.isValid()); + assertEquals("Validated Property", result.getSubject()); + assertFalse(connector.isCustomValidateCalled()); + } + + @Test + void testValidateWithValidPropertyValue() { + final ConnectorPropertyDescriptor validProperty = new ConnectorPropertyDescriptor.Builder() + .name("Valid Property") + .description("A valid property") + .required(true) + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + + final ConnectorPropertyGroup propertyGroup = ConnectorPropertyGroup.builder() + .name("Test Group") + .addProperty(validProperty) + .build(); + + final ConfigurationStep configStep = new ConfigurationStep.Builder() + .name("Test Step") + .propertyGroups(List.of(propertyGroup)) + .build(); + + connector.setConfigurationSteps(List.of(configStep)); + when(mockPropertyValue.getValue()).thenReturn("valid-value"); + when(mockPropertyValue.isSet()).thenReturn(true); + when(configurationContext.getProperty("Test Step", "Valid Property")).thenReturn(mockPropertyValue); + + final List results = connector.validate(flowContext, validationContext); + + assertTrue(results.isEmpty()); + assertTrue(connector.isCustomValidateCalled()); + } + + @Test + void testValidateWithPropertyDependencyNotSatisfied() { + final ConnectorPropertyDescriptor dependencyProperty = new ConnectorPropertyDescriptor.Builder() + .name("Dependency Property") + .description("The dependency property") + .required(false) + .build(); + + final ConnectorPropertyDescriptor dependentProperty = new ConnectorPropertyDescriptor.Builder() + .name("Dependent Property") + .description("Property that depends on another") + .required(true) + .dependsOn(dependencyProperty, "Required Value") + .build(); + + final ConnectorPropertyGroup propertyGroup = ConnectorPropertyGroup.builder() + .name("Test Group") + .addProperty(dependencyProperty) + .addProperty(dependentProperty) + .build(); + + final ConfigurationStep configStep = new ConfigurationStep.Builder() + .name("Test Step") + .propertyGroups(List.of(propertyGroup)) + .build(); + + connector.setConfigurationSteps(List.of(configStep)); + when(mockPropertyValue.getValue()).thenReturn("Wrong Value"); + when(configurationContext.getProperty("Test Step", "Dependency Property")).thenReturn(mockPropertyValue); + + final List results = connector.validate(flowContext, validationContext); + + assertTrue(results.isEmpty()); + assertTrue(connector.isCustomValidateCalled()); + } + + @Test + void testValidateWithPropertyDependencySatisfiedButMissingRequiredValue() { + final ConnectorPropertyDescriptor dependencyProperty = new ConnectorPropertyDescriptor.Builder() + .name("Dependency Property") + .description("The dependency property") + .required(false) + .build(); + + final ConnectorPropertyDescriptor dependentProperty = new ConnectorPropertyDescriptor.Builder() + .name("Dependent Property") + .description("Property that depends on another") + .required(true) + .dependsOn(dependencyProperty, "Required Value") + .build(); + + final ConnectorPropertyGroup propertyGroup = ConnectorPropertyGroup.builder() + .name("Test Group") + .addProperty(dependencyProperty) + .addProperty(dependentProperty) + .build(); + + final ConfigurationStep configStep = new ConfigurationStep.Builder() + .name("Test Step") + .propertyGroups(List.of(propertyGroup)) + .build(); + + connector.setConfigurationSteps(List.of(configStep)); + final ConnectorPropertyValue dependencyValue = mock(ConnectorPropertyValue.class); + when(dependencyValue.getValue()).thenReturn("Required Value"); + when(configurationContext.getProperty("Test Step", "Dependency Property")).thenReturn(dependencyValue); + when(configurationContext.getProperty("Test Step", "Dependent Property")).thenReturn(null); + + final List results = connector.validate(flowContext, validationContext); + + assertEquals(1, results.size()); + final ValidationResult result = results.getFirst(); + assertFalse(result.isValid()); + assertEquals("Dependent Property", result.getSubject()); + assertEquals("Dependent Property is required", result.getExplanation()); + assertFalse(connector.isCustomValidateCalled()); + } + + @Test + void testValidateWithMultipleConfigurationSteps() { + final ConnectorPropertyDescriptor prop1 = new ConnectorPropertyDescriptor.Builder() + .name("Property One") + .required(true) + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + + final ConnectorPropertyDescriptor prop2 = new ConnectorPropertyDescriptor.Builder() + .name("Property Two") + .required(true) + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + + final ConnectorPropertyGroup group1 = ConnectorPropertyGroup.builder() + .name("Group One") + .addProperty(prop1) + .build(); + + final ConnectorPropertyGroup group2 = ConnectorPropertyGroup.builder() + .name("Group Two") + .addProperty(prop2) + .build(); + + final ConfigurationStep step1 = new ConfigurationStep.Builder() + .name("Step One") + .propertyGroups(List.of(group1)) + .build(); + + final ConfigurationStep step2 = new ConfigurationStep.Builder() + .name("Step Two") + .propertyGroups(List.of(group2)) + .build(); + + connector.setConfigurationSteps(List.of(step1, step2)); + final ConnectorPropertyValue validValue = mock(ConnectorPropertyValue.class); + when(validValue.getValue()).thenReturn("valid"); + when(validValue.isSet()).thenReturn(true); + final ConnectorPropertyValue invalidValue = mock(ConnectorPropertyValue.class); + when(invalidValue.getValue()).thenReturn(""); + when(invalidValue.isSet()).thenReturn(true); + when(configurationContext.getProperty("Step One", "Property One")).thenReturn(validValue); + when(configurationContext.getProperty("Step Two", "Property Two")).thenReturn(invalidValue); + + final List results = connector.validate(flowContext, validationContext); + + assertEquals(1, results.size()); + final ValidationResult result = results.getFirst(); + assertFalse(result.isValid()); + assertEquals("Property Two", result.getSubject()); + assertFalse(connector.isCustomValidateCalled()); + } + + @Test + void testValidateWithCustomValidationErrors() { + connector.setConfigurationSteps(Collections.emptyList()); + connector.setCustomValidationResults(List.of( + new ValidationResult.Builder() + .valid(false) + .subject("Custom Error") + .explanation("Custom validation failed") + .build() + )); + + final List results = connector.validate(flowContext, validationContext); + + assertEquals(1, results.size()); + final ValidationResult result = results.getFirst(); + assertFalse(result.isValid()); + assertEquals("Custom Error", result.getSubject()); + assertEquals("Custom validation failed", result.getExplanation()); + assertTrue(connector.isCustomValidateCalled()); + } + + @Test + void testValidateWithCustomValidationReturningNull() { + connector.setConfigurationSteps(Collections.emptyList()); + connector.setCustomValidationResults(null); + + final List results = connector.validate(flowContext, validationContext); + + assertTrue(results.isEmpty()); + assertTrue(connector.isCustomValidateCalled()); + } + + @Test + void testValidateWithCustomValidationReturningValidResults() { + connector.setConfigurationSteps(Collections.emptyList()); + connector.setCustomValidationResults(List.of( + new ValidationResult.Builder() + .valid(true) + .subject("Custom Check") + .explanation("Custom validation passed") + .build() + )); + + final List results = connector.validate(flowContext, validationContext); + + assertTrue(results.isEmpty()); + assertTrue(connector.isCustomValidateCalled()); + } + + @Test + void testValidateWithCircularPropertyDependency() { + final ConnectorPropertyDescriptor prop1 = new ConnectorPropertyDescriptor.Builder() + .name("Property One") + .required(false) + .build(); + + final ConnectorPropertyDescriptor prop2 = new ConnectorPropertyDescriptor.Builder() + .name("Property Two") + .required(false) + .dependsOn(prop1, "Value One") + .build(); + + final ConnectorPropertyDescriptor circularProp1 = new ConnectorPropertyDescriptor.Builder() + .name("Property One") + .required(false) + .dependsOn(prop2, "Value Two") + .build(); + + final ConnectorPropertyGroup propertyGroup = ConnectorPropertyGroup.builder() + .name("Test Group") + .addProperty(circularProp1) + .addProperty(prop2) + .build(); + + final ConfigurationStep configStep = new ConfigurationStep.Builder() + .name("Test Step") + .propertyGroups(List.of(propertyGroup)) + .build(); + + connector.setConfigurationSteps(List.of(configStep)); + final ConnectorPropertyValue value1 = mock(ConnectorPropertyValue.class); + when(value1.getValue()).thenReturn("Value One"); + final ConnectorPropertyValue value2 = mock(ConnectorPropertyValue.class); + when(value2.getValue()).thenReturn("Value Two"); + when(configurationContext.getProperty("Test Step", "Property One")).thenReturn(value1); + when(configurationContext.getProperty("Test Step", "Property Two")).thenReturn(value2); + + final List results = connector.validate(flowContext, validationContext); + + assertTrue(results.isEmpty()); + assertTrue(connector.isCustomValidateCalled()); + } + + private static class TestableAbstractConnector extends AbstractConnector { + private List configurationSteps = Collections.emptyList(); + private Collection customValidationResults = Collections.emptyList(); + private boolean customValidateCalled = false; + + public void setConfigurationSteps(final List steps) { + this.configurationSteps = steps; + } + + public void setCustomValidationResults(final Collection results) { + this.customValidationResults = results; + } + + public boolean isCustomValidateCalled() { + return customValidateCalled; + } + + @Override + public VersionedExternalFlow getInitialFlow() { + return null; + } + + @Override + public List getConfigurationSteps(final FlowContext workingContext) { + return configurationSteps; + } + + @Override + protected Collection customValidate(final ConnectorConfigurationContext context) { + customValidateCalled = true; + return customValidationResults; + } + + @Override + public void onStepConfigured(final String stepName, final FlowContext workingContext) { + } + + @Override + public void prepareForUpdate(final FlowContext workingFlowContext, final FlowContext activeFlowContext) { + } + + @Override + public void applyUpdate(final FlowContext workingFlowContext, final FlowContext activeFlowContext) { + } + + @Override + public List verifyConfigurationStep(final String stepName, final Map overrides, final FlowContext flowContext) { + return Collections.emptyList(); + } + + @Override + public List verify(final FlowContext flowContext) { + return List.of(); + } + } + + private static final Validator NON_EMPTY_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + if (input == null || input.trim().isEmpty()) { + return new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(false) + .explanation(subject + " cannot be empty") + .build(); + } + + return new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(true) + .build(); + } + }; + + /** + * Simple test implementation of ConnectorValidationContext for unit testing. + */ + private static class TestConnectorValidationContext implements ConnectorValidationContext { + @Override + public ValidationContext createValidationContext(final String stepName, final String groupName) { + // Return null as it's not needed for basic validation tests + return null; + } + + @Override + public List fetchAllowableValues(final String stepName, final String propertyName) { + // Return empty list as we don't need to fetch dynamic allowable values in these tests + return Collections.emptyList(); + } + } + +} diff --git a/src/test/java/org/apache/nifi/components/connector/TestConnectorPropertyDescriptor.java b/src/test/java/org/apache/nifi/components/connector/TestConnectorPropertyDescriptor.java new file mode 100644 index 0000000..823c3ef --- /dev/null +++ b/src/test/java/org/apache/nifi/components/connector/TestConnectorPropertyDescriptor.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +import org.apache.nifi.components.DescribedValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestConnectorPropertyDescriptor { + + private static final String TEST_STEP_NAME = "test-step"; + private static final String TEST_GROUP_NAME = "test-group"; + + @Test + void testValidateStringType() { + final ConnectorPropertyDescriptor descriptor = new ConnectorPropertyDescriptor.Builder() + .name("String Property") + .type(PropertyType.STRING) + .build(); + + final ConnectorValidationContext context = new TestConnectorValidationContext(); + assertTrue(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "any string value", context).isValid()); + assertTrue(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "!@#$%^&*()_+-=[]{}|;:',.<>?/~`", context).isValid()); + assertTrue(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "", context).isValid()); + } + + @Test + void testValidatePasswordType() { + final ConnectorPropertyDescriptor descriptor = new ConnectorPropertyDescriptor.Builder() + .name("Password Property") + .type(PropertyType.SECRET) + .build(); + + final ConnectorValidationContext context = new TestConnectorValidationContext(); + assertTrue(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "secretPassword123!", context).isValid()); + assertTrue(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "", context).isValid()); + } + + @Test + void testValidateStringListType() { + final ConnectorPropertyDescriptor descriptor = new ConnectorPropertyDescriptor.Builder() + .name("String List Property") + .type(PropertyType.STRING_LIST) + .build(); + + final ConnectorValidationContext context = new TestConnectorValidationContext(); + assertTrue(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "item1,item2,item3", context).isValid()); + assertTrue(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "", context).isValid()); + } + + @Test + void testValidateBooleanTypeWithValidValues() { + final ConnectorPropertyDescriptor descriptor = new ConnectorPropertyDescriptor.Builder() + .name("Boolean Property") + .type(PropertyType.BOOLEAN) + .build(); + + final ConnectorValidationContext context = new TestConnectorValidationContext(); + assertTrue(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "true", context).isValid()); + assertTrue(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "false", context).isValid()); + assertTrue(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "TRUE", context).isValid()); + assertTrue(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "FALSE", context).isValid()); + assertTrue(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "TrUe", context).isValid()); + assertTrue(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "FaLsE", context).isValid()); + } + + @Test + void testValidateBooleanTypeWithInvalidValues() { + final ConnectorPropertyDescriptor descriptor = new ConnectorPropertyDescriptor.Builder() + .name("Boolean Property") + .type(PropertyType.BOOLEAN) + .build(); + + final ConnectorValidationContext context = new TestConnectorValidationContext(); + ValidationResult result = descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "invalid", context); + assertFalse(result.isValid()); + assertEquals("Boolean Property", result.getSubject()); + assertEquals("invalid", result.getInput()); + assertEquals("Value must be true or false", result.getExplanation()); + + assertFalse(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "1", context).isValid()); + assertFalse(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "0", context).isValid()); + assertFalse(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "yes", context).isValid()); + assertFalse(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "no", context).isValid()); + assertFalse(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "", context).isValid()); + } + + @Test + void testValidateIntegerTypeWithValidValues() { + final ConnectorPropertyDescriptor descriptor = new ConnectorPropertyDescriptor.Builder() + .name("Integer Property") + .type(PropertyType.INTEGER) + .build(); + + final ConnectorValidationContext context = new TestConnectorValidationContext(); + assertTrue(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "12345", context).isValid()); + assertTrue(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "-12345", context).isValid()); + assertTrue(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "0", context).isValid()); + assertTrue(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "00123", context).isValid()); + } + + @Test + void testValidateIntegerTypeWithInvalidValues() { + final ConnectorPropertyDescriptor descriptor = new ConnectorPropertyDescriptor.Builder() + .name("Integer Property") + .type(PropertyType.INTEGER) + .build(); + + final ConnectorValidationContext context = new TestConnectorValidationContext(); + ValidationResult result = descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "123.45", context); + assertFalse(result.isValid()); + assertEquals("Integer Property", result.getSubject()); + assertEquals("123.45", result.getInput()); + assertEquals("Value must be an integer", result.getExplanation()); + + assertFalse(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "not a number", context).isValid()); + assertFalse(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, " 123 ", context).isValid()); + assertFalse(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "+123", context).isValid()); + assertFalse(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "0x1A3F", context).isValid()); + assertFalse(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "", context).isValid()); + } + + @Test + void testValidateDoubleTypeWithValidValues() { + final ConnectorPropertyDescriptor descriptor = new ConnectorPropertyDescriptor.Builder() + .name("Double Property") + .type(PropertyType.DOUBLE) + .build(); + + final ConnectorValidationContext context = new TestConnectorValidationContext(); + assertTrue(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "123", context).isValid()); + assertTrue(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "123.456", context).isValid()); + assertTrue(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "-123.456", context).isValid()); + assertTrue(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "0.0", context).isValid()); + assertTrue(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "0", context).isValid()); + } + + @Test + void testValidateDoubleTypeWithInvalidValues() { + final ConnectorPropertyDescriptor descriptor = new ConnectorPropertyDescriptor.Builder() + .name("Double Property") + .type(PropertyType.DOUBLE) + .build(); + + final ConnectorValidationContext context = new TestConnectorValidationContext(); + ValidationResult result = descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "not a number", context); + assertFalse(result.isValid()); + assertEquals("Double Property", result.getSubject()); + assertEquals("not a number", result.getInput()); + assertEquals("Value must be a floating point number", result.getExplanation()); + + assertFalse(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "123.456.789", context).isValid()); + assertFalse(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "1.23e10", context).isValid()); + assertFalse(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "123.", context).isValid()); + assertFalse(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, ".123", context).isValid()); + } + + @Test + void testValidateFloatTypeWithValidValues() { + final ConnectorPropertyDescriptor descriptor = new ConnectorPropertyDescriptor.Builder() + .name("Float Property") + .type(PropertyType.FLOAT) + .build(); + + final ConnectorValidationContext context = new TestConnectorValidationContext(); + assertTrue(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "123", context).isValid()); + assertTrue(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "123.456", context).isValid()); + assertTrue(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "-123.456", context).isValid()); + assertTrue(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "0", context).isValid()); + } + + @Test + void testValidateFloatTypeWithInvalidValues() { + final ConnectorPropertyDescriptor descriptor = new ConnectorPropertyDescriptor.Builder() + .name("Float Property") + .type(PropertyType.FLOAT) + .build(); + + final ConnectorValidationContext context = new TestConnectorValidationContext(); + ValidationResult result = descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "not a number", context); + assertFalse(result.isValid()); + assertEquals("Float Property", result.getSubject()); + assertEquals("not a number", result.getInput()); + assertEquals("Value must be a floating point number", result.getExplanation()); + + assertFalse(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, "123.", context).isValid()); + assertFalse(descriptor.validate(TEST_STEP_NAME, TEST_GROUP_NAME, ".123", context).isValid()); + } + + /** + * Simple test implementation of ConnectorValidationContext for unit testing. + */ + private static class TestConnectorValidationContext implements ConnectorValidationContext { + @Override + public ValidationContext createValidationContext(final String stepName, final String groupName) { + // Return null as it's not needed for basic type validation tests + return null; + } + + @Override + public List fetchAllowableValues(final String stepName, final String propertyName) { + // Return empty list as we don't need to fetch dynamic allowable values in these tests + return Collections.emptyList(); + } + } +} +