Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/riverinternaltest/riverdrivertest/riverdrivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -2261,7 +2261,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
t.Run("CompletesARunningJob", func(t *testing.T) {
t.Parallel()

exec, _ := setup(ctx, t)
exec, bundle := setup(ctx, t)

now := time.Now().UTC()

Expand All @@ -2274,7 +2274,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
require.NoError(t, err)
jobAfter := jobsAfter[0]
require.Equal(t, rivertype.JobStateCompleted, jobAfter.State)
require.WithinDuration(t, now, *jobAfter.FinalizedAt, time.Microsecond)
require.WithinDuration(t, now, *jobAfter.FinalizedAt, bundle.driver.TimePrecision())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly ... not 100% sure how this was passing before. Should be constrained to DB-specific precision.


jobUpdated, err := exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID, Schema: ""})
require.NoError(t, err)
Expand Down
10 changes: 10 additions & 0 deletions riverdriver/riversqlite/internal/dbsqlc/river_job.sql
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,16 @@ WHERE id = @id
AND state != 'running'
RETURNING *;

-- This doesn't exist under the Postgres driver, but is used for an optimized
-- happy path for setting jobs to `complete` where metadata isn't required.
-- name: JobSetCompletedIfRunning :many
UPDATE /* TEMPLATE: schema */river_job
SET finalized_at = coalesce(cast(sqlc.narg('finalized_at') AS text), datetime('now', 'subsec')),
state = 'completed'
WHERE id IN (sqlc.slice('id'))
AND state = 'running'
RETURNING *;

-- Differs significantly from the Postgres version in that it can't do a bulk
-- update, and since sqlc doesn't support `UPDATE` in CTEs, we need separate
-- queries like JobSetMetadataIfNotRunning to do the fallback work.
Expand Down
69 changes: 69 additions & 0 deletions riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 61 additions & 4 deletions riverdriver/riversqlite/river_sqlite_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
// `dbPool.SetMaxOpenConns(1)`.
//
// A known deficiency in this driver compared to Postgres is that due to
// limitations in sqlc, it performs operations like completion and `InsertMany`
// one row at a time instead of in batches. This means that it's slower than the
// Postgres driver, especially when benchmarking.
// limitations in sqlc, it performs bulk operations like non-standard
// completions and `InsertMany` one row at a time instead of in batches. This
// means that it processes batches more slowly than the Postgres driver.
package riversqlite

import (
Expand All @@ -40,6 +40,7 @@ import (
"github.com/riverqueue/river/riverdriver/riversqlite/internal/dbsqlc"
"github.com/riverqueue/river/rivershared/sqlctemplate"
"github.com/riverqueue/river/rivershared/uniquestates"
"github.com/riverqueue/river/rivershared/util/maputil"
"github.com/riverqueue/river/rivershared/util/ptrutil"
"github.com/riverqueue/river/rivershared/util/randutil"
"github.com/riverqueue/river/rivershared/util/sliceutil"
Expand Down Expand Up @@ -683,8 +684,63 @@ func (e *Executor) JobSetStateIfRunningMany(ctx context.Context, params *riverdr
ctx = schemaTemplateParam(ctx, params.Schema)
dbtx := templateReplaceWrapper{dbtx: e.driver.UnwrapTx(execTx), replacer: &e.driver.replacer}

// Because it's by far the most common path, put in an optimization for
// jobs that we're setting to `completed` that don't have any metadata
// updates needed. Group those jobs out and complete them all in one
// query, then continue on and do all the other updates.
var (
completedIDs = make([]int64, 0, len(params.ID))
completedIndexes = make(map[int64]int, len(params.ID)) // job ID -> params index (for setting result)
)
for i, id := range params.ID {
if params.State[i] == rivertype.JobStateCompleted && !params.MetadataDoMerge[i] {
completedIDs = append(completedIDs, id)
completedIndexes[id] = i
}
}

if len(completedIDs) > 0 {
jobs, err := dbsqlc.New().JobSetCompletedIfRunning(ctx, dbtx, &dbsqlc.JobSetCompletedIfRunningParams{
ID: completedIDs,
FinalizedAt: timeStringNullable(params.Now),
})
if err != nil {
return fmt.Errorf("error setting completed state on jobs: %w", err)
}

for _, job := range jobs {
setRes[completedIndexes[job.ID]], err = jobRowFromInternal(job)
if err != nil {
return err
}
delete(completedIndexes, job.ID)
}

// Fetch any jobs that weren't set by the query above because they
// weren't `running`. In practice this should be quite rare, but we
// check for it in the test suite.
if len(completedIndexes) > 0 {
jobs, err := dbsqlc.New().JobGetByIDMany(ctx, dbtx, maputil.Keys(completedIndexes))
if err != nil {
return fmt.Errorf("error getting non-running jobs: %w", err)
}

for _, job := range jobs {
setRes[completedIndexes[job.ID]], err = jobRowFromInternal(job)
if err != nil {
return err
}
}
}
}

// Should be a batch insert, but that's currently impossible with SQLite/sqlc. https://github.com/sqlc-dev/sqlc/issues/3802
for i := range params.ID {
for i, id := range params.ID {
// Skip job if we handled it in the happy path optimization above.
if _, ok := completedIndexes[id]; ok {
continue
}

setStateParams := &dbsqlc.JobSetStateIfRunningParams{
ID: params.ID[i],
Error: []byte("{}"), // even if not used, must be valid JSON because it's bed into the `json` function
Expand Down Expand Up @@ -723,6 +779,7 @@ func (e *Executor) JobSetStateIfRunningMany(ctx context.Context, params *riverdr
MetadataUpdates: sliceutil.FirstNonEmpty(params.MetadataUpdates[i], []byte("{}")),
})
if err != nil {
// Allow a job to have been deleted in the interim.
if errors.Is(err, sql.ErrNoRows) {
return nil
}
Expand Down
Loading