7777#include " yb/tserver/tablet_server.h"
7878#include " yb/tserver/ts_tablet_manager.h"
7979#include " yb/tserver/tserver_service.pb.h"
80+ #include " yb/tserver/xcluster_consumer.h"
8081
8182#include " yb/util/atomic.h"
8283#include " yb/util/backoff_waiter.h"
@@ -147,8 +148,8 @@ DECLARE_double(TEST_xcluster_simulate_random_failure_after_apply);
147148DECLARE_uint32 (cdcsdk_retention_barrier_no_revision_interval_secs);
148149DECLARE_int32 (heartbeat_interval_ms);
149150DECLARE_bool (TEST_xcluster_fail_setup_stream_update);
151+ DECLARE_bool (TEST_xcluster_force_remote_tserver);
150152DECLARE_bool (xcluster_skip_health_check_on_replication_setup);
151- DECLARE_bool (FLAGS_update_min_cdc_indices_interval_secs);
152153
153154namespace yb {
154155
@@ -3197,8 +3198,7 @@ TEST_P(XClusterTest, PausingAndResumingReplicationFromProducerMultiTable) {
31973198 ASSERT_OK (DeleteUniverseReplication ());
31983199}
31993200
3200- // This test is flaky. See #18437.
3201- TEST_P (XClusterTest, YB_DISABLE_TEST(LeaderFailoverTest)) {
3201+ TEST_F_EX (XClusterTest, LeaderFailoverTest, XClusterTestNoParam) {
32023202 // When the consumer tablet leader moves around (like during an upgrade) the pollers can start and
32033203 // stop on multiple nodes. This test makes sure that during such poller movement we do not get
32043204 // replication errors.
@@ -3214,10 +3214,12 @@ TEST_P(XClusterTest, YB_DISABLE_TEST(LeaderFailoverTest)) {
32143214
32153215 // Don't remove pollers when leaders move.
32163216 ANNOTATE_UNPROTECTED_WRITE (FLAGS_TEST_xcluster_disable_delete_old_pollers) = true ;
3217+ // Keep polling even if the leader term is lost.
3218+ ANNOTATE_UNPROTECTED_WRITE (FLAGS_TEST_xcluster_disable_poller_term_check) = true ;
32173219 // Don't increase the poll delay on failures, as they are expected.
32183220 ANNOTATE_UNPROTECTED_WRITE (FLAGS_replication_failure_delay_exponent) = 0 ;
32193221 // The below flags are required for fast log gc.
3220- ANNOTATE_UNPROTECTED_WRITE (FLAGS_log_segment_size_bytes) = 500 ;
3222+ ANNOTATE_UNPROTECTED_WRITE (FLAGS_log_segment_size_bytes) = 100 ;
32213223 ANNOTATE_UNPROTECTED_WRITE (FLAGS_log_min_segments_to_retain) = 1 ;
32223224 ANNOTATE_UNPROTECTED_WRITE (FLAGS_log_min_seconds_to_retain) = 0 ;
32233225 ANNOTATE_UNPROTECTED_WRITE (FLAGS_cdc_wal_retention_time_secs) = 0 ;
@@ -3226,25 +3228,33 @@ TEST_P(XClusterTest, YB_DISABLE_TEST(LeaderFailoverTest)) {
32263228 const uint32_t kReplicationFactor = 3 , kTabletCount = 1 , kNumMasters = 1 , kNumTservers = 3 ;
32273229 ASSERT_OK (SetUpWithParams (
32283230 {kTabletCount }, {kTabletCount }, kReplicationFactor , kNumMasters , kNumTservers ));
3231+
3232+ google::SetVLOGLevel (" xcluster*" , 5 );
3233+
32293234 ASSERT_OK (
32303235 SetupUniverseReplication (producer_tables_, {LeaderOnly::kFalse , Transactional::kFalse }));
32313236
32323237 // After creating the cluster, make sure all tablets are being polled.
32333238 ASSERT_OK (CorrectlyPollingAllTablets (kTabletCount ));
32343239
3235- auto tablet_ids = ListTabletIdsForTable (consumer_cluster (), consumer_table_->id ());
3236- ASSERT_EQ (tablet_ids.size (), 1 );
3237- const auto tablet_id = *tablet_ids.begin ();
3240+ auto consumer_tablet_ids = ListTabletIdsForTable (consumer_cluster (), consumer_table_->id ());
3241+ ASSERT_EQ (consumer_tablet_ids.size (), 1 );
3242+ const auto consumer_tablet_id = *consumer_tablet_ids.begin ();
3243+
3244+ auto producer_tablet_ids = ListTabletIdsForTable (producer_cluster (), producer_table_->id ());
3245+ ASSERT_EQ (producer_tablet_ids.size (), 1 );
3246+ const auto producer_tablet_id = *producer_tablet_ids.begin ();
3247+
32383248 const auto kTimeout = 10s * kTimeMultiplier ;
32393249
3240- auto leader_master = ASSERT_RESULT (consumer_cluster ()->GetLeaderMiniMaster ());
3241- master::MasterClusterProxy master_proxy (
3242- &consumer_client ()->proxy_cache (), leader_master->bound_rpc_addr ());
3250+ auto master_proxy =
3251+ ASSERT_RESULT (consumer_cluster ()->GetLeaderMasterProxy <master::MasterClusterProxy>());
32433252 auto ts_map =
32443253 ASSERT_RESULT (itest::CreateTabletServerMap (master_proxy, &consumer_client ()->proxy_cache ()));
32453254
32463255 itest::TServerDetails* old_ts = nullptr ;
3247- ASSERT_OK (FindTabletLeader (ts_map, tablet_id, kTimeout , &old_ts));
3256+ ASSERT_OK (FindTabletLeader (ts_map, consumer_tablet_id, kTimeout , &old_ts));
3257+ tserver::MiniTabletServer* old_tserver = FindTabletLeader (consumer_cluster (), consumer_tablet_id);
32483258
32493259 itest::TServerDetails* new_ts = nullptr ;
32503260 for (auto & [ts_id, ts_details] : ts_map) {
@@ -3254,18 +3264,60 @@ TEST_P(XClusterTest, YB_DISABLE_TEST(LeaderFailoverTest)) {
32543264 }
32553265 }
32563266
3257- constexpr int kNumWriteRecords = 100 ;
3258- ASSERT_OK (InsertRowsInProducer (0 , kNumWriteRecords ));
3259- ASSERT_OK (VerifyNumRecordsOnConsumer (kNumWriteRecords ));
3267+ int num_rows_inserted = 0 ;
3268+ auto insert_rows_and_verify = [this , &num_rows_inserted]() -> Status {
3269+ constexpr int kNumWriteRecords = 100 ;
3270+ RETURN_NOT_OK (InsertRowsInProducer (num_rows_inserted, num_rows_inserted + kNumWriteRecords ));
3271+ num_rows_inserted += kNumWriteRecords ;
3272+ return VerifyNumRecordsOnConsumer (num_rows_inserted);
3273+ };
3274+
3275+ auto wait_for_error = [this , &producer_tablet_id, kTimeout ](
3276+ tserver::XClusterConsumer& consumer,
3277+ ReplicationErrorPb expected_error) {
3278+ return LoggedWaitFor (
3279+ [this , &consumer, &producer_tablet_id, expected_error]() -> Result<bool > {
3280+ auto errors = consumer.error_collector_ .GetErrorsToSend (/* get_all_errors=*/ true );
3281+ if (!errors[kReplicationGroupId ][consumer_table_->id ()].contains (producer_tablet_id)) {
3282+ LOG (INFO) << " No errors found" ;
3283+ return false ;
3284+ }
3285+
3286+ const auto error =
3287+ errors[kReplicationGroupId ][consumer_table_->id ()][producer_tablet_id].error ;
3288+ LOG (INFO) << " Poller Error: " << ReplicationErrorPb_Name (error);
3289+ return error == expected_error;
3290+ },
3291+ kTimeout ,
3292+ Format (" Waiting for poller error to be $0" , ReplicationErrorPb_Name (expected_error)),
3293+ /* initial_delay=*/ 100ms);
3294+ };
3295+
3296+ ASSERT_OK (insert_rows_and_verify ());
3297+
3298+ auto & old_consumer =
3299+ *dynamic_cast <tserver::XClusterConsumer*>(old_tserver->server ()->GetXClusterConsumer ());
3300+ ASSERT_OK (wait_for_error (old_consumer, ReplicationErrorPb::REPLICATION_OK));
32603301
32613302 // Failover to new tserver.
3262- ANNOTATE_UNPROTECTED_WRITE (FLAGS_TEST_xcluster_disable_poller_term_check) = true ;
3263- ASSERT_OK (itest::LeaderStepDown (old_ts, tablet_id, new_ts, kTimeout ));
3264- ASSERT_OK (itest::WaitUntilLeader (new_ts, tablet_id, kTimeout ));
3265- auto new_tserver = FindTabletLeader (consumer_cluster (), tablet_id);
3303+ ASSERT_OK (itest::LeaderStepDown (old_ts, consumer_tablet_id, new_ts, kTimeout ));
3304+ ASSERT_OK (itest::WaitUntilLeader (new_ts, consumer_tablet_id, kTimeout ));
32663305
3267- ASSERT_OK (InsertRowsInProducer (kNumWriteRecords , 2 * kNumWriteRecords ));
3268- ASSERT_OK (VerifyNumRecordsOnConsumer (2 * kNumWriteRecords ));
3306+ ASSERT_OK (insert_rows_and_verify ());
3307+
3308+ tserver::MiniTabletServer* new_tserver = FindTabletLeader (consumer_cluster (), consumer_tablet_id);
3309+ auto & new_consumer =
3310+ *dynamic_cast <tserver::XClusterConsumer*>(new_tserver->server ()->GetXClusterConsumer ());
3311+
3312+ ASSERT_OK (wait_for_error (new_consumer, ReplicationErrorPb::REPLICATION_OK));
3313+ // Old consumer should be stuck on the applies.
3314+ ASSERT_OK (wait_for_error (old_consumer, ReplicationErrorPb::REPLICATION_SYSTEM_ERROR));
3315+
3316+ // Master should report healthy since it tracks only errors from the latest term.
3317+ auto stream_id = ASSERT_RESULT (GetCDCStreamID (producer_table_->id ()));
3318+ ASSERT_OK (VerifyReplicationError (consumer_table_->id (), stream_id, std::nullopt ));
3319+
3320+ ASSERT_OK (insert_rows_and_verify ());
32693321
32703322 // GC log on producer.
32713323 // Note: Ideally cdc checkpoint should advance but we do not see that with our combination of
@@ -3276,27 +3328,17 @@ TEST_P(XClusterTest, YB_DISABLE_TEST(LeaderFailoverTest)) {
32763328 SetAtomicFlag (true , &FLAGS_enable_log_retention_by_op_idx);
32773329
32783330 // Failback to old tserver.
3279- ASSERT_OK (itest::LeaderStepDown (new_ts, tablet_id , old_ts, kTimeout ));
3280- ASSERT_OK (itest::WaitUntilLeader (old_ts, tablet_id , kTimeout ));
3331+ ASSERT_OK (itest::LeaderStepDown (new_ts, consumer_tablet_id , old_ts, kTimeout ));
3332+ ASSERT_OK (itest::WaitUntilLeader (old_ts, consumer_tablet_id , kTimeout ));
32813333
3282- // Delete old pollers so that we can properly shutdown the servers.
3283- ANNOTATE_UNPROTECTED_WRITE (FLAGS_TEST_xcluster_disable_delete_old_pollers) = false ;
3334+ ASSERT_OK (wait_for_error (old_consumer, ReplicationErrorPb::REPLICATION_MISSING_OP_ID));
32843335
3285- ASSERT_OK (LoggedWaitFor (
3286- [&new_tserver]() {
3287- auto new_tserver_pollers = new_tserver->server ()->GetXClusterConsumer ()->TEST_ListPollers ();
3288- return new_tserver_pollers.size () == 0 ;
3289- },
3290- kTimeout , " Waiting for pollers from new tserver to stop" ));
3336+ ANNOTATE_UNPROTECTED_WRITE (FLAGS_TEST_xcluster_disable_delete_old_pollers) = false ;
3337+ ANNOTATE_UNPROTECTED_WRITE (FLAGS_TEST_xcluster_disable_poller_term_check) = false ;
32913338
3292- ASSERT_OK (InsertRowsInProducer ( 2 * kNumWriteRecords , 3 * kNumWriteRecords ));
3339+ ASSERT_OK (wait_for_error (old_consumer, ReplicationErrorPb::REPLICATION_OK ));
32933340
3294- auto stream_id = ASSERT_RESULT (GetCDCStreamID (producer_table_->id ()));
3295- ASSERT_OK (VerifyReplicationError (
3296- consumer_table_->id (), stream_id, ReplicationErrorPb::REPLICATION_MISSING_OP_ID));
3297-
3298- ANNOTATE_UNPROTECTED_WRITE (FLAGS_TEST_xcluster_disable_poller_term_check) = false ;
3299- ASSERT_OK (VerifyNumRecordsOnConsumer (3 * kNumWriteRecords ));
3341+ ASSERT_OK (insert_rows_and_verify ());
33003342}
33013343
33023344Status VerifyMetaCacheObjectIsValid (
0 commit comments