diff --git a/client.go b/client.go index fecc8a33..95264159 100644 --- a/client.go +++ b/client.go @@ -542,6 +542,35 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client return client, nil } +// pgIdentifierRegexp matches all valid PostgreSQL identifier names: +var pgIdentifierRegexp = regexp.MustCompile("^[a-zA-Z_][a-zA-Z0-9_]*$") + +// Listen subscribes to a notification topic. The notifyFunc will be called for +// any notification that is received on the topic. The returned subscription must be +// closed with Unlisten when no longer needed. +// +// The provided notifyFunc should be fast and not block to avoid dropping other +// notifications and potentially causing internal issues within River. +// +// It returns an error if the specified topic is not a valid Postgres +// identifier. Panics if notifyFunc is nil. +func (c *Client[TTx]) Listen(topic string, notifyFunc NotifyFunc) (*ListenSubscription, error) { + // Validate that the topic is a valid Postgres identifier: + if valid := pgIdentifierRegexp.MatchString(topic); !valid { + return nil, fmt.Errorf("invalid topic name %q") + } + if notifyFunc == nil { + panic("notifyFunc is required") + } + + wrapperFunc := func(topic notifier.NotificationTopic, payload string) { + notifyFunc(string(topic), payload) + } + return &ListenSubscription{ + Subscription: c.notifier.Listen(notifier.NotificationTopic(topic), wrapperFunc), + }, nil +} + // Start starts the client's job fetching and working loops. Once this is called, // the client will run in a background goroutine until stopped. All jobs are // run with a context inheriting from the provided context, but with a timeout @@ -618,6 +647,11 @@ func (c *Client[TTx]) Start(ctx context.Context) error { c.wg.Add(2) go func() { + // TODO(bgentry): is this the wrong context? Should the notifier actually + // use the `workCtx` so that it doesn't get shut down before existing jobs + // have finished / had their contexts cancelled? This would preserve the + // ability to cancel an individual job's context during the initial + // shutdown phase. c.notifier.Run(fetchNewWorkCtx) c.wg.Done() }() diff --git a/listen_subscription.go b/listen_subscription.go new file mode 100644 index 00000000..e7a8c1f7 --- /dev/null +++ b/listen_subscription.go @@ -0,0 +1,18 @@ +package river + +import "github.com/riverqueue/river/internal/notifier" + +// ListenSubscription is a subscription returned from the Client's Listen method. +type ListenSubscription struct { + *notifier.Subscription +} + +// Unlisten stops listening to this particular subscription. +func (sub *ListenSubscription) Unlisten() { + sub.Subscription.Unlisten() +} + +// NotifyFunc is a function that will be called any time a Postgres notification +// payload is receiverd on the specified topic. It must not block, or messages +// will be dropped (including those important to River's internal operation). +type NotifyFunc func(topic string, payload string)