diff --git a/include/ipfixprobe/flowifc.hpp b/include/ipfixprobe/flowifc.hpp index 2f3b5acb..7799e5de 100644 --- a/include/ipfixprobe/flowifc.hpp +++ b/include/ipfixprobe/flowifc.hpp @@ -267,9 +267,9 @@ struct Flow : public Record { #ifdef WITH_CTT uint64_t flow_hash_ctt; /**< Flow hash for CTT. */ - bool record_in_ctt; /**< CTT - offload or not. */ - bool is_delayed; /**< Delayed export flag. */ - time_t delay_time; /**< Time until export of the flow is delayed. */ + //bool record_in_ctt; /**< CTT - offload or not. */ + //bool is_delayed; /**< Delayed export flag. */ + //timeval delay_time; /**< Time until export of the flow is delayed. */ #endif PluginsStatus plugins_status; /**< Statuses of the process plugins for this flow, used to check diff --git a/include/ipfixprobe/packet.hpp b/include/ipfixprobe/packet.hpp index 90bfe4fc..82156da8 100644 --- a/include/ipfixprobe/packet.hpp +++ b/include/ipfixprobe/packet.hpp @@ -47,7 +47,7 @@ namespace ipxp { * \brief Structure for storing parsed packet fields */ struct Packet : public Record { - struct timeval ts; + timeval ts; uint8_t dst_mac[6]; uint8_t src_mac[6]; diff --git a/include/ipfixprobe/storage.hpp b/include/ipfixprobe/storage.hpp index f16bb59f..b51ce7c1 100644 --- a/include/ipfixprobe/storage.hpp +++ b/include/ipfixprobe/storage.hpp @@ -93,6 +93,7 @@ class StoragePlugin : public Plugin virtual void export_expired(time_t ts) { } + virtual void finish() { } @@ -189,12 +190,6 @@ class StoragePlugin : public Plugin */ int plugins_post_create(Flow& rec, const Packet& pkt) { - // if metadata are valid, add flow hash ctt to the flow record -#ifdef WITH_CTT - if (pkt.cttmeta_valid) { - rec.flow_hash_ctt = pkt.cttmeta.flow_hash; - } -#endif /* WITH_CTT */ PluginStatusConverter plugin_status_converter(m_plugins_status); int ret = 0; for (unsigned int i = 0; i < m_plugin_cnt; i++) { diff --git a/storage/cache.cpp b/storage/cache.cpp index 99fdab69..ff95e3de 100644 --- a/storage/cache.cpp +++ b/storage/cache.cpp @@ -29,19 +29,19 @@ * * */ +#include "cache.hpp" +#include #include #include #include #include #include - -#include -#include "cache.hpp" - #include + #include "cacheRowSpan.hpp" #include "xxhash.h" +#include "fragmentationCache/timevalUtils.hpp" namespace ipxp { @@ -72,6 +72,7 @@ NHTFlowCache::NHTFlowCache() : NHTFlowCache::~NHTFlowCache() { close(); + print_report(); } void NHTFlowCache::get_parser_options(CacheOptParser& parser) noexcept @@ -162,16 +163,19 @@ void NHTFlowCache::export_flow(size_t flow_index, int reason) + m_flow_table[index]->m_delayed_flow.dst_packets); ipx_ring_push(m_export_queue, &m_flow_table[index]->m_delayed_flow); } - m_total_exported++; update_flow_end_reason_stats(m_flow_table[index]->m_flow.end_reason); update_flow_record_stats( m_flow_table[index]->m_flow.src_packets + m_flow_table[index]->m_flow.dst_packets); - m_flows_in_cache--;*/ + */ m_flow_table[flow_index]->m_flow.end_reason = reason; - m_cache_stats.expired++; + update_flow_record_stats(m_flow_table[flow_index]->m_flow.src_packets + m_flow_table[flow_index]->m_flow.dst_packets); + update_flow_end_reason_stats(m_flow_table[flow_index]->m_flow.end_reason); + m_cache_stats.exported++; push_to_export_queue(flow_index); m_flow_table[flow_index]->erase(); + m_cache_stats.flows_in_cache--; + m_cache_stats.total_exported++; } void NHTFlowCache::push_to_export_queue(size_t flow_index) noexcept @@ -193,13 +197,16 @@ void NHTFlowCache::finish() } } -void NHTFlowCache::flush(Packet &pkt, size_t flow_index, int status, bool source_flow) +void NHTFlowCache::flush(Packet &pkt, size_t flow_index, int return_flags) { m_cache_stats.flushed++; - if (status == ProcessPlugin::FlowAction::FLUSH_WITH_REINSERT) { + if (return_flags == ProcessPlugin::FlowAction::FLUSH_WITH_REINSERT) { //FlowRecord *flow = m_flow_table[flow_index]; //export_flow(flow_index, FLOW_END_FORCED); +#ifdef WITH_CTT + send_export_request_to_ctt(m_flow_table[flow_index]->m_flow.flow_hash_ctt); +#endif /* WITH_CTT */ push_to_export_queue(flow_index); //flow->m_flow.end_reason = FLOW_END_FORCED; //ipx_ring_push(m_export_queue, &flow->m_flow); @@ -212,16 +219,17 @@ void NHTFlowCache::flush(Packet &pkt, size_t flow_index, int status, bool source m_flow_table[flow_index]->m_flow.m_exts = nullptr; m_flow_table[flow_index]->reuse(); // Clean counters, set time first to last - m_flow_table[flow_index]->update(pkt, source_flow); // Set new counters from packet + m_flow_table[flow_index]->update(pkt); // Set new counters from packet const size_t post_create_return_flags = plugins_post_create(m_flow_table[flow_index]->m_flow, pkt); if (post_create_return_flags & ProcessPlugin::FlowAction::FLUSH) { - flush(pkt, flow_index, post_create_return_flags, source_flow); + flush(pkt, flow_index, post_create_return_flags); } - } else { - //m_flow_table[flow_index]->m_flow.end_reason = FLOW_END_FORCED; - export_flow(flow_index, FLOW_END_FORCED); + return; } + //m_flow_table[flow_index]->m_flow.end_reason = FLOW_END_FORCED; + try_to_export(flow_index, false, pkt.ts, FLOW_END_FORCED); + //export_flow(flow_index, FLOW_END_FORCED); } std::tuple, std::optional, bool> NHTFlowCache::find_flow_index(const Packet& packet) noexcept @@ -240,48 +248,170 @@ std::tuple, std::optional, bool> NHTFlowCache::fin const CacheRowSpan raw_span_direct(&m_flow_table[first_flow_in_raw], m_line_size); std::optional flow_index = raw_span_direct.find_by_hash(direct_hash_value); if (flow_index.has_value()) { - return {direct_hash_value, flow_index, true}; + return {direct_hash_value, flow_index.value(), true}; } const size_t reversed_hash_value = std::visit(key_hasher, m_key_reversed); - const size_t first_flow_in_raw_reversed = direct_hash_value & m_line_mask; + const size_t first_flow_in_raw_reversed = reversed_hash_value & m_line_mask; const CacheRowSpan raw_span_reverse(&m_flow_table[first_flow_in_raw_reversed], m_line_size); - flow_index = raw_span_reverse.find_by_hash(direct_hash_value); + flow_index = raw_span_reverse.find_by_hash(reversed_hash_value); if (flow_index.has_value()) { - return {reversed_hash_value, flow_index, false}; + return {reversed_hash_value, flow_index.value(), false}; } return {direct_hash_value, std::nullopt, false}; } -static bool isTcpConnectionRestart(const Packet& packet, const Flow& flow, bool source_to_destination) noexcept +static bool is_tcp_connection_restart(const Packet& packet, const Flow& flow) noexcept { constexpr uint8_t TCP_FIN = 0x01; constexpr uint8_t TCP_RST = 0x04; constexpr uint8_t TCP_SYN = 0x02; - const uint8_t flags = source_to_destination ? flow.src_tcp_flags : flow.dst_tcp_flags; + const uint8_t flags = packet.source_pkt ? flow.src_tcp_flags : flow.dst_tcp_flags; return packet.tcp_flags & TCP_SYN && (flags & (TCP_FIN | TCP_RST)); } -bool NHTFlowCache::export_on_inactive_timeout(size_t flow_index, time_t ts) noexcept +bool NHTFlowCache::try_to_export_on_inactive_timeout(size_t flow_index, const timeval& now) noexcept { - if (ts - m_flow_table[flow_index]->m_flow.time_last.tv_sec >= m_inactive) { - plugins_pre_export(m_flow_table[flow_index]->m_flow); - export_flow(flow_index); - return true; + if (!m_flow_table[flow_index]->is_empty() && now.tv_sec - m_flow_table[flow_index]->m_flow.time_last.tv_sec >= m_inactive) { + //plugins_pre_export(m_flow_table[flow_index]->m_flow); + //export_flow(flow_index); + return try_to_export(flow_index, false, now); } return false; } -bool NHTFlowCache::export_on_active_timeout(size_t flow_index, time_t ts) noexcept +void NHTFlowCache::create_record(const Packet& packet, size_t flow_index, size_t hash_value) noexcept +{ + m_cache_stats.flows_in_cache++; + m_flow_table[flow_index]->create(packet, hash_value); + const size_t post_create_return_flags = plugins_post_create(m_flow_table[flow_index]->m_flow, packet); + if (post_create_return_flags & ProcessPlugin::FlowAction::FLUSH) { + //export_flow(flow_index); + if (try_to_export(flow_index, false, packet.ts)) { + m_cache_stats.flushed++; + } + } +#ifdef WITH_CTT + // if metadata are valid, add flow hash ctt to the flow record + if (packet.cttmeta_valid) { + m_flow_table[flow_index]->m_flow.flow_hash_ctt = packet.cttmeta.flow_hash; + } + if (only_metadata_required(m_flow_table[flow_index]->m_flow)) { + m_ctt_controller.create_record(m_flow_table[flow_index]->m_flow.flow_hash_ctt, m_flow_table[flow_index]->m_flow.time_first); + m_flow_table[flow_index]->is_in_ctt = true; + } +#endif /* WITH_CTT */ +} + +#ifdef WITH_CTT +void NHTFlowCache::try_to_add_flow_to_ctt(size_t flow_index) noexcept +{ + if (m_flow_table[flow_index]->is_in_ctt) { + return; + } + if (only_metadata_required(m_flow_table[flow_index]->m_flow)) { + m_ctt_controller.create_record(m_flow_table[flow_index]->m_flow.flow_hash_ctt, m_flow_table[flow_index]->m_flow.time_first); + m_flow_table[flow_index]->is_in_ctt = true; + } +} +#endif /* WITH_CTT */ + +int NHTFlowCache::process_flow(Packet& packet, size_t flow_index, size_t hash_value, bool flow_is_waiting_for_export) noexcept +{ + if (is_tcp_connection_restart(packet, m_flow_table[flow_index]->m_flow) && !flow_is_waiting_for_export) { + //m_flow_table[flow_index.value()]->m_flow.end_reason = FLOW_END_EOF; + if (try_to_export(flow_index, false, packet.ts, FLOW_END_EOF)) { + put_pkt(packet); + return 0; + } + //export_flow(flow_index, FLOW_END_EOF); + } + + if (m_flow_table[flow_index]->is_empty()) { + create_record(packet, flow_index, hash_value); + export_expired(packet.ts); + return 0; + } + /* Check if flow record is expired (inactive timeout). */ + if (!flow_is_waiting_for_export + && try_to_export_on_inactive_timeout(flow_index, packet.ts)) { + return put_pkt(packet); + } + + if (!flow_is_waiting_for_export + && try_to_export_on_active_timeout(flow_index, packet.ts)) { + return put_pkt(packet); + } + + const size_t pre_update_return_flags = plugins_pre_update(m_flow_table[flow_index]->m_flow, packet); + if ((pre_update_return_flags & ProcessPlugin::FlowAction::FLUSH) + && !flow_is_waiting_for_export) { + flush(packet, flow_index, pre_update_return_flags); + return 0; + } + + m_flow_table[flow_index]->update(packet); +#ifdef WITH_CTT + try_to_add_flow_to_ctt(flow_index); +#endif /* WITH_CTT */ + const size_t post_update_return_flags = plugins_post_update(m_flow_table[flow_index]->m_flow, packet); + if ((post_update_return_flags & ProcessPlugin::FlowAction::FLUSH) + && !flow_is_waiting_for_export) { + flush(packet, flow_index, post_update_return_flags); + return 0; + } + + export_expired(packet.ts); + return 0; +} +#ifdef WITH_CTT +bool NHTFlowCache::try_to_export_delayed_flow(const Packet& packet, const std::optional& flow_index, + size_t row_begin) noexcept +{ + const bool flow_is_waiting_for_export = flow_index.has_value() + && m_flow_table[row_begin + flow_index.value()]->is_waiting_for_export; + if (flow_index.has_value() && flow_is_waiting_for_export + && (!packet.cttmeta.ctt_rec_matched || packet.ts > m_flow_table[row_begin + flow_index.value()]->export_time)) { + plugins_pre_export(m_flow_table[row_begin + flow_index.value()]->m_flow); + export_flow(row_begin + flow_index.value()); + return false; + } + return flow_is_waiting_for_export; +} +#endif /* WITH_CTT */ + +bool NHTFlowCache::try_to_export(size_t flow_index, bool call_pre_export, const timeval& now) noexcept +{ + return try_to_export(flow_index, call_pre_export, now, get_export_reason(m_flow_table[flow_index]->m_flow)); +} + +#ifdef WITH_CTT +void NHTFlowCache::send_export_request_to_ctt(size_t ctt_flow_hash) noexcept { - if (ts - m_flow_table[flow_index]->m_flow.time_first.tv_sec >= m_active) { - m_flow_table[flow_index]->m_flow.end_reason = FLOW_END_ACTIVE; + m_ctt_controller.export_record(ctt_flow_hash); +} +#endif /* WITH_CTT */ + +bool NHTFlowCache::try_to_export(size_t flow_index, bool call_pre_export, const timeval& now, int reason) noexcept +{ +#ifdef WITH_CTT + if (!m_flow_table[flow_index]->is_waiting_for_export) { + m_flow_table[flow_index]->is_waiting_for_export = true; + send_export_request_to_ctt(m_flow_table[flow_index]->m_flow.flow_hash_ctt); + m_flow_table[flow_index]->export_time = {now.tv_sec + 1, now.tv_usec}; + return false; + } + if (m_flow_table[flow_index]->export_time > now) { + return false; + } + m_flow_table[flow_index]->is_waiting_for_export = false; +#endif /* WITH_CTT */ + if (call_pre_export) { plugins_pre_export(m_flow_table[flow_index]->m_flow); - export_flow(flow_index); - return true; } - return false; + export_flow(flow_index, reason); + return true; } int NHTFlowCache::put_pkt(Packet &pkt) @@ -293,6 +423,7 @@ int NHTFlowCache::put_pkt(Packet &pkt) } auto [hash_value, flow_index, source_to_destination] = find_flow_index(pkt); + pkt.source_pkt = source_to_destination; const bool hash_created = hash_value.has_value(); const bool flow_found = flow_index.has_value(); if (!hash_created) { @@ -301,75 +432,61 @@ int NHTFlowCache::put_pkt(Packet &pkt) const size_t row_begin = hash_value.value() & m_line_mask; CacheRowSpan row_span(&m_flow_table[row_begin], m_line_size); +#ifdef WITH_CTT + const bool flow_is_waiting_for_export = try_to_export_delayed_flow(pkt, flow_index, row_begin); +#else + constexpr bool flow_is_waiting_for_export = false; +#endif /* WITH_CTT */ + prefetch_export_expired(); if (flow_found) { /* Existing flow record was found, put flow record at the first index of flow line. */ - m_cache_stats.lookups += (flow_index.value() - row_begin + 1); - m_cache_stats.lookups2 += (flow_index.value() - row_begin + 1) * (flow_index.value() - row_begin + 1); + m_cache_stats.lookups += flow_index.value() + 1; + m_cache_stats.lookups2 += (flow_index.value() + 1) * (flow_index.value() + 1); m_cache_stats.hits++; row_span.advance_flow(flow_index.value()); + flow_index = row_begin; + return process_flow(pkt, flow_index.value(), hash_value.value(), flow_is_waiting_for_export); + } + /* Existing flow record was not found. Find free place in flow line. */ + const std::optional empty_index = row_span.find_empty(); + const bool empty_found = empty_index.has_value(); + if (empty_found) { + flow_index = empty_index.value() + row_begin; + m_cache_stats.empty++; } else { - /* Existing flow record was not found. Find free place in flow line. */ - const std::optional empty_index = row_span.find_empty(); - const bool empty_found = empty_index.has_value(); - if (empty_found) { - flow_index = empty_index.value() + row_begin; - m_cache_stats.empty++; - } else { +#ifdef WITH_CTT + flow_index = row_span.find_if_export_timeout_expired(pkt.ts); + if (!flow_index.has_value()) { +#endif /* WITH_CTT */ row_span.advance_flow_to(m_line_size - 1, m_new_flow_insert_index); flow_index = row_begin + m_new_flow_insert_index; - plugins_pre_export(m_flow_table[flow_index.value()]->m_flow); - m_flow_table[flow_index.value()]->m_flow.end_reason = FLOW_END_NO_RES; - export_flow(flow_index.value()); - m_cache_stats.expired++; - m_cache_stats.not_empty++; - } - } - - pkt.source_pkt = source_to_destination; - if (isTcpConnectionRestart(pkt, m_flow_table[flow_index.value()]->m_flow, source_to_destination)) { - //m_flow_table[flow_index.value()]->m_flow.end_reason = FLOW_END_EOF; - export_flow(flow_index.value(), FLOW_END_EOF); - put_pkt(pkt); - return 0; - } - - if (m_flow_table[flow_index.value()]->is_empty()) { - m_cache_stats.flows_in_cache++; - m_flow_table[flow_index.value()]->create(pkt, hash_value.value()); - if (plugins_post_create(m_flow_table[flow_index.value()]->m_flow, pkt) & ProcessPlugin::FlowAction::FLUSH) { - export_flow(flow_index.value()); - m_cache_stats.flushed++; +#ifdef WITH_CTT } - export_expired(pkt.ts.tv_sec); - return 0; - } - /* Check if flow record is expired (inactive timeout). */ - if (export_on_inactive_timeout(flow_index.value(), pkt.ts.tv_sec)) { - return put_pkt(pkt); - } - - if (export_on_active_timeout(flow_index.value(), pkt.ts.tv_sec)) { - return put_pkt(pkt); +#endif /* WITH_CTT */ + plugins_pre_export(m_flow_table[flow_index.value()]->m_flow); + export_flow(flow_index.value(), FLOW_END_NO_RES); + //plugins_pre_export(m_flow_table[flow_index.value()]->m_flow); + //m_flow_table[flow_index.value()]->m_flow.end_reason = FLOW_END_NO_RES; + //export_flow(flow_index.value()); + //m_cache_stats.exported++; + m_cache_stats.not_empty++; } + return process_flow(pkt, flow_index.value(), hash_value.value(), false); +} - const size_t pre_update_return_flags = plugins_pre_update(m_flow_table[flow_index.value()]->m_flow, pkt); - if (pre_update_return_flags & ProcessPlugin::FlowAction::FLUSH) { - flush(pkt, flow_index.value(), pre_update_return_flags, source_to_destination); - return 0; - } - m_flow_table[flow_index.value()]->update(pkt, source_to_destination); - const size_t post_update_return_flags = plugins_post_update(m_flow_table[flow_index.value()]->m_flow, pkt); +bool NHTFlowCache::try_to_export_on_active_timeout(size_t flow_index, const timeval& now) noexcept +{ + if (!m_flow_table[flow_index]->is_empty() && now.tv_sec - m_flow_table[flow_index]->m_flow.time_first.tv_sec >= m_active) { + //m_flow_table[flow_index]->m_flow.end_reason = FLOW_END_ACTIVE; - if (post_update_return_flags & ProcessPlugin::FlowAction::FLUSH) { - flush(pkt, flow_index.value(), post_update_return_flags, source_to_destination); - return 0; + //plugins_pre_export(m_flow_table[flow_index]->m_flow); + //export_flow(flow_index, FLOW_END_ACTIVE); + return try_to_export(flow_index, true, now, FLOW_END_ACTIVE); } - - export_expired(pkt.ts.tv_sec); - return 0; + return false; } void NHTFlowCache::try_to_fill_ports_to_fragmented_packet(Packet& packet) @@ -384,19 +501,24 @@ uint8_t NHTFlowCache::get_export_reason(const Flow& flow) if ((flow.src_tcp_flags | flow.dst_tcp_flags) & (TCP_FIN | TCP_RST)) { // When FIN or RST is set, TCP connection ended naturally return FLOW_END_EOF; - } else { - return FLOW_END_INACTIVE; } + return FLOW_END_INACTIVE; } -void NHTFlowCache::export_expired(time_t ts) +void NHTFlowCache::export_expired(time_t now) { - for (decltype(m_last_exported_on_timeout_index) i = m_last_exported_on_timeout_index; i < m_last_exported_on_timeout_index + m_new_flow_insert_index; i++) { - if (!m_flow_table[i]->is_empty() && ts - m_flow_table[i]->m_flow.time_last.tv_sec >= m_inactive) { + export_expired({now, 0}); +} + +void NHTFlowCache::export_expired(const timeval& now) +{ + for (size_t i = m_last_exported_on_timeout_index; i < m_last_exported_on_timeout_index + m_new_flow_insert_index; i++) { + try_to_export_on_inactive_timeout(i, now); + /*if (!m_flow_table[i]->is_empty() && ts - m_flow_table[i]->m_flow.time_last.tv_sec >= m_inactive) { m_flow_table[i]->m_flow.end_reason = get_export_reason(m_flow_table[i]->m_flow); plugins_pre_export(m_flow_table[i]->m_flow); export_flow(i); - /*if (!m_flow_table[i]->is_empty() && m_flow_table[i]->m_flow.is_delayed && m_flow_table[i]->m_flow.delay_time >= ts) { + if (!m_flow_table[i]->is_empty() && m_flow_table[i]->m_flow.is_delayed && m_flow_table[i]->m_flow.delay_time >= ts) { m_flow_table[i]->m_flow.is_delayed = false; plugins_pre_export(m_flow_table[i]->m_flow); export_flow(i); @@ -405,9 +527,9 @@ void NHTFlowCache::export_expired(time_t ts) m_flow_table[i]->m_delayed_flow_waiting = false; plugins_pre_export(m_flow_table[i]->m_delayed_flow); export_flow(i); - }*/ + } m_cache_stats.expired++; - } + }*/ } m_last_exported_on_timeout_index = (m_last_exported_on_timeout_index + m_new_flow_insert_index) & (m_cache_size - 1); @@ -421,12 +543,6 @@ static std::array pointerToByteArray(const Type* pointer) no return res; } -template -static const uint8_t* scalarToArrayEnd(const ScalarType& scalar) noexcept -{ - return scalarToArrayBegin(scalar) + sizeof(scalar); -} - bool NHTFlowCache::create_hash_key(const Packet& packet) { if (packet.ip_version == IP::v4) { @@ -454,17 +570,17 @@ bool NHTFlowCache::create_hash_key(const Packet& packet) return false; } -void NHTFlowCache::print_report() +void NHTFlowCache::print_report() const { - /*1float tmp = float(m_cache_stats.lookups) / m_cache_stats.hits; + const float tmp = static_cast(m_cache_stats.lookups) / m_cache_stats.hits; - cout << "Hits: " << m_cache_stats.hits << endl; - cout << "Empty: " << m_cache_stats.empty << endl; - cout << "Not empty: " << m_cache_stats.not_empty << endl; - cout << "Expired: " << m_cache_stats.expired << endl; - cout << "Flushed: " << m_cache_stats.flushed << endl; - cout << "Average Lookup: " << tmp << endl; - cout << "Variance Lookup: " << float(m_cache_stats.lookups2) / m_cache_stats.hits - tmp * tmp << endl;*/ + std::cout << "Hits: " << m_cache_stats.hits << std::endl; + std::cout << "Empty: " << m_cache_stats.empty << std::endl; + std::cout << "Not empty: " << m_cache_stats.not_empty << std::endl; + std::cout << "Expired: " << m_cache_stats.exported << std::endl; + std::cout << "Flushed: " << m_cache_stats.flushed << std::endl; + std::cout << "Average Lookup: " << tmp << std::endl; + std::cout << "Variance Lookup: " << static_cast(m_cache_stats.lookups2) / m_cache_stats.hits - tmp * tmp << std::endl; } void NHTFlowCache::set_telemetry_dir(std::shared_ptr dir) diff --git a/storage/cache.hpp b/storage/cache.hpp index f3692330..76dc5352 100644 --- a/storage/cache.hpp +++ b/storage/cache.hpp @@ -32,52 +32,21 @@ #ifndef IPXP_STORAGE_CACHE_HPP #define IPXP_STORAGE_CACHE_HPP -#include -#include #include #include - #include -//#include #include #include #include #include "fragmentationCache/fragmentationCache.hpp" - #include "cacheOptParser.hpp" #include "flowKey.tpp" #include "flowRecord.hpp" #include "cttController.hpp" - namespace ipxp { - - -/*struct __attribute__((packed)) flow_key_v4_t { - uint16_t src_port; - uint16_t dst_port; - uint8_t proto; - uint8_t ip_version; - uint32_t src_ip; - uint32_t dst_ip; - uint16_t vlan_id; -}; - -struct __attribute__((packed)) flow_key_v6_t { - uint16_t src_port; - uint16_t dst_port; - uint8_t proto; - uint8_t ip_version; - uint8_t src_ip[16]; - uint8_t dst_ip[16]; - uint16_t vlan_id; -};*/ - -//#define MAX_KEY_LENGTH (std::max(sizeof(flow_key_v4_t), sizeof(flow_key_v6_t))) - - struct FlowEndReasonStats { uint64_t active_timeout; uint64_t inactive_timeout; @@ -99,10 +68,10 @@ struct FlowCacheStats{ uint64_t empty; uint64_t not_empty; uint64_t hits; - uint64_t expired; + uint64_t exported{0}; uint64_t flushed; - uint64_t lookups; - uint64_t lookups2; + uint64_t lookups{0}; + uint64_t lookups2{0}; uint64_t flows_in_cache; uint64_t total_exported; }; @@ -119,7 +88,7 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin std::string get_name() const noexcept override; int put_pkt(Packet& pkt) override; - void export_expired(time_t ts) override; + void export_expired(time_t now) override; /** * @brief Set and configure the telemetry directory where cache stats will be stored. @@ -155,9 +124,8 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin CttController m_ctt_controller; #endif /* WITH_CTT */ - void try_to_fill_ports_to_fragmented_packet(Packet& packet); - void flush(Packet &pkt, size_t flow_index, int ret, bool source_flow); + void flush(Packet &pkt, size_t flow_index, int return_flags); bool create_hash_key(const Packet &packet); static uint8_t get_export_reason(const Flow &flow); void finish(); @@ -169,11 +137,20 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin void get_parser_options(CacheOptParser& parser) noexcept; void push_to_export_queue(size_t flow_index) noexcept; std::tuple, std::optional, bool> find_flow_index(const Packet& packet) noexcept; - bool export_on_inactive_timeout(size_t flow_index, time_t ts) noexcept; - bool export_on_active_timeout(size_t flow_index, time_t ts) noexcept; + bool try_to_export_on_inactive_timeout(size_t flow_index, const timeval& now) noexcept; + bool try_to_export_on_active_timeout(size_t flow_index, const timeval& now) noexcept; void export_flow(size_t flow_index, int reason); void export_flow(size_t flow_index); - void print_report(); + int process_flow(Packet& packet, size_t flow_index, size_t hash_value, bool flow_is_waiting_for_export) noexcept; + bool try_to_export_delayed_flow(const Packet& packet, const std::optional& flow_index, + size_t row_begin) noexcept; + void create_record(const Packet& packet, size_t flow_index, size_t hash_value) noexcept; + bool try_to_export(size_t flow_index, bool call_pre_export, const timeval& now, int reason) noexcept; + bool try_to_export(size_t flow_index, bool call_pre_export, const timeval& now) noexcept; + void print_report() const; + void send_export_request_to_ctt(size_t ctt_flow_hash) noexcept; + void export_expired(const timeval& now); + void try_to_add_flow_to_ctt(size_t flow_index) noexcept; }; } diff --git a/storage/cacheOptParser.cpp b/storage/cacheOptParser.cpp index 2899243a..7010199d 100644 --- a/storage/cacheOptParser.cpp +++ b/storage/cacheOptParser.cpp @@ -1,12 +1,35 @@ +/** +* \file + * \author Damir Zainullin + * \brief CacheOptParser implementation. + */ +/* + * Copyright (C) 2023 CESNET + * + * LICENSE TERMS + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + */ + #include "cacheOptParser.hpp" + #include #include #include namespace ipxp { - - #ifdef IPXP_FLOW_CACHE_SIZE static const uint32_t DEFAULT_FLOW_CACHE_SIZE = IPXP_FLOW_CACHE_SIZE; #else diff --git a/storage/cacheOptParser.hpp b/storage/cacheOptParser.hpp index 2731d8b4..03df45e1 100644 --- a/storage/cacheOptParser.hpp +++ b/storage/cacheOptParser.hpp @@ -1,12 +1,32 @@ +/** +* \file + * \author Damir Zainullin + * \brief Contains the CacheOptParser class for parsing cache options. + */ +/* + * Copyright (C) 2023 CESNET + * + * LICENSE TERMS + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + */ + #pragma once #include #include - - - - - +#include namespace ipxp { diff --git a/storage/cacheRowSpan.cpp b/storage/cacheRowSpan.cpp index 9e0f6320..41250ef9 100644 --- a/storage/cacheRowSpan.cpp +++ b/storage/cacheRowSpan.cpp @@ -1,6 +1,31 @@ -#include +/** +* \file + * \author Damir Zainullin + * \brief CacheRowSpan implementation. + */ +/* + * Copyright (C) 2023 CESNET + * + * LICENSE TERMS + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + */ + #include "cacheRowSpan.hpp" +#include + namespace ipxp { CacheRowSpan::CacheRowSpan(FlowRecord** begin, size_t count) noexcept @@ -39,4 +64,17 @@ std::optional CacheRowSpan::find_empty() const noexcept return it - m_begin; } +#ifdef WITH_CTT +std::optional CacheRowSpan::find_if_export_timeout_expired(const timeval& now) const noexcept +{ + auto it = std::find_if(m_begin, m_begin + m_count, [&now](const FlowRecord* flow) { + return flow->is_waiting_for_export && flow->export_time.tv_sec < now.tv_sec; + }); + if (it == m_begin + m_count) { + return std::nullopt; + } + return it - m_begin; +} +#endif /* WITH_CTT */ + } // ipxp \ No newline at end of file diff --git a/storage/cacheRowSpan.hpp b/storage/cacheRowSpan.hpp index 6b8ca4c9..68da7d42 100644 --- a/storage/cacheRowSpan.hpp +++ b/storage/cacheRowSpan.hpp @@ -1,3 +1,27 @@ +/** +* \file + * \author Damir Zainullin + * \brief CacheRowSpan declaration. + */ +/* + * Copyright (C) 2023 CESNET + * + * LICENSE TERMS + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + */ + #pragma once #include @@ -5,14 +29,50 @@ #include "flowRecord.hpp" namespace ipxp { - +/** + * \brief Class representing a non-owning view of a row span in a cache. + */ class CacheRowSpan { public: + /** + * \brief Construct a new CacheRowSpan object. + * \param begin Pointer to the first element in the row. + * \param count Number of elements in the row. + */ CacheRowSpan(FlowRecord** begin, size_t count) noexcept; + + /** + * \brief Find a flow record by hash. + * \param hash Hash value to search for. + * \return Index of the flow record relative to row begin if found, std::nullopt otherwise. + */ std::optional find_by_hash(uint64_t hash) const noexcept; + /** + * \brief Move a flow record to the beginning of the row. + * \param flow_index Index of the flow record to move. + */ void advance_flow(size_t flow_index) noexcept; + + /** + * \brief Move a flow record to a specific position in the row. + * \param from Index of the flow record to move. + * \param to Index of the position to move the flow record to. + */ void advance_flow_to(size_t from, size_t to) noexcept; + + /** + * \brief Find an empty flow record in the row. + * \return Index of the empty flow record if found, std::nullopt otherwise. + */ std::optional find_empty() const noexcept; +#ifdef WITH_CTT + /** + * \brief Find a flow record with an export timeout that has expired. + * \param now Current time. + * \return Index of the flow record with an expired export timeout if found, std::nullopt otherwise. + */ + std::optional find_if_export_timeout_expired(const timeval& now) const noexcept; +#endif /* WITH_CTT */ private: FlowRecord** m_begin; size_t m_count; diff --git a/storage/cttController.cpp b/storage/cttController.cpp index 708fdb5f..46b7b1bc 100644 --- a/storage/cttController.cpp +++ b/storage/cttController.cpp @@ -1,6 +1,26 @@ -// -// Created by zaida on 21.12.2024. -// +/** +* \file + * \author Damir Zainullin + * \brief CttController implementation. + */ +/* + * Copyright (C) 2023 CESNET + * + * LICENSE TERMS + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + */ #include "cttController.hpp" diff --git a/storage/cttController.hpp b/storage/cttController.hpp index 4104ad65..0a6cc791 100644 --- a/storage/cttController.hpp +++ b/storage/cttController.hpp @@ -1,7 +1,31 @@ +/** +* \file + * \author Damir Zainullin + * \brief CttController declaration. + */ +/* + * Copyright (C) 2023 CESNET + * + * LICENSE TERMS + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + */ + #pragma once //#define WITH_CTT 1 // TODO REMOVE - +#include #ifdef WITH_CTT #include #include diff --git a/storage/flowKey.tpp b/storage/flowKey.tpp index aefff366..7ed5ec94 100644 --- a/storage/flowKey.tpp +++ b/storage/flowKey.tpp @@ -1,3 +1,27 @@ +/** +* \file + * \author Damir Zainullin + * \brief FlowKey structure declaration. + */ +/* + * Copyright (C) 2023 CESNET + * + * LICENSE TERMS + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + */ + #pragma once #include diff --git a/storage/flowRecord.cpp b/storage/flowRecord.cpp index f0d35cb5..34ed7bdb 100644 --- a/storage/flowRecord.cpp +++ b/storage/flowRecord.cpp @@ -1,7 +1,32 @@ +/** +* \file + * \author Damir Zainullin + * \brief FlowRecord implementation. + */ +/* + * Copyright (C) 2023 CESNET + * + * LICENSE TERMS + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + */ + +#include "flowRecord.hpp" + #include #include #include -#include "flowRecord.hpp" namespace ipxp { @@ -33,6 +58,9 @@ void FlowRecord::erase() m_flow.dst_bytes = 0; m_flow.src_tcp_flags = 0; m_flow.dst_tcp_flags = 0; +#ifdef WITH_CTT + is_waiting_for_export = false; +#endif /* WITH_CTT */ } void FlowRecord::reuse() { @@ -44,6 +72,9 @@ void FlowRecord::reuse() m_flow.dst_bytes = 0; m_flow.src_tcp_flags = 0; m_flow.dst_tcp_flags = 0; +#ifdef WITH_CTT + is_waiting_for_export = false; +#endif /* WITH_CTT */ } bool FlowRecord::is_empty() const noexcept @@ -95,13 +126,12 @@ void FlowRecord::create(const Packet &pkt, uint64_t hash) m_flow.src_port = pkt.src_port; m_flow.dst_port = pkt.dst_port; } - #ifdef WITH_CTT - m_flow.is_delayed = false; - m_delayed_flow_waiting = false; - #endif /* WITH_CTT */ +#ifdef WITH_CTT + is_waiting_for_export = false; +#endif /* WITH_CTT */ } -void FlowRecord::update(const Packet &pkt, bool src) +void FlowRecord::update(const Packet &pkt) { /*if (m_flow.is_delayed && !pkt.cttmeta.ctt_rec_matched) { // it means, the flow is waiting for export and it is not matched in CTT -> it must be new flow auto flow_hash = m_hash; @@ -112,7 +142,7 @@ void FlowRecord::update(const Packet &pkt, bool src) return; }*/ m_flow.time_last = pkt.ts; - if (src) { + if (pkt.source_pkt) { m_flow.src_packets++; m_flow.src_bytes += pkt.ip_len; diff --git a/storage/flowRecord.hpp b/storage/flowRecord.hpp index 6f6eae1a..0e65bd76 100644 --- a/storage/flowRecord.hpp +++ b/storage/flowRecord.hpp @@ -1,5 +1,30 @@ +/** +* \file + * \author Damir Zainullin + * \brief FlowRecord declaration. + */ +/* + * Copyright (C) 2023 CESNET + * + * LICENSE TERMS + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + */ + #pragma once +#include #include #include #include @@ -13,8 +38,11 @@ class alignas(64) FlowRecord public: Flow m_flow; #ifdef WITH_CTT - Flow m_delayed_flow; - bool m_delayed_flow_waiting; + //Flow m_delayed_flow; + //bool ; + bool is_in_ctt; /**< Flow is ofloaded by CTT if set. */ + bool is_waiting_for_export; /**< Flow cant be exported if set. */ + timeval export_time; /**< Time until the export of the flow is delayed. */ #endif /* WITH_CTT */ FlowRecord(); @@ -26,7 +54,7 @@ class alignas(64) FlowRecord bool is_empty() const noexcept; bool belongs(uint64_t pkt_hash) const noexcept; void create(const Packet &pkt, uint64_t pkt_hash); - void update(const Packet &pkt, bool src); + void update(const Packet &pkt); }; } // ipxp diff --git a/storage/fragmentationCache/timevalUtils.hpp b/storage/fragmentationCache/timevalUtils.hpp index fdd66f6e..590f9190 100644 --- a/storage/fragmentationCache/timevalUtils.hpp +++ b/storage/fragmentationCache/timevalUtils.hpp @@ -28,7 +28,7 @@ namespace ipxp { -struct timeval operator+(const struct timeval& a, const struct timeval& b) noexcept +inline struct timeval operator+(const struct timeval& a, const struct timeval& b) noexcept { constexpr time_t USEC_IN_SEC = 1000000; @@ -42,7 +42,7 @@ struct timeval operator+(const struct timeval& a, const struct timeval& b) noexc return result; } -bool operator>(const struct timeval& a, const struct timeval& b) noexcept +inline bool operator>(const struct timeval& a, const struct timeval& b) noexcept { if (a.tv_sec == b.tv_sec) return a.tv_usec > b.tv_usec;