
Commit 9a4b400

[yugabyte#4395] Prepared/replicated operation state ordering fix with a perf workaround for 64-core nodes
Summary:
Commit f9f906a ("[yugabyte#4395] Fix SnapshotTxnTest.MultiWriteWithRestart flakiness") fixed an issue where a replicated operation could still be unprepared while we were treating it as prepared. However, that fix caused a regression in a heavy write workload (MultiTableMultiIndexInserts in yb-sample-apps) on a 3-node cluster of 64-core i3.16xlarge nodes, so it was reverted by commit 1158e5f ("[yugabyte#4835] Revert operation driver logic changes causing a perf regression").

After an investigation it became clear that submitting an empty task to the preparer's thread pool avoids the performance regression, so this diff restores the original fix with that small addition.

Why does submitting an empty task improve performance? The ThreadPool takes a mutex when a task is submitted, and many threads submit tasks to this pool, so waiting on that mutex can take a significant amount of time. Submitting an empty task therefore acts as a small delay. Because it happens while UpdateConsensus is being processed, it increases UpdateConsensus processing time on the follower. As a result, the leader accumulates a larger UpdateConsensus request for the next call, which results in fewer UpdateConsensus calls overall, i.e. better batching.

Submitting an empty task to the thread pool is acceptable only as a temporary workaround for the performance issue caused by the correctness fix. However, both the prepared/replicated state ordering fix and the performance of the MultiTableMultiIndexInserts workload on 64-core nodes are important, so this diff implements the best solution available to us now. A better long-term solution would be a controlled UpdateConsensus delay that improves throughput.

Test Plan: ybd tsan --gtest_filter SnapshotTxnTest.MultiWriteWithRestart -n 500 -- -p 2

Reviewers: mikhail

Reviewed By: mikhail

Subscribers: bogdan, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D8719
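
To make the mutex-contention explanation above concrete, here is a minimal sketch of the submission path only. It is not the actual yb::ThreadPool API: TinyThreadPool and SubmitEmptyTask are hypothetical names, worker threads are omitted, and the point is only that even an empty task pays the cost of acquiring the contended submission mutex.

// Hypothetical sketch (not the YugabyteDB ThreadPool): only the submission
// path is modeled, because that is where the useful delay comes from.
#include <deque>
#include <functional>
#include <mutex>

class TinyThreadPool {
 public:
  // Every submission takes the queue mutex. Under a heavy write workload many
  // threads contend here, so even submitting an empty task costs a short wait.
  void Submit(std::function<void()> task) {
    std::lock_guard<std::mutex> lock(mutex_);
    queue_.push_back(std::move(task));
  }

 private:
  std::mutex mutex_;
  std::deque<std::function<void()>> queue_;  // Worker threads omitted in this sketch.
};

// Called while UpdateConsensus is being handled on a follower (in this sketch):
// the wait on the mutex slightly lengthens UpdateConsensus processing, so the
// leader batches more operations into its next UpdateConsensus call.
inline void SubmitEmptyTask(TinyThreadPool* pool) {
  pool->Submit([] {});  // Empty body: only the submission cost matters.
}
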
1 parent baa37e3 commit 9a4b400

3 files changed: +59, -60 lines

src/yb/tablet/operations/operation_driver.cc

Lines changed: 29 additions & 37 deletions
@@ -256,42 +256,15 @@ Status OperationDriver::PrepareAndStart() {
     // We can only do this after we've called Start()
     prepare_state_ = PREPARED;
 
-    // On the replica (non-leader) side, the replication state might have been REPLICATING during
-    // our previous acquisition of this lock, but it might have changed to REPLICATED in the
-    // meantime. That would mean ReplicationFinished got called, but ReplicationFinished would not
-    // trigger Apply unless the operation is PREPARED, so we are responsible for doing that.
-    // If we fail to capture the new replication state here, the operation will never be applied.
-    repl_state_copy = replication_state_;
+    if (replication_state_ == NOT_REPLICATING) {
+      replication_state_ = REPLICATING;
+    }
   }
 
-  switch (repl_state_copy) {
-    case NOT_REPLICATING:
-    {
-      {
-        std::lock_guard<simple_spinlock> lock(lock_);
-        replication_state_ = REPLICATING;
-      }
+  return Status::OK();
+}
 
-      // After the batching changes from 07/2017, It is the caller's responsibility to call
-      // Consensus::Replicate. See Preparer for details.
-      return Status::OK();
-    }
-    case REPLICATING:
-    {
-      // Already replicating - nothing to trigger
-      return Status::OK();
-    }
-    case REPLICATION_FAILED:
-      DCHECK(!operation_status_.ok());
-      FALLTHROUGH_INTENDED;
-    case REPLICATED:
-    {
-      // We can move on to apply. Note that ApplyOperation() will handle the error status in the
-      // REPLICATION_FAILED case.
-      return ApplyOperation(yb::OpId::kUnknownTerm, nullptr /* applied_op_ids */);
-    }
-  }
-  FATAL_INVALID_ENUM_VALUE(ReplicationState, repl_state_copy);
+OperationDriver::~OperationDriver() {
 }
 
 void OperationDriver::ReplicationFailed(const Status& replication_status) {

@@ -373,11 +346,30 @@ void OperationDriver::ReplicationFinished(
   // Note that if we set the state to REPLICATION_FAILED above, ApplyOperation() will actually abort
   // the operation, i.e. ApplyTask() will never be called and the operation will never be applied to
   // the tablet.
-  if (prepare_state_copy == PREPARED) {
-    // We likely need to do cleanup if this fails so for now just
-    // CHECK_OK
-    CHECK_OK(ApplyOperation(leader_term, applied_op_ids));
+  if (prepare_state_copy != PrepareState::PREPARED) {
+    LOG(DFATAL) << "Replicating an operation that has not been prepared: " << AsString(this);
+
+    LOG(ERROR) << "Attempting to wait for the operation to be prepared";
+
+    // This case should never happen, but if it happens we are trying to survive.
+    for (;;) {
+      std::this_thread::sleep_for(1ms);
+      PrepareState prepare_state;
+      {
+        std::lock_guard<simple_spinlock> lock(lock_);
+        prepare_state = prepare_state_;
+        if (prepare_state == PrepareState::PREPARED) {
+          break;
+        }
+      }
+      YB_LOG_EVERY_N_SECS(WARNING, 1)
+          << "Waiting for the operation to be prepared, current state: " << prepare_state;
+    }
   }
+
+  // We likely need to do cleanup if this fails so for now just
+  // CHECK_OK
+  CHECK_OK(ApplyOperation(leader_term, applied_op_ids));
 }
 
 void OperationDriver::Abort(const Status& status) {
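
The net effect of the two hunks above: PrepareAndStart() only records that replication may already be in progress, and ReplicationFinished() refuses to apply an operation that is not yet PREPARED, waiting for it instead. The toy model below illustrates that ordering invariant under an assumed simplification: ToyDriver and its methods are hypothetical, and it uses a condition variable where the real OperationDriver polls its state under a simple_spinlock.

// Toy illustration of the invariant restored by this change: Apply runs only
// after the operation is both PREPARED and replicated, even if replication
// finishes first. Hypothetical names; not the actual OperationDriver.
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

class ToyDriver {
 public:
  void MarkPrepared() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      prepared_ = true;
    }
    cv_.notify_all();
  }

  // Mirrors the new ReplicationFinished() behavior: if replication wins the
  // race, wait for PREPARED instead of applying an unprepared operation.
  void MarkReplicatedAndApply() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return prepared_; });
    std::cout << "apply: operation is prepared and replicated" << std::endl;
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  bool prepared_ = false;
};

int main() {
  ToyDriver driver;
  std::thread replication([&driver] { driver.MarkReplicatedAndApply(); });
  std::this_thread::sleep_for(std::chrono::milliseconds(5));  // Replication "finishes" first.
  driver.MarkPrepared();
  replication.join();
  return 0;
}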

src/yb/tablet/operations/operation_driver.h

Lines changed: 1 addition & 1 deletion
@@ -231,7 +231,7 @@ class OperationDriver : public RefCountedThreadSafe<OperationDriver>,
     PREPARED
   };
 
-  ~OperationDriver() override {}
+  ~OperationDriver();
 
   // Starts operation, returns false is we should NOT continue processing the operation.
   bool StartOperation();

src/yb/tablet/preparer.cc

Lines changed: 29 additions & 22 deletions
@@ -31,6 +31,7 @@
 DEFINE_int32(max_group_replicate_batch_size, 16,
              "Maximum number of operations to submit to consensus for replication in a batch.");
 
+using namespace std::literals;
 using std::vector;
 
 namespace yb {

@@ -139,8 +140,19 @@ Status PreparerImpl::Submit(OperationDriver* operation_driver) {
     return STATUS(IllegalState, "Tablet is shutting down");
   }
 
-  active_tasks_.fetch_add(1, std::memory_order_release);
-  queue_.Push(operation_driver);
+  if (!operation_driver->is_leader_side()) {
+    while (active_tasks_.load(std::memory_order_acquire) != 0) {
+      YB_LOG_EVERY_N_SECS(WARNING, 1)
+          << "Waiting for active tasks to become zero: "
+          << active_tasks_.load(std::memory_order_acquire);
+      // It should be very rare case, so could do busy wait.
+      std::this_thread::sleep_for(1ms);
+    }
+    operation_driver->PrepareAndStartTask();
+  } else {
+    active_tasks_.fetch_add(1, std::memory_order_release);
+    queue_.Push(operation_driver);
+  }
 
   auto expected = false;
   if (!running_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {

@@ -213,29 +225,24 @@ bool ShouldApplySeparately(OperationType operation_type) {
 void PreparerImpl::ProcessItem(OperationDriver* item) {
   CHECK_NOTNULL(item);
 
-  if (item->is_leader_side()) {
-    auto operation_type = item->operation_type();
+  LOG_IF(DFATAL, !item->is_leader_side()) << "Processing follower-side item";
 
-    const bool apply_separately = ShouldApplySeparately(operation_type);
-    const int64_t bound_term = apply_separately ? -1 : item->consensus_round()->bound_term();
+  auto operation_type = item->operation_type();
 
-    // Don't add more than the max number of operations to a batch, and also don't add
-    // operations bound to different terms, so as not to fail unrelated operations
-    // unnecessarily in case of a bound term mismatch.
-    if (leader_side_batch_.size() >= FLAGS_max_group_replicate_batch_size ||
-        (!leader_side_batch_.empty() &&
-         bound_term != leader_side_batch_.back()->consensus_round()->bound_term())) {
-      ProcessAndClearLeaderSideBatch();
-    }
-    leader_side_batch_.push_back(item);
-    if (apply_separately) {
-      ProcessAndClearLeaderSideBatch();
-    }
-  } else {
-    // We found a non-leader-side operation. We need to process the accumulated batch of
-    // leader-side operations first, and then process this other operation.
+  const bool apply_separately = ShouldApplySeparately(operation_type);
+  const int64_t bound_term = apply_separately ? -1 : item->consensus_round()->bound_term();
+
+  // Don't add more than the max number of operations to a batch, and also don't add
+  // operations bound to different terms, so as not to fail unrelated operations
+  // unnecessarily in case of a bound term mismatch.
+  if (leader_side_batch_.size() >= FLAGS_max_group_replicate_batch_size ||
+      (!leader_side_batch_.empty() &&
+       bound_term != leader_side_batch_.back()->consensus_round()->bound_term())) {
+    ProcessAndClearLeaderSideBatch();
+  }
+  leader_side_batch_.push_back(item);
+  if (apply_separately) {
     ProcessAndClearLeaderSideBatch();
-    item->PrepareAndStartTask();
   }
 }
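
For reference, the Submit() change above reduces to a drain-then-run-inline pattern for follower-side operations: wait until no leader-side tasks are in flight, then prepare the operation on the calling thread instead of queueing it. The standalone sketch below uses hypothetical names (ActiveTaskGate, RunWhenDrained); the real code uses PreparerImpl's active_tasks_ counter, PrepareAndStartTask(), and rate-limited logging via YB_LOG_EVERY_N_SECS.

// Standalone sketch of the drain-then-run-inline pattern; hypothetical names.
#include <atomic>
#include <chrono>
#include <cstdint>
#include <functional>
#include <thread>

class ActiveTaskGate {
 public:
  void TaskStarted() { active_.fetch_add(1, std::memory_order_release); }
  void TaskFinished() { active_.fetch_sub(1, std::memory_order_release); }

  // Poll (with a 1ms sleep) until no leader-side tasks are in flight, then run
  // the follower-side work inline on the calling thread.
  void RunWhenDrained(const std::function<void()>& follower_side_work) {
    while (active_.load(std::memory_order_acquire) != 0) {
      // Expected to be rare, so a short polling loop is acceptable.
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    follower_side_work();
  }

 private:
  std::atomic<int64_t> active_{0};
};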
