Skip to content

Commit aa29a79

Browse files
author
leoparente
authored
Implement tcp timeout with LRU list (#249)
* Implement tcp timeout with LRU list * put connection on LRUList on start callback
1 parent c1e78ff commit aa29a79

File tree

3 files changed

+169
-6
lines changed

3 files changed

+169
-6
lines changed

src/inputs/pcap/LRUList.h

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
#ifndef PV_PCAPPP_LRU_LIST
2+
#define PV_PCAPPP_LRU_LIST
3+
4+
#include <list>
5+
#include <map>
6+
7+
#if __cplusplus > 199711L || _MSC_VER >= 1800
8+
#include <utility>
9+
#endif
10+
11+
/// @file
12+
13+
/**
14+
* \namespace pcpp
15+
* \brief The main namespace for the PcapPlusPlus lib
16+
*/
17+
namespace pcpp {
18+
19+
/**
20+
* @class LRUList
21+
* A template class that implements a LRU cache with limited size. Each time the user puts an element it goes to head of the
22+
* list as the most recently used element (if the element was already in the list it advances to the head of the list).
23+
* The last element in the list is the one least recently used and will be pulled out of the list if it reaches its max size
24+
* and a new element comes in. All actions on this LRU list are O(1)
25+
*/
26+
template <typename T, typename V>
27+
class LRUList
28+
{
29+
public:
30+
typedef typename std::list<std::pair<T, V>>::iterator ListIterator;
31+
typedef typename std::map<T, ListIterator>::iterator MapIterator;
32+
33+
/**
34+
* A c'tor for this class
35+
* @param[in] maxSize The max size this list can go
36+
*/
37+
38+
LRUList(size_t maxSize)
39+
{
40+
m_MaxSize = maxSize;
41+
}
42+
43+
/**
44+
* A c'tor for this class with unlimited size
45+
*/
46+
LRUList()
47+
{
48+
m_MaxSize = 0;
49+
}
50+
51+
/**
52+
* Puts an element in the list. This element will be inserted (or advanced if it already exists) to the head of the
53+
* list as the most recently used element. If the list already reached its max size and the element is new this method
54+
* will remove the least recently used element and return a value in deletedValue. Method complexity is O(log(getSize())).
55+
* This is a optimized version of the method T* put(const T&).
56+
* @param[in] element The element to insert or to advance to the head of the list (if already exists)
57+
* @param[out] deletedValue The value of deleted element if a pointer is not NULL. This parameter is optional.
58+
* @return 0 if the list didn't reach its max size, 1 otherwise. In case the list already reached its max size
59+
* and deletedValue is not NULL the value of deleted element is copied into the place the deletedValue points to.
60+
*/
61+
int put(const T &element, const V &value, std::pair<T, V> *deletedValue = NULL)
62+
{
63+
m_CacheItemsList.push_front(std::make_pair(element, value));
64+
65+
// Inserting a new element. If an element with an equivalent key already exists the method returns an iterator to the element that prevented the insertion
66+
std::pair<MapIterator, bool> pair = m_CacheItemsMap.insert(std::make_pair(element, m_CacheItemsList.begin()));
67+
if (pair.second == false) // already exists
68+
{
69+
m_CacheItemsList.erase(pair.first->second);
70+
pair.first->second = m_CacheItemsList.begin();
71+
}
72+
73+
if (m_MaxSize && m_CacheItemsMap.size() > m_MaxSize) {
74+
ListIterator lruIter = m_CacheItemsList.end();
75+
lruIter--;
76+
77+
if (deletedValue != NULL)
78+
#if __cplusplus > 199711L || _MSC_VER >= 1800
79+
*deletedValue = std::move(*lruIter);
80+
#else
81+
*deletedValue = *lruIter;
82+
#endif
83+
m_CacheItemsMap.erase(lruIter->first);
84+
m_CacheItemsList.erase(lruIter);
85+
return 1;
86+
}
87+
88+
return 0;
89+
}
90+
91+
/**
92+
* Get the most recently used element (the one at the beginning of the list)
93+
* @return The most recently used element
94+
*/
95+
inline const std::pair<T, V> &getMRUElement() const
96+
{
97+
return m_CacheItemsList.front();
98+
}
99+
100+
/**
101+
* Get the least recently used element (the one at the end of the list)
102+
* @return The least recently used element
103+
*/
104+
inline const std::pair<T, V> &getLRUElement() const
105+
{
106+
return m_CacheItemsList.back();
107+
}
108+
109+
/**
110+
* Erase an element from the list. If element isn't found in the list nothing happens
111+
* @param[in] element The element to erase
112+
*/
113+
inline void eraseElement(const T &element)
114+
{
115+
MapIterator iter = m_CacheItemsMap.find(element);
116+
if (iter == m_CacheItemsMap.end())
117+
return;
118+
119+
m_CacheItemsList.erase(iter->second);
120+
m_CacheItemsMap.erase(iter);
121+
}
122+
123+
/**
124+
* @return The max size of this list as determined in the c'tor
125+
*/
126+
inline size_t getMaxSize() const
127+
{
128+
return m_MaxSize;
129+
}
130+
131+
/**
132+
* @return The number of elements currently in this list
133+
*/
134+
inline size_t getSize() const
135+
{
136+
return m_CacheItemsMap.size();
137+
}
138+
139+
private:
140+
std::list<std::pair<T, V>> m_CacheItemsList;
141+
std::map<T, ListIterator> m_CacheItemsMap;
142+
size_t m_MaxSize;
143+
};
144+
145+
} // namespace pcpp
146+
147+
#endif /* PV_PCAPPP_LRU_LIST */

src/inputs/pcap/PcapInputStream.cpp

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ PcapInputStream::PcapInputStream(const std::string &name)
7171
this,
7272
_tcp_connection_start_cb,
7373
_tcp_connection_end_cb,
74-
{true, 5, 500, 50})
74+
{true, 1, 1000, 50})
7575
{
7676
pcpp::Logger::getInstance().suppressLogs();
7777
}
@@ -230,16 +230,19 @@ void PcapInputStream::stop()
230230
void PcapInputStream::tcp_message_ready(int8_t side, const pcpp::TcpStreamData &tcpData)
231231
{
232232
tcp_message_ready_signal(side, tcpData);
233+
_lru_list.put(tcpData.getConnectionData().flowKey, tcpData.getConnectionData().endTime);
233234
}
234235

235236
void PcapInputStream::tcp_connection_start(const pcpp::ConnectionData &connectionData)
236237
{
237238
tcp_connection_start_signal(connectionData);
239+
_lru_list.put(connectionData.flowKey, connectionData.startTime);
238240
}
239241

240242
void PcapInputStream::tcp_connection_end(const pcpp::ConnectionData &connectionData, pcpp::TcpReassembly::ConnectionEndReason reason)
241243
{
242244
tcp_connection_end_signal(connectionData, reason);
245+
_lru_list.eraseElement(connectionData.flowKey);
243246
}
244247

245248
void PcapInputStream::process_pcap_stats(const pcpp::IPcapDevice::PcapStats &stats)
@@ -319,7 +322,6 @@ void PcapInputStream::_generate_mock_traffic()
319322

320323
void PcapInputStream::process_raw_packet(pcpp::RawPacket *rawPacket)
321324
{
322-
323325
pcpp::ProtocolType l3(pcpp::UnknownProtocol), l4(pcpp::UnknownProtocol);
324326
pcpp::Packet packet(rawPacket, pcpp::TCP | pcpp::UDP);
325327
if (packet.isPacketOfType(pcpp::IPv4)) {
@@ -359,18 +361,19 @@ void PcapInputStream::process_raw_packet(pcpp::RawPacket *rawPacket)
359361
}
360362
}
361363

364+
auto timestamp = rawPacket->getPacketTimeStamp();
362365
// interface to handlers
363-
packet_signal(packet, dir, l3, l4, rawPacket->getPacketTimeStamp());
366+
packet_signal(packet, dir, l3, l4, timestamp);
364367

365368
if (l4 == pcpp::UDP) {
366-
udp_signal(packet, dir, l3, pcpp::hash5Tuple(&packet), rawPacket->getPacketTimeStamp());
369+
udp_signal(packet, dir, l3, pcpp::hash5Tuple(&packet), timestamp);
367370
} else if (l4 == pcpp::TCP) {
368371
auto result = _tcp_reassembly.reassemblePacket(packet);
369372
switch (result) {
370373
case pcpp::TcpReassembly::Error_PacketDoesNotMatchFlow:
371374
case pcpp::TcpReassembly::NonTcpPacket:
372375
case pcpp::TcpReassembly::NonIpPacket:
373-
tcp_reassembly_error_signal(packet, dir, l3, rawPacket->getPacketTimeStamp());
376+
tcp_reassembly_error_signal(packet, dir, l3, timestamp);
374377
case pcpp::TcpReassembly::TcpMessageHandled:
375378
case pcpp::TcpReassembly::OutOfOrderTcpMessageBuffered:
376379
case pcpp::TcpReassembly::FIN_RSTWithNoData:
@@ -379,6 +382,15 @@ void PcapInputStream::process_raw_packet(pcpp::RawPacket *rawPacket)
379382
case pcpp::TcpReassembly::Ignore_Retransimission:
380383
break;
381384
}
385+
386+
for (uint8_t counter = 0; counter < MAX_TCP_CLEANUPS; counter++) {
387+
auto connection = _lru_list.getLRUElement();
388+
if (timestamp.tv_sec < connection.second.tv_sec + TCP_TIMEOUT) {
389+
break;
390+
}
391+
_tcp_reassembly.closeConnection(connection.first);
392+
_lru_list.eraseElement(connection.first);
393+
}
382394
} else {
383395
// unsupported layer3 protocol
384396
}

src/inputs/pcap/PcapInputStream.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <TcpReassembly.h>
1313
#include <UdpLayer.h>
1414
#pragma GCC diagnostic pop
15+
#include "LRUList.h"
1516
#include "utils.h"
1617
#include <functional>
1718
#include <memory>
@@ -40,8 +41,11 @@ class PcapInputStream : public visor::InputStream
4041
{
4142

4243
private:
43-
static const PcapSource DefaultPcapSource = PcapSource::libpcap;
44+
static constexpr uint8_t TCP_TIMEOUT = 30;
45+
static constexpr uint8_t MAX_TCP_CLEANUPS = 100;
4446

47+
static const PcapSource DefaultPcapSource = PcapSource::libpcap;
48+
pcpp::LRUList<uint32_t, timeval> _lru_list;
4549
IPv4subnetList _hostIPv4;
4650
IPv6subnetList _hostIPv6;
4751

0 commit comments

Comments
 (0)