Skip to content

Commit 7d192fa

Browse files
feat: Replace the agent DSL with a composite of instanceId and task name (#967)
* feat: Replace the agent DSL with a composite of instanceId and task name Signed-off-by: Ricardo Zanini <ricardozanini@gmail.com> * Change uniqueId to jsonPointer; replace tests with mock Signed-off-by: Ricardo Zanini <ricardozanini@gmail.com> * Extends BiFunction Signed-off-by: Ricardo Zanini <ricardozanini@gmail.com> --------- Signed-off-by: Ricardo Zanini <ricardozanini@gmail.com>
1 parent 66a9ab2 commit 7d192fa

File tree

7 files changed

+321
-15
lines changed

7 files changed

+321
-15
lines changed

experimental/fluent/func/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,12 @@
3939
<artifactId>junit-jupiter-api</artifactId>
4040
<scope>test</scope>
4141
</dependency>
42+
<dependency>
43+
<groupId>org.mockito</groupId>
44+
<artifactId>mockito-core</artifactId>
45+
<version>${version.org.mockito}</version>
46+
<scope>test</scope>
47+
</dependency>
4248
</dependencies>
4349

4450
</project>

experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncCallTaskBuilder.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.serverlessworkflow.api.types.func.CallJava;
1919
import io.serverlessworkflow.api.types.func.CallTaskJava;
2020
import io.serverlessworkflow.api.types.func.JavaContextFunction;
21+
import io.serverlessworkflow.api.types.func.JavaFilterFunction;
2122
import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder;
2223
import io.serverlessworkflow.fluent.func.spi.FuncTaskTransformations;
2324
import io.serverlessworkflow.fluent.spec.TaskBaseBuilder;
@@ -61,6 +62,16 @@ public <T, V> FuncCallTaskBuilder function(
6162
return this;
6263
}
6364

65+
public <T, V> FuncCallTaskBuilder function(JavaFilterFunction<T, V> function) {
66+
return function(function, null);
67+
}
68+
69+
public <T, V> FuncCallTaskBuilder function(JavaFilterFunction<T, V> function, Class<T> argClass) {
70+
this.callTaskJava = new CallTaskJava(CallJava.function(function, argClass));
71+
super.setTask(this.callTaskJava.getCallJava());
72+
return this;
73+
}
74+
6475
/** Accept a side-effect Consumer; engine should pass input through unchanged. */
6576
public <T> FuncCallTaskBuilder consumer(Consumer<T> consumer) {
6677
this.callTaskJava = new CallTaskJava(CallJava.consumer(consumer));

experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncCallStep.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.serverlessworkflow.fluent.func.dsl;
1717

1818
import io.serverlessworkflow.api.types.func.JavaContextFunction;
19+
import io.serverlessworkflow.api.types.func.JavaFilterFunction;
1920
import io.serverlessworkflow.fluent.func.FuncCallTaskBuilder;
2021
import io.serverlessworkflow.fluent.func.FuncTaskItemListBuilder;
2122
import java.util.function.Consumer;
@@ -26,6 +27,7 @@ public final class FuncCallStep<T, R> extends Step<FuncCallStep<T, R>, FuncCallT
2627
private final String name;
2728
private final Function<T, R> fn;
2829
private final JavaContextFunction<T, R> ctxFn;
30+
private final JavaFilterFunction<T, R> filterFn;
2931
private final Class<T> argClass;
3032

3133
/** Function<T,R> variant (unnamed). */
@@ -38,6 +40,7 @@ public final class FuncCallStep<T, R> extends Step<FuncCallStep<T, R>, FuncCallT
3840
this.name = name;
3941
this.fn = fn;
4042
this.ctxFn = null;
43+
this.filterFn = null;
4144
this.argClass = argClass;
4245
}
4346

@@ -51,6 +54,21 @@ public final class FuncCallStep<T, R> extends Step<FuncCallStep<T, R>, FuncCallT
5154
this.name = name;
5255
this.fn = null;
5356
this.ctxFn = ctxFn;
57+
this.filterFn = null;
58+
this.argClass = argClass;
59+
}
60+
61+
/** JavaFilterFunction<T,R> variant (unnamed). */
62+
FuncCallStep(JavaFilterFunction<T, R> filterFn, Class<T> argClass) {
63+
this(null, filterFn, argClass);
64+
}
65+
66+
/** JavaFilterFunction<T,R> variant (named). */
67+
FuncCallStep(String name, JavaFilterFunction<T, R> filterFn, Class<T> argClass) {
68+
this.name = name;
69+
this.fn = null;
70+
this.ctxFn = null;
71+
this.filterFn = filterFn;
5472
this.argClass = argClass;
5573
}
5674

@@ -60,6 +78,8 @@ protected void configure(FuncTaskItemListBuilder list, Consumer<FuncCallTaskBuil
6078
cb -> {
6179
if (ctxFn != null) {
6280
cb.function(ctxFn, argClass);
81+
} else if (filterFn != null) {
82+
cb.function(filterFn, argClass);
6383
} else {
6484
cb.function(fn, argClass);
6585
}

experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java

Lines changed: 79 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.cloudevents.CloudEventData;
1919
import io.serverlessworkflow.api.types.FlowDirectiveEnum;
2020
import io.serverlessworkflow.api.types.func.JavaContextFunction;
21+
import io.serverlessworkflow.api.types.func.JavaFilterFunction;
2122
import io.serverlessworkflow.fluent.func.FuncCallTaskBuilder;
2223
import io.serverlessworkflow.fluent.func.FuncEmitTaskBuilder;
2324
import io.serverlessworkflow.fluent.func.FuncSwitchTaskBuilder;
@@ -26,6 +27,8 @@
2627
import io.serverlessworkflow.fluent.func.configurers.FuncTaskConfigurer;
2728
import io.serverlessworkflow.fluent.func.configurers.SwitchCaseConfigurer;
2829
import io.serverlessworkflow.fluent.func.dsl.internal.CommonFuncOps;
30+
import io.serverlessworkflow.impl.TaskContextData;
31+
import io.serverlessworkflow.impl.WorkflowContextData;
2932
import java.util.Collection;
3033
import java.util.List;
3134
import java.util.Map;
@@ -286,7 +289,7 @@ public static <T, R> FuncCallStep<T, R> function(Function<T, R> fn, Class<T> cla
286289
}
287290

288291
/**
289-
* Build a call step for functions that need {@code WorkflowContextData} as the first parameter.
292+
* Build a call step for functions that need {@link WorkflowContextData} as the first parameter.
290293
* The DSL wraps it as a {@link JavaContextFunction} and injects the runtime context.
291294
*
292295
* <p>Signature expected: {@code (ctx, payload) -> result}
@@ -297,7 +300,7 @@ public static <T, R> FuncCallStep<T, R> function(Function<T, R> fn, Class<T> cla
297300
* @param <R> result type
298301
* @return a call step
299302
*/
300-
public static <T, R> FuncCallStep<T, R> withContext(CtxBiFunction<T, R> fn, Class<T> in) {
303+
public static <T, R> FuncCallStep<T, R> withContext(JavaContextFunction<T, R> fn, Class<T> in) {
301304
return withContext(null, fn, in);
302305
}
303306

@@ -319,7 +322,7 @@ public static <T, R> FuncCallStep<T, R> withInstanceId(
319322
}
320323

321324
/**
322-
* Named variant of {@link #withContext(CtxBiFunction, Class)}.
325+
* Named variant of {@link #withContext(JavaContextFunction, Class)}.
323326
*
324327
* @param name task name
325328
* @param fn context-aware bi-function
@@ -329,9 +332,40 @@ public static <T, R> FuncCallStep<T, R> withInstanceId(
329332
* @return a named call step
330333
*/
331334
public static <T, R> FuncCallStep<T, R> withContext(
332-
String name, CtxBiFunction<T, R> fn, Class<T> in) {
333-
JavaContextFunction<T, R> jcf = (payload, wctx) -> fn.apply(wctx, payload);
334-
return new FuncCallStep<>(name, jcf, in);
335+
String name, JavaContextFunction<T, R> fn, Class<T> in) {
336+
return new FuncCallStep<>(name, fn, in);
337+
}
338+
339+
/**
340+
* Build a call step for functions that need {@link WorkflowContextData} and {@link
341+
* io.serverlessworkflow.impl.TaskContextData} as the first and second parameter. The DSL wraps it
342+
* as a {@link JavaFilterFunction} and injects the runtime context.
343+
*
344+
* <p>Signature expected: {@code (wctx, tctx, payload) -> result}
345+
*
346+
* @param fn context-aware bi-function
347+
* @param in payload input class
348+
* @param <T> input type
349+
* @param <R> result type
350+
* @return a call step
351+
*/
352+
public static <T, R> FuncCallStep<T, R> withFilter(JavaFilterFunction<T, R> fn, Class<T> in) {
353+
return withFilter(null, fn, in);
354+
}
355+
356+
/**
357+
* Named variant of {@link #withFilter(JavaFilterFunction, Class)}.
358+
*
359+
* @param name task name
360+
* @param fn context-aware bi-function
361+
* @param in payload input class
362+
* @param <T> input type
363+
* @param <R> result type
364+
* @return a named call step
365+
*/
366+
public static <T, R> FuncCallStep<T, R> withFilter(
367+
String name, JavaFilterFunction<T, R> fn, Class<T> in) {
368+
return new FuncCallStep<>(name, fn, in);
335369
}
336370

337371
/**
@@ -350,6 +384,38 @@ public static <T, R> FuncCallStep<T, R> withInstanceId(
350384
return new FuncCallStep<>(name, jcf, in);
351385
}
352386

387+
/**
388+
* Builds a composition of the current workflow instance id and the definition of the task
389+
* position as a JSON pointer.
390+
*/
391+
static String defaultUniqueId(WorkflowContextData wctx, TaskContextData tctx) {
392+
return String.format("%s-%s", wctx.instanceData().id(), tctx.position().jsonPointer());
393+
}
394+
395+
/**
396+
* Build a call step for functions that expect a composition with the workflow instance id and the
397+
* task position as the first parameter. The instance ID is extracted from the runtime context,
398+
* the task position from the definition.
399+
*
400+
* <p>Signature expected: {@code (uniqueId, payload) -> result}
401+
*
402+
* @param fn unique-id-aware bi-function
403+
* @param in payload input class
404+
* @param <T> input type
405+
* @param <R> result type
406+
* @return a call step
407+
*/
408+
public static <T, R> FuncCallStep<T, R> withUniqueId(
409+
String name, UniqueIdBiFunction<T, R> fn, Class<T> in) {
410+
JavaFilterFunction<T, R> jff =
411+
(payload, wctx, tctx) -> fn.apply(defaultUniqueId(wctx, tctx), payload);
412+
return new FuncCallStep<>(name, jff, in);
413+
}
414+
415+
public static <T, R> FuncCallStep<T, R> withUniqueId(UniqueIdBiFunction<T, R> fn, Class<T> in) {
416+
return withUniqueId(null, fn, in);
417+
}
418+
353419
/**
354420
* Create a fire-and-forget side-effect step (unnamed). The consumer receives the typed input.
355421
*
@@ -387,12 +453,12 @@ public static <T> ConsumeStep<T> consume(String name, Consumer<T> consumer, Clas
387453
* @param <R> result type
388454
* @return a call step
389455
*/
390-
public static <T, R> FuncCallStep<T, R> agent(InstanceIdBiFunction<T, R> fn, Class<T> in) {
391-
return withInstanceId(fn, in);
456+
public static <T, R> FuncCallStep<T, R> agent(UniqueIdBiFunction<T, R> fn, Class<T> in) {
457+
return withUniqueId(fn, in);
392458
}
393459

394460
/**
395-
* Named agent-style sugar. See {@link #agent(InstanceIdBiFunction, Class)}.
461+
* Named agent-style sugar. See {@link #agent(UniqueIdBiFunction, Class)}.
396462
*
397463
* @param name task name
398464
* @param fn (instanceId, payload) -> result
@@ -402,8 +468,8 @@ public static <T, R> FuncCallStep<T, R> agent(InstanceIdBiFunction<T, R> fn, Cla
402468
* @return a named call step
403469
*/
404470
public static <T, R> FuncCallStep<T, R> agent(
405-
String name, InstanceIdBiFunction<T, R> fn, Class<T> in) {
406-
return withInstanceId(name, fn, in);
471+
String name, UniqueIdBiFunction<T, R> fn, Class<T> in) {
472+
return withUniqueId(name, fn, in);
407473
}
408474

409475
/**
@@ -677,7 +743,7 @@ public static <T> FuncTaskConfigurer switchWhenOrElse(
677743
* switchWhenOrElse(".approved == true", "sendEmail", FlowDirectiveEnum.END)
678744
* </pre>
679745
*
680-
* The JQ expression is evaluated against the task input at runtime.
746+
* <p>The JQ expression is evaluated against the task input at runtime.
681747
*/
682748
public static FuncTaskConfigurer switchWhenOrElse(
683749
String jqExpression, String thenTask, FlowDirectiveEnum otherwise) {
@@ -698,7 +764,7 @@ public static FuncTaskConfigurer switchWhenOrElse(
698764
* switchWhenOrElse(".score >= 80", "pass", "fail")
699765
* </pre>
700766
*
701-
* The JQ expression is evaluated against the task input at runtime.
767+
* <p>The JQ expression is evaluated against the task input at runtime.
702768
*/
703769
public static FuncTaskConfigurer switchWhenOrElse(
704770
String jqExpression, String thenTask, String otherwiseTask) {
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.fluent.func.dsl;
17+
18+
import java.util.function.BiFunction;
19+
20+
/**
21+
* Functions that expect a unique ID injection in runtime, typically an idempotent generated unique
22+
* id based on the workflow instance id and task name.
23+
*
24+
* @param <T> The task payload input
25+
* @param <R> The task result output
26+
*/
27+
@FunctionalInterface
28+
public interface UniqueIdBiFunction<T, R> extends BiFunction<String, T, R> {
29+
R apply(String uniqueId, T object);
30+
}

0 commit comments

Comments
 (0)