Skip to content

Commit b6b0915

Browse files
committed
[BACKPORT 2024.1][yugabyte#23278] CDCSDK: Handle non-eligible tables cleanup with drop table while loading CDC stream
Summary: **Backport description:** Minor merge conflicts in test's base class because of missing flag. **Original description:** Original commit: 64e1bf8 / D37053 When a table present under a CDC stream is dropped, it is removed from the CDC stream metadata by a background thread. Suppose before the background thread could cleanup, there was a master restart or a master leadership change. On either of these scenarios, while loading the CDC streams, we check all tables present in the CDC stream metadata for ineligibility. Table schema is one of the objects that is scanned while checking for ineligibility. To get the table schema, we fetch the `TableInfo` object from master. This step was leading to a master crash as we receive a nullptr while fetching TableInfo since the table has been dropped. Jira: DB-12205 Test Plan: ./yb_build.sh --cxx-test cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestNonEligibleTablesCleanupWhenDropTableCleanupIsDisabled Reviewers: asrinivasan, stiwary, skumar Reviewed By: stiwary Subscribers: ybase Tags: #jenkins-ready Differential Revision: https://phorge.dev.yugabyte.com/D37090
1 parent 6e7726e commit b6b0915

File tree

3 files changed

+88
-21
lines changed

3 files changed

+88
-21
lines changed

src/yb/integration-tests/cdcsdk_ysql-test.cc

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9298,5 +9298,61 @@ TEST_F(CDCSDKYsqlTest, TestChildTabletsOfNonEligibleTableDoNotGetAddedToConsiste
92989298
/* use_consistent_snapshot_stream */ true);
92999299
}
93009300

9301+
TEST_F(
9302+
CDCSDKYsqlTest,
9303+
YB_DISABLE_TEST_IN_TSAN(TestNonEligibleTablesCleanupWhenDropTableCleanupIsDisabled)) {
9304+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream) = true;
9305+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_disable_drop_table_cleanup) = true;
9306+
// Setup cluster.
9307+
ASSERT_OK(SetUpWithParams(3, 3, false));
9308+
const vector<string> table_list_suffix = {"_1", "_2", "_3"};
9309+
const int kNumTables = 3;
9310+
vector<YBTableName> table(kNumTables);
9311+
int idx = 0;
9312+
vector<google::protobuf::RepeatedPtrField<master::TabletLocationsPB>> tablets(kNumTables);
9313+
9314+
while (idx < 3) {
9315+
table[idx] = ASSERT_RESULT(CreateTable(
9316+
&test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true,
9317+
table_list_suffix[idx]));
9318+
ASSERT_OK(test_client()->GetTablets(
9319+
table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr));
9320+
ASSERT_OK(WriteEnumsRows(
9321+
0 /* start */, 100 /* end */, &test_cluster_, table_list_suffix[idx], kNamespaceName,
9322+
kTableName));
9323+
idx += 1;
9324+
}
9325+
9326+
auto stream_id = ASSERT_RESULT(CreateConsistentSnapshotStream());
9327+
std::unordered_set<std::string> expected_table_ids = {
9328+
table[0].table_id(), table[1].table_id(), table[2].table_id()};
9329+
VerifyTablesInStreamMetadata(stream_id, expected_table_ids, "Waiting for stream metadata.");
9330+
9331+
LOG(INFO) << "Dropping table: " << Format("$0$1", kTableName, table_list_suffix[0]);
9332+
DropTable(&test_cluster_, Format("$0$1", kTableName, table_list_suffix[0]).c_str());
9333+
// Stream metadata wouldnt be cleaned up since the codepath is disabled via
9334+
// 'TEST_cdcsdk_disable_drop_table_cleanup' flag. Therefore all 3 tables are expected to be
9335+
// present in stream metadata.
9336+
SleepFor(MonoDelta::FromSeconds(3));
9337+
VerifyTablesInStreamMetadata(
9338+
stream_id, expected_table_ids, "Waiting for stream metadata after drop table.");
9339+
9340+
// On loading of CDC stream after a master leader restart, presence of non-eligible tables in CDC
9341+
// stream will be checked.
9342+
auto leader_master = ASSERT_RESULT(test_cluster_.mini_cluster_->GetLeaderMiniMaster());
9343+
ASSERT_OK(leader_master->Restart());
9344+
LOG(INFO) << "Master Restarted";
9345+
SleepFor(MonoDelta::FromSeconds(5));
9346+
9347+
// Enable bg threads to cleanup CDC stream metadata for dropped tables.
9348+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_disable_drop_table_cleanup) = false;
9349+
9350+
// Verify the dropped table has been removed from stream metadata after enabling the cleanup.
9351+
expected_table_ids.erase(table[0].table_id());
9352+
VerifyTablesInStreamMetadata(
9353+
stream_id, expected_table_ids,
9354+
"Waiting for GetDBStreamInfo post metadata cleanup after restart.");
9355+
}
9356+
93019357
} // namespace cdc
93029358
} // namespace yb

src/yb/integration-tests/cdcsdk_ysql_test_base.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,8 @@ DECLARE_bool(TEST_cdcsdk_skip_updating_cdc_state_entries_on_table_removal);
122122
DECLARE_bool(TEST_cdcsdk_add_indexes_to_stream);
123123
DECLARE_bool(cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream);
124124
DECLARE_bool(TEST_cdcsdk_skip_stream_active_check);
125+
DECLARE_bool(TEST_cdcsdk_disable_drop_table_cleanup);
126+
125127
namespace yb {
126128

127129
using client::YBClient;

src/yb/master/xrepl_catalog_manager.cc

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,9 @@ DEFINE_RUNTIME_bool(cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream, fa
160160
"materialised view etc. in their stream metadata and these tables will be marked for removal "
161161
"by catalog manager background thread.");
162162

163+
DEFINE_test_flag(bool, cdcsdk_disable_drop_table_cleanup, false,
164+
"When enabled, cleanup of dropped tables from CDC streams will be skipped.");
165+
163166
DEFINE_RUNTIME_AUTO_bool(cdcsdk_enable_identification_of_non_eligible_tables,
164167
kLocalPersisted,
165168
false,
@@ -1917,27 +1920,32 @@ void CatalogManager::FindAllNonEligibleTablesInCDCSDKStream(
19171920
for (const auto& table_id : table_ids) {
19181921
if (!user_table_ids.contains(table_id)) {
19191922
auto table_info = GetTableInfoUnlocked(table_id);
1920-
Schema schema;
1921-
Status status = table_info->GetSchema(&schema);
1922-
if (!status.ok()) {
1923-
LOG_WITH_FUNC(WARNING) << "Error while getting schema for table: " << table_info->name();
1924-
// Skip this table for now, it will be revisited for removal on master restart/master leader
1925-
// change.
1926-
continue;
1927-
}
1923+
if (table_info) {
1924+
Schema schema;
1925+
Status status = table_info->GetSchema(&schema);
1926+
if (!status.ok()) {
1927+
LOG_WITH_FUNC(WARNING) << "Error while getting schema for table: " << table_info->name();
1928+
// Skip this table for now, it will be revisited for removal on master restart/master
1929+
// leader change.
1930+
continue;
1931+
}
19281932

1929-
// Re-confirm this table is not meant to be part of a CDC stream.
1930-
if (!IsTableEligibleForCDCSDKStream(table_info, schema)) {
1931-
LOG(INFO) << "Found a non-eligible table: " << table_info->id()
1932-
<< ", for stream: " << stream_id;
1933-
LockGuard lock(cdcsdk_non_eligible_table_mutex_);
1934-
namespace_to_cdcsdk_non_eligible_table_map_[table_info->namespace_id()].insert(
1935-
table_info->id());
1933+
// Re-confirm this table is not meant to be part of a CDC stream.
1934+
if (!IsTableEligibleForCDCSDKStream(table_info, schema)) {
1935+
LOG(INFO) << "Found a non-eligible table: " << table_info->id()
1936+
<< ", for stream: " << stream_id;
1937+
LockGuard lock(cdcsdk_non_eligible_table_mutex_);
1938+
namespace_to_cdcsdk_non_eligible_table_map_[table_info->namespace_id()].insert(
1939+
table_info->id());
1940+
} else {
1941+
// Ideally we are not expected to enter the else clause.
1942+
LOG(WARNING) << "Found table " << table_id << " in metadata of stream " << stream_id
1943+
<< " that is not present in the eligible list of tables "
1944+
"from the namespace for CDC";
1945+
}
19361946
} else {
1937-
// Ideally we are not expected to enter the else clause.
1938-
LOG(WARNING) << "Found table " << table_id << " in metadata of stream " << stream_id
1939-
<< " that is not present in the eligible list of tables "
1940-
"from the namespace for CDC";
1947+
LOG(INFO) << "Found table " << table_id << " in stream " << stream_id
1948+
<< " metadata that is not present in master.";
19411949
}
19421950
}
19431951
}
@@ -6871,8 +6879,9 @@ void CatalogManager::RunXReplBgTasks(const LeaderEpoch& epoch) {
68716879
// Clean up Failed Replication Bootstrap on the Consumer.
68726880
WARN_NOT_OK(ClearFailedReplicationBootstrap(), "Failed Clearing Failed Replication Bootstrap");
68736881

6874-
WARN_NOT_OK(
6875-
CleanUpCDCSDKStreamsMetadata(epoch), "Failed Cleanup CDCSDK Streams Metadata");
6882+
if (!FLAGS_TEST_cdcsdk_disable_drop_table_cleanup) {
6883+
WARN_NOT_OK(CleanUpCDCSDKStreamsMetadata(epoch), "Failed Cleanup CDCSDK Streams Metadata");
6884+
}
68766885

68776886
// Restart xCluster and CDCSDK parent tablet deletion bg task.
68786887
StartXReplParentTabletDeletionTaskIfStopped();

0 commit comments

Comments
 (0)