@@ -43,40 +43,40 @@ mod keys;
4343const WORKER_INSTANCE_LOST_THRESHOLD_MS: i64 = rivet_util::duration::seconds(30);
4444/// How long before overwriting an existing metrics lock.
4545const METRICS_LOCK_TIMEOUT_MS: i64 = rivet_util::duration::seconds(30);
46- /// For pubsub wake mechanism.
47- const WORKER_WAKE_SUBJECT : &str = "gasoline.worker.wake ";
46+ /// For pubsub bump mechanism.
47+ const WORKER_BUMP_SUBJECT : &str = "gasoline.worker.bump ";
4848
4949pub struct DatabaseKv {
5050 pools: rivet_pools::Pools,
5151 subspace: universaldb::utils::Subspace,
5252}
5353
5454impl DatabaseKv {
55- /// Spawns a new thread and publishes a worker wake message to pubsub.
56- fn wake_worker (&self) {
55+ /// Spawns a new thread and publishes a worker bump message to pubsub.
56+ fn bump_workers (&self) {
5757 let Ok(pubsub) = self.pools.ups() else {
5858 tracing::debug!("failed to acquire pubsub pool");
5959 return;
6060 };
6161
62- let spawn_res = tokio::task::Builder::new().name("wake ").spawn(
62+ let spawn_res = tokio::task::Builder::new().name("bump ").spawn(
6363 async move {
6464 // Fail gracefully
6565 if let Err(err) = pubsub
6666 .publish(
67- WORKER_WAKE_SUBJECT ,
67+ WORKER_BUMP_SUBJECT ,
6868 &Vec::new(),
6969 universalpubsub::PublishOpts::broadcast(),
7070 )
7171 .await
7272 {
73- tracing::warn!(?err, "failed to publish wake message");
73+ tracing::warn!(?err, "failed to publish bump message");
7474 }
7575 }
76- .instrument(tracing::info_span!("wake_worker_publish ")),
76+ .instrument(tracing::info_span!("bump_worker_publish ")),
7777 );
7878 if let Err(err) = spawn_res {
79- tracing::error!(?err, "failed to spawn wake task");
79+ tracing::error!(?err, "failed to spawn bump task");
8080 }
8181 }
8282}
@@ -424,12 +424,12 @@ impl Database for DatabaseKv {
424424 }
425425
426426 #[tracing::instrument(skip_all)]
427- async fn wake_sub <'a, 'b>(&'a self) -> WorkflowResult<BoxStream<'b, ()>> {
427+ async fn bump_sub <'a, 'b>(&'a self) -> WorkflowResult<BoxStream<'b, ()>> {
428428 let mut subscriber = self
429429 .pools
430430 .ups()
431431 .map_err(WorkflowError::PoolsGeneric)?
432- .subscribe(WORKER_WAKE_SUBJECT )
432+ .subscribe(WORKER_BUMP_SUBJECT )
433433 .await
434434 .map_err(|x| WorkflowError::CreateSubscription(x.into()))?;
435435
@@ -586,7 +586,7 @@ impl Database for DatabaseKv {
586586 "handled failover",
587587 );
588588
589- self.wake_worker ();
589+ self.bump_workers ();
590590 }
591591
592592 Ok(())
@@ -815,7 +815,7 @@ impl Database for DatabaseKv {
815815 .await
816816 .map_err(WorkflowError::Udb)?;
817817
818- self.wake_worker ();
818+ self.bump_workers ();
819819
820820 Ok(workflow_id)
821821 }
@@ -1028,7 +1028,7 @@ impl Database for DatabaseKv {
10281028 {
10291029 let wake_deadline_ts = key.condition.deadline_ts();
10301030
1031- // Update wake deadline ts
1031+ // Update wake deadline ts if earlier
10321032 if last_wake_deadline_ts.is_none()
10331033 || wake_deadline_ts < *last_wake_deadline_ts
10341034 {
@@ -1633,7 +1633,7 @@ impl Database for DatabaseKv {
16331633
16341634 // Wake worker again in case some other workflow was waiting for this one to complete
16351635 if wrote_to_wake_idx {
1636- self.wake_worker ();
1636+ self.bump_workers ();
16371637 }
16381638
16391639 let dt = start_instant.elapsed().as_secs_f64();
@@ -1794,15 +1794,15 @@ impl Database for DatabaseKv {
17941794 //
17951795 // This will result in the workflow sleeping instead of immediately running again.
17961796 //
1797- // Adding this wake_worker call ensures that if the workflow has a valid wake condition before commit
1797+ // Adding this bump_workers call ensures that if the workflow has a valid wake condition before commit
17981798 // then it will immediately wake up again.
17991799 //
18001800 // This is simpler than having this commit_workflow fn read wake conditions because:
18011801 // - the wake conditions are not indexed by wf id
18021802 // - would involve informing the worker to restart the workflow in memory instead of the usual
18031803 // workflow lifecycle
18041804 // - the worker is already designed to pull wake conditions frequently
1805- self.wake_worker ();
1805+ self.bump_workers ();
18061806
18071807 let dt = start_instant.elapsed().as_secs_f64();
18081808 metrics::COMMIT_WORKFLOW_DURATION.record(
@@ -2111,7 +2111,7 @@ impl Database for DatabaseKv {
21112111 .await
21122112 .map_err(WorkflowError::Udb)?;
21132113
2114- self.wake_worker ();
2114+ self.bump_workers ();
21152115
21162116 Ok(())
21172117 }
@@ -2163,7 +2163,7 @@ impl Database for DatabaseKv {
21632163 .await
21642164 .map_err(WorkflowError::Udb)?;
21652165
2166- self.wake_worker ();
2166+ self.bump_workers ();
21672167
21682168 Ok(())
21692169 }
@@ -2219,7 +2219,7 @@ impl Database for DatabaseKv {
22192219 .await
22202220 .map_err(WorkflowError::Udb)?;
22212221
2222- self.wake_worker ();
2222+ self.bump_workers ();
22232223
22242224 Ok(sub_workflow_id)
22252225 }
0 commit comments