Skip to content

Commit cf0c67f

Browse files
committed
[yugabyte#19137][yugabyte#20022] YSQL, ASH: ASH Infrastructure and instrumentation for PGGate
Summary: Postgres has the infrastructure to start and end wait events. This diff adds a callback to use a postgres function to start and end wait events in the pggate layer. A new class `Flusher` is introduced which flushes the bufferable operations and returns a `FlushFuture` object which contains the `PerformFuture` object and a struct `MetricContext`. The `MetricContext` struct can update the wait event and the duration of the flush request while waiting for the future object. This diff also instruments the reads and flushes in the pggate layer using RAII resource management. This diff also includes some other minor changes. Jira: DB-7935, DB-8989 Test Plan: Jenkins Reviewers: jason, dmitry Reviewed By: dmitry Subscribers: hbhanawat, amitanand, yql Differential Revision: https://phorge.dev.yugabyte.com/D30454
1 parent f0cd315 commit cf0c67f

File tree

11 files changed

+212
-76
lines changed

11 files changed

+212
-76
lines changed

src/postgres/src/backend/utils/misc/pg_yb_utils.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -863,6 +863,7 @@ YBInitPostgresBackend(
863863
callbacks.UnixEpochToPostgresEpoch = &YbUnixEpochToPostgresEpoch;
864864
callbacks.ConstructArrayDatum = &YbConstructArrayDatum;
865865
callbacks.CheckUserMap = &check_usermap;
866+
callbacks.PgstatReportWaitStart = &yb_pgstat_report_wait_start;
866867
YBCInitPgGate(type_table, count, callbacks, session_id, &MyProc->yb_ash_metadata,
867868
&MyProc->yb_is_ash_metadata_set);
868869
YBCInstallTxnDdlHook();

src/postgres/src/include/pgstat.h

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1433,6 +1433,34 @@ pgstat_report_wait_end(void)
14331433
return pgstat_report_wait_end_for_proc(MyProc);
14341434
}
14351435

1436+
/* ----------
1437+
* yb_pgstat_report_wait_start() -
1438+
*
1439+
* Called to get the current wait event info and set a new wait
1440+
* event info.
1441+
*
1442+
* NB: this *must* be able to survive being called before MyProc has been
1443+
* initialized.
1444+
* ----------
1445+
*/
1446+
static inline uint32
1447+
yb_pgstat_report_wait_start(uint32 wait_event_info)
1448+
{
1449+
uint32 prev_wait_event_info = 0;
1450+
volatile PGPROC *proc = MyProc;
1451+
1452+
if (pgstat_track_activities && proc)
1453+
{
1454+
/*
1455+
* Since this is a four-byte field which is always read and written as
1456+
* four-bytes, updates are atomic.
1457+
*/
1458+
prev_wait_event_info = proc->wait_event_info;
1459+
proc->wait_event_info = wait_event_info;
1460+
}
1461+
return prev_wait_event_info;
1462+
}
1463+
14361464
/* nontransactional event counts are simple enough to inline */
14371465

14381466
#define pgstat_count_heap_scan(rel) \

src/yb/ash/wait_state.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,14 @@ YB_DEFINE_TYPED_ENUM(WaitStateCode, uint32_t,
105105
// Don't change the value of kUnused
106106
((kUnused, 0xFFFFFFFFU))
107107

108+
// Wait states related to postgres
109+
// Don't change the position of kPostgresReserved
110+
((kPostgresReserved, YB_ASH_MAKE_EVENT(TServerWait)))
111+
(kCatalogRead)
112+
(kIndexRead)
113+
(kStorageRead)
114+
(kStorageFlush)
115+
108116
// Common wait states
109117
((kOnCpu_Active, YB_ASH_MAKE_EVENT(Common)))
110118
(kOnCpu_Passive)

src/yb/yql/pggate/pg_doc_op.cc

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
#include <algorithm>
1818
#include <utility>
1919

20+
#include "yb/ash/wait_state.h"
21+
2022
#include "yb/common/pg_system_attr.h"
2123
#include "yb/common/row_mark.h"
2224

@@ -36,10 +38,7 @@
3638
#include "yb/yql/pggate/util/pg_doc_data.h"
3739
#include "yb/yql/pggate/util/ybc_util.h"
3840

39-
using std::string;
40-
41-
namespace yb {
42-
namespace pggate {
41+
namespace yb::pggate {
4342
namespace {
4443

4544
struct PgDocReadOpCachedHelper {
@@ -128,7 +127,7 @@ auto BuildRowOrders(const LWPgsqlResponsePB& response,
128127
// Helper function to determine the type of relation that the given Pgsql operation is being
129128
// performed on. This function classifies the operation into one of three buckets: system catalog,
130129
// secondary index or user table requests.
131-
TableType ResolveRelationType(const PgsqlOp& op, const PgTable& table) {
130+
[[nodiscard]] TableType ResolveRelationType(const PgsqlOp& op, const PgTable& table) {
132131
if (table->schema().table_properties().is_ysql_catalog_table()) {
133132
// We don't distinguish between table reads and index reads for a catalog table.
134133
return TableType::SYSTEM;
@@ -150,6 +149,35 @@ TableType ResolveRelationType(const PgsqlOp& op, const PgTable& table) {
150149
return TableType::USER;
151150
}
152151

152+
// Maps the type of relation targeted by a read to the matching ASH wait state code:
// catalog tables -> kCatalogRead, secondary indexes -> kIndexRead, user tables -> kStorageRead.
[[nodiscard]] inline ash::WaitStateCode ResolveWaitEventCode(TableType table_type) {
  using ash::WaitStateCode;

  switch (table_type) {
    case TableType::SYSTEM:
      return WaitStateCode::kCatalogRead;
    case TableType::INDEX:
      return WaitStateCode::kIndexRead;
    case TableType::USER:
      return WaitStateCode::kStorageRead;
  }
  // Reached only if table_type holds a value that is not one of the enumerators handled above.
  FATAL_INVALID_ENUM_VALUE(TableType, table_type);
}
160+
161+
// Waits for `response` and returns the data it carries.
//
// response    - pending response to wait on.
// session     - supplies the metrics object and the wait event watcher used for reads.
// table_type  - relation type of the request; selects the wait event and the read counter.
// is_write    - when true the response is awaited without any instrumentation.
//
// Returns the response data, or the error produced while waiting for it.
Result<PgDocResponse::Data> FetchResponseData(
    PgDocResponse* response, PgSession* session, TableType table_type, bool is_write) {
  if (is_write) {
    return response->Get();
  }
  // Update session stats instrumentation only for read requests. Reads are executed
  // synchronously with respect to Postgres query execution, and thus it is possible to
  // correlate wait/execution times directly with the request. We update instrumentation for
  // reads exactly once, upon receiving a success response from the underlying storage layer.
  auto& metrics = session->metrics();
  uint64_t wait_time = 0;
  // The wait event watcher is owned by the closure, so it cannot outlive the closure passed to
  // CallWithDuration.
  // NOTE(review): presumably the watcher ends the wait event when destroyed - confirm against
  // PgWaitEventWatcher.
  //
  // `result` is intentionally not const: a const local cannot be moved from, which would force
  // a copy of the response data when it is converted to the returned Result.
  auto result = VERIFY_RESULT(metrics.CallWithDuration(
      [response,
       event_watcher = session->StartWaitEvent(ResolveWaitEventCode(table_type))] {
        return response->Get();
      },
      &wait_time));
  metrics.ReadRequest(table_type, wait_time);
  return result;
}
180+
153181
} // namespace
154182

155183
PgDocResult::PgDocResult(rpc::SidecarHolder data, std::vector<int64_t>&& row_orders)
@@ -283,18 +311,8 @@ Result<std::list<PgDocResult>> PgDocOp::GetResult() {
283311
RETURN_NOT_OK(SendRequest());
284312
}
285313

286-
uint64_t wait_time = 0;
287-
auto result_data = VERIFY_RESULT(pg_session_->metrics().CallWithDuration(
288-
[&response = response_] { return response.Get(); }, &wait_time));
289-
290-
// Update session stats instrumentation only for read requests. Reads are executed
291-
// synchronously with respect to Postgres query execution, and thus it is possible to
292-
// correlate wait/execution times directly with the request. We update instrumentation for
293-
// reads exactly once, upon receiving a success response from the underlying storage layer.
294-
if (!IsWrite()) {
295-
pg_session_->metrics().ReadRequest(
296-
ResolveRelationType(*pgsql_ops_.front(), table_), wait_time);
297-
}
314+
const auto result_data = VERIFY_RESULT(FetchResponseData(
315+
&response_, &*pg_session_, ResolveRelationType(*pgsql_ops_.front(), table_), IsWrite()));
298316

299317
result = VERIFY_RESULT(ProcessResponse(result_data));
300318

@@ -343,7 +361,7 @@ Status PgDocOp::SendRequestImpl(ForceNonBufferable force_non_bufferable) {
343361

344362
// Update session stats instrumentation for write requests only. Writes are buffered and flushed
345363
// asynchronously, and thus it is not possible to correlate wait/execution times directly with
346-
// the request. We update instrumentation for writes sexactly once, after successfully sending an
364+
// the request. We update instrumentation for writes exactly once, after successfully sending an
347365
// RPC request to the underlying storage layer.
348366
if (IsWrite()) {
349367
pg_session_->metrics().WriteRequest(ResolveRelationType(*pgsql_ops_.front(), table_));
@@ -1175,7 +1193,7 @@ Result<bool> PgDocReadOp::SetScanPartitionBoundary() {
11751193
partition_key != partition_keys.end(), InvalidArgument, "invalid partition key given");
11761194

11771195
// Seek upper bound (Beginning of next tablet).
1178-
string upper_bound;
1196+
std::string upper_bound;
11791197
const auto& next_partition_key = std::next(partition_key, 1);
11801198
if (next_partition_key != partition_keys.end()) {
11811199
upper_bound = *next_partition_key;
@@ -1493,5 +1511,4 @@ PgDocOp::SharedPtr MakeDocReadOpWithData(
14931511
return std::make_shared<PgDocReadOpCached>(pg_session, std::move(data));
14941512
}
14951513

1496-
} // namespace pggate
1497-
} // namespace yb
1514+
} // namespace yb::pggate

src/yb/yql/pggate/pg_operation_buffer.cc

Lines changed: 64 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,7 @@
4242
#include "yb/yql/pggate/pg_op.h"
4343
#include "yb/yql/pggate/pg_tabledesc.h"
4444

45-
namespace yb {
46-
namespace pggate {
45+
namespace yb::pggate {
4746

4847
namespace {
4948

@@ -142,11 +141,60 @@ inline bool IsTableUsedByRequest(const LWPgsqlReadRequestPB& request, const Slic
142141

143142
using RowKeys = std::unordered_set<RowIdentifier, boost::hash<RowIdentifier>>;
144143

144+
struct MetricContext {
145+
PgDocMetrics& metrics;
146+
PgWaitEventWatcher::Starter wait_starter;
147+
};
148+
149+
class FlushFuture {
150+
public:
151+
FlushFuture(PerformFuture&& future, MetricContext* context)
152+
: future_(std::move(future)), context_(context) {}
153+
154+
bool Ready() const {
155+
return future_.Ready();
156+
}
157+
158+
Status EnsureCompleted() {
159+
uint64_t duration = 0;
160+
auto& metrics = context_->metrics;
161+
{
162+
PgWaitEventWatcher watcher(context_->wait_starter,
163+
ash::WaitStateCode::kStorageFlush);
164+
RETURN_NOT_OK(metrics.CallWithDuration(
165+
[&future = future_] { return future.Get(); }, &duration));
166+
}
167+
metrics.FlushRequest(duration);
168+
return Status::OK();
169+
}
170+
171+
private:
172+
PerformFuture future_;
173+
MetricContext* context_;
174+
};
175+
176+
class Flusher {
177+
public:
178+
Flusher(
179+
PgOperationBuffer::OperationsFlusher&& ops_flusher, PgDocMetrics* metrics,
180+
PgWaitEventWatcher::Starter wait_starter)
181+
: ops_flusher_(std::move(ops_flusher)),
182+
context_{*metrics, wait_starter} {}
183+
184+
Result<FlushFuture> Flush(BufferableOperations&& ops, bool transactional) {
185+
return FlushFuture{VERIFY_RESULT(ops_flusher_(std::move(ops), transactional)), &context_};
186+
}
187+
188+
private:
189+
PgOperationBuffer::OperationsFlusher ops_flusher_;
190+
MetricContext context_;
191+
};
192+
145193
struct InFlightOperation {
146194
RowKeys keys;
147-
PerformFuture future;
195+
FlushFuture future;
148196

149-
explicit InFlightOperation(PerformFuture future_)
197+
explicit InFlightOperation(FlushFuture&& future_)
150198
: future(std::move(future_)) {}
151199
};
152200

@@ -199,12 +247,10 @@ size_t BufferableOperations::size() const {
199247
class PgOperationBuffer::Impl {
200248
public:
201249
Impl(
202-
const Flusher& flusher,
203-
const BufferingSettings& buffering_settings,
204-
PgDocMetrics* metrics)
205-
: flusher_(flusher),
206-
buffering_settings_(buffering_settings),
207-
metrics_(*metrics) {}
250+
OperationsFlusher&& ops_flusher, PgDocMetrics* metrics,
251+
PgWaitEventWatcher::Starter wait_starter, const BufferingSettings& buffering_settings)
252+
: flusher_(std::move(ops_flusher), metrics, wait_starter),
253+
buffering_settings_(buffering_settings) {}
208254

209255
Status Add(const PgTableDesc& table, PgsqlWriteOpPtr op, bool transactional) {
210256
return ClearOnError(DoAdd(table, std::move(op), transactional));
@@ -317,10 +363,7 @@ class PgOperationBuffer::Impl {
317363

318364
Status EnsureCompleted(size_t count) {
319365
for (; count && !in_flight_ops_.empty(); --count) {
320-
uint64_t duration = 0;
321-
auto result = VERIFY_RESULT(metrics_.CallWithDuration(
322-
[&future = in_flight_ops_.front().future] { return future.Get(); }, &duration));
323-
metrics_.FlushRequest(duration);
366+
RETURN_NOT_OK(in_flight_ops_.front().future.EnsureCompleted());
324367
in_flight_ops_.pop_front();
325368
}
326369
return Status::OK();
@@ -378,7 +421,7 @@ class PgOperationBuffer::Impl {
378421
RETURN_NOT_OK(EnsureCompleted(1));
379422
}
380423
in_flight_ops_.push_back(
381-
InFlightOperation(VERIFY_RESULT(flusher_(std::move(ops), transactional))));
424+
InFlightOperation(VERIFY_RESULT(flusher_.Flush(std::move(ops), transactional))));
382425
return true;
383426
}
384427
return false;
@@ -401,20 +444,18 @@ class PgOperationBuffer::Impl {
401444
return false;
402445
}
403446

404-
const Flusher flusher_;
447+
Flusher flusher_;
405448
const BufferingSettings& buffering_settings_;
406449
BufferableOperations ops_;
407450
BufferableOperations txn_ops_;
408451
RowKeys keys_;
409452
InFlightOps in_flight_ops_;
410-
PgDocMetrics& metrics_;
411453
};
412454

413-
PgOperationBuffer::PgOperationBuffer(const Flusher& flusher,
414-
const BufferingSettings& buffering_settings,
415-
PgDocMetrics* metrics)
416-
: impl_(new Impl(flusher, buffering_settings, metrics)) {
417-
}
455+
PgOperationBuffer::PgOperationBuffer(
456+
OperationsFlusher&& ops_flusher, PgDocMetrics* metrics,
457+
PgWaitEventWatcher::Starter wait_starter, const BufferingSettings& buffering_settings)
458+
: impl_(new Impl(std::move(ops_flusher), metrics, wait_starter, buffering_settings)) {}
418459

419460
PgOperationBuffer::~PgOperationBuffer() = default;
420461

@@ -439,5 +480,4 @@ void PgOperationBuffer::Clear() {
439480
impl_->Clear();
440481
}
441482

442-
} // namespace pggate
443-
} // namespace yb
483+
} // namespace yb::pggate

src/yb/yql/pggate/pg_operation_buffer.h

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
#include <functional>
1717
#include <memory>
1818

19+
#include "yb/ash/wait_state.h"
20+
1921
#include "yb/common/common_fwd.h"
2022
#include "yb/common/pg_types.h"
2123

@@ -24,9 +26,9 @@
2426

2527
#include "yb/yql/pggate/pg_gate_fwd.h"
2628
#include "yb/yql/pggate/pg_perform_future.h"
29+
#include "yb/yql/pggate/pg_tools.h"
2730

28-
namespace yb {
29-
namespace pggate {
31+
namespace yb::pggate {
3032

3133
class PgDocMetrics;
3234

@@ -49,12 +51,11 @@ struct BufferableOperations {
4951

5052
class PgOperationBuffer {
5153
public:
52-
using Flusher = std::function<Result<PerformFuture>(BufferableOperations, bool)>;
54+
using OperationsFlusher = std::function<Result<PerformFuture>(BufferableOperations&&, bool)>;
5355

5456
PgOperationBuffer(
55-
const Flusher& flusher,
56-
const BufferingSettings& buffering_settings,
57-
PgDocMetrics* metrics);
57+
OperationsFlusher&& ops_flusher, PgDocMetrics* metrics,
58+
PgWaitEventWatcher::Starter wait_starter, const BufferingSettings& buffering_settings);
5859
~PgOperationBuffer();
5960
Status Add(const PgTableDesc& table, PgsqlWriteOpPtr op, bool transactional);
6061
Status Flush();
@@ -68,5 +69,4 @@ class PgOperationBuffer {
6869
std::unique_ptr<Impl> impl_;
6970
};
7071

71-
} // namespace pggate
72-
} // namespace yb
72+
} // namespace yb::pggate

0 commit comments

Comments
 (0)