Skip to content

Commit 872a07b

Browse files
authored
DBOS Context ListRegisteredQueues (#197)
Also update the toolchain and github actions to use go 1.25.5 (fix a few CVEs)
1 parent 83a055e commit 872a07b

File tree

7 files changed

+86
-42
lines changed

7 files changed

+86
-42
lines changed

.github/workflows/chaos-tests.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ jobs:
2626
fetch-tags: true
2727

2828
- name: Setup Go
29-
uses: actions/setup-go@v5
29+
uses: actions/setup-go@v6
3030
with:
31-
go-version: '1.25.x'
31+
go-version: '1.25.5'
3232

3333
- name: Download dependencies
3434
run: go mod download

.github/workflows/security.yml

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,9 @@ jobs:
2323
fetch-depth: 0
2424

2525
- name: Setup Go
26-
uses: actions/setup-go@v5
26+
uses: actions/setup-go@v6
2727
with:
28-
go-version: '1.25.x'
29-
30-
- name: Cache Go modules
31-
uses: actions/cache@v4
32-
with:
33-
path: |
34-
~/go/pkg/mod
35-
~/.cache/go-build
36-
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
37-
restore-keys: |
38-
${{ runner.os }}-go-
28+
go-version: '1.25.5'
3929

4030
- name: Download dependencies
4131
run: go mod download

.github/workflows/tests.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ jobs:
4040
fetch-tags: true
4141

4242
- name: Setup Go
43-
uses: actions/setup-go@v5
43+
uses: actions/setup-go@v6
4444
with:
45-
go-version: '1.25.x'
45+
go-version: '1.25.5'
4646

4747
- name: Download dependencies
4848
run: go mod download

dbos/dbos.go

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -29,25 +29,18 @@ const (
2929
// Config holds configuration parameters for initializing a DBOS context.
3030
// DatabaseURL and AppName are required.
3131
type Config struct {
32-
AppName string // Application name for identification (required)
33-
DatabaseURL string // DatabaseURL is a PostgreSQL connection string. Either this or SystemDBPool is required.
34-
SystemDBPool *pgxpool.Pool // SystemDBPool is a custom System Database Pool. It's optional and takes precedence over DatabaseURL if both are provided.
35-
DatabaseSchema string // Database schema name (defaults to "dbos")
36-
Logger *slog.Logger // Custom logger instance (defaults to a new slog logger)
37-
AdminServer bool // Enable Transact admin HTTP server (disabled by default)
38-
AdminServerPort int // Port for the admin HTTP server (default: 3001)
39-
ConductorURL string // DBOS conductor service URL (optional)
40-
ConductorAPIKey string // DBOS conductor API key (optional)
41-
ApplicationVersion string // Application version (optional, overridden by DBOS__APPVERSION env var)
42-
ExecutorID string // Executor ID (optional, overridden by DBOS__VMID env var)
43-
QueueRunner QueueConfig // Queue configuration (optional)
44-
}
45-
46-
// QueueConfig configures the queue runner polling behavior.
47-
type QueueConfig struct {
48-
BaseInterval float64 // seconds
49-
MinInterval float64 // seconds
50-
MaxInterval float64 // seconds
32+
AppName string // Application name for identification (required)
33+
DatabaseURL string // DatabaseURL is a PostgreSQL connection string. Either this or SystemDBPool is required.
34+
SystemDBPool *pgxpool.Pool // SystemDBPool is a custom System Database Pool. It's optional and takes precedence over DatabaseURL if both are provided.
35+
DatabaseSchema string // Database schema name (defaults to "dbos")
36+
Logger *slog.Logger // Custom logger instance (defaults to a new slog logger)
37+
AdminServer bool // Enable Transact admin HTTP server (disabled by default)
38+
AdminServerPort int // Port for the admin HTTP server (default: 3001)
39+
ConductorURL string // DBOS conductor service URL (optional)
40+
ConductorAPIKey string // DBOS conductor API key (optional)
41+
ApplicationVersion string // Application version (optional, overridden by DBOS__APPVERSION env var)
42+
ExecutorID string // Executor ID (optional, overridden by DBOS__VMID env var)
43+
QueueRunnerConfig QueueRunnerConfig // Queue runner configuration (optional)
5144
}
5245

5346
func processConfig(inputConfig *Config) (*Config, error) {
@@ -62,7 +55,7 @@ func processConfig(inputConfig *Config) (*Config, error) {
6255
inputConfig.AdminServerPort = _DEFAULT_ADMIN_SERVER_PORT
6356
}
6457

65-
if inputConfig.QueueRunner.MinInterval > inputConfig.QueueRunner.MaxInterval {
58+
if inputConfig.QueueRunnerConfig.MinInterval > inputConfig.QueueRunnerConfig.MaxInterval {
6659
return nil, fmt.Errorf("minInterval must be less than maxInterval")
6760
}
6861

@@ -78,7 +71,7 @@ func processConfig(inputConfig *Config) (*Config, error) {
7871
ApplicationVersion: inputConfig.ApplicationVersion,
7972
ExecutorID: inputConfig.ExecutorID,
8073
SystemDBPool: inputConfig.SystemDBPool,
81-
QueueRunner: inputConfig.QueueRunner,
74+
QueueRunnerConfig: inputConfig.QueueRunnerConfig,
8275
}
8376

8477
// Load defaults
@@ -140,6 +133,7 @@ type DBOSContext interface {
140133
ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStatus, error) // List workflows based on filtering criteria
141134
GetWorkflowSteps(_ DBOSContext, workflowID string) ([]StepInfo, error) // Get the execution steps of a workflow
142135
ListRegisteredWorkflows(_ DBOSContext, opts ...ListRegisteredWorkflowsOption) ([]WorkflowRegistryEntry, error) // List registered workflows with filtering options
136+
ListRegisteredQueues() []WorkflowQueue // List all registered workflow queues
143137

144138
// Accessors
145139
GetApplicationVersion() string // Get the application version for this context
@@ -302,6 +296,14 @@ func (c *dbosContext) GetApplicationID() string {
302296
return c.applicationID
303297
}
304298

299+
// ListRegisteredQueues returns all registered workflow queues.
300+
func (c *dbosContext) ListRegisteredQueues() []WorkflowQueue {
301+
if c.queueRunner == nil {
302+
return []WorkflowQueue{}
303+
}
304+
return c.queueRunner.listQueues()
305+
}
306+
305307
// ListRegisteredWorkflows returns information about registered workflows with their registration parameters.
306308
// Supports filtering using functional options.
307309
func (c *dbosContext) ListRegisteredWorkflows(_ DBOSContext, opts ...ListRegisteredWorkflowsOption) ([]WorkflowRegistryEntry, error) {
@@ -392,7 +394,7 @@ func NewDBOSContext(ctx context.Context, inputConfig Config) (DBOSContext, error
392394
initExecutor.logger.Debug("System database initialized")
393395

394396
// Initialize the queue runner and register DBOS internal queue
395-
initExecutor.queueRunner = newQueueRunner(initExecutor.logger, config.QueueRunner)
397+
initExecutor.queueRunner = newQueueRunner(initExecutor.logger, config.QueueRunnerConfig)
396398

397399
// Initialize conductor if API key is provided
398400
if config.ConductorAPIKey != "" {

dbos/queue.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,13 @@ func NewWorkflowQueue(dbosCtx DBOSContext, name string, options ...QueueOption)
143143
return q
144144
}
145145

146+
// QueueRunnerConfig configures the queue runner polling behavior.
147+
type QueueRunnerConfig struct {
148+
BaseInterval float64 // seconds
149+
MinInterval float64 // seconds
150+
MaxInterval float64 // seconds
151+
}
152+
146153
type queueRunner struct {
147154
logger *slog.Logger
148155

@@ -162,7 +169,7 @@ type queueRunner struct {
162169
completionChan chan struct{}
163170
}
164171

165-
func newQueueRunner(logger *slog.Logger, config QueueConfig) *queueRunner {
172+
func newQueueRunner(logger *slog.Logger, config QueueRunnerConfig) *queueRunner {
166173
if config.BaseInterval == 0 {
167174
config.BaseInterval = _DEFAULT_BASE_INTERVAL
168175
}

dbos/queues_test.go

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,51 @@ func TestWorkflowQueues(t *testing.T) {
490490
expectedMsgPart := "does not exist"
491491
assert.Contains(t, unwrappedErr.Error(), expectedMsgPart, "expected unwrapped error message to contain expected part")
492492
})
493+
494+
t.Run("ListRegisteredQueues", func(t *testing.T) {
495+
// Get all registered queues
496+
registeredQueues := dbosCtx.ListRegisteredQueues()
497+
498+
// Create a map of expected queue names for easy lookup
499+
expectedQueueNames := map[string]bool{
500+
queue.Name: true,
501+
dlqEnqueueQueue.Name: true,
502+
conflictQueue1.Name: true,
503+
conflictQueue2.Name: true,
504+
dedupQueue.Name: true,
505+
_DBOS_INTERNAL_QUEUE_NAME: true, // Internal queue is always registered
506+
}
507+
508+
// Verify we got the expected number of queues
509+
assert.Equal(t, len(expectedQueueNames), len(registeredQueues), "expected %d registered queues, got %d", len(expectedQueueNames), len(registeredQueues))
510+
511+
// Verify all expected queues are present
512+
actualQueueNames := make(map[string]bool)
513+
for _, q := range registeredQueues {
514+
actualQueueNames[q.Name] = true
515+
// Verify the queue exists in our expected list
516+
assert.True(t, expectedQueueNames[q.Name], "unexpected queue found: %s", q.Name)
517+
}
518+
519+
// Verify all expected queues are in the result
520+
for queueName := range expectedQueueNames {
521+
assert.True(t, actualQueueNames[queueName], "expected queue %s not found in registered queues", queueName)
522+
}
523+
524+
// Verify specific queue properties for known queues
525+
for _, q := range registeredQueues {
526+
switch q.Name {
527+
case queue.Name:
528+
// Verify default queue properties
529+
assert.Nil(t, q.WorkerConcurrency, "expected queue to have nil WorkerConcurrency")
530+
assert.Nil(t, q.GlobalConcurrency, "expected queue to have nil GlobalConcurrency")
531+
assert.False(t, q.PriorityEnabled, "expected queue to have PriorityEnabled=false")
532+
case dedupQueue.Name:
533+
// Verify dedup queue properties
534+
assert.Nil(t, q.WorkerConcurrency, "expected dedup queue to have nil WorkerConcurrency")
535+
}
536+
}
537+
})
493538
}
494539

495540
func TestQueueRecovery(t *testing.T) {
@@ -1583,14 +1628,14 @@ func TestPartitionedQueues(t *testing.T) {
15831628

15841629
func TestNewQueueRunner(t *testing.T) {
15851630
t.Run("init queue with default interval", func(t *testing.T) {
1586-
runner := newQueueRunner(slog.New(slog.NewTextHandler(os.Stdout, nil)), QueueConfig{})
1631+
runner := newQueueRunner(slog.New(slog.NewTextHandler(os.Stdout, nil)), QueueRunnerConfig{})
15871632
require.Equal(t, _DEFAULT_BASE_INTERVAL, runner.baseInterval)
15881633
require.Equal(t, _DEFAULT_MAX_INTERVAL, runner.maxInterval)
15891634
require.Equal(t, _DEFAULT_MIN_INTERVAL, runner.minInterval)
15901635
})
15911636

15921637
t.Run("init queue with custom interval", func(t *testing.T) {
1593-
runner := newQueueRunner(slog.New(slog.NewTextHandler(os.Stdout, nil)), QueueConfig{
1638+
runner := newQueueRunner(slog.New(slog.NewTextHandler(os.Stdout, nil)), QueueRunnerConfig{
15941639
BaseInterval: 1,
15951640
MinInterval: 2,
15961641
MaxInterval: 3,

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ module github.com/dbos-inc/dbos-transact-golang
22

33
go 1.23.0
44

5-
toolchain go1.25.0
5+
toolchain go1.25.5
66

77
require (
88
github.com/docker/docker v28.3.3+incompatible

0 commit comments

Comments
 (0)