55import com .yugabyte .yw .models .HighAvailabilityConfig ;
66import java .time .Duration ;
77import java .util .concurrent .atomic .AtomicBoolean ;
8+ import java .util .function .Function ;
89import javax .inject .Inject ;
910import javax .inject .Singleton ;
1011import lombok .extern .slf4j .Slf4j ;
@@ -31,45 +32,39 @@ public PlatformScheduler(
3132 this .shutdownHookHandler = shutdownHookHandler ;
3233 }
3334
34- public Cancellable schedule (
35- String name , Duration initialDelay , Duration interval , Runnable runnable ) {
35+ private Cancellable createShutdownAwareSchedule (
36+ String name , Runnable runnable , Function < Runnable , Cancellable > scheduleFactory ) {
3637 final AtomicBoolean isRunning = new AtomicBoolean ();
3738 final Object lock = new Object ();
38-
39- Cancellable cancellable =
40- actorSystem
41- .scheduler ()
42- .scheduleWithFixedDelay (
43- initialDelay ,
44- interval ,
45- () -> {
46- boolean shouldRun = false ;
47- synchronized (lock ) {
48- // Synchronized block in shutdown and this should be serialized.
49- shouldRun =
50- !shutdownHookHandler .isShutdown ()
51- && !HighAvailabilityConfig .isFollower ()
52- && isRunning .compareAndSet (false , true );
53- }
54- if (shouldRun ) {
55- try {
56- runnable .run ();
57- } finally {
58- isRunning .set (false );
59- if (shutdownHookHandler .isShutdown ()) {
60- synchronized (lock ) {
61- lock .notify ();
62- }
63- }
64- }
65- } else {
66- log .warn (
67- "Previous run of scheduler {} is in progress, is being shut down, or YBA is"
68- + " in follower mode." ,
69- name );
70- }
71- },
72- executionContext );
39+ Runnable wrappedRunnable =
40+ () -> {
41+ boolean shouldRun = false ;
42+ synchronized (lock ) {
43+ // Synchronized block in shutdown and this should be serialized.
44+ shouldRun =
45+ !shutdownHookHandler .isShutdown ()
46+ && !HighAvailabilityConfig .isFollower ()
47+ && isRunning .compareAndSet (false , true );
48+ }
49+ if (shouldRun ) {
50+ try {
51+ runnable .run ();
52+ } finally {
53+ isRunning .set (false );
54+ if (shutdownHookHandler .isShutdown ()) {
55+ synchronized (lock ) {
56+ lock .notify ();
57+ }
58+ }
59+ }
60+ } else {
61+ log .warn (
62+ "Previous run of scheduler {} is in progress, is being shut down, or YBA is in"
63+ + " follower mode." ,
64+ name );
65+ }
66+ };
67+ Cancellable cancellable = scheduleFactory .apply (wrappedRunnable );
7368 shutdownHookHandler .addShutdownHook (
7469 cancellable ,
7570 can -> {
@@ -93,29 +88,21 @@ public Cancellable schedule(
9388 return cancellable ;
9489 }
9590
96- public Cancellable scheduleOnce (String name , Duration initialDelay , Runnable runnable ) {
97- final AtomicBoolean isRunning = new AtomicBoolean ();
91+ public Cancellable schedule (
92+ String name , Duration initialDelay , Duration interval , Runnable runnable ) {
93+ return createShutdownAwareSchedule (
94+ name ,
95+ runnable ,
96+ r ->
97+ actorSystem
98+ .scheduler ()
99+ .scheduleWithFixedDelay (initialDelay , interval , r , executionContext ));
100+ }
98101
99- return actorSystem
100- .scheduler ()
101- .scheduleOnce (
102- initialDelay ,
103- () -> {
104- boolean shouldRun = false ;
105- synchronized (isRunning ) {
106- shouldRun =
107- isRunning .compareAndSet (false , true ) && !HighAvailabilityConfig .isFollower ();
108- }
109- if (shouldRun ) {
110- try {
111- runnable .run ();
112- } finally {
113- isRunning .set (false );
114- }
115- } else {
116- log .warn ("Scheduler {} did not run because YBA is in follower mode." , name );
117- }
118- },
119- executionContext );
102+ public Cancellable scheduleOnce (String name , Duration initialDelay , Runnable runnable ) {
103+ return createShutdownAwareSchedule (
104+ name ,
105+ runnable ,
106+ r -> actorSystem .scheduler ().scheduleOnce (initialDelay , r , executionContext ));
120107 }
121108}
0 commit comments