Skip to content

Commit cb64d5b

Browse files
committed
[yugabyte#8485] Refactor split op info to generic operation blocking mechanism
Summary: This diff removes split op id from replica state. This split op id was being used for the various purposes: 1) Block adding new write (and some other) operations to the Raft log after the split operation started to replicate. 2) Store data for child tablet ids. 3) Retain split op id in logs. It was changed to the following: 1) Introduced a generic mechanism to dynamically reject operations. 2) Store split op id and child tablet ids in the tablet metadata (superblock). This is a persistent format change. 3) Use log anchor registry. Also moved lease check higher by call stack and do it only for the last operation in batch. Removed an unused index of a multi index container (on the op id) from `RunningRetryableRequests`. Changes to duplicate request error handling =========================================== RetryableRequests is a subsystem we use for filtering out duplicate requests that could have been submitted during retries. All operations submitted to Raft are also registered with RetryableRequests. For a duplicate requests, we handle failure as follows. Either the Register function notifies the duplicate operation that it failed immediately (e.g. in case the original operation is known to have failed), or it waits for the original request to finish replicating and then notifies the duplicate operation of replication failure. After that the operation is removed from MvccManager and is rolled back in ReplicaState using its op id. Also, when we add operations to Raft consensus, we do it in batches, and if at least one operation fails to get added, we fail the entire batch. There are two issues with the interaction between these mechanisms. 1. A duplicate request might be waiting for the original operation to finish replicating, and therefore the duplicate request is not deleted from MvccManager right away. As a result, an operation with the same op id could be added to MvccManager. Prior to the recent commit 1aece37 (D11371), this was not an issue, because MvccManager only contained hybrid time (but no op ids) and the duplicate operation's hybrid time was greater than the original operation's hybrid time, so the read path was not affected (and we allow handling aborts in any order). 2. If the operation that fails is not the first operation in its batch (which would mean we've already called RetryableRequests::Register on some earlier operations in the batch), we call ReplicationFinished for all preceding operations in the batch, and no one would notify RetryableRequests about the completion of those preceding operations, so they would stay in RetryableRequests indefinitely. The above issues became more frequent with this diff because we now check the lease after adding all operations. In the most frequent case when we have a valid leader lease, with the new behavior we will spend less time checking lease, and would not be reading the system clock repeatedly. However, the new logic might be slightly suboptimal in the rare case when the lease has expired. The second scenario above has become more frequent with the changes in this diff (but before the fix described below) because with the lease check happening after adding all operations in the batch, we would have added all the operations to RetryableRequests. Prior to this diff, for operations to get stuck in RetryableRequests in this way, the lease check would need to succeed e.g. for the first operation so it would get added to RetryableRequests, but then it would need to fail for the second operation, and this combination of circumstances is very rare. To fix the above issue, in this diff we handle 3 different cases: 1. Rounds that were rejected by RetryableRequests. We should not call ReplicationFinished for them as it would have been already called by Register. 2. Rounds that were registered with RetryableRequests. We should call NotifyReplicationFinishedUnlocked for them, which also notifies RetryableRequests of the operation failure. 3. Rounds that were not registered with retryable requests. We should call ReplicationFinished directly for them. Test Plan: Jenkins Reviewers: timur, mbautin Reviewed By: mbautin Subscribers: mbautin, bogdan, ybase Differential Revision: https://phabricator.dev.yugabyte.com/D11590
1 parent 04293a4 commit cb64d5b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+972
-1051
lines changed

src/yb/consensus/consensus.cc

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,14 +128,12 @@ Status Consensus::ExecuteHook(HookPoint point) {
128128
return Status::OK();
129129
}
130130

131-
Result<yb::OpId> Consensus::GetLastOpId(OpIdType type) {
131+
Result<OpId> Consensus::GetLastOpId(OpIdType type) {
132132
switch (type) {
133133
case OpIdType::RECEIVED_OPID:
134134
return GetLastReceivedOpId();
135135
case OpIdType::COMMITTED_OPID:
136136
return GetLastCommittedOpId();
137-
case OpIdType::SPLIT_OPID:
138-
return GetSplitOpId();
139137
case OpIdType::UNKNOWN_OPID_TYPE:
140138
break;
141139
}

src/yb/consensus/consensus.h

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,6 @@
6262

6363
namespace yb {
6464

65-
namespace log {
66-
class Log;
67-
}
68-
6965
namespace master {
7066
class SysCatalogTable;
7167
}
@@ -101,12 +97,6 @@ struct ConsensusBootstrapInfo {
10197
// The id of the last committed operation in the log.
10298
OpIdPB last_committed_id;
10399

104-
// Parameters of the split operation added to Raft log and designated for this tablet (in general
105-
// case Raft log the tablet could contain old split operations designated for it ancestor tablets,
106-
// those are not reflected here).
107-
// See comments for ReplicateState::split_op_with_tablet_ids_.
108-
SplitOpInfo split_op_info;
109-
110100
// REPLICATE messages which were in the log with no accompanying
111101
// COMMIT. These need to be passed along to consensus init in order
112102
// to potentially commit them.
@@ -240,7 +230,7 @@ class Consensus {
240230
virtual CHECKED_STATUS TEST_Replicate(const ConsensusRoundPtr& round) = 0;
241231

242232
// A batch version of Replicate, which is what we try to use as much as possible for performance.
243-
virtual CHECKED_STATUS ReplicateBatch(ConsensusRounds* rounds) = 0;
233+
virtual CHECKED_STATUS ReplicateBatch(const ConsensusRounds& rounds) = 0;
244234

245235
// Messages sent from LEADER to FOLLOWERS and LEARNERS to update their
246236
// state machines. This is equivalent to "AppendEntries()" in Raft
@@ -336,14 +326,6 @@ class Consensus {
336326

337327
virtual yb::OpId GetLastAppliedOpId() = 0;
338328

339-
// Return the ID of the split operation requesting to split this Raft group if it has been added
340-
// to Raft log and uninitialized OpId otherwise.
341-
virtual yb::OpId GetSplitOpId() = 0;
342-
343-
// Return split child tablet IDs if split operation has been added to Raft log and array of empty
344-
// tablet IDs otherwise.
345-
virtual std::array<TabletId, kNumSplitParts> GetSplitChildTabletIds() = 0;
346-
347329
// Assuming we are the leader, wait until we have a valid leader lease (i.e. the old leader's
348330
// lease has expired, and we have replicated a new lease that has not expired yet).
349331
virtual CHECKED_STATUS WaitForLeaderLeaseImprecise(CoarseTimePoint deadline) = 0;
@@ -520,8 +502,8 @@ class ConsensusRound : public RefCountedThreadSafe<ConsensusRound> {
520502

521503
// Returns the id of the (replicate) operation this context
522504
// refers to. This is only set _after_ Consensus::Replicate(context).
523-
OpIdPB id() const {
524-
return replicate_msg_->id();
505+
OpId id() const {
506+
return OpId::FromPB(replicate_msg_->id());
525507
}
526508

527509
// Register a callback that is called by Consensus to notify that the round
@@ -549,7 +531,6 @@ class ConsensusRound : public RefCountedThreadSafe<ConsensusRound> {
549531
// other than 'term'.
550532
// See CheckBoundTerm().
551533
void BindToTerm(int64_t term) {
552-
DCHECK_EQ(bound_term_, kUnboundTerm);
553534
bound_term_ = term;
554535
}
555536

src/yb/consensus/consensus.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -542,7 +542,6 @@ enum OpIdType {
542542
UNKNOWN_OPID_TYPE = 0;
543543
RECEIVED_OPID = 1;
544544
COMMITTED_OPID = 2;
545-
SPLIT_OPID = 3;
546545
}
547546

548547
message GetLastOpIdRequestPB {
@@ -554,6 +553,8 @@ message GetLastOpIdRequestPB {
554553

555554
// Whether to return the last-received or last-committed OpId.
556555
optional OpIdType opid_type = 3 [ default = RECEIVED_OPID ];
556+
557+
optional OperationType op_type = 4;
557558
}
558559

559560
message GetLastOpIdResponsePB {

src/yb/consensus/consensus_context.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "yb/common/common_fwd.h"
1818

1919
#include "yb/consensus/consensus_fwd.h"
20+
#include "yb/consensus/consensus.pb.h"
2021

2122
#include "yb/util/result.h"
2223

@@ -69,6 +70,10 @@ class ConsensusContext {
6970
// Listener could be set only once and then reset.
7071
virtual void ListenNumSSTFilesChanged(std::function<void()> listener) = 0;
7172

73+
// Checks whether operation with provided op id and type could be added to the log.
74+
virtual CHECKED_STATUS CheckOperationAllowed(
75+
const OpId& op_id, consensus::OperationType op_type) = 0;
76+
7277
virtual ~ConsensusContext() = default;
7378
};
7479

src/yb/consensus/consensus_fwd.h

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,6 @@
1919
#include "yb/util/strongly_typed_bool.h"
2020

2121
namespace yb {
22-
23-
namespace log {
24-
25-
class Log;
26-
27-
} // namespace log
28-
2922
namespace consensus {
3023

3124
class Consensus;

src/yb/consensus/consensus_peers.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,6 @@ class Messenger;
6262
class PeriodicTimer;
6363
}
6464

65-
namespace log {
66-
class Log;
67-
}
68-
6965
namespace consensus {
7066

7167
// A peer in consensus (local or remote).

src/yb/consensus/consensus_queue.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,6 @@ DEFINE_validator(rpc_throttle_threshold_bytes, &RpcThrottleThresholdBytesValidat
147147
namespace yb {
148148
namespace consensus {
149149

150-
using log::AsyncLogReader;
151150
using log::Log;
152151
using std::unique_ptr;
153152
using rpc::Messenger;
@@ -1604,5 +1603,10 @@ void PeerMessageQueue::TrackOperationsMemory(const OpIds& op_ids) {
16041603
log_cache_.TrackOperationsMemory(op_ids);
16051604
}
16061605

1606+
Result<OpId> PeerMessageQueue::TEST_GetLastOpIdWithType(
1607+
int64_t max_allowed_index, OperationType op_type) {
1608+
return log_cache_.TEST_GetLastOpIdWithType(max_allowed_index, op_type);
1609+
}
1610+
16071611
} // namespace consensus
16081612
} // namespace yb

src/yb/consensus/consensus_queue.h

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,6 @@ class MemTracker;
6363
class MetricEntity;
6464
class ThreadPoolToken;
6565

66-
namespace log {
67-
class Log;
68-
class AsyncLogReader;
69-
}
70-
7166
namespace consensus {
7267
class PeerMessageQueueObserver;
7368
struct MajorityReplicatedData;
@@ -378,6 +373,8 @@ class PeerMessageQueue {
378373
return clock_;
379374
}
380375

376+
Result<OpId> TEST_GetLastOpIdWithType(int64_t max_allowed_index, OperationType op_type);
377+
381378
private:
382379
FRIEND_TEST(ConsensusQueueTest, TestQueueAdvancesCommittedIndex);
383380
FRIEND_TEST(ConsensusQueueTest, TestReadReplicatedMessagesForCDC);

src/yb/consensus/consensus_types.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,19 @@ class ConsensusAppendCallback {
3636
// committed_op_id - committed operation id.
3737
//
3838
// Should initialize appropriate replicate message.
39-
virtual void HandleConsensusAppend(const yb::OpId& op_id, const yb::OpId& committed_op_id) = 0;
39+
virtual void HandleConsensusAppend(const OpId& op_id, const OpId& committed_op_id) = 0;
40+
41+
// Invoked when appropriate operation failed to replicate.
42+
virtual void ReplicationFinished(
43+
const Status& status, int64_t leader_term, OpIds* applied_op_ids) = 0;
44+
4045
virtual ~ConsensusAppendCallback() {}
4146
};
4247

4348
struct ConsensusOptions {
4449
std::string tablet_id;
4550
};
4651

47-
struct SplitOpInfo {
48-
OpId op_id;
49-
std::array<TabletId, kNumSplitParts> child_tablet_ids;
50-
};
51-
5252
} // namespace consensus
5353
} // namespace yb
5454

src/yb/consensus/log.cc

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -642,7 +642,7 @@ Status Log::RollOver() {
642642
return Status::OK();
643643
}
644644

645-
Status Log::Reserve(LogEntryTypePB type,
645+
void Log::Reserve(LogEntryTypePB type,
646646
LogEntryBatchPB* entry_batch,
647647
LogEntryBatch** reserved_entry) {
648648
TRACE_EVENT0("log", "Log::Reserve");
@@ -669,7 +669,6 @@ Status Log::Reserve(LogEntryTypePB type,
669669
// TODO (perf) Use a ring buffer instead of a blocking queue and set
670670
// 'reserved_entry' to a pre-allocated slot in the buffer.
671671
*reserved_entry = new_entry_batch.release();
672-
return Status::OK();
673672
}
674673

675674
Status Log::TEST_AsyncAppendWithReplicates(
@@ -715,7 +714,7 @@ Status Log::AsyncAppendReplicates(const ReplicateMsgs& msgs, const yb::OpId& com
715714
}
716715

717716
LogEntryBatch* reserved_entry_batch;
718-
RETURN_NOT_OK(Reserve(REPLICATE, &batch, &reserved_entry_batch));
717+
Reserve(REPLICATE, &batch, &reserved_entry_batch);
719718

720719
// If we're able to reserve, set the vector of replicate shared pointers in the LogEntryBatch.
721720
// This will make sure there's a reference for each replicate while we're appending.
@@ -1052,7 +1051,7 @@ Status Log::WaitUntilAllFlushed() {
10521051
LogEntryBatchPB entry_batch;
10531052
entry_batch.add_entry()->set_type(log::FLUSH_MARKER);
10541053
LogEntryBatch* reserved_entry_batch;
1055-
RETURN_NOT_OK(Reserve(FLUSH_MARKER, &entry_batch, &reserved_entry_batch));
1054+
Reserve(FLUSH_MARKER, &entry_batch, &reserved_entry_batch);
10561055
Synchronizer s;
10571056
RETURN_NOT_OK(AsyncAppend(reserved_entry_batch, s.AsStatusCallback()));
10581057
return s.Wait();

0 commit comments

Comments
 (0)