Skip to content

Commit 01c8e67

Browse files
[BACKPORT 2024.1][yugabyte#19294] CDCSDK: Refactor reading of WAL messages for consistent CDC
Summary: #### Backport Description Minor merge conflict was encountered in cdcsdk_test_base.h at the declaration of the test flag. #### Original Description In a workload with single shard transactions, it was observed that the WAL contained operations that were replicated but not committed. In such a scenario an empty batch is returned by `ReadReplicatedMessagesForCDC` if the from_op_id is the same as the committed_op_id. The CDCSDK producer ends up updating the safe time. Hence, in the next GetChanges call, the previously uncommitted messages get filtered out. This can lead to data loss in CDC operations. This diff introduces a new method `ReadReplicatedMessagesForConsistentCDC` which takes consistent_stream_safe_time as a parameter and reads the WAL until the committed_op_id or until we encounter a message with hybrid time greater than consistent_stream_safe_time. This method checks whether committed_op_id and majority_replicated_op_id are the same. If they differ, it waits in a loop until both become equal. However, in the case where we reach close to the deadline and the mismatch between committed_op_id and majority_replicated_op_id still exists, in order to prevent a GetChanges RPC timeout, `ReadReplicatedMessagesForConsistentCDC` will return an empty batch of messages with an indication about the mismatch. In such a scenario the CDCSDK producer will not update its safe time. The GetChanges response will have the same safe_hybrid_time as the request in this case. Jira: DB-8105 Original commit: a0264d7 / D30420 Test Plan: Existing CDC tests and QA tests ./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestWithMajorityReplicatedButNonCommittedSingleShardTxn Reviewers: vkushwaha, skumar, sergei, asrinivasan, stiwary, siddharth.shah Reviewed By: siddharth.shah Subscribers: ycdcxcluster, ybase, bogdan, sergei Tags: #jenkins-ready Differential Revision: https://phorge.dev.yugabyte.com/D37733
1 parent 9d88571 commit 01c8e67

File tree

8 files changed

+245
-66
lines changed

8 files changed

+245
-66
lines changed

src/yb/cdc/cdcsdk_producer.cc

Lines changed: 47 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1984,64 +1984,65 @@ Status GetConsistentWALRecords(
19841984
<< ", last_seen_op_id: " << last_seen_op_id->ToString()
19851985
<< ", historical_max_op_id: " << historical_max_op_id;
19861986
auto consensus = VERIFY_RESULT(tablet_peer->GetConsensus());
1987-
do {
1988-
auto read_ops = VERIFY_RESULT(consensus->ReadReplicatedMessagesForCDC(
1989-
*last_seen_op_id, *last_readable_opid_index, deadline));
1990-
1991-
if (read_ops.messages.empty()) {
1992-
VLOG_WITH_FUNC(1) << "Did not get any messages with current batch of 'read_ops'."
1993-
<< "last_seen_op_id: " << *last_seen_op_id << ", last_readable_opid_index "
1994-
<< **last_readable_opid_index;
1995-
break;
1996-
}
1997-
1998-
if (read_ops.read_from_disk_size && mem_tracker) {
1999-
(*consumption) = ScopedTrackedConsumption(mem_tracker, read_ops.read_from_disk_size);
2000-
}
1987+
// Read the committed WAL messages with hybrid time <= consistent_stream_safe_time. If there exist
1988+
// messages in the WAL which are replicated but not yet committed,
1989+
// ReadReplicatedMessagesForConsistentCDC waits for them to get committed and eventually includes
1990+
// them in the result.
1991+
auto read_ops = VERIFY_RESULT(consensus->ReadReplicatedMessagesForConsistentCDC(
1992+
*last_seen_op_id, consistent_safe_time, deadline, false, *last_readable_opid_index));
20011993

2002-
for (const auto& msg : read_ops.messages) {
2003-
last_seen_op_id->term = msg->id().term();
2004-
last_seen_op_id->index = msg->id().index();
2005-
2006-
if (IsIntent(msg) || (IsUpdateTransactionOp(msg) &&
2007-
msg->transaction_state().status() != TransactionStatus::APPLYING)) {
2008-
continue;
2009-
}
1994+
if (read_ops.read_from_disk_size && mem_tracker) {
1995+
(*consumption) = ScopedTrackedConsumption(mem_tracker, read_ops.read_from_disk_size);
1996+
}
20101997

2011-
if (VLOG_IS_ON(3) && IsUpdateTransactionOp(msg) &&
2012-
msg->transaction_state().status() == TransactionStatus::APPLYING) {
2013-
auto txn_id =
2014-
VERIFY_RESULT(FullyDecodeTransactionId(msg->transaction_state().transaction_id()));
2015-
VLOG(3) << "Read transaction in WAL on "
2016-
<< "tablet_id: " << tablet_peer->tablet_id() << ", transaction_id: " << txn_id
2017-
<< ", OpId: " << msg->id().term() << "." << msg->id().index()
2018-
<< ", commit_time: " << GetTransactionCommitTime(msg)
2019-
<< ", consistent safe_time: " << consistent_safe_time
2020-
<< ", safe_hybrid_time_req: " << safe_hybrid_time_req;
2021-
} else if (VLOG_IS_ON(3)) {
2022-
VLOG(3) << "Read WAL msg on "
2023-
<< "tablet_id: " << tablet_peer->tablet_id() << ", op_type: " << msg->op_type()
2024-
<< ", OpId: " << msg->id().term() << "." << msg->id().index()
2025-
<< ", commit_time: " << GetTransactionCommitTime(msg)
2026-
<< ", consistent safe_time: " << consistent_safe_time
2027-
<< ", safe_hybrid_time_req: " << safe_hybrid_time_req;
2028-
}
1998+
for (const auto& msg : read_ops.messages) {
1999+
last_seen_op_id->term = msg->id().term();
2000+
last_seen_op_id->index = msg->id().index();
20292001

2030-
all_checkpoints->push_back(msg);
2031-
consistent_wal_records->push_back(msg);
2002+
if (IsIntent(msg) || (IsUpdateTransactionOp(msg) &&
2003+
msg->transaction_state().status() != TransactionStatus::APPLYING)) {
2004+
continue;
20322005
}
20332006

2034-
if (read_ops.messages.size() > 0) {
2035-
*msgs_holder = consensus::ReplicateMsgsHolder(
2036-
nullptr, std::move(read_ops.messages), std::move((*consumption)));
2007+
if (VLOG_IS_ON(3) && IsUpdateTransactionOp(msg) &&
2008+
msg->transaction_state().status() == TransactionStatus::APPLYING) {
2009+
auto txn_id =
2010+
VERIFY_RESULT(FullyDecodeTransactionId(msg->transaction_state().transaction_id()));
2011+
VLOG(3) << "Read transaction in WAL on "
2012+
<< "tablet_id: " << tablet_peer->tablet_id() << ", transaction_id: " << txn_id
2013+
<< ", OpId: " << msg->id().term() << "." << msg->id().index()
2014+
<< ", commit_time: " << GetTransactionCommitTime(msg)
2015+
<< ", consistent safe_time: " << consistent_safe_time
2016+
<< ", safe_hybrid_time_req: " << safe_hybrid_time_req;
2017+
} else if (VLOG_IS_ON(3)) {
2018+
VLOG(3) << "Read WAL msg on "
2019+
<< "tablet_id: " << tablet_peer->tablet_id() << ", op_type: " << msg->op_type()
2020+
<< ", OpId: " << msg->id().term() << "." << msg->id().index()
2021+
<< ", commit_time: " << GetTransactionCommitTime(msg)
2022+
<< ", consistent safe_time: " << consistent_safe_time
2023+
<< ", safe_hybrid_time_req: " << safe_hybrid_time_req;
20372024
}
2038-
} while (((*last_readable_opid_index) && last_seen_op_id->index < **last_readable_opid_index));
2025+
2026+
all_checkpoints->push_back(msg);
2027+
consistent_wal_records->push_back(msg);
2028+
}
2029+
2030+
if (read_ops.messages.size() > 0) {
2031+
*msgs_holder = consensus::ReplicateMsgsHolder(
2032+
nullptr, std::move(read_ops.messages), std::move((*consumption)));
2033+
}
20392034

20402035
// Handle the case where WAL doesn't have the apply record for all the committed transactions.
20412036
if (historical_max_op_id.valid() && historical_max_op_id > *last_seen_op_id) {
20422037
(*wait_for_wal_update) = true;
20432038
}
20442039

2040+
if (consistent_wal_records->empty() && read_ops.have_more_messages) {
2041+
VLOG(1) << "Received empty read_ops with have_more_messages set to true, indicating presence "
2042+
"of replicated but not committed records in the WAL";
2043+
*wait_for_wal_update = true;
2044+
}
2045+
20452046
SortConsistentWALRecords(consistent_wal_records);
20462047
VLOG_WITH_FUNC(1) << "Got a total of " << consistent_wal_records->size() << " WAL records "
20472048
<< "in the current segment";

src/yb/consensus/consensus.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,11 @@ class Consensus {
341341
const yb::OpId& from, int64_t* repl_index, const CoarseTimePoint deadline,
342342
const bool fetch_single_entry = false) = 0;
343343

344+
// Read all the committed messages for CDC producer.
345+
virtual Result<ReadOpsResult> ReadReplicatedMessagesForConsistentCDC(
346+
OpId from, uint64_t stream_safe_time, CoarseTimePoint deadline,
347+
bool fetch_single_entry = false, int64_t* repl_index = nullptr) = 0;
348+
344349
virtual void UpdateCDCConsumerOpId(const yb::OpId& op_id) = 0;
345350

346351
protected:

src/yb/consensus/consensus_queue.cc

Lines changed: 111 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,13 @@ DEFINE_RUNTIME_uint32(max_remote_bootstrap_attempts_from_non_leader, 5,
134134
DEFINE_test_flag(bool, assert_remote_bootstrap_happens_from_same_zone, false,
135135
"Assert that remote bootstrap is served by a peer in the same zone as the new peer.");
136136

137+
DEFINE_test_flag(bool, stop_committed_op_id_updation, false,
138+
"Test flag to stop the updation of committed_op_id");
139+
140+
DEFINE_RUNTIME_uint32(cdcsdk_wal_reads_deadline_buffer_secs, 5,
141+
"This flag determines the buffer time from the deadline at which we must stop reading the WAL "
142+
"messages and start processing the records we have read till now.");
143+
137144
namespace yb {
138145
namespace consensus {
139146

@@ -729,6 +736,25 @@ Result<ReadOpsResult> PeerMessageQueue::ReadFromLogCache(
729736
return result;
730737
}
731738

739+
Result<ReadOpsResult> PeerMessageQueue::ReadFromLogCacheForCDC(
740+
OpId last_op_id, int64_t to_index, CoarseTimePoint deadline, bool fetch_single_entry) {
741+
// An empty OpId is sent only on the first read request; in that case, start at the earliest known entry.
742+
int64_t after_op_index =
743+
last_op_id.empty() ? max<int64_t>(log_cache_.earliest_op_index(), 0)
744+
: last_op_id.index;
745+
746+
auto result = ReadFromLogCache(
747+
after_op_index, to_index, FLAGS_consensus_max_batch_size_bytes, local_peer_uuid_, deadline,
748+
fetch_single_entry);
749+
if (PREDICT_FALSE(!result.ok()) && PREDICT_TRUE(result.status().IsNotFound())) {
750+
const std::string premature_gc_warning = Format(
751+
"The logs from index $0 have been garbage collected and cannot be read ", after_op_index);
752+
LOG_WITH_PREFIX(WARNING) << premature_gc_warning;
753+
return result.status().CloneAndPrepend(premature_gc_warning);
754+
}
755+
return result;
756+
}
757+
732758
// Read majority replicated messages from cache for CDC.
733759
// CDC producer will use this to get the messages to send in response to cdc::GetChanges RPC.
734760
Result<ReadOpsResult> PeerMessageQueue::ReadReplicatedMessagesForCDC(
@@ -751,31 +777,95 @@ Result<ReadOpsResult> PeerMessageQueue::ReadReplicatedMessagesForCDC(
751777

752778
if (last_op_id.index >= to_index && !fetch_single_entry) {
753779
// Nothing to read.
754-
return ReadOpsResult();
780+
return ReadOpsResult {
781+
.messages = ReplicateMsgs(),
782+
.preceding_op = OpId(),
783+
.have_more_messages = HaveMoreMessages(pending_messages)
784+
};
755785
}
756786

757-
// If an empty OpID is only sent on the first read request, start at the earliest known entry.
758-
int64_t after_op_index = last_op_id.empty() ?
759-
max(log_cache_.earliest_op_index(), last_op_id.index) :
760-
last_op_id.index;
787+
auto result =
788+
VERIFY_RESULT(ReadFromLogCacheForCDC(last_op_id, to_index, deadline, fetch_single_entry));
789+
790+
result.have_more_messages =
791+
HaveMoreMessages(result.have_more_messages.get() || pending_messages);
761792

762-
auto result = ReadFromLogCache(
763-
after_op_index, to_index, FLAGS_consensus_max_batch_size_bytes, local_peer_uuid_, deadline,
764-
fetch_single_entry);
765-
if (PREDICT_FALSE(!result.ok()) && PREDICT_TRUE(result.status().IsNotFound())) {
766-
const std::string premature_gc_warning =
767-
Format("The logs from index $0 have been garbage collected and cannot be read ($1)",
768-
after_op_index, result.status());
769-
LOG_WITH_PREFIX(INFO) << premature_gc_warning;
770-
return STATUS(NotFound, premature_gc_warning);
771-
}
772-
if (result.ok()) {
773-
result->have_more_messages = HaveMoreMessages(result->have_more_messages.get() ||
774-
pending_messages);
775-
}
776793
return result;
777794
}
778795

796+
// Read all the committed messages from cache for CDC.
797+
// CDC producer will use these to get the messages to send in response to cdc::GetChanges RPC.
798+
Result<ReadOpsResult> PeerMessageQueue::ReadReplicatedMessagesForConsistentCDC(
799+
OpId last_op_id, uint64_t stream_safe_time, CoarseTimePoint deadline, bool fetch_single_entry,
800+
int64_t* repl_index) {
801+
auto res = ReadOpsResult();
802+
res.have_more_messages = HaveMoreMessages(false);
803+
int64_t committed_op_id_index;
804+
int64_t last_replicated_op_id_index;
805+
bool pending_messages = false;
806+
uint64_t last_read_hybrid_time = 0;
807+
808+
do {
809+
// Return if we reach close to the deadline, providing time for cdc producer and virtual WAL
810+
// to process the records.
811+
if (deadline - CoarseMonoClock::Now() <= FLAGS_cdcsdk_wal_reads_deadline_buffer_secs * 1s) {
812+
return res;
813+
}
814+
815+
{
816+
LockGuard lock(queue_lock_);
817+
// Use committed_op_id because it's already been processed by the Transaction codepath.
818+
committed_op_id_index = queue_state_.committed_op_id.index;
819+
// Determine if there are pending operations in RAFT but not yet LogCache.
820+
last_replicated_op_id_index = queue_state_.majority_replicated_op_id.index;
821+
pending_messages = committed_op_id_index != last_replicated_op_id_index;
822+
}
823+
824+
if (repl_index) {
825+
*repl_index = committed_op_id_index;
826+
}
827+
828+
if (last_op_id.index >= committed_op_id_index && !fetch_single_entry) {
829+
if (pending_messages) {
830+
// Wait for committed_op_id to match majority_replicated_op_id.
831+
res.have_more_messages = HaveMoreMessages(pending_messages);
832+
continue;
833+
} else {
834+
// Nothing to read.
835+
return ReadOpsResult{
836+
.messages = ReplicateMsgs(),
837+
.preceding_op = last_op_id,
838+
.have_more_messages = HaveMoreMessages::kFalse};
839+
}
840+
}
841+
842+
auto result = VERIFY_RESULT(
843+
ReadFromLogCacheForCDC(last_op_id, committed_op_id_index, deadline, fetch_single_entry));
844+
845+
res.messages.insert(res.messages.end(), result.messages.begin(), result.messages.end());
846+
res.read_from_disk_size += result.read_from_disk_size;
847+
pending_messages |= result.have_more_messages.get();
848+
res.have_more_messages = HaveMoreMessages(pending_messages);
849+
850+
if (res.messages.size() > 0) {
851+
auto msg = res.messages.back();
852+
last_op_id = OpId::FromPB(msg->id());
853+
last_read_hybrid_time = msg->hybrid_time();
854+
} else {
855+
// If an empty last_op_id is sent in the first read request, then ReadFromLogCacheForCDC reads
856+
// from the earliest known OpId. If this earliest known OpId turns out to be same as
857+
// committed_op_id then we receive an empty message list in the result. The earliest known
858+
// OpId is present in the preceding_op of the result. We update the last_op_id with this to
859+
// prevent unnecessary looping.
860+
last_op_id = result.preceding_op;
861+
}
862+
863+
} while ((last_op_id.index < committed_op_id_index || pending_messages) &&
864+
last_read_hybrid_time <= stream_safe_time);
865+
866+
return res;
867+
}
868+
779869
const PeerMessageQueue::TrackedPeer* PeerMessageQueue::FindClosestPeerForBootstrap(
780870
const TrackedPeer* remote_tracked_peer) {
781871
const CloudInfoPB& src_cloud_info = remote_tracked_peer->cloud_info.value();
@@ -1602,7 +1692,8 @@ void PeerMessageQueue::NotifyObserversOfMajorityReplOpChangeTask(
16021692
{
16031693
LockGuard lock(queue_lock_);
16041694
if (!new_committed_op_id.empty() &&
1605-
new_committed_op_id.index > queue_state_.committed_op_id.index) {
1695+
new_committed_op_id.index > queue_state_.committed_op_id.index &&
1696+
!GetAtomicFlag(&FLAGS_TEST_stop_committed_op_id_updation)) {
16061697
queue_state_.committed_op_id = new_committed_op_id;
16071698
}
16081699
queue_state_.last_applied_op_id.MakeAtLeast(last_applied_op_id);

src/yb/consensus/consensus_queue.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,10 @@ class PeerMessageQueue {
401401
const CoarseTimePoint deadline = CoarseTimePoint::max(),
402402
const bool fetch_single_entry = false);
403403

404+
Result<ReadOpsResult> ReadReplicatedMessagesForConsistentCDC(
405+
OpId last_op_id, uint64_t stream_safe_time, CoarseTimePoint deadline,
406+
bool fetch_single_entry = false, int64_t* last_replicated_opid_index = nullptr);
407+
404408
void UpdateCDCConsumerOpId(const yb::OpId& op_id);
405409

406410
// Get the maximum op ID that can be evicted for CDC consumer from log cache.
@@ -578,6 +582,12 @@ class PeerMessageQueue {
578582
const CoarseTimePoint deadline = CoarseTimePoint::max(),
579583
const bool fetch_single_entry = false);
580584

585+
Result<ReadOpsResult> ReadFromLogCacheForCDC(
586+
OpId last_op_id,
587+
int64_t to_index,
588+
CoarseTimePoint deadline = CoarseTimePoint::max(),
589+
bool fetch_single_entry = false);
590+
581591
void TEST_WaitForNotificationToFinish();
582592

583593
std::vector<PeerMessageQueueObserver*> observers_;

src/yb/consensus/raft_consensus.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3640,6 +3640,13 @@ Result<ReadOpsResult> RaftConsensus::ReadReplicatedMessagesForCDC(
36403640
from, last_replicated_opid_index, deadline, fetch_single_entry);
36413641
}
36423642

3643+
Result<ReadOpsResult> RaftConsensus::ReadReplicatedMessagesForConsistentCDC(
3644+
OpId from, uint64_t stream_safe_time, CoarseTimePoint deadline, bool fetch_single_entry,
3645+
int64_t* last_replicated_opid_index) {
3646+
return queue_->ReadReplicatedMessagesForConsistentCDC(
3647+
from, stream_safe_time, deadline, fetch_single_entry, last_replicated_opid_index);
3648+
}
3649+
36433650
void RaftConsensus::UpdateCDCConsumerOpId(const yb::OpId& op_id) {
36443651
return queue_->UpdateCDCConsumerOpId(op_id);
36453652
}

src/yb/consensus/raft_consensus.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,13 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
276276
const CoarseTimePoint deadline = CoarseTimePoint::max(),
277277
const bool fetch_single_entry = false) override;
278278

279+
Result<ReadOpsResult> ReadReplicatedMessagesForConsistentCDC(
280+
OpId from,
281+
uint64_t stream_safe_time,
282+
CoarseTimePoint deadline,
283+
bool fetch_single_entry = false,
284+
int64_t* last_replicated_opid_index = nullptr) override;
285+
279286
void UpdateCDCConsumerOpId(const yb::OpId& op_id) override;
280287

281288
// Start memory tracking of following operation in case it is still present in our caches.

src/yb/integration-tests/cdcsdk_test_base.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ DECLARE_bool(ysql_enable_pack_full_row_update);
5252
DECLARE_bool(ysql_yb_enable_replica_identity);
5353
DECLARE_bool(ysql_enable_packed_row_for_colocated_table);
5454
DECLARE_bool(yb_enable_cdc_consistent_snapshot_streams);
55+
DECLARE_bool(TEST_stop_committed_op_id_updation);
5556

5657
namespace yb {
5758
using client::YBClient;

0 commit comments

Comments
 (0)