Skip to content

Commit 6d2fe30

Browse files
committed
[yugabyte#26481] DocDB: Handle exclusive object lock(s) release for non schema changing DDL/DML(s)
Summary: Commit yugabyte@47c5cf2 handled acquire/release for schema changing DDLs alone. Problems: 1. In YB, we seem to have another category of DDLs which don't perform schema changes, but might take exclusive object locks. These aren't tracked by master's background DDL verification task. 2. pggate doesn't seem to tag statements like 'BACKFILL INDEX' as DDL, but they end up taking exclusive object locks. When table locking feature is enabled, the release of exclusive locks taken by above type transactions wasn't handled in the above referenced commit. This diff addresses the issue by making `PgClientSession` responsible for triggering release of exclusive locks whenever necessary. Note: Failure handling on the exclusive locks release path from the tserver side can go in a subsequent change. Tracked by yugabyte#26498 Jira: DB-15848 Test Plan: Jenkins ./yb_build.sh --cxx-test='TEST_F(PgObjectLocksTest, BackfillIndexSanityTest) {' ./yb_build.sh --cxx-test='TEST_F(PgObjectLocksTest, ReleaseExpiredLocksInvalidatesCatalogCache) {' Reviewers: rthallam, amitanand, zdrudi, myang, esheng Reviewed By: amitanand, zdrudi Subscribers: ybase, yql Differential Revision: https://phorge.dev.yugabyte.com/D42706
1 parent a3354b0 commit 6d2fe30

File tree

6 files changed

+320
-136
lines changed

6 files changed

+320
-136
lines changed

src/yb/integration-tests/object_lock-test.cc

Lines changed: 0 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ MATCHER_P(EqualsStatus, expected_status, "") {
8181

8282
constexpr uint64_t kDefaultMasterYSQLLeaseTTLMilli = 5 * 1000;
8383
constexpr uint64_t kDefaultYSQLLeaseRefreshIntervalMilli = 500;
84-
constexpr size_t kPgConnectionTimeoutSecs = 10 * kTimeMultiplier;
8584
const std::string kTServerYsqlLeaseRefreshFlagName = "TEST_tserver_enable_ysql_lease_refresh";
8685

8786
class ObjectLockTest : public MiniClusterTestWithClient<MiniCluster> {
@@ -1368,59 +1367,4 @@ Status ExternalObjectLockTest::WaitForTServerLease(const std::string& ts_uuid, M
13681367
timeout, "Wait for master to establish tserver's lease");
13691368
}
13701369

1371-
TEST_F(ExternalObjectLockTest, ExclusiveLockReleaseInvalidatesCatalogCache) {
1372-
const auto ts1_idx = 1;
1373-
const auto ts2_idx = 2;
1374-
auto* ts1 = cluster_->tablet_server(ts1_idx);
1375-
auto* ts2 = cluster_->tablet_server(ts2_idx);
1376-
1377-
ts2->Shutdown();
1378-
LogWaiter log_waiter(ts2, "Received new lease epoch");
1379-
ASSERT_OK(ts2->Restart(
1380-
ExternalMiniClusterOptions::kDefaultStartCqlProxy,
1381-
{std::make_pair("TEST_ysql_disable_transparent_cache_refresh_retry", "true")}));
1382-
ASSERT_OK(log_waiter.WaitFor(MonoDelta::FromSeconds(kTimeMultiplier * 10)));
1383-
1384-
auto conn1 = ASSERT_RESULT(ConnectToTabletServer(ts1, kPgConnectionTimeoutSecs));
1385-
auto conn2 = ASSERT_RESULT(ConnectToTabletServer(ts2, kPgConnectionTimeoutSecs));
1386-
1387-
ASSERT_OK(conn1.Execute("CREATE TABLE test(k INT PRIMARY KEY, v INT)"));
1388-
ASSERT_OK(conn1.Execute("INSERT INTO test SELECT generate_series(1,11), 0"));
1389-
1390-
ASSERT_OK(conn2.FetchMatrix("SELECT * FROM test WHERE k=1", 1 /* rows */, 2 /* columns */));
1391-
1392-
// Disable catalog cache invalidation on tserver-master heartbeat path. Set it after the tserver
1393-
// boots up since setting it as part of initialization seems to error.
1394-
ASSERT_OK(cluster_->SetFlag(
1395-
ts2,
1396-
"TEST_tserver_disable_catalog_refresh_on_heartbeat",
1397-
"true"));
1398-
1399-
// Release of exclusive locks of the below DDL should invalidate catalog cache on all tservers.
1400-
ASSERT_OK(conn1.Execute("ALTER TABLE test ADD COLUMN v1 INT"));
1401-
// The DML should now see the updated schema and not hit a catalog cache/schema version mismatch.
1402-
ASSERT_OK(conn2.FetchMatrix("SELECT * FROM test WHERE k=1", 1 /* rows */, 3 /* columns */));
1403-
}
1404-
1405-
TEST_F(ExternalObjectLockTest, ConsecutiveAltersSucceedWithoutCatalogVersionIssues) {
1406-
const auto ts1_idx = 1;
1407-
const auto ts2_idx = 2;
1408-
auto* ts1 = cluster_->tablet_server(ts1_idx);
1409-
auto* ts2 = cluster_->tablet_server(ts2_idx);
1410-
1411-
auto conn1 = ASSERT_RESULT(ConnectToTabletServer(ts1, kPgConnectionTimeoutSecs));
1412-
auto conn2 = ASSERT_RESULT(ConnectToTabletServer(ts2, kPgConnectionTimeoutSecs));
1413-
1414-
ASSERT_OK(conn1.Execute("CREATE TABLE t1(c1 INT, c2 INT)"));
1415-
ASSERT_OK(conn2.Fetch("SELECT * FROM t1"));
1416-
1417-
ASSERT_OK(cluster_->SetFlag(
1418-
ts2,
1419-
"TEST_tserver_disable_catalog_refresh_on_heartbeat",
1420-
"true"));
1421-
1422-
ASSERT_OK(conn1.Execute("ALTER TABLE t1 ADD COLUMN c3 INT"));
1423-
ASSERT_OK(conn2.Execute("ALTER TABLE t1 ADD COLUMN c4 INT"));
1424-
}
1425-
14261370
} // namespace yb

src/yb/master/object_lock_info_manager.cc

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,9 @@ class ObjectLockInfoManager::Impl {
9494
const tserver::AcquireObjectLockRequestPB& req, rpc::RpcContext context,
9595
StdStatusCallback callback);
9696

97+
void PopulateDbCatalogVersionCache(ReleaseObjectLockRequestPB& req);
9798
void UnlockObject(
98-
ReleaseObjectLockRequestPB req, ReleaseObjectLocksGlobalResponsePB* resp,
99+
ReleaseObjectLockRequestPB&& req, ReleaseObjectLocksGlobalResponsePB* resp,
99100
rpc::RpcContext rpc);
100101
void UnlockObject(
101102
const tserver::ReleaseObjectLockRequestPB& req, std::optional<rpc::RpcContext> context,
@@ -602,9 +603,35 @@ void ObjectLockInfoManager::Impl::LockObject(
602603
lock_objects->Launch();
603604
}
604605

606+
void ObjectLockInfoManager::Impl::PopulateDbCatalogVersionCache(ReleaseObjectLockRequestPB& req) {
607+
// TODO: Currently, we fetch and send catalog version of all dbs because the cache invalidation
608+
// logic on the tserver side expects a full report. Fix it and then optimize the below to only
609+
// send the catalog version of the db being operated on by the txn.
610+
DbOidToCatalogVersionMap versions;
611+
uint64_t fingerprint;
612+
auto s = catalog_manager_->GetYsqlAllDBCatalogVersions(
613+
FLAGS_enable_heartbeat_pg_catalog_versions_cache, &versions, &fingerprint);
614+
if (!s.ok()) {
615+
// In this case, we fallback to delayed cache invalidation on tserver-master heartbeat path.
616+
LOG(WARNING) << "Couldn't populate catalog version on exclusive lock release: " << s;
617+
return;
618+
}
619+
if (versions.empty()) {
620+
return;
621+
}
622+
auto* db_catalog_version_data = req.mutable_db_catalog_version_data();
623+
for (const auto& it : versions) {
624+
auto* const catalog_version_pb = db_catalog_version_data->add_db_catalog_versions();
625+
catalog_version_pb->set_db_oid(it.first);
626+
catalog_version_pb->set_current_version(it.second.current_version);
627+
catalog_version_pb->set_last_breaking_version(it.second.last_breaking_version);
628+
}
629+
}
630+
605631
void ObjectLockInfoManager::Impl::UnlockObject(
606-
ReleaseObjectLockRequestPB req, ReleaseObjectLocksGlobalResponsePB* resp,
632+
ReleaseObjectLockRequestPB&& req, ReleaseObjectLocksGlobalResponsePB* resp,
607633
rpc::RpcContext context) {
634+
PopulateDbCatalogVersionCache(req);
608635
UnlockObject(req, std::move(context), std::nullopt, [resp, clock = clock_](const Status& s) {
609636
resp->set_propagated_hybrid_time(clock->Now().ToUint64());
610637
FillErrorIfRequired(s, resp);
@@ -636,26 +663,7 @@ void ObjectLockInfoManager::Impl::UnlockObject(const TransactionId& txn_id) {
636663
req.set_lease_epoch(it->second.lease_epoch);
637664
}
638665

639-
// TODO: Currently, we fetch and send catalog version of all dbs because the cache invalidation
640-
// logic on the tserver side expects a full report. Fix it and then optimize the below to only
641-
// send the catalog version of the db being operated on by the txn.
642-
DbOidToCatalogVersionMap versions;
643-
uint64_t fingerprint;
644-
auto s = catalog_manager_->GetYsqlAllDBCatalogVersions(
645-
FLAGS_enable_heartbeat_pg_catalog_versions_cache, &versions, &fingerprint);
646-
if (s.ok()) {
647-
auto* db_catalog_version_data = req.mutable_db_catalog_version_data();
648-
for (const auto& it : versions) {
649-
auto* const catalog_version_pb = db_catalog_version_data->add_db_catalog_versions();
650-
catalog_version_pb->set_db_oid(it.first);
651-
catalog_version_pb->set_current_version(it.second.current_version);
652-
catalog_version_pb->set_last_breaking_version(it.second.last_breaking_version);
653-
}
654-
} else {
655-
// In this case, we fallback to delayed cache invalidation on tserver-master heartbeat path.
656-
LOG(WARNING) << "Couldn't populate catalog version on exclusive lock release: " << s;
657-
}
658-
666+
PopulateDbCatalogVersionCache(req);
659667
return UnlockObject(
660668
std::move(req), std::nullopt /* context */, std::nullopt /* leader epoch */,
661669
[txn_id](Status s) {
@@ -683,6 +691,11 @@ void ObjectLockInfoManager::Impl::ReleaseLocksHeldByExpiredLeaseEpoch(
683691
auto txn_id = CHECK_RESULT(TransactionId::FromString(txn_id_str));
684692
request->set_txn_id(txn_id.data(), txn_id.size());
685693
request->set_lease_epoch(max_lease_epoch_to_release + 1);
694+
if (requests_per_txn.empty()) {
695+
// Set the db catalog cache on just one of the unlock requests, since it would be the same
696+
// unless a new DDL modified it, in which case it's release would set the latest cache.
697+
PopulateDbCatalogVersionCache(*request.get());
698+
}
686699
requests_per_txn.push_back(request);
687700
}
688701
}

src/yb/master/ysql_ddl_handler.cc

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ DEFINE_test_flag(double, ysql_ddl_rollback_failure_probability, 0.0,
4949
DEFINE_test_flag(double, ysql_ddl_verification_failure_probability, 0.0,
5050
"Inject random failure of ddl verification operations");
5151

52+
DEFINE_test_flag(bool, disable_release_object_locks_on_ddl_verification, false,
53+
"When set, skip release object lock rpcs to tservers triggered at the end of DDL verification, "
54+
"that release object locks acquired by the DDL.");
55+
5256
DECLARE_bool(TEST_enable_object_locking_for_table_locks);
5357

5458
using namespace std::placeholders;
@@ -631,11 +635,14 @@ void CatalogManager::RemoveDdlTransactionStateUnlocked(
631635
// 1. Either the alter waits inline successfully before issuing the commit,
632636
// 2. or when the above times out, this branch is involed by the multi step
633637
// TableSchemaVerificationTask's callback post the schema changes have been applied.
634-
WARN_NOT_OK(
635-
background_tasks_thread_pool_->SubmitFunc([this, txn_id]() {
636-
DoReleaseObjectLocksIfNecessary(txn_id);
637-
}),
638-
Format("Failed to submit task for releasing exclusive object locks of txn $0", txn_id));
638+
if (FLAGS_TEST_enable_object_locking_for_table_locks &&
639+
!FLAGS_TEST_disable_release_object_locks_on_ddl_verification) {
640+
WARN_NOT_OK(
641+
background_tasks_thread_pool_->SubmitFunc([this, txn_id]() {
642+
DoReleaseObjectLocksIfNecessary(txn_id);
643+
}),
644+
Format("Failed to submit task for releasing exclusive object locks of txn $0", txn_id));
645+
}
639646
} else {
640647
VLOG(1) << "DDL Verification state for " << txn_id << " has "
641648
<< tables.size() << " tables remaining";
@@ -791,9 +798,6 @@ void CatalogManager::ScheduleTriggerDdlVerificationIfNeeded(
791798
}
792799

793800
void CatalogManager::DoReleaseObjectLocksIfNecessary(const TransactionId& txn_id) {
794-
if (!PREDICT_FALSE(FLAGS_TEST_enable_object_locking_for_table_locks)) {
795-
return;
796-
}
797801
DEBUG_ONLY_TEST_SYNC_POINT("DoReleaseObjectLocksIfNecessary");
798802
object_lock_info_manager_->ReleaseLocksForTxn(txn_id);
799803
}

0 commit comments

Comments
 (0)