Skip to content

Commit

Permalink
added output rate limiting
Browse files Browse the repository at this point in the history
  • Loading branch information
havraji6 committed Mar 9, 2021
1 parent dd59f65 commit 6eec0d9
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 19 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,13 @@ us.
- `-D NUMBER` Direction bit field value.
- `-F STRING` String containing filter expression to filter traffic. See man pcap-filter.
- `-O` Send ODID field instead of LINK_BIT_FIELD.
- `-q NUMBER` Input queue size (default 64).
- `-Q NUMBER` Output queue size (default 16536).
- `-e NUMBER` Export max N flows per second.
- `-x STRING` Export to IPFIX collector. Format: HOST:PORT or [HOST]:PORT.
- `-u` Use UDP when exporting to IPFIX collector.
- `-V` Print version.
- `-v` Increase verbosity of the output, it can be duplicated like -vv / -vvv.

### Common TRAP parameters
- `-h [trap,1]` Print help message for this module / for libtrap specific parameters.
Expand Down
1 change: 1 addition & 0 deletions ipfixprobe.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ struct options_t {
uint32_t input_qsize;
uint32_t input_pktblock_size;
uint32_t snaplen;
uint32_t fps; // max exported flows per second
struct timeval inactive_timeout;
struct timeval active_timeout;
struct timeval cache_stats_interval;
Expand Down
96 changes: 77 additions & 19 deletions main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include <iomanip>
#include <stdlib.h>
#include <thread>
#include <sys/time.h>

#ifdef WITH_NEMEA
#include "fields.h"
Expand Down Expand Up @@ -128,10 +129,11 @@ int terminate_input = 0;
PARAM('O', "odid", "Send ODID field instead of LINK_BIT_FIELD in unirec message.", no_argument, "none") \
PARAM('x', "ipfix", "Export to IPFIX collector. Format: HOST:PORT or [HOST]:PORT", required_argument, "string") \
PARAM('u', "udp", "Use UDP when exporting to IPFIX collector.", no_argument, "none") \
PARAM('q', "q", "Input queue size (default 64).", required_argument, "uint32") \
PARAM('Q', "Q", "Output queue size (default 16536).", required_argument, "uint32") \
PARAM('q', "iqueue", "Input queue size (default 64).", required_argument, "uint32") \
PARAM('Q', "oqueue", "Output queue size (default 16536).", required_argument, "uint32") \
PARAM('e', "fps", "Export max N flows per second.", required_argument, "uint32") \
PARAM('V', "version", "Print version.", no_argument, "none")\
PARAM('v', "verbose", "Increase verbosity of the output, it can be duplicated like -vv / -vvv.", no_argument, "none")\
PARAM('v', "verbose", "Increase verbosity of the output, it can be duplicated like -vv / -vvv.", no_argument, "none")

#define PRINT_HELP_PARAM(p_short_opt, p_long_opt, p_description, p_required_argument, p_argument_type) \
if (p_required_argument == no_argument) { \
Expand Down Expand Up @@ -359,29 +361,75 @@ struct OutputStats {
bool error;
};

void export_thread(FlowExporter *exp, ipx_ring_t *queue, std::promise<OutputStats> *threadOutput)
#define MICRO_SEC 1000000L
long timeval_diff(const struct timeval *start, const struct timeval *end)
{
struct timespec last;
struct timespec now;
OutputStats stats = {0, 0, 0, 0, false};
return (end->tv_sec - start->tv_sec) * MICRO_SEC
+ (end->tv_usec - start->tv_usec);
}

clock_gettime(CLOCK_MONOTONIC_COARSE, &last);
void export_thread(FlowExporter *exp, ipx_ring_t *queue, std::promise<OutputStats> *threadOutput, uint32_t fps)
{
OutputStats stats = {0, 0, 0, 0, false};
struct timespec sleep_time = {0};
struct timeval begin;
struct timeval end;
struct timeval last_flush;
uint32_t pkts_from_begin = 0;
double time_per_pkt = 1000000.0 / fps; // [micro seconds]

// Rate limiting algorithm from https://github.com/CESNET/ipfixcol2/blob/master/src/tools/ipfixsend/sender.c#L98
gettimeofday(&begin, NULL);
last_flush = begin;
while (1) {
clock_gettime(CLOCK_MONOTONIC_COARSE, &now);
gettimeofday(&end, NULL);

Flow *flow = static_cast<Flow *>(ipx_ring_pop(queue));
if (flow) {
stats.biflows++;
stats.bytes += flow->src_octet_total_length + flow->dst_octet_total_length;
stats.packets += flow->src_pkt_total_cnt + flow->dst_pkt_total_cnt;
exp->export_flow(*flow);
} else if (terminate_export && !ipx_ring_cnt(queue)) {
if (terminate_export && !ipx_ring_cnt(queue)) {
break;
} else if (!flow) {
if (end.tv_sec - last_flush.tv_sec > 1) {
last_flush = end;
exp->flush();
}
continue;
}

if (now.tv_sec - last.tv_sec > 1) {
last = now;
exp->flush();
stats.biflows++;
stats.bytes += flow->src_octet_total_length + flow->dst_octet_total_length;
stats.packets += flow->src_pkt_total_cnt + flow->dst_pkt_total_cnt;
exp->export_flow(*flow);

pkts_from_begin++;
if (fps == 0) {
// Limit for packets/s is not enabled
continue;
}

// Calculate expected time of sending next packet
long elapsed = timeval_diff(&begin, &end);
if (elapsed < 0) {
// Should be never negative. Just for sure...
elapsed = pkts_from_begin * time_per_pkt;
}

long next_start = pkts_from_begin * time_per_pkt;
long diff = next_start - elapsed;

if (diff >= MICRO_SEC) {
diff = MICRO_SEC - 1;
}

// Sleep
if (diff > 0) {
sleep_time.tv_nsec = diff * 1000L;
nanosleep(&sleep_time, NULL);
}

if (pkts_from_begin >= fps) {
// Restart counter
gettimeofday(&begin, NULL);
pkts_from_begin = 0;
}
}
stats.dropped = exp->dropped;
Expand Down Expand Up @@ -480,6 +528,7 @@ int main(int argc, char *argv[])
options.flow_cache_qsize = 16536;
options.input_qsize = 64;
options.input_pktblock_size = 32;
options.fps = 0;

#ifdef WITH_NEMEA
bool odid = false;
Expand Down Expand Up @@ -784,6 +833,15 @@ int main(int argc, char *argv[])
options.flow_cache_qsize = tmp;
}
break;
case 'e':
if (!str_to_uint32(optarg, options.fps)) {
#ifdef WITH_NEMEA
FREE_MODULE_INFO_STRUCT(MODULE_BASIC_INFO, MODULE_PARAMS);
TRAP_DEFAULT_FINALIZATION();
#endif
return error("Invalid argument for option -e");
}
break;
default:
#ifdef WITH_NEMEA
FREE_MODULE_INFO_STRUCT(MODULE_BASIC_INFO, MODULE_PARAMS);
Expand Down Expand Up @@ -878,7 +936,7 @@ int main(int argc, char *argv[])
std::promise<OutputStats> *exporter_stats = new std::promise<OutputStats>();
ExporterWorker tmp = {
exporter,
new std::thread(export_thread, exporter, export_queue, exporter_stats),
new std::thread(export_thread, exporter, export_queue, exporter_stats, options.fps),
exporter_stats,
export_queue
};
Expand Down

0 comments on commit 6eec0d9

Please sign in to comment.