diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index ca8f0e1a..f5fca9df 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -35,7 +35,8 @@ var migrationFS embed.FS // Driver is an implementation of riverdriver.Driver for database/sql. type Driver struct { - dbPool *sql.DB + dbPool *sql.DB + listener riverdriver.Listener } // New returns a new database/sql River driver for use with River. @@ -47,11 +48,26 @@ func New(dbPool *sql.DB) *Driver { return &Driver{dbPool: dbPool} } +// NewWithListener returns a new database/sql River driver for use with River +// just like New, except it also takes a riverdriver.Listener to use for +// listening to notifications. +func NewWithListener(dbPool *sql.DB, listener riverdriver.Listener) *Driver { + driver := New(dbPool) + driver.listener = listener + return driver +} + func (d *Driver) GetExecutor() riverdriver.Executor { return &Executor{d.dbPool, d.dbPool} } -func (d *Driver) GetListener() riverdriver.Listener { panic(riverdriver.ErrNotImplemented) } +func (d *Driver) GetListener() riverdriver.Listener { + if d.listener == nil { + panic(riverdriver.ErrNotImplemented) + } + return d.listener +} + func (d *Driver) GetMigrationFS(line string) fs.FS { if line == riverdriver.MigrationLineMain { return migrationFS @@ -60,7 +76,7 @@ func (d *Driver) GetMigrationFS(line string) fs.FS { } func (d *Driver) GetMigrationLines() []string { return []string{riverdriver.MigrationLineMain} } func (d *Driver) HasPool() bool { return d.dbPool != nil } -func (d *Driver) SupportsListener() bool { return false } +func (d *Driver) SupportsListener() bool { return d.listener != nil } func (d *Driver) UnwrapExecutor(tx *sql.Tx) riverdriver.ExecutorTx { return &ExecutorTx{Executor: Executor{nil, tx}, tx: tx} diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 8b4e6d0b..819b68a2 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -765,6 +765,27 @@ type Listener struct { mu sync.Mutex } +// NewListener returns a Listener that can be used to enable other drivers +// (notably `riverdatabasesql`) to listen for notifications when their +// abstraction doesn't allow direct access to pgx connections, even though they +// are using pgx under the hood. This constructor is not applicable to +// applications which only use pgx directly. +// +// Users of `database/sql` or Bun can use this with the +// `riverdatabasesql.NewWithListener` constructor to enable listener support in +// that driver. +// +// The dbPool will solely be used for acquiring new connections for `LISTEN` +// commands. As such, a pool must be provided that supports that command. Users +// of pgbouncer should ensure that this specific pool is configured with session +// pooling, even if their main application does not use session pooling. +// +// A single Client will never use more than one listener connection at a time, +// no matter how many topics are being listened to. +func NewListener(dbPool *pgxpool.Pool) riverdriver.Listener { + return &Listener{dbPool: dbPool} +} + func (l *Listener) Close(ctx context.Context) error { l.mu.Lock() defer l.mu.Unlock()