From 197a49a31869aca6feebc8e2b999d9d514507353 Mon Sep 17 00:00:00 2001 From: Swagatika Beura Date: Fri, 7 Nov 2025 18:15:07 +0530 Subject: [PATCH 01/12] chore(azure): remove redundant @Slf4j annotations (part-of https://github.com/kestra-io/kestra/issues/12770) --- build.gradle | 4 ++-- src/main/java/io/kestra/plugin/azure/batch/job/Create.java | 2 +- .../kestra/plugin/azure/batch/models/ContainerRegistry.java | 1 + .../java/io/kestra/plugin/azure/datafactory/CreateRun.java | 6 +++--- src/main/java/io/kestra/plugin/azure/eventhubs/Consume.java | 6 +++--- src/main/java/io/kestra/plugin/azure/eventhubs/Produce.java | 4 ++-- .../io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java | 6 +++--- src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java | 4 ++-- .../azure/eventhubs/client/EventHubClientFactory.java | 4 ++-- .../azure/eventhubs/service/consumer/ConsumerContext.java | 2 +- .../eventhubs/service/consumer/EventHubConsumerService.java | 6 +++--- .../eventhubs/service/producer/EventHubProducerService.java | 2 +- .../azure/eventhubs/service/producer/ProducerContext.java | 2 +- .../io/kestra/plugin/azure/storage/adls/DeleteFiles.java | 2 +- .../io/kestra/plugin/azure/storage/blob/DeleteList.java | 2 +- .../java/io/kestra/plugin/azure/batch/job/CreateTest.java | 2 +- .../service/producer/EventHubProducerServiceTest.java | 4 ++-- 17 files changed, 30 insertions(+), 29 deletions(-) diff --git a/build.gradle b/build.gradle index 93abdc9a..1eadf74e 100644 --- a/build.gradle +++ b/build.gradle @@ -60,7 +60,7 @@ dependencies { compileOnly group: "io.kestra", name: "script" // Logs - compileOnly'org.slf4j:slf4j-api' + //compileOnly'org.slf4j:slf4j-api' // Azure libraries are managed by the the Kestra Platform so they are aligned on all plugins api (group: 'com.azure', name: 'azure-identity') { @@ -282,7 +282,7 @@ shadowJar { exclude { it.moduleGroup.startsWith('com.fasterxml.jackson') && !it.moduleName.equals('jackson-datatype-joda') } - exclude "org/slf4j/**" +// exclude "org/slf4j/**" } } diff --git a/src/main/java/io/kestra/plugin/azure/batch/job/Create.java b/src/main/java/io/kestra/plugin/azure/batch/job/Create.java index cea5e222..27564bad 100644 --- a/src/main/java/io/kestra/plugin/azure/batch/job/Create.java +++ b/src/main/java/io/kestra/plugin/azure/batch/job/Create.java @@ -27,7 +27,7 @@ import jakarta.validation.constraints.NotNull; import lombok.*; import lombok.experimental.SuperBuilder; -import org.slf4j.Logger; +//import org.slf4j.Logger; import java.io.File; import java.io.IOException; diff --git a/src/main/java/io/kestra/plugin/azure/batch/models/ContainerRegistry.java b/src/main/java/io/kestra/plugin/azure/batch/models/ContainerRegistry.java index 33a4ec8f..49b162fc 100644 --- a/src/main/java/io/kestra/plugin/azure/batch/models/ContainerRegistry.java +++ b/src/main/java/io/kestra/plugin/azure/batch/models/ContainerRegistry.java @@ -1,3 +1,4 @@ + package io.kestra.plugin.azure.batch.models; import io.kestra.core.exceptions.IllegalVariableEvaluationException; diff --git a/src/main/java/io/kestra/plugin/azure/datafactory/CreateRun.java b/src/main/java/io/kestra/plugin/azure/datafactory/CreateRun.java index 2d2b725f..0ba3d774 100644 --- a/src/main/java/io/kestra/plugin/azure/datafactory/CreateRun.java +++ b/src/main/java/io/kestra/plugin/azure/datafactory/CreateRun.java @@ -32,8 +32,8 @@ import jakarta.validation.constraints.NotNull; import lombok.*; import lombok.experimental.SuperBuilder; -import lombok.extern.slf4j.Slf4j; -import org.slf4j.Logger; +//import lombok.extern.slf4j.Slf4j; +//import org.slf4j.Logger; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -50,7 +50,7 @@ import java.util.concurrent.atomic.AtomicReference; -@Slf4j +//@Slf4j @SuperBuilder @ToString @EqualsAndHashCode diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/Consume.java b/src/main/java/io/kestra/plugin/azure/eventhubs/Consume.java index aa15f540..075cdb01 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/Consume.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/Consume.java @@ -33,8 +33,8 @@ import lombok.NoArgsConstructor; import lombok.ToString; import lombok.experimental.SuperBuilder; -import lombok.extern.slf4j.Slf4j; -import org.slf4j.Logger; +//import lombok.extern.slf4j.Slf4j; +//import org.slf4j.Logger; import java.io.BufferedOutputStream; import java.io.File; @@ -79,7 +79,7 @@ @Schema( title = "Consume events from Azure Event Hubs." ) -@Slf4j +// @Slf4j @SuperBuilder @NoArgsConstructor @Getter diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/Produce.java b/src/main/java/io/kestra/plugin/azure/eventhubs/Produce.java index 9b709488..c4a47efd 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/Produce.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/Produce.java @@ -22,7 +22,7 @@ import jakarta.validation.constraints.NotNull; import lombok.*; import lombok.experimental.SuperBuilder; -import lombok.extern.slf4j.Slf4j; +//import lombok.extern.slf4j.Slf4j; import java.io.*; import java.util.HashMap; @@ -86,7 +86,7 @@ @Schema( title = "Publish events to Azure Event Hubs." ) -@Slf4j +//@Slf4j @SuperBuilder @Getter @NoArgsConstructor diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java b/src/main/java/io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java index e25b1ea4..eddf99ed 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java @@ -19,9 +19,9 @@ import io.swagger.v3.oas.annotations.media.Schema; import lombok.*; import lombok.experimental.SuperBuilder; -import lombok.extern.slf4j.Slf4j; +//import lombok.extern.slf4j.Slf4j; import org.reactivestreams.Publisher; -import org.slf4j.Logger; +//import org.slf4j.Logger; import reactor.core.publisher.Flux; import java.time.Duration; @@ -102,7 +102,7 @@ title = "Trigger a flow on message consumption in real-time from Azure Event Hubs.", description = "If you would like to consume multiple messages processed within a given time frame and process them in batch, you can use the [io.kestra.plugin.azure.eventhubs.Trigger](https://kestra.io/plugins/plugin-azure/triggers/io.kestra.plugin.azure.eventhubs.trigger) instead." ) -@Slf4j +//@Slf4j @NoArgsConstructor @SuperBuilder @ToString diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java b/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java index 22cdff3a..de5ca649 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java @@ -13,7 +13,7 @@ import jakarta.validation.constraints.NotNull; import lombok.*; import lombok.experimental.SuperBuilder; -import lombok.extern.slf4j.Slf4j; +//import lombok.extern.slf4j.Slf4j; import java.time.Duration; import java.util.Collections; @@ -56,7 +56,7 @@ title = "Trigger a flow on message consumption periodically from Azure Event Hubs.", description = "If you would like to consume each message from Azure Event Hubs in real-time and create one execution per message, you can use the [io.kestra.plugin.azure.eventhubs.RealtimeTrigger](https://kestra.io/plugins/plugin-azure/triggers/io.kestra.plugin.azure.eventhubs.realtimetrigger) instead." ) -@Slf4j +//@Slf4j @NoArgsConstructor @SuperBuilder @ToString diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/client/EventHubClientFactory.java b/src/main/java/io/kestra/plugin/azure/eventhubs/client/EventHubClientFactory.java index 5448e772..fb73ea23 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/client/EventHubClientFactory.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/client/EventHubClientFactory.java @@ -15,13 +15,13 @@ import io.kestra.plugin.azure.eventhubs.config.BlobContainerClientConfig; import io.kestra.plugin.azure.eventhubs.config.EventHubClientConfig; import io.kestra.plugin.azure.eventhubs.config.EventHubConsumerConfig; -import lombok.extern.slf4j.Slf4j; +//import lombok.extern.slf4j.Slf4j; import java.time.Duration; import java.util.Objects; import java.util.Optional; -@Slf4j +//@Slf4j public class EventHubClientFactory { /** diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/ConsumerContext.java b/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/ConsumerContext.java index 3ddb3660..2c975f5d 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/ConsumerContext.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/ConsumerContext.java @@ -1,7 +1,7 @@ package io.kestra.plugin.azure.eventhubs.service.consumer; import io.kestra.plugin.azure.eventhubs.service.EventDataObjectConverter; -import org.slf4j.Logger; +//import org.slf4j.Logger; import java.time.Duration; diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventHubConsumerService.java b/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventHubConsumerService.java index 9b81e7bf..8763c9bc 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventHubConsumerService.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventHubConsumerService.java @@ -12,8 +12,8 @@ import io.kestra.plugin.azure.eventhubs.client.EventHubClientFactory; import io.kestra.plugin.azure.eventhubs.config.EventHubConsumerConfig; import io.kestra.plugin.azure.eventhubs.model.EventDataObject; -import lombok.extern.slf4j.Slf4j; -import org.slf4j.Logger; +//import lombok.extern.slf4j.Slf4j; +//import org.slf4j.Logger; import java.util.Collection; import java.util.Collections; @@ -31,7 +31,7 @@ import static io.kestra.core.utils.Rethrow.throwFunction; -@Slf4j +//@Slf4j public final class EventHubConsumerService { private final EventHubClientFactory clientFactory; diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerService.java b/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerService.java index 32851ddd..cbafd72d 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerService.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerService.java @@ -10,7 +10,7 @@ import io.kestra.plugin.azure.eventhubs.config.EventHubConsumerConfig; import io.kestra.plugin.azure.eventhubs.model.EventDataObject; import io.kestra.plugin.azure.eventhubs.service.EventDataObjectConverter; -import org.slf4j.Logger; +//import org.slf4j.Logger; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/ProducerContext.java b/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/ProducerContext.java index 192045a0..d4ec1a76 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/ProducerContext.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/ProducerContext.java @@ -1,6 +1,6 @@ package io.kestra.plugin.azure.eventhubs.service.producer; -import org.slf4j.Logger; +//import org.slf4j.Logger; import java.util.Map; diff --git a/src/main/java/io/kestra/plugin/azure/storage/adls/DeleteFiles.java b/src/main/java/io/kestra/plugin/azure/storage/adls/DeleteFiles.java index e3163d6f..0a3b06a1 100644 --- a/src/main/java/io/kestra/plugin/azure/storage/adls/DeleteFiles.java +++ b/src/main/java/io/kestra/plugin/azure/storage/adls/DeleteFiles.java @@ -21,7 +21,7 @@ import lombok.*; import lombok.experimental.SuperBuilder; import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; +//import org.slf4j.Logger; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.scheduler.Schedulers; diff --git a/src/main/java/io/kestra/plugin/azure/storage/blob/DeleteList.java b/src/main/java/io/kestra/plugin/azure/storage/blob/DeleteList.java index c165b57f..73855578 100644 --- a/src/main/java/io/kestra/plugin/azure/storage/blob/DeleteList.java +++ b/src/main/java/io/kestra/plugin/azure/storage/blob/DeleteList.java @@ -20,7 +20,7 @@ import lombok.*; import lombok.experimental.SuperBuilder; import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; +//import org.slf4j.Logger; import java.util.NoSuchElementException; import java.util.function.Function; diff --git a/src/test/java/io/kestra/plugin/azure/batch/job/CreateTest.java b/src/test/java/io/kestra/plugin/azure/batch/job/CreateTest.java index ffd384e2..3f85f6bd 100644 --- a/src/test/java/io/kestra/plugin/azure/batch/job/CreateTest.java +++ b/src/test/java/io/kestra/plugin/azure/batch/job/CreateTest.java @@ -15,7 +15,7 @@ import jakarta.inject.Named; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import org.slf4j.event.Level; +//import org.slf4j.event.Level; import reactor.core.publisher.Flux; import java.io.InputStream; diff --git a/src/test/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerServiceTest.java b/src/test/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerServiceTest.java index 716ec116..f82a6623 100644 --- a/src/test/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerServiceTest.java +++ b/src/test/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerServiceTest.java @@ -20,8 +20,8 @@ import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.stubbing.Answer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; import java.io.BufferedReader; From daa700734eb65984e386f90fa49dc23c75f7cc50 Mon Sep 17 00:00:00 2001 From: Swagatika Beura Date: Fri, 7 Nov 2025 21:20:27 +0530 Subject: [PATCH 02/12] fix: plugin compile issue by replacing @Slf4j with LoggerFactory --- .../azure/eventhubs/RealtimeTrigger.java | 290 ------------------ .../plugin/azure/eventhubs/Trigger.java | 9 +- .../client/EventHubClientFactory.java | 5 + 3 files changed, 13 insertions(+), 291 deletions(-) diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java b/src/main/java/io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java index eddf99ed..e69de29b 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java @@ -1,290 +0,0 @@ -package io.kestra.plugin.azure.eventhubs; - -import com.azure.messaging.eventhubs.EventData; -import com.azure.messaging.eventhubs.EventProcessorClient; -import com.azure.messaging.eventhubs.models.PartitionContext; -import io.kestra.core.models.annotations.Example; -import io.kestra.core.models.annotations.Plugin; -import io.kestra.core.models.conditions.ConditionContext; -import io.kestra.core.models.executions.Execution; -import io.kestra.core.models.property.Property; -import io.kestra.core.models.triggers.*; -import io.kestra.core.runners.RunContext; -import io.kestra.plugin.azure.eventhubs.model.EventDataObject; -import io.kestra.plugin.azure.eventhubs.model.EventDataOutput; -import io.kestra.plugin.azure.eventhubs.serdes.Serdes; -import io.kestra.plugin.azure.eventhubs.service.EventDataObjectConverter; -import io.kestra.plugin.azure.eventhubs.service.consumer.EventHubConsumerService; -import io.kestra.plugin.azure.eventhubs.service.consumer.StartingPosition; -import io.swagger.v3.oas.annotations.media.Schema; -import lombok.*; -import lombok.experimental.SuperBuilder; -//import lombok.extern.slf4j.Slf4j; -import org.reactivestreams.Publisher; -//import org.slf4j.Logger; -import reactor.core.publisher.Flux; - -import java.time.Duration; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * The {@link RealtimeTrigger} can be used for triggering flow based on events received from Azure Event Hubs. - */ -@Plugin(examples = { - @Example( - full = true, - title = "Trigger flow based on events received from Azure Event Hubs in real-time.", - code = """ - id: azure_eventhubs_realtime_trigger - namespace: company.team - - tasks: - - id: log - type: io.kestra.plugin.core.log.Log - message: Hello there! I received {{ trigger.body }} from Azure EventHubs! - - triggers: - - id: read_from_eventhub - type: io.kestra.plugin.azure.eventhubs.RealtimeTrigger - eventHubName: my_eventhub - namespace: my_eventhub_namespace - connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}" - bodyDeserializer: JSON - consumerGroup: "$Default" - checkpointStoreProperties: - containerName: kestra - connectionString: "{{ secret('BLOB_CONNECTION') }}" - """ - ), - @Example( - full = true, - title = "Use Azure Eventhubs Realtime Trigger to push events into StorageTable", - code = """ - id: eventhubs_realtime_trigger - namespace: company.team - - tasks: - - id: insert_into_storagetable - type: io.kestra.plugin.azure.storage.table.Bulk - endpoint: https://yourstorageaccount.blob.core.windows.net - connectionString: "{{ secret('STORAGETABLE_CONNECTION') }}" - table: orders - from: - - partitionKey: order_id - rowKey: "{{ trigger.body | jq('.order_id') | first }}" - properties: - customer_name: "{{ trigger.body | jq('.customer_name') | first }}" - customer_email: "{{ trigger.body | jq('.customer_email') | first }}" - product_id: "{{ trigger.body | jq('.product_id') | first }}" - price: "{{ trigger.body | jq('.price') | first }}" - quantity: "{{ trigger.body | jq('.quantity') | first }}" - total: "{{ trigger.body | jq('.total') | first }}" - - triggers: - - id: realtime_trigger - type: io.kestra.plugin.azure.eventhubs.RealtimeTrigger - eventHubName: orders - namespace: kestra - connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}" - bodyDeserializer: JSON - consumerGroup: $Default - checkpointStoreProperties: - containerName: kestra - connectionString: "{{ secret('BLOB_CONNECTION') }}" - """ - ) -}) -@Schema( - title = "Trigger a flow on message consumption in real-time from Azure Event Hubs.", - description = "If you would like to consume multiple messages processed within a given time frame and process them in batch, you can use the [io.kestra.plugin.azure.eventhubs.Trigger](https://kestra.io/plugins/plugin-azure/triggers/io.kestra.plugin.azure.eventhubs.trigger) instead." -) -//@Slf4j -@NoArgsConstructor -@SuperBuilder -@ToString -@EqualsAndHashCode -@Getter -public class RealtimeTrigger extends AbstractTrigger implements EventHubConsumerInterface, RealtimeTriggerInterface, TriggerOutput { - - // TASK'S PARAMETERS - protected Property connectionString; - - protected Property sharedKeyAccountName; - - protected Property sharedKeyAccountAccessKey; - - protected Property sasToken; - - @Builder.Default - protected Property clientMaxRetries = Property.ofValue(5); - - @Builder.Default - protected Property clientRetryDelay = Property.ofValue(500L); - - @Builder.Default - private Property bodyDeserializer = Property.ofValue(Serdes.STRING); - - @Builder.Default - private Property> bodyDeserializerProperties = Property.ofValue(new HashMap<>()); - - @Builder.Default - private Property consumerGroup = Property.ofValue("$Default"); - - @Builder.Default - private Property partitionStartingPosition = Property.ofValue(StartingPosition.EARLIEST); - - private Property enqueueTime; - - @Builder.Default - private Property> checkpointStoreProperties = Property.ofValue(new HashMap<>()); - - private Property namespace; - - private Property eventHubName; - - private Property customEndpointAddress; - - @Builder.Default - @Getter(AccessLevel.NONE) - private final AtomicBoolean isActive = new AtomicBoolean(true); - - @Builder.Default - @Getter(AccessLevel.NONE) - private final CountDownLatch waitForTermination = new CountDownLatch(1); - - /** - * {@inheritDoc} - **/ - @Override - public Publisher evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception { - final Consume task = Consume - .builder() - .connectionString(connectionString) - .sharedKeyAccountName(sharedKeyAccountName) - .sharedKeyAccountAccessKey(sharedKeyAccountAccessKey) - .sasToken(sasToken) - .clientMaxRetries(clientMaxRetries) - .clientRetryDelay(clientRetryDelay) - .bodyDeserializer(bodyDeserializer) - .bodyDeserializerProperties(bodyDeserializerProperties) - .consumerGroup(consumerGroup) - .partitionStartingPosition(partitionStartingPosition) - .checkpointStoreProperties(checkpointStoreProperties) - .enqueueTime(enqueueTime) - .namespace(namespace) - .eventHubName(eventHubName) - .customEndpointAddress(customEndpointAddress) - .build(); - return Flux - .from(publisher(task, conditionContext.getRunContext())) - .map(event -> TriggerService.generateRealtimeExecution(this, conditionContext, context, event)); - } - - public Publisher publisher(final Consume task, - final RunContext runContext) throws Exception { - - final EventHubConsumerService service = task.newEventHubConsumerService(runContext, task); - final EventDataObjectConverter converter = task.newConverter(task, runContext); - - return Flux.create(emitter -> { - Logger contextLogger = runContext.logger(); - try { - EventProcessorClient client = service.createEventProcessorClientBuilder(contextLogger) - .processEvent(eventContext -> { - if (!isActive.get()) { - return; // return immediately if the trigger is not active (checkpoint will not be updated) - } - - final EventData eventData = eventContext.getEventData(); - if (eventData == null) return; - - final EventDataObject dataObject = converter.convertFromEventData(eventData); - - PartitionContext partitionContext = eventContext.getPartitionContext(); - if (contextLogger.isTraceEnabled()) { - contextLogger.trace( - "Received new event from eventHub {} and partitionId={} [offset={}, sequenceId={}]", - partitionContext.getEventHubName(), - partitionContext.getPartitionId(), - dataObject.offset(), - dataObject.sequenceNumber() - ); - } - emitter.next(EventDataOutput.of(dataObject)); - eventContext.updateCheckpoint(); - - }, Duration.ofMillis(500)) - .processError(context -> { - PartitionContext partitionContext = context.getPartitionContext(); - contextLogger.error("Failed to process eventHub: {}, partitionId: {} with consumerGroup: {}", - partitionContext.getEventHubName(), - partitionContext.getPartitionId(), - partitionContext.getConsumerGroup(), - context.getThrowable() - ); - emitter.error(context.getThrowable()); - }) - .buildEventProcessorClient(); - - // handle dispose - invoked after complete/error. - emitter.onDispose(() -> { - try { - client.stop(); // cannot be invoked from EventProcessorClient thread. - } finally { - waitForTermination.countDown(); - } - }); - client.start(); - busyWait(); - emitter.complete(); - } catch (Exception throwable) { - emitter.error(throwable); - } - }); - } - - private void busyWait() { - while (isActive.get()) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - isActive.set(false); // proactively stop consuming - } - } - } - - /** - * {@inheritDoc} - **/ - @Override - public void kill() { - stop(true); - } - - /** - * {@inheritDoc} - **/ - @Override - public void stop() { - stop(false); // must be non-blocking - } - - private void stop(boolean wait) { - if (!isActive.compareAndSet(true, false)) { - return; - } - - if (wait) { - try { - waitForTermination.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } -} diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java b/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java index de5ca649..238b9aef 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java @@ -13,7 +13,12 @@ import jakarta.validation.constraints.NotNull; import lombok.*; import lombok.experimental.SuperBuilder; -//import lombok.extern.slf4j.Slf4j; +//import lombok.extern.slf4j.Slf4j;import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.time.Duration; import java.util.Collections; @@ -64,6 +69,8 @@ @Getter public class Trigger extends AbstractTrigger implements EventHubConsumerInterface, EventHubBatchConsumerInterface, PollingTriggerInterface, TriggerOutput { + private static final Logger log = LoggerFactory.getLogger(Trigger.class); + // TRIGGER'S PROPERTIES @Builder.Default private Duration interval = Duration.ofSeconds(60); diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/client/EventHubClientFactory.java b/src/main/java/io/kestra/plugin/azure/eventhubs/client/EventHubClientFactory.java index fb73ea23..af8d92d3 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/client/EventHubClientFactory.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/client/EventHubClientFactory.java @@ -16,6 +16,8 @@ import io.kestra.plugin.azure.eventhubs.config.EventHubClientConfig; import io.kestra.plugin.azure.eventhubs.config.EventHubConsumerConfig; //import lombok.extern.slf4j.Slf4j; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Objects; @@ -24,6 +26,9 @@ //@Slf4j public class EventHubClientFactory { + private static final Logger log = LoggerFactory.getLogger(EventHubClientFactory.class); + + /** * Factory method for constructing a new {@link EventHubClientBuilder} for the given config. * From 59344e0361963cae0b86a65a589260ad63104f93 Mon Sep 17 00:00:00 2001 From: Swagatika Beura Date: Fri, 7 Nov 2025 23:58:18 +0530 Subject: [PATCH 03/12] Restored RealtimeTrigger.java file accidentally deleted --- .../azure/eventhubs/RealtimeTrigger.java | 290 ++++++++++++++++++ 1 file changed, 290 insertions(+) diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java b/src/main/java/io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java index e69de29b..e25b1ea4 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java @@ -0,0 +1,290 @@ +package io.kestra.plugin.azure.eventhubs; + +import com.azure.messaging.eventhubs.EventData; +import com.azure.messaging.eventhubs.EventProcessorClient; +import com.azure.messaging.eventhubs.models.PartitionContext; +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.conditions.ConditionContext; +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.property.Property; +import io.kestra.core.models.triggers.*; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.azure.eventhubs.model.EventDataObject; +import io.kestra.plugin.azure.eventhubs.model.EventDataOutput; +import io.kestra.plugin.azure.eventhubs.serdes.Serdes; +import io.kestra.plugin.azure.eventhubs.service.EventDataObjectConverter; +import io.kestra.plugin.azure.eventhubs.service.consumer.EventHubConsumerService; +import io.kestra.plugin.azure.eventhubs.service.consumer.StartingPosition; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.*; +import lombok.experimental.SuperBuilder; +import lombok.extern.slf4j.Slf4j; +import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import reactor.core.publisher.Flux; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * The {@link RealtimeTrigger} can be used for triggering flow based on events received from Azure Event Hubs. + */ +@Plugin(examples = { + @Example( + full = true, + title = "Trigger flow based on events received from Azure Event Hubs in real-time.", + code = """ + id: azure_eventhubs_realtime_trigger + namespace: company.team + + tasks: + - id: log + type: io.kestra.plugin.core.log.Log + message: Hello there! I received {{ trigger.body }} from Azure EventHubs! + + triggers: + - id: read_from_eventhub + type: io.kestra.plugin.azure.eventhubs.RealtimeTrigger + eventHubName: my_eventhub + namespace: my_eventhub_namespace + connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}" + bodyDeserializer: JSON + consumerGroup: "$Default" + checkpointStoreProperties: + containerName: kestra + connectionString: "{{ secret('BLOB_CONNECTION') }}" + """ + ), + @Example( + full = true, + title = "Use Azure Eventhubs Realtime Trigger to push events into StorageTable", + code = """ + id: eventhubs_realtime_trigger + namespace: company.team + + tasks: + - id: insert_into_storagetable + type: io.kestra.plugin.azure.storage.table.Bulk + endpoint: https://yourstorageaccount.blob.core.windows.net + connectionString: "{{ secret('STORAGETABLE_CONNECTION') }}" + table: orders + from: + - partitionKey: order_id + rowKey: "{{ trigger.body | jq('.order_id') | first }}" + properties: + customer_name: "{{ trigger.body | jq('.customer_name') | first }}" + customer_email: "{{ trigger.body | jq('.customer_email') | first }}" + product_id: "{{ trigger.body | jq('.product_id') | first }}" + price: "{{ trigger.body | jq('.price') | first }}" + quantity: "{{ trigger.body | jq('.quantity') | first }}" + total: "{{ trigger.body | jq('.total') | first }}" + + triggers: + - id: realtime_trigger + type: io.kestra.plugin.azure.eventhubs.RealtimeTrigger + eventHubName: orders + namespace: kestra + connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}" + bodyDeserializer: JSON + consumerGroup: $Default + checkpointStoreProperties: + containerName: kestra + connectionString: "{{ secret('BLOB_CONNECTION') }}" + """ + ) +}) +@Schema( + title = "Trigger a flow on message consumption in real-time from Azure Event Hubs.", + description = "If you would like to consume multiple messages processed within a given time frame and process them in batch, you can use the [io.kestra.plugin.azure.eventhubs.Trigger](https://kestra.io/plugins/plugin-azure/triggers/io.kestra.plugin.azure.eventhubs.trigger) instead." +) +@Slf4j +@NoArgsConstructor +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +public class RealtimeTrigger extends AbstractTrigger implements EventHubConsumerInterface, RealtimeTriggerInterface, TriggerOutput { + + // TASK'S PARAMETERS + protected Property connectionString; + + protected Property sharedKeyAccountName; + + protected Property sharedKeyAccountAccessKey; + + protected Property sasToken; + + @Builder.Default + protected Property clientMaxRetries = Property.ofValue(5); + + @Builder.Default + protected Property clientRetryDelay = Property.ofValue(500L); + + @Builder.Default + private Property bodyDeserializer = Property.ofValue(Serdes.STRING); + + @Builder.Default + private Property> bodyDeserializerProperties = Property.ofValue(new HashMap<>()); + + @Builder.Default + private Property consumerGroup = Property.ofValue("$Default"); + + @Builder.Default + private Property partitionStartingPosition = Property.ofValue(StartingPosition.EARLIEST); + + private Property enqueueTime; + + @Builder.Default + private Property> checkpointStoreProperties = Property.ofValue(new HashMap<>()); + + private Property namespace; + + private Property eventHubName; + + private Property customEndpointAddress; + + @Builder.Default + @Getter(AccessLevel.NONE) + private final AtomicBoolean isActive = new AtomicBoolean(true); + + @Builder.Default + @Getter(AccessLevel.NONE) + private final CountDownLatch waitForTermination = new CountDownLatch(1); + + /** + * {@inheritDoc} + **/ + @Override + public Publisher evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception { + final Consume task = Consume + .builder() + .connectionString(connectionString) + .sharedKeyAccountName(sharedKeyAccountName) + .sharedKeyAccountAccessKey(sharedKeyAccountAccessKey) + .sasToken(sasToken) + .clientMaxRetries(clientMaxRetries) + .clientRetryDelay(clientRetryDelay) + .bodyDeserializer(bodyDeserializer) + .bodyDeserializerProperties(bodyDeserializerProperties) + .consumerGroup(consumerGroup) + .partitionStartingPosition(partitionStartingPosition) + .checkpointStoreProperties(checkpointStoreProperties) + .enqueueTime(enqueueTime) + .namespace(namespace) + .eventHubName(eventHubName) + .customEndpointAddress(customEndpointAddress) + .build(); + return Flux + .from(publisher(task, conditionContext.getRunContext())) + .map(event -> TriggerService.generateRealtimeExecution(this, conditionContext, context, event)); + } + + public Publisher publisher(final Consume task, + final RunContext runContext) throws Exception { + + final EventHubConsumerService service = task.newEventHubConsumerService(runContext, task); + final EventDataObjectConverter converter = task.newConverter(task, runContext); + + return Flux.create(emitter -> { + Logger contextLogger = runContext.logger(); + try { + EventProcessorClient client = service.createEventProcessorClientBuilder(contextLogger) + .processEvent(eventContext -> { + if (!isActive.get()) { + return; // return immediately if the trigger is not active (checkpoint will not be updated) + } + + final EventData eventData = eventContext.getEventData(); + if (eventData == null) return; + + final EventDataObject dataObject = converter.convertFromEventData(eventData); + + PartitionContext partitionContext = eventContext.getPartitionContext(); + if (contextLogger.isTraceEnabled()) { + contextLogger.trace( + "Received new event from eventHub {} and partitionId={} [offset={}, sequenceId={}]", + partitionContext.getEventHubName(), + partitionContext.getPartitionId(), + dataObject.offset(), + dataObject.sequenceNumber() + ); + } + emitter.next(EventDataOutput.of(dataObject)); + eventContext.updateCheckpoint(); + + }, Duration.ofMillis(500)) + .processError(context -> { + PartitionContext partitionContext = context.getPartitionContext(); + contextLogger.error("Failed to process eventHub: {}, partitionId: {} with consumerGroup: {}", + partitionContext.getEventHubName(), + partitionContext.getPartitionId(), + partitionContext.getConsumerGroup(), + context.getThrowable() + ); + emitter.error(context.getThrowable()); + }) + .buildEventProcessorClient(); + + // handle dispose - invoked after complete/error. + emitter.onDispose(() -> { + try { + client.stop(); // cannot be invoked from EventProcessorClient thread. + } finally { + waitForTermination.countDown(); + } + }); + client.start(); + busyWait(); + emitter.complete(); + } catch (Exception throwable) { + emitter.error(throwable); + } + }); + } + + private void busyWait() { + while (isActive.get()) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + isActive.set(false); // proactively stop consuming + } + } + } + + /** + * {@inheritDoc} + **/ + @Override + public void kill() { + stop(true); + } + + /** + * {@inheritDoc} + **/ + @Override + public void stop() { + stop(false); // must be non-blocking + } + + private void stop(boolean wait) { + if (!isActive.compareAndSet(true, false)) { + return; + } + + if (wait) { + try { + waitForTermination.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } +} From 22db57d5d0fc40173c7780a8e44c606e42944454 Mon Sep 17 00:00:00 2001 From: Swagatika Beura Date: Sat, 8 Nov 2025 00:18:16 +0530 Subject: [PATCH 04/12] Removed commented @Slf4j lines as suggested --- .../java/io/kestra/plugin/azure/datafactory/CreateRun.java | 5 ++--- src/main/java/io/kestra/plugin/azure/eventhubs/Consume.java | 5 ++--- src/main/java/io/kestra/plugin/azure/eventhubs/Produce.java | 4 ++-- .../io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java | 2 +- src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java | 2 +- .../plugin/azure/eventhubs/client/EventHubClientFactory.java | 3 +-- .../eventhubs/service/consumer/EventHubConsumerService.java | 4 +--- 7 files changed, 10 insertions(+), 15 deletions(-) diff --git a/src/main/java/io/kestra/plugin/azure/datafactory/CreateRun.java b/src/main/java/io/kestra/plugin/azure/datafactory/CreateRun.java index 0ba3d774..dc870032 100644 --- a/src/main/java/io/kestra/plugin/azure/datafactory/CreateRun.java +++ b/src/main/java/io/kestra/plugin/azure/datafactory/CreateRun.java @@ -32,8 +32,7 @@ import jakarta.validation.constraints.NotNull; import lombok.*; import lombok.experimental.SuperBuilder; -//import lombok.extern.slf4j.Slf4j; -//import org.slf4j.Logger; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -50,7 +49,7 @@ import java.util.concurrent.atomic.AtomicReference; -//@Slf4j + @SuperBuilder @ToString @EqualsAndHashCode diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/Consume.java b/src/main/java/io/kestra/plugin/azure/eventhubs/Consume.java index 075cdb01..27ec1d0e 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/Consume.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/Consume.java @@ -33,8 +33,7 @@ import lombok.NoArgsConstructor; import lombok.ToString; import lombok.experimental.SuperBuilder; -//import lombok.extern.slf4j.Slf4j; -//import org.slf4j.Logger; + import java.io.BufferedOutputStream; import java.io.File; @@ -79,7 +78,7 @@ @Schema( title = "Consume events from Azure Event Hubs." ) -// @Slf4j + @SuperBuilder @NoArgsConstructor @Getter diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/Produce.java b/src/main/java/io/kestra/plugin/azure/eventhubs/Produce.java index c4a47efd..552038d8 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/Produce.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/Produce.java @@ -22,7 +22,7 @@ import jakarta.validation.constraints.NotNull; import lombok.*; import lombok.experimental.SuperBuilder; -//import lombok.extern.slf4j.Slf4j; + import java.io.*; import java.util.HashMap; @@ -86,7 +86,7 @@ @Schema( title = "Publish events to Azure Event Hubs." ) -//@Slf4j + @SuperBuilder @Getter @NoArgsConstructor diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java b/src/main/java/io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java index e25b1ea4..86cb9292 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java @@ -102,7 +102,7 @@ title = "Trigger a flow on message consumption in real-time from Azure Event Hubs.", description = "If you would like to consume multiple messages processed within a given time frame and process them in batch, you can use the [io.kestra.plugin.azure.eventhubs.Trigger](https://kestra.io/plugins/plugin-azure/triggers/io.kestra.plugin.azure.eventhubs.trigger) instead." ) -@Slf4j + @NoArgsConstructor @SuperBuilder @ToString diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java b/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java index 238b9aef..97782e98 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java @@ -61,7 +61,7 @@ title = "Trigger a flow on message consumption periodically from Azure Event Hubs.", description = "If you would like to consume each message from Azure Event Hubs in real-time and create one execution per message, you can use the [io.kestra.plugin.azure.eventhubs.RealtimeTrigger](https://kestra.io/plugins/plugin-azure/triggers/io.kestra.plugin.azure.eventhubs.realtimetrigger) instead." ) -//@Slf4j + @NoArgsConstructor @SuperBuilder @ToString diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/client/EventHubClientFactory.java b/src/main/java/io/kestra/plugin/azure/eventhubs/client/EventHubClientFactory.java index af8d92d3..bc0af7a8 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/client/EventHubClientFactory.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/client/EventHubClientFactory.java @@ -15,7 +15,7 @@ import io.kestra.plugin.azure.eventhubs.config.BlobContainerClientConfig; import io.kestra.plugin.azure.eventhubs.config.EventHubClientConfig; import io.kestra.plugin.azure.eventhubs.config.EventHubConsumerConfig; -//import lombok.extern.slf4j.Slf4j; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,7 +23,6 @@ import java.util.Objects; import java.util.Optional; -//@Slf4j public class EventHubClientFactory { private static final Logger log = LoggerFactory.getLogger(EventHubClientFactory.class); diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventHubConsumerService.java b/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventHubConsumerService.java index 8763c9bc..ae832e7b 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventHubConsumerService.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventHubConsumerService.java @@ -12,8 +12,7 @@ import io.kestra.plugin.azure.eventhubs.client.EventHubClientFactory; import io.kestra.plugin.azure.eventhubs.config.EventHubConsumerConfig; import io.kestra.plugin.azure.eventhubs.model.EventDataObject; -//import lombok.extern.slf4j.Slf4j; -//import org.slf4j.Logger; + import java.util.Collection; import java.util.Collections; @@ -31,7 +30,6 @@ import static io.kestra.core.utils.Rethrow.throwFunction; -//@Slf4j public final class EventHubConsumerService { private final EventHubClientFactory clientFactory; From cf32bd99de591344e883098c38b0fc6df8ccf7d9 Mon Sep 17 00:00:00 2001 From: Swagatika Beura Date: Sat, 8 Nov 2025 23:19:27 +0530 Subject: [PATCH 05/12] Removed commented @Slf4j lines and fixed code formatting --- .../service/producer/EventHubProducerServiceTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerServiceTest.java b/src/test/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerServiceTest.java index f82a6623..2007dc1f 100644 --- a/src/test/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerServiceTest.java +++ b/src/test/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerServiceTest.java @@ -20,8 +20,8 @@ import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.stubbing.Answer; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; + //import org.slf4j.Logger; + //import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; import java.io.BufferedReader; From 554d373b268120f41e29e1f207b05425ac5ccf69 Mon Sep 17 00:00:00 2001 From: Malay Dewangan Date: Fri, 7 Nov 2025 14:50:43 +0530 Subject: [PATCH 06/12] chore: update contributor guidelines in PR template --- .github/pull_request_template.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index b9773c98..962b831b 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -62,6 +62,8 @@ Thank you for your contribution. ❤️ --> ✨ **New plugins / subplugins** - [ ] Make sure your new plugin is configured like mentioned [here](https://kestra.io/docs/plugin-developer-guide/gradle#mandatory-configuration). - [ ] Add a `package-info.java` under each sub package respecting [this format](https://github.com/kestra-io/plugin-odoo/blob/main/src/main/java/io/kestra/plugin/odoo/package-info.java) and choosing the right category. +- [ ] Every time you use `runContext.metric(...)` you have to add a `@Metric` ([see this doc](https://kestra.io/docs/plugin-developer-guide/document#document-the-plugin-metrics)) +- [ ] Docs don't support to have both tasks/triggers in the root package (e.g. `io.kestra.plugin.kubernetes`) and in a sub package (e.g. `io.kestra.plugin.kubernetes.kubectl`), whether it's: all tasks/triggers in the root package OR only tasks/triggers in sub packages. - [ ] Icons added in `src/main/resources/icons` in SVG format and not in thumbnail (keep it big): - `plugin-icon.svg` - One icon per package, e.g. `io.kestra.plugin.aws.svg` @@ -74,7 +76,7 @@ Thank you for your contribution. ❤️ --> 🧪 **Tests** - [ ] Unit Tests added or updated to cover the change (using the `RunContext` to actually run tasks). - [ ] Add sanity checks if possible with a YAML flow inside `src/test/resources/flows`. -- [ ] Avoid disabling tests for CI. Instead, configure a local environment whenever it's possible with `.github/setup-unit.sh` (which can be executed locally and in the CI) all along with a new `docker-compose-ci.yml` file (do **not** edit the existing `docker-compose.yml`). +- [ ] Avoid disabling tests for CI. Instead, configure a local environment whenever it's possible with `.github/setup-unit.sh` (to be set executable with `chmod +x setup-unit.sh`) (which can be executed locally and in the CI) all along with a new `docker-compose-ci.yml` file (do **not** edit the existing `docker-compose.yml`). If needed, create an executable (`chmod +x cleanup-unit.sh`) `cleanup-unit.sh` to remove the potential costly resources (tables, datasets, etc). - [ ] Provide screenshots from your QA / tests locally in the PR description. The goal here is to use the JAR of the plugin and directly test it locally in Kestra UI to ensure it integrates well. 📤 **Outputs** From b22b13959dc201e203eeb3e40ea39b9016294da2 Mon Sep 17 00:00:00 2001 From: Swagatika Beura Date: Mon, 10 Nov 2025 23:55:11 +0530 Subject: [PATCH 07/12] chore: remove commented Logger imports as suggested by reviewer --- src/main/java/io/kestra/plugin/azure/batch/job/Create.java | 2 +- .../plugin/azure/eventhubs/EventHubConsumerInterface.java | 3 ++- .../azure/eventhubs/service/producer/ProducerContext.java | 1 - .../service/producer/EventHubProducerServiceTest.java | 3 +-- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/kestra/plugin/azure/batch/job/Create.java b/src/main/java/io/kestra/plugin/azure/batch/job/Create.java index 27564bad..57e06f1d 100644 --- a/src/main/java/io/kestra/plugin/azure/batch/job/Create.java +++ b/src/main/java/io/kestra/plugin/azure/batch/job/Create.java @@ -27,7 +27,7 @@ import jakarta.validation.constraints.NotNull; import lombok.*; import lombok.experimental.SuperBuilder; -//import org.slf4j.Logger; + import java.io.File; import java.io.IOException; diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/EventHubConsumerInterface.java b/src/main/java/io/kestra/plugin/azure/eventhubs/EventHubConsumerInterface.java index a525894a..1c45f299 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/EventHubConsumerInterface.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/EventHubConsumerInterface.java @@ -9,12 +9,13 @@ import java.time.Duration; import java.util.Map; + /** * Base class for implementing tasks that consume events into EventHubs. * This class provides all required and optional parameters. */ public interface EventHubConsumerInterface extends EventHubClientInterface { - + // TASK'S PARAMETERS @Schema( title = "The Deserializer to be used for serializing the event value." diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/ProducerContext.java b/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/ProducerContext.java index d4ec1a76..c4f72e2d 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/ProducerContext.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/ProducerContext.java @@ -1,6 +1,5 @@ package io.kestra.plugin.azure.eventhubs.service.producer; -//import org.slf4j.Logger; import java.util.Map; diff --git a/src/test/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerServiceTest.java b/src/test/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerServiceTest.java index 2007dc1f..2ebafa96 100644 --- a/src/test/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerServiceTest.java +++ b/src/test/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerServiceTest.java @@ -20,8 +20,7 @@ import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.stubbing.Answer; - //import org.slf4j.Logger; - //import org.slf4j.LoggerFactory; + import reactor.core.publisher.Mono; import java.io.BufferedReader; From d625f3118aadfb5709d3d3bf7f15db7c42987711 Mon Sep 17 00:00:00 2001 From: Swagatika Beura Date: Tue, 18 Nov 2025 20:32:53 +0530 Subject: [PATCH 08/12] chore: remove slf4j imports and apply reviewer suggestions --- build.gradle | 4 ++-- src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java | 5 ++--- .../azure/eventhubs/service/consumer/ConsumerContext.java | 2 +- .../eventhubs/service/producer/EventHubProducerService.java | 2 +- .../io/kestra/plugin/azure/storage/adls/DeleteFiles.java | 2 +- .../java/io/kestra/plugin/azure/storage/blob/DeleteList.java | 2 +- 6 files changed, 8 insertions(+), 9 deletions(-) diff --git a/build.gradle b/build.gradle index 1eadf74e..0956a60f 100644 --- a/build.gradle +++ b/build.gradle @@ -60,7 +60,7 @@ dependencies { compileOnly group: "io.kestra", name: "script" // Logs - //compileOnly'org.slf4j:slf4j-api' + compileOnly'org.slf4j:slf4j-api' // Azure libraries are managed by the the Kestra Platform so they are aligned on all plugins api (group: 'com.azure', name: 'azure-identity') { @@ -282,7 +282,7 @@ shadowJar { exclude { it.moduleGroup.startsWith('com.fasterxml.jackson') && !it.moduleName.equals('jackson-datatype-joda') } -// exclude "org/slf4j/**" +exclude "org/slf4j/**" } } diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java b/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java index 97782e98..93471977 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java @@ -12,9 +12,8 @@ import io.swagger.v3.oas.annotations.media.Schema; import jakarta.validation.constraints.NotNull; import lombok.*; -import lombok.experimental.SuperBuilder; -//import lombok.extern.slf4j.Slf4j;import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; +import lombok.experimental.SupperBuilder; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/ConsumerContext.java b/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/ConsumerContext.java index 2c975f5d..484bed35 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/ConsumerContext.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/ConsumerContext.java @@ -1,7 +1,7 @@ package io.kestra.plugin.azure.eventhubs.service.consumer; import io.kestra.plugin.azure.eventhubs.service.EventDataObjectConverter; -//import org.slf4j.Logger; + import java.time.Duration; diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerService.java b/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerService.java index cbafd72d..97d72b4b 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerService.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerService.java @@ -10,7 +10,7 @@ import io.kestra.plugin.azure.eventhubs.config.EventHubConsumerConfig; import io.kestra.plugin.azure.eventhubs.model.EventDataObject; import io.kestra.plugin.azure.eventhubs.service.EventDataObjectConverter; -//import org.slf4j.Logger; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; diff --git a/src/main/java/io/kestra/plugin/azure/storage/adls/DeleteFiles.java b/src/main/java/io/kestra/plugin/azure/storage/adls/DeleteFiles.java index 0a3b06a1..aa6209d1 100644 --- a/src/main/java/io/kestra/plugin/azure/storage/adls/DeleteFiles.java +++ b/src/main/java/io/kestra/plugin/azure/storage/adls/DeleteFiles.java @@ -21,7 +21,7 @@ import lombok.*; import lombok.experimental.SuperBuilder; import org.apache.commons.lang3.tuple.Pair; -//import org.slf4j.Logger; + import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.scheduler.Schedulers; diff --git a/src/main/java/io/kestra/plugin/azure/storage/blob/DeleteList.java b/src/main/java/io/kestra/plugin/azure/storage/blob/DeleteList.java index 73855578..8b52fe9a 100644 --- a/src/main/java/io/kestra/plugin/azure/storage/blob/DeleteList.java +++ b/src/main/java/io/kestra/plugin/azure/storage/blob/DeleteList.java @@ -20,7 +20,7 @@ import lombok.*; import lombok.experimental.SuperBuilder; import org.apache.commons.lang3.tuple.Pair; -//import org.slf4j.Logger; + import java.util.NoSuchElementException; import java.util.function.Function; From 338e8e91528403e1d4bfedad9f01708142a34df0 Mon Sep 17 00:00:00 2001 From: Swagatika Beura Date: Tue, 18 Nov 2025 22:37:08 +0530 Subject: [PATCH 09/12] fix: correct Logger imports and SuperBuilder typo to resolve CI failure --- src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java | 1 + .../plugin/azure/eventhubs/service/consumer/ConsumerContext.java | 1 + .../eventhubs/service/consumer/EventHubConsumerService.java | 1 + .../eventhubs/service/producer/EventHubProducerService.java | 1 + src/main/java/io/kestra/plugin/azure/storage/adls/Delete.java | 1 + .../java/io/kestra/plugin/azure/storage/adls/DeleteFiles.java | 1 + src/main/java/io/kestra/plugin/azure/storage/adls/Trigger.java | 1 + 7 files changed, 7 insertions(+) diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java b/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java index 93471977..b7352d48 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java @@ -13,6 +13,7 @@ import jakarta.validation.constraints.NotNull; import lombok.*; import lombok.experimental.SupperBuilder; +import lombok.experimental.SuperBuilder; import org.slf4j.Logger; diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/ConsumerContext.java b/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/ConsumerContext.java index 484bed35..92084c7c 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/ConsumerContext.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/ConsumerContext.java @@ -1,4 +1,5 @@ package io.kestra.plugin.azure.eventhubs.service.consumer; +import org.slf4j.Logger; import io.kestra.plugin.azure.eventhubs.service.EventDataObjectConverter; diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventHubConsumerService.java b/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventHubConsumerService.java index ae832e7b..340c82f2 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventHubConsumerService.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/service/consumer/EventHubConsumerService.java @@ -1,4 +1,5 @@ package io.kestra.plugin.azure.eventhubs.service.consumer; +import org.slf4j.Logger; import com.azure.messaging.eventhubs.CheckpointStore; import com.azure.messaging.eventhubs.EventData; diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerService.java b/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerService.java index 97d72b4b..320070a6 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerService.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerService.java @@ -1,4 +1,5 @@ package io.kestra.plugin.azure.eventhubs.service.producer; +import org.slf4j.Logger; import com.azure.messaging.eventhubs.EventData; import com.azure.messaging.eventhubs.EventDataBatch; diff --git a/src/main/java/io/kestra/plugin/azure/storage/adls/Delete.java b/src/main/java/io/kestra/plugin/azure/storage/adls/Delete.java index af1843e7..c7ae9259 100644 --- a/src/main/java/io/kestra/plugin/azure/storage/adls/Delete.java +++ b/src/main/java/io/kestra/plugin/azure/storage/adls/Delete.java @@ -1,4 +1,5 @@ package io.kestra.plugin.azure.storage.adls; +import org.slf4j.Logger; import com.azure.storage.file.datalake.DataLakeFileClient; import io.kestra.core.models.annotations.Example; diff --git a/src/main/java/io/kestra/plugin/azure/storage/adls/DeleteFiles.java b/src/main/java/io/kestra/plugin/azure/storage/adls/DeleteFiles.java index aa6209d1..a9da6156 100644 --- a/src/main/java/io/kestra/plugin/azure/storage/adls/DeleteFiles.java +++ b/src/main/java/io/kestra/plugin/azure/storage/adls/DeleteFiles.java @@ -1,4 +1,5 @@ package io.kestra.plugin.azure.storage.adls; +import org.slf4j.Logger; import com.azure.storage.file.datalake.DataLakeFileClient; import com.azure.storage.file.datalake.DataLakeFileSystemClient; diff --git a/src/main/java/io/kestra/plugin/azure/storage/adls/Trigger.java b/src/main/java/io/kestra/plugin/azure/storage/adls/Trigger.java index f7916788..67243d56 100644 --- a/src/main/java/io/kestra/plugin/azure/storage/adls/Trigger.java +++ b/src/main/java/io/kestra/plugin/azure/storage/adls/Trigger.java @@ -1,4 +1,5 @@ package io.kestra.plugin.azure.storage.adls; +import org.slf4j.Logger; import com.azure.storage.file.datalake.DataLakeFileClient; import com.azure.storage.file.datalake.DataLakeServiceClient; From acc0212a603124592bf1c34519cbd2f07fdb5a3d Mon Sep 17 00:00:00 2001 From: Swagatika Beura Date: Wed, 19 Nov 2025 19:48:13 +0530 Subject: [PATCH 10/12] fix: updated logs to use runContext.logger() and removed unused import --- src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java | 3 ++- src/test/java/io/kestra/plugin/azure/batch/job/CreateTest.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java b/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java index b7352d48..e64368df 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java @@ -69,7 +69,8 @@ @Getter public class Trigger extends AbstractTrigger implements EventHubConsumerInterface, EventHubBatchConsumerInterface, PollingTriggerInterface, TriggerOutput { - private static final Logger log = LoggerFactory.getLogger(Trigger.class); +runContext.logger().info("Trigger started"); + // TRIGGER'S PROPERTIES @Builder.Default diff --git a/src/test/java/io/kestra/plugin/azure/batch/job/CreateTest.java b/src/test/java/io/kestra/plugin/azure/batch/job/CreateTest.java index 3f85f6bd..e1156d6f 100644 --- a/src/test/java/io/kestra/plugin/azure/batch/job/CreateTest.java +++ b/src/test/java/io/kestra/plugin/azure/batch/job/CreateTest.java @@ -15,7 +15,7 @@ import jakarta.inject.Named; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -//import org.slf4j.event.Level; + import reactor.core.publisher.Flux; import java.io.InputStream; From 874290dc7d5946e272f7eece0324a782370b1c57 Mon Sep 17 00:00:00 2001 From: Malay Dewangan Date: Wed, 19 Nov 2025 19:56:08 +0530 Subject: [PATCH 11/12] refactor: Reapply @Slf4j cleanup for Azure plugin classes --- build.gradle | 2 +- .../java/io/kestra/plugin/azure/batch/job/Create.java | 1 + .../plugin/azure/batch/models/ContainerRegistry.java | 1 - .../io/kestra/plugin/azure/datafactory/CreateRun.java | 2 +- .../java/io/kestra/plugin/azure/eventhubs/Consume.java | 2 +- .../azure/eventhubs/EventHubConsumerInterface.java | 2 +- .../java/io/kestra/plugin/azure/eventhubs/Produce.java | 1 - .../kestra/plugin/azure/eventhubs/RealtimeTrigger.java | 1 - .../java/io/kestra/plugin/azure/eventhubs/Trigger.java | 9 --------- .../azure/eventhubs/client/EventHubClientFactory.java | 1 - .../eventhubs/service/producer/ProducerContext.java | 2 ++ .../io/kestra/plugin/azure/storage/adls/Trigger.java | 1 - .../io/kestra/plugin/azure/storage/blob/DeleteList.java | 1 + .../io/kestra/plugin/azure/batch/job/CreateTest.java | 1 + .../service/producer/EventHubProducerServiceTest.java | 2 ++ 15 files changed, 11 insertions(+), 18 deletions(-) diff --git a/build.gradle b/build.gradle index 0956a60f..93abdc9a 100644 --- a/build.gradle +++ b/build.gradle @@ -282,7 +282,7 @@ shadowJar { exclude { it.moduleGroup.startsWith('com.fasterxml.jackson') && !it.moduleName.equals('jackson-datatype-joda') } -exclude "org/slf4j/**" + exclude "org/slf4j/**" } } diff --git a/src/main/java/io/kestra/plugin/azure/batch/job/Create.java b/src/main/java/io/kestra/plugin/azure/batch/job/Create.java index 57e06f1d..200921e1 100644 --- a/src/main/java/io/kestra/plugin/azure/batch/job/Create.java +++ b/src/main/java/io/kestra/plugin/azure/batch/job/Create.java @@ -27,6 +27,7 @@ import jakarta.validation.constraints.NotNull; import lombok.*; import lombok.experimental.SuperBuilder; +import org.slf4j.Logger; import java.io.File; diff --git a/src/main/java/io/kestra/plugin/azure/batch/models/ContainerRegistry.java b/src/main/java/io/kestra/plugin/azure/batch/models/ContainerRegistry.java index 49b162fc..33a4ec8f 100644 --- a/src/main/java/io/kestra/plugin/azure/batch/models/ContainerRegistry.java +++ b/src/main/java/io/kestra/plugin/azure/batch/models/ContainerRegistry.java @@ -1,4 +1,3 @@ - package io.kestra.plugin.azure.batch.models; import io.kestra.core.exceptions.IllegalVariableEvaluationException; diff --git a/src/main/java/io/kestra/plugin/azure/datafactory/CreateRun.java b/src/main/java/io/kestra/plugin/azure/datafactory/CreateRun.java index dc870032..6f0beaf6 100644 --- a/src/main/java/io/kestra/plugin/azure/datafactory/CreateRun.java +++ b/src/main/java/io/kestra/plugin/azure/datafactory/CreateRun.java @@ -33,6 +33,7 @@ import lombok.*; import lombok.experimental.SuperBuilder; +import org.slf4j.Logger; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -49,7 +50,6 @@ import java.util.concurrent.atomic.AtomicReference; - @SuperBuilder @ToString @EqualsAndHashCode diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/Consume.java b/src/main/java/io/kestra/plugin/azure/eventhubs/Consume.java index 27ec1d0e..e35dbd51 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/Consume.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/Consume.java @@ -33,6 +33,7 @@ import lombok.NoArgsConstructor; import lombok.ToString; import lombok.experimental.SuperBuilder; +import org.slf4j.Logger; import java.io.BufferedOutputStream; @@ -78,7 +79,6 @@ @Schema( title = "Consume events from Azure Event Hubs." ) - @SuperBuilder @NoArgsConstructor @Getter diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/EventHubConsumerInterface.java b/src/main/java/io/kestra/plugin/azure/eventhubs/EventHubConsumerInterface.java index 1c45f299..afe783da 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/EventHubConsumerInterface.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/EventHubConsumerInterface.java @@ -15,7 +15,7 @@ * This class provides all required and optional parameters. */ public interface EventHubConsumerInterface extends EventHubClientInterface { - + // TASK'S PARAMETERS @Schema( title = "The Deserializer to be used for serializing the event value." diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/Produce.java b/src/main/java/io/kestra/plugin/azure/eventhubs/Produce.java index 552038d8..91e4d108 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/Produce.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/Produce.java @@ -86,7 +86,6 @@ @Schema( title = "Publish events to Azure Event Hubs." ) - @SuperBuilder @Getter @NoArgsConstructor diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java b/src/main/java/io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java index 86cb9292..45515a26 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java @@ -102,7 +102,6 @@ title = "Trigger a flow on message consumption in real-time from Azure Event Hubs.", description = "If you would like to consume multiple messages processed within a given time frame and process them in batch, you can use the [io.kestra.plugin.azure.eventhubs.Trigger](https://kestra.io/plugins/plugin-azure/triggers/io.kestra.plugin.azure.eventhubs.trigger) instead." ) - @NoArgsConstructor @SuperBuilder @ToString diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java b/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java index e64368df..1c432ff6 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/Trigger.java @@ -12,16 +12,10 @@ import io.swagger.v3.oas.annotations.media.Schema; import jakarta.validation.constraints.NotNull; import lombok.*; -import lombok.experimental.SupperBuilder; import lombok.experimental.SuperBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - import java.time.Duration; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -69,9 +63,6 @@ @Getter public class Trigger extends AbstractTrigger implements EventHubConsumerInterface, EventHubBatchConsumerInterface, PollingTriggerInterface, TriggerOutput { -runContext.logger().info("Trigger started"); - - // TRIGGER'S PROPERTIES @Builder.Default private Duration interval = Duration.ofSeconds(60); diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/client/EventHubClientFactory.java b/src/main/java/io/kestra/plugin/azure/eventhubs/client/EventHubClientFactory.java index bc0af7a8..bf47a9f6 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/client/EventHubClientFactory.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/client/EventHubClientFactory.java @@ -24,7 +24,6 @@ import java.util.Optional; public class EventHubClientFactory { - private static final Logger log = LoggerFactory.getLogger(EventHubClientFactory.class); diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/ProducerContext.java b/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/ProducerContext.java index c4f72e2d..41ba67f0 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/ProducerContext.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/ProducerContext.java @@ -1,6 +1,8 @@ package io.kestra.plugin.azure.eventhubs.service.producer; +import org.slf4j.Logger; + import java.util.Map; /** diff --git a/src/main/java/io/kestra/plugin/azure/storage/adls/Trigger.java b/src/main/java/io/kestra/plugin/azure/storage/adls/Trigger.java index 67243d56..f7916788 100644 --- a/src/main/java/io/kestra/plugin/azure/storage/adls/Trigger.java +++ b/src/main/java/io/kestra/plugin/azure/storage/adls/Trigger.java @@ -1,5 +1,4 @@ package io.kestra.plugin.azure.storage.adls; -import org.slf4j.Logger; import com.azure.storage.file.datalake.DataLakeFileClient; import com.azure.storage.file.datalake.DataLakeServiceClient; diff --git a/src/main/java/io/kestra/plugin/azure/storage/blob/DeleteList.java b/src/main/java/io/kestra/plugin/azure/storage/blob/DeleteList.java index 8b52fe9a..22ed1adf 100644 --- a/src/main/java/io/kestra/plugin/azure/storage/blob/DeleteList.java +++ b/src/main/java/io/kestra/plugin/azure/storage/blob/DeleteList.java @@ -26,6 +26,7 @@ import java.util.function.Function; import jakarta.validation.constraints.Min; +import org.slf4j.Logger; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.scheduler.Schedulers; diff --git a/src/test/java/io/kestra/plugin/azure/batch/job/CreateTest.java b/src/test/java/io/kestra/plugin/azure/batch/job/CreateTest.java index e1156d6f..bb1f27b5 100644 --- a/src/test/java/io/kestra/plugin/azure/batch/job/CreateTest.java +++ b/src/test/java/io/kestra/plugin/azure/batch/job/CreateTest.java @@ -16,6 +16,7 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.slf4j.event.Level; import reactor.core.publisher.Flux; import java.io.InputStream; diff --git a/src/test/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerServiceTest.java b/src/test/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerServiceTest.java index 2ebafa96..2d77b61d 100644 --- a/src/test/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerServiceTest.java +++ b/src/test/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerServiceTest.java @@ -21,6 +21,8 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; import java.io.BufferedReader; From e183f2959c3e09729b94d38b2f50f323de68532b Mon Sep 17 00:00:00 2001 From: Malay Dewangan Date: Wed, 19 Nov 2025 20:02:54 +0530 Subject: [PATCH 12/12] refactor: Reapply @Slf4j cleanup for Azure plugin classes --- src/main/java/io/kestra/plugin/azure/batch/job/Create.java | 1 - .../plugin/azure/eventhubs/EventHubConsumerInterface.java | 3 --- .../java/io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java | 2 -- .../azure/eventhubs/service/producer/ProducerContext.java | 3 +-- src/test/java/io/kestra/plugin/azure/batch/job/CreateTest.java | 1 - .../service/producer/EventHubProducerServiceTest.java | 1 - 6 files changed, 1 insertion(+), 10 deletions(-) diff --git a/src/main/java/io/kestra/plugin/azure/batch/job/Create.java b/src/main/java/io/kestra/plugin/azure/batch/job/Create.java index 200921e1..cea5e222 100644 --- a/src/main/java/io/kestra/plugin/azure/batch/job/Create.java +++ b/src/main/java/io/kestra/plugin/azure/batch/job/Create.java @@ -29,7 +29,6 @@ import lombok.experimental.SuperBuilder; import org.slf4j.Logger; - import java.io.File; import java.io.IOException; import java.net.URI; diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/EventHubConsumerInterface.java b/src/main/java/io/kestra/plugin/azure/eventhubs/EventHubConsumerInterface.java index afe783da..d3b9d810 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/EventHubConsumerInterface.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/EventHubConsumerInterface.java @@ -1,15 +1,12 @@ package io.kestra.plugin.azure.eventhubs; -import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.property.Property; import io.kestra.plugin.azure.eventhubs.serdes.Serdes; import io.kestra.plugin.azure.eventhubs.service.consumer.StartingPosition; import io.swagger.v3.oas.annotations.media.Schema; -import java.time.Duration; import java.util.Map; - /** * Base class for implementing tasks that consume events into EventHubs. * This class provides all required and optional parameters. diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java b/src/main/java/io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java index 45515a26..05b741e2 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/RealtimeTrigger.java @@ -19,13 +19,11 @@ import io.swagger.v3.oas.annotations.media.Schema; import lombok.*; import lombok.experimental.SuperBuilder; -import lombok.extern.slf4j.Slf4j; import org.reactivestreams.Publisher; import org.slf4j.Logger; import reactor.core.publisher.Flux; import java.time.Duration; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; diff --git a/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/ProducerContext.java b/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/ProducerContext.java index 41ba67f0..a72fd2e9 100644 --- a/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/ProducerContext.java +++ b/src/main/java/io/kestra/plugin/azure/eventhubs/service/producer/ProducerContext.java @@ -1,6 +1,5 @@ package io.kestra.plugin.azure.eventhubs.service.producer; - import org.slf4j.Logger; import java.util.Map; @@ -15,6 +14,6 @@ public record ProducerContext(String bodyContentType, Map eventProperties, Integer maxEventsPerBatch, - Logger logger + Logger logger ) { } diff --git a/src/test/java/io/kestra/plugin/azure/batch/job/CreateTest.java b/src/test/java/io/kestra/plugin/azure/batch/job/CreateTest.java index bb1f27b5..ffd384e2 100644 --- a/src/test/java/io/kestra/plugin/azure/batch/job/CreateTest.java +++ b/src/test/java/io/kestra/plugin/azure/batch/job/CreateTest.java @@ -15,7 +15,6 @@ import jakarta.inject.Named; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; - import org.slf4j.event.Level; import reactor.core.publisher.Flux; diff --git a/src/test/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerServiceTest.java b/src/test/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerServiceTest.java index 2d77b61d..716ec116 100644 --- a/src/test/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerServiceTest.java +++ b/src/test/java/io/kestra/plugin/azure/eventhubs/service/producer/EventHubProducerServiceTest.java @@ -20,7 +20,6 @@ import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.stubbing.Answer; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono;