Skip to content

Commit

Permalink
* Introducing AMQP support for dumps of BGP tables (bgp_table_dump fe…
Browse files Browse the repository at this point in the history
…ature)

  so that data can now be transported via a RabbitMQ message exchange to
  (remote) consumers. This feature is in addition to the existing file
  method. Configurable AMQP parameters are: host, username, password,
  exchange, exchange type and routing key (ie. bgp_table_dump_amqp_host,
  bgp_table_dump_amqp_user, etc.)
  • Loading branch information
paololucente authored and paolo committed May 29, 2014
1 parent 8b8da7b commit f03d4bd
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 62 deletions.
20 changes: 11 additions & 9 deletions CONFIG-KEYS
Original file line number Diff line number Diff line change
Expand Up @@ -292,16 +292,16 @@ KEY: logfile
DESC: enables logging to a file (bypassing syslog); expected value is a pathname (default: none,
console logging)

KEY: [ amqp_host | bgp_daemon_msglog_amqp_host ]
KEY: [ amqp_host | bgp_daemon_msglog_amqp_host | bgp_table_dump_amqp_host ]
DESC: defines the AMQP/RabbitMQ server IP (default: localhost).

KEY: [ amqp_user | bgp_daemon_msglog_amqp_user ]
KEY: [ amqp_user | bgp_daemon_msglog_amqp_user | bgp_table_dump_amqp_user ]
DESC: defines the username to use when connecting to the AMQP/RabbitMQ server (default: guest).

KEY: [ amqp_passwd | bgp_daemon_msglog_amqp_passwd ]
KEY: [ amqp_passwd | bgp_daemon_msglog_amqp_passwd | bgp_table_dump_amqp_passwd ]
DESC: defines the password to use when connecting to the server (default: guest).

KEY: [ amqp_routing_key | bgp_daemon_msglog_amqp_routing_key ]
KEY: [ amqp_routing_key | bgp_daemon_msglog_amqp_routing_key | bgp_table_dump_amqp_routing_key ]
DESC: Name of the AMQP routing key to attach to published data. Dynamic names are supported through
the use of variables, which are computed at the moment when data is purged to the backend.
The list of variables supported by amqp_routing_key:
Expand All @@ -314,20 +314,22 @@ DESC: Name of the AMQP routing key to attach to published data. Dynamic names a

$post_tag2 Configured value of post_tag2.

The list of variables supported by bgp_daemon_msglog_amqp_routing_key:
Variables supported by bgp_daemon_msglog_amqp_routing_key, bgp_table_dump_amqp_routing_key:

$peer_src_ip Value of the peer_src_ip primitive of the record being processed.

(default: amqp_routing_key: acct; bgp_daemon_msglog_amqp_routing_key no default value).
(default: amqp_routing_key: acct)

KEY: [ amqp_exchange | bgp_daemon_msglog_amqp_exchange ]
KEY: [ amqp_exchange | bgp_daemon_msglog_amqp_exchange | bgp_table_dump_amqp_exchange ]
DESC: Name of the AMQP exchange to publish data (default: pmacct).

KEY: [ amqp_exchange_type | bgp_daemon_msglog_amqp_exchange_type ]
KEY: [ amqp_exchange_type | bgp_daemon_msglog_amqp_exchange_type |
bgp_table_dump_amqp_exchange_type ]
DESC: Type of the AMQP exchange to publish data. Currently only 'direct' and 'fanout' types are
supported (default: direct).

KEY: [ amqp_persistent_msg | bgp_daemon_msglog_amqp_persistent_msg ]
KEY: [ amqp_persistent_msg | bgp_daemon_msglog_amqp_persistent_msg |
bgp_table_dump_amqp_persistent_msg ]
VALUES: [ true | false ]
DESC: Marks messages as persistent so that a queue content does not get lost if RabbitMQ restarts.
Note from RabbitMQ docs: "Marking messages as persistent doesn't fully guarantee that a
Expand Down
23 changes: 15 additions & 8 deletions src/bgp/bgp.c
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ void skinny_bgp_daemon()
}
}

if (config.bgp_table_dump_file && config.bgp_table_dump_amqp_routing_key) {
Log(LOG_ERR, "ERROR ( %s/core/BGP ): bgp_table_dump_file and bgp_table_dump_amqp_routing_key are mutually exclusive. Terminating thread.\n", config.name);
exit_all(1);
}

if (!config.bgp_table_peer_buckets) config.bgp_table_peer_buckets = DEFAULT_BGP_INFO_HASH;
if (!config.bgp_table_per_peer_buckets) config.bgp_table_per_peer_buckets = DEFAULT_BGP_INFO_PER_PEER_HASH;

Expand Down Expand Up @@ -281,7 +286,8 @@ void skinny_bgp_daemon()
Log(LOG_WARNING, "WARN ( %s/core/BGP ): bgp_daemon_msglog_output set to json but will produce no output (missing --enable-jansson).\n", config.name);
#endif

if (!config.bgp_table_dump_output && config.bgp_table_dump_file)
if (!config.bgp_table_dump_output && (config.bgp_table_dump_file ||
config.bgp_table_dump_amqp_routing_key))
#ifdef WITH_JANSSON
config.bgp_table_dump_output = PRINT_OUTPUT_JSON;
#else
Expand All @@ -304,7 +310,7 @@ void skinny_bgp_daemon()
}
else {
config.bgp_table_dump_file = NULL;
Log(LOG_WARNING, "WARN ( %s/core/BGP ): 'bgp_table_dump_file' ignored due to invalid 'bgp_table_dump_refresh_time'.\n", config.name);
Log(LOG_WARNING, "WARN ( %s/core/BGP ): Invalid 'bgp_table_dump_refresh_time'.\n", config.name);
}

bgp_table_dump_init_amqp_host();
Expand All @@ -319,7 +325,7 @@ void skinny_bgp_daemon()
select_fd++;
memcpy(&read_descs, &bkp_read_descs, sizeof(bkp_read_descs));

if (config.bgp_table_dump_file) {
if (config.bgp_table_dump_file || config.bgp_table_dump_amqp_routing_key) {
int delta;

calc_refresh_timeout_sec(dump_refresh_deadline, log_tstamp.tv_sec, &delta);
Expand Down Expand Up @@ -353,20 +359,20 @@ void skinny_bgp_daemon()
}
}

if (config.nfacctd_bgp_msglog_file || config.bgp_table_dump_file ||
config.nfacctd_bgp_msglog_amqp_routing_key) {
if (config.nfacctd_bgp_msglog_file || config.nfacctd_bgp_msglog_amqp_routing_key ||
config.bgp_table_dump_file || config.bgp_table_dump_amqp_routing_key) {
gettimeofday(&log_tstamp, NULL);
compose_timestamp(log_tstamp_str, SRVBUFLEN, &log_tstamp, TRUE);

if (config.bgp_table_dump_file) {
if (config.bgp_table_dump_file || config.bgp_table_dump_amqp_routing_key) {
while (log_tstamp.tv_sec > dump_refresh_deadline) {
bgp_handle_dump_event();
dump_refresh_deadline += config.bgp_table_dump_refresh_time;
}
}

#ifdef WITH_RABBITMQ
if (config.nfacctd_bgp_msglog_amqp_routing_key) {
if (config.nfacctd_bgp_msglog_amqp_routing_key) {
time_t last_fail = p_amqp_get_last_fail(&bgp_daemon_msglog_amqp_host);

if (last_fail && (last_fail + config.nfacctd_bgp_msglog_amqp_retry < log_tstamp.tv_sec)) {
Expand Down Expand Up @@ -1998,7 +2004,8 @@ void bgp_peer_close(struct bgp_peer *peer)
bgp_info_delete()/bgp_route_next() in bgp_peer_info_delete()
that require some more testing in the field.
*/
if (config.bgp_table_dump_file) bgp_peer_info_delete(peer);
if (config.bgp_table_dump_file || config.bgp_table_dump_amqp_routing_key)
bgp_peer_info_delete(peer);

if (config.nfacctd_bgp_msglog_file || config.nfacctd_bgp_msglog_amqp_routing_key)
bgp_peer_log_close(peer, config.nfacctd_bgp_msglog_output);
Expand Down
90 changes: 75 additions & 15 deletions src/bgp/bgp_logdump.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ void bgp_peer_log_msg(struct bgp_node *route, struct bgp_info *ri, safi_t safi,
struct bgp_attr *attr = ri->attr;

#ifdef WITH_RABBITMQ
if (config.nfacctd_bgp_msglog_amqp_routing_key)
if (config.nfacctd_bgp_msglog_amqp_routing_key ||
config.bgp_table_dump_amqp_routing_key)
p_amqp_set_routing_key(peer->log->amqp_host, peer->log->filename);
#endif

Expand Down Expand Up @@ -137,7 +138,8 @@ void bgp_peer_log_msg(struct bgp_node *route, struct bgp_info *ri, safi_t safi,
write_and_free_json(peer->log->fd, obj);

#ifdef WITH_RABBITMQ
if (config.nfacctd_bgp_msglog_amqp_routing_key) {
if (config.nfacctd_bgp_msglog_amqp_routing_key ||
config.bgp_table_dump_amqp_routing_key) {
write_and_free_json_amqp(peer->log->amqp_host, obj);
p_amqp_unset_routing_key(peer->log->amqp_host);
}
Expand Down Expand Up @@ -341,7 +343,12 @@ void bgp_peer_dump_init(struct bgp_peer *peer, int output)
{
char event_type[] = "dump_init";

if (!peer || !peer->log || !peer->log->fd) return;
if (!peer || !peer->log) return;

#ifdef WITH_RABBITMQ
if (config.bgp_table_dump_amqp_routing_key)
p_amqp_set_routing_key(peer->log->amqp_host, peer->log->filename);
#endif

if (output == PRINT_OUTPUT_JSON) {
#ifdef WITH_JANSSON
Expand All @@ -361,7 +368,15 @@ void bgp_peer_dump_init(struct bgp_peer *peer, int output)
json_object_update_missing(obj, kv);
json_decref(kv);

write_and_free_json(peer->log->fd, obj);
if (config.bgp_table_dump_file)
write_and_free_json(peer->log->fd, obj);

#ifdef WITH_RABBITMQ
if (config.bgp_table_dump_amqp_routing_key) {
write_and_free_json_amqp(peer->log->amqp_host, obj);
p_amqp_unset_routing_key(peer->log->amqp_host);
}
#endif
#endif
}
}
Expand All @@ -370,7 +385,12 @@ void bgp_peer_dump_close(struct bgp_peer *peer, int output)
{
char event_type[] = "dump_close";

if (!peer || !peer->log || !peer->log->fd) return;
if (!peer || !peer->log) return;

#ifdef WITH_RABBITMQ
if (config.bgp_table_dump_amqp_routing_key)
p_amqp_set_routing_key(peer->log->amqp_host, peer->log->filename);
#endif

if (output == PRINT_OUTPUT_JSON) {
#ifdef WITH_JANSSON
Expand All @@ -390,7 +410,15 @@ void bgp_peer_dump_close(struct bgp_peer *peer, int output)
json_object_update_missing(obj, kv);
json_decref(kv);

write_and_free_json(peer->log->fd, obj);
if (config.bgp_table_dump_file)
write_and_free_json(peer->log->fd, obj);

#ifdef WITH_RABBITMQ
if (config.bgp_table_dump_amqp_routing_key) {
write_and_free_json_amqp(peer->log->amqp_host, obj);
p_amqp_unset_routing_key(peer->log->amqp_host);
}
#endif
#endif
}
}
Expand All @@ -410,7 +438,9 @@ void bgp_handle_dump_event()
time_t start;

/* pre-flight check */
if (!config.bgp_table_dump_file || !config.bgp_table_dump_refresh_time) return;
if ((!config.bgp_table_dump_file && !config.bgp_table_dump_amqp_routing_key) ||
!config.bgp_table_dump_refresh_time)
return;

switch (ret = fork()) {
case 0: /* Child */
Expand All @@ -422,6 +452,14 @@ void bgp_handle_dump_event()
memset(current_filename, 0, sizeof(current_filename));
memset(&peer_log, 0, sizeof(struct bgp_peer_log));

#ifdef WITH_RABBITMQ
if (config.bgp_table_dump_amqp_routing_key) {
bgp_table_dump_init_amqp_host();
ret = p_amqp_connect(&bgp_table_dump_amqp_host);
if (ret) exit(ret);
}
#endif

dumper_pid = getpid();
Log(LOG_INFO, "INFO ( %s/core/BGP ): *** Dumping BGP tables - START (PID: %u) ***\n", config.name, dumper_pid);
start = time(NULL);
Expand All @@ -432,18 +470,37 @@ void bgp_handle_dump_event()
peer = &peers[peers_idx];
peer->log = &peer_log; /* abusing struct bgp_peer a bit, but we are in a child */

bgp_peer_log_dynname(current_filename, SRVBUFLEN, config.bgp_table_dump_file, peer);
if (config.bgp_table_dump_file)
bgp_peer_log_dynname(current_filename, SRVBUFLEN, config.bgp_table_dump_file, peer);

if (config.bgp_table_dump_amqp_routing_key)
bgp_peer_log_dynname(current_filename, SRVBUFLEN, config.bgp_table_dump_amqp_routing_key, peer);

strftime_same(current_filename, SRVBUFLEN, tmpbuf, &log_tstamp.tv_sec);

/*
we close last_filename and open current_filename in case they differ;
we are safe with this approach until $peer_src_ip is the only variable
supported as part of bgp_table_dump_file configuration directive.
supported as part of bgp_table_dump_file configuration directive.
*/
if (strcmp(last_filename, current_filename)) {
if (saved_peer && saved_peer->log && strlen(last_filename)) fclose(saved_peer->log->fd);
peer->log->fd = open_logfile(current_filename, "w");
if (config.bgp_table_dump_file) {
if (strcmp(last_filename, current_filename)) {
if (saved_peer && saved_peer->log && strlen(last_filename)) fclose(saved_peer->log->fd);
peer->log->fd = open_logfile(current_filename, "w");
}
}

#ifdef WITH_RABBITMQ
/*
a bit pedantic maybe but should come at little cost and emulating
bgp_table_dump_file behaviour will work
*/
if (config.bgp_table_dump_amqp_routing_key) {
peer->log->amqp_host = &bgp_table_dump_amqp_host;
strcpy(peer->log->filename, current_filename);
}
#endif

bgp_peer_dump_init(peer, config.bgp_table_dump_output);

for (afi = AFI_IP; afi < AFI_MAX; afi++) {
Expand Down Expand Up @@ -476,14 +533,19 @@ void bgp_handle_dump_event()
}
}

#ifdef WITH_RABBITMQ
if (config.bgp_table_dump_amqp_routing_key)
p_amqp_close(&bgp_table_dump_amqp_host, FALSE);
#endif

duration = time(NULL)-start;
Log(LOG_INFO, "INFO ( %s/core/BGP ): *** Dumping BGP tables - END (PID: %u, TABLES: %u ET: %u) ***\n",
config.name, dumper_pid, tables_num, duration);

exit(0);
default: /* Parent */
if (ret == -1) { /* Something went wrong */
Log(LOG_WARNING, "WARN ( %s/core/BGP ): Unable to fork DB writer: %s\n", config.name, strerror(errno));
Log(LOG_WARNING, "WARN ( %s/core/BGP ): Unable to fork BGP table dump writer: %s\n", config.name, strerror(errno));
}

break;
Expand All @@ -504,7 +566,6 @@ void bgp_daemon_msglog_init_amqp_host()
p_amqp_set_user(&bgp_daemon_msglog_amqp_host, config.nfacctd_bgp_msglog_amqp_user);
p_amqp_set_passwd(&bgp_daemon_msglog_amqp_host, config.nfacctd_bgp_msglog_amqp_passwd);
p_amqp_set_exchange(&bgp_daemon_msglog_amqp_host, config.nfacctd_bgp_msglog_amqp_exchange);
// p_amqp_set_routing_key(&bgp_daemon_msglog_amqp_host, config.nfacctd_bgp_msglog_amqp_routing_key);
p_amqp_set_exchange_type(&bgp_daemon_msglog_amqp_host, config.nfacctd_bgp_msglog_amqp_exchange_type);
p_amqp_set_host(&bgp_daemon_msglog_amqp_host, config.nfacctd_bgp_msglog_amqp_host);
p_amqp_set_persistent_msg(&bgp_daemon_msglog_amqp_host, config.nfacctd_bgp_msglog_amqp_persistent_msg);
Expand All @@ -529,7 +590,6 @@ void bgp_table_dump_init_amqp_host()
p_amqp_set_user(&bgp_table_dump_amqp_host, config.bgp_table_dump_amqp_user);
p_amqp_set_passwd(&bgp_table_dump_amqp_host, config.bgp_table_dump_amqp_passwd);
p_amqp_set_exchange(&bgp_table_dump_amqp_host, config.bgp_table_dump_amqp_exchange);
p_amqp_set_routing_key(&bgp_table_dump_amqp_host, config.bgp_table_dump_amqp_routing_key);
p_amqp_set_exchange_type(&bgp_table_dump_amqp_host, config.bgp_table_dump_amqp_exchange_type);
p_amqp_set_host(&bgp_table_dump_amqp_host, config.bgp_table_dump_amqp_host);
p_amqp_set_persistent_msg(&bgp_table_dump_amqp_host, config.bgp_table_dump_amqp_persistent_msg);
Expand Down
2 changes: 1 addition & 1 deletion src/pmacct-build.h
Original file line number Diff line number Diff line change
@@ -1 +1 @@
#define PMACCT_BUILD "20140528-00"
#define PMACCT_BUILD "20140529-00"
30 changes: 2 additions & 28 deletions src/util.c
Original file line number Diff line number Diff line change
Expand Up @@ -2072,6 +2072,8 @@ void write_and_free_json(FILE *f, void *obj)
char *tmpbuf = NULL;
json_t *json_obj = (json_t *) obj;

if (!f) return;

tmpbuf = json_dumps(json_obj, 0);
json_decref(json_obj);

Expand Down Expand Up @@ -2121,34 +2123,6 @@ void write_and_free_json_amqp(void *amqp_log, void *obj)
}
#endif

#ifdef WITH_JANSSON
char *compose_log_json(char *msg)
{
char *tmpbuf = NULL, empty_string[] = "";
json_t *obj = json_object(), *kv;

if (msg) kv = json_pack("{ss}", "log", msg);
else kv = json_pack("{ss}", "log", empty_string);

json_object_update_missing(obj, kv);
json_decref(kv);

tmpbuf = json_dumps(obj, 0);
json_decref(obj);

return tmpbuf;
}
#else
char *compose_log_json(char *msg)
{
config.log_amqp_routing_key = NULL; /* force to screen if no other logging method is selected */

if (config.debug) Log(LOG_DEBUG, "DEBUG ( %s/%s ): compose_log_json(): JSON object not created due to missing --enable-jansson\n", config.name, config.type);

return NULL;
}
#endif

void compose_timestamp(char *buf, int buflen, struct timeval *tv, int usec)
{
char tmpbuf[SRVBUFLEN];
Expand Down
1 change: 0 additions & 1 deletion src/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ EXT char *compose_json(u_int64_t, u_int64_t, u_int8_t, struct pkt_primitives *,
EXT void compose_timestamp(char *, int, struct timeval *, int);
EXT void write_and_free_json(FILE *, void *);
EXT void write_and_free_json_amqp(void *, void *);
EXT char *compose_log_json(char *);

EXT struct packet_ptrs *copy_packet_ptrs(struct packet_ptrs *);
EXT void free_packet_ptrs(struct packet_ptrs *);
Expand Down

0 comments on commit f03d4bd

Please sign in to comment.