Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/chaos-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ jobs:
fetch-tags: true

- name: Setup Go
uses: actions/setup-go@v5
uses: actions/setup-go@v6
with:
go-version: '1.25.x'
go-version: '1.25.5'

- name: Download dependencies
run: go mod download
Expand Down
14 changes: 2 additions & 12 deletions .github/workflows/security.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,9 @@ jobs:
fetch-depth: 0

- name: Setup Go
uses: actions/setup-go@v5
uses: actions/setup-go@v6
with:
go-version: '1.25.x'

- name: Cache Go modules
uses: actions/cache@v4
with:
path: |
~/go/pkg/mod
~/.cache/go-build
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
go-version: '1.25.5'

- name: Download dependencies
run: go mod download
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ jobs:
fetch-tags: true

- name: Setup Go
uses: actions/setup-go@v5
uses: actions/setup-go@v6
with:
go-version: '1.25.x'
go-version: '1.25.5'

- name: Download dependencies
run: go mod download
Expand Down
46 changes: 24 additions & 22 deletions dbos/dbos.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,18 @@ const (
// Config holds configuration parameters for initializing a DBOS context.
// DatabaseURL and AppName are required.
type Config struct {
AppName string // Application name for identification (required)
DatabaseURL string // DatabaseURL is a PostgreSQL connection string. Either this or SystemDBPool is required.
SystemDBPool *pgxpool.Pool // SystemDBPool is a custom System Database Pool. It's optional and takes precedence over DatabaseURL if both are provided.
DatabaseSchema string // Database schema name (defaults to "dbos")
Logger *slog.Logger // Custom logger instance (defaults to a new slog logger)
AdminServer bool // Enable Transact admin HTTP server (disabled by default)
AdminServerPort int // Port for the admin HTTP server (default: 3001)
ConductorURL string // DBOS conductor service URL (optional)
ConductorAPIKey string // DBOS conductor API key (optional)
ApplicationVersion string // Application version (optional, overridden by DBOS__APPVERSION env var)
ExecutorID string // Executor ID (optional, overridden by DBOS__VMID env var)
QueueRunner QueueConfig // Queue configuration (optional)
}

// QueueConfig configures the queue runner polling behavior.
type QueueConfig struct {
BaseInterval float64 // seconds
MinInterval float64 // seconds
MaxInterval float64 // seconds
AppName string // Application name for identification (required)
DatabaseURL string // DatabaseURL is a PostgreSQL connection string. Either this or SystemDBPool is required.
SystemDBPool *pgxpool.Pool // SystemDBPool is a custom System Database Pool. It's optional and takes precedence over DatabaseURL if both are provided.
DatabaseSchema string // Database schema name (defaults to "dbos")
Logger *slog.Logger // Custom logger instance (defaults to a new slog logger)
AdminServer bool // Enable Transact admin HTTP server (disabled by default)
AdminServerPort int // Port for the admin HTTP server (default: 3001)
ConductorURL string // DBOS conductor service URL (optional)
ConductorAPIKey string // DBOS conductor API key (optional)
ApplicationVersion string // Application version (optional, overridden by DBOS__APPVERSION env var)
ExecutorID string // Executor ID (optional, overridden by DBOS__VMID env var)
QueueRunnerConfig QueueRunnerConfig // Queue runner configuration (optional)
}

func processConfig(inputConfig *Config) (*Config, error) {
Expand All @@ -62,7 +55,7 @@ func processConfig(inputConfig *Config) (*Config, error) {
inputConfig.AdminServerPort = _DEFAULT_ADMIN_SERVER_PORT
}

if inputConfig.QueueRunner.MinInterval > inputConfig.QueueRunner.MaxInterval {
if inputConfig.QueueRunnerConfig.MinInterval > inputConfig.QueueRunnerConfig.MaxInterval {
return nil, fmt.Errorf("minInterval must be less than maxInterval")
}

Expand All @@ -78,7 +71,7 @@ func processConfig(inputConfig *Config) (*Config, error) {
ApplicationVersion: inputConfig.ApplicationVersion,
ExecutorID: inputConfig.ExecutorID,
SystemDBPool: inputConfig.SystemDBPool,
QueueRunner: inputConfig.QueueRunner,
QueueRunnerConfig: inputConfig.QueueRunnerConfig,
}

// Load defaults
Expand Down Expand Up @@ -140,6 +133,7 @@ type DBOSContext interface {
ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStatus, error) // List workflows based on filtering criteria
GetWorkflowSteps(_ DBOSContext, workflowID string) ([]StepInfo, error) // Get the execution steps of a workflow
ListRegisteredWorkflows(_ DBOSContext, opts ...ListRegisteredWorkflowsOption) ([]WorkflowRegistryEntry, error) // List registered workflows with filtering options
ListRegisteredQueues() []WorkflowQueue // List all registered workflow queues

// Accessors
GetApplicationVersion() string // Get the application version for this context
Expand Down Expand Up @@ -302,6 +296,14 @@ func (c *dbosContext) GetApplicationID() string {
return c.applicationID
}

// ListRegisteredQueues returns all registered workflow queues.
func (c *dbosContext) ListRegisteredQueues() []WorkflowQueue {
if c.queueRunner == nil {
return []WorkflowQueue{}
}
return c.queueRunner.listQueues()
}

// ListRegisteredWorkflows returns information about registered workflows with their registration parameters.
// Supports filtering using functional options.
func (c *dbosContext) ListRegisteredWorkflows(_ DBOSContext, opts ...ListRegisteredWorkflowsOption) ([]WorkflowRegistryEntry, error) {
Expand Down Expand Up @@ -392,7 +394,7 @@ func NewDBOSContext(ctx context.Context, inputConfig Config) (DBOSContext, error
initExecutor.logger.Debug("System database initialized")

// Initialize the queue runner and register DBOS internal queue
initExecutor.queueRunner = newQueueRunner(initExecutor.logger, config.QueueRunner)
initExecutor.queueRunner = newQueueRunner(initExecutor.logger, config.QueueRunnerConfig)

// Initialize conductor if API key is provided
if config.ConductorAPIKey != "" {
Expand Down
9 changes: 8 additions & 1 deletion dbos/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@ func NewWorkflowQueue(dbosCtx DBOSContext, name string, options ...QueueOption)
return q
}

// QueueRunnerConfig configures the queue runner polling behavior.
type QueueRunnerConfig struct {
BaseInterval float64 // seconds
MinInterval float64 // seconds
MaxInterval float64 // seconds
}

type queueRunner struct {
logger *slog.Logger

Expand All @@ -162,7 +169,7 @@ type queueRunner struct {
completionChan chan struct{}
}

func newQueueRunner(logger *slog.Logger, config QueueConfig) *queueRunner {
func newQueueRunner(logger *slog.Logger, config QueueRunnerConfig) *queueRunner {
if config.BaseInterval == 0 {
config.BaseInterval = _DEFAULT_BASE_INTERVAL
}
Expand Down
49 changes: 47 additions & 2 deletions dbos/queues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,51 @@ func TestWorkflowQueues(t *testing.T) {
expectedMsgPart := "does not exist"
assert.Contains(t, unwrappedErr.Error(), expectedMsgPart, "expected unwrapped error message to contain expected part")
})

t.Run("ListRegisteredQueues", func(t *testing.T) {
// Get all registered queues
registeredQueues := dbosCtx.ListRegisteredQueues()

// Create a map of expected queue names for easy lookup
expectedQueueNames := map[string]bool{
queue.Name: true,
dlqEnqueueQueue.Name: true,
conflictQueue1.Name: true,
conflictQueue2.Name: true,
dedupQueue.Name: true,
_DBOS_INTERNAL_QUEUE_NAME: true, // Internal queue is always registered
}

// Verify we got the expected number of queues
assert.Equal(t, len(expectedQueueNames), len(registeredQueues), "expected %d registered queues, got %d", len(expectedQueueNames), len(registeredQueues))

// Verify all expected queues are present
actualQueueNames := make(map[string]bool)
for _, q := range registeredQueues {
actualQueueNames[q.Name] = true
// Verify the queue exists in our expected list
assert.True(t, expectedQueueNames[q.Name], "unexpected queue found: %s", q.Name)
}

// Verify all expected queues are in the result
for queueName := range expectedQueueNames {
assert.True(t, actualQueueNames[queueName], "expected queue %s not found in registered queues", queueName)
}

// Verify specific queue properties for known queues
for _, q := range registeredQueues {
switch q.Name {
case queue.Name:
// Verify default queue properties
assert.Nil(t, q.WorkerConcurrency, "expected queue to have nil WorkerConcurrency")
assert.Nil(t, q.GlobalConcurrency, "expected queue to have nil GlobalConcurrency")
assert.False(t, q.PriorityEnabled, "expected queue to have PriorityEnabled=false")
case dedupQueue.Name:
// Verify dedup queue properties
assert.Nil(t, q.WorkerConcurrency, "expected dedup queue to have nil WorkerConcurrency")
}
}
})
}

func TestQueueRecovery(t *testing.T) {
Expand Down Expand Up @@ -1583,14 +1628,14 @@ func TestPartitionedQueues(t *testing.T) {

func TestNewQueueRunner(t *testing.T) {
t.Run("init queue with default interval", func(t *testing.T) {
runner := newQueueRunner(slog.New(slog.NewTextHandler(os.Stdout, nil)), QueueConfig{})
runner := newQueueRunner(slog.New(slog.NewTextHandler(os.Stdout, nil)), QueueRunnerConfig{})
require.Equal(t, _DEFAULT_BASE_INTERVAL, runner.baseInterval)
require.Equal(t, _DEFAULT_MAX_INTERVAL, runner.maxInterval)
require.Equal(t, _DEFAULT_MIN_INTERVAL, runner.minInterval)
})

t.Run("init queue with custom interval", func(t *testing.T) {
runner := newQueueRunner(slog.New(slog.NewTextHandler(os.Stdout, nil)), QueueConfig{
runner := newQueueRunner(slog.New(slog.NewTextHandler(os.Stdout, nil)), QueueRunnerConfig{
BaseInterval: 1,
MinInterval: 2,
MaxInterval: 3,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/dbos-inc/dbos-transact-golang

go 1.23.0

toolchain go1.25.0
toolchain go1.25.5

require (
github.com/docker/docker v28.3.3+incompatible
Expand Down