Skip to content

Commit 36fc3c6

Browse files
committed
separation of concerns
1 parent 72a1d3e commit 36fc3c6

File tree

1 file changed

+111
-55
lines changed

1 file changed

+111
-55
lines changed

dbos/admin_server_test.go

Lines changed: 111 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -560,12 +560,6 @@ func TestAdminServer(t *testing.T) {
560560
// Create a workflow queue with limited concurrency to keep workflows enqueued
561561
queue := NewWorkflowQueue(ctx, "test-queue", WithGlobalConcurrency(1))
562562

563-
// Create a partitioned queue for partition key test
564-
partitionedQueue := NewWorkflowQueue(ctx, "partitioned-test-queue", WithPartitionQueue(), WithGlobalConcurrency(1))
565-
566-
// Create a priority-enabled queue for priority and deduplication tests
567-
priorityQueue := NewWorkflowQueue(ctx, "priority-test-queue", WithPriorityEnabled(), WithGlobalConcurrency(1))
568-
569563
// Define a blocking workflow that will hold up the queue
570564
startEvent := NewEvent()
571565
blockingChan := make(chan struct{})
@@ -616,22 +610,6 @@ func TestAdminServer(t *testing.T) {
616610
enqueuedHandles = append(enqueuedHandles, handle)
617611
}
618612

619-
// Create workflow with partition key
620-
partitionHandle, err := RunWorkflow(ctx, blockingWorkflow, "partition-test", WithQueue(partitionedQueue.Name), WithQueuePartitionKey("partition-1"))
621-
require.NoError(t, err, "Failed to create workflow with partition key")
622-
enqueuedHandles = append(enqueuedHandles, partitionHandle)
623-
624-
// Create workflow with deduplication ID
625-
dedupID := "test-dedup-id"
626-
dedupHandle, err := RunWorkflow(ctx, blockingWorkflow, "dedup-test", WithQueue(priorityQueue.Name), WithDeduplicationID(dedupID))
627-
require.NoError(t, err, "Failed to create workflow with deduplication ID")
628-
enqueuedHandles = append(enqueuedHandles, dedupHandle)
629-
630-
// Create workflow with priority
631-
priorityHandle, err := RunWorkflow(ctx, blockingWorkflow, "priority-test", WithQueue(priorityQueue.Name), WithPriority(5))
632-
require.NoError(t, err, "Failed to create workflow with priority")
633-
enqueuedHandles = append(enqueuedHandles, priorityHandle)
634-
635613
// Create non-queued workflows that should NOT appear in queues-only results
636614
var regularHandles []WorkflowHandle[string]
637615
for i := range 2 {
@@ -661,11 +639,10 @@ func TestAdminServer(t *testing.T) {
661639
err = json.NewDecoder(respQueuesOnly.Body).Decode(&queuesOnlyWorkflows)
662640
require.NoError(t, err, "Failed to decode queues_only workflows response")
663641

664-
// Should have exactly 7 workflows (3 original + 1 pending + 1 partition + 1 dedup + 1 priority)
665-
assert.Equal(t, 7, len(queuesOnlyWorkflows), "Expected exactly 7 workflows")
642+
// Should have exactly 4 workflows (1 pending + 3 enqueued)
643+
assert.Equal(t, 4, len(queuesOnlyWorkflows), "Expected exactly 4 workflows")
666644

667645
// Verify all returned workflows are from the queue and have ENQUEUED/PENDING status
668-
// Also verify QueuePartitionKey, DeduplicationID, and Priority fields are present
669646
for _, wf := range queuesOnlyWorkflows {
670647
status, ok := wf["Status"].(string)
671648
require.True(t, ok, "Status should be a string")
@@ -675,36 +652,6 @@ func TestAdminServer(t *testing.T) {
675652
queueName, ok := wf["QueueName"].(string)
676653
require.True(t, ok, "QueueName should be a string")
677654
assert.NotEmpty(t, queueName, "QueueName should not be empty")
678-
679-
wfID, ok := wf["WorkflowUUID"].(string)
680-
require.True(t, ok, "WorkflowUUID should be a string")
681-
682-
// Verify QueuePartitionKey field is present (may be empty string for non-partitioned workflows)
683-
_, hasPartitionKey := wf["QueuePartitionKey"]
684-
assert.True(t, hasPartitionKey, "QueuePartitionKey field should be present for workflow %s", wfID)
685-
686-
// Verify DeduplicationID field is present (may be empty string for workflows without dedup ID)
687-
_, hasDedupID := wf["DeduplicationID"]
688-
assert.True(t, hasDedupID, "DeduplicationID field should be present for workflow %s", wfID)
689-
690-
// Verify Priority field is present (may be 0 for workflows without priority)
691-
_, hasPriority := wf["Priority"]
692-
assert.True(t, hasPriority, "Priority field should be present for workflow %s", wfID)
693-
694-
// Verify specific values for our test workflows
695-
if wfID == partitionHandle.GetWorkflowID() {
696-
partitionKey, ok := wf["QueuePartitionKey"].(string)
697-
require.True(t, ok, "QueuePartitionKey should be a string")
698-
assert.Equal(t, "partition-1", partitionKey, "Expected partition key to be 'partition-1'")
699-
} else if wfID == dedupHandle.GetWorkflowID() {
700-
dedupIDResp, ok := wf["DeduplicationID"].(string)
701-
require.True(t, ok, "DeduplicationID should be a string")
702-
assert.Equal(t, dedupID, dedupIDResp, "Expected deduplication ID to match")
703-
} else if wfID == priorityHandle.GetWorkflowID() {
704-
priority, ok := wf["Priority"].(float64) // JSON numbers decode as float64
705-
require.True(t, ok, "Priority should be a number")
706-
assert.Equal(t, float64(5), priority, "Expected priority to be 5")
707-
}
708655
}
709656

710657
// Verify that the enqueued workflow IDs match
@@ -786,6 +733,115 @@ func TestAdminServer(t *testing.T) {
786733
assert.Equal(t, queue.Name, queueName, "Expected queue name to be 'test-queue'")
787734
})
788735

736+
t.Run("ListQueuedWorkflowsWithAdvancedFeatures", func(t *testing.T) {
737+
resetTestDatabase(t, databaseURL)
738+
ctx, err := NewDBOSContext(context.Background(), Config{
739+
DatabaseURL: databaseURL,
740+
AppName: "test-app",
741+
AdminServer: true,
742+
AdminServerPort: _DEFAULT_ADMIN_SERVER_PORT,
743+
})
744+
require.NoError(t, err)
745+
746+
// Create a partitioned queue for partition key test
747+
partitionedQueue := NewWorkflowQueue(ctx, "partitioned-test-queue", WithPartitionQueue(), WithGlobalConcurrency(1))
748+
749+
// Create a priority-enabled queue for priority and deduplication tests
750+
priorityQueue := NewWorkflowQueue(ctx, "priority-test-queue", WithPriorityEnabled(), WithGlobalConcurrency(1))
751+
752+
// Define a blocking workflow that will hold up the queue
753+
blockingChan := make(chan struct{})
754+
blockingWorkflow := func(dbosCtx DBOSContext, input string) (string, error) {
755+
<-blockingChan // Block until channel is closed
756+
return "blocked-" + input, nil
757+
}
758+
RegisterWorkflow(ctx, blockingWorkflow)
759+
760+
err = Launch(ctx)
761+
require.NoError(t, err)
762+
763+
// Ensure cleanup
764+
defer func() {
765+
close(blockingChan) // Unblock any blocked workflows
766+
if ctx != nil {
767+
Shutdown(ctx, 1*time.Minute)
768+
}
769+
}()
770+
771+
client := &http.Client{Timeout: 5 * time.Second}
772+
endpoint := fmt.Sprintf("http://localhost:%d/%s", _DEFAULT_ADMIN_SERVER_PORT, strings.TrimPrefix(_QUEUED_WORKFLOWS_PATTERN, "POST /"))
773+
774+
// Create workflow with partition key
775+
partitionHandle, err := RunWorkflow(ctx, blockingWorkflow, "partition-test", WithQueue(partitionedQueue.Name), WithQueuePartitionKey("partition-1"))
776+
require.NoError(t, err, "Failed to create workflow with partition key")
777+
778+
// Create workflow with deduplication ID
779+
dedupID := "test-dedup-id"
780+
dedupHandle, err := RunWorkflow(ctx, blockingWorkflow, "dedup-test", WithQueue(priorityQueue.Name), WithDeduplicationID(dedupID))
781+
require.NoError(t, err, "Failed to create workflow with deduplication ID")
782+
783+
// Create workflow with priority
784+
priorityHandle, err := RunWorkflow(ctx, blockingWorkflow, "priority-test", WithQueue(priorityQueue.Name), WithPriority(5))
785+
require.NoError(t, err, "Failed to create workflow with priority")
786+
787+
// Query with empty body to get all enqueued/pending queue workflows
788+
reqQueuesOnly, err := http.NewRequest(http.MethodPost, endpoint, nil)
789+
require.NoError(t, err, "Failed to create queues_only request")
790+
reqQueuesOnly.Header.Set("Content-Type", "application/json")
791+
792+
respQueuesOnly, err := client.Do(reqQueuesOnly)
793+
require.NoError(t, err, "Failed to make queues_only request")
794+
defer respQueuesOnly.Body.Close()
795+
796+
assert.Equal(t, http.StatusOK, respQueuesOnly.StatusCode)
797+
798+
var queuesOnlyWorkflows []map[string]any
799+
err = json.NewDecoder(respQueuesOnly.Body).Decode(&queuesOnlyWorkflows)
800+
require.NoError(t, err, "Failed to decode queues_only workflows response")
801+
802+
// Find our test workflows in the response
803+
var foundPartition, foundDedup, foundPriority bool
804+
for _, wf := range queuesOnlyWorkflows {
805+
wfID, ok := wf["WorkflowUUID"].(string)
806+
require.True(t, ok, "WorkflowUUID should be a string")
807+
808+
// Verify QueuePartitionKey field is present (may be empty string for non-partitioned workflows)
809+
_, hasPartitionKey := wf["QueuePartitionKey"]
810+
assert.True(t, hasPartitionKey, "QueuePartitionKey field should be present for workflow %s", wfID)
811+
812+
// Verify DeduplicationID field is present (may be empty string for workflows without dedup ID)
813+
_, hasDedupID := wf["DeduplicationID"]
814+
assert.True(t, hasDedupID, "DeduplicationID field should be present for workflow %s", wfID)
815+
816+
// Verify Priority field is present (may be 0 for workflows without priority)
817+
_, hasPriority := wf["Priority"]
818+
assert.True(t, hasPriority, "Priority field should be present for workflow %s", wfID)
819+
820+
// Verify specific values for our test workflows
821+
if wfID == partitionHandle.GetWorkflowID() {
822+
foundPartition = true
823+
partitionKey, ok := wf["QueuePartitionKey"].(string)
824+
require.True(t, ok, "QueuePartitionKey should be a string")
825+
assert.Equal(t, "partition-1", partitionKey, "Expected partition key to be 'partition-1'")
826+
} else if wfID == dedupHandle.GetWorkflowID() {
827+
foundDedup = true
828+
dedupIDResp, ok := wf["DeduplicationID"].(string)
829+
require.True(t, ok, "DeduplicationID should be a string")
830+
assert.Equal(t, dedupID, dedupIDResp, "Expected deduplication ID to match")
831+
} else if wfID == priorityHandle.GetWorkflowID() {
832+
foundPriority = true
833+
priority, ok := wf["Priority"].(float64) // JSON numbers decode as float64
834+
require.True(t, ok, "Priority should be a number")
835+
assert.Equal(t, float64(5), priority, "Expected priority to be 5")
836+
}
837+
}
838+
839+
// Verify all three workflows were found
840+
assert.True(t, foundPartition, "Expected to find workflow with partition key")
841+
assert.True(t, foundDedup, "Expected to find workflow with deduplication ID")
842+
assert.True(t, foundPriority, "Expected to find workflow with priority")
843+
})
844+
789845
t.Run("WorkflowSteps", func(t *testing.T) {
790846
resetTestDatabase(t, databaseURL)
791847
ctx, err := NewDBOSContext(context.Background(), Config{

0 commit comments

Comments
 (0)