diff --git a/common/component/azure/servicebus/subscription.go b/common/component/azure/servicebus/subscription.go index ecdf7c00bb..a08d4b0530 100644 --- a/common/component/azure/servicebus/subscription.go +++ b/common/component/azure/servicebus/subscription.go @@ -273,8 +273,17 @@ func (s *Subscription) ReceiveBlocking(parentCtx context.Context, handler Handle
 			continue
 		}
 
-		// Handle the messages in background
-		go s.handleAsync(ctx, msgs, handler, receiver)
+		// When sessions are required, messages must be processed
+		// synchronously so that FIFO order is maintained.
+		// This is safe even with bulk receives: the messages are
+		// delivered to the app as a single request containing
+		// multiple messages, so processing them in order within
+		// that batch becomes the app's concern.
+		if s.requireSessions {
+			s.handleMessages(ctx, msgs, handler, receiver)
+		} else {
+			go s.handleMessages(ctx, msgs, handler, receiver)
+		}
 	}
 }
 
@@ -393,8 +402,8 @@ func (s *Subscription) doRenewLocksSession(ctx context.Context, sessionReceiver
 	}
 }
 
-// handleAsync handles messages from azure service bus and is meant to be called in a goroutine (go s.handleAsync).
-func (s *Subscription) handleAsync(ctx context.Context, msgs []*azservicebus.ReceivedMessage, handler HandlerFn, receiver Receiver) {
+// handleMessages handles messages from Azure Service Bus and can be called synchronously or in a goroutine, depending on ordering requirements.
+func (s *Subscription) handleMessages(ctx context.Context, msgs []*azservicebus.ReceivedMessage, handler HandlerFn, receiver Receiver) {
 	var (
 		consumeToken           bool
 		takenConcurrentHandler bool
diff --git a/pubsub/azure/servicebus/topics/servicebus_test.go b/pubsub/azure/servicebus/topics/servicebus_test.go new file mode 100644 index 0000000000..c59b115ce7 --- /dev/null +++ b/pubsub/azure/servicebus/topics/servicebus_test.go @@ -0,0 +1,376 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +    http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/ + +package topics + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + impl "github.com/dapr/components-contrib/common/component/azure/servicebus" + "github.com/dapr/kit/logger" + "github.com/dapr/kit/ptr" +) + +type mockReceiver struct { + messages []*azservicebus.ReceivedMessage + messageIndex int + sessionID string + mu sync.Mutex + closed bool +} + +func newMockReceiver(sessionID string, messages []*azservicebus.ReceivedMessage) *mockReceiver { + return &mockReceiver{ + sessionID: sessionID, + messages: messages, + } +} + +func (m *mockReceiver) ReceiveMessages(ctx context.Context, count int, options *azservicebus.ReceiveMessagesOptions) ([]*azservicebus.ReceivedMessage, error) { + m.mu.Lock() + defer m.mu.Unlock() + + if m.closed { + return nil, errors.New("receiver closed") + } + + if ctx.Err() != nil { + return nil, ctx.Err() + } + + if m.messageIndex >= len(m.messages) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(100 * time.Millisecond): + return nil, errors.New("no more messages") + } + } + + end := m.messageIndex + count + if end > len(m.messages) { + end = len(m.messages) + } + + result := m.messages[m.messageIndex:end] + m.messageIndex = end + return result, nil +} + +func (m *mockReceiver) CompleteMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.CompleteMessageOptions) error { + return nil +} + +func (m *mockReceiver) AbandonMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.AbandonMessageOptions) error { + return nil +} + +func (m *mockReceiver) Close(ctx context.Context) error { + m.mu.Lock() + defer m.mu.Unlock() + m.closed = true + return nil +} + +func TestSessionOrderingWithSingleHandler(t *testing.T) { + const numMessages = 10 + sessionID := "test-session-1" + + messages := make([]*azservicebus.ReceivedMessage, numMessages) + for i := range numMessages { + seqNum := int64(i + 1) + messages[i] = &azservicebus.ReceivedMessage{ + MessageID: fmt.Sprintf("msg-%d", i), + SessionID: &sessionID, + SequenceNumber: &seqNum, + Body: []byte(fmt.Sprintf("message-%d", i)), + } + } + + sub := impl.NewSubscription( + impl.SubscriptionOptions{ + MaxActiveMessages: 100, + TimeoutInSec: 5, + MaxBulkSubCount: ptr.Of(1), + MaxConcurrentHandlers: 1, + Entity: "test-topic", + LockRenewalInSec: 30, + RequireSessions: true, + SessionIdleTimeout: time.Second * 5, + }, + logger.NewLogger("test"), + ) + + var ( + processedOrder []int + orderMu sync.Mutex + ) + + handlerFunc := func(ctx context.Context, msgs []*azservicebus.ReceivedMessage) ([]impl.HandlerResponseItem, error) { + var msgIndex int + _, err := fmt.Sscanf(string(msgs[0].Body), "message-%d", &msgIndex) + require.NoError(t, err) + + time.Sleep(10 * time.Millisecond) + + orderMu.Lock() + processedOrder = append(processedOrder, msgIndex) + orderMu.Unlock() + + return nil, nil + } + + receiver := newMockReceiver(sessionID, messages) + + ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second) + defer cancel() + + done := make(chan struct{}) + go func() { + defer close(done) + _ = sub.ReceiveBlocking(ctx, handlerFunc, receiver, func() {}, "test-session") + }() + + <-done + + expectedOrder := make([]int, numMessages) + for i := range expectedOrder { + expectedOrder[i] = i + } + + assert.Equal(t, expectedOrder, 
processedOrder, "messages must be processed in order") +} + +func TestMultipleSessionsConcurrentHandler(t *testing.T) { + const ( + numSessions = 5 + messagesPerSession = 10 + maxConcurrentLimit = 3 + ) + + sessionIDs := make([]string, numSessions) + for i := range numSessions { + sessionIDs[i] = fmt.Sprintf("session-%d", i) + } + + allMessages := make(map[string][]*azservicebus.ReceivedMessage) + for _, sessionID := range sessionIDs { + messages := make([]*azservicebus.ReceivedMessage, messagesPerSession) + for i := range messagesPerSession { + seqNum := int64(i + 1) + sessID := sessionID + messages[i] = &azservicebus.ReceivedMessage{ + MessageID: fmt.Sprintf("%s-msg-%d", sessionID, i), + SessionID: &sessID, + SequenceNumber: &seqNum, + Body: []byte(fmt.Sprintf("%s:%d", sessionID, i)), + } + } + allMessages[sessionID] = messages + } + + sub := impl.NewSubscription( + impl.SubscriptionOptions{ + MaxActiveMessages: 100, + TimeoutInSec: 5, + MaxBulkSubCount: ptr.Of(1), + MaxConcurrentHandlers: maxConcurrentLimit, + Entity: "test-topic", + LockRenewalInSec: 30, + RequireSessions: true, + SessionIdleTimeout: time.Second * 5, + }, + logger.NewLogger("test"), + ) + + // Track processing times and active messages per session + type messageProcessing struct { + sessionID string + messageID string + seqNum int64 + startTime time.Time + endTime time.Time + msgIndex int + } + + var ( + mu sync.Mutex + globalOrder []string + sessionOrders = make(map[string][]int) + concurrentHandlers atomic.Int32 + maxConcurrentHandlers atomic.Int32 + processingLog []messageProcessing + activePerSession = make(map[string]int32) + parallelViolations atomic.Int32 + ) + + handlerFunc := func(ctx context.Context, msgs []*azservicebus.ReceivedMessage) ([]impl.HandlerResponseItem, error) { + msg := msgs[0] + sessionID := *msg.SessionID + startTime := time.Now() + + // Track concurrent processing within the same session + mu.Lock() + activeCount := activePerSession[sessionID] + if activeCount > 0 { + // Another message from this session is already being processed + parallelViolations.Add(1) + t.Errorf("Session %s has %d messages processing in parallel at %v", + sessionID, activeCount+1, startTime) + } + activePerSession[sessionID]++ + mu.Unlock() + + current := concurrentHandlers.Add(1) + defer func() { + concurrentHandlers.Add(-1) + mu.Lock() + activePerSession[sessionID]-- + mu.Unlock() + }() + + for { + max := maxConcurrentHandlers.Load() + if current <= max || maxConcurrentHandlers.CompareAndSwap(max, current) { + break + } + } + + var msgIndex int + parts := strings.Split(string(msg.Body), ":") + require.Len(t, parts, 2) + _, err := fmt.Sscanf(parts[1], "%d", &msgIndex) + require.NoError(t, err) + + time.Sleep(50 * time.Millisecond) + + endTime := time.Now() + + mu.Lock() + globalOrder = append(globalOrder, sessionID) + sessionOrders[sessionID] = append(sessionOrders[sessionID], msgIndex) + processingLog = append(processingLog, messageProcessing{ + sessionID: sessionID, + messageID: msg.MessageID, + seqNum: *msg.SequenceNumber, + startTime: startTime, + endTime: endTime, + msgIndex: msgIndex, + }) + mu.Unlock() + + return nil, nil + } + + ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second) + defer cancel() + + var wg sync.WaitGroup + for _, sessionID := range sessionIDs { + wg.Add(1) + go func() { + defer wg.Done() + receiver := newMockReceiver(sessionID, allMessages[sessionID]) + done := make(chan struct{}) + go func() { + defer close(done) + _ = sub.ReceiveBlocking(ctx, handlerFunc, receiver, func() {}, 
"session-"+sessionID) + }() + <-done + }() + } + + wg.Wait() + + // Verify no parallel session processing was detected + assert.Equal(t, int32(0), parallelViolations.Load(), + "N messages from the same session should process in parallel") + + // Verify FIFO ordering per session + for _, sessionID := range sessionIDs { + order := sessionOrders[sessionID] + require.Len(t, order, messagesPerSession, "session %s should process all messages", sessionID) + + for i := range messagesPerSession { + assert.Equal(t, i, order[i], "session %s message %d out of order", sessionID, i) + } + } + + // Verify strict ordering, message N+1 must start after message N ends + sessionProcessingTimes := make(map[string][]messageProcessing) + for _, proc := range processingLog { + sessionProcessingTimes[proc.sessionID] = append(sessionProcessingTimes[proc.sessionID], proc) + } + + for sessionID, procs := range sessionProcessingTimes { + require.Len(t, procs, messagesPerSession, "session %s should have all processing records", sessionID) + + // Sort by message index to ensure we check in FIFO order + sortedProcs := make([]messageProcessing, len(procs)) + for _, proc := range procs { + sortedProcs[proc.msgIndex] = proc + } + + // Verify each message starts after the previous one completes + for i := 1; i < len(sortedProcs); i++ { + prev := sortedProcs[i-1] + curr := sortedProcs[i] + + assert.False(t, curr.startTime.Before(prev.endTime), + "Session %s: message %d started at %v before message %d ended at %v (overlap detected)", + sessionID, curr.msgIndex, curr.startTime, prev.msgIndex, prev.endTime) + + // Also verify sequence numbers are strictly increasing + assert.Equal(t, prev.seqNum+1, curr.seqNum, + "Session %s: sequence numbers should be consecutive", sessionID) + } + } + + // Verify concurrent handler limits + assert.LessOrEqual(t, maxConcurrentHandlers.Load(), int32(maxConcurrentLimit), + "concurrent handlers should not exceed configured maximum") + + assert.Greater(t, maxConcurrentHandlers.Load(), int32(1), + "multiple handlers should run concurrently across sessions") + + // Check global order to prove concurrent processing + // If processed sequentially, all messages from one session would come before the next + // If processed concurrently, session IDs will be interleaved + hasInterleaving := false + seenSessions := make(map[string]bool) + lastSession := "" + + for _, sessionID := range globalOrder { + if sessionID != lastSession && seenSessions[sessionID] { + // We've seen this session before but with a different session in between + hasInterleaving = true + break + } + seenSessions[sessionID] = true + lastSession = sessionID + } + + assert.True(t, hasInterleaving, + "global order must show session interleaving, proving concurrent processing across sessions") +} diff --git a/tests/certification/pubsub/azure/servicebus/topics/servicebus_test.go b/tests/certification/pubsub/azure/servicebus/topics/servicebus_test.go index de47f202f1..2fbdca1a7b 100644 --- a/tests/certification/pubsub/azure/servicebus/topics/servicebus_test.go +++ b/tests/certification/pubsub/azure/servicebus/topics/servicebus_test.go @@ -18,6 +18,8 @@ import ( "fmt" "regexp" "strconv" + "sync" + "sync/atomic" "testing" "time" @@ -1084,10 +1086,16 @@ func TestServicebusWithSessionsFIFO(t *testing.T) { sessionWatcher := watcher.NewOrdered() + // Track active messages per session to ensure no parallel processing within a session + var ( + fifoMu sync.Mutex + fifoActivePerSess = make(map[string]int) + fifoParallelIssues atomic.Int32 + ) + // 
subscriber of the given topic subscriberApplicationWithSessions := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn { return func(ctx flow.Context, s common.Service) error { - // Setup the /orders event handler. return multierr.Combine( s.AddTopicEventHandler(&common.Subscription{ PubsubName: pubsubName, @@ -1098,9 +1106,31 @@ func TestServicebusWithSessionsFIFO(t *testing.T) { "maxConcurrentSessions": "1", }, }, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) { - // Track/Observe the data of the event. + // Extract session ID (if present) to track concurrency + var sessionID string + if m := sessionIDRegex.FindStringSubmatch(fmt.Sprintf("%s", e.Data)); len(m) > 1 { + sessionID = m[1] + } + + fifoMu.Lock() + active := fifoActivePerSess[sessionID] + if active > 0 { + fifoParallelIssues.Add(1) + ctx.Logf("Session %s already has %d active messages", sessionID, active) + } + fifoActivePerSess[sessionID] = active + 1 + fifoMu.Unlock() + + // Simulate handler work to widen potential overlap window + time.Sleep(20 * time.Millisecond) + messagesWatcher.Observe(e.Data) ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data) + + fifoMu.Lock() + fifoActivePerSess[sessionID]-- + fifoMu.Unlock() + return false, nil }), ) @@ -1221,6 +1251,9 @@ func TestServicebusWithSessionsFIFO(t *testing.T) { if !assert.Equal(t, ordered, observed) { t.Errorf("expected: %v, observed: %v", ordered, observed) } + + // Assert no parallel violations within the single session + assert.Equal(t, int32(0), fifoParallelIssues.Load(), "no parallel processing within a session expected") } return nil @@ -1256,6 +1289,215 @@ func TestServicebusWithSessionsFIFO(t *testing.T) { Run() } +// TestServicebusWithConcurrentSessionsFIFO validates that multiple sessions can be +// processed concurrently while each session maintains strict FIFO ordering. 
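+// The subscriber handler tracks in-flight messages per session: interleaving across
+// sessions is expected (and asserted via the observed global order), while any overlap
+// within a single session is counted as a violation and fails the test.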
+func TestServicebusWithConcurrentSessionsFIFO(t *testing.T) { + topic := "sessions-concurrent-fifo" + numSessions := 5 + + sessionWatcher := watcher.NewUnordered() + + var ( + mu sync.Mutex + globalOrder []string // tracks session IDs in the order messages were received + activePerSession = make(map[string]int) + parallelIssues atomic.Int32 + ) + + subscriberApplicationWithSessions := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn { + return func(ctx flow.Context, s common.Service) error { + return multierr.Combine( + s.AddTopicEventHandler(&common.Subscription{ + PubsubName: pubsubName, + Topic: topicName, + Route: "/orders", + Metadata: map[string]string{ + "requireSessions": "true", + "maxConcurrentSessions": "5", + }, + }, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) { + messagesWatcher.Observe(e.Data) + + // Track session ID and enforce single in-flight per session + match := sessionIDRegex.FindStringSubmatch(fmt.Sprintf("%s", e.Data)) + var sessionID string + if len(match) > 1 { + sessionID = match[1] + mu.Lock() + inflight := activePerSession[sessionID] + if inflight > 0 { + parallelIssues.Add(1) + ctx.Logf("Session %s already has %d active messages", sessionID, inflight) + } + activePerSession[sessionID] = inflight + 1 + globalOrder = append(globalOrder, sessionID) + mu.Unlock() + } + + // Simulate work + time.Sleep(30 * time.Millisecond) + + ctx.Logf("Message Received appID: %s, pubsub: %s, topic: %s, id: %s, data: %s", + appID, e.PubsubName, e.Topic, e.ID, e.Data) + + if sessionID != "" { + mu.Lock() + activePerSession[sessionID]-- + mu.Unlock() + } + + return false, nil + }), + ) + } + } + + publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable { + return func(ctx flow.Context) error { + messages := make([]string, numMessages) + for i := range messages { + var msgSuffix string + if metadata["SessionId"] != "" { + msgSuffix = fmt.Sprintf(", sessionId: %s", metadata["SessionId"]) + } + messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s%s", + metadata[messageKey], topicName, i, uuid.New().String(), msgSuffix) + } + + for _, messageWatcher := range messageWatchers { + messageWatcher.ExpectStrings(messages...) + } + + client := sidecar.GetClient(ctx, sidecarName) + ctx.Logf("Publishing messages. 
sidecarName: %s, topicName: %s", sidecarName, topicName) + + var publishOptions dapr.PublishEventOption + if metadata != nil { + publishOptions = dapr.PublishEventWithMetadata(metadata) + } + + for _, message := range messages { + ctx.Logf("Publishing: %q", message) + var err error + if publishOptions != nil { + err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions) + } else { + err = client.PublishEvent(ctx, pubsubName, topicName, message) + } + require.NoError(ctx, err, "error publishing message") + } + return nil + } + } + + assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable { + return func(ctx flow.Context) error { + for _, m := range messageWatchers { + _, exp, obs := m.Partial(ctx, timeout) + + var observed []string + if obs != nil { + for _, v := range obs.([]interface{}) { + observed = append(observed, v.(string)) + } + } + var expected []string + if exp != nil { + for _, v := range exp.([]interface{}) { + expected = append(expected, v.(string)) + } + } + + // Group messages by session + sessionMessages := make(map[string][]string) + for _, msg := range observed { + match := sessionIDRegex.FindStringSubmatch(msg) + if len(match) > 0 { + sessionID := match[1] + sessionMessages[sessionID] = append(sessionMessages[sessionID], msg) + } else { + t.Error("session id not found in message") + } + } + + require.Greater(t, len(sessionMessages), 1, + "should receive messages from multiple sessions concurrently") + + // Verify FIFO ordering per session + for sessionID, msgs := range sessionMessages { + var expectedForSession []string + for _, msg := range expected { + match := sessionIDRegex.FindStringSubmatch(msg) + if len(match) > 0 && match[1] == sessionID { + expectedForSession = append(expectedForSession, msg) + } + } + + require.Equal(t, expectedForSession, msgs, + "session %s messages must be in FIFO order", sessionID) + } + + // Check global order to prove concurrent processing + // If processed sequentially, all messages from one session would come before the next + // If processed concurrently, session IDs will be interleaved + mu.Lock() + orderCopy := make([]string, len(globalOrder)) + copy(orderCopy, globalOrder) + mu.Unlock() + + if len(orderCopy) > 1 { + // Check if we have session interleaving + hasInterleaving := false + seenSessions := make(map[string]bool) + lastSession := "" + + for _, sessionID := range orderCopy { + if sessionID != lastSession && seenSessions[sessionID] { + // We've seen this session before but with a different session in between + hasInterleaving = true + break + } + seenSessions[sessionID] = true + lastSession = sessionID + } + + require.True(t, hasInterleaving, + "global order must show session interleaving, proving concurrent processing") + + ctx.Logf("Successfully processed %d sessions concurrently with FIFO ordering maintained", + len(sessionMessages)) + + // Assert no parallel violations within a single session + assert.Equal(t, int32(0), parallelIssues.Load(), "no parallel processing within a session expected") + } + } + return nil + } + } + + f := flow.New(t, "servicebus certification concurrent sessions FIFO test"). + Step(app.Run(appID1, fmt.Sprintf(":%d", appPort), + subscriberApplicationWithSessions(appID1, topic, sessionWatcher))). 
+ Step(sidecar.Run(sidecarName1, + append(componentRuntimeOptions(), + embedded.WithComponentsPath("./components/consumer_one"), + embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)), + embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort)), + embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort)), + )..., + )) + + for i := 0; i < numSessions; i++ { + sessionID := fmt.Sprintf("session-%d", i) + f = f.Step(fmt.Sprintf("publish messages to %s", sessionID), + publishMessages(map[string]string{"SessionId": sessionID}, sidecarName1, topic, sessionWatcher)) + } + + f.Step("verify all sessions processed with FIFO ordering and concurrency", assertMessages(30*time.Second, sessionWatcher)). + Step("reset", flow.Reset(sessionWatcher)). + Run() +} + // TestServicebusWithSessionsRoundRobin tests that if we publish messages to the same // topic but with 2 different session ids (session1 and session2), then eventually // the receiver will receive messages from both the sessions. @@ -1266,10 +1508,16 @@ func TestServicebusWithSessionsRoundRobin(t *testing.T) { sessionWatcher := watcher.NewUnordered() + // Concurrency tracking for round-robin scenario + var ( + rrMu sync.Mutex + rrActivePerSess = make(map[string]int) + rrParallelIssues atomic.Int32 + ) + // subscriber of the given topic subscriberApplicationWithSessions := func(appID string, topicName string, messageWatcher *watcher.Watcher) app.SetupFn { return func(ctx flow.Context, s common.Service) error { - // Setup the /orders event handler. return multierr.Combine( s.AddTopicEventHandler(&common.Subscription{ PubsubName: pubsubName, @@ -1281,9 +1529,31 @@ func TestServicebusWithSessionsRoundRobin(t *testing.T) { "sessionIdleTimeoutInSec": "2", // timeout and try another session }, }, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) { - // Track/Observe the data of the event. + // Extract session ID + var sessionID string + if m := sessionIDRegex.FindStringSubmatch(fmt.Sprintf("%s", e.Data)); len(m) > 1 { + sessionID = m[1] + } + + rrMu.Lock() + active := rrActivePerSess[sessionID] + if active > 0 { + rrParallelIssues.Add(1) + ctx.Logf("Session %s already has %d active messages", sessionID, active) + } + rrActivePerSess[sessionID] = active + 1 + rrMu.Unlock() + + // Simulate work + time.Sleep(15 * time.Millisecond) + messageWatcher.Observe(e.Data) ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data) + + rrMu.Lock() + rrActivePerSess[sessionID]-- + rrMu.Unlock() + return false, nil }), ) @@ -1341,6 +1611,9 @@ func TestServicebusWithSessionsRoundRobin(t *testing.T) { m.Assert(ctx, 25*timeout) } + // Assert no parallel violations + assert.Equal(t, int32(0), rrParallelIssues.Load(), "no parallel processing within a session expected") + return nil } }