Skip to content

Commit 5cdc1b5

Browse files
committed
[yugabyte#26880] DocDB: Prevent deadlock between txn load and compaction that causes peer to enter a stuck state
Summary: **Problem** Background compactions using a filter (for instance, tables where cdc is enabled) could deadlock with the transaction loader causing the peer to enter into a stuck state. This is because compaction can wait for the loader to finish with `RunningTransactionContext::mutex_` held, and the loader could wait on the same mutex causing a deadlock. **Issue seen** In one of the internal clusters, we saw 2 peers stuck with the following traces unable to process any consensus operations. The peer gets stuck trying to process a replicated op and holds up `ReplicaState::update_lock_`, and waits for the loader thread to complete. ``` @ 0x7fcfb39f27a9 __pthread_cond_timedwait @ 0x555e3409bcd4 yb::tablet::TransactionLoader::WaitLoaded() @ 0x555e33ff760e yb::tablet::Tablet::ApplyKeyValueRowOperations() @ 0x555e33ff6dce yb::tablet::Tablet::ApplyOperation() @ 0x555e33ff69f2 yb::tablet::Tablet::ApplyRowOperations() @ 0x555e33fafeed yb::tablet::WriteOperation::DoReplicated() @ 0x555e33fa129d yb::tablet::Operation::Replicated() @ 0x555e33fa3690 yb::tablet::OperationDriver::ReplicationFinished() @ 0x555e32f29082 yb::consensus::ConsensusRound::NotifyReplicationFinished() @ 0x555e32f81091 yb::consensus::ReplicaState::ApplyPendingOperationsUnlocked() @ 0x555e32f8039a yb::consensus::ReplicaState::AdvanceCommittedOpIdUnlocked() @ 0x555e32f65d49 yb::consensus::RaftConsensus::UpdateReplica() @ 0x555e32f41e4f yb::consensus::RaftConsensus::Update() @ 0x555e3437f846 yb::tserver::ConsensusServiceImpl::UpdateConsensus() @ 0x555e32fce4f8 std::__1::__function::__func<>::operator()() @ 0x555e32fcf0fe yb::consensus::ConsensusServiceIf::Handle() Total number of threads: 2 ``` The loader thread waits on `RunningTransactionContext::mutex_` ``` @ 0x7fcfb39f582a __lll_lock_wait @ 0x7fcfb39eead8 __GI___pthread_mutex_lock @ 0x555e340a9003 yb::tablet::TransactionParticipant::Impl::LoadTransaction() @ 0x555e340984ce yb::tablet::TransactionLoader::Executor::Execute() @ 0x555e347d9cd8 yb::Thread::SuperviseThread() @ 0x7fcfb39ec1c9 start_thread @ 0x7fcfb3c3de72 __GI___clone Total number of threads: 2 ``` and `RunningTransactionContext::mutex_` is held by the background compaction thread that is waiting for the loader to complete, causing a deadlock. ``` @ 0x7fcfb39f27a9 __pthread_cond_timedwait @ 0x555e3409bcd4 yb::tablet::TransactionLoader::WaitLoaded() @ 0x555e340ae872 yb::tablet::TransactionParticipant::Cleanup() @ 0x555e330db3bf yb::docdb::(anonymous namespace)::DocDBIntentsCompactionFilter::CompactionFinished() @ 0x555e33c79d65 rocksdb::CompactionJob::ProcessKeyValueCompaction() @ 0x555e33c76f6a rocksdb::CompactionJob::Run() @ 0x555e33cadea9 rocksdb::DBImpl::BackgroundCompaction() @ 0x555e33cabd95 rocksdb::DBImpl::BackgroundCallCompaction() @ 0x555e347a847b yb::(anonymous namespace)::PriorityThreadPoolWorker::Run() @ 0x555e347d9cd8 yb::Thread::SuperviseThread() @ 0x7fcfb39ec1c9 start_thread @ 0x7fcfb3c3de72 __GI___clone Total number of threads: 2 ``` **Fix** This revision addresses the issue by executing `WaitLoaded` outside scope of `RunningTransactionContext::mutex_` Jira: DB-16294 Test Plan: Jenkins ./yb_build.sh --cxx-test cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestCompactionDoesntDeadlockWithTxnLoader The test fails without the changes. Reviewers: sergei, esheng Reviewed By: sergei Subscribers: ybase Differential Revision: https://phorge.dev.yugabyte.com/D43354
1 parent 0816b2f commit 5cdc1b5

File tree

5 files changed

+77
-2
lines changed

5 files changed

+77
-2
lines changed

src/yb/integration-tests/cdcsdk_ysql-test.cc

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2724,6 +2724,62 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestIntentCountPersistencyAfterCo
27242724
ASSERT_EQ(final_record_size, 0);
27252725
}
27262726

2727+
TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCompactionDoesntDeadlockWithTxnLoader)) {
2728+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_retryable_request_timeout_secs) = 0;
2729+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1;
2730+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_state_checkpoint_update_interval_ms) = 0;
2731+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_aborted_intent_cleanup_ms) = 1000;
2732+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_save_index_into_wal_segments) = true;
2733+
2734+
ASSERT_OK(SetUpWithParams(1, 1, false));
2735+
const uint32_t num_tablets = 1;
2736+
auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName, num_tablets));
2737+
2738+
google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
2739+
ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, /* partition_list_version =*/nullptr));
2740+
ASSERT_EQ(tablets.size(), num_tablets);
2741+
2742+
TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName));
2743+
xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStream(IMPLICIT));
2744+
2745+
auto resp = ASSERT_RESULT(SetCDCCheckpoint(stream_id, tablets));
2746+
ASSERT_FALSE(resp.has_error());
2747+
2748+
// Insert some records in transaction.
2749+
ASSERT_OK(WriteRowsHelper(0 /* start */, 10 /* end */, &test_cluster_, true));
2750+
ASSERT_OK(WriteRowsHelper(10 /* start */, 20 /* end */, &test_cluster_, true));
2751+
ASSERT_OK(FlushTable(table.table_id()));
2752+
2753+
GetChangesResponsePB change_resp_1;
2754+
ASSERT_OK(WaitForGetChangesToFetchRecords(&change_resp_1, stream_id, tablets, 20));
2755+
ASSERT_OK(WaitForPostApplyMetadataWritten(2 /* expected_num_transactions */));
2756+
2757+
ASSERT_OK(FlushTable(table.table_id()));
2758+
auto* tablet_server = test_cluster()->mini_tablet_server(0);
2759+
2760+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_skip_process_apply) = true;
2761+
2762+
yb::SyncPoint::GetInstance()->LoadDependency({
2763+
{"TransactionParticipant::Impl::Cleanup", "TransactionLoader::Executor::Start"}});
2764+
yb::SyncPoint::GetInstance()->ClearTrace();
2765+
yb::SyncPoint::GetInstance()->EnableProcessing();
2766+
ASSERT_OK(tablet_server->Restart());
2767+
2768+
std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_aborted_intent_cleanup_ms));
2769+
auto status_future = std::async(std::launch::async, [&]() -> Status {
2770+
return tablet_server->CompactTablets(docdb::SkipFlush::kFalse);
2771+
});
2772+
2773+
// Compaction shouldn't deadlock with the txn loader thread.
2774+
CHECK_OK(
2775+
WaitFor([&]() {
2776+
return status_future.wait_for(0s) == std::future_status::ready;
2777+
},
2778+
15s * kTimeMultiplier,
2779+
"Compaction taking longer, probably deadlocked with txn load."));
2780+
ASSERT_OK(status_future.get());
2781+
}
2782+
27272783
// https://github.com/yugabyte/yugabyte-db/issues/19385
27282784
TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestLogGCForNewTablesAddedAfterCreateStream)) {
27292785
ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 100000;

src/yb/integration-tests/cdcsdk_ysql_test_base.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,9 @@ DECLARE_bool(cdc_use_byte_threshold_for_vwal_changes);
135135
DECLARE_bool(ysql_enable_pg_export_snapshot);
136136
DECLARE_bool(ysql_yb_enable_consistent_replication_from_hash_range);
137137
DECLARE_uint64(cdcsdk_update_restart_time_interval_secs);
138+
DECLARE_int32(retryable_request_timeout_secs);
139+
DECLARE_bool(save_index_into_wal_segments);
140+
DECLARE_bool(TEST_skip_process_apply);
138141

139142
namespace yb {
140143

src/yb/tablet/transaction_loader.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "yb/util/operation_counter.h"
3434
#include "yb/util/pb_util.h"
3535
#include "yb/util/scope_exit.h"
36+
#include "yb/util/sync_point.h"
3637
#include "yb/util/thread.h"
3738

3839
using namespace std::literals;
@@ -108,6 +109,7 @@ class TransactionLoader::Executor {
108109
});
109110

110111
LOG_WITH_PREFIX(INFO) << "Load transactions start";
112+
DEBUG_ONLY_TEST_SYNC_POINT("TransactionLoader::Executor::Start");
111113

112114
status = LoadPendingApplies();
113115
if (!status.ok()) {
@@ -432,6 +434,17 @@ Status TransactionLoader::WaitLoaded(const TransactionId& id) NO_THREAD_SAFETY_A
432434
return load_status_;
433435
}
434436

437+
Status TransactionLoader::WaitLoaded(const TransactionIdApplyOpIdMap& txns) {
438+
if (txns.empty() || RSTATUS_DCHECK_RESULT(Completed())) {
439+
return Status::OK();
440+
}
441+
const auto& max_txn = std::max_element(
442+
txns.begin(), txns.end(), [](const auto& lhs, const auto& rhs) {
443+
return lhs.first < rhs.first;
444+
});
445+
return WaitLoaded(max_txn->first);
446+
}
447+
435448
// Disable thread safety analysis because std::unique_lock is used.
436449
Status TransactionLoader::WaitAllLoaded() NO_THREAD_SAFETY_ANALYSIS {
437450
// WaitAllLoaded is only invoked when opening a tablet.

src/yb/tablet/transaction_loader.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ class TransactionLoader {
8787
}
8888

8989
Status WaitLoaded(const TransactionId& id);
90+
Status WaitLoaded(const TransactionIdApplyOpIdMap& txns);
9091
Status WaitAllLoaded();
9192

9293
std::optional<docdb::ApplyStateWithCommitInfo> GetPendingApply(const TransactionId& id) const

src/yb/tablet/transaction_participant.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -804,15 +804,17 @@ class TransactionParticipant::Impl
804804
}
805805

806806
Status Cleanup(TransactionIdApplyOpIdMap&& txns, TransactionStatusManager* status_manager) {
807+
DEBUG_ONLY_TEST_SYNC_POINT("TransactionParticipant::Impl::Cleanup");
808+
// Execute WaitLoaded outside of this->mutex_, else there's possibility of a deadlock since
809+
// the loader needs this->mutex_ in TransactionLoaderContext::LoadTransaction to finish load.
810+
RETURN_NOT_OK(loader_.WaitLoaded(txns));
807811
TransactionIdSet set;
808812
{
809813
std::lock_guard lock(mutex_);
810814
const OpId& cdcsdk_checkpoint_op_id = GetLatestCheckPointUnlocked();
811815

812816
if (cdcsdk_checkpoint_op_id != OpId::Max()) {
813817
for (const auto& [transaction_id, apply_op_id] : txns) {
814-
RETURN_NOT_OK(loader_.WaitLoaded(transaction_id));
815-
816818
const OpId* apply_record_op_id = &apply_op_id;
817819
if (!apply_op_id.valid()) {
818820
// Apply op id is unknown -- may be from before upgrade to version that writes

0 commit comments

Comments
 (0)