Skip to content

Commit e1f77aa

Browse files
committed
Merge branch '2025/10/optimistic'
2 parents 0fee078 + 46165da commit e1f77aa

File tree

3 files changed

+54
-29
lines changed

3 files changed

+54
-29
lines changed

src/sv2/connman.cpp

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -113,37 +113,27 @@ bool Sv2Connman::EventNewConnectionAccepted(NodeId node_id,
113113
return true;
114114
}
115115

116-
void Sv2Connman::EventReadyToSend(NodeId node_id, bool& cancel_recv)
116+
std::pair<size_t, bool> Sv2Connman::SendMessagesAsBytes(Sv2Client& client)
117117
{
118-
AssertLockNotHeld(m_clients_mutex);
119-
120-
auto client{WITH_LOCK(m_clients_mutex, return GetClientById(node_id);)};
121-
if (client == nullptr) {
122-
cancel_recv = true;
123-
return;
124-
}
125-
126-
LOCK(client->cs_send);
127-
auto it = client->m_send_messages.begin();
118+
auto it = client.m_send_messages.begin();
128119
std::optional<bool> expected_more;
129-
130120
size_t total_sent = 0;
131121

132-
while(true) {
133-
if (it != client->m_send_messages.end()) {
122+
while (true) {
123+
if (it != client.m_send_messages.end()) {
134124
// If possible, move one message from the send queue to the transport.
135125
// This fails when there is an existing message still being sent,
136126
// or when the handshake has not yet completed.
137127
//
138128
// Wrap Sv2NetMsg inside CSerializedNetMsg for transport
139129
CSerializedNetMsg net_msg{*it};
140-
if (client->m_transport->SetMessageToSend(net_msg)) {
130+
if (client.m_transport->SetMessageToSend(net_msg)) {
141131
++it;
142132
}
143133
}
144134

145-
const auto& [data, more, _m_message_type] = client->m_transport->GetBytesToSend(/*have_next_message=*/it != client->m_send_messages.end());
146-
135+
const auto& [data, more, _m_message_type] =
136+
client.m_transport->GetBytesToSend(/*have_next_message=*/it != client.m_send_messages.end());
147137

148138
// We rely on the 'more' value returned by GetBytesToSend to correctly predict whether more
149139
// bytes are still to be sent, to correctly set the MSG_MORE flag. As a sanity check,
@@ -155,29 +145,53 @@ void Sv2Connman::EventReadyToSend(NodeId node_id, bool& cancel_recv)
155145
std::string errmsg;
156146

157147
if (!data.empty()) {
158-
LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Send %d bytes to client id=%zu\n",
159-
data.size() - total_sent, node_id);
148+
LogPrintLevel(BCLog::SV2, BCLog::Level::Trace,
149+
"Send %d bytes to client id=%zu\n", data.size(), client.m_id);
160150

161-
sent = SendBytes(node_id, data, more, errmsg);
151+
sent = SendBytes(client.m_id, data, more, errmsg);
162152
}
163153

164154
if (sent > 0) {
165-
client->m_transport->MarkBytesSent(sent);
155+
client.m_transport->MarkBytesSent(sent);
156+
total_sent += sent;
166157
if (static_cast<size_t>(sent) != data.size()) {
167158
// could not send full message; stop sending more
168159
break;
169160
}
170161
} else {
171162
if (sent < 0) {
172-
LogDebug(BCLog::NET, "socket send error for peer=%d: %s\n", node_id, errmsg);
173-
CloseConnection(node_id);
163+
LogDebug(BCLog::NET, "socket send error for peer=%d: %s\n",
164+
client.m_id, errmsg);
165+
CloseConnection(client.m_id);
174166
}
175167
break;
176168
}
177169
}
178170

179171
// Clear messages that have been handed to transport from the queue
180-
client->m_send_messages.erase(client->m_send_messages.begin(), it);
172+
client.m_send_messages.erase(client.m_send_messages.begin(), it);
173+
174+
return {total_sent, expected_more.value_or(false)};
175+
}
176+
177+
void Sv2Connman::TryOptimisticSend(Sv2Client& client)
178+
{
179+
AssertLockHeld(client.cs_send);
180+
SendMessagesAsBytes(client);
181+
}
182+
183+
void Sv2Connman::EventReadyToSend(NodeId node_id, bool& cancel_recv)
184+
{
185+
AssertLockNotHeld(m_clients_mutex);
186+
187+
auto client{WITH_LOCK(m_clients_mutex, return GetClientById(node_id);)};
188+
if (client == nullptr) {
189+
cancel_recv = true;
190+
return;
191+
}
192+
193+
LOCK(client->cs_send);
194+
const auto [total_sent, more_pending] = SendMessagesAsBytes(*client);
181195

182196
// If both receiving and (non-optimistic) sending were possible, we first attempt
183197
// sending. If that succeeds, but does not fully drain the send queue, do not
@@ -186,9 +200,7 @@ void Sv2Connman::EventReadyToSend(NodeId node_id, bool& cancel_recv)
186200
// sending actually succeeded to make sure progress is always made; otherwise a
187201
// deadlock would be possible when both sides have data to send, but neither is
188202
// receiving.
189-
//
190-
// TODO: decide if this is useful for Sv2
191-
cancel_recv = total_sent > 0; // && more;
203+
cancel_recv = total_sent > 0 && more_pending;
192204
}
193205

194206
void Sv2Connman::EventGotData(Id id, std::span<const uint8_t> data)

src/sv2/connman.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
#include <sv2/transport.h>
1111
#include <pubkey.h>
1212

13+
#include <utility>
14+
1315
namespace {
1416
/*
1517
* Supported Stratum v2 subprotocols
@@ -144,6 +146,9 @@ class Sv2Connman : SockMan
144146

145147
void DisconnectFlagged() EXCLUSIVE_LOCKS_REQUIRED(m_clients_mutex);
146148

149+
std::pair<size_t, bool> SendMessagesAsBytes(Sv2Client& client)
150+
EXCLUSIVE_LOCKS_REQUIRED(client.cs_send);
151+
147152
/**
148153
* Create a `Sv2Client` object and add it to the `m_sv2_clients` member.
149154
* @param[in] node_id Id of the newly accepted connection.
@@ -211,6 +216,13 @@ class Sv2Connman : SockMan
211216
*/
212217
void ProcessSv2Message(const node::Sv2NetMsg& sv2_header, Sv2Client& client);
213218

219+
/**
220+
* Attempt to flush the send queue immediately, without waiting for the
221+
* socket layer to signal readiness. Falls back to the regular send path if
222+
* the socket would block.
223+
*/
224+
void TryOptimisticSend(Sv2Client& client) EXCLUSIVE_LOCKS_REQUIRED(client.cs_send);
225+
214226
std::shared_ptr<Sv2Client> GetClientById(NodeId node_id) const EXCLUSIVE_LOCKS_REQUIRED(m_clients_mutex);
215227

216228
using Sv2ClientFn = std::function<void(Sv2Client&)>;

src/sv2/template_provider.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,7 @@ void Sv2TemplateProvider::RequestTransactionData(Sv2Client& client, node::Sv2Req
437437
client.m_id);
438438
LOCK(client.cs_send);
439439
client.m_send_messages.emplace_back(request_tx_data_success);
440+
m_connman->TryOptimisticSend(client);
440441
}
441442

442443
void Sv2TemplateProvider::SubmitSolution(node::Sv2SubmitSolutionMsg solution)
@@ -561,8 +562,6 @@ bool Sv2TemplateProvider::SendWork(Sv2Client& client, uint64_t template_id, Bloc
561562
template_id,
562563
future_template};
563564

564-
// TODO: use optimistic send instead of adding to the queue
565-
566565
LogPrintLevel(BCLog::SV2, BCLog::Level::Debug, "Send 0x71 NewTemplate id=%lu future=%d to client id=%zu\n", template_id, future_template, client.m_id);
567566
{
568567
LOCK(client.cs_send);
@@ -573,6 +572,8 @@ bool Sv2TemplateProvider::SendWork(Sv2Client& client, uint64_t template_id, Bloc
573572
LogPrintLevel(BCLog::SV2, BCLog::Level::Debug, "Send 0x72 SetNewPrevHash to client id=%zu\n", client.m_id);
574573
client.m_send_messages.emplace_back(new_prev_hash);
575574
}
575+
576+
m_connman->TryOptimisticSend(client);
576577
}
577578

578579
CAmount total_fees{0};

0 commit comments

Comments
 (0)