Skip to content

Commit abf33ba

Browse files
committed
[yugabyte#27202] YSQL: Fix analyze for large rows
Summary: If multiple batched requests are sent to multiple nodes in parallel, their results are combined into a single response. This single response can exceed size limits. There are two ways to fix it. 1. set the fetch size limit to ~ `rpc_max_message_size * max_buffer_size_to_rpc_limit_ratio / send_count` 2. modify the batching behaviour #1 is much simpler, so we go with that approach. If the request has no size limit (yb_fetch_size_limit = 0) or a size limit greater than the max allowed, set the size limit to `rpc_max_message_size * max_buffer_size_to_rpc_limit_ratio / send_count`. Jira: DB-16688 Test Plan: ```lang=sql CREATE TABLE tbl1(col0 TEXT) SPLIT INTO 10 TABLETS; INSERT INTO tbl1 SELECT repeat('a', 30000) FROM generate_series(0, 30000); ANALYZE tbl1; -- fails before this diff ``` ```lang=sh ./yb_build.sh --cxx-test pg_tablet_split-test --gtest_filter PgManyTabletsSelect.AnalyzeTableWithLargeRows -n 20 ``` Note that this test uses a separate test class because rpc_max_message_size is a non-runtime flag, and setting it at runtime can result in flaky failures as some sections of the code are aware of the new limit and others aren't. Reviewers: amartsinchyk Reviewed By: amartsinchyk Subscribers: yql Differential Revision: https://phorge.dev.yugabyte.com/D44455
1 parent aa43df4 commit abf33ba

File tree

2 files changed

+62
-0
lines changed

2 files changed

+62
-0
lines changed

src/yb/yql/pggate/pg_doc_op.cc

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@
3939
#include "yb/yql/pggate/util/pg_doc_data.h"
4040
#include "yb/yql/pggate/util/ybc_util.h"
4141

42+
DECLARE_uint64(rpc_max_message_size);
43+
DECLARE_double(max_buffer_size_to_rpc_limit_ratio);
44+
4245
namespace yb::pggate {
4346
namespace {
4447

@@ -375,6 +378,26 @@ Status PgDocOp::SendRequestImpl(ForceNonBufferable force_non_bufferable) {
375378
if (active_op_count_ > 0) {
376379
// Send at most "parallelism_level_" number of requests at one time.
377380
size_t send_count = std::min(parallelism_level_, active_op_count_);
381+
382+
uint64_t max_size = FLAGS_rpc_max_message_size * FLAGS_max_buffer_size_to_rpc_limit_ratio
383+
/ send_count;
384+
385+
for (const auto& op : pgsql_ops_) {
386+
if (op->is_active() && op->is_read()) {
387+
auto& read_op = down_cast<PgsqlReadOp&>(*op);
388+
auto req_size_limit = read_op.read_request().size_limit();
389+
if (req_size_limit > 0) {
390+
VLOG(2) << "Capping read op at size limit: " << max_size
391+
<< " (from " << req_size_limit << ")";
392+
}
393+
394+
// Cap the size limit if the size limit is unset or exceeds the maximum size.
395+
if (req_size_limit > max_size || req_size_limit == 0) {
396+
read_op.read_request().set_size_limit(max_size);
397+
}
398+
}
399+
}
400+
378401
VLOG(1) << "Number of operations to send: " << send_count;
379402
response_ = VERIFY_RESULT(sender_(
380403
pg_session_.get(), pgsql_ops_.data(), send_count, *table_,

src/yb/yql/pgwrapper/pg_tablet_split-test.cc

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ DECLARE_int32(ysql_client_read_write_timeout_ms);
7373
DECLARE_int64(db_block_size_bytes);
7474
DECLARE_uint64(post_split_compaction_input_size_threshold_bytes);
7575
DECLARE_string(ysql_pg_conf_csv);
76+
DECLARE_int32(ysql_select_parallelism);
77+
DECLARE_uint64(rpc_max_message_size);
7678

7779
DECLARE_bool(TEST_asyncrpc_common_response_check_fail_once);
7880
DECLARE_bool(TEST_pause_before_full_compaction);
@@ -776,6 +778,43 @@ TEST_F(PgTabletSplitTest, TestMetaCacheLookupsPostSplit) {
776778
ASSERT_NE(remote_child->tablet_id(), parent_tablet_id);
777779
}
778780

781+
class PgManyTabletsSelect : public PgTabletSplitTest {
782+
protected:
783+
protected:
784+
void SetUp() override {
785+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_select_parallelism) = 20;
786+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_rpc_max_message_size) = 1_MB;
787+
788+
PgTabletSplitTest::SetUp();
789+
}
790+
};
791+
792+
// Verifies that a parallel read whose combined result exceeds
// rpc_max_message_size (1 MB in this fixture) is transparently split across
// multiple storage read requests instead of failing.
TEST_F(PgManyTabletsSelect, AnalyzeTableWithLargeRows) {
  const auto table_name = "big_table";
  const auto rows = 1000;
  auto conn = ASSERT_RESULT(Connect());

  // One tablet per parallel read slot, so every op is sent in the same batch.
  ASSERT_OK(conn.ExecuteFormat(
      "CREATE TABLE $0(wide TEXT) SPLIT INTO $1 TABLETS",
      table_name, FLAGS_ysql_select_parallelism));
  ASSERT_OK(conn.ExecuteFormat(
      "INSERT INTO $0 SELECT repeat('a', 1100) FROM generate_series(1, $1)", table_name, rows));
  // 1100 bytes per row * 1000 rows ~= 1.1 MB total size.

  // Disable the row-count fetch limit so only the size limit governs batching.
  ASSERT_OK(conn.ExecuteFormat("SET yb_fetch_row_limit = 0"));

  // `wide = wide` forces the wide column to be fetched without filtering rows.
  const auto explain_str = ASSERT_RESULT(conn.FetchAllAsString(Format(
      "EXPLAIN (ANALYZE, DIST, FORMAT JSON) SELECT * FROM $0 WHERE wide = wide", table_name)));
  LOG(INFO) << "Explain output: " << explain_str;

  rapidjson::Document explain_json;
  explain_json.Parse(explain_str.c_str());

  // The response is too large to fit in one request (RPC_MAX_SIZE_LIMIT), so it is split into two
  ASSERT_EQ(explain_json[0]["Storage Read Requests"].GetInt(), 2);
  // All rows must still come back despite the split.
  ASSERT_EQ(explain_json[0]["Plan"]["Actual Rows"].GetInt(), rows);
}
817+
779818
class PgPartitioningVersionTest :
780819
public PgTabletSplitTest,
781820
public testing::WithParamInterface<uint32_t> {

0 commit comments

Comments
 (0)