Skip to content

Commit 3021850

Browse files
[BACKPORT 2024.1][yugabyte#23482] CDCSDK: Add an option to disable dynamic table addition while creating a stream
Summary: ####Backport Description No merge conflicts were encountered. ####Original Description This diff adds an option in the yb-admin command `create_change_data_stream` to disable dynamic tables addition on the newly created streams. Dynamic table addition cannot be re-enabled on such streams. By default all the newly created streams will have dynamic tables addition enabled. In order to create a stream with dynamic tables addition disabled, the user needs to specify the `[<dynamic_tables_option>]` which can take the following values: - DYNAMIC_TABLES_ENABLED - DYNAMIC_TABLES_DISABLED Example to create a stream with dynamic tables disabled: ``` ./yb-admin --master_addresses <master-addresses> create_change_data_stream ysql.yugabyte EXPLICIT CHANGE USE_SNAPSHOT DYNAMIC_TABLES_DISABLED ``` For a stream created with dynamic table addition disabled, the field `cdcsdk_disable_dynamic_table_addition` in stream metadata will be set to true. Dynamic table addition will not be disabled by this mechanism in streams associated with replication slots. **Upgrade / Rollback safety ** The diff introduces a new message `CDCSDKStreamCreateOptionsPB` in common.proto. This message contains the newly introduced `CDCSDKDynamicTablesOption` as its only field. The message `CDCSDKStreamCreateOptionsPB` is used only in `CreateCDCStreamRequestPB` and should be used for adding more options if required in future. These changes do not need to be protected by an autoflag for the following reasons: # The proto changes do not get written to the disk. # The newly introduced field is optional. # The proto changes are for yb-admin command related rpcs. Jira: DB-12399 Original commit: d0dfe63 / D37534 Test Plan: ./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestDisablingDynamicTableAdditionAtStreamCreationTime ./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestDynamicTablesShouldBeEnabledForStreamsWithSlotName Manual testing Reviewers: skumar, asrinivasan, siddharth.shah, xCluster, hsunder Reviewed By: siddharth.shah Subscribers: ycdcxcluster, ybase, yql Tags: #jenkins-ready Differential Revision: https://phorge.dev.yugabyte.com/D37732
1 parent dd776ba commit 3021850

File tree

15 files changed

+156
-15
lines changed

15 files changed

+156
-15
lines changed

src/yb/cdc/cdc_service.cc

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1092,10 +1092,18 @@ Status CDCServiceImpl::CreateCDCStreamForNamespace(
10921092
snapshot_option = req->cdcsdk_consistent_snapshot_option();
10931093
}
10941094

1095+
// Dynamic Tables option
1096+
CDCSDKDynamicTablesOption dynamic_tables_option =
1097+
CDCSDKDynamicTablesOption::DYNAMIC_TABLES_ENABLED;
1098+
if (req->has_cdcsdk_stream_create_options() &&
1099+
req->cdcsdk_stream_create_options().has_cdcsdk_dynamic_tables_option()) {
1100+
dynamic_tables_option = req->cdcsdk_stream_create_options().cdcsdk_dynamic_tables_option();
1101+
}
1102+
10951103
xrepl::StreamId db_stream_id = VERIFY_RESULT_OR_SET_CODE(
10961104
client()->CreateCDCSDKStreamForNamespace(
10971105
ns_id, options, populate_namespace_id_as_table_id, ReplicationSlotName(""), std::nullopt,
1098-
snapshot_option, deadline),
1106+
snapshot_option, deadline, dynamic_tables_option),
10991107
CDCError(CDCErrorPB::INTERNAL_ERROR));
11001108
resp->set_db_stream_id(db_stream_id.ToString());
11011109
return Status::OK();

src/yb/cdc/cdc_service.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,9 @@ message CreateCDCStreamRequestPB {
230230
optional bool transactional = 7; // [default = false]
231231
optional YQLDatabase db_type = 8 [default = YQL_DATABASE_PGSQL] ;
232232
optional CDCSDKSnapshotOption cdcsdk_consistent_snapshot_option = 9;
233+
// cdcsdk_stream_create_options field should be used for adding more stream creation options
234+
// related to CDCSDK in future.
235+
optional CDCSDKStreamCreateOptionsPB cdcsdk_stream_create_options = 10;
233236
}
234237

235238
message CreateCDCStreamResponsePB {

src/yb/client/client.cc

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1497,7 +1497,8 @@ Result<xrepl::StreamId> YBClient::CreateCDCSDKStreamForNamespace(
14971497
const std::optional<std::string>& replication_slot_plugin_name,
14981498
const std::optional<CDCSDKSnapshotOption>& consistent_snapshot_option,
14991499
CoarseTimePoint deadline,
1500-
uint64_t *consistent_snapshot_time) {
1500+
const CDCSDKDynamicTablesOption& dynamic_tables_option,
1501+
uint64_t *consistent_snapshot_time_out) {
15011502
CreateCDCStreamRequestPB req;
15021503

15031504
if (populate_namespace_id_as_table_id) {
@@ -1521,13 +1522,15 @@ Result<xrepl::StreamId> YBClient::CreateCDCSDKStreamForNamespace(
15211522
if (replication_slot_plugin_name.has_value()) {
15221523
req.set_cdcsdk_ysql_replication_slot_plugin_name(*replication_slot_plugin_name);
15231524
}
1525+
req.mutable_cdcsdk_stream_create_options()->set_cdcsdk_dynamic_tables_option(
1526+
dynamic_tables_option);
15241527

15251528
CreateCDCStreamResponsePB resp;
15261529
deadline = PatchAdminDeadline(deadline);
15271530
CALL_SYNC_LEADER_MASTER_RPC_WITH_DEADLINE(Replication, req, resp, deadline, CreateCDCStream);
15281531

1529-
if (consistent_snapshot_time && resp.has_cdcsdk_consistent_snapshot_time()) {
1530-
*consistent_snapshot_time = resp.cdcsdk_consistent_snapshot_time();
1532+
if (consistent_snapshot_time_out && resp.has_cdcsdk_consistent_snapshot_time()) {
1533+
*consistent_snapshot_time_out = resp.cdcsdk_consistent_snapshot_time();
15311534
}
15321535
return xrepl::StreamId::FromString(resp.stream_id());
15331536
}

src/yb/client/client.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,9 @@ class YBClient {
611611
const std::optional<std::string>& replication_slot_plugin_name = std::nullopt,
612612
const std::optional<CDCSDKSnapshotOption>& consistent_snapshot_option = std::nullopt,
613613
CoarseTimePoint deadline = CoarseTimePoint(),
614-
uint64_t *consistent_snapshot_time = nullptr);
614+
const CDCSDKDynamicTablesOption& dynamic_tables_option =
615+
CDCSDKDynamicTablesOption::DYNAMIC_TABLES_ENABLED,
616+
uint64_t* consistent_snapshot_time_out = nullptr);
615617

616618
// Delete multiple CDC streams.
617619
Status DeleteCDCStream(

src/yb/common/common.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -671,3 +671,9 @@ message WaitStateInfoPB {
671671
// A string representation of wait_status_code for easy human consumption.
672672
optional string wait_status_code_as_string = 4;
673673
}
674+
675+
// This message should be used for specifying stream creation options for CDCSDK streams.
676+
message CDCSDKStreamCreateOptionsPB {
677+
optional
678+
CDCSDKDynamicTablesOption cdcsdk_dynamic_tables_option = 1 [default = DYNAMIC_TABLES_ENABLED];
679+
}

src/yb/common/common_types.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,12 @@ enum CDCSDKSnapshotOption {
180180
USE_SNAPSHOT = 2; // Client wishes to consume snapshot from the source universe
181181
}
182182

183+
enum CDCSDKDynamicTablesOption {
184+
DYNAMIC_TABLES_UNDEFINED = 0;
185+
DYNAMIC_TABLES_ENABLED = 1;
186+
DYNAMIC_TABLES_DISABLED = 2;
187+
}
188+
183189
// Available replica identity modes for use in CDC
184190
enum PgReplicaIdentity {
185191
// Entire updated row as new image, only key as old image for DELETE

src/yb/integration-tests/cdcsdk_test_base.cc

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -407,24 +407,29 @@ void CDCSDKTestBase::InitCreateStreamRequest(
407407
CreateCDCStreamRequestPB* create_req,
408408
const CDCCheckpointType& checkpoint_type,
409409
const CDCRecordType& record_type,
410-
const std::string& namespace_name) {
410+
const std::string& namespace_name,
411+
CDCSDKDynamicTablesOption dynamic_tables_option) {
411412
create_req->set_namespace_name(namespace_name);
412413
create_req->set_checkpoint_type(checkpoint_type);
413414
create_req->set_record_type(record_type);
414415
create_req->set_record_format(CDCRecordFormat::PROTO);
415416
create_req->set_source_type(CDCSDK);
417+
create_req->mutable_cdcsdk_stream_create_options()->set_cdcsdk_dynamic_tables_option(
418+
dynamic_tables_option);
416419
}
417420

418421
// This creates a DB stream on the database kNamespaceName by default.
419422
Result<xrepl::StreamId> CDCSDKTestBase::CreateDBStream(
420-
CDCCheckpointType checkpoint_type, CDCRecordType record_type, std::string namespace_name) {
423+
CDCCheckpointType checkpoint_type, CDCRecordType record_type, std::string namespace_name,
424+
CDCSDKDynamicTablesOption dynamic_tables_option) {
421425
CreateCDCStreamRequestPB req;
422426
CreateCDCStreamResponsePB resp;
423427

424428
rpc::RpcController rpc;
425429
rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_write_rpc_timeout_ms));
426430

427-
InitCreateStreamRequest(&req, checkpoint_type, record_type, namespace_name);
431+
InitCreateStreamRequest(
432+
&req, checkpoint_type, record_type, namespace_name, dynamic_tables_option);
428433

429434
RETURN_NOT_OK(cdc_proxy_->CreateCDCStream(req, &resp, &rpc));
430435
if (resp.has_error()) {

src/yb/integration-tests/cdcsdk_test_base.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,12 +215,16 @@ class CDCSDKTestBase : public YBTest {
215215
CreateCDCStreamRequestPB* create_req,
216216
const CDCCheckpointType& checkpoint_type = CDCCheckpointType::EXPLICIT,
217217
const CDCRecordType& record_type = CDCRecordType::CHANGE,
218-
const std::string& namespace_name = kNamespaceName);
218+
const std::string& namespace_name = kNamespaceName,
219+
CDCSDKDynamicTablesOption dynamic_tables_option =
220+
CDCSDKDynamicTablesOption::DYNAMIC_TABLES_ENABLED);
219221

220222
Result<xrepl::StreamId> CreateDBStream(
221223
CDCCheckpointType checkpoint_type = CDCCheckpointType::EXPLICIT,
222224
CDCRecordType record_type = CDCRecordType::CHANGE,
223-
std::string namespace_name = kNamespaceName);
225+
std::string namespace_name = kNamespaceName,
226+
CDCSDKDynamicTablesOption dynamic_tables_option =
227+
CDCSDKDynamicTablesOption::DYNAMIC_TABLES_ENABLED);
224228

225229
// Creates a DB stream on the database kNamespaceName using the Replication Slot syntax.
226230
// Only supports the CDCCheckpointType::EXPLICIT and CDCRecordType::CHANGE.

src/yb/integration-tests/cdcsdk_ysql-test.cc

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9985,5 +9985,70 @@ TEST_F(CDCSDKYsqlTest, TestCleanupOfUnqualifiedTableOnDrop) {
99859985
"Waiting for cdc_state table to be in sync after table drop");
99869986
}
99879987

9988+
TEST_F(CDCSDKYsqlTest, TestDisablingDynamicTableAdditionAtStreamCreationTime) {
9989+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = true;
9990+
9991+
// Setup cluster.
9992+
ASSERT_OK(SetUpWithParams(3, 3, false));
9993+
9994+
auto table_1 = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, "table_1"));
9995+
auto table_2 = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, "table_2"));
9996+
9997+
// Create a yb-admin stream with dynamic tables enabled and one with dynamic tables disabled.
9998+
auto stream_id_1 = ASSERT_RESULT(CreateDBStream(
9999+
CDCCheckpointType::EXPLICIT, CDCRecordType::CHANGE, kNamespaceName,
10000+
CDCSDKDynamicTablesOption::DYNAMIC_TABLES_ENABLED));
10001+
auto stream_id_2 = ASSERT_RESULT(CreateDBStream(
10002+
CDCCheckpointType::EXPLICIT, CDCRecordType::CHANGE, kNamespaceName,
10003+
CDCSDKDynamicTablesOption::DYNAMIC_TABLES_DISABLED));
10004+
10005+
auto stream_info = ASSERT_RESULT(GetDBStreamInfo(stream_id_1));
10006+
ASSERT_EQ(stream_info.table_info_size(), 2);
10007+
10008+
stream_info = ASSERT_RESULT(GetDBStreamInfo(stream_id_2));
10009+
ASSERT_EQ(stream_info.table_info_size(), 2);
10010+
10011+
// Create a dynamic table.
10012+
auto table_3 = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, "table_3"));
10013+
10014+
// Sleep for 10 seconds for master bg thread to run.
10015+
SleepFor(MonoDelta::FromSeconds(10 * kTimeMultiplier));
10016+
10017+
// Stream_id_1 will have the dynamic table added.
10018+
stream_info = ASSERT_RESULT(GetDBStreamInfo(stream_id_1));
10019+
ASSERT_EQ(stream_info.table_info_size(), 3);
10020+
10021+
// Stream_id_2 will not have the dynamic table added.
10022+
stream_info = ASSERT_RESULT(GetDBStreamInfo(stream_id_2));
10023+
ASSERT_EQ(stream_info.table_info_size(), 2);
10024+
}
10025+
10026+
TEST_F(CDCSDKYsqlTest, TestDynamicTablesShouldBeEnabledForStreamsWithSlotName) {
10027+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = true;
10028+
10029+
// Setup cluster.
10030+
ASSERT_OK(SetUpWithParams(3, 3, false));
10031+
10032+
auto table_1 = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, "table_1"));
10033+
auto table_2 = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, "table_2"));
10034+
10035+
// Create a slot.
10036+
auto slot_stream_id = ASSERT_RESULT(CreateDBStreamWithReplicationSlot());
10037+
10038+
auto stream_info = ASSERT_RESULT(GetDBStreamInfo(slot_stream_id));
10039+
ASSERT_EQ(stream_info.table_info_size(), 2);
10040+
10041+
// Create a dynamic table.
10042+
auto table_3 = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, "table_3"));
10043+
10044+
// Sleep for 10 seconds for master bg thread to run.
10045+
SleepFor(MonoDelta::FromSeconds(10 * kTimeMultiplier));
10046+
10047+
// Since dynamic table addition is always enabled in streams associated with slots, table_3 will
10048+
// get added to its stream metadata.
10049+
stream_info = ASSERT_RESULT(GetDBStreamInfo(slot_stream_id));
10050+
ASSERT_EQ(stream_info.table_info_size(), 3);
10051+
}
10052+
998810053
} // namespace cdc
998910054
} // namespace yb

src/yb/master/master_replication.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ message CreateCDCStreamRequestPB {
7676
optional string cdcsdk_ysql_replication_slot_name = 8;
7777
optional CDCSDKSnapshotOption cdcsdk_consistent_snapshot_option = 9;
7878
optional string cdcsdk_ysql_replication_slot_plugin_name = 10;
79+
// cdcsdk_stream_create_options field should be used for adding more stream creation options
80+
// related to CDCSDK in future.
81+
optional CDCSDKStreamCreateOptionsPB cdcsdk_stream_create_options = 11;
7982
}
8083

8184
message CreateCDCStreamResponsePB {

0 commit comments

Comments
 (0)