From 31a2cd1c66692359fd956c052da6c732b52a1ac1 Mon Sep 17 00:00:00 2001
From: Blake Gentry
Date: Sun, 12 Jan 2025 13:33:25 -0600
Subject: [PATCH 1/5] use new unique jobs implementation

This moves the library to use the new unique jobs implementation from
https://github.com/riverqueue/river/pull/590 and migrates the sqlalchemy
driver to use a unified insertion path, allowing bulk inserts to use
unique jobs.
---
 .github/workflows/ci.yaml                     |   4 +-
 README.md                                     |  14 +-
 requirements-dev.lock                         |   1 +
 requirements.lock                             |   1 +
 src/riverqueue/client.py                      | 325 ++++-----
 src/riverqueue/driver/__init__.py             |   1 -
 src/riverqueue/driver/driver_protocol.py      |  63 +-
 .../driver/riversqlalchemy/dbsqlc/models.py   |   1 +
 .../driver/riversqlalchemy/dbsqlc/pg_misc.py  |  31 -
 .../driver/riversqlalchemy/dbsqlc/pg_misc.sql |   2 -
 .../riversqlalchemy/dbsqlc/river_job.py       | 479 +++---------
 .../riversqlalchemy/dbsqlc/river_job.sql      | 121 +---
 .../driver/riversqlalchemy/dbsqlc/sqlc.yaml   |   6 +-
 .../riversqlalchemy/sql_alchemy_driver.py     | 117 +---
 src/riverqueue/fnv.py                         |  42 --
 src/riverqueue/insert_opts.py                 |  15 +-
 src/riverqueue/job.py                         |   5 +
 tests/client_test.py                          | 191 ++++--
 .../riversqlalchemy/sqlalchemy_driver_test.py |  78 ++-
 tests/fnv_test.py                             | 638 ------------------
 20 files changed, 530 insertions(+), 1605 deletions(-)
 delete mode 100644 src/riverqueue/driver/riversqlalchemy/dbsqlc/pg_misc.py
 delete mode 100644 src/riverqueue/driver/riversqlalchemy/dbsqlc/pg_misc.sql
 delete mode 100644 src/riverqueue/fnv.py
 delete mode 100644 tests/fnv_test.py

diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 914f9b5..3135280 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -63,7 +63,7 @@ jobs:
         run: psql --echo-errors --quiet -c '\timing off' -c "CREATE DATABASE ${TEST_DATABASE_NAME};" ${ADMIN_DATABASE_URL}

       - name: river migrate-up
-        run: river migrate-up --database-url "$TEST_DATABASE_URL" --max-steps 5 # temporarily include max steps so tests can pass with unique fixes
+        run: river migrate-up --database-url "$TEST_DATABASE_URL"

       - name: Test
         run: rye test
@@ -109,7 +109,7 @@ jobs:
         run: psql --echo-errors --quiet -c '\timing off' -c "CREATE DATABASE ${DATABASE_NAME};" ${ADMIN_DATABASE_URL}

       - name: river migrate-up
-        run: river migrate-up --database-url "$DATABASE_URL" --max-steps 5 # temporarily include max steps so tests can pass with unique fixes
+        run: river migrate-up --database-url "$DATABASE_URL"

       - name: Run examples
         run: rye run python3 -m examples.all
diff --git a/README.md b/README.md
index cf2844b..faea5c5 100644
--- a/README.md
+++ b/README.md
@@ -48,8 +48,8 @@ class JobArgs(Protocol):
         pass
 ```

-* `kind` is a unique string that identifies them the job in the database, and which a Go worker will recognize.
-* `to_json()` defines how the job will serialize to JSON, which of course will have to be parseable as an object in Go.
+- `kind` is a unique string that identifies the job in the database, and which a Go worker will recognize.
+- `to_json()` defines how the job will serialize to JSON, which of course will have to be parseable as an object in Go.

 They may also respond to `insert_opts()` with an instance of `InsertOpts` to define insertion options that'll be used for all jobs of the kind.
@@ -95,16 +95,6 @@ insert_res.job
 insert_res.unique_skipped_as_duplicated
 ```

-### Custom advisory lock prefix
-
-Unique job insertion takes a Postgres advisory lock to make sure that its uniqueness check still works even if two conflicting insert operations are occurring in parallel.
Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict, but to _guarantee_ that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks: - -```python -client = riverqueue.Client(riversqlalchemy.Driver(engine), advisory_lock_prefix=123456) -``` - -Doing so has the downside of leaving only 32 bits for River's locks (64 bits total - 32-bit prefix), making them somewhat more likely to conflict with each other. - ## Inserting jobs in bulk Use `#insert_many` to bulk insert jobs as a single operation for improved efficiency: diff --git a/requirements-dev.lock b/requirements-dev.lock index 86503b9..0878f85 100644 --- a/requirements-dev.lock +++ b/requirements-dev.lock @@ -7,6 +7,7 @@ # all-features: false # with-sources: false # generate-hashes: false +# universal: false -e file:. asyncpg==0.29.0 diff --git a/requirements.lock b/requirements.lock index 0d0f3cb..0a2a598 100644 --- a/requirements.lock +++ b/requirements.lock @@ -7,6 +7,7 @@ # all-features: false # with-sources: false # generate-hashes: false +# universal: false -e file:. sqlalchemy==2.0.30 diff --git a/src/riverqueue/client.py b/src/riverqueue/client.py index 0f0b773..0b1fea4 100644 --- a/src/riverqueue/client.py +++ b/src/riverqueue/client.py @@ -1,28 +1,36 @@ from dataclasses import dataclass, field -from datetime import datetime, timezone, timedelta +from datetime import datetime, timezone from hashlib import sha256 import re from typing import ( Optional, Protocol, - Tuple, List, - cast, runtime_checkable, ) from riverqueue.insert_opts import InsertOpts, UniqueOpts from .driver import ( - JobGetByKindAndUniquePropertiesParam, JobInsertParams, DriverProtocol, - ExecutorProtocol, ) -from .driver.driver_protocol import AsyncDriverProtocol, AsyncExecutorProtocol +from .driver.driver_protocol import AsyncDriverProtocol, ExecutorProtocol from .job import Job, JobState -from .fnv import fnv1_hash +JOB_STATE_BIT_POSITIONS = { + JobState.AVAILABLE: 7, + JobState.CANCELLED: 6, + JobState.COMPLETED: 5, + JobState.DISCARDED: 4, + JobState.PENDING: 3, + JobState.RETRYABLE: 2, + JobState.RUNNING: 1, + JobState.SCHEDULED: 0, +} +""" +Maps job states to bit positions in a unique bitmask. +""" MAX_ATTEMPTS_DEFAULT: int = 25 """ @@ -39,9 +47,10 @@ Default queue for a job. """ -UNIQUE_STATES_DEFAULT: list[str] = [ +UNIQUE_STATES_DEFAULT: list[JobState] = [ JobState.AVAILABLE, JobState.COMPLETED, + JobState.PENDING, JobState.RUNNING, JobState.RETRYABLE, JobState.SCHEDULED, @@ -50,6 +59,16 @@ Default job states included during a unique job insertion. """ +UNIQUE_STATES_REQUIRED: list[JobState] = [ + JobState.AVAILABLE, + JobState.PENDING, + JobState.RUNNING, + JobState.SCHEDULED, +] +""" +Job states required when customizing the state list for unique job insertion. +""" + @dataclass class InsertResult: @@ -128,13 +147,8 @@ class AsyncClient: This variant is for use with Python's asyncio (asynchronous I/O). """ - def __init__( - self, driver: AsyncDriverProtocol, advisory_lock_prefix: Optional[int] = None - ): + def __init__(self, driver: AsyncDriverProtocol): self.driver = driver - self.advisory_lock_prefix = _check_advisory_lock_prefix_bounds( - advisory_lock_prefix - ) async def insert( self, args: JobArgs, insert_opts: Optional[InsertOpts] = None @@ -200,12 +214,10 @@ def to_json(self) -> str: Returns an instance of `InsertResult`. 
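+        Its `unique_skipped_as_duplicated` attribute indicates whether the
+        insert was skipped because an equivalent unique job already existed.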
""" - async with self.driver.executor() as exec: - if not insert_opts: - insert_opts = InsertOpts() - insert_params, unique_opts = _make_driver_insert_params(args, insert_opts) + if not insert_opts: + insert_opts = InsertOpts() - return await self.__insert_job_with_unique(exec, insert_params, unique_opts) + return (await self.insert_many([InsertManyParams(args, insert_opts)]))[0] async def insert_tx( self, tx, args: JobArgs, insert_opts: Optional[InsertOpts] = None @@ -242,14 +254,14 @@ async def insert_tx( ``` """ - exec = self.driver.unwrap_executor(tx) if not insert_opts: insert_opts = InsertOpts() - insert_params, unique_opts = _make_driver_insert_params(args, insert_opts) - return await self.__insert_job_with_unique(exec, insert_params, unique_opts) + return (await self.insert_many_tx(tx, [InsertManyParams(args, insert_opts)]))[0] - async def insert_many(self, args: List[JobArgs | InsertManyParams]) -> int: + async def insert_many( + self, args: List[JobArgs | InsertManyParams] + ) -> list[InsertResult]: """ Inserts many new jobs as part of a single batch operation for improved efficiency. @@ -282,9 +294,12 @@ async def insert_many(self, args: List[JobArgs | InsertManyParams]) -> int: """ async with self.driver.executor() as exec: - return await exec.job_insert_many(_make_driver_insert_params_many(args)) + res = await exec.job_insert_many(_make_driver_insert_params_many(args)) + return _to_insert_results(res) - async def insert_many_tx(self, tx, args: List[JobArgs | InsertManyParams]) -> int: + async def insert_many_tx( + self, tx, args: List[JobArgs | InsertManyParams] + ) -> list[InsertResult]: """ Inserts many new jobs as part of a single batch operation for improved efficiency. @@ -316,52 +331,8 @@ async def insert_many_tx(self, tx, args: List[JobArgs | InsertManyParams]) -> in """ exec = self.driver.unwrap_executor(tx) - return await exec.job_insert_many(_make_driver_insert_params_many(args)) - - async def __insert_job_with_unique( - self, - exec: AsyncExecutorProtocol, - insert_params: JobInsertParams, - unique_opts: Optional[UniqueOpts], - ) -> InsertResult: - """ - Inserts a job, accounting for unique jobs whose insertion may be skipped - if an equivalent job is already present. 
- """ - - get_params, unique_key = _build_unique_get_params_and_unique_key( - insert_params, unique_opts - ) - - if not get_params or not unique_opts: - return InsertResult(await exec.job_insert(insert_params)) - - # fast path - if ( - not unique_opts.by_state - or unique_opts.by_state.sort == UNIQUE_STATES_DEFAULT - ): - job, unique_skipped_as_duplicate = await exec.job_insert_unique( - insert_params, sha256(unique_key.encode("utf-8")).digest() - ) - return InsertResult( - job=job, unique_skipped_as_duplicated=unique_skipped_as_duplicate - ) - - async with exec.transaction(): - lock_key = "unique_key" - lock_key += "kind=#{insert_params.kind}" - lock_key += unique_key - - await exec.advisory_lock( - _hash_lock_key(self.advisory_lock_prefix, lock_key) - ) - - existing_job = await exec.job_get_by_kind_and_unique_properties(get_params) - if existing_job: - return InsertResult(existing_job, unique_skipped_as_duplicated=True) - - return InsertResult(await exec.job_insert(insert_params)) + res = await exec.job_insert_many(_make_driver_insert_params_many(args)) + return _to_insert_results(res) class Client: @@ -382,13 +353,8 @@ class Client: ``` """ - def __init__( - self, driver: DriverProtocol, advisory_lock_prefix: Optional[int] = None - ): + def __init__(self, driver: DriverProtocol): self.driver = driver - self.advisory_lock_prefix = _check_advisory_lock_prefix_bounds( - advisory_lock_prefix - ) def insert( self, args: JobArgs, insert_opts: Optional[InsertOpts] = None @@ -454,12 +420,10 @@ def to_json(self) -> str: Returns an instance of `InsertResult`. """ - with self.driver.executor() as exec: - if not insert_opts: - insert_opts = InsertOpts() - insert_params, unique_opts = _make_driver_insert_params(args, insert_opts) + if not insert_opts: + insert_opts = InsertOpts() - return self.__insert_job_with_unique(exec, insert_params, unique_opts) + return self.insert_many([InsertManyParams(args, insert_opts)])[0] def insert_tx( self, tx, args: JobArgs, insert_opts: Optional[InsertOpts] = None @@ -496,14 +460,12 @@ def insert_tx( ``` """ - exec = self.driver.unwrap_executor(tx) if not insert_opts: insert_opts = InsertOpts() - insert_params, unique_opts = _make_driver_insert_params(args, insert_opts) - return self.__insert_job_with_unique(exec, insert_params, unique_opts) + return self.insert_many_tx(tx, [InsertManyParams(args, insert_opts)])[0] - def insert_many(self, args: List[JobArgs | InsertManyParams]) -> int: + def insert_many(self, args: List[JobArgs | InsertManyParams]) -> list[InsertResult]: """ Inserts many new jobs as part of a single batch operation for improved efficiency. @@ -536,9 +498,11 @@ def insert_many(self, args: List[JobArgs | InsertManyParams]) -> int: """ with self.driver.executor() as exec: - return exec.job_insert_many(_make_driver_insert_params_many(args)) + return self._insert_many_exec(exec, args) - def insert_many_tx(self, tx, args: List[JobArgs | InsertManyParams]) -> int: + def insert_many_tx( + self, tx, args: List[JobArgs | InsertManyParams] + ) -> list[InsertResult]: """ Inserts many new jobs as part of a single batch operation for improved efficiency. @@ -569,74 +533,32 @@ def insert_many_tx(self, tx, args: List[JobArgs | InsertManyParams]) -> int: Returns the number of jobs inserted. 
""" - exec = self.driver.unwrap_executor(tx) - return exec.job_insert_many(_make_driver_insert_params_many(args)) - - def __insert_job_with_unique( - self, - exec: ExecutorProtocol, - insert_params: JobInsertParams, - unique_opts: Optional[UniqueOpts], - ) -> InsertResult: - """ - Inserts a job, accounting for unique jobs whose insertion may be skipped - if an equivalent job is already present. - """ + return self._insert_many_exec(self.driver.unwrap_executor(tx), args) - get_params, unique_key = _build_unique_get_params_and_unique_key( - insert_params, unique_opts - ) + def _insert_many_exec( + self, exec: ExecutorProtocol, args: List[JobArgs | InsertManyParams] + ) -> list[InsertResult]: + res = exec.job_insert_many(_make_driver_insert_params_many(args)) + return _to_insert_results(res) - if not get_params or not unique_opts: - return InsertResult(exec.job_insert(insert_params)) - # fast path - if ( - not unique_opts.by_state - or unique_opts.by_state.sort == UNIQUE_STATES_DEFAULT - ): - job, unique_skipped_as_duplicate = exec.job_insert_unique( - insert_params, sha256(unique_key.encode("utf-8")).digest() - ) - return InsertResult( - job=job, unique_skipped_as_duplicated=unique_skipped_as_duplicate - ) - - with exec.transaction(): - lock_key = "unique_key" - lock_key += "kind=#{insert_params.kind}" - lock_key += unique_key - - exec.advisory_lock(_hash_lock_key(self.advisory_lock_prefix, lock_key)) - - existing_job = exec.job_get_by_kind_and_unique_properties(get_params) - if existing_job: - return InsertResult(existing_job, unique_skipped_as_duplicated=True) - - return InsertResult(exec.job_insert(insert_params)) - - -def _build_unique_get_params_and_unique_key( +def _build_unique_key_and_bitmask( insert_params: JobInsertParams, - unique_opts: Optional[UniqueOpts], -) -> tuple[Optional[JobGetByKindAndUniquePropertiesParam], str]: + unique_opts: UniqueOpts, +) -> tuple[Optional[bytes], Optional[bytes]]: """ - Builds driver get params and an advisory lock key from insert params and - unique options for use during a unique job insertion. + Builds driver get params and a unique key from insert params and unique + options for use during a job insertion. 
""" - - if unique_opts is None: - return (None, "") - any_unique_opts = False - get_params = JobGetByKindAndUniquePropertiesParam(kind=insert_params.kind) unique_key = "" + if not unique_opts.exclude_kind: + unique_key += f"&kind={insert_params.kind}" + if unique_opts.by_args: any_unique_opts = True - get_params.by_args = True - get_params.args = insert_params.args unique_key += f"&args={insert_params.args}" if unique_opts.by_period: @@ -645,69 +567,32 @@ def _build_unique_get_params_and_unique_key( ) any_unique_opts = True - get_params.by_created_at = True - get_params.created_at = [ - lower_period_bound, - lower_period_bound + timedelta(seconds=unique_opts.by_period), - ] unique_key += f"&period={lower_period_bound.strftime('%FT%TZ')}" if unique_opts.by_queue: any_unique_opts = True - get_params.by_queue = True - get_params.queue = insert_params.queue unique_key += f"&queue={insert_params.queue}" if unique_opts.by_state: any_unique_opts = True - get_params.by_state = True - get_params.state = cast(list[str], unique_opts.by_state) unique_key += f"&state={','.join(unique_opts.by_state)}" else: - get_params.state = UNIQUE_STATES_DEFAULT unique_key += f"&state={','.join(UNIQUE_STATES_DEFAULT)}" if not any_unique_opts: - return (None, "") - - return (get_params, unique_key) + return (None, None) + unique_key_hash = sha256(unique_key.encode("utf-8")).digest() + unique_states = _validate_unique_states( + unique_opts.by_state or UNIQUE_STATES_DEFAULT + ) -def _check_advisory_lock_prefix_bounds( - advisory_lock_prefix: Optional[int], -) -> Optional[int]: - """ - Checks that an advisory lock prefix fits in 4 bytes, which is the maximum - space reserved for one. - """ - - if advisory_lock_prefix: - # We only reserve 4 bytes for the prefix, so make sure the given one - # properly fits. This will error in case that's not the case. - advisory_lock_prefix.to_bytes(4) - return advisory_lock_prefix - - -def _hash_lock_key(advisory_lock_prefix: Optional[int], lock_key: str) -> int: - """ - Generates an FNV-1 hash from the given lock key string suitable for use with - a PG advisory lock while checking for the existence of a unique job. - """ - - if advisory_lock_prefix is None: - lock_key_hash = fnv1_hash(lock_key.encode("utf-8"), 64) - else: - prefix = advisory_lock_prefix - lock_key_hash = (prefix << 32) | fnv1_hash(lock_key.encode("utf-8"), 32) - - return _uint64_to_int64(lock_key_hash) + return unique_key_hash, unique_bitmask_from_states(unique_states) def _make_driver_insert_params( - args: JobArgs, - insert_opts: InsertOpts, - is_insert_many: bool = False, -) -> Tuple[JobInsertParams, Optional[UniqueOpts]]: + args: JobArgs, insert_opts: InsertOpts +) -> JobInsertParams: """ Converts user-land job args and insert options to insert params for an underlying driver. 
@@ -725,9 +610,6 @@
     scheduled_at = insert_opts.scheduled_at or args_insert_opts.scheduled_at
     unique_opts = insert_opts.unique_opts or args_insert_opts.unique_opts

-    if is_insert_many and unique_opts:
-        raise ValueError("unique opts can't be used with `insert_many`")
-
     insert_params = JobInsertParams(
         args=args_json,
         kind=args.kind,
@@ -741,18 +623,24 @@
         tags=_validate_tags(insert_opts.tags or args_insert_opts.tags or []),
     )

-    return insert_params, unique_opts
+    if unique_opts:
+        unique_key, unique_states = _build_unique_key_and_bitmask(
+            insert_params, unique_opts
+        )
+        insert_params.unique_key = unique_key
+        insert_params.unique_states = unique_states
+
+    return insert_params


 def _make_driver_insert_params_many(
     args: List[JobArgs | InsertManyParams],
 ) -> List[JobInsertParams]:
     return [
-        _make_driver_insert_params(
-            arg.args, arg.insert_opts or InsertOpts(), is_insert_many=True
-        )[0]
+        _make_driver_insert_params(arg.args, arg.insert_opts or InsertOpts())
         if isinstance(arg, InsertManyParams)
-        else _make_driver_insert_params(arg, InsertOpts(), is_insert_many=True)[0]
+        else _make_driver_insert_params(arg, InsertOpts())
         for arg in args
     ]

@@ -763,9 +651,36 @@ def _truncate_time(time, interval_seconds) -> datetime:
     )


-def _uint64_to_int64(uint64):
-    # Packs a uint64 then unpacks to int64 to fit within Postgres bigint
-    return (uint64 + (1 << 63)) % (1 << 64) - (1 << 63)
+def _to_insert_results(results: list[tuple[Job, bool]]) -> list[InsertResult]:
+    return [
+        InsertResult(job, unique_skipped_as_duplicated)
+        for job, unique_skipped_as_duplicated in results
+    ]
+
+
+def unique_bitmask_from_states(states: list[JobState]) -> bytes:
+    val = 0
+
+    for state in states:
+        bit_index = JOB_STATE_BIT_POSITIONS[state]
+
+        bit_position = 7 - (bit_index % 8)
+        val |= 1 << bit_position
+
+    return val.to_bytes(1, "big")  # Returns bytes like b'\xf5'
+
+
+def unique_bitmask_to_states(mask: str) -> list[JobState]:
+    states = []
+
+    # This logic differs a bit from the above because we're indexing into the
+    # string form of Postgres' bit(8) type, where index 0 of the string is the
+    # leftmost (most significant) bit -- the reverse of conventional LSB-0 bit
+    # numbering.
+    for state, bit_index in JOB_STATE_BIT_POSITIONS.items():
+        if mask[bit_index] == "1":
+            states.append(state)
+
+    return sorted(states)


 tag_re = re.compile(r"\A[\w][\w\-]+[\w]\Z")
@@ -777,3 +692,13 @@ def _validate_tags(tags: list[str]) -> list[str]:
             len(tag) <= 255 and tag_re.match(tag)
         ), f"tags should be less than 255 characters in length and match regex {tag_re.pattern}"
     return tags
+
+
+def _validate_unique_states(states: list[JobState]) -> list[JobState]:
+    for required_state in UNIQUE_STATES_REQUIRED:
+        if required_state not in states:
+            raise ValueError(
+                f"by_state should include required state '{required_state}'"
+            )
+
+    return states
diff --git a/src/riverqueue/driver/__init__.py b/src/riverqueue/driver/__init__.py
index 1a766c5..0f581d9 100644
--- a/src/riverqueue/driver/__init__.py
+++ b/src/riverqueue/driver/__init__.py
@@ -1,7 +1,6 @@
 # Reexport for more ergonomic use in calling code.
from .driver_protocol import ( ExecutorProtocol as ExecutorProtocol, - JobGetByKindAndUniquePropertiesParam as JobGetByKindAndUniquePropertiesParam, JobInsertParams as JobInsertParams, DriverProtocol as DriverProtocol, ) diff --git a/src/riverqueue/driver/driver_protocol.py b/src/riverqueue/driver/driver_protocol.py index 0d5c50a..144a93d 100644 --- a/src/riverqueue/driver/driver_protocol.py +++ b/src/riverqueue/driver/driver_protocol.py @@ -4,30 +4,11 @@ ) from dataclasses import dataclass, field from datetime import datetime -from typing import Any, Iterator, List, Optional, Protocol +from typing import Any, Iterator, Optional, Protocol from ..job import Job -@dataclass() -class JobGetByKindAndUniquePropertiesParam: - """ - Parameters for looking up a job by kind and unique properties. - """ - - kind: str - by_args: Optional[bool] = None - args: Optional[Any] = None - by_created_at: Optional[bool] = None - created_at: Optional[List[datetime]] = None - created_at_begin: Optional[datetime] = None - created_at_end: Optional[datetime] = None - by_queue: Optional[bool] = None - queue: Optional[str] = None - by_state: Optional[bool] = None - state: Optional[List[str]] = None - - @dataclass class JobInsertParams: """ @@ -46,6 +27,8 @@ class JobInsertParams: scheduled_at: Optional[datetime] = None state: str = field(default="available") tags: list[str] = field(default_factory=list) + unique_key: Optional[bytes] = None + unique_states: Optional[bytes] = None class AsyncExecutorProtocol(Protocol): @@ -55,23 +38,9 @@ class AsyncExecutorProtocol(Protocol): job. """ - async def advisory_lock(self, lock: int) -> None: - pass - - async def job_insert(self, insert_params: JobInsertParams) -> Job: - pass - - async def job_insert_many(self, all_params) -> int: - pass - - async def job_insert_unique( - self, insert_params: JobInsertParams, unique_key: bytes - ) -> tuple[Job, bool]: - pass - - async def job_get_by_kind_and_unique_properties( - self, get_params: JobGetByKindAndUniquePropertiesParam - ) -> Optional[Job]: + async def job_insert_many( + self, all_params: list[JobInsertParams] + ) -> list[tuple[Job, bool]]: pass # Even after spending two hours on it, I'm unable to find a return type for @@ -133,23 +102,9 @@ class ExecutorProtocol(Protocol): job. """ - def advisory_lock(self, lock: int) -> None: - pass - - def job_insert(self, insert_params: JobInsertParams) -> Job: - pass - - def job_insert_many(self, all_params) -> int: - pass - - def job_insert_unique( - self, insert_params: JobInsertParams, unique_key: bytes - ) -> tuple[Job, bool]: - pass - - def job_get_by_kind_and_unique_properties( - self, get_params: JobGetByKindAndUniquePropertiesParam - ) -> Optional[Job]: + def job_insert_many( + self, all_params: list[JobInsertParams] + ) -> list[tuple[Job, bool]]: pass @contextmanager diff --git a/src/riverqueue/driver/riversqlalchemy/dbsqlc/models.py b/src/riverqueue/driver/riversqlalchemy/dbsqlc/models.py index 843c8e4..9e93204 100644 --- a/src/riverqueue/driver/riversqlalchemy/dbsqlc/models.py +++ b/src/riverqueue/driver/riversqlalchemy/dbsqlc/models.py @@ -37,3 +37,4 @@ class RiverJob: scheduled_at: datetime.datetime tags: List[str] unique_key: Optional[memoryview] + unique_states: Optional[Any] diff --git a/src/riverqueue/driver/riversqlalchemy/dbsqlc/pg_misc.py b/src/riverqueue/driver/riversqlalchemy/dbsqlc/pg_misc.py deleted file mode 100644 index 2518389..0000000 --- a/src/riverqueue/driver/riversqlalchemy/dbsqlc/pg_misc.py +++ /dev/null @@ -1,31 +0,0 @@ -# Code generated by sqlc. 
DO NOT EDIT. -# versions: -# sqlc v1.27.0 -# source: pg_misc.sql -from typing import Any - -import sqlalchemy -import sqlalchemy.ext.asyncio - -from . import models - - -PG_ADVISORY_XACT_LOCK = """-- name: pg_advisory_xact_lock \\:exec -SELECT pg_advisory_xact_lock(:p1) -""" - - -class Querier: - def __init__(self, conn: sqlalchemy.engine.Connection): - self._conn = conn - - def pg_advisory_xact_lock(self, *, key: int) -> None: - self._conn.execute(sqlalchemy.text(PG_ADVISORY_XACT_LOCK), {"p1": key}) - - -class AsyncQuerier: - def __init__(self, conn: sqlalchemy.ext.asyncio.AsyncConnection): - self._conn = conn - - async def pg_advisory_xact_lock(self, *, key: int) -> None: - await self._conn.execute(sqlalchemy.text(PG_ADVISORY_XACT_LOCK), {"p1": key}) diff --git a/src/riverqueue/driver/riversqlalchemy/dbsqlc/pg_misc.sql b/src/riverqueue/driver/riversqlalchemy/dbsqlc/pg_misc.sql deleted file mode 100644 index 1bab8ad..0000000 --- a/src/riverqueue/driver/riversqlalchemy/dbsqlc/pg_misc.sql +++ /dev/null @@ -1,2 +0,0 @@ --- name: PGAdvisoryXactLock :exec -SELECT pg_advisory_xact_lock(@key); \ No newline at end of file diff --git a/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.py b/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.py index 9c1ebeb..7ffff94 100644 --- a/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.py +++ b/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.py @@ -13,48 +13,21 @@ JOB_GET_ALL = """-- name: job_get_all \\:many -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states FROM river_job """ JOB_GET_BY_ID = """-- name: job_get_by_id \\:one -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states FROM river_job WHERE id = :p1 """ -JOB_GET_BY_KIND_AND_UNIQUE_PROPERTIES = """-- name: job_get_by_kind_and_unique_properties \\:one -SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key -FROM river_job -WHERE kind = :p1 - AND CASE WHEN :p2\\:\\:boolean THEN args = :p3 ELSE true END - AND CASE WHEN :p4\\:\\:boolean THEN tstzrange(:p5\\:\\:timestamptz, :p6\\:\\:timestamptz, '[)') @> created_at ELSE true END - AND CASE WHEN :p7\\:\\:boolean THEN queue = :p8 ELSE true END - AND CASE WHEN :p9\\:\\:boolean THEN state\\:\\:text = any(:p10\\:\\:text[]) ELSE true END -""" - - -@dataclasses.dataclass() -class JobGetByKindAndUniquePropertiesParams: - kind: str - by_args: bool - args: Any - by_created_at: bool - created_at_begin: datetime.datetime - created_at_end: datetime.datetime - by_queue: bool - queue: str - by_state: bool - state: List[str] - - -JOB_INSERT_FAST = """-- name: job_insert_fast \\:one +JOB_INSERT_FAST_MANY = """-- name: job_insert_fast_many \\:many INSERT INTO river_job( args, - created_at, - finalized_at, kind, max_attempts, metadata, @@ -62,49 +35,9 @@ class JobGetByKindAndUniquePropertiesParams: queue, scheduled_at, state, 
- tags -) VALUES ( - :p1\\:\\:jsonb, - coalesce(:p2\\:\\:timestamptz, now()), - :p3, - :p4\\:\\:text, - :p5\\:\\:smallint, - coalesce(:p6\\:\\:jsonb, '{}'), - :p7\\:\\:smallint, - :p8\\:\\:text, - coalesce(:p9\\:\\:timestamptz, now()), - :p10\\:\\:river_job_state, - coalesce(:p11\\:\\:varchar(255)[], '{}') -) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key -""" - - -@dataclasses.dataclass() -class JobInsertFastParams: - args: Any - created_at: Optional[datetime.datetime] - finalized_at: Optional[datetime.datetime] - kind: str - max_attempts: int - metadata: Any - priority: int - queue: str - scheduled_at: Optional[datetime.datetime] - state: models.RiverJobState - tags: List[str] - - -JOB_INSERT_FAST_MANY = """-- name: job_insert_fast_many \\:execrows -INSERT INTO river_job( - args, - kind, - max_attempts, - metadata, - priority, - queue, - scheduled_at, - state, - tags + tags, + unique_key, + unique_states ) SELECT unnest(:p1\\:\\:jsonb[]), unnest(:p2\\:\\:text[]), @@ -114,10 +47,22 @@ class JobInsertFastParams: unnest(:p6\\:\\:text[]), unnest(:p7\\:\\:timestamptz[]), unnest(:p8\\:\\:river_job_state[]), - - -- Had trouble getting multi-dimensional arrays to play nicely with sqlc, - -- but it might be possible. For now, join tags into a single string. - string_to_array(unnest(:p9\\:\\:text[]), ',') + -- Unnest on a multi-dimensional array will fully flatten the array, so we + -- encode the tag list as a comma-separated string and split it in the + -- query. + string_to_array(unnest(:p9\\:\\:text[]), ','), + + unnest(:p10\\:\\:bytea[]), + -- Strings of bits are used for the input type here to make sqlalchemy play nicely with bit(8)\\: + unnest(:p11\\:\\:text[])\\:\\:bit(8) + +ON CONFLICT (unique_key) + WHERE unique_key IS NOT NULL + AND unique_states IS NOT NULL + AND river_job_state_in_bitmask(unique_states, state) + -- Something needs to be updated for a row to be returned on a conflict. 
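+    -- DO NOTHING would yield no row at all for conflicts; this no-op update
+    -- lets RETURNING produce the existing row, and (xmax != 0) below
+    -- distinguishes a skipped duplicate from a freshly inserted row.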
+ DO UPDATE SET kind = EXCLUDED.kind +RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key, river_job.unique_states, (xmax != 0) AS unique_skipped_as_duplicate """ @@ -132,6 +77,31 @@ class JobInsertFastManyParams: scheduled_at: List[datetime.datetime] state: List[models.RiverJobState] tags: List[str] + unique_key: List[memoryview] + unique_states: List[str] + + +@dataclasses.dataclass() +class JobInsertFastManyRow: + id: int + args: Any + attempt: int + attempted_at: Optional[datetime.datetime] + attempted_by: Optional[List[str]] + created_at: datetime.datetime + errors: Optional[List[Any]] + finalized_at: Optional[datetime.datetime] + kind: str + max_attempts: int + metadata: Any + priority: int + queue: str + state: models.RiverJobState + scheduled_at: datetime.datetime + tags: List[str] + unique_key: Optional[memoryview] + unique_states: Optional[Any] + unique_skipped_as_duplicate: bool JOB_INSERT_FULL = """-- name: job_insert_full \\:one @@ -167,7 +137,7 @@ class JobInsertFastManyParams: :p13\\:\\:river_job_state, coalesce(:p14\\:\\:varchar(255)[], '{}'), :p15 -) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key +) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states """ @@ -190,79 +160,6 @@ class JobInsertFullParams: unique_key: Optional[memoryview] -JOB_INSERT_UNIQUE = """-- name: job_insert_unique \\:one -INSERT INTO river_job( - args, - created_at, - finalized_at, - kind, - max_attempts, - metadata, - priority, - queue, - scheduled_at, - state, - tags, - unique_key -) VALUES ( - :p1, - coalesce(:p2\\:\\:timestamptz, now()), - :p3, - :p4, - :p5, - coalesce(:p6\\:\\:jsonb, '{}'), - :p7, - :p8, - coalesce(:p9\\:\\:timestamptz, now()), - :p10, - coalesce(:p11\\:\\:varchar(255)[], '{}'), - :p12 -) -ON CONFLICT (kind, unique_key) WHERE unique_key IS NOT NULL - -- Something needs to be updated for a row to be returned on a conflict. 
- DO UPDATE SET kind = EXCLUDED.kind -RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, (xmax != 0) AS unique_skipped_as_duplicate -""" - - -@dataclasses.dataclass() -class JobInsertUniqueParams: - args: Any - created_at: Optional[datetime.datetime] - finalized_at: Optional[datetime.datetime] - kind: str - max_attempts: int - metadata: Any - priority: int - queue: str - scheduled_at: Optional[datetime.datetime] - state: models.RiverJobState - tags: List[str] - unique_key: Optional[memoryview] - - -@dataclasses.dataclass() -class JobInsertUniqueRow: - id: int - args: Any - attempt: int - attempted_at: Optional[datetime.datetime] - attempted_by: Optional[List[str]] - created_at: datetime.datetime - errors: Optional[List[Any]] - finalized_at: Optional[datetime.datetime] - kind: str - max_attempts: int - metadata: Any - priority: int - queue: str - state: models.RiverJobState - scheduled_at: datetime.datetime - tags: List[str] - unique_key: Optional[memoryview] - unique_skipped_as_duplicate: bool - - class Querier: def __init__(self, conn: sqlalchemy.engine.Connection): self._conn = conn @@ -288,6 +185,7 @@ def job_get_all(self) -> Iterator[models.RiverJob]: scheduled_at=row[14], tags=row[15], unique_key=row[16], + unique_states=row[17], ) def job_get_by_id(self, *, id: int) -> Optional[models.RiverJob]: @@ -312,80 +210,10 @@ def job_get_by_id(self, *, id: int) -> Optional[models.RiverJob]: scheduled_at=row[14], tags=row[15], unique_key=row[16], + unique_states=row[17], ) - def job_get_by_kind_and_unique_properties(self, arg: JobGetByKindAndUniquePropertiesParams) -> Optional[models.RiverJob]: - row = self._conn.execute(sqlalchemy.text(JOB_GET_BY_KIND_AND_UNIQUE_PROPERTIES), { - "p1": arg.kind, - "p2": arg.by_args, - "p3": arg.args, - "p4": arg.by_created_at, - "p5": arg.created_at_begin, - "p6": arg.created_at_end, - "p7": arg.by_queue, - "p8": arg.queue, - "p9": arg.by_state, - "p10": arg.state, - }).first() - if row is None: - return None - return models.RiverJob( - id=row[0], - args=row[1], - attempt=row[2], - attempted_at=row[3], - attempted_by=row[4], - created_at=row[5], - errors=row[6], - finalized_at=row[7], - kind=row[8], - max_attempts=row[9], - metadata=row[10], - priority=row[11], - queue=row[12], - state=row[13], - scheduled_at=row[14], - tags=row[15], - unique_key=row[16], - ) - - def job_insert_fast(self, arg: JobInsertFastParams) -> Optional[models.RiverJob]: - row = self._conn.execute(sqlalchemy.text(JOB_INSERT_FAST), { - "p1": arg.args, - "p2": arg.created_at, - "p3": arg.finalized_at, - "p4": arg.kind, - "p5": arg.max_attempts, - "p6": arg.metadata, - "p7": arg.priority, - "p8": arg.queue, - "p9": arg.scheduled_at, - "p10": arg.state, - "p11": arg.tags, - }).first() - if row is None: - return None - return models.RiverJob( - id=row[0], - args=row[1], - attempt=row[2], - attempted_at=row[3], - attempted_by=row[4], - created_at=row[5], - errors=row[6], - finalized_at=row[7], - kind=row[8], - max_attempts=row[9], - metadata=row[10], - priority=row[11], - queue=row[12], - state=row[13], - scheduled_at=row[14], - tags=row[15], - unique_key=row[16], - ) - - def job_insert_fast_many(self, arg: JobInsertFastManyParams) -> int: + def job_insert_fast_many(self, arg: JobInsertFastManyParams) -> Iterator[JobInsertFastManyRow]: result = self._conn.execute(sqlalchemy.text(JOB_INSERT_FAST_MANY), { "p1": arg.args, "p2": arg.kind, @@ -396,8 +224,31 @@ def 
job_insert_fast_many(self, arg: JobInsertFastManyParams) -> int: "p7": arg.scheduled_at, "p8": arg.state, "p9": arg.tags, + "p10": arg.unique_key, + "p11": arg.unique_states, }) - return result.rowcount + for row in result: + yield JobInsertFastManyRow( + id=row[0], + args=row[1], + attempt=row[2], + attempted_at=row[3], + attempted_by=row[4], + created_at=row[5], + errors=row[6], + finalized_at=row[7], + kind=row[8], + max_attempts=row[9], + metadata=row[10], + priority=row[11], + queue=row[12], + state=row[13], + scheduled_at=row[14], + tags=row[15], + unique_key=row[16], + unique_states=row[17], + unique_skipped_as_duplicate=row[18], + ) def job_insert_full(self, arg: JobInsertFullParams) -> Optional[models.RiverJob]: row = self._conn.execute(sqlalchemy.text(JOB_INSERT_FULL), { @@ -437,44 +288,7 @@ def job_insert_full(self, arg: JobInsertFullParams) -> Optional[models.RiverJob] scheduled_at=row[14], tags=row[15], unique_key=row[16], - ) - - def job_insert_unique(self, arg: JobInsertUniqueParams) -> Optional[JobInsertUniqueRow]: - row = self._conn.execute(sqlalchemy.text(JOB_INSERT_UNIQUE), { - "p1": arg.args, - "p2": arg.created_at, - "p3": arg.finalized_at, - "p4": arg.kind, - "p5": arg.max_attempts, - "p6": arg.metadata, - "p7": arg.priority, - "p8": arg.queue, - "p9": arg.scheduled_at, - "p10": arg.state, - "p11": arg.tags, - "p12": arg.unique_key, - }).first() - if row is None: - return None - return JobInsertUniqueRow( - id=row[0], - args=row[1], - attempt=row[2], - attempted_at=row[3], - attempted_by=row[4], - created_at=row[5], - errors=row[6], - finalized_at=row[7], - kind=row[8], - max_attempts=row[9], - metadata=row[10], - priority=row[11], - queue=row[12], - state=row[13], - scheduled_at=row[14], - tags=row[15], - unique_key=row[16], - unique_skipped_as_duplicate=row[17], + unique_states=row[17], ) @@ -503,6 +317,7 @@ async def job_get_all(self) -> AsyncIterator[models.RiverJob]: scheduled_at=row[14], tags=row[15], unique_key=row[16], + unique_states=row[17], ) async def job_get_by_id(self, *, id: int) -> Optional[models.RiverJob]: @@ -527,81 +342,11 @@ async def job_get_by_id(self, *, id: int) -> Optional[models.RiverJob]: scheduled_at=row[14], tags=row[15], unique_key=row[16], + unique_states=row[17], ) - async def job_get_by_kind_and_unique_properties(self, arg: JobGetByKindAndUniquePropertiesParams) -> Optional[models.RiverJob]: - row = (await self._conn.execute(sqlalchemy.text(JOB_GET_BY_KIND_AND_UNIQUE_PROPERTIES), { - "p1": arg.kind, - "p2": arg.by_args, - "p3": arg.args, - "p4": arg.by_created_at, - "p5": arg.created_at_begin, - "p6": arg.created_at_end, - "p7": arg.by_queue, - "p8": arg.queue, - "p9": arg.by_state, - "p10": arg.state, - })).first() - if row is None: - return None - return models.RiverJob( - id=row[0], - args=row[1], - attempt=row[2], - attempted_at=row[3], - attempted_by=row[4], - created_at=row[5], - errors=row[6], - finalized_at=row[7], - kind=row[8], - max_attempts=row[9], - metadata=row[10], - priority=row[11], - queue=row[12], - state=row[13], - scheduled_at=row[14], - tags=row[15], - unique_key=row[16], - ) - - async def job_insert_fast(self, arg: JobInsertFastParams) -> Optional[models.RiverJob]: - row = (await self._conn.execute(sqlalchemy.text(JOB_INSERT_FAST), { - "p1": arg.args, - "p2": arg.created_at, - "p3": arg.finalized_at, - "p4": arg.kind, - "p5": arg.max_attempts, - "p6": arg.metadata, - "p7": arg.priority, - "p8": arg.queue, - "p9": arg.scheduled_at, - "p10": arg.state, - "p11": arg.tags, - })).first() - if row is None: - return 
None - return models.RiverJob( - id=row[0], - args=row[1], - attempt=row[2], - attempted_at=row[3], - attempted_by=row[4], - created_at=row[5], - errors=row[6], - finalized_at=row[7], - kind=row[8], - max_attempts=row[9], - metadata=row[10], - priority=row[11], - queue=row[12], - state=row[13], - scheduled_at=row[14], - tags=row[15], - unique_key=row[16], - ) - - async def job_insert_fast_many(self, arg: JobInsertFastManyParams) -> int: - result = await self._conn.execute(sqlalchemy.text(JOB_INSERT_FAST_MANY), { + async def job_insert_fast_many(self, arg: JobInsertFastManyParams) -> AsyncIterator[JobInsertFastManyRow]: + result = await self._conn.stream(sqlalchemy.text(JOB_INSERT_FAST_MANY), { "p1": arg.args, "p2": arg.kind, "p3": arg.max_attempts, @@ -611,8 +356,31 @@ async def job_insert_fast_many(self, arg: JobInsertFastManyParams) -> int: "p7": arg.scheduled_at, "p8": arg.state, "p9": arg.tags, + "p10": arg.unique_key, + "p11": arg.unique_states, }) - return result.rowcount + async for row in result: + yield JobInsertFastManyRow( + id=row[0], + args=row[1], + attempt=row[2], + attempted_at=row[3], + attempted_by=row[4], + created_at=row[5], + errors=row[6], + finalized_at=row[7], + kind=row[8], + max_attempts=row[9], + metadata=row[10], + priority=row[11], + queue=row[12], + state=row[13], + scheduled_at=row[14], + tags=row[15], + unique_key=row[16], + unique_states=row[17], + unique_skipped_as_duplicate=row[18], + ) async def job_insert_full(self, arg: JobInsertFullParams) -> Optional[models.RiverJob]: row = (await self._conn.execute(sqlalchemy.text(JOB_INSERT_FULL), { @@ -652,42 +420,5 @@ async def job_insert_full(self, arg: JobInsertFullParams) -> Optional[models.Riv scheduled_at=row[14], tags=row[15], unique_key=row[16], - ) - - async def job_insert_unique(self, arg: JobInsertUniqueParams) -> Optional[JobInsertUniqueRow]: - row = (await self._conn.execute(sqlalchemy.text(JOB_INSERT_UNIQUE), { - "p1": arg.args, - "p2": arg.created_at, - "p3": arg.finalized_at, - "p4": arg.kind, - "p5": arg.max_attempts, - "p6": arg.metadata, - "p7": arg.priority, - "p8": arg.queue, - "p9": arg.scheduled_at, - "p10": arg.state, - "p11": arg.tags, - "p12": arg.unique_key, - })).first() - if row is None: - return None - return JobInsertUniqueRow( - id=row[0], - args=row[1], - attempt=row[2], - attempted_at=row[3], - attempted_by=row[4], - created_at=row[5], - errors=row[6], - finalized_at=row[7], - kind=row[8], - max_attempts=row[9], - metadata=row[10], - priority=row[11], - queue=row[12], - state=row[13], - scheduled_at=row[14], - tags=row[15], - unique_key=row[16], - unique_skipped_as_duplicate=row[17], + unique_states=row[17], ) diff --git a/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.sql b/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.sql index ca62c3f..464054c 100644 --- a/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.sql +++ b/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.sql @@ -1,17 +1,17 @@ CREATE TYPE river_job_state AS ENUM( - 'available', - 'cancelled', - 'completed', - 'discarded', - 'pending', - 'retryable', - 'running', - 'scheduled' + 'available', + 'cancelled', + 'completed', + 'discarded', + 'pending', + 'retryable', + 'running', + 'scheduled' ); CREATE TABLE river_job( id bigserial PRIMARY KEY, - args jsonb NOT NULL DEFAULT '{}'::jsonb, + args jsonb NOT NULL DEFAULT '{}', attempt smallint NOT NULL DEFAULT 0, attempted_at timestamptz, attempted_by text[], @@ -20,14 +20,14 @@ CREATE TABLE river_job( finalized_at timestamptz, kind text NOT NULL, 
max_attempts smallint NOT NULL, - metadata jsonb NOT NULL DEFAULT '{}' ::jsonb, + metadata jsonb NOT NULL DEFAULT '{}', priority smallint NOT NULL DEFAULT 1, - queue text NOT NULL DEFAULT 'default' ::text, - state river_job_state NOT NULL DEFAULT 'available' ::river_job_state, + queue text NOT NULL DEFAULT 'default', + state river_job_state NOT NULL DEFAULT 'available', scheduled_at timestamptz NOT NULL DEFAULT NOW(), - tags varchar(255)[] NOT NULL DEFAULT '{}' ::varchar(255)[], + tags varchar(255)[] NOT NULL DEFAULT '{}', unique_key bytea, - + unique_states bit(8), CONSTRAINT finalized_or_finalized_at_null CHECK ( (finalized_at IS NULL AND state NOT IN ('cancelled', 'completed', 'discarded')) OR (finalized_at IS NOT NULL AND state IN ('cancelled', 'completed', 'discarded')) @@ -46,47 +46,9 @@ SELECT * FROM river_job WHERE id = @id; --- name: JobGetByKindAndUniqueProperties :one -SELECT * -FROM river_job -WHERE kind = @kind - AND CASE WHEN @by_args::boolean THEN args = @args ELSE true END - AND CASE WHEN @by_created_at::boolean THEN tstzrange(@created_at_begin::timestamptz, @created_at_end::timestamptz, '[)') @> created_at ELSE true END - AND CASE WHEN @by_queue::boolean THEN queue = @queue ELSE true END - AND CASE WHEN @by_state::boolean THEN state::text = any(@state::text[]) ELSE true END; - --- name: JobInsertFast :one -INSERT INTO river_job( - args, - created_at, - finalized_at, - kind, - max_attempts, - metadata, - priority, - queue, - scheduled_at, - state, - tags -) VALUES ( - @args::jsonb, - coalesce(sqlc.narg('created_at')::timestamptz, now()), - @finalized_at, - @kind::text, - @max_attempts::smallint, - coalesce(@metadata::jsonb, '{}'), - @priority::smallint, - @queue::text, - coalesce(sqlc.narg('scheduled_at')::timestamptz, now()), - @state::river_job_state, - coalesce(@tags::varchar(255)[], '{}') -) RETURNING *; - --- name: JobInsertUnique :one +-- name: JobInsertFastMany :many INSERT INTO river_job( args, - created_at, - finalized_at, kind, max_attempts, metadata, @@ -95,37 +57,8 @@ INSERT INTO river_job( scheduled_at, state, tags, - unique_key -) VALUES ( - @args, - coalesce(sqlc.narg('created_at')::timestamptz, now()), - @finalized_at, - @kind, - @max_attempts, - coalesce(@metadata::jsonb, '{}'), - @priority, - @queue, - coalesce(sqlc.narg('scheduled_at')::timestamptz, now()), - @state, - coalesce(@tags::varchar(255)[], '{}'), - @unique_key -) -ON CONFLICT (kind, unique_key) WHERE unique_key IS NOT NULL - -- Something needs to be updated for a row to be returned on a conflict. - DO UPDATE SET kind = EXCLUDED.kind -RETURNING *, (xmax != 0) AS unique_skipped_as_duplicate; - --- name: JobInsertFastMany :execrows -INSERT INTO river_job( - args, - kind, - max_attempts, - metadata, - priority, - queue, - scheduled_at, - state, - tags + unique_key, + unique_states ) SELECT unnest(@args::jsonb[]), unnest(@kind::text[]), @@ -135,10 +68,22 @@ INSERT INTO river_job( unnest(@queue::text[]), unnest(@scheduled_at::timestamptz[]), unnest(@state::river_job_state[]), + -- Unnest on a multi-dimensional array will fully flatten the array, so we + -- encode the tag list as a comma-separated string and split it in the + -- query. + string_to_array(unnest(@tags::text[]), ','), + + unnest(@unique_key::bytea[]), + -- Strings of bits are used for the input type here to make sqlalchemy play nicely with bit(8): + unnest(@unique_states::text[])::bit(8) - -- Had trouble getting multi-dimensional arrays to play nicely with sqlc, - -- but it might be possible. For now, join tags into a single string. 
- string_to_array(unnest(@tags::text[]), ','); +ON CONFLICT (unique_key) + WHERE unique_key IS NOT NULL + AND unique_states IS NOT NULL + AND river_job_state_in_bitmask(unique_states, state) + -- Something needs to be updated for a row to be returned on a conflict. + DO UPDATE SET kind = EXCLUDED.kind +RETURNING river_job.*, (xmax != 0) AS unique_skipped_as_duplicate; -- name: JobInsertFull :one INSERT INTO river_job( @@ -173,4 +118,4 @@ INSERT INTO river_job( @state::river_job_state, coalesce(@tags::varchar(255)[], '{}'), @unique_key -) RETURNING *; \ No newline at end of file +) RETURNING *; diff --git a/src/riverqueue/driver/riversqlalchemy/dbsqlc/sqlc.yaml b/src/riverqueue/driver/riversqlalchemy/dbsqlc/sqlc.yaml index a70746c..f7f6eb1 100644 --- a/src/riverqueue/driver/riversqlalchemy/dbsqlc/sqlc.yaml +++ b/src/riverqueue/driver/riversqlalchemy/dbsqlc/sqlc.yaml @@ -2,14 +2,12 @@ version: "2" plugins: - name: "py" wasm: - url: "https://downloads.sqlc.dev/plugin/sqlc-gen-python_1.0.0.wasm" - sha256: "aca83e1f59f8ffdc604774c2f6f9eb321a2b23e07dc83fc12289d25305fa065b" + url: https://downloads.sqlc.dev/plugin/sqlc-gen-python_1.2.0.wasm + sha256: a6c5d174c407007c3717eea36ff0882744346e6ba991f92f71d6ab2895204c0e sql: - schema: - - pg_misc.sql - river_job.sql queries: - - pg_misc.sql - river_job.sql engine: "postgresql" codegen: diff --git a/src/riverqueue/driver/riversqlalchemy/sql_alchemy_driver.py b/src/riverqueue/driver/riversqlalchemy/sql_alchemy_driver.py index 5c16eb6..96c320e 100644 --- a/src/riverqueue/driver/riversqlalchemy/sql_alchemy_driver.py +++ b/src/riverqueue/driver/riversqlalchemy/sql_alchemy_driver.py @@ -18,58 +18,27 @@ from ...driver import ( DriverProtocol, ExecutorProtocol, - JobGetByKindAndUniquePropertiesParam, JobInsertParams, ) +from ...client import unique_bitmask_to_states from ...job import AttemptError, Job, JobState -from .dbsqlc import models, river_job, pg_misc +from .dbsqlc import models, river_job class AsyncExecutor(AsyncExecutorProtocol): def __init__(self, conn: AsyncConnection): self.conn = conn - self.pg_misc_querier = pg_misc.AsyncQuerier(conn) self.job_querier = river_job.AsyncQuerier(conn) - async def advisory_lock(self, key: int) -> None: - await self.pg_misc_querier.pg_advisory_xact_lock(key=key) - - async def job_insert(self, insert_params: JobInsertParams) -> Job: - return job_from_row( - cast( # drop Optional[] because insert always returns a row - models.RiverJob, - await self.job_querier.job_insert_fast( - cast(river_job.JobInsertFastParams, insert_params) - ), + async def job_insert_many( + self, all_params: list[JobInsertParams] + ) -> list[tuple[Job, bool]]: + return [ + _job_insert_result_from_row(row) + async for row in self.job_querier.job_insert_fast_many( + _build_insert_many_params(all_params) ) - ) - - async def job_insert_many(self, all_params: list[JobInsertParams]) -> int: - await self.job_querier.job_insert_fast_many( - _build_insert_many_params(all_params) - ) - return len(all_params) - - async def job_insert_unique( - self, insert_params: JobInsertParams, unique_key: bytes - ) -> tuple[Job, bool]: - insert_unique_params = cast(river_job.JobInsertUniqueParams, insert_params) - insert_unique_params.unique_key = memoryview(unique_key) - - res = cast( # drop Optional[] because insert always returns a row - river_job.JobInsertUniqueRow, - await self.job_querier.job_insert_unique(insert_unique_params), - ) - - return job_from_row(res), res.unique_skipped_as_duplicate - - async def job_get_by_kind_and_unique_properties( - self, 
get_params: JobGetByKindAndUniquePropertiesParam - ) -> Optional[Job]: - row = await self.job_querier.job_get_by_kind_and_unique_properties( - cast(river_job.JobGetByKindAndUniquePropertiesParams, get_params) - ) - return job_from_row(row) if row else None + ] @asynccontextmanager async def transaction(self) -> AsyncGenerator: @@ -108,46 +77,15 @@ def unwrap_executor(self, tx) -> AsyncExecutorProtocol: class Executor(ExecutorProtocol): def __init__(self, conn: Connection): self.conn = conn - self.pg_misc_querier = pg_misc.Querier(conn) self.job_querier = river_job.Querier(conn) - def advisory_lock(self, key: int) -> None: - self.pg_misc_querier.pg_advisory_xact_lock(key=key) - - def job_insert(self, insert_params: JobInsertParams) -> Job: - return job_from_row( - cast( # drop Optional[] because insert always returns a row - models.RiverJob, - self.job_querier.job_insert_fast( - cast(river_job.JobInsertFastParams, insert_params) - ), - ), - ) - - def job_insert_many(self, all_params: list[JobInsertParams]) -> int: - self.job_querier.job_insert_fast_many(_build_insert_many_params(all_params)) - return len(all_params) - - def job_insert_unique( - self, insert_params: JobInsertParams, unique_key: bytes - ) -> tuple[Job, bool]: - insert_unique_params = cast(river_job.JobInsertUniqueParams, insert_params) - insert_unique_params.unique_key = memoryview(unique_key) - - res = cast( # drop Optional[] because insert always returns a row - river_job.JobInsertUniqueRow, - self.job_querier.job_insert_unique(insert_unique_params), - ) - - return job_from_row(res), res.unique_skipped_as_duplicate - - def job_get_by_kind_and_unique_properties( - self, get_params: JobGetByKindAndUniquePropertiesParam - ) -> Optional[Job]: - row = self.job_querier.job_get_by_kind_and_unique_properties( - cast(river_job.JobGetByKindAndUniquePropertiesParams, get_params) + def job_insert_many( + self, all_params: list[JobInsertParams] + ) -> list[tuple[Job, bool]]: + res = self.job_querier.job_insert_fast_many( + _build_insert_many_params(all_params) ) - return job_from_row(row) if row else None + return [_job_insert_result_from_row(row) for row in res] @contextmanager def transaction(self) -> Iterator[None]: @@ -194,6 +132,8 @@ def _build_insert_many_params( scheduled_at=[], state=[], tags=[], + unique_key=[], + unique_states=[], ) for insert_params in all_params: @@ -208,11 +148,19 @@ def _build_insert_many_params( ) insert_many_params.state.append(cast(models.RiverJobState, insert_params.state)) insert_many_params.tags.append(",".join(insert_params.tags)) + insert_many_params.unique_key.append(insert_params.unique_key or None) + + if insert_params.unique_states: + one_byte = insert_params.unique_states[0] + bit_string = format(one_byte, "08b") + insert_many_params.unique_states.append(bit_string) + else: + insert_many_params.unique_states.append(None) return insert_many_params -def job_from_row(row: models.RiverJob | river_job.JobInsertUniqueRow) -> Job: +def job_from_row(row: models.RiverJob) -> Job: """ Converts an internal sqlc generated row to the top level type, issuing a few minor transformations along the way. 
Timestamps are changed from local
@@ -240,5 +188,14 @@ def to_utc(t: datetime) -> datetime:
         scheduled_at=to_utc(row.scheduled_at),
         state=cast(JobState, row.state),
         tags=row.tags,
-        unique_key=cast(Optional[bytes], row.unique_key),
+        unique_key=cast(Optional[bytes], row.unique_key) if row.unique_key else None,
+        unique_states=unique_bitmask_to_states(row.unique_states)
+        if row.unique_states
+        else None,
     )
+
+
+def _job_insert_result_from_row(
+    row: river_job.JobInsertFastManyRow,
+) -> tuple[Job, bool]:
+    return job_from_row(cast(models.RiverJob, row)), row.unique_skipped_as_duplicate
diff --git a/src/riverqueue/fnv.py b/src/riverqueue/fnv.py
deleted file mode 100644
index 99ad673..0000000
--- a/src/riverqueue/fnv.py
+++ /dev/null
@@ -1,42 +0,0 @@
-"""
-FNV is the Fowler–Noll–Vo hash function, a simple hash that's very easy to
-implement, and hash the perfect characteristics for use with the 64 bits of
-available space in a PG advisory lock.
-
-I'm implemented it myself so that the River package can stay dependency free
-(and because it's quite easy to do).
-"""
-
-from typing import Dict, Literal
-
-
-def fnv1_hash(data: bytes, size: Literal[32] | Literal[64]) -> int:
-    """
-    Hashes data as a 32-bit or 64-bit FNV hash and returns the result. Data
-    should be bytes rather than a string, so encode a string with something like
-    `input_str.encode("utf-8")` or `b"string as bytes"`.
-    """
-
-    assert isinstance(data, bytes)
-
-    hash = __OFFSET_BASIS[size]
-    mask = 2**size - 1  # creates a mask of 1s of `size` bits long like 0xffffffff
-    prime = __PRIME[size]
-
-    for byte in data:
-        hash *= prime
-        hash &= mask  # take lower N bits of multiplication product
-        hash ^= byte
-
-    return hash
-
-
-__OFFSET_BASIS: Dict[Literal[32] | Literal[64], int] = {
-    32: 0x811C9DC5,
-    64: 0xCBF29CE484222325,
-}
-
-__PRIME: Dict[Literal[32] | Literal[64], int] = {
-    32: 0x01000193,
-    64: 0x00000100000001B3,
-}
diff --git a/src/riverqueue/insert_opts.py b/src/riverqueue/insert_opts.py
index 6b9f15b..92cee29 100644
--- a/src/riverqueue/insert_opts.py
+++ b/src/riverqueue/insert_opts.py
@@ -1,6 +1,6 @@
 from dataclasses import dataclass
 from datetime import datetime
-from typing import Any, Literal, Optional
+from typing import Any, List, Literal, Optional, Union

 from riverqueue.job import JobState

@@ -82,6 +82,8 @@ class UniqueOpts:
     args and queues. If either args or queue is changed on a new job, it's
     allowed to be inserted as a new job.

-    Uniquenes is checked at insert time by taking a Postgres advisory lock,
-    doing a look up for an equivalent row, and inserting only if none was
-    found. There's no database-level mechanism that guarantees jobs stay
-    unique, so if
-    check doesn't occur), it's conceivable that duplicates could coexist.
+    Uniqueness is enforced by the database itself: a hash of the job's unique
+    properties is stored in `unique_key`, and a partial unique index rejects
+    a conflicting insert while an existing job is in one of the states
+    encoded in the `unique_states` bitmask. Because the check is a database
+    constraint rather than an application-level lookup, duplicates can't slip
+    in even when two inserts race.
     """

-    by_args: Optional[Literal[True]] = None
+    by_args: Optional[Union[Literal[True], List[str]]] = None
     """
     Indicates that uniqueness should be enforced for any specific instance of
     encoded args for a job.
@@ -140,3 +142,12 @@
     discarded, but not yet cleaned out by the system, won't count towards the
     uniqueness of a new insert.
     """
+
+    exclude_kind: Optional[Literal[True]] = None
+    """
+    Indicates that the job kind should be excluded from the unique key
+    computation.
+
+    Default is false, meaning that the job kind is included in the unique key
+    computation.
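+
+    Excluding the kind makes it possible for jobs of different kinds to share
+    a unique key, e.g. so that at most one of several related job kinds can
+    be queued at a time.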
+ """ diff --git a/src/riverqueue/job.py b/src/riverqueue/job.py index d448bc0..2b5964c 100644 --- a/src/riverqueue/job.py +++ b/src/riverqueue/job.py @@ -199,6 +199,11 @@ class Job: configuration. """ + unique_states: Optional[list[JobState]] + """ + A list of states that the job must be in to be considered for uniqueness. + """ + @dataclass class AttemptError: diff --git a/tests/client_test.py b/tests/client_test.py index 37e53b0..2450b71 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -4,7 +4,8 @@ import pytest -from riverqueue import Client, InsertOpts, UniqueOpts +from riverqueue import Client, InsertOpts, JobState, UniqueOpts +from riverqueue.client import unique_bitmask_from_states from riverqueue.driver import DriverProtocol, ExecutorProtocol import sqlalchemy @@ -40,19 +41,17 @@ def client(mock_driver) -> Client: def test_insert_with_only_args(client, mock_exec, simple_args): - mock_exec.job_get_by_kind_and_unique_properties.return_value = None - mock_exec.job_insert.return_value = "job_row" + mock_exec.job_insert_many.return_value = [("job_row", False)] insert_res = client.insert(simple_args) - mock_exec.job_insert.assert_called_once() + mock_exec.job_insert_many.assert_called_once() assert insert_res.job == "job_row" def test_insert_tx(mock_driver, client, simple_args): mock_exec = MagicMock(spec=ExecutorProtocol) - mock_exec.job_get_by_kind_and_unique_properties.return_value = None - mock_exec.job_insert.return_value = "job_row" + mock_exec.job_insert_many.return_value = [("job_row", False)] mock_tx = MagicMock(spec=sqlalchemy.Transaction) @@ -64,12 +63,12 @@ def mock_unwrap_executor(tx: sqlalchemy.Transaction): insert_res = client.insert_tx(mock_tx, simple_args) - mock_exec.job_insert.assert_called_once() + mock_exec.job_insert_many.assert_called_once() assert insert_res.job == "job_row" def test_insert_with_insert_opts_from_args(client, mock_exec, simple_args): - mock_exec.job_insert.return_value = "job_row" + mock_exec.job_insert_many.return_value = [("job_row", False)] insert_res = client.insert( simple_args, @@ -78,10 +77,12 @@ def test_insert_with_insert_opts_from_args(client, mock_exec, simple_args): ), ) - mock_exec.job_insert.assert_called_once() + mock_exec.job_insert_many.assert_called_once() assert insert_res.job == "job_row" - insert_args = mock_exec.job_insert.call_args[0][0] + call_args = mock_exec.job_insert_many.call_args[0][0] + assert len(call_args) == 1 + insert_args = call_args[0] assert insert_args.max_attempts == 23 assert insert_args.priority == 2 assert insert_args.queue == "job_custom_queue" @@ -106,16 +107,18 @@ def insert_opts() -> InsertOpts: def to_json() -> str: return "{}" - mock_exec.job_insert.return_value = "job_row" + mock_exec.job_insert_many.return_value = [("job_row", False)] insert_res = client.insert( MyArgs(), ) - mock_exec.job_insert.assert_called_once() + mock_exec.job_insert_many.assert_called_once() assert insert_res.job == "job_row" - insert_args = mock_exec.job_insert.call_args[0][0] + call_args = mock_exec.job_insert_many.call_args[0][0] + assert len(call_args) == 1 + insert_args = call_args[0] assert insert_args.max_attempts == 23 assert insert_args.priority == 2 assert insert_args.queue == "job_custom_queue" @@ -140,7 +143,7 @@ def insert_opts() -> InsertOpts: def to_json() -> str: return "{}" - mock_exec.job_insert.return_value = "job_row" + mock_exec.job_insert_many.return_value = [("job_row", False)] insert_res = client.insert( simple_args, @@ -149,10 +152,12 @@ def to_json() -> str: ), ) - 
mock_exec.job_insert.assert_called_once() + mock_exec.job_insert_many.assert_called_once() assert insert_res.job == "job_row" - insert_args = mock_exec.job_insert.call_args[0][0] + call_args = mock_exec.job_insert_many.call_args[0][0] + assert len(call_args) == 1 + insert_args = call_args[0] assert insert_args.max_attempts == 17 assert insert_args.priority == 3 assert insert_args.queue == "my_queue" @@ -161,18 +166,18 @@ def to_json() -> str: def test_insert_with_unique_opts_by_args(client, mock_exec, simple_args): insert_opts = InsertOpts(unique_opts=UniqueOpts(by_args=True)) - - # fast path - mock_exec.job_insert_unique.return_value = ("job_row", False) + mock_exec.job_insert_many.return_value = [("job_row", False)] insert_res = client.insert(simple_args, insert_opts=insert_opts) - mock_exec.job_insert_unique.assert_called_once() + mock_exec.job_insert_many.assert_called_once() assert insert_res.job == "job_row" # Check that the UniqueOpts were correctly processed - call_args = mock_exec.job_insert_unique.call_args[0][0] - assert call_args.kind == "simple" + call_args = mock_exec.job_insert_many.call_args[0][0] + assert len(call_args) == 1 + insert_params = call_args[0] + assert insert_params.kind == "simple" @patch("datetime.datetime") @@ -182,51 +187,68 @@ def test_insert_with_unique_opts_by_period( mock_datetime.now.return_value = datetime(2024, 6, 1, 12, 0, 0, tzinfo=timezone.utc) insert_opts = InsertOpts(unique_opts=UniqueOpts(by_period=900)) - - # fast path - mock_exec.job_insert_unique.return_value = ("job_row", False) + mock_exec.job_insert_many.return_value = [("job_row", False)] insert_res = client.insert(simple_args, insert_opts=insert_opts) - mock_exec.job_insert_unique.assert_called_once() + mock_exec.job_insert_many.assert_called_once() assert insert_res.job == "job_row" # Check that the UniqueOpts were correctly processed - call_args = mock_exec.job_insert_unique.call_args[0][0] - assert call_args.kind == "simple" + call_args = mock_exec.job_insert_many.call_args[0][0] + assert len(call_args) == 1 + insert_params = call_args[0] + assert insert_params.kind == "simple" def test_insert_with_unique_opts_by_queue(client, mock_exec, simple_args): insert_opts = InsertOpts(unique_opts=UniqueOpts(by_queue=True)) - # fast path - mock_exec.job_insert_unique.return_value = ("job_row", False) + mock_exec.job_insert_many.return_value = [("job_row", False)] insert_res = client.insert(simple_args, insert_opts=insert_opts) - mock_exec.job_insert_unique.assert_called_once() + mock_exec.job_insert_many.assert_called_once() assert insert_res.job == "job_row" # Check that the UniqueOpts were correctly processed - call_args = mock_exec.job_insert_unique.call_args[0][0] - assert call_args.kind == "simple" + call_args = mock_exec.job_insert_many.call_args[0][0] + assert len(call_args) == 1 + insert_params = call_args[0] + assert insert_params.kind == "simple" + # default unique states should all be set except for cancelled and discarded: + assert insert_params.unique_states == bytes([0b11110101]) def test_insert_with_unique_opts_by_state(client, mock_exec, simple_args): - insert_opts = InsertOpts(unique_opts=UniqueOpts(by_state=["available", "running"])) - - # slow path - mock_exec.job_get_by_kind_and_unique_properties.return_value = None - mock_exec.job_insert.return_value = "job_row" + # Turn on all unique states: + insert_opts = InsertOpts( + unique_opts=UniqueOpts( + by_state=[ + "available", + "cancelled", + "completed", + "discarded", + "pending", + "retryable", + "running", + "scheduled", 
+ ] + ) + ) + mock_exec.job_insert_many.return_value = [("job_row", False)] insert_res = client.insert(simple_args, insert_opts=insert_opts) - mock_exec.job_insert.assert_called_once() + mock_exec.job_insert_many.assert_called_once() assert insert_res.job == "job_row" # Check that the UniqueOpts were correctly processed - call_args = mock_exec.job_insert.call_args[0][0] - assert call_args.kind == "simple" + call_args = mock_exec.job_insert_many.call_args[0][0] + assert len(call_args) == 1 + insert_params = call_args[0] + assert insert_params.kind == "simple" + assert insert_params.unique_states == bytes([0b11111111]) def test_insert_kind_error(client): @@ -263,7 +285,8 @@ def to_json() -> None: assert "args should return non-nil from `to_json`" == str(ex.value) -def test_tag_validation(client, simple_args): +def test_tag_validation(client, mock_exec, simple_args): + mock_exec.job_insert_many.return_value = [("job_row", False)] client.insert( simple_args, insert_opts=InsertOpts(tags=["foo", "bar", "baz", "foo-bar-baz"]) ) @@ -283,17 +306,73 @@ def test_tag_validation(client, simple_args): ) -def test_check_advisory_lock_prefix_bounds(): - Client(mock_driver, advisory_lock_prefix=123) - - with pytest.raises(OverflowError) as ex: - Client(mock_driver, advisory_lock_prefix=-1) - assert "can't convert negative int to unsigned" == str(ex.value) - - # 2^32-1 is 0xffffffff (1s for 32 bits) which fits - Client(mock_driver, advisory_lock_prefix=2**32 - 1) - - # 2^32 is 0x100000000, which does not - with pytest.raises(OverflowError) as ex: - Client(mock_driver, advisory_lock_prefix=2**32) - assert "int too big to convert" == str(ex.value) +@pytest.mark.parametrize( + "description, input_states, postgres_bitstring", + [ + # Postgres bitstrings are little-endian, so the MSB (AVAILABLE) is on the right. 
+ ("No states selected", [], bytes([0b00000000])), + ("Single state - available", [JobState.AVAILABLE], bytes([0b00000001])), + ("Single state - SCHEDULED", [JobState.SCHEDULED], bytes([0b10000000])), + ("Single state - RUNNING", [JobState.RUNNING], bytes([0b01000000])), + ( + "AVAILABLE and SCHEDULED", + [JobState.AVAILABLE, JobState.SCHEDULED], + bytes([0b10000001]), + ), + ( + "COMPLETED, PENDING, RETRYABLE", + [JobState.COMPLETED, JobState.PENDING, JobState.RETRYABLE], + bytes([0b00110100]), + ), + ( + "Default states", + [ + JobState.AVAILABLE, + JobState.COMPLETED, + JobState.PENDING, + JobState.RETRYABLE, + JobState.RUNNING, + JobState.SCHEDULED, + ], + bytes([0b11110101]), + ), + ( + "All states selected", + [ + JobState.AVAILABLE, + JobState.CANCELLED, + JobState.COMPLETED, + JobState.DISCARDED, + JobState.PENDING, + JobState.RETRYABLE, + JobState.RUNNING, + JobState.SCHEDULED, + ], + bytes([0b11111111]), + ), + ( + "AVAILABLE, COMPLETED, RETRYABLE, SCHEDULED", + [ + JobState.AVAILABLE, + JobState.COMPLETED, + JobState.RETRYABLE, + JobState.SCHEDULED, + ], + bytes([0b10100101]), + ), + ( + "Overlapping states", + [JobState.AVAILABLE, JobState.AVAILABLE], + bytes([0b00000001]), + ), + ("None input treated as empty", None, bytes([0b00000000])), + ], +) +def test_unique_bitmask_from_states(description, input_states, postgres_bitstring): + if input_states is None: + input_states = [] + + result = unique_bitmask_from_states(input_states) + assert ( + result == postgres_bitstring + ), f"{description} For states {input_states}, expected {postgres_bitstring}, got {result}" diff --git a/tests/driver/riversqlalchemy/sqlalchemy_driver_test.py b/tests/driver/riversqlalchemy/sqlalchemy_driver_test.py index 90a3680..83a8f1e 100644 --- a/tests/driver/riversqlalchemy/sqlalchemy_driver_test.py +++ b/tests/driver/riversqlalchemy/sqlalchemy_driver_test.py @@ -204,7 +204,14 @@ async def test_insert_with_unique_opts_by_queue(self, client, simple_args): @pytest.mark.asyncio async def test_insert_with_unique_opts_by_state(self, client, simple_args): insert_opts = InsertOpts( - unique_opts=UniqueOpts(by_state=[JobState.AVAILABLE, JobState.RUNNING]) + unique_opts=UniqueOpts( + by_state=[ + JobState.AVAILABLE, + JobState.PENDING, + JobState.RUNNING, + JobState.SCHEDULED, + ] + ) ) insert_res = await client.insert(simple_args, insert_opts=insert_opts) @@ -238,7 +245,7 @@ async def test_insert_with_unique_opts_all_fast_path( @patch("datetime.datetime") @pytest.mark.asyncio - async def test_insert_with_unique_opts_all_slow_path( + async def test_insert_with_unique_opts_all( self, mock_datetime, client, simple_args ): mock_datetime.now.return_value = datetime( @@ -252,8 +259,11 @@ async def test_insert_with_unique_opts_all_slow_path( by_queue=True, by_state=[ JobState.AVAILABLE, + JobState.COMPLETED, + JobState.PENDING, JobState.RUNNING, - ], # non-default states activate slow path + JobState.SCHEDULED, + ], ) ) @@ -267,12 +277,14 @@ async def test_insert_with_unique_opts_all_slow_path( @pytest.mark.asyncio async def test_insert_many_with_only_args(self, client, simple_args): - num_inserted = await client.insert_many([simple_args]) - assert num_inserted == 1 + results = await client.insert_many([simple_args]) + assert len(results) == 1 + assert results[0].unique_skipped_as_duplicated is False + assert results[0].job.id > 0 @pytest.mark.asyncio async def test_insert_many_with_insert_opts(self, client, simple_args): - num_inserted = await client.insert_many( + results = await client.insert_many( [ 
InsertManyParams( args=simple_args, @@ -280,12 +292,16 @@ async def test_insert_many_with_insert_opts(self, client, simple_args): ) ] ) - assert num_inserted == 1 + assert len(results) == 1 + assert results[0].unique_skipped_as_duplicated is False + assert results[0].job.id > 0 @pytest.mark.asyncio async def test_insert_many_tx(self, client, simple_args, test_tx): - num_inserted = await client.insert_many_tx(test_tx, [simple_args]) - assert num_inserted == 1 + results = await client.insert_many_tx(test_tx, [simple_args]) + assert len(results) == 1 + assert results[0].unique_skipped_as_duplicated is False + assert results[0].job.id > 0 class TestSyncClient: @@ -381,11 +397,28 @@ def test_insert_with_unique_opts_by_queue(self, client, simple_args): def test_insert_with_unique_opts_by_state(self, client, simple_args): insert_opts = InsertOpts( - unique_opts=UniqueOpts(by_state=[JobState.AVAILABLE, JobState.RUNNING]) + unique_opts=UniqueOpts( + by_state=[ + JobState.AVAILABLE, + JobState.COMPLETED, + JobState.PENDING, + JobState.RETRYABLE, + JobState.RUNNING, + JobState.SCHEDULED, + ] + ) ) insert_res = client.insert(simple_args, insert_opts=insert_opts) assert insert_res.job + assert insert_res.job.unique_states == [ + JobState.AVAILABLE, + JobState.COMPLETED, + JobState.PENDING, + JobState.RETRYABLE, + JobState.RUNNING, + JobState.SCHEDULED, + ] assert not insert_res.unique_skipped_as_duplicated insert_res2 = client.insert(simple_args, insert_opts=insert_opts) @@ -413,9 +446,7 @@ def test_insert_with_unique_opts_all_fast_path( assert insert_res2.unique_skipped_as_duplicated @patch("datetime.datetime") - def test_insert_with_unique_opts_all_slow_path( - self, mock_datetime, client, simple_args - ): + def test_insert_with_unique_opts_all(self, mock_datetime, client, simple_args): mock_datetime.now.return_value = datetime( 2024, 6, 1, 12, 0, 0, tzinfo=timezone.utc ) @@ -427,7 +458,10 @@ def test_insert_with_unique_opts_all_slow_path( by_queue=True, by_state=[ JobState.AVAILABLE, + JobState.COMPLETED, + JobState.PENDING, JobState.RUNNING, + JobState.SCHEDULED, ], # non-default states activate slow path ) ) @@ -441,11 +475,13 @@ def test_insert_with_unique_opts_all_slow_path( assert insert_res2.unique_skipped_as_duplicated def test_insert_many_with_only_args(self, client, simple_args): - num_inserted = client.insert_many([simple_args]) - assert num_inserted == 1 + results = client.insert_many([simple_args]) + assert len(results) == 1 + assert results[0].unique_skipped_as_duplicated is False + assert results[0].job.id > 0 def test_insert_many_with_insert_opts(self, client, simple_args): - num_inserted = client.insert_many( + results = client.insert_many( [ InsertManyParams( args=simple_args, @@ -453,8 +489,12 @@ def test_insert_many_with_insert_opts(self, client, simple_args): ) ] ) - assert num_inserted == 1 + assert len(results) == 1 + assert results[0].unique_skipped_as_duplicated is False + assert results[0].job.id > 0 def test_insert_many_tx(self, client, simple_args, test_tx): - num_inserted = client.insert_many_tx(test_tx, [simple_args]) - assert num_inserted == 1 + results = client.insert_many_tx(test_tx, [simple_args]) + assert len(results) == 1 + assert results[0].unique_skipped_as_duplicated is False + assert results[0].job.id > 0 diff --git a/tests/fnv_test.py b/tests/fnv_test.py deleted file mode 100644 index 5894b28..0000000 --- a/tests/fnv_test.py +++ /dev/null @@ -1,638 +0,0 @@ -from typing import Dict, List - -from riverqueue.fnv import fnv1_hash - - -def test_fnv1_32_bits(): - 
for test_str in __TEST_STRS: - assert fnv1_hash(test_str, 32) == __FNV1_32_HASHES[test_str] - - -def test_fnv1_64_bits(): - for test_str in __TEST_STRS: - assert fnv1_hash(test_str, 64) == __FNV1_64_HASHES[test_str] - - -# -# Test strings pulled from this test suite: -# -# https://github.com/znerol/py-fnvhash/blob/master/fnvhash/test/vector.py -# - -__TEST_STRS: List[bytes] = [ - b"", - b"a", - b"b", - b"c", - b"d", - b"e", - b"f", - b"fo", - b"foo", - b"foob", - b"fooba", - b"foobar", - b"" + b"\x00", - b"a" + b"\x00", - b"b" + b"\x00", - b"c" + b"\x00", - b"d" + b"\x00", - b"e" + b"\x00", - b"f" + b"\x00", - b"fo" + b"\x00", - b"foo" + b"\x00", - b"foob" + b"\x00", - b"fooba" + b"\x00", - b"foobar" + b"\x00", - b"ch", - b"cho", - b"chon", - b"chong", - b"chongo", - b"chongo ", - b"chongo w", - b"chongo wa", - b"chongo was", - b"chongo was ", - b"chongo was h", - b"chongo was he", - b"chongo was her", - b"chongo was here", - b"chongo was here!", - b"chongo was here!\n", - b"ch" + b"\x00", - b"cho" + b"\x00", - b"chon" + b"\x00", - b"chong" + b"\x00", - b"chongo" + b"\x00", - b"chongo " + b"\x00", - b"chongo w" + b"\x00", - b"chongo wa" + b"\x00", - b"chongo was" + b"\x00", - b"chongo was " + b"\x00", - b"chongo was h" + b"\x00", - b"chongo was he" + b"\x00", - b"chongo was her" + b"\x00", - b"chongo was here" + b"\x00", - b"chongo was here!" + b"\x00", - b"chongo was here!\n" + b"\x00", - b"cu", - b"cur", - b"curd", - b"curds", - b"curds ", - b"curds a", - b"curds an", - b"curds and", - b"curds and ", - b"curds and w", - b"curds and wh", - b"curds and whe", - b"curds and whey", - b"curds and whey\n", - b"cu" + b"\x00", - b"cur" + b"\x00", - b"curd" + b"\x00", - b"curds" + b"\x00", - b"curds " + b"\x00", - b"curds a" + b"\x00", - b"curds an" + b"\x00", - b"curds and" + b"\x00", - b"curds and " + b"\x00", - b"curds and w" + b"\x00", - b"curds and wh" + b"\x00", - b"curds and whe" + b"\x00", - b"curds and whey" + b"\x00", - b"curds and whey\n" + b"\x00", - b"hi", - b"hi" + b"\x00", - b"hello", - b"hello" + b"\x00", - b"\xff\x00\x00\x01", - b"\x01\x00\x00\xff", - b"\xff\x00\x00\x02", - b"\x02\x00\x00\xff", - b"\xff\x00\x00\x03", - b"\x03\x00\x00\xff", - b"\xff\x00\x00\x04", - b"\x04\x00\x00\xff", - b"\x40\x51\x4e\x44", - b"\x44\x4e\x51\x40", - b"\x40\x51\x4e\x4a", - b"\x4a\x4e\x51\x40", - b"\x40\x51\x4e\x54", - b"\x54\x4e\x51\x40", - b"127.0.0.1", - b"127.0.0.1" + b"\x00", - b"127.0.0.2", - b"127.0.0.2" + b"\x00", - b"127.0.0.3", - b"127.0.0.3" + b"\x00", - b"64.81.78.68", - b"64.81.78.68" + b"\x00", - b"64.81.78.74", - b"64.81.78.74" + b"\x00", - b"64.81.78.84", - b"64.81.78.84" + b"\x00", - b"feedface", - b"feedface" + b"\x00", - b"feedfacedaffdeed", - b"feedfacedaffdeed" + b"\x00", - b"feedfacedeadbeef", - b"feedfacedeadbeef" + b"\x00", - b"line 1\nline 2\nline 3", - b"chongo /\\../\\", - b"chongo /\\../\\" + b"\x00", - b"chongo (Landon Curt Noll) /\\../\\", - b"chongo (Landon Curt Noll) /\\../\\" + b"\x00", - b"http://antwrp.gsfc.nasa.gov/apod/astropix.html", - b"http://en.wikipedia.org/wiki/Fowler_Noll_Vo_hash", - b"http://epod.usra.edu/", - b"http://exoplanet.eu/", - b"http://hvo.wr.usgs.gov/cam3/", - b"http://hvo.wr.usgs.gov/cams/HMcam/", - b"http://hvo.wr.usgs.gov/kilauea/update/deformation.html", - b"http://hvo.wr.usgs.gov/kilauea/update/images.html", - b"http://hvo.wr.usgs.gov/kilauea/update/maps.html", - b"http://hvo.wr.usgs.gov/volcanowatch/current_issue.html", - b"http://neo.jpl.nasa.gov/risk/", - b"http://norvig.com/21-days.html", - b"http://primes.utm.edu/curios/home.php", - 
b"http://slashdot.org/", - b"http://tux.wr.usgs.gov/Maps/155.25-19.5.html", - b"http://volcano.wr.usgs.gov/kilaueastatus.php", - b"http://www.avo.alaska.edu/activity/Redoubt.php", - b"http://www.dilbert.com/fast/", - b"http://www.fourmilab.ch/gravitation/orbits/", - b"http://www.fpoa.net/", - b"http://www.ioccc.org/index.html", - b"http://www.isthe.com/cgi-bin/number.cgi", - b"http://www.isthe.com/chongo/bio.html", - b"http://www.isthe.com/chongo/index.html", - b"http://www.isthe.com/chongo/src/calc/lucas-calc", - b"http://www.isthe.com/chongo/tech/astro/venus2004.html", - b"http://www.isthe.com/chongo/tech/astro/vita.html", - b"http://www.isthe.com/chongo/tech/comp/c/expert.html", - b"http://www.isthe.com/chongo/tech/comp/calc/index.html", - b"http://www.isthe.com/chongo/tech/comp/fnv/index.html", - b"http://www.isthe.com/chongo/tech/math/number/howhigh.html", - b"http://www.isthe.com/chongo/tech/math/number/number.html", - b"http://www.isthe.com/chongo/tech/math/prime/mersenne.html", - b"http://www.isthe.com/chongo/tech/math/prime/mersenne.html#largest", - b"http://www.lavarnd.org/cgi-bin/corpspeak.cgi", - b"http://www.lavarnd.org/cgi-bin/haiku.cgi", - b"http://www.lavarnd.org/cgi-bin/rand-none.cgi", - b"http://www.lavarnd.org/cgi-bin/randdist.cgi", - b"http://www.lavarnd.org/index.html", - b"http://www.lavarnd.org/what/nist-test.html", - b"http://www.macosxhints.com/", - b"http://www.mellis.com/", - b"http://www.nature.nps.gov/air/webcams/parks/havoso2alert/havoalert.cfm", - b"http://www.nature.nps.gov/air/webcams/parks/havoso2alert/timelines_24.cfm", - b"http://www.paulnoll.com/", - b"http://www.pepysdiary.com/", - b"http://www.sciencenews.org/index/home/activity/view", - b"http://www.skyandtelescope.com/", - b"http://www.sput.nl/~rob/sirius.html", - b"http://www.systemexperts.com/", - b"http://www.tq-international.com/phpBB3/index.php", - b"http://www.travelquesttours.com/index.htm", - b"http://www.wunderground.com/global/stations/89606.html", - b"21701" * 10, - b"M21701" * 10, - b"2^21701-1" * 10, - b"\x54\xc5" * 10, - b"\xc5\x54" * 10, - b"23209" * 10, - b"M23209" * 10, - b"2^23209-1" * 10, - b"\x5a\xa9" * 10, - b"\xa9\x5a" * 10, - b"391581216093" * 10, - b"391581*2^216093-1" * 10, - b"\x05\xf9\x9d\x03\x4c\x81" * 10, - b"FEDCBA9876543210" * 10, - b"\xfe\xdc\xba\x98\x76\x54\x32\x10" * 10, - b"EFCDAB8967452301" * 10, - b"\xef\xcd\xab\x89\x67\x45\x23\x01" * 10, - b"0123456789ABCDEF" * 10, - b"\x01\x23\x45\x67\x89\xab\xcd\xef" * 10, - b"1032547698BADCFE" * 10, - b"\x10\x32\x54\x76\x98\xba\xdc\xfe" * 10, - b"\x00" * 500, - b"\x07" * 500, - b"~" * 500, - b"\x7f" * 500, -] - -__FNV1_32_HASHES: Dict[bytes, int] = { - __TEST_STRS[0]: 0x811C9DC5, - __TEST_STRS[1]: 0x050C5D7E, - __TEST_STRS[2]: 0x050C5D7D, - __TEST_STRS[3]: 0x050C5D7C, - __TEST_STRS[4]: 0x050C5D7B, - __TEST_STRS[5]: 0x050C5D7A, - __TEST_STRS[6]: 0x050C5D79, - __TEST_STRS[7]: 0x6B772514, - __TEST_STRS[8]: 0x408F5E13, - __TEST_STRS[9]: 0xB4B1178B, - __TEST_STRS[10]: 0xFDC80FB0, - __TEST_STRS[11]: 0x31F0B262, - __TEST_STRS[12]: 0x050C5D1F, - __TEST_STRS[13]: 0x70772D5A, - __TEST_STRS[14]: 0x6F772BC7, - __TEST_STRS[15]: 0x6E772A34, - __TEST_STRS[16]: 0x6D7728A1, - __TEST_STRS[17]: 0x6C77270E, - __TEST_STRS[18]: 0x6B77257B, - __TEST_STRS[19]: 0x408F5E7C, - __TEST_STRS[20]: 0xB4B117E9, - __TEST_STRS[21]: 0xFDC80FD1, - __TEST_STRS[22]: 0x31F0B210, - __TEST_STRS[23]: 0xFFE8D046, - __TEST_STRS[24]: 0x6E772A5C, - __TEST_STRS[25]: 0x4197AEBB, - __TEST_STRS[26]: 0xFCC8100F, - __TEST_STRS[27]: 0xFDF147FA, - __TEST_STRS[28]: 0xBCD44EE1, - 
__TEST_STRS[29]: 0x23382C13, - __TEST_STRS[30]: 0x846D619E, - __TEST_STRS[31]: 0x1630ABDB, - __TEST_STRS[32]: 0xC99E89B2, - __TEST_STRS[33]: 0x1692C316, - __TEST_STRS[34]: 0x9F091BCA, - __TEST_STRS[35]: 0x2556BE9B, - __TEST_STRS[36]: 0x628E0E73, - __TEST_STRS[37]: 0x98A0BF6C, - __TEST_STRS[38]: 0xB10D5725, - __TEST_STRS[39]: 0xDD002F35, - __TEST_STRS[40]: 0x4197AED4, - __TEST_STRS[41]: 0xFCC81061, - __TEST_STRS[42]: 0xFDF1479D, - __TEST_STRS[43]: 0xBCD44E8E, - __TEST_STRS[44]: 0x23382C33, - __TEST_STRS[45]: 0x846D61E9, - __TEST_STRS[46]: 0x1630ABBA, - __TEST_STRS[47]: 0xC99E89C1, - __TEST_STRS[48]: 0x1692C336, - __TEST_STRS[49]: 0x9F091BA2, - __TEST_STRS[50]: 0x2556BEFE, - __TEST_STRS[51]: 0x628E0E01, - __TEST_STRS[52]: 0x98A0BF09, - __TEST_STRS[53]: 0xB10D5704, - __TEST_STRS[54]: 0xDD002F3F, - __TEST_STRS[55]: 0x1C4A506F, - __TEST_STRS[56]: 0x6E772A41, - __TEST_STRS[57]: 0x26978421, - __TEST_STRS[58]: 0xE184FF97, - __TEST_STRS[59]: 0x9B5E5AC6, - __TEST_STRS[60]: 0x5B88E592, - __TEST_STRS[61]: 0xAA8164B7, - __TEST_STRS[62]: 0x20B18C7B, - __TEST_STRS[63]: 0xF28025C5, - __TEST_STRS[64]: 0x84BB753F, - __TEST_STRS[65]: 0x3219925A, - __TEST_STRS[66]: 0x384163C6, - __TEST_STRS[67]: 0x54F010D7, - __TEST_STRS[68]: 0x8CEA820C, - __TEST_STRS[69]: 0xE12AB8EE, - __TEST_STRS[70]: 0x26978453, - __TEST_STRS[71]: 0xE184FFF3, - __TEST_STRS[72]: 0x9B5E5AB5, - __TEST_STRS[73]: 0x5B88E5B2, - __TEST_STRS[74]: 0xAA8164D6, - __TEST_STRS[75]: 0x20B18C15, - __TEST_STRS[76]: 0xF28025A1, - __TEST_STRS[77]: 0x84BB751F, - __TEST_STRS[78]: 0x3219922D, - __TEST_STRS[79]: 0x384163AE, - __TEST_STRS[80]: 0x54F010B2, - __TEST_STRS[81]: 0x8CEA8275, - __TEST_STRS[82]: 0xE12AB8E4, - __TEST_STRS[83]: 0x64411EAA, - __TEST_STRS[84]: 0x6977223C, - __TEST_STRS[85]: 0x428AE474, - __TEST_STRS[86]: 0xB6FA7167, - __TEST_STRS[87]: 0x73408525, - __TEST_STRS[88]: 0xB78320A1, - __TEST_STRS[89]: 0x0CAF4135, - __TEST_STRS[90]: 0xB78320A2, - __TEST_STRS[91]: 0xCDC88E80, - __TEST_STRS[92]: 0xB78320A3, - __TEST_STRS[93]: 0x8EE1DBCB, - __TEST_STRS[94]: 0xB78320A4, - __TEST_STRS[95]: 0x4FFB2716, - __TEST_STRS[96]: 0x860632AA, - __TEST_STRS[97]: 0xCC2C5C64, - __TEST_STRS[98]: 0x860632A4, - __TEST_STRS[99]: 0x2A7EC4A6, - __TEST_STRS[100]: 0x860632BA, - __TEST_STRS[101]: 0xFEFE8E14, - __TEST_STRS[102]: 0x0A3CFFD8, - __TEST_STRS[103]: 0xF606C108, - __TEST_STRS[104]: 0x0A3CFFDB, - __TEST_STRS[105]: 0xF906C5C1, - __TEST_STRS[106]: 0x0A3CFFDA, - __TEST_STRS[107]: 0xF806C42E, - __TEST_STRS[108]: 0xC07167D7, - __TEST_STRS[109]: 0xC9867775, - __TEST_STRS[110]: 0xBF716668, - __TEST_STRS[111]: 0xC78435B8, - __TEST_STRS[112]: 0xC6717155, - __TEST_STRS[113]: 0xB99568CF, - __TEST_STRS[114]: 0x7662E0D6, - __TEST_STRS[115]: 0x33A7F0E2, - __TEST_STRS[116]: 0xC2732F95, - __TEST_STRS[117]: 0xB053E78F, - __TEST_STRS[118]: 0x3A19C02A, - __TEST_STRS[119]: 0xA089821E, - __TEST_STRS[120]: 0x31AE8F83, - __TEST_STRS[121]: 0x995FA9C4, - __TEST_STRS[122]: 0x35983F8C, - __TEST_STRS[123]: 0x5036A251, - __TEST_STRS[124]: 0x97018583, - __TEST_STRS[125]: 0xB4448D60, - __TEST_STRS[126]: 0x025DFE59, - __TEST_STRS[127]: 0xC5EAB3AF, - __TEST_STRS[128]: 0x7D21BA1E, - __TEST_STRS[129]: 0x7704CDDB, - __TEST_STRS[130]: 0xD0071BFE, - __TEST_STRS[131]: 0x0FF3774C, - __TEST_STRS[132]: 0xB0FEA0EA, - __TEST_STRS[133]: 0x58177303, - __TEST_STRS[134]: 0x4F599CDA, - __TEST_STRS[135]: 0x3E590A47, - __TEST_STRS[136]: 0x965595F8, - __TEST_STRS[137]: 0xC37F178D, - __TEST_STRS[138]: 0x9711DD26, - __TEST_STRS[139]: 0x23C99B7F, - __TEST_STRS[140]: 0x6E568B17, - __TEST_STRS[141]: 0x43F0245B, - 
__TEST_STRS[142]: 0xBCB7A001, - __TEST_STRS[143]: 0x12E6DFFE, - __TEST_STRS[144]: 0x0792F2D6, - __TEST_STRS[145]: 0xB966936B, - __TEST_STRS[146]: 0x46439AC5, - __TEST_STRS[147]: 0x728D49AF, - __TEST_STRS[148]: 0xD33745C9, - __TEST_STRS[149]: 0xBC382A57, - __TEST_STRS[150]: 0x4BDA1D31, - __TEST_STRS[151]: 0xCE35CCAE, - __TEST_STRS[152]: 0x3B6EED94, - __TEST_STRS[153]: 0x445C9C58, - __TEST_STRS[154]: 0x3DB8BF9D, - __TEST_STRS[155]: 0x2DEE116D, - __TEST_STRS[156]: 0xC18738DA, - __TEST_STRS[157]: 0x5B156176, - __TEST_STRS[158]: 0x2AA7D593, - __TEST_STRS[159]: 0xB2409658, - __TEST_STRS[160]: 0xE1489528, - __TEST_STRS[161]: 0xFE1EE07E, - __TEST_STRS[162]: 0xE8842315, - __TEST_STRS[163]: 0x3A6A63A2, - __TEST_STRS[164]: 0x06D2C18C, - __TEST_STRS[165]: 0xF8EF7225, - __TEST_STRS[166]: 0x843D3300, - __TEST_STRS[167]: 0xBB24F7AE, - __TEST_STRS[168]: 0x878C0EC9, - __TEST_STRS[169]: 0xB557810F, - __TEST_STRS[170]: 0x57423246, - __TEST_STRS[171]: 0x87F7505E, - __TEST_STRS[172]: 0xBB809F20, - __TEST_STRS[173]: 0x8932ABB5, - __TEST_STRS[174]: 0x0A9B3AA0, - __TEST_STRS[175]: 0xB8682A24, - __TEST_STRS[176]: 0xA7AC1C56, - __TEST_STRS[177]: 0x11409252, - __TEST_STRS[178]: 0xA987F517, - __TEST_STRS[179]: 0xF309E7ED, - __TEST_STRS[180]: 0xC9E8F417, - __TEST_STRS[181]: 0x7F447BDD, - __TEST_STRS[182]: 0xB929ADC5, - __TEST_STRS[183]: 0x57022879, - __TEST_STRS[184]: 0xDCFD2C49, - __TEST_STRS[185]: 0x6EDAFFF5, - __TEST_STRS[186]: 0xF04FB1F1, - __TEST_STRS[187]: 0xFB7DE8B9, - __TEST_STRS[188]: 0xC5F1D7E9, - __TEST_STRS[189]: 0x32C1F439, - __TEST_STRS[190]: 0x7FD3EB7D, - __TEST_STRS[191]: 0x81597DA5, - __TEST_STRS[192]: 0x05EB7A25, - __TEST_STRS[193]: 0x9C0FA1B5, - __TEST_STRS[194]: 0x53CCB1C5, - __TEST_STRS[195]: 0xFABECE15, - __TEST_STRS[196]: 0x4AD745A5, - __TEST_STRS[197]: 0xE5BDC495, - __TEST_STRS[198]: 0x23B3C0A5, - __TEST_STRS[199]: 0xFA823DD5, - __TEST_STRS[200]: 0x0C6C58B9, - __TEST_STRS[201]: 0xE2DBCCD5, - __TEST_STRS[202]: 0xDB7F50F9, -} - -__FNV1_64_HASHES: Dict[bytes, int] = { - __TEST_STRS[0]: 0xCBF29CE484222325, - __TEST_STRS[1]: 0xAF63BD4C8601B7BE, - __TEST_STRS[2]: 0xAF63BD4C8601B7BD, - __TEST_STRS[3]: 0xAF63BD4C8601B7BC, - __TEST_STRS[4]: 0xAF63BD4C8601B7BB, - __TEST_STRS[5]: 0xAF63BD4C8601B7BA, - __TEST_STRS[6]: 0xAF63BD4C8601B7B9, - __TEST_STRS[7]: 0x08326207B4EB2F34, - __TEST_STRS[8]: 0xD8CBC7186BA13533, - __TEST_STRS[9]: 0x0378817EE2ED65CB, - __TEST_STRS[10]: 0xD329D59B9963F790, - __TEST_STRS[11]: 0x340D8765A4DDA9C2, - __TEST_STRS[12]: 0xAF63BD4C8601B7DF, - __TEST_STRS[13]: 0x08326707B4EB37DA, - __TEST_STRS[14]: 0x08326607B4EB3627, - __TEST_STRS[15]: 0x08326507B4EB3474, - __TEST_STRS[16]: 0x08326407B4EB32C1, - __TEST_STRS[17]: 0x08326307B4EB310E, - __TEST_STRS[18]: 0x08326207B4EB2F5B, - __TEST_STRS[19]: 0xD8CBC7186BA1355C, - __TEST_STRS[20]: 0x0378817EE2ED65A9, - __TEST_STRS[21]: 0xD329D59B9963F7F1, - __TEST_STRS[22]: 0x340D8765A4DDA9B0, - __TEST_STRS[23]: 0x50A6D3B724A774A6, - __TEST_STRS[24]: 0x08326507B4EB341C, - __TEST_STRS[25]: 0xD8D5C8186BA98BFB, - __TEST_STRS[26]: 0x1CCEFC7EF118DBEF, - __TEST_STRS[27]: 0x0C92FAB3AD3DB77A, - __TEST_STRS[28]: 0x9B77794F5FDEC421, - __TEST_STRS[29]: 0x0AC742DFE7874433, - __TEST_STRS[30]: 0xD7DAD5766AD8E2DE, - __TEST_STRS[31]: 0xA1BB96378E897F5B, - __TEST_STRS[32]: 0x5B3F9B6733A367D2, - __TEST_STRS[33]: 0xB07CE25CBEA969F6, - __TEST_STRS[34]: 0x8D9E9997F9DF0D6A, - __TEST_STRS[35]: 0x838C673D9603CB7B, - __TEST_STRS[36]: 0x8B5EE8A5E872C273, - __TEST_STRS[37]: 0x4507C4E9FB00690C, - __TEST_STRS[38]: 0x4C9CA59581B27F45, - __TEST_STRS[39]: 0xE0ACA20B624E4235, - 
__TEST_STRS[40]: 0xD8D5C8186BA98B94, - __TEST_STRS[41]: 0x1CCEFC7EF118DB81, - __TEST_STRS[42]: 0x0C92FAB3AD3DB71D, - __TEST_STRS[43]: 0x9B77794F5FDEC44E, - __TEST_STRS[44]: 0x0AC742DFE7874413, - __TEST_STRS[45]: 0xD7DAD5766AD8E2A9, - __TEST_STRS[46]: 0xA1BB96378E897F3A, - __TEST_STRS[47]: 0x5B3F9B6733A367A1, - __TEST_STRS[48]: 0xB07CE25CBEA969D6, - __TEST_STRS[49]: 0x8D9E9997F9DF0D02, - __TEST_STRS[50]: 0x838C673D9603CB1E, - __TEST_STRS[51]: 0x8B5EE8A5E872C201, - __TEST_STRS[52]: 0x4507C4E9FB006969, - __TEST_STRS[53]: 0x4C9CA59581B27F64, - __TEST_STRS[54]: 0xE0ACA20B624E423F, - __TEST_STRS[55]: 0x13998E580AFA800F, - __TEST_STRS[56]: 0x08326507B4EB3401, - __TEST_STRS[57]: 0xD8D5AD186BA95DC1, - __TEST_STRS[58]: 0x1C72E17EF0CA4E97, - __TEST_STRS[59]: 0x2183C1B327C38AE6, - __TEST_STRS[60]: 0xB66D096C914504F2, - __TEST_STRS[61]: 0x404BF57AD8476757, - __TEST_STRS[62]: 0x887976BD815498BB, - __TEST_STRS[63]: 0x3AFD7F02C2BF85A5, - __TEST_STRS[64]: 0xFC4476B0EB70177F, - __TEST_STRS[65]: 0x186D2DA00F77ECBA, - __TEST_STRS[66]: 0xF97140FA48C74066, - __TEST_STRS[67]: 0xA2B1CF49AA926D37, - __TEST_STRS[68]: 0x0690712CD6CF940C, - __TEST_STRS[69]: 0xF7045B3102B8906E, - __TEST_STRS[70]: 0xD8D5AD186BA95DB3, - __TEST_STRS[71]: 0x1C72E17EF0CA4EF3, - __TEST_STRS[72]: 0x2183C1B327C38A95, - __TEST_STRS[73]: 0xB66D096C914504D2, - __TEST_STRS[74]: 0x404BF57AD8476736, - __TEST_STRS[75]: 0x887976BD815498D5, - __TEST_STRS[76]: 0x3AFD7F02C2BF85C1, - __TEST_STRS[77]: 0xFC4476B0EB70175F, - __TEST_STRS[78]: 0x186D2DA00F77ECCD, - __TEST_STRS[79]: 0xF97140FA48C7400E, - __TEST_STRS[80]: 0xA2B1CF49AA926D52, - __TEST_STRS[81]: 0x0690712CD6CF9475, - __TEST_STRS[82]: 0xF7045B3102B89064, - __TEST_STRS[83]: 0x74F762479F9D6AEA, - __TEST_STRS[84]: 0x08326007B4EB2B9C, - __TEST_STRS[85]: 0xD8C4C9186B9B1A14, - __TEST_STRS[86]: 0x7B495389BDBDD4C7, - __TEST_STRS[87]: 0x3B6DBA0D69908E25, - __TEST_STRS[88]: 0xD6B2B17BF4B71261, - __TEST_STRS[89]: 0x447BFB7F98E615B5, - __TEST_STRS[90]: 0xD6B2B17BF4B71262, - __TEST_STRS[91]: 0x3BD2807F93FE1660, - __TEST_STRS[92]: 0xD6B2B17BF4B71263, - __TEST_STRS[93]: 0x3329057F8F16170B, - __TEST_STRS[94]: 0xD6B2B17BF4B71264, - __TEST_STRS[95]: 0x2A7F8A7F8A2E19B6, - __TEST_STRS[96]: 0x23D3767E64B2F98A, - __TEST_STRS[97]: 0xFF768D7E4F9D86A4, - __TEST_STRS[98]: 0x23D3767E64B2F984, - __TEST_STRS[99]: 0xCCD1837E334E4AA6, - __TEST_STRS[100]: 0x23D3767E64B2F99A, - __TEST_STRS[101]: 0x7691FD7E028F6754, - __TEST_STRS[102]: 0x34AD3B1041204318, - __TEST_STRS[103]: 0xA29E749EA9D201C8, - __TEST_STRS[104]: 0x34AD3B104120431B, - __TEST_STRS[105]: 0xA29E779EA9D206E1, - __TEST_STRS[106]: 0x34AD3B104120431A, - __TEST_STRS[107]: 0xA29E769EA9D2052E, - __TEST_STRS[108]: 0x02A17EBCA4AA3497, - __TEST_STRS[109]: 0x229EF18BCD375C95, - __TEST_STRS[110]: 0x02A17DBCA4AA32C8, - __TEST_STRS[111]: 0x229B6F8BCD3449D8, - __TEST_STRS[112]: 0x02A184BCA4AA3ED5, - __TEST_STRS[113]: 0x22B3618BCD48C3EF, - __TEST_STRS[114]: 0x5C2C346706186F36, - __TEST_STRS[115]: 0xB78C410F5B84F8C2, - __TEST_STRS[116]: 0xED9478212B267395, - __TEST_STRS[117]: 0xD9BBB55C5256662F, - __TEST_STRS[118]: 0x8C54F0203249438A, - __TEST_STRS[119]: 0xBD9790B5727DC37E, - __TEST_STRS[120]: 0xA64E5F36C9E2B0E3, - __TEST_STRS[121]: 0x8FD0680DA3088A04, - __TEST_STRS[122]: 0x67AAD32C078284CC, - __TEST_STRS[123]: 0xB37D55D81C57B331, - __TEST_STRS[124]: 0x55AC0F3829057C43, - __TEST_STRS[125]: 0xCB27F4B8E1B6CC20, - __TEST_STRS[126]: 0x26CAF88BCBEF2D19, - __TEST_STRS[127]: 0x8E6E063B97E61B8F, - __TEST_STRS[128]: 0xB42750F7F3B7C37E, - __TEST_STRS[129]: 0xF3C6BA64CF7CA99B, - 
__TEST_STRS[130]: 0xEBFB69B427EA80FE, - __TEST_STRS[131]: 0x39B50C3ED970F46C, - __TEST_STRS[132]: 0x5B9B177AA3EB3E8A, - __TEST_STRS[133]: 0x6510063ECF4EC903, - __TEST_STRS[134]: 0x2B3BBD2C00797C7A, - __TEST_STRS[135]: 0xF1D6204FF5CB4AA7, - __TEST_STRS[136]: 0x4836E27CCF099F38, - __TEST_STRS[137]: 0x82EFBB0DD073B44D, - __TEST_STRS[138]: 0x4A80C282FFD7D4C6, - __TEST_STRS[139]: 0x305D1A9C9EE43BDF, - __TEST_STRS[140]: 0x15C366948FFC6997, - __TEST_STRS[141]: 0x80153AE218916E7B, - __TEST_STRS[142]: 0xFA23E2BDF9E2A9E1, - __TEST_STRS[143]: 0xD47E8D8A2333C6DE, - __TEST_STRS[144]: 0x7E128095F688B056, - __TEST_STRS[145]: 0x2F5356890EFCEDAB, - __TEST_STRS[146]: 0x95C2B383014F55C5, - __TEST_STRS[147]: 0x4727A5339CE6070F, - __TEST_STRS[148]: 0xB0555ECD575108E9, - __TEST_STRS[149]: 0x48D785770BB4AF37, - __TEST_STRS[150]: 0x09D4701C12AF02B1, - __TEST_STRS[151]: 0x79F031E78F3CF62E, - __TEST_STRS[152]: 0x52A1EE85DB1B5A94, - __TEST_STRS[153]: 0x6BD95B2EB37FA6B8, - __TEST_STRS[154]: 0x74971B7077AEF85D, - __TEST_STRS[155]: 0xB4E4FAE2FFCC1AAD, - __TEST_STRS[156]: 0x2BD48BD898B8F63A, - __TEST_STRS[157]: 0xE9966AC1556257F6, - __TEST_STRS[158]: 0x92A3D1CD078BA293, - __TEST_STRS[159]: 0xF81175A482E20AB8, - __TEST_STRS[160]: 0x5BBB3DE722E73048, - __TEST_STRS[161]: 0x6B4F363492B9F2BE, - __TEST_STRS[162]: 0xC2D559DF73D59875, - __TEST_STRS[163]: 0xF75F62284BC7A8C2, - __TEST_STRS[164]: 0xDA8DD8E116A9F1CC, - __TEST_STRS[165]: 0xBDC1E6AB76057885, - __TEST_STRS[166]: 0xFEC6A4238A1224A0, - __TEST_STRS[167]: 0xC03F40F3223E290E, - __TEST_STRS[168]: 0x1ED21673466FFDA9, - __TEST_STRS[169]: 0xDF70F906BB0DD2AF, - __TEST_STRS[170]: 0xF3DCDA369F2AF666, - __TEST_STRS[171]: 0x9EBB11573CDCEBDE, - __TEST_STRS[172]: 0x81C72D9077FEDCA0, - __TEST_STRS[173]: 0x0EC074A31BE5FB15, - __TEST_STRS[174]: 0x2A8B3280B6C48F20, - __TEST_STRS[175]: 0xFD31777513309344, - __TEST_STRS[176]: 0x194534A86AD006B6, - __TEST_STRS[177]: 0x3BE6FDF46E0CFE12, - __TEST_STRS[178]: 0x017CC137A07EB057, - __TEST_STRS[179]: 0x9428FC6E7D26B54D, - __TEST_STRS[180]: 0x9AAA2E3603EF8AD7, - __TEST_STRS[181]: 0x82C6D3F3A0CCDF7D, - __TEST_STRS[182]: 0xC86EEEA00CF09B65, - __TEST_STRS[183]: 0x705F8189DBB58299, - __TEST_STRS[184]: 0x415A7F554391CA69, - __TEST_STRS[185]: 0xCFE3D49FA2BDC555, - __TEST_STRS[186]: 0xF0F9C56039B25191, - __TEST_STRS[187]: 0x7075CB6ABD1D32D9, - __TEST_STRS[188]: 0x43C94E2C8B277509, - __TEST_STRS[189]: 0x3CBFD4E4EA670359, - __TEST_STRS[190]: 0xC05887810F4D019D, - __TEST_STRS[191]: 0x14468FF93AC22DC5, - __TEST_STRS[192]: 0xEBED699589D99C05, - __TEST_STRS[193]: 0x6D99F6DF321CA5D5, - __TEST_STRS[194]: 0x0CD410D08C36D625, - __TEST_STRS[195]: 0xEF1B2A2C86831D35, - __TEST_STRS[196]: 0x3B349C4D69EE5F05, - __TEST_STRS[197]: 0x55248CE88F45F035, - __TEST_STRS[198]: 0xAA69CA6A18A4C885, - __TEST_STRS[199]: 0x1FE3FCE62BD816B5, - __TEST_STRS[200]: 0x0289A488A8DF69D9, - __TEST_STRS[201]: 0x15E96E1613DF98B5, - __TEST_STRS[202]: 0xE6BE57375AD89B99, -} From b583535ff18976e8a2cf3511c18d6f99a8542374 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Wed, 26 Feb 2025 19:39:55 -0600 Subject: [PATCH 2/5] sort args before hashing, support partial arg extraction --- src/riverqueue/client.py | 16 +- tests/client_test.py | 154 +++++++++++++++++- .../riversqlalchemy/sqlalchemy_driver_test.py | 4 + 3 files changed, 165 insertions(+), 9 deletions(-) diff --git a/src/riverqueue/client.py b/src/riverqueue/client.py index 0b1fea4..596bcfd 100644 --- a/src/riverqueue/client.py +++ b/src/riverqueue/client.py @@ -8,6 +8,7 @@ List, runtime_checkable, ) +import json from riverqueue.insert_opts 
import InsertOpts, UniqueOpts @@ -559,7 +560,20 @@ def _build_unique_key_and_bitmask( if unique_opts.by_args: any_unique_opts = True - unique_key += f"&args={insert_params.args}" + + # Re-parse the args JSON for sorting and potentially filtering: + args_dict = json.loads(insert_params.args) + + args_to_include = args_dict + if unique_opts.by_args is not True: + # Filter to include only the specified keys: + args_to_include = { + key: args_dict[key] for key in unique_opts.by_args if key in args_dict + } + + # Serialize with sorted keys and append to unique key: + sorted_args = json.dumps(args_to_include, sort_keys=True) + unique_key += f"&args={sorted_args}" if unique_opts.by_period: lower_period_bound = _truncate_time( diff --git a/tests/client_test.py b/tests/client_test.py index 2450b71..7acd5eb 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -1,6 +1,7 @@ from dataclasses import dataclass from datetime import datetime, timezone from unittest.mock import MagicMock, patch +import json import pytest @@ -225,14 +226,14 @@ def test_insert_with_unique_opts_by_state(client, mock_exec, simple_args): insert_opts = InsertOpts( unique_opts=UniqueOpts( by_state=[ - "available", - "cancelled", - "completed", - "discarded", - "pending", - "retryable", - "running", - "scheduled", + JobState.AVAILABLE, + JobState.CANCELLED, + JobState.COMPLETED, + JobState.DISCARDED, + JobState.PENDING, + JobState.RETRYABLE, + JobState.RUNNING, + JobState.SCHEDULED, ] ) ) @@ -251,6 +252,143 @@ def test_insert_with_unique_opts_by_state(client, mock_exec, simple_args): assert insert_params.unique_states == bytes([0b11111111]) +def test_insert_with_unique_opts_by_args_true(client, mock_exec, simple_args): + """Test that by_args=True uses full args with sorted keys""" + mock_exec.job_insert_many.return_value = [("job_row", False)] + + # Call with by_args=True + insert_opts = InsertOpts(unique_opts=UniqueOpts(by_args=True)) + + insert_res = client.insert(simple_args, insert_opts=insert_opts) + + mock_exec.job_insert_many.assert_called_once() + assert insert_res.job == "job_row" + + # Verify the by_args=True was properly handled + call_args = mock_exec.job_insert_many.call_args[0][0] + assert len(call_args) == 1 + insert_params = call_args[0] + assert insert_params.unique_key is not None + + +def test_insert_with_unique_opts_by_args_sorting( + client: Client, mock_exec: MagicMock +) -> None: + """Test that different key order in args produces the same unique key""" + mock_exec.job_insert_many.side_effect = [ + [("job_row1", False)], + [("job_row2", False)], + ] + + @dataclass + class JsonArgs: + kind: str = "ordered" + json_str: str = "" + + def to_json(self) -> str: + return self.json_str + + # Insert with different key orders + insert_opts = InsertOpts(unique_opts=UniqueOpts(by_args=True)) + + # Same data with different key orders + ordered_json = '{"a": 1, "b": 2, "c": 3}' + reverse_ordered_json = '{"c": 3, "b": 2, "a": 1}' + + insert_res1 = client.insert( + JsonArgs(json_str=ordered_json), insert_opts=insert_opts + ) + insert_res2 = client.insert( + JsonArgs(json_str=reverse_ordered_json), insert_opts=insert_opts + ) + + # Get the unique keys that were generated + call_args1 = mock_exec.job_insert_many.call_args_list[0][0][0] # type: ignore[index] + call_args2 = mock_exec.job_insert_many.call_args_list[1][0][0] # type: ignore[index] + + # The unique keys should be identical despite different order in original JSON + assert call_args1[0].unique_key == call_args2[0].unique_key + + +def 
test_insert_with_unique_opts_by_args_partial_keys( + client: Client, mock_exec: MagicMock +) -> None: + """Test that by_args with keys extracts only specified keys, even from nested objects""" + mock_exec.job_insert_many.return_value = [("job_row", False)] + + @dataclass + class JsonArgs: + kind: str = "partial" + json_str: str = "" + + def to_json(self) -> str: + return self.json_str + + args1 = json.dumps( + { + "a": "value", + "b": "foo", + "c": { + "d": "bar", + }, + "e": "ignore_this", + } + ) + + # Same data as args1 except for omitted `e`, and reordered keys. It's a duplicate: + args2 = json.dumps( + { + "c": { + "d": "bar", + }, + "b": "foo", + "a": "value", + } + ) + + # Missing `c`, so it's not a duplicate: + args3 = json.dumps( + { + "a": "value", + "b": "foo", + "d": "something else", # Omitted + } + ) + + args4 = json.dumps( + { + "b": "foo", + "a": "value", + "e": "bar", # Omitted + } + ) + + # Filter by a, b, and c: + insert_opts = InsertOpts(unique_opts=UniqueOpts(by_args=["a", "b", "c"])) + + client.insert(JsonArgs(json_str=args1), insert_opts=insert_opts) + client.insert(JsonArgs(json_str=args2), insert_opts=insert_opts) + client.insert(JsonArgs(json_str=args3), insert_opts=insert_opts) + client.insert(JsonArgs(json_str=args4), insert_opts=insert_opts) + + # Parse args to verify filtering + call_args_1 = mock_exec.job_insert_many.call_args_list[0][0][0] # type: ignore[index] + insert_params_1 = call_args_1[0] + call_args_2 = mock_exec.job_insert_many.call_args_list[1][0][0] # type: ignore[index] + insert_params_2 = call_args_2[0] + call_args_3 = mock_exec.job_insert_many.call_args_list[2][0][0] # type: ignore[index] + insert_params_3 = call_args_3[0] + call_args_4 = mock_exec.job_insert_many.call_args_list[3][0][0] # type: ignore[index] + insert_params_4 = call_args_4[0] + + # Check that the keys were filtered correctly + assert insert_params_1.unique_key == insert_params_2.unique_key + # args3 is missing `c`, so it's not a duplicate: + assert insert_params_1.unique_key != insert_params_3.unique_key + # args3 and args4 are both the same when only looking at the filtered keys: + assert insert_params_3.unique_key == insert_params_4.unique_key + + def test_insert_kind_error(client): @dataclass class MyArgs: diff --git a/tests/driver/riversqlalchemy/sqlalchemy_driver_test.py b/tests/driver/riversqlalchemy/sqlalchemy_driver_test.py index 83a8f1e..0997a3f 100644 --- a/tests/driver/riversqlalchemy/sqlalchemy_driver_test.py +++ b/tests/driver/riversqlalchemy/sqlalchemy_driver_test.py @@ -157,6 +157,8 @@ async def test_insert_with_opts(self, client, simple_args): insert_opts = InsertOpts(queue="high_priority", unique_opts=None) insert_res = await client.insert(simple_args, insert_opts=insert_opts) assert insert_res.job + assert insert_res.job.unique_key is None + assert insert_res.job.unique_states is None @pytest.mark.asyncio async def test_insert_with_unique_opts_by_args(self, client, simple_args): @@ -352,6 +354,8 @@ def test_insert_with_opts(self, client, simple_args): insert_opts = InsertOpts(queue="high_priority", unique_opts=None) insert_res = client.insert(simple_args, insert_opts=insert_opts) assert insert_res.job + assert insert_res.job.unique_key is None + assert insert_res.job.unique_states is None def test_insert_with_unique_opts_by_args(self, client, simple_args): print("self", self) From ea8b94b34665819788ea3a64c96edeace6ad4757 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Wed, 26 Feb 2025 20:06:16 -0600 Subject: [PATCH 3/5] work around sqlc nullable array value 
type issue --- src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.py | 4 ++-- src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.sql | 4 ++-- src/riverqueue/driver/riversqlalchemy/sql_alchemy_driver.py | 6 ++++-- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.py b/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.py index 7ffff94..2aa174c 100644 --- a/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.py +++ b/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.py @@ -52,9 +52,9 @@ -- query. string_to_array(unnest(:p9\\:\\:text[]), ','), - unnest(:p10\\:\\:bytea[]), + nullif(unnest(:p10\\:\\:bytea[]), ''), -- Strings of bits are used for the input type here to make sqlalchemy play nicely with bit(8)\\: - unnest(:p11\\:\\:text[])\\:\\:bit(8) + nullif(unnest(:p11\\:\\:text[]), '')\\:\\:bit(8) ON CONFLICT (unique_key) WHERE unique_key IS NOT NULL diff --git a/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.sql b/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.sql index 464054c..db428a3 100644 --- a/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.sql +++ b/src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.sql @@ -73,9 +73,9 @@ INSERT INTO river_job( -- query. string_to_array(unnest(@tags::text[]), ','), - unnest(@unique_key::bytea[]), + nullif(unnest(@unique_key::bytea[]), ''), -- Strings of bits are used for the input type here to make sqlalchemy play nicely with bit(8): - unnest(@unique_states::text[])::bit(8) + nullif(unnest(@unique_states::text[]), '')::bit(8) ON CONFLICT (unique_key) WHERE unique_key IS NOT NULL diff --git a/src/riverqueue/driver/riversqlalchemy/sql_alchemy_driver.py b/src/riverqueue/driver/riversqlalchemy/sql_alchemy_driver.py index 96c320e..05dad0f 100644 --- a/src/riverqueue/driver/riversqlalchemy/sql_alchemy_driver.py +++ b/src/riverqueue/driver/riversqlalchemy/sql_alchemy_driver.py @@ -148,14 +148,16 @@ def _build_insert_many_params( ) insert_many_params.state.append(cast(models.RiverJobState, insert_params.state)) insert_many_params.tags.append(",".join(insert_params.tags)) - insert_many_params.unique_key.append(insert_params.unique_key or None) + insert_many_params.unique_key.append( + memoryview(insert_params.unique_key or b"") + ) if insert_params.unique_states: one_byte = insert_params.unique_states[0] bit_string = format(one_byte, "08b") insert_many_params.unique_states.append(bit_string) else: - insert_many_params.unique_states.append(None) + insert_many_params.unique_states.append("") return insert_many_params From 273a6413ff6baf231e82d8f1a7361dea04fe0b48 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Wed, 26 Feb 2025 20:32:46 -0600 Subject: [PATCH 4/5] documentation updates, changelog --- CHANGELOG.md | 24 +++++++++++++++++++++++- README.md | 6 ++++-- src/riverqueue/insert_opts.py | 17 ++++++++++------- tests/client_test.py | 8 ++------ 4 files changed, 39 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f080016..f511015 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,28 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Breaking + +- **Breaking change:** The return type of `Client#insert_many` and `Client#insert_many_tx` has been changed. Rather than returning just the number of rows inserted, it returns an array of all the `InsertResult` values for each inserted row. 
Unique conflicts which are skipped as duplicates are indicated in the same fashion as single inserts (the `unique_skipped_as_duplicated` attribute), and in such cases the conflicting row will be returned instead. [PR #38](https://github.com/riverqueue/riverqueue-python/pull/38). +- **Breaking change:** Unique jobs no longer allow total customization of their states when using the `by_state` option. The pending, scheduled, available, and running states are required whenever customizing this list. + +### Added + +- The `UniqueOpts` class gains an `exclude_kind` option for cases where uniqueness needs to be guaranteed across multiple job types. +- Unique jobs utilizing `by_args` can now also opt to have a subset of the job's arguments considered for uniqueness. For example, you could choose to consider only the `customer_id` field while ignoring the other fields: + + ```python + UniqueOpts(by_args=["customer_id"]) + ``` + + Any fields considered in uniqueness are also sorted alphabetically in order to guarantee a consistent result across implementations, even if the encoded JSON isn't sorted consistently. + +### Changed + +- Unique jobs have been improved to allow bulk insertion of unique jobs via `Client#insert_many`. + + This updated implementation is significantly faster due to the removal of advisory locks in favor of an index-backed uniqueness system, while allowing some flexibility in which job states are considered. However, not all states may be removed from consideration when using the `by_state` option; pending, scheduled, available, and running states are required whenever customizing this list. + ## [0.7.0] - 2024-07-30 ### Changed @@ -79,4 +101,4 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- Initial release, supporting insertion through [SQLAlchemy](https://www.sqlalchemy.org/) and its underlying Postgres drivers like [`psycopg2`](https://pypi.org/project/psycopg2/) or [`asyncpg`](https://github.com/MagicStack/asyncpg) (for async). \ No newline at end of file +- Initial release, supporting insertion through [SQLAlchemy](https://www.sqlalchemy.org/) and its underlying Postgres drivers like [`psycopg2`](https://pypi.org/project/psycopg2/) or [`asyncpg`](https://github.com/MagicStack/asyncpg) (for async). diff --git a/README.md b/README.md index faea5c5..14c0781 100644 --- a/README.md +++ b/README.md @@ -95,12 +95,14 @@ insert_res.job insert_res.unique_skipped_as_duplicated ``` +Unique jobs can also be inserted in bulk. + ## Inserting jobs in bulk Use `#insert_many` to bulk insert jobs as a single operation for improved efficiency: ```python -num_inserted = client.insert_many([ +results = client.insert_many([ SimpleArgs(job_num=1), SimpleArgs(job_num=2) ]) @@ -109,7 +111,7 @@ num_inserted = client.insert_many([ Or with `InsertManyParams`, which may include insertion options: ```python -num_inserted = client.insert_many([ +results = client.insert_many([ InsertManyParams(args=SimpleArgs(job_num=1), insert_opts=riverqueue.InsertOpts(max_attempts=5)), InsertManyParams(args=SimpleArgs(job_num=2), insert_opts=riverqueue.InsertOpts(queue="high_priority")) ]) diff --git a/src/riverqueue/insert_opts.py b/src/riverqueue/insert_opts.py index 92cee29..8a5584e 100644 --- a/src/riverqueue/insert_opts.py +++ b/src/riverqueue/insert_opts.py @@ -82,13 +82,8 @@ class UniqueOpts: args and queues. If either args or queue is changed on a new job, it's allowed to be inserted as a new job. 
- TODO update description ⚠ ⚠️ ⚠ - - Uniquenes is checked at insert time by taking a Postgres advisory lock, - doing a look up for an equivalent row, and inserting only if none was found. - There's no database-level mechanism that guarantees jobs stay unique, so if - an equivalent row is inserted out of band (or batch inserted, where a unique - check doesn't occur), it's conceivable that duplicates could coexist. + Uniqueness relies on a hash of the job kind and any unique properties along + with a database unique constraint. """ by_args: Optional[Union[Literal[True], List[str]]] = None @@ -98,6 +93,14 @@ class UniqueOpts: Default is false, meaning that as long as any other unique property is enabled, uniqueness will be enforced for a kind regardless of input args. + + When set to true, the entire encoded args will be included in the uniqueness + hash, which requires care to ensure that no irrelevant args are factored + into the uniqueness check. It is also possible to use a subset of the args + by passing a list of string keys to include in the uniqueness check. + + All keys are sorted alphabetically before hashing to ensure consistent + results. """ by_period: Optional[int] = None diff --git a/tests/client_test.py b/tests/client_test.py index 7acd5eb..14b7f72 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -295,12 +295,8 @@ def to_json(self) -> str: ordered_json = '{"a": 1, "b": 2, "c": 3}' reverse_ordered_json = '{"c": 3, "b": 2, "a": 1}' - insert_res1 = client.insert( - JsonArgs(json_str=ordered_json), insert_opts=insert_opts - ) - insert_res2 = client.insert( - JsonArgs(json_str=reverse_ordered_json), insert_opts=insert_opts - ) + client.insert(JsonArgs(json_str=ordered_json), insert_opts=insert_opts) + client.insert(JsonArgs(json_str=reverse_ordered_json), insert_opts=insert_opts) # Get the unique keys that were generated call_args1 = mock_exec.job_insert_many.call_args_list[0][0][0] # type: ignore[index] From dceb837935f325df07c6005b7f98ff11325a9126 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Thu, 27 Feb 2025 09:52:34 -0600 Subject: [PATCH 5/5] remove whitespace from unique key json component --- src/riverqueue/client.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/riverqueue/client.py b/src/riverqueue/client.py index 596bcfd..7338ea8 100644 --- a/src/riverqueue/client.py +++ b/src/riverqueue/client.py @@ -571,8 +571,9 @@ def _build_unique_key_and_bitmask( key: args_dict[key] for key in unique_opts.by_args if key in args_dict } - # Serialize with sorted keys and append to unique key: - sorted_args = json.dumps(args_to_include, sort_keys=True) + # Serialize with sorted keys and append to unique key. Remove whitespace + # from the JSON to match other implementations: + sorted_args = json.dumps(args_to_include, sort_keys=True, separators=(",", ":")) unique_key += f"&args={sorted_args}" if unique_opts.by_period:
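
To make the new bitmask behavior concrete: below is a minimal sketch of `unique_bitmask_from_states`, reconstructed from `JOB_STATE_BIT_POSITIONS` and the parametrized expectations in `tests/client_test.py` above. It is illustrative only and may differ in detail from the shipped implementation in `src/riverqueue/client.py`:

```python
from riverqueue import JobState

# Mirrors JOB_STATE_BIT_POSITIONS from src/riverqueue/client.py.
BIT_POSITIONS = {
    JobState.AVAILABLE: 7,
    JobState.CANCELLED: 6,
    JobState.COMPLETED: 5,
    JobState.DISCARDED: 4,
    JobState.PENDING: 3,
    JobState.RETRYABLE: 2,
    JobState.RUNNING: 1,
    JobState.SCHEDULED: 0,
}


def unique_bitmask_from_states(states: list[JobState]) -> bytes:
    val = 0
    for state in states:
        # Position 7 (AVAILABLE) lands in the rightmost bit of the byte,
        # matching the parametrized test vectors above.
        val |= 1 << (7 - BIT_POSITIONS[state])
    return bytes([val])


assert unique_bitmask_from_states([JobState.AVAILABLE]) == bytes([0b00000001])
assert unique_bitmask_from_states([JobState.SCHEDULED]) == bytes([0b10000000])
```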
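
Likewise, the args handling introduced in patches 2 and 5 can be summarized in isolation. The sketch below covers only the `&args=` component of the unique key under the stated assumptions (optional key filtering, alphabetical key sorting, compact separators); the real `_build_unique_key_and_bitmask` also folds in kind, queue, period, and states before the SHA-256 hash is taken:

```python
import json
from typing import List, Literal, Union


def args_component(args_json: str, by_args: Union[Literal[True], List[str]]) -> str:
    args_dict = json.loads(args_json)
    if by_args is not True:
        # Keep only the requested keys; missing keys are simply skipped.
        args_dict = {key: args_dict[key] for key in by_args if key in args_dict}
    # Sorted keys and compact separators keep the result stable across
    # implementations regardless of the original JSON's formatting.
    return "&args=" + json.dumps(args_dict, sort_keys=True, separators=(",", ":"))


# Key order and excluded fields don't change the component:
assert (
    args_component('{"b": "foo", "a": "value", "e": "bar"}', ["a", "b"])
    == args_component('{"a": "value", "b": "foo"}', True)
)
```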
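
The sqlc workaround in patch 3 round-trips the one-byte bitmask through a text bit string so SQLAlchemy can bind it as `bit(8)`, with empty strings standing in for NULL. A sketch of the Python side of that conversion, matching the driver changes above:

```python
from typing import Optional


def unique_states_to_bit_string(unique_states: Optional[bytes]) -> str:
    # An empty string is sent instead of None so the array parameter stays
    # homogeneous; nullif(..., '') on the SQL side converts it back to NULL.
    if not unique_states:
        return ""
    return format(unique_states[0], "08b")


assert unique_states_to_bit_string(bytes([0b11110101])) == "11110101"
assert unique_states_to_bit_string(None) == ""
```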
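
Finally, callers migrating to the new `insert_many` return type can iterate the results directly instead of relying on a count. A hedged usage sketch, assuming a configured `riverqueue.Client` bound to `client`; `SimpleArgs` here is a stand-in args class modeled on the README examples:

```python
import json
from dataclasses import dataclass


@dataclass
class SimpleArgs:
    job_num: int
    kind: str = "simple"

    def to_json(self) -> str:
        return json.dumps({"job_num": self.job_num})


results = client.insert_many([SimpleArgs(job_num=1), SimpleArgs(job_num=2)])
for res in results:
    # Each element is an InsertResult; rows skipped as unique duplicates
    # still carry the conflicting job row.
    print(res.job.id, res.unique_skipped_as_duplicated)
```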