Skip to content

Commit f55d382

Browse files
authored
Dynamic workflows and activities (#1946)
* WIP * Added test for registering multiple dynamic activities * varags of EncodedValue doesn't work * test almost working, extra {} for some reason around * need .name to get rid of brackets * activities work now too * clean up * Fix lock location, add dynamic activity options, * workflow_testsuite dynamic workflow test, fix other test * rename, cleanup * set unused field to _ * forgot a rename spot
1 parent b13a953 commit f55d382

File tree

13 files changed

+540
-29
lines changed

13 files changed

+540
-29
lines changed

activity/activity.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ type (
1717

1818
// RegisterOptions consists of options for registering an activity.
1919
RegisterOptions = internal.RegisterActivityOptions
20+
21+
// DynamicRegisterOptions consists of options for registering a dynamic activity.
22+
DynamicRegisterOptions = internal.DynamicRegisterActivityOptions
2023
)
2124

2225
// ErrResultPending is returned from activity's implementation to indicate the activity is not completed when the

client/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1403,8 +1403,8 @@ func NewValue(data *commonpb.Payloads) converter.EncodedValue {
14031403
}
14041404

14051405
// NewValues creates a new [converter.EncodedValues] which can be used to decode binary data returned by Temporal. For example:
1406-
// User had Activity.RecordHeartbeat(ctx, "my-heartbeat", 123) and then got response from calling [client.Client.DescribeWorkflowExecution].
1407-
// The response contains binary field PendingActivityInfo.HeartbeatDetails,
1406+
// User has Activity.RecordHeartbeat(ctx, "my-heartbeat", 123) and then got a response from calling [client.Client.DescribeWorkflowExecution].
1407+
// The response contains the binary field PendingActivityInfo.HeartbeatDetails,
14081408
// which can be decoded by using:
14091409
//
14101410
// var result1 string

contrib/tools/workflowcheck/workflow/testdata/src/go.temporal.io/sdk/worker/worker.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@ type Registry interface {
1717
type WorkflowRegistry interface {
1818
RegisterWorkflow(w interface{})
1919
RegisterWorkflowWithOptions(w interface{}, options workflow.RegisterOptions)
20+
RegisterDynamicWorkflow(w interface{}, options workflow.DynamicRegisterOptions)
2021
}
2122

2223
type ActivityRegistry interface {
2324
RegisterActivity(a interface{})
2425
RegisterActivityWithOptions(a interface{}, options activity.RegisterOptions)
26+
RegisterDynamicActivity(a interface{}, options activity.DynamicRegisterOptions)
2527
}

internal/internal_worker.go

Lines changed: 151 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,10 @@ type registry struct {
535535
workflowVersioningBehaviorMap map[string]VersioningBehavior
536536
activityFuncMap map[string]activity
537537
activityAliasMap map[string]string
538+
dynamicWorkflow interface{}
539+
dynamicWorkflowOptions DynamicRegisterWorkflowOptions
540+
dynamicActivity activity
541+
_ DynamicRegisterActivityOptions
538542
interceptors []WorkerInterceptor
539543
}
540544

@@ -567,7 +571,7 @@ func (r *registry) RegisterWorkflowWithOptions(
567571
}
568572
// Validate that it is a function
569573
fnType := reflect.TypeOf(wf)
570-
if err := validateFnFormat(fnType, true); err != nil {
574+
if err := validateFnFormat(fnType, true, false); err != nil {
571575
panic(err)
572576
}
573577
fnName, _ := getFunctionName(wf)
@@ -597,6 +601,29 @@ func (r *registry) RegisterWorkflowWithOptions(
597601
}
598602
}
599603

604+
func (r *registry) RegisterDynamicWorkflow(wf interface{}, options DynamicRegisterWorkflowOptions) {
605+
r.Lock()
606+
defer r.Unlock()
607+
// Support direct registration of WorkflowDefinition
608+
factory, ok := wf.(WorkflowDefinitionFactory)
609+
if ok {
610+
r.dynamicWorkflow = factory
611+
r.dynamicWorkflowOptions = options
612+
return
613+
}
614+
615+
// Validate that it is a function
616+
fnType := reflect.TypeOf(wf)
617+
if err := validateFnFormat(fnType, true, true); err != nil {
618+
panic(err)
619+
}
620+
if r.dynamicWorkflow != nil {
621+
panic("dynamic workflow already registered")
622+
}
623+
r.dynamicWorkflow = wf
624+
r.dynamicWorkflowOptions = options
625+
}
626+
600627
func (r *registry) RegisterActivity(af interface{}) {
601628
r.RegisterActivityWithOptions(af, RegisterActivityOptions{})
602629
}
@@ -626,7 +653,7 @@ func (r *registry) RegisterActivityWithOptions(
626653
}
627654
return
628655
}
629-
if err := validateFnFormat(fnType, false); err != nil {
656+
if err := validateFnFormat(fnType, false, false); err != nil {
630657
panic(err)
631658
}
632659
fnName, _ := getFunctionName(af)
@@ -669,7 +696,7 @@ func (r *registry) registerActivityStructWithOptions(aStruct interface{}, option
669696
continue
670697
}
671698
name := method.Name
672-
if err := validateFnFormat(method.Type, false); err != nil {
699+
if err := validateFnFormat(method.Type, false, false); err != nil {
673700
if options.SkipInvalidStructFunctions {
674701
continue
675702
}
@@ -691,6 +718,26 @@ func (r *registry) registerActivityStructWithOptions(aStruct interface{}, option
691718
return nil
692719
}
693720

721+
func (r *registry) RegisterDynamicActivity(af interface{}, options DynamicRegisterActivityOptions) {
722+
r.Lock()
723+
defer r.Unlock()
724+
// Support direct registration of activity
725+
a, ok := af.(activity)
726+
if ok {
727+
r.dynamicActivity = a
728+
return
729+
}
730+
// Validate that it is a function
731+
fnType := reflect.TypeOf(af)
732+
if err := validateFnFormat(fnType, false, true); err != nil {
733+
panic(err)
734+
}
735+
if r.dynamicActivity != nil {
736+
panic("dynamic activity already registered")
737+
}
738+
r.dynamicActivity = &activityExecutor{name: "", fn: af, dynamic: true}
739+
}
740+
694741
func (r *registry) RegisterNexusService(service *nexus.Service) {
695742
if service.Name == "" {
696743
panic(fmt.Errorf("tried to register a service with no name"))
@@ -715,8 +762,14 @@ func (r *registry) getWorkflowAlias(fnName string) (string, bool) {
715762
func (r *registry) getWorkflowFn(fnName string) (interface{}, bool) {
716763
r.Lock()
717764
defer r.Unlock()
718-
fn, ok := r.workflowFuncMap[fnName]
719-
return fn, ok
765+
if fn, ok := r.workflowFuncMap[fnName]; ok {
766+
return fn, ok
767+
}
768+
769+
if r.dynamicWorkflow != nil {
770+
return "dynamic", true
771+
}
772+
return nil, false
720773
}
721774

722775
func (r *registry) getRegisteredWorkflowTypes() []string {
@@ -745,8 +798,13 @@ func (r *registry) addActivityWithLock(fnName string, a activity) {
745798
func (r *registry) GetActivity(fnName string) (activity, bool) {
746799
r.Lock()
747800
defer r.Unlock()
748-
a, ok := r.activityFuncMap[fnName]
749-
return a, ok
801+
if a, ok := r.activityFuncMap[fnName]; ok {
802+
return a, ok
803+
}
804+
if r.dynamicActivity != nil {
805+
return r.dynamicActivity, true
806+
}
807+
return nil, false
750808
}
751809

752810
func (r *registry) getActivityNoLock(fnName string) (activity, bool) {
@@ -757,10 +815,17 @@ func (r *registry) getActivityNoLock(fnName string) (activity, bool) {
757815
func (r *registry) getRegisteredActivities() []activity {
758816
r.Lock()
759817
defer r.Unlock()
760-
activities := make([]activity, 0, len(r.activityFuncMap))
818+
numActivities := len(r.activityFuncMap)
819+
if r.dynamicActivity != nil {
820+
numActivities++
821+
}
822+
activities := make([]activity, 0, numActivities)
761823
for _, a := range r.activityFuncMap {
762824
activities = append(activities, a)
763825
}
826+
if r.dynamicActivity != nil {
827+
activities = append(activities, r.dynamicActivity)
828+
}
764829
return activities
765830
}
766831

@@ -788,7 +853,12 @@ func (r *registry) getWorkflowDefinition(wt WorkflowType) (WorkflowDefinition, e
788853
if ok {
789854
return wdf.NewWorkflowDefinition(), nil
790855
}
791-
executor := &workflowExecutor{workflowType: lookup, fn: wf, interceptors: r.interceptors}
856+
var dynamic bool
857+
if d, ok := wf.(string); ok && d == "dynamic" {
858+
wf = r.dynamicWorkflow
859+
dynamic = true
860+
}
861+
executor := &workflowExecutor{workflowType: lookup, fn: wf, interceptors: r.interceptors, dynamic: dynamic}
792862
return newSyncWorkflowDefinition(executor), nil
793863
}
794864

@@ -799,8 +869,16 @@ func (r *registry) getWorkflowVersioningBehavior(wt WorkflowType) (VersioningBeh
799869
}
800870
r.Lock()
801871
defer r.Unlock()
802-
behavior := r.workflowVersioningBehaviorMap[lookup]
803-
return behavior, behavior != VersioningBehaviorUnspecified
872+
if behavior, ok := r.workflowVersioningBehaviorMap[lookup]; ok {
873+
return behavior, behavior != VersioningBehaviorUnspecified
874+
}
875+
if r.dynamicWorkflowOptions.LoadDynamicRuntimeOptions != nil {
876+
config := LoadDynamicRuntimeOptionsDetails{WorkflowType: wt}
877+
if behavior, err := r.dynamicWorkflowOptions.LoadDynamicRuntimeOptions(config); err == nil {
878+
return behavior.VersioningBehavior, true
879+
}
880+
}
881+
return VersioningBehaviorUnspecified, false
804882
}
805883

806884
func (r *registry) getNexusService(service string) *nexus.Service {
@@ -820,7 +898,7 @@ func (r *registry) getRegisteredNexusServices() []*nexus.Service {
820898
}
821899

822900
// Validate function parameters.
823-
func validateFnFormat(fnType reflect.Type, isWorkflow bool) error {
901+
func validateFnFormat(fnType reflect.Type, isWorkflow, isDynamic bool) error {
824902
if fnType.Kind() != reflect.Func {
825903
return fmt.Errorf("expected a func as input but was %s", fnType.Kind())
826904
}
@@ -845,6 +923,17 @@ func validateFnFormat(fnType reflect.Type, isWorkflow bool) error {
845923
}
846924
}
847925

926+
if isDynamic {
927+
if fnType.NumIn() != 2 {
928+
return fmt.Errorf(
929+
"expected function to have two arguments, first being workflow.Context and second being an EncodedValues type, found %d arguments", fnType.NumIn(),
930+
)
931+
}
932+
if fnType.In(1) != reflect.TypeOf((*converter.EncodedValues)(nil)).Elem() {
933+
return fmt.Errorf("expected function to EncodedValues as second argument, got %s", fnType.In(1).Elem())
934+
}
935+
}
936+
848937
// Return values
849938
// We expect either
850939
// <result>, error
@@ -888,17 +977,25 @@ type workflowExecutor struct {
888977
workflowType string
889978
fn interface{}
890979
interceptors []WorkerInterceptor
980+
dynamic bool
891981
}
892982

893983
func (we *workflowExecutor) Execute(ctx Context, input *commonpb.Payloads) (*commonpb.Payloads, error) {
894984
dataConverter := WithWorkflowContext(ctx, getWorkflowEnvOptions(ctx).DataConverter)
895985
fnType := reflect.TypeOf(we.fn)
896986

897-
args, err := decodeArgsToRawValues(dataConverter, fnType, input)
898-
if err != nil {
899-
return nil, fmt.Errorf(
900-
"unable to decode the workflow function input payload with error: %w, function name: %v",
901-
err, we.workflowType)
987+
var args []interface{}
988+
var err error
989+
if we.dynamic {
990+
// Dynamic workflows take in a single EncodedValues, encode all data into single EncodedValues
991+
args = []interface{}{newEncodedValues(input, dataConverter)}
992+
} else {
993+
args, err = decodeArgsToRawValues(dataConverter, fnType, input)
994+
if err != nil {
995+
return nil, fmt.Errorf(
996+
"unable to decode the workflow function input payload with error: %w, function name: %v",
997+
err, we.workflowType)
998+
}
902999
}
9031000

9041001
envInterceptor := getWorkflowEnvironmentInterceptor(ctx)
@@ -918,6 +1015,7 @@ type activityExecutor struct {
9181015
name string
9191016
fn interface{}
9201017
skipInterceptors bool
1018+
dynamic bool
9211019
}
9221020

9231021
func (ae *activityExecutor) ActivityType() ActivityType {
@@ -932,11 +1030,18 @@ func (ae *activityExecutor) Execute(ctx context.Context, input *commonpb.Payload
9321030
fnType := reflect.TypeOf(ae.fn)
9331031
dataConverter := getDataConverterFromActivityCtx(ctx)
9341032

935-
args, err := decodeArgsToRawValues(dataConverter, fnType, input)
936-
if err != nil {
937-
return nil, fmt.Errorf(
938-
"unable to decode the activity function input payload with error: %w for function name: %v",
939-
err, ae.name)
1033+
var args []interface{}
1034+
var err error
1035+
if ae.dynamic {
1036+
// Dynamic activities take in a single EncodedValues, encode all data into single EncodedValues
1037+
args = []interface{}{newEncodedValues(input, dataConverter)}
1038+
} else {
1039+
args, err = decodeArgsToRawValues(dataConverter, fnType, input)
1040+
if err != nil {
1041+
return nil, fmt.Errorf(
1042+
"unable to decode the activity function input payload with error: %w for function name: %v",
1043+
err, ae.name)
1044+
}
9401045
}
9411046

9421047
return ae.ExecuteWithActualArgs(ctx, args)
@@ -1044,6 +1149,19 @@ func (aw *AggregatedWorker) RegisterWorkflowWithOptions(w interface{}, options R
10441149
aw.registry.RegisterWorkflowWithOptions(w, options)
10451150
}
10461151

1152+
// RegisterDynamicWorkflow registers dynamic workflow implementation with the AggregatedWorker
1153+
func (aw *AggregatedWorker) RegisterDynamicWorkflow(w interface{}, options DynamicRegisterWorkflowOptions) {
1154+
if aw.workflowWorker == nil {
1155+
panic("workflow worker disabled, cannot register workflow")
1156+
}
1157+
if options.LoadDynamicRuntimeOptions == nil && aw.executionParams.UseBuildIDForVersioning &&
1158+
(aw.executionParams.DeploymentSeriesName != "" || aw.executionParams.WorkerDeploymentVersion != "") &&
1159+
aw.executionParams.DefaultVersioningBehavior == VersioningBehaviorUnspecified {
1160+
panic("dynamic workflow does not have a versioning behavior")
1161+
}
1162+
aw.registry.RegisterDynamicWorkflow(w, options)
1163+
}
1164+
10471165
// RegisterActivity registers activity implementation with the AggregatedWorker
10481166
func (aw *AggregatedWorker) RegisterActivity(a interface{}) {
10491167
aw.registry.RegisterActivity(a)
@@ -1054,6 +1172,12 @@ func (aw *AggregatedWorker) RegisterActivityWithOptions(a interface{}, options R
10541172
aw.registry.RegisterActivityWithOptions(a, options)
10551173
}
10561174

1175+
// RegisterDynamicActivity registers the dynamic activity function with options.
1176+
// Registering activities via a structure is not supported for dynamic activities.
1177+
func (aw *AggregatedWorker) RegisterDynamicActivity(a interface{}, options DynamicRegisterActivityOptions) {
1178+
aw.registry.RegisterDynamicActivity(a, options)
1179+
}
1180+
10571181
func (aw *AggregatedWorker) RegisterNexusService(service *nexus.Service) {
10581182
if aw.started.Load() {
10591183
panic(errors.New("cannot register Nexus services after worker start"))
@@ -1334,6 +1458,11 @@ func (aw *WorkflowReplayer) RegisterWorkflowWithOptions(w interface{}, options R
13341458
aw.registry.RegisterWorkflowWithOptions(w, options)
13351459
}
13361460

1461+
// RegisterDynamicWorkflow registers a dynamic workflow function to replay
1462+
func (aw *WorkflowReplayer) RegisterDynamicWorkflow(w interface{}, options DynamicRegisterWorkflowOptions) {
1463+
aw.registry.RegisterDynamicWorkflow(w, options)
1464+
}
1465+
13371466
// ReplayWorkflowHistoryWithOptions executes a single workflow task for the given history.
13381467
// Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger.
13391468
// The logger is an optional parameter. Defaults to the noop logger.

internal/internal_worker_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2983,3 +2983,30 @@ func (s *internalWorkerTestSuite) TestReservedTemporalName() {
29832983
require.Error(s.T(), err)
29842984
require.Contains(s.T(), err.Error(), temporalPrefixError)
29852985
}
2986+
2987+
func (s *internalWorkerTestSuite) TestRegisterMultipleDynamicWorkflow() {
2988+
var suite WorkflowTestSuite
2989+
env := suite.NewTestWorkflowEnvironment()
2990+
workflowFn1 := func(ctx Context, values converter.EncodedValues) error { return nil }
2991+
workflowFn2 := func(ctx Context, values converter.EncodedValues) error { return nil }
2992+
env.RegisterDynamicWorkflow(workflowFn1, DynamicRegisterWorkflowOptions{})
2993+
err := runAndCatchPanic(func() {
2994+
env.RegisterDynamicWorkflow(workflowFn2, DynamicRegisterWorkflowOptions{})
2995+
})
2996+
require.Error(s.T(), err)
2997+
require.Contains(s.T(), err.Error(), "dynamic workflow already registered")
2998+
2999+
// activity
3000+
activityFn1 := func(ctx context.Context, values converter.EncodedValues) error {
3001+
return nil
3002+
}
3003+
activityFn2 := func(ctx context.Context, values converter.EncodedValues) error {
3004+
return nil
3005+
}
3006+
env.RegisterDynamicActivity(activityFn1, DynamicRegisterActivityOptions{})
3007+
err = runAndCatchPanic(func() {
3008+
env.RegisterDynamicActivity(activityFn2, DynamicRegisterActivityOptions{})
3009+
})
3010+
require.Error(s.T(), err)
3011+
require.Contains(s.T(), err.Error(), "dynamic activity already registered")
3012+
}

internal/internal_workflow_client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1665,6 +1665,7 @@ func createStartWorkflowInput(
16651665
return nil, err
16661666
}
16671667
workflowType, err := getWorkflowFunctionName(registry, workflow)
1668+
16681669
if err != nil {
16691670
return nil, err
16701671
}

0 commit comments

Comments
 (0)