2222import java .net .URI ;
2323import java .nio .charset .StandardCharsets ;
2424import java .time .Duration ;
25+ import java .time .Instant ;
2526import java .util .Map ;
2627import java .util .Optional ;
2728import java .util .concurrent .CompletableFuture ;
@@ -57,6 +58,8 @@ public final class OnlineExperimentImpl extends BaseExperimentAsync implements O
5758 private static final int SCHEDULED_EXECUTOR_TERMINATION_WAIT_SEC = 60 ;
5859 private static final int STD_OUT_LOGGER_FLUSH_WAIT_DELAY_MS = 2000 ;
5960
61+ private static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 3000 ;
62+
6063 private final ScheduledExecutorService scheduledExecutorService =
6164 Executors .newSingleThreadScheduledExecutor ();
6265
@@ -66,7 +69,11 @@ public final class OnlineExperimentImpl extends BaseExperimentAsync implements O
6669 private StdOutLogger stdOutLogger ;
6770 private StdOutLogger stdErrLogger ;
6871 private boolean interceptStdout ;
72+
73+ // The future representing scheduled heartbeat sender thread
6974 private ScheduledFuture <?> heartbeatSendFuture ;
75+ // The time instant to indicate when next heartbeat should be sent
76+ private Instant nextHeartbeatInstant ;
7077
7178 // The flag to indicate if experiment end() was called and experiment shutdown initialized
7279 private final AtomicBoolean atShutdown = new AtomicBoolean ();
@@ -566,8 +573,10 @@ void init() {
566573 this .logger .error (getString (FAILED_LOG_SYSTEM_DETAILS ), ex );
567574 }
568575
576+ this .nextHeartbeatInstant = Instant .now ();
569577 this .heartbeatSendFuture = this .scheduledExecutorService .scheduleAtFixedRate (
570- new OnlineExperimentImpl .HeartbeatPing (this ), 1 , 3 , TimeUnit .SECONDS );
578+ new OnlineExperimentImpl .HeartbeatPing (this ),
579+ 500 , 1000 , TimeUnit .MILLISECONDS );
571580 }
572581
573582 private void setupStdOutIntercept () {
@@ -607,13 +616,23 @@ private void sendHeartbeat() {
607616 return ;
608617 }
609618 logger .debug ("sendHeartbeat" );
619+ Instant nowInstant = Instant .now ();
620+ if (nowInstant .isBefore (this .nextHeartbeatInstant )) {
621+ return ;
622+ }
623+
610624 Optional <ExperimentStatusResponse > status = this .sendExperimentStatus ();
611625 if (status .isPresent ()) {
612626 long interval = status .get ().getIsAliveBeatDurationMillis ();
627+ Instant now = Instant .now ();
628+ this .nextHeartbeatInstant = now .plusMillis (interval );
613629 if (logger .isDebugEnabled ()) {
614- logger .debug ("received heartbeat interval {}" , interval );
630+ logger .debug ("received heartbeat interval {} ms at {}, next heartbeat at {}" ,
631+ interval , now , this .nextHeartbeatInstant );
615632 }
616- // TODO: implement logic to change heartbeat interval
633+ } else {
634+ // use default interval
635+ this .nextHeartbeatInstant = Instant .now ().plusMillis (DEFAULT_HEARTBEAT_INTERVAL_MS );
617636 }
618637 }
619638
@@ -699,7 +718,7 @@ static class HeartbeatPing implements Runnable {
699718
700719 @ Override
701720 public void run () {
702- onlineExperiment .sendHeartbeat ();
721+ this . onlineExperiment .sendHeartbeat ();
703722 }
704723 }
705724
0 commit comments