diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go
index 2657b4a2..c25fc12d 100644
--- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go
+++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go
@@ -2261,7 +2261,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
 	t.Run("CompletesARunningJob", func(t *testing.T) {
 		t.Parallel()
 
-		exec, _ := setup(ctx, t)
+		exec, bundle := setup(ctx, t)
 
 		now := time.Now().UTC()
 
@@ -2274,7 +2274,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
 		require.NoError(t, err)
 		jobAfter := jobsAfter[0]
 		require.Equal(t, rivertype.JobStateCompleted, jobAfter.State)
-		require.WithinDuration(t, now, *jobAfter.FinalizedAt, time.Microsecond)
+		require.WithinDuration(t, now, *jobAfter.FinalizedAt, bundle.driver.TimePrecision())
 
 		jobUpdated, err := exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID, Schema: ""})
 		require.NoError(t, err)
diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql
index 5a4b0e74..3586719a 100644
--- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql
+++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql
@@ -380,6 +380,16 @@
 WHERE id = @id AND state != 'running'
 RETURNING *;
 
+-- This doesn't exist under the Postgres driver, but is used for an optimized
+-- happy path for setting jobs to `complete` where metadata isn't required.
+-- name: JobSetCompletedIfRunning :many
+UPDATE /* TEMPLATE: schema */river_job
+SET finalized_at = coalesce(cast(sqlc.narg('finalized_at') AS text), datetime('now', 'subsec')),
+    state = 'completed'
+WHERE id IN (sqlc.slice('id'))
+    AND state = 'running'
+RETURNING *;
+
 -- Differs significantly from the Postgres version in that it can't do a bulk
 -- update, and since sqlc doesn't support `UPDATE` in CTEs, we need separate
 -- queries like JobSetMetadataIfNotRunning to do the fallback work.
diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go
index 5c308b4b..5a567257 100644
--- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go
+++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go
@@ -1121,6 +1121,75 @@ func (q *Queries) JobScheduleSetDiscarded(ctx context.Context, db DBTX, arg *Job
 	return items, nil
 }
 
+const jobSetCompletedIfRunning = `-- name: JobSetCompletedIfRunning :many
+UPDATE /* TEMPLATE: schema */river_job
+SET finalized_at = coalesce(cast(?1 AS text), datetime('now', 'subsec')),
+    state = 'completed'
+WHERE id IN (/*SLICE:id*/?)
+    AND state = 'running'
+RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states
+`
+
+type JobSetCompletedIfRunningParams struct {
+	FinalizedAt *string
+	ID          []int64
+}
+
+// This doesn't exist under the Postgres driver, but is used for an optimized
+// happy path for setting jobs to `complete` where metadata isn't required.
+func (q *Queries) JobSetCompletedIfRunning(ctx context.Context, db DBTX, arg *JobSetCompletedIfRunningParams) ([]*RiverJob, error) {
+	query := jobSetCompletedIfRunning
+	var queryParams []interface{}
+	queryParams = append(queryParams, arg.FinalizedAt)
+	if len(arg.ID) > 0 {
+		for _, v := range arg.ID {
+			queryParams = append(queryParams, v)
+		}
+		query = strings.Replace(query, "/*SLICE:id*/?", strings.Repeat(",?", len(arg.ID))[1:], 1)
+	} else {
+		query = strings.Replace(query, "/*SLICE:id*/?", "NULL", 1)
+	}
+	rows, err := db.QueryContext(ctx, query, queryParams...)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	var items []*RiverJob
+	for rows.Next() {
+		var i RiverJob
+		if err := rows.Scan(
+			&i.ID,
+			&i.Args,
+			&i.Attempt,
+			&i.AttemptedAt,
+			&i.AttemptedBy,
+			&i.CreatedAt,
+			&i.Errors,
+			&i.FinalizedAt,
+			&i.Kind,
+			&i.MaxAttempts,
+			&i.Metadata,
+			&i.Priority,
+			&i.Queue,
+			&i.State,
+			&i.ScheduledAt,
+			&i.Tags,
+			&i.UniqueKey,
+			&i.UniqueStates,
+		); err != nil {
+			return nil, err
+		}
+		items = append(items, &i)
+	}
+	if err := rows.Close(); err != nil {
+		return nil, err
+	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+	return items, nil
+}
+
 const jobSetMetadataIfNotRunning = `-- name: JobSetMetadataIfNotRunning :one
 UPDATE /* TEMPLATE: schema */river_job
 SET metadata = json_patch(metadata, json(cast(?1 AS blob)))
diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go
index 1504fd79..364731d4 100644
--- a/riverdriver/riversqlite/river_sqlite_driver.go
+++ b/riverdriver/riversqlite/river_sqlite_driver.go
@@ -11,9 +11,9 @@
 // `dbPool.SetMaxOpenConns(1)`.
 //
 // A known deficiency in this driver compared to Postgres is that due to
-// limitations in sqlc, it performs operations like completion and `InsertMany`
-// one row at a time instead of in batches. This means that it's slower than the
-// Postgres driver, especially when benchmarking.
+// limitations in sqlc, it performs bulk operations like non-standard
+// completions and `InsertMany` one row at a time instead of in batches. This
+// means that it processes batches more slowly than the Postgres driver.
 package riversqlite
 
 import (
@@ -40,6 +40,7 @@ import (
 	"github.com/riverqueue/river/riverdriver/riversqlite/internal/dbsqlc"
 	"github.com/riverqueue/river/rivershared/sqlctemplate"
 	"github.com/riverqueue/river/rivershared/uniquestates"
+	"github.com/riverqueue/river/rivershared/util/maputil"
 	"github.com/riverqueue/river/rivershared/util/ptrutil"
 	"github.com/riverqueue/river/rivershared/util/randutil"
 	"github.com/riverqueue/river/rivershared/util/sliceutil"
@@ -683,8 +684,63 @@ func (e *Executor) JobSetStateIfRunningMany(ctx context.Context, params *riverdr
 	ctx = schemaTemplateParam(ctx, params.Schema)
 	dbtx := templateReplaceWrapper{dbtx: e.driver.UnwrapTx(execTx), replacer: &e.driver.replacer}
 
+	// Because it's by far the most common path, put in an optimization for
+	// jobs that we're setting to `completed` that don't have any metadata
+	// updates needed. Group those jobs out and complete them all in one
+	// query, then continue on and do all the other updates.
+	var (
+		completedIDs     = make([]int64, 0, len(params.ID))
+		completedIndexes = make(map[int64]int, len(params.ID)) // job ID -> params index (for setting result)
+	)
+	for i, id := range params.ID {
+		if params.State[i] == rivertype.JobStateCompleted && !params.MetadataDoMerge[i] {
+			completedIDs = append(completedIDs, id)
+			completedIndexes[id] = i
+		}
+	}
+
+	if len(completedIDs) > 0 {
+		jobs, err := dbsqlc.New().JobSetCompletedIfRunning(ctx, dbtx, &dbsqlc.JobSetCompletedIfRunningParams{
+			ID:          completedIDs,
+			FinalizedAt: timeStringNullable(params.Now),
+		})
+		if err != nil {
+			return fmt.Errorf("error setting completed state on jobs: %w", err)
+		}
+
+		for _, job := range jobs {
+			setRes[completedIndexes[job.ID]], err = jobRowFromInternal(job)
+			if err != nil {
+				return err
+			}
+			delete(completedIndexes, job.ID)
+		}
+
+		// Fetch any jobs that weren't set by the query above because they
+		// weren't `running`. In practice this should be quite rare, but we
+		// check for it in the test suite.
+		if len(completedIndexes) > 0 {
+			jobs, err := dbsqlc.New().JobGetByIDMany(ctx, dbtx, maputil.Keys(completedIndexes))
+			if err != nil {
+				return fmt.Errorf("error getting non-running jobs: %w", err)
+			}
+
+			for _, job := range jobs {
+				setRes[completedIndexes[job.ID]], err = jobRowFromInternal(job)
+				if err != nil {
+					return err
+				}
+			}
+		}
+	}
+
 	// Should be a batch insert, but that's currently impossible with SQLite/sqlc. https://github.com/sqlc-dev/sqlc/issues/3802
-	for i := range params.ID {
+	for i, id := range params.ID {
+		// Skip job if we handled it in the happy path optimization above.
+		if _, ok := completedIndexes[id]; ok {
+			continue
+		}
+
 		setStateParams := &dbsqlc.JobSetStateIfRunningParams{
 			ID:    params.ID[i],
 			Error: []byte("{}"), // even if not used, must be valid JSON because it's fed into the `json` function
@@ -723,6 +779,7 @@ func (e *Executor) JobSetStateIfRunningMany(ctx context.Context, params *riverdr
 			MetadataUpdates: sliceutil.FirstNonEmpty(params.MetadataUpdates[i], []byte("{}")),
 		})
 		if err != nil {
+			// Allow a job to have been deleted in the interim.
 			if errors.Is(err, sql.ErrNoRows) {
 				return nil
 			}
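The driver change in JobSetStateIfRunningMany hinges on partitioning the incoming batch: jobs headed to `completed` with no metadata merge go through the single JobSetCompletedIfRunning query, and everything else falls back to the per-row loop. Below is a standalone sketch of just that partitioning step, using simplified stand-in types rather than the real riverdriver params struct (field names are illustrative only).

package main

import "fmt"

// setStateParams is a simplified stand-in for riverdriver's real parameters;
// the field names here are illustrative only.
type setStateParams struct {
	IDs             []int64
	States          []string
	MetadataDoMerge []bool
}

// partitionCompleted splits a batch into the job IDs eligible for the
// single-query `completed` fast path, plus a map from job ID back to its
// index in the batch (used to place results); everything else is left to the
// slower per-row fallback.
func partitionCompleted(p *setStateParams) ([]int64, map[int64]int) {
	completedIDs := make([]int64, 0, len(p.IDs))
	completedIndexes := make(map[int64]int, len(p.IDs))
	for i, id := range p.IDs {
		if p.States[i] == "completed" && !p.MetadataDoMerge[i] {
			completedIDs = append(completedIDs, id)
			completedIndexes[id] = i
		}
	}
	return completedIDs, completedIndexes
}

func main() {
	ids, indexes := partitionCompleted(&setStateParams{
		IDs:             []int64{1, 2, 3},
		States:          []string{"completed", "retryable", "completed"},
		MetadataDoMerge: []bool{false, false, true},
	})
	// Only job 1 qualifies: job 2 isn't `completed` and job 3 needs a
	// metadata merge, so both take the per-row path.
	fmt.Println(ids, indexes) // [1] map[1:0]
}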
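The generated JobSetCompletedIfRunning in river_job.sql.go relies on sqlc's slice placeholder rewriting to build the `IN` clause at runtime. A self-contained sketch of that rewriting technique in isolation (illustrative only, not River code):

package main

import (
	"fmt"
	"strings"
)

// expandSlicePlaceholder mimics how the sqlc-generated code rewrites an
// IN-clause placeholder: "/*SLICE:id*/?" becomes "?,?,?" for three IDs, or
// "NULL" when the slice is empty so the query stays syntactically valid and
// matches no rows.
func expandSlicePlaceholder(query string, n int) string {
	if n > 0 {
		return strings.Replace(query, "/*SLICE:id*/?", strings.Repeat(",?", n)[1:], 1)
	}
	return strings.Replace(query, "/*SLICE:id*/?", "NULL", 1)
}

func main() {
	const q = "SELECT * FROM river_job WHERE id IN (/*SLICE:id*/?)"
	fmt.Println(expandSlicePlaceholder(q, 3)) // ... WHERE id IN (?,?,?)
	fmt.Println(expandSlicePlaceholder(q, 0)) // ... WHERE id IN (NULL)
}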
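On the riverdrivertest change: the assertion now compares finalized_at against bundle.driver.TimePrecision() rather than a hard-coded time.Microsecond, since SQLite's datetime('now', 'subsec') stores only millisecond resolution. The sketch below illustrates the failure mode it avoids, assuming a driver-reported precision of one millisecond (the value TimePrecision() actually returns per driver isn't shown in this diff).

package main

import (
	"fmt"
	"time"
)

// withinPrecision reports whether two timestamps agree to within the given
// precision, mirroring what require.WithinDuration asserts in the test.
func withinPrecision(a, b time.Time, precision time.Duration) bool {
	d := a.Sub(b)
	if d < 0 {
		d = -d
	}
	return d <= precision
}

func main() {
	now := time.Now().UTC()

	// Simulate a store that keeps only millisecond resolution, as SQLite's
	// datetime('now', 'subsec') does: the round-tripped value loses its
	// sub-millisecond component.
	roundTripped := now.Truncate(time.Millisecond)

	fmt.Println(withinPrecision(now, roundTripped, time.Microsecond)) // often false
	fmt.Println(withinPrecision(now, roundTripped, time.Millisecond)) // always true
}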