Skip to content

Commit 65f27b1

Browse files
committed
[yugabyte#25709] xCluster: Handle tables with same name in non-DBScoped DR xCluster
Summary: If PITR is enabled we hide the table, instead of immediately deleting it when dropped. This causes non-DBScoped DR xCluster to fail if a table or index of the same name is immediately created again. - Moved `snapshot_test_util.cc` from `ql-dml-test-base` to `yb_client_test_util` Jira: DB-14966 Test Plan: XClusterDbScopedYsqlIndexTest.CreateDropIndexWithPITR XClusterYSqlTestConsistentTransactionsTest.CreateDropTableAndIndexWithPITR Reviewers: jhe, xCluster Reviewed By: jhe Subscribers: ybase Differential Revision: https://phorge.dev.yugabyte.com/D41386
1 parent 9133dce commit 65f27b1

File tree

7 files changed

+115
-26
lines changed

7 files changed

+115
-26
lines changed

src/yb/client/CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,11 +119,11 @@ endif()
119119
# part of the client itself (ie we don't want to ship it to customers,
120120
# and therefore don't need to worry about export strictness)
121121
ADD_YB_LIBRARY(yb_client_test_util
122-
SRCS client-test-util.cc
122+
SRCS client-test-util.cc snapshot_test_util.cc
123123
DEPS gmock gtest yb_client yb_test_util)
124124

125125
ADD_YB_LIBRARY(ql-dml-test-base
126-
SRCS ql-dml-test-base.cc txn-test-base.cc snapshot_test_util.cc
126+
SRCS ql-dml-test-base.cc txn-test-base.cc
127127
DEPS integration-tests)
128128

129129
# Tests

src/yb/integration-tests/xcluster/xcluster_db_scoped-test.cc

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313

1414
#include <gmock/gmock.h>
1515

16-
#include "yb/client/snapshot_test_util.h"
1716
#include "yb/client/table.h"
1817
#include "yb/client/xcluster_client.h"
1918
#include "yb/client/yb_table_name.h"
@@ -68,19 +67,6 @@ class XClusterDBScopedTest : public XClusterYsqlTestBase {
6867
const NamespaceId& namespace_id) {
6968
return GetXClusterStreams(namespace_id, /*table_names=*/{}, /*pg_schema_names=*/{});
7069
}
71-
72-
Status EnablePITROnClusters() {
73-
return RunOnBothClusters([this](Cluster* cluster) -> Status {
74-
client::SnapshotTestUtil snapshot_util;
75-
snapshot_util.SetProxy(&cluster->client_->proxy_cache());
76-
snapshot_util.SetCluster(cluster->mini_cluster_.get());
77-
78-
RETURN_NOT_OK(snapshot_util.CreateSchedule(
79-
nullptr, YQL_DATABASE_PGSQL, namespace_name, client::WaitSnapshot::kTrue,
80-
2s * kTimeMultiplier, 20h));
81-
return Status::OK();
82-
});
83-
}
8470
};
8571

8672
TEST_F(XClusterDBScopedTest, TestCreateWithCheckpoint) {

src/yb/integration-tests/xcluster/xcluster_ysql-test.cc

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3196,4 +3196,49 @@ TEST_F(XClusterYsqlTest, TransactionalBidirectionalWithTwoDBs) {
31963196
ASSERT_OK(WriteWorkload(250, 350, &consumer_cluster_, consumer_tables_[1]->name()));
31973197
}
31983198

3199+
// Create and drop tables and indexes in a loop with PITR which will keep the dropped tables in
3200+
// hidden state.
3201+
TEST_F(XClusterYSqlTestConsistentTransactionsTest, CreateDropTableAndIndexWithPITR) {
3202+
const auto kBatchSize = 10;
3203+
ASSERT_OK(CreateTableAndSetupReplication());
3204+
ASSERT_OK(EnablePITROnClusters());
3205+
3206+
auto producer_conn =
3207+
EXPECT_RESULT(producer_cluster_.ConnectToDB(producer_table_->name().namespace_name()));
3208+
auto consumer_conn =
3209+
EXPECT_RESULT(consumer_cluster_.ConnectToDB(consumer_table_->name().namespace_name()));
3210+
3211+
for (int run_count = 0; run_count < 5; run_count++) {
3212+
auto producer_table_name = ASSERT_RESULT(CreateYsqlTable(
3213+
/*idx=*/1, /*num_tablets=*/1, &producer_cluster_));
3214+
std::shared_ptr<client::YBTable> new_producer_table;
3215+
ASSERT_OK(producer_client()->OpenTable(producer_table_name, &new_producer_table));
3216+
3217+
auto consumer_table_name = ASSERT_RESULT(CreateYsqlTable(
3218+
/*idx=*/1, /*num_tablets=*/1, &consumer_cluster_));
3219+
std::shared_ptr<client::YBTable> new_consumer_table;
3220+
ASSERT_OK(consumer_client()->OpenTable(consumer_table_name, &new_consumer_table));
3221+
3222+
ASSERT_OK(
3223+
AlterUniverseReplication(kReplicationGroupId, {new_producer_table}, true /* add_tables */));
3224+
3225+
const auto create_index_stmt = Format(
3226+
"CREATE INDEX my_idx ON $0 ($1 ASC)", new_producer_table->name().table_name(),
3227+
kKeyColumnName);
3228+
3229+
ASSERT_OK(producer_conn.Execute(create_index_stmt));
3230+
ASSERT_OK(consumer_conn.Execute(create_index_stmt));
3231+
3232+
ASSERT_OK(InsertRowsInProducer(
3233+
run_count * kBatchSize, (run_count + 1) * kBatchSize, new_producer_table));
3234+
ASSERT_OK(VerifyWrittenRecords(new_producer_table, new_consumer_table));
3235+
3236+
ASSERT_OK(DropYsqlTable(producer_cluster_, *new_producer_table.get()));
3237+
ASSERT_OK(DropYsqlTable(consumer_cluster_, *new_consumer_table.get()));
3238+
}
3239+
3240+
ASSERT_OK(InsertRowsInProducer(0, 10, producer_table_));
3241+
ASSERT_OK(VerifyWrittenRecords());
3242+
}
3243+
31993244
} // namespace yb

src/yb/integration-tests/xcluster/xcluster_ysql_index-test.cc

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ namespace yb {
4444
const string kTableName = "test_table";
4545
const string kIndexName = "test_index";
4646
const auto kInsertStmtFormat = Format("INSERT INTO $0 VALUES($1, $1)", kTableName, "$0");
47-
const auto kDropIndexStmt = Format("DROP INDEX $0", kIndexName);
4847
const auto kId1CountStmt = Format("SELECT COUNT(*) FROM $0 WHERE id1 >= 0", kTableName);
4948
const auto kId2CountStmt = Format("SELECT COUNT(*) FROM $0 WHERE id2 >= 0", kTableName);
5049
const auto kSelectAllId12Stmt = Format("SELECT id1, id2 FROM $0 ORDER BY id1, id2", kTableName);
@@ -157,6 +156,10 @@ class XClusterYsqlIndexTest : public XClusterYsqlTestBase {
157156
return conn.Execute(Format("CREATE INDEX $0 ON $1 (id2 ASC)", kIndexName, kTableName));
158157
}
159158

159+
virtual Status DropIndex(pgwrapper::PGConn& conn) {
160+
return conn.Execute(Format("DROP INDEX $0", kIndexName));
161+
}
162+
160163
auto GetAllRows(pgwrapper::PGConn* conn) {
161164
return conn->FetchRows<int32_t, int32_t>(kSelectAllId12Stmt);
162165
}
@@ -541,6 +544,30 @@ TEST_F(XClusterDbScopedYsqlIndexTest, CreateIndexWithWorkload) {
541544
ASSERT_OK(TestCreateIndexConcurrentWorkload());
542545
}
543546

547+
// Create and drop indexes in a loop with PITR which will keep the dropped tables in
548+
// hidden state.
549+
TEST_F(XClusterDbScopedYsqlIndexTest, CreateDropIndexWithPITR) {
550+
ASSERT_OK(EnablePITROnClusters());
551+
552+
for (int run_count = 0; run_count < 2; run_count++) {
553+
LOG(INFO) << "Run count: " << run_count;
554+
555+
ASSERT_OK(CreateIndex(*producer_conn_));
556+
ASSERT_OK(CreateIndex(*consumer_conn_));
557+
558+
// Insert more rows and validate.
559+
for (int i = 0; i < 10; i++, row_count_++) {
560+
ASSERT_OK(producer_conn_->ExecuteFormat(kInsertStmtFormat, row_count_));
561+
}
562+
563+
ASSERT_OK(WaitForSafeTimeToAdvanceToNow());
564+
ASSERT_OK(ValidateRows());
565+
566+
ASSERT_OK(DropIndex(*producer_conn_));
567+
ASSERT_OK(DropIndex(*consumer_conn_));
568+
}
569+
}
570+
544571
class XClusterYsqlIndexProducerOnlyTest : public XClusterYsqlIndexTest {
545572
void SetUp() override {
546573
XClusterYsqlTestBase::SetUp();

src/yb/integration-tests/xcluster/xcluster_ysql_test_base.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include "yb/integration-tests/xcluster/xcluster_ysql_test_base.h"
1515

1616
#include "yb/client/client.h"
17+
#include "yb/client/snapshot_test_util.h"
1718
#include "yb/client/table.h"
1819
#include "yb/client/xcluster_client.h"
1920
#include "yb/client/yb_table_name.h"
@@ -1112,4 +1113,16 @@ Status XClusterYsqlTestBase::VerifyDDLExtensionTablesDeletion(
11121113
});
11131114
}
11141115

1116+
Status XClusterYsqlTestBase::EnablePITROnClusters() {
1117+
return RunOnBothClusters([this](Cluster* cluster) -> Status {
1118+
client::SnapshotTestUtil snapshot_util;
1119+
snapshot_util.SetProxy(&cluster->client_->proxy_cache());
1120+
snapshot_util.SetCluster(cluster->mini_cluster_.get());
1121+
1122+
RETURN_NOT_OK(snapshot_util.CreateSchedule(
1123+
nullptr, YQL_DATABASE_PGSQL, namespace_name, client::WaitSnapshot::kTrue,
1124+
2s * kTimeMultiplier, 20h));
1125+
return Status::OK();
1126+
});
1127+
}
11151128
} // namespace yb

src/yb/integration-tests/xcluster/xcluster_ysql_test_base.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,8 @@ class XClusterYsqlTestBase : public XClusterTestBase {
185185
Status VerifyDDLExtensionTablesCreation(const NamespaceName& db_name, bool only_source = false);
186186
Status VerifyDDLExtensionTablesDeletion(const NamespaceName& db_name, bool only_source = false);
187187

188+
Status EnablePITROnClusters();
189+
188190
protected:
189191
void TestReplicationWithSchemaChanges(TableId producer_table_id, bool bootstrap);
190192

src/yb/master/xrepl_catalog_manager.cc

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include "yb/util/is_operation_done_result.h"
3636
#include "yb/master/xcluster/xcluster_manager.h"
3737
#include "yb/master/xcluster/xcluster_replication_group.h"
38+
#include "yb/master/xcluster/master_xcluster_util.h"
3839
#include "yb/master/xcluster_consumer_registry_service.h"
3940
#include "yb/master/xcluster_rpc_tasks.h"
4041
#include "yb/master/master.h"
@@ -3546,9 +3547,7 @@ Status CatalogManager::BootstrapProducer(
35463547
SCHECK(
35473548
pg_database_type || req->db_type() == YQL_DATABASE_CQL, InvalidArgument,
35483549
"Invalid database type");
3549-
SCHECK(
3550-
req->has_namespace_name() && !req->namespace_name().empty(), InvalidArgument,
3551-
"No namespace specified");
3550+
SCHECK_PB_FIELDS_NOT_EMPTY(*req, namespace_name);
35523551
SCHECK_GT(req->table_name_size(), 0, InvalidArgument, "No tables specified");
35533552
if (pg_database_type) {
35543553
SCHECK_EQ(
@@ -3560,21 +3559,38 @@ Status CatalogManager::BootstrapProducer(
35603559
"Pg Schema does not apply to CQL databases");
35613560
}
35623561

3562+
NamespaceIdentifierPB ns_id;
3563+
ns_id.set_database_type(req->db_type());
3564+
ns_id.set_name(req->namespace_name());
3565+
auto ns = VERIFY_RESULT(FindNamespace(ns_id));
3566+
// We can skip sequence data, since that is only used in db scoped xCluster which does not call
3567+
// this function.
3568+
auto all_tables = VERIFY_RESULT(
3569+
GetTablesEligibleForXClusterReplication(*this, ns->id(), /*include_sequences_data= */ false));
3570+
35633571
cdc::BootstrapProducerRequestPB bootstrap_req;
35643572
master::TSDescriptorPtr ts = nullptr;
35653573
for (int i = 0; i < req->table_name_size(); i++) {
35663574
string pg_schema_name = pg_database_type ? req->pg_schema_name(i) : "";
3567-
auto table_info = GetTableInfoFromNamespaceNameAndTableName(
3568-
req->db_type(), req->namespace_name(), req->table_name(i), pg_schema_name);
3575+
3576+
auto table_designator = std::find_if(
3577+
all_tables.begin(), all_tables.end(),
3578+
[&table_name = req->table_name(i),
3579+
&pg_schema_name](const TableDesignator& table_designator) {
3580+
return table_designator.name() == table_name &&
3581+
table_designator.pgschema_name() == pg_schema_name;
3582+
});
35693583
SCHECK(
3570-
table_info, NotFound, Format("Table $0.$1$2 not found"), req->namespace_name(),
3571-
(pg_schema_name.empty() ? "" : pg_schema_name + "."), req->table_name(i));
3584+
table_designator != all_tables.end(), NotFound, Format("Table $0.$1$2 not found"),
3585+
req->namespace_name(), (pg_schema_name.empty() ? "" : pg_schema_name + "."),
3586+
req->table_name(i));
35723587

3573-
bootstrap_req.add_table_ids(table_info->id());
3574-
resp->add_table_ids(table_info->id());
3588+
bootstrap_req.add_table_ids(table_designator->id);
3589+
resp->add_table_ids(table_designator->id);
35753590

35763591
// Pick a valid tserver to bootstrap from.
35773592
if (!ts) {
3593+
auto table_info = VERIFY_RESULT(FindTableById(table_designator->id));
35783594
ts = VERIFY_RESULT(VERIFY_RESULT(table_info->GetTablets()).front()->GetLeader());
35793595
}
35803596
}

0 commit comments

Comments
 (0)