diff --git a/.github/workflows/chaos-tests.yml b/.github/workflows/chaos-tests.yml index 07114440..9167b1bc 100644 --- a/.github/workflows/chaos-tests.yml +++ b/.github/workflows/chaos-tests.yml @@ -26,9 +26,9 @@ jobs: fetch-tags: true - name: Setup Go - uses: actions/setup-go@v5 + uses: actions/setup-go@v6 with: - go-version: '1.25.x' + go-version: '1.25.5' - name: Download dependencies run: go mod download diff --git a/.github/workflows/security.yml b/.github/workflows/security.yml index 838e14ba..1ed12207 100644 --- a/.github/workflows/security.yml +++ b/.github/workflows/security.yml @@ -23,19 +23,9 @@ jobs: fetch-depth: 0 - name: Setup Go - uses: actions/setup-go@v5 + uses: actions/setup-go@v6 with: - go-version: '1.25.x' - - - name: Cache Go modules - uses: actions/cache@v4 - with: - path: | - ~/go/pkg/mod - ~/.cache/go-build - key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} - restore-keys: | - ${{ runner.os }}-go- + go-version: '1.25.5' - name: Download dependencies run: go mod download diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 344a242d..50c74cbf 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -40,9 +40,9 @@ jobs: fetch-tags: true - name: Setup Go - uses: actions/setup-go@v5 + uses: actions/setup-go@v6 with: - go-version: '1.25.x' + go-version: '1.25.5' - name: Download dependencies run: go mod download diff --git a/dbos/dbos.go b/dbos/dbos.go index bc8642d9..84d1c0e5 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -29,25 +29,18 @@ const ( // Config holds configuration parameters for initializing a DBOS context. // DatabaseURL and AppName are required. type Config struct { - AppName string // Application name for identification (required) - DatabaseURL string // DatabaseURL is a PostgreSQL connection string. Either this or SystemDBPool is required. - SystemDBPool *pgxpool.Pool // SystemDBPool is a custom System Database Pool. It's optional and takes precedence over DatabaseURL if both are provided. - DatabaseSchema string // Database schema name (defaults to "dbos") - Logger *slog.Logger // Custom logger instance (defaults to a new slog logger) - AdminServer bool // Enable Transact admin HTTP server (disabled by default) - AdminServerPort int // Port for the admin HTTP server (default: 3001) - ConductorURL string // DBOS conductor service URL (optional) - ConductorAPIKey string // DBOS conductor API key (optional) - ApplicationVersion string // Application version (optional, overridden by DBOS__APPVERSION env var) - ExecutorID string // Executor ID (optional, overridden by DBOS__VMID env var) - QueueRunner QueueConfig // Queue configuration (optional) -} - -// QueueConfig configures the queue runner polling behavior. -type QueueConfig struct { - BaseInterval float64 // seconds - MinInterval float64 // seconds - MaxInterval float64 // seconds + AppName string // Application name for identification (required) + DatabaseURL string // DatabaseURL is a PostgreSQL connection string. Either this or SystemDBPool is required. + SystemDBPool *pgxpool.Pool // SystemDBPool is a custom System Database Pool. It's optional and takes precedence over DatabaseURL if both are provided. + DatabaseSchema string // Database schema name (defaults to "dbos") + Logger *slog.Logger // Custom logger instance (defaults to a new slog logger) + AdminServer bool // Enable Transact admin HTTP server (disabled by default) + AdminServerPort int // Port for the admin HTTP server (default: 3001) + ConductorURL string // DBOS conductor service URL (optional) + ConductorAPIKey string // DBOS conductor API key (optional) + ApplicationVersion string // Application version (optional, overridden by DBOS__APPVERSION env var) + ExecutorID string // Executor ID (optional, overridden by DBOS__VMID env var) + QueueRunnerConfig QueueRunnerConfig // Queue runner configuration (optional) } func processConfig(inputConfig *Config) (*Config, error) { @@ -62,7 +55,7 @@ func processConfig(inputConfig *Config) (*Config, error) { inputConfig.AdminServerPort = _DEFAULT_ADMIN_SERVER_PORT } - if inputConfig.QueueRunner.MinInterval > inputConfig.QueueRunner.MaxInterval { + if inputConfig.QueueRunnerConfig.MinInterval > inputConfig.QueueRunnerConfig.MaxInterval { return nil, fmt.Errorf("minInterval must be less than maxInterval") } @@ -78,7 +71,7 @@ func processConfig(inputConfig *Config) (*Config, error) { ApplicationVersion: inputConfig.ApplicationVersion, ExecutorID: inputConfig.ExecutorID, SystemDBPool: inputConfig.SystemDBPool, - QueueRunner: inputConfig.QueueRunner, + QueueRunnerConfig: inputConfig.QueueRunnerConfig, } // Load defaults @@ -140,6 +133,7 @@ type DBOSContext interface { ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStatus, error) // List workflows based on filtering criteria GetWorkflowSteps(_ DBOSContext, workflowID string) ([]StepInfo, error) // Get the execution steps of a workflow ListRegisteredWorkflows(_ DBOSContext, opts ...ListRegisteredWorkflowsOption) ([]WorkflowRegistryEntry, error) // List registered workflows with filtering options + ListRegisteredQueues() []WorkflowQueue // List all registered workflow queues // Accessors GetApplicationVersion() string // Get the application version for this context @@ -302,6 +296,14 @@ func (c *dbosContext) GetApplicationID() string { return c.applicationID } +// ListRegisteredQueues returns all registered workflow queues. +func (c *dbosContext) ListRegisteredQueues() []WorkflowQueue { + if c.queueRunner == nil { + return []WorkflowQueue{} + } + return c.queueRunner.listQueues() +} + // ListRegisteredWorkflows returns information about registered workflows with their registration parameters. // Supports filtering using functional options. func (c *dbosContext) ListRegisteredWorkflows(_ DBOSContext, opts ...ListRegisteredWorkflowsOption) ([]WorkflowRegistryEntry, error) { @@ -392,7 +394,7 @@ func NewDBOSContext(ctx context.Context, inputConfig Config) (DBOSContext, error initExecutor.logger.Debug("System database initialized") // Initialize the queue runner and register DBOS internal queue - initExecutor.queueRunner = newQueueRunner(initExecutor.logger, config.QueueRunner) + initExecutor.queueRunner = newQueueRunner(initExecutor.logger, config.QueueRunnerConfig) // Initialize conductor if API key is provided if config.ConductorAPIKey != "" { diff --git a/dbos/queue.go b/dbos/queue.go index f6dee53e..7e14439f 100644 --- a/dbos/queue.go +++ b/dbos/queue.go @@ -143,6 +143,13 @@ func NewWorkflowQueue(dbosCtx DBOSContext, name string, options ...QueueOption) return q } +// QueueRunnerConfig configures the queue runner polling behavior. +type QueueRunnerConfig struct { + BaseInterval float64 // seconds + MinInterval float64 // seconds + MaxInterval float64 // seconds +} + type queueRunner struct { logger *slog.Logger @@ -162,7 +169,7 @@ type queueRunner struct { completionChan chan struct{} } -func newQueueRunner(logger *slog.Logger, config QueueConfig) *queueRunner { +func newQueueRunner(logger *slog.Logger, config QueueRunnerConfig) *queueRunner { if config.BaseInterval == 0 { config.BaseInterval = _DEFAULT_BASE_INTERVAL } diff --git a/dbos/queues_test.go b/dbos/queues_test.go index 66ce3ee9..73263503 100644 --- a/dbos/queues_test.go +++ b/dbos/queues_test.go @@ -490,6 +490,51 @@ func TestWorkflowQueues(t *testing.T) { expectedMsgPart := "does not exist" assert.Contains(t, unwrappedErr.Error(), expectedMsgPart, "expected unwrapped error message to contain expected part") }) + + t.Run("ListRegisteredQueues", func(t *testing.T) { + // Get all registered queues + registeredQueues := dbosCtx.ListRegisteredQueues() + + // Create a map of expected queue names for easy lookup + expectedQueueNames := map[string]bool{ + queue.Name: true, + dlqEnqueueQueue.Name: true, + conflictQueue1.Name: true, + conflictQueue2.Name: true, + dedupQueue.Name: true, + _DBOS_INTERNAL_QUEUE_NAME: true, // Internal queue is always registered + } + + // Verify we got the expected number of queues + assert.Equal(t, len(expectedQueueNames), len(registeredQueues), "expected %d registered queues, got %d", len(expectedQueueNames), len(registeredQueues)) + + // Verify all expected queues are present + actualQueueNames := make(map[string]bool) + for _, q := range registeredQueues { + actualQueueNames[q.Name] = true + // Verify the queue exists in our expected list + assert.True(t, expectedQueueNames[q.Name], "unexpected queue found: %s", q.Name) + } + + // Verify all expected queues are in the result + for queueName := range expectedQueueNames { + assert.True(t, actualQueueNames[queueName], "expected queue %s not found in registered queues", queueName) + } + + // Verify specific queue properties for known queues + for _, q := range registeredQueues { + switch q.Name { + case queue.Name: + // Verify default queue properties + assert.Nil(t, q.WorkerConcurrency, "expected queue to have nil WorkerConcurrency") + assert.Nil(t, q.GlobalConcurrency, "expected queue to have nil GlobalConcurrency") + assert.False(t, q.PriorityEnabled, "expected queue to have PriorityEnabled=false") + case dedupQueue.Name: + // Verify dedup queue properties + assert.Nil(t, q.WorkerConcurrency, "expected dedup queue to have nil WorkerConcurrency") + } + } + }) } func TestQueueRecovery(t *testing.T) { @@ -1583,14 +1628,14 @@ func TestPartitionedQueues(t *testing.T) { func TestNewQueueRunner(t *testing.T) { t.Run("init queue with default interval", func(t *testing.T) { - runner := newQueueRunner(slog.New(slog.NewTextHandler(os.Stdout, nil)), QueueConfig{}) + runner := newQueueRunner(slog.New(slog.NewTextHandler(os.Stdout, nil)), QueueRunnerConfig{}) require.Equal(t, _DEFAULT_BASE_INTERVAL, runner.baseInterval) require.Equal(t, _DEFAULT_MAX_INTERVAL, runner.maxInterval) require.Equal(t, _DEFAULT_MIN_INTERVAL, runner.minInterval) }) t.Run("init queue with custom interval", func(t *testing.T) { - runner := newQueueRunner(slog.New(slog.NewTextHandler(os.Stdout, nil)), QueueConfig{ + runner := newQueueRunner(slog.New(slog.NewTextHandler(os.Stdout, nil)), QueueRunnerConfig{ BaseInterval: 1, MinInterval: 2, MaxInterval: 3, diff --git a/go.mod b/go.mod index f47b66b5..d377759d 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module github.com/dbos-inc/dbos-transact-golang go 1.23.0 -toolchain go1.25.0 +toolchain go1.25.5 require ( github.com/docker/docker v28.3.3+incompatible