Skip to content

Commit 89b6cef

Browse files
author
Leonardo Parente
authored
Add support to packet size and throughput (#261)
* Add support to packet size and throughput * rename size varible to payload_size
1 parent 2164854 commit 89b6cef

File tree

12 files changed

+121
-28
lines changed

12 files changed

+121
-28
lines changed

src/Metrics.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,11 @@ class Rate final : public Metric
414414
return *this;
415415
}
416416

417+
void operator+=(uint64_t i)
418+
{
419+
_counter.fetch_add(i, std::memory_order_relaxed);
420+
}
421+
417422
uint64_t rate() const
418423
{
419424
return _rate.load(std::memory_order_relaxed);

src/handlers/dns/DnsStreamHandler.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ void DnsStreamHandler::stop()
137137
}
138138

139139
// callback from input module
140-
void DnsStreamHandler::process_dnstap_cb(const dnstap::Dnstap &d)
140+
void DnsStreamHandler::process_dnstap_cb(const dnstap::Dnstap &d, [[maybe_unused]] size_t size)
141141
{
142142
if (_f_enabled[Filters::DnstapMsgType] && !_f_dnstap_types[d.message().type()]) {
143143
_metrics->process_dnstap(d, true);

src/handlers/dns/DnsStreamHandler.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ class DnsStreamHandler final : public visor::StreamMetricsHandler<DnsMetricsMana
268268
sigslot::connection _tcp_message_connection;
269269

270270
void process_udp_packet_cb(pcpp::Packet &payload, PacketDirection dir, pcpp::ProtocolType l3, uint32_t flowkey, timespec stamp);
271-
void process_dnstap_cb(const dnstap::Dnstap &);
271+
void process_dnstap_cb(const dnstap::Dnstap &, size_t);
272272
void tcp_message_ready_cb(int8_t side, const pcpp::TcpStreamData &tcpData);
273273
void tcp_connection_start_cb(const pcpp::ConnectionData &connectionData);
274274
void tcp_connection_end_cb(const pcpp::ConnectionData &connectionData, pcpp::TcpReassembly::ConnectionEndReason reason);

src/handlers/input_resources/InputResourcesStreamHandler.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ void InputResourcesStreamHandler::process_sflow_cb([[maybe_unused]] const SFSamp
9898
}
9999
}
100100

101-
void InputResourcesStreamHandler::process_dnstap_cb([[maybe_unused]] const dnstap::Dnstap &)
101+
void InputResourcesStreamHandler::process_dnstap_cb([[maybe_unused]] const dnstap::Dnstap &, [[maybe_unused]] size_t)
102102
{
103103
if (difftime(time(NULL), _timer) >= MEASURE_INTERVAL) {
104104
_timer = time(NULL);

src/handlers/input_resources/InputResourcesStreamHandler.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ class InputResourcesStreamHandler final : public visor::StreamMetricsHandler<Inp
9595
sigslot::connection _policies_connection;
9696

9797
void process_sflow_cb(const SFSample &);
98-
void process_dnstap_cb(const dnstap::Dnstap &);
98+
void process_dnstap_cb(const dnstap::Dnstap &, size_t);
9999
void process_policies_cb(const Policy *policy, InputStream::Action action);
100100
void process_packet_cb(pcpp::Packet &payload, PacketDirection dir, pcpp::ProtocolType l3, pcpp::ProtocolType l4, timespec stamp);
101101

src/handlers/net/NetStreamHandler.cpp

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,9 @@ void NetStreamHandler::process_sflow_cb(const SFSample &payload)
121121
_metrics->process_sflow(payload);
122122
}
123123

124-
void NetStreamHandler::process_dnstap_cb(const dnstap::Dnstap &payload)
124+
void NetStreamHandler::process_dnstap_cb(const dnstap::Dnstap &payload, size_t size)
125125
{
126-
_metrics->process_dnstap(payload);
126+
_metrics->process_dnstap(payload, size);
127127
}
128128

129129
void NetStreamHandler::process_udp_packet_cb(pcpp::Packet &payload, PacketDirection dir, pcpp::ProtocolType l3, [[maybe_unused]] uint32_t flowkey, timespec stamp)
@@ -139,6 +139,8 @@ void NetworkMetricsBucket::specialized_merge(const AbstractMetricsBucket &o)
139139
// rates maintain their own thread safety
140140
_rate_in.merge(other._rate_in);
141141
_rate_out.merge(other._rate_out);
142+
_throughput_in.merge(other._throughput_in);
143+
_throughput_out.merge(other._throughput_out);
142144

143145
std::shared_lock r_lock(other._mutex);
144146
std::unique_lock w_lock(_mutex);
@@ -166,13 +168,17 @@ void NetworkMetricsBucket::specialized_merge(const AbstractMetricsBucket &o)
166168
_topGeoLoc.merge(other._topGeoLoc);
167169
_topASN.merge(other._topASN);
168170
}
171+
172+
_payload_size.merge(other._payload_size);
169173
}
170174

171175
void NetworkMetricsBucket::to_prometheus(std::stringstream &out, Metric::LabelMap add_labels) const
172176
{
173177

174178
_rate_in.to_prometheus(out, add_labels);
175179
_rate_out.to_prometheus(out, add_labels);
180+
_throughput_in.to_prometheus(out, add_labels);
181+
_throughput_out.to_prometheus(out, add_labels);
176182

177183
{
178184
auto [num_events, num_samples, event_rate, event_lock] = event_data_locked(); // thread safe
@@ -208,6 +214,8 @@ void NetworkMetricsBucket::to_prometheus(std::stringstream &out, Metric::LabelMa
208214
_topGeoLoc.to_prometheus(out, add_labels);
209215
_topASN.to_prometheus(out, add_labels);
210216
}
217+
218+
_payload_size.to_prometheus(out, add_labels);
211219
}
212220

213221
void NetworkMetricsBucket::to_json(json &j) const
@@ -217,6 +225,8 @@ void NetworkMetricsBucket::to_json(json &j) const
217225
bool live_rates = !read_only() && !recorded_stream();
218226
_rate_in.to_json(j, live_rates);
219227
_rate_out.to_json(j, live_rates);
228+
_throughput_in.to_json(j, live_rates);
229+
_throughput_out.to_json(j, live_rates);
220230

221231
{
222232
auto [num_events, num_samples, event_rate, event_lock] = event_data_locked(); // thread safe
@@ -252,13 +262,15 @@ void NetworkMetricsBucket::to_json(json &j) const
252262
_topGeoLoc.to_json(j);
253263
_topASN.to_json(j);
254264
}
265+
266+
_payload_size.to_json(j);
255267
}
256268

257269
// the main bucket analysis
258270
void NetworkMetricsBucket::process_packet(bool deep, pcpp::Packet &payload, PacketDirection dir, pcpp::ProtocolType l3, pcpp::ProtocolType l4)
259271
{
260272
if (!deep) {
261-
process_net_layer(dir, l3, l4);
273+
process_net_layer(dir, l3, l4, payload.getRawPacket()->getRawDataLen());
262274
return;
263275
}
264276

@@ -285,9 +297,9 @@ void NetworkMetricsBucket::process_packet(bool deep, pcpp::Packet &payload, Pack
285297
}
286298
}
287299

288-
process_net_layer(dir, l3, l4, is_ipv6, ipv4_in, ipv4_out, ipv6_in, ipv6_out);
300+
process_net_layer(dir, l3, l4, payload.getRawPacket()->getRawDataLen(), is_ipv6, ipv4_in, ipv4_out, ipv6_in, ipv6_out);
289301
}
290-
void NetworkMetricsBucket::process_dnstap(bool deep, const dnstap::Dnstap &payload)
302+
void NetworkMetricsBucket::process_dnstap(bool deep, const dnstap::Dnstap &payload, size_t size)
291303
{
292304
pcpp::ProtocolType l3;
293305
bool is_ipv6{false};
@@ -335,7 +347,7 @@ void NetworkMetricsBucket::process_dnstap(bool deep, const dnstap::Dnstap &paylo
335347
}
336348

337349
if (!deep) {
338-
process_net_layer(dir, l3, l4);
350+
process_net_layer(dir, l3, l4, size);
339351
return;
340352
}
341353

@@ -354,7 +366,7 @@ void NetworkMetricsBucket::process_dnstap(bool deep, const dnstap::Dnstap &paylo
354366
ipv6_out = pcpp::IPv6Address(reinterpret_cast<const uint8_t *>(payload.message().response_address().data()));
355367
}
356368

357-
process_net_layer(dir, l3, l4, is_ipv6, ipv4_in, ipv4_out, ipv6_in, ipv6_out);
369+
process_net_layer(dir, l3, l4, size, is_ipv6, ipv4_in, ipv4_out, ipv6_in, ipv6_out);
358370
}
359371

360372
void NetworkMetricsBucket::process_sflow(bool deep, const SFSample &payload)
@@ -385,7 +397,7 @@ void NetworkMetricsBucket::process_sflow(bool deep, const SFSample &payload)
385397
}
386398

387399
if (!deep) {
388-
process_net_layer(dir, l3, l4);
400+
process_net_layer(dir, l3, l4, payload.rawSampleLen);
389401
return;
390402
}
391403

@@ -410,20 +422,22 @@ void NetworkMetricsBucket::process_sflow(bool deep, const SFSample &payload)
410422
ipv6_out = pcpp::IPv6Address(sample.ipdst.address.ip_v6.addr);
411423
}
412424

413-
process_net_layer(dir, l3, l4, is_ipv6, ipv4_in, ipv4_out, ipv6_in, ipv6_out);
425+
process_net_layer(dir, l3, l4, payload.rawSampleLen, is_ipv6, ipv4_in, ipv4_out, ipv6_in, ipv6_out);
414426
}
415427
}
416428

417-
void NetworkMetricsBucket::process_net_layer(PacketDirection dir, pcpp::ProtocolType l3, pcpp::ProtocolType l4)
429+
void NetworkMetricsBucket::process_net_layer(PacketDirection dir, pcpp::ProtocolType l3, pcpp::ProtocolType l4, size_t payload_size)
418430
{
419431
std::unique_lock lock(_mutex);
420432

421433
switch (dir) {
422434
case PacketDirection::fromHost:
423435
++_rate_out;
436+
_throughput_out += payload_size;
424437
break;
425438
case PacketDirection::toHost:
426439
++_rate_in;
440+
_throughput_in += payload_size;
427441
break;
428442
case PacketDirection::unknown:
429443
break;
@@ -464,18 +478,22 @@ void NetworkMetricsBucket::process_net_layer(PacketDirection dir, pcpp::Protocol
464478
break;
465479
}
466480
}
481+
482+
_payload_size.update(payload_size);
467483
}
468484

469-
void NetworkMetricsBucket::process_net_layer(PacketDirection dir, pcpp::ProtocolType l3, pcpp::ProtocolType l4, bool is_ipv6, pcpp::IPv4Address &ipv4_in, pcpp::IPv4Address &ipv4_out, pcpp::IPv6Address &ipv6_in, pcpp::IPv6Address &ipv6_out)
485+
void NetworkMetricsBucket::process_net_layer(PacketDirection dir, pcpp::ProtocolType l3, pcpp::ProtocolType l4, size_t payload_size, bool is_ipv6, pcpp::IPv4Address &ipv4_in, pcpp::IPv4Address &ipv4_out, pcpp::IPv6Address &ipv6_in, pcpp::IPv6Address &ipv6_out)
470486
{
471487
std::unique_lock lock(_mutex);
472488

473489
switch (dir) {
474490
case PacketDirection::fromHost:
475491
++_rate_out;
492+
_throughput_out += payload_size;
476493
break;
477494
case PacketDirection::toHost:
478495
++_rate_in;
496+
_throughput_in += payload_size;
479497
break;
480498
case PacketDirection::unknown:
481499
break;
@@ -517,6 +535,8 @@ void NetworkMetricsBucket::process_net_layer(PacketDirection dir, pcpp::Protocol
517535
}
518536
}
519537

538+
_payload_size.update(payload_size);
539+
520540
struct sockaddr_in sa4;
521541
struct sockaddr_in6 sa6;
522542

@@ -586,7 +606,7 @@ void NetworkMetricsManager::process_packet(pcpp::Packet &payload, PacketDirectio
586606
live_bucket()->process_packet(_deep_sampling_now, payload, dir, l3, l4);
587607
}
588608

589-
void NetworkMetricsManager::process_dnstap(const dnstap::Dnstap &payload)
609+
void NetworkMetricsManager::process_dnstap(const dnstap::Dnstap &payload, size_t size)
590610
{
591611
// dnstap message type
592612
auto mtype = payload.message().type();
@@ -616,7 +636,7 @@ void NetworkMetricsManager::process_dnstap(const dnstap::Dnstap &payload)
616636
// base event
617637
new_event(stamp);
618638
// process in the "live" bucket. this will parse the resources if we are deep sampling
619-
live_bucket()->process_dnstap(_deep_sampling_now, payload);
639+
live_bucket()->process_dnstap(_deep_sampling_now, payload, size);
620640
}
621641

622642
void NetworkMetricsManager::process_sflow(const SFSample &payload)

src/handlers/net/NetStreamHandler.h

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,12 @@ class NetworkMetricsBucket final : public visor::AbstractMetricsBucket
6767
};
6868
counters _counters;
6969

70+
Quantile<std::size_t> _payload_size;
71+
7072
Rate _rate_in;
7173
Rate _rate_out;
74+
Rate _throughput_in;
75+
Rate _throughput_out;
7276

7377
public:
7478
NetworkMetricsBucket()
@@ -78,8 +82,11 @@ class NetworkMetricsBucket final : public visor::AbstractMetricsBucket
7882
, _topASN("packets", "asn", {"top_ASN"}, "Top ASNs by IP")
7983
, _topIPv4("packets", "ipv4", {"top_ipv4"}, "Top IPv4 IP addresses")
8084
, _topIPv6("packets", "ipv6", {"top_ipv6"}, "Top IPv6 IP addresses")
85+
, _payload_size("packets", {"payload_size"}, "Quantiles of payload sizes, in bytes")
8186
, _rate_in("packets", {"rates", "pps_in"}, "Rate of ingress in packets per second")
8287
, _rate_out("packets", {"rates", "pps_out"}, "Rate of egress in packets per second")
88+
, _throughput_in("payload", {"rates", "bps_in"}, "Rate of ingress packets size in bytes per second")
89+
, _throughput_out("payload", {"rates", "bps_out"}, "Rate of egress packets size in bytes per second")
8390
{
8491
set_event_rate_info("packets", {"rates", "pps_total"}, "Rate of all packets (combined ingress and egress) in packets per second");
8592
set_num_events_info("packets", {"total"}, "Total packets processed");
@@ -104,13 +111,15 @@ class NetworkMetricsBucket final : public visor::AbstractMetricsBucket
104111
// stop rate collection
105112
_rate_in.cancel();
106113
_rate_out.cancel();
114+
_throughput_in.cancel();
115+
_throughput_out.cancel();
107116
}
108117

109118
void process_packet(bool deep, pcpp::Packet &payload, PacketDirection dir, pcpp::ProtocolType l3, pcpp::ProtocolType l4);
110-
void process_dnstap(bool deep, const dnstap::Dnstap &payload);
119+
void process_dnstap(bool deep, const dnstap::Dnstap &payload, size_t size);
111120
void process_sflow(bool deep, const SFSample &payload);
112-
void process_net_layer(PacketDirection dir, pcpp::ProtocolType l3, pcpp::ProtocolType l4);
113-
void process_net_layer(PacketDirection dir, pcpp::ProtocolType l3, pcpp::ProtocolType l4, bool is_ipv6, pcpp::IPv4Address &ipv4_in, pcpp::IPv4Address &ipv4_out, pcpp::IPv6Address &ipv6_in, pcpp::IPv6Address &ipv6_out);
121+
void process_net_layer(PacketDirection dir, pcpp::ProtocolType l3, pcpp::ProtocolType l4, size_t payload_size);
122+
void process_net_layer(PacketDirection dir, pcpp::ProtocolType l3, pcpp::ProtocolType l4, size_t payload_size, bool is_ipv6, pcpp::IPv4Address &ipv4_in, pcpp::IPv4Address &ipv4_out, pcpp::IPv6Address &ipv6_in, pcpp::IPv6Address &ipv6_out);
114123
};
115124

116125
class NetworkMetricsManager final : public visor::AbstractMetricsManager<NetworkMetricsBucket>
@@ -122,7 +131,7 @@ class NetworkMetricsManager final : public visor::AbstractMetricsManager<Network
122131
}
123132

124133
void process_packet(pcpp::Packet &payload, PacketDirection dir, pcpp::ProtocolType l3, pcpp::ProtocolType l4, timespec stamp);
125-
void process_dnstap(const dnstap::Dnstap &payload);
134+
void process_dnstap(const dnstap::Dnstap &payload, size_t size);
126135
void process_sflow(const SFSample &payload);
127136
};
128137

@@ -155,7 +164,7 @@ class NetStreamHandler final : public visor::StreamMetricsHandler<NetworkMetrics
155164
{"top_ips", group::NetMetrics::TopIps}};
156165

157166
void process_sflow_cb(const SFSample &);
158-
void process_dnstap_cb(const dnstap::Dnstap &);
167+
void process_dnstap_cb(const dnstap::Dnstap &, size_t);
159168
void process_packet_cb(pcpp::Packet &payload, PacketDirection dir, pcpp::ProtocolType l3, pcpp::ProtocolType l4, timespec stamp);
160169
void process_udp_packet_cb(pcpp::Packet &payload, PacketDirection dir, pcpp::ProtocolType l3, uint32_t flowkey, timespec stamp);
161170
void set_start_tstamp(timespec stamp);

src/handlers/net/tests/test_net_layer.cpp

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ TEST_CASE("Parse net (dns) random UDP/TCP tests", "[pcap][net]")
163163
CHECK(j["cardinality"]["src_ips_in"] == 1);
164164
CHECK(j["top_ipv4"][0]["estimate"] == 16147);
165165
CHECK(j["top_ipv4"][0]["name"] == "8.8.8.8");
166+
CHECK(j["payload_size"]["p50"] == 74);
166167
}
167168

168169
TEST_CASE("Parse net (dns) with DNS filter only_qname_suffix", "[pcap][dns][net]")
@@ -245,6 +246,47 @@ TEST_CASE("Parse net (dns) sflow stream", "[sflow][net]")
245246
CHECK(j["cardinality"]["src_ips_in"] == 4);
246247
CHECK(j["top_ipv4"][0]["estimate"] == 27054);
247248
CHECK(j["top_ipv4"][0]["name"] == "10.4.2.2");
249+
CHECK(j["payload_size"]["p50"] == 1372);
250+
CHECK(j["payload_size"]["p99"] == 1392);
251+
}
252+
253+
TEST_CASE("Parse net dnstap stream", "[dnstap][net]")
254+
{
255+
256+
DnstapInputStream stream{"dnstap-test"};
257+
stream.config_set("dnstap_file", "inputs/dnstap/tests/fixtures/fixture.dnstap");
258+
stream.config_set<visor::Configurable::StringList>("only_hosts", {"192.168.0.0/24", "2001:db8::/48"});
259+
visor::Config c;
260+
c.config_set<uint64_t>("num_periods", 1);
261+
NetStreamHandler net_handler{"dns-test", &stream, &c};
262+
263+
net_handler.start();
264+
stream.start();
265+
stream.stop();
266+
net_handler.stop();
267+
268+
auto counters = net_handler.metrics()->bucket(0)->counters();
269+
auto event_data = net_handler.metrics()->bucket(0)->event_data_locked();
270+
271+
// confirmed with wireshark
272+
CHECK(event_data.num_events->value() == 153);
273+
CHECK(event_data.num_samples->value() == 153);
274+
CHECK(counters.TCP.value() == 0);
275+
CHECK(counters.UDP.value() == 153);
276+
CHECK(counters.IPv4.value() == 153);
277+
CHECK(counters.IPv6.value() == 0);
278+
CHECK(counters.total_in.value() == 74);
279+
CHECK(counters.total_out.value() == 79);
280+
281+
nlohmann::json j;
282+
net_handler.metrics()->bucket(0)->to_json(j);
283+
284+
CHECK(j["cardinality"]["dst_ips_out"] == 1);
285+
CHECK(j["cardinality"]["src_ips_in"] == 1);
286+
CHECK(j["top_ipv4"][0]["estimate"] == 153);
287+
CHECK(j["top_ipv4"][0]["name"] == "192.168.0.54");
288+
CHECK(j["payload_size"]["p50"] == 100);
289+
CHECK(j["payload_size"]["p99"] == 350);
248290
}
249291

250292
TEST_CASE("Net groups", "[pcap][net]")

src/handlers/net/tests/window-schema.json

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,12 @@
6363
"length": 31,
6464
"start_ts": 1614874231
6565
},
66+
"payload_size": {
67+
"p50": 74,
68+
"p90": 176,
69+
"p95": 187,
70+
"p99": 202
71+
},
6672
"tcp": 13176,
6773
"top_ASN": [],
6874
"top_geoLoc": [],
@@ -86,6 +92,7 @@
8692
"other_l4",
8793
"out",
8894
"period",
95+
"payload_size",
8996
"tcp",
9097
"top_ASN",
9198
"top_geoLoc",
@@ -235,6 +242,16 @@
235242
},
236243
"additionalProperties": false
237244
},
245+
"payload_size": {
246+
"$id": "#/properties/packets/properties/payload_size",
247+
"type": "object",
248+
"title": "The payload_size schema",
249+
"description": "An explanation about the purpose of this instance.",
250+
"default": {},
251+
"examples": [
252+
{}
253+
]
254+
},
238255
"tcp": {
239256
"$id": "#/properties/packets/properties/tcp",
240257
"type": "integer",

0 commit comments

Comments
 (0)