Skip to content

Commit aa43df4

Browse files
committed
[yugabyte#27486] docdb: Unit test to cover background compaction run during post split compaction
Summary: Unit test for yugabyte#27426 Covers this example: ``` 1. Default table TTL is configured, TTL file expiration is configured, rocksdb max file size for compaction is set to 3 GB, post split compaction input threshold is 256 MB. 2. SST files before post split compaction: SST1 [seq1], SST2 [seq2], SST3 [seq3], SST4 [seq4], SST5 [seq5], SST6 [seq6], SST7 [seq7] SST1 ... SST7 are 3.1 GB each 3. The state after 5 out of 6 iterations (where only 1 SST is taken by during each iteration): SST8 [seq1], SST9 [seq2], SST10 [seq3], SST11 [seq4], SST12 [seq5], SST6 [seq6], SST7 [seq7] SST8 ... SST12 are 1.5 GB each, SST6 and SST7 are 3.1 GB 4. Background compaction kicks in immediately after SST11 has been flushed. 5. Background compaction takes SST8...SST12 by size amp criteria and markis them `being compacted`. 6. Post split compaction runs the next iteration but it sees the first file by sequence, SST8, is locked and immediately stops compacing due to no files got picked on this iteration. 7. Background compaction processes SST8...SST12 producing SST13 [seq1] of size 7.5 GB (5 x 1.5 GB). 8. The final set of SST files after both post split and background compactions are completed: SST13 [seq1, 7.5 GB], SST6 [seq6, 3.1 GB], SST7 [seq7, 3.1 GB] ``` Refer to https://phorge.dev.yugabyte.com/differential/revision/edit/44394/ for details. Jira: DB-17033 Test Plan: ./yb_build.sh --cxx-test='TEST_F(CompactionTest, BackgroundCompactionDuringPostSplitCompaction)' Reviewers: timur, rthallam Reviewed By: timur, rthallam Subscribers: ybase, yql Differential Revision: https://phorge.dev.yugabyte.com/D44537
1 parent 5987b8e commit aa43df4

File tree

8 files changed

+271
-104
lines changed

8 files changed

+271
-104
lines changed

src/yb/integration-tests/compaction-test.cc

Lines changed: 169 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -94,15 +94,20 @@ DECLARE_bool(TEST_disable_adding_last_compaction_to_tablet_metadata);
9494
DECLARE_bool(TEST_disable_adding_user_frontier_to_sst);
9595
DECLARE_bool(TEST_disable_getting_user_frontier_from_mem_table);
9696
DECLARE_bool(TEST_pause_before_full_compaction);
97+
DECLARE_bool(enable_ondisk_compression);
9798
DECLARE_bool(enable_load_balancing);
9899
DECLARE_bool(file_expiration_ignore_value_ttl);
99100
DECLARE_bool(file_expiration_value_ttl_overrides_table_ttl);
101+
DECLARE_bool(rocksdb_allow_multiple_pending_compactions_for_priority_thread_pool);
102+
DECLARE_bool(rocksdb_determine_compaction_input_at_start);
100103
DECLARE_bool(tablet_enable_ttl_file_filter);
101104
DECLARE_bool(use_priority_thread_pool_for_compactions);
105+
DECLARE_bool(ycql_enable_packed_row);
102106

103107
DECLARE_double(auto_compact_percent_obsolete);
104108

105109
DECLARE_int32(auto_compact_check_interval_sec);
110+
DECLARE_int32(cleanup_split_tablets_interval_sec);
106111
DECLARE_int32(full_compaction_pool_max_queue_size);
107112
DECLARE_int32(full_compaction_pool_max_threads);
108113
DECLARE_int32(priority_thread_pool_size);
@@ -122,13 +127,12 @@ DECLARE_int64(rocksdb_compact_flush_rate_limit_bytes_per_sec);
122127
DECLARE_uint32(auto_compact_min_obsolete_keys_found);
123128
DECLARE_uint32(auto_compact_stat_window_seconds);
124129

130+
DECLARE_uint64(post_split_compaction_input_size_threshold_bytes);
125131
DECLARE_uint64(rocksdb_max_file_size_for_compaction);
126132

127133
DECLARE_string(allow_compaction_failures_for_tablet_ids);
128134

129-
namespace yb {
130-
131-
namespace tserver {
135+
namespace yb::tserver {
132136

133137
namespace {
134138

@@ -206,6 +210,8 @@ class CompactionTest : public YBTest {
206210

207211
ANNOTATE_UNPROTECTED_WRITE(FLAGS_priority_thread_pool_size) = 2;
208212

213+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_cleanup_split_tablets_interval_sec) = 1;
214+
209215
// Disable scheduled compactions by default so we don't have surprise compactions.
210216
ANNOTATE_UNPROTECTED_WRITE(FLAGS_scheduled_full_compaction_frequency_hours) = 0;
211217
ANNOTATE_UNPROTECTED_WRITE(FLAGS_scheduled_full_compaction_jitter_factor_percentage) = 0;
@@ -219,12 +225,8 @@ class CompactionTest : public YBTest {
219225
// These flags should be set after minicluster start, so it wouldn't override them.
220226
ANNOTATE_UNPROTECTED_WRITE(FLAGS_db_write_buffer_size) = kMemStoreSize;
221227
ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_level0_file_num_compaction_trigger) = 3;
222-
// Patch tablet options inside tablet manager, will be applied to newly created tablets.
223-
for (int i = 0 ; i < NumTabletServers(); i++) {
224-
ANNOTATE_IGNORE_WRITES_BEGIN();
225-
cluster_->GetTabletManager(i)->TEST_tablet_options()->listeners.push_back(rocksdb_listener_);
226-
ANNOTATE_IGNORE_WRITES_END();
227-
}
228+
229+
AddRocksDBListener(rocksdb_listener_);
228230

229231
client_ = ASSERT_RESULT(cluster_->CreateClient());
230232
transaction_manager_ = std::make_unique<client::TransactionManager>(
@@ -242,6 +244,15 @@ class CompactionTest : public YBTest {
242244
YBTest::TearDown();
243245
}
244246

247+
void AddRocksDBListener(std::shared_ptr<rocksdb::EventListener> listener) {
248+
// Patch tablet options inside tablet manager, will be applied to newly created tablets.
249+
for (int i = 0 ; i < NumTabletServers(); i++) {
250+
ANNOTATE_IGNORE_WRITES_BEGIN();
251+
cluster_->GetTabletManager(i)->TEST_tablet_options()->listeners.push_back(listener);
252+
ANNOTATE_IGNORE_WRITES_END();
253+
}
254+
}
255+
245256
void SetupWorkload(IsolationLevel isolation_level, int num_tablets = kDefaultNumTablets) {
246257
workload_.reset(new TestWorkload(cluster_.get()));
247258
workload_->set_timeout_allowed(true);
@@ -1846,6 +1857,154 @@ TEST_F(CompactionTest, CheckLastRequestTimePersistence) {
18461857
ASSERT_GT(table_info->LockForRead()->pb.last_full_compaction_request_time(), last_request_time);
18471858
}
18481859

1860+
// Covers https://github.com/yugabyte/yugabyte-db/issues/27426. Refer to D44394 for the description.
1861+
TEST_F(CompactionTest, BackgroundCompactionDuringPostSplitCompaction) {
1862+
constexpr size_t kNumTablets = 1;
1863+
constexpr size_t kNumFiles = 9;
1864+
constexpr size_t kTrigger = kNumFiles - 2;
1865+
constexpr uint64_t kSstFileSize = 500_KB;
1866+
constexpr uint64_t kThreshold = kSstFileSize * 0.80;
1867+
1868+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_ycql_enable_packed_row) = true;
1869+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_ondisk_compression) = false;
1870+
1871+
// Configuring flags to guarantee a background compaction will kick in between post split
1872+
// compaction iterations.
1873+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_db_write_buffer_size) = kSstFileSize;
1874+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_max_file_size_for_compaction) = kThreshold;
1875+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_level0_file_num_compaction_trigger) = kTrigger;
1876+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_post_split_compaction_input_size_threshold_bytes) = kThreshold;
1877+
1878+
// Configuring flags to guarantee background compaction picks SST files at the end of post split
1879+
// compaction and keeps them locked till the compaction is finished.
1880+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_determine_compaction_input_at_start) = false;
1881+
ANNOTATE_UNPROTECTED_WRITE(
1882+
FLAGS_rocksdb_allow_multiple_pending_compactions_for_priority_thread_pool) = true;
1883+
1884+
// Sanity checks for minimal requirements.
1885+
ASSERT_GT(ANNOTATE_UNPROTECTED_READ(FLAGS_full_compaction_pool_max_threads), 0);
1886+
ASSERT_GT(ANNOTATE_UNPROTECTED_READ(FLAGS_priority_thread_pool_size), 1);
1887+
1888+
// Helpers to extract files information.
1889+
auto files_ids = [](auto&& files) {
1890+
return AsString(files, [](auto&& file) { return file.name_id; });
1891+
};
1892+
auto max_file_id = [](auto&& files) {
1893+
return std::ranges::max_element(files, {}, &rocksdb::LiveFileMetaData::name_id)->name_id;
1894+
};
1895+
auto min_file_id = [](auto&& files) {
1896+
return std::ranges::min_element(files, {}, &rocksdb::LiveFileMetaData::name_id)->name_id;
1897+
};
1898+
1899+
// Additional RocksDB listener to guarantee compaction flow.
1900+
struct DBListener : public rocksdb::EventListener {
1901+
bool background_compaction_in_progress = false;
1902+
size_t num_post_split_iterations = 0;
1903+
std::mutex mutex;
1904+
std::condition_variable_any compaction_started_cv;
1905+
1906+
void OnCompactionStarted() override {
1907+
UniqueLock lock(mutex);
1908+
1909+
// Background compaction will be always
1910+
if (num_post_split_iterations == kTrigger && !background_compaction_in_progress) {
1911+
LOG(INFO) << "Background compaction started";
1912+
background_compaction_in_progress = true;
1913+
1914+
// Wait for the next post split compaction iteration got triggered or exit on timeout.
1915+
compaction_started_cv.wait_for(
1916+
lock, std::chrono::seconds(30),
1917+
[this] { return num_post_split_iterations != kTrigger; });
1918+
} else {
1919+
++num_post_split_iterations;
1920+
compaction_started_cv.notify_all();
1921+
}
1922+
}
1923+
1924+
void OnCompactionCompleted(rocksdb::DB* db, const rocksdb::CompactionJobInfo& info) override {
1925+
LOG(INFO) << "Compaction completed, reason: " << info.compaction_reason;
1926+
1927+
std::lock_guard lock(mutex);
1928+
if (info.is_no_op_compaction) {
1929+
// Sanity check, the only no-op compaction is the post split compaction final iteration.
1930+
ASSERT_EQ(info.compaction_reason, rocksdb::CompactionReason::kPostSplitCompaction);
1931+
1932+
LOG(INFO) << "Number of post split compaction iterations: " << num_post_split_iterations;
1933+
num_post_split_iterations = 0; // Resetting to track compactions for the next child.
1934+
1935+
// This no op post split compaction iteration happens in any case, let's unblock
1936+
// background compaction to complete it.
1937+
compaction_started_cv.notify_all();
1938+
} else if (info.compaction_reason != rocksdb::CompactionReason::kPostSplitCompaction) {
1939+
background_compaction_in_progress = false;
1940+
EXPECT_EQ(info.compaction_reason, rocksdb::CompactionReason::kUniversalSizeAmplification);
1941+
LOG(INFO) << "Background compaction done";
1942+
}
1943+
}
1944+
};
1945+
auto listener = std::make_shared<DBListener>();
1946+
AddRocksDBListener(listener);
1947+
1948+
SetupWorkload(IsolationLevel::NON_TRANSACTIONAL, kNumTablets);
1949+
1950+
// Change the table to have a default time to live. This is required for the easiest reproing,
1951+
// but the issue may happen even without default TTL.
1952+
ASSERT_OK(ChangeTableTTL(workload_->table_name(), /* ttl_sec = */ 1000));
1953+
ASSERT_OK(WriteAtLeastFilesPerDb(kNumFiles));
1954+
1955+
// Flush mem tables to have the predictable number of SST files.
1956+
const auto table_info = ASSERT_RESULT(FindTable(cluster_.get(), workload_->table_name()));
1957+
ASSERT_OK(workload_->client().FlushTables(
1958+
{table_info->id()}, /* add_indexes = */ false,
1959+
/* timeout_secs = */ 60, /* is_compaction = */ false));
1960+
1961+
// Remember parent files before split.
1962+
auto dbs = GetAllRocksDbs(cluster_.get(), /* include_intents = */ false);
1963+
ASSERT_EQ(dbs.size(), 1);
1964+
1965+
uint64_t parent_max_file_id = 0;
1966+
{
1967+
const auto files = dbs.front()->GetLiveFilesMetaData();
1968+
parent_max_file_id = max_file_id(files);
1969+
LOG(INFO) << "Parent files: " << files_ids(files);
1970+
}
1971+
1972+
// Trigger manual tablet split.
1973+
auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kAll);
1974+
ASSERT_EQ(peers.size(), kNumTablets);
1975+
const auto tablet = ASSERT_RESULT(peers.front()->shared_tablet_safe());
1976+
ASSERT_OK(InvokeSplitTabletRpcAndWaitForDataCompacted(cluster_.get(), tablet->tablet_id()));
1977+
1978+
// Wait until parent tablet got cleaned up.
1979+
ASSERT_OK(LoggedWaitFor(
1980+
[cluster = cluster_.get()]{
1981+
return ListTabletPeers(cluster, ListPeersFilter::kAll).size() == 2;
1982+
}, 60s, "Parent tablet cleanup"));
1983+
1984+
// Total number of compactions equals to a sum of number of post split compaction iterations and
1985+
// one background compaction. Number of post split compaction iterations equals to the number of
1986+
// parent files plus one empty iteration to indicate post split compaction completion.
1987+
constexpr size_t kNumParentFiles = kNumFiles + 1; // One more file due to an explicit flush.
1988+
constexpr size_t kNumPostSplitCompactionIterations = kNumParentFiles + 1;
1989+
constexpr size_t kNumBackgroundCompactions = 1;
1990+
constexpr size_t kNumExpectedCompactions =
1991+
kNumPostSplitCompactionIterations + kNumBackgroundCompactions;
1992+
1993+
// Postpone status check for logging children files.
1994+
auto status = WaitForNumCompactionsPerDb(kNumExpectedCompactions);
1995+
1996+
// Make sure child tablets do not have parent files.
1997+
dbs = GetAllRocksDbs(cluster_.get(), /* include_intents = */ false);
1998+
ASSERT_EQ(dbs.size(), 2);
1999+
for (auto* db : dbs) {
2000+
const auto files = db->GetLiveFilesMetaData();
2001+
LOG(INFO) << "Child files: " << files_ids(files);
2002+
ASSERT_LT(parent_max_file_id, min_file_id(files));
2003+
}
2004+
2005+
ASSERT_OK(status);
2006+
}
2007+
18492008
class FullCompactionMonitoringTest : public CompactionTest {
18502009
protected:
18512010
void SetUp() override {
@@ -2183,5 +2342,4 @@ TEST_F(CompactionTest, RemoveCorruptDataBlocks) {
21832342
ASSERT_LE(num_keys_lost, num_max_corrupt_keys_estimate);
21842343
}
21852344

2186-
} // namespace tserver
2187-
} // namespace yb
2345+
} // namespace yb::tserver

src/yb/integration-tests/mini_cluster.cc

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,14 @@
4747
#include "yb/gutil/strings/join.h"
4848
#include "yb/gutil/strings/substitute.h"
4949

50+
#include "yb/integration-tests/cluster_itest_util.h"
51+
5052
#include "yb/master/catalog_entity_info.h"
5153
#include "yb/master/catalog_manager_if.h"
5254
#include "yb/master/catalog_manager.h"
5355
#include "yb/master/master.h"
5456
#include "yb/master/master_admin.pb.h"
57+
#include "yb/master/master_admin.proxy.h"
5558
#include "yb/master/master_client.pb.h"
5659
#include "yb/master/master_cluster.pb.h"
5760
#include "yb/master/master_ddl.pb.h"
@@ -188,6 +191,12 @@ bool IsForTable(const tablet::TabletPeer& peer, const TableId& table_id) {
188191
return false;
189192
}
190193

194+
bool IsTabletInCollection(const master::TabletInfoPtr& tablet, const master::TabletInfos& tablets) {
195+
return tablets.end() != std::find_if(
196+
tablets.begin(), tablets.end(),
197+
[&tablet](const master::TabletInfoPtr& p) { return p->tablet_id() == tablet->tablet_id(); });
198+
}
199+
191200
} // namespace
192201

193202
MiniCluster::MiniCluster(const MiniClusterOptions& options)
@@ -1547,6 +1556,70 @@ void SetCompactFlushRateLimitBytesPerSec(MiniCluster* cluster, const size_t byte
15471556
}
15481557
}
15491558

1559+
Status InvokeSplitTabletRpc(MiniCluster* cluster, const TabletId& tablet_id, MonoDelta timeout) {
1560+
auto& master = *VERIFY_RESULT(cluster->GetLeaderMiniMaster());
1561+
auto proxy = master::MasterAdminProxy(&cluster->proxy_cache(), master.bound_rpc_addr());
1562+
1563+
master::SplitTabletRequestPB req;
1564+
req.set_tablet_id(tablet_id);
1565+
1566+
rpc::RpcController controller;
1567+
controller.set_timeout(timeout);
1568+
master::SplitTabletResponsePB resp;
1569+
RETURN_NOT_OK(proxy.SplitTablet(req, &resp, &controller));
1570+
if (resp.has_error()) {
1571+
RETURN_NOT_OK(StatusFromPB(resp.error().status()));
1572+
}
1573+
return Status::OK();
1574+
}
1575+
1576+
Status InvokeSplitTabletRpcAndWaitForDataCompacted(
1577+
MiniCluster* cluster, const master::TableInfoPtr& table,
1578+
const master::TabletInfoPtr& tablet, MonoDelta rpc_timeout) {
1579+
// Keep current tablets.
1580+
const auto tablets = VERIFY_RESULT(table->GetTablets());
1581+
1582+
// Sanity check that tablet belongs to the table.
1583+
if (!IsTabletInCollection(tablet, tablets)) {
1584+
return STATUS(InvalidArgument, "The tablet does not belong to table's tablets list.");
1585+
}
1586+
1587+
// Send split RPC.
1588+
RETURN_NOT_OK(InvokeSplitTabletRpc(cluster, tablet->tablet_id(), rpc_timeout));
1589+
1590+
// Wait for new tablets are added.
1591+
RETURN_NOT_OK(WaitForTableActiveTabletLeadersPeers(cluster, table->id(), tablets.size() + 1));
1592+
1593+
// Wait until split is replicated across all tablet servers.
1594+
RETURN_NOT_OK(WaitAllReplicasReady(
1595+
cluster, table->id(), MonoDelta::FromSeconds(20) * kTimeMultiplier));
1596+
1597+
// Select new tablets ids
1598+
const auto all_tablets = VERIFY_RESULT(table->GetTablets());
1599+
std::vector<TabletId> new_tablet_ids;
1600+
new_tablet_ids.reserve(all_tablets.size());
1601+
for (const auto& t : all_tablets) {
1602+
if (!IsTabletInCollection(t, tablets)) {
1603+
new_tablet_ids.push_back(t->tablet_id());
1604+
}
1605+
}
1606+
1607+
// Wait for new peers are fully compacted.
1608+
return WaitForPeersPostSplitCompacted(cluster, new_tablet_ids);
1609+
}
1610+
1611+
Status InvokeSplitTabletRpcAndWaitForDataCompacted(
1612+
MiniCluster* cluster, const TabletId& tablet_id, MonoDelta rpc_timeout) {
1613+
auto* master = VERIFY_RESULT(cluster->GetLeaderMiniMaster());
1614+
auto& catalog_manager = master->catalog_manager();
1615+
const auto tablet = VERIFY_RESULT(catalog_manager.GetTabletInfo(tablet_id));
1616+
1617+
// Get current number of tablets for the table.
1618+
const auto table = catalog_manager.GetTableInfo(tablet->table()->id());
1619+
1620+
return InvokeSplitTabletRpcAndWaitForDataCompacted(cluster, table, tablet, rpc_timeout);
1621+
}
1622+
15501623
Status WaitAllReplicasSynchronizedWithLeader(
15511624
MiniCluster* cluster, CoarseTimePoint deadline) {
15521625
auto leaders = ListTabletPeers(cluster, ListPeersFilter::kLeaders);

src/yb/integration-tests/mini_cluster.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,18 @@ Result<size_t> ServerWithLeaders(MiniCluster* cluster);
480480
// for already created tablets.
481481
void SetCompactFlushRateLimitBytesPerSec(MiniCluster* cluster, size_t bytes_per_sec);
482482

483+
Status InvokeSplitTabletRpc(
484+
MiniCluster* cluster, const TabletId& tablet_id,
485+
MonoDelta timeout = MonoDelta::FromSeconds(60) * kTimeMultiplier);
486+
487+
Status InvokeSplitTabletRpcAndWaitForDataCompacted(
488+
MiniCluster* cluster, const master::TableInfoPtr& table, const master::TabletInfoPtr& tablet,
489+
MonoDelta rpc_timeout = MonoDelta::FromSeconds(60) * kTimeMultiplier);
490+
491+
Status InvokeSplitTabletRpcAndWaitForDataCompacted(
492+
MiniCluster* cluster, const TabletId& tablet_id,
493+
MonoDelta rpc_timeout = MonoDelta::FromSeconds(60) * kTimeMultiplier);
494+
483495
Status WaitAllReplicasSynchronizedWithLeader(MiniCluster* cluster, CoarseTimePoint deadline);
484496

485497
Status WaitAllReplicasSynchronizedWithLeader(MiniCluster* cluster, CoarseDuration timeout);

src/yb/rocksdb/db/db_impl.cc

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3910,8 +3910,13 @@ Result<FileNumbersHolder> DBImpl::BackgroundCompaction(
39103910
}
39113911

39123912
Result<FileNumbersHolder> result = FileNumbersHolder();
3913-
for (auto listener : db_options_.listeners) {
3914-
listener->OnCompactionStarted();
3913+
3914+
{
3915+
mutex_.Unlock();
3916+
for (auto listener : db_options_.listeners) {
3917+
listener->OnCompactionStarted();
3918+
}
3919+
mutex_.Lock();
39153920
}
39163921

39173922
if (!c) {

src/yb/util/shmem/robust_hash_map.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,7 @@ class RobustHashMap {
414414
SHARED_MEMORY_STORE(in_progress_node_, nullptr);
415415
}
416416

417-
void DoDelete(List::const_iterator prev_itr) {
417+
void DoDelete(typename List::const_iterator prev_itr) {
418418
auto* prev = &*prev_itr.unconst();
419419
auto* node = &*std::next(prev_itr).unconst();
420420
SHARED_MEMORY_STORE(in_progress_num_elements_, SHARED_MEMORY_LOAD(num_elements_) - 1);

0 commit comments

Comments
 (0)