Skip to content

Commit

Permalink
* Implemented bgp_daemon_msglog_amqp_retry: if logging BGP messages/e…
Browse files Browse the repository at this point in the history
…vents,

  it allows to define the interval of time after which a connection to the
  RabbitMQ server should be retried after a failure is detected.
* Code cleanups
  • Loading branch information
paololucente authored and paolo committed May 28, 2014
1 parent 8187c63 commit 8b8da7b
Show file tree
Hide file tree
Showing 25 changed files with 48 additions and 349 deletions.
26 changes: 14 additions & 12 deletions CONFIG-KEYS
Original file line number Diff line number Diff line change
Expand Up @@ -292,20 +292,19 @@ KEY: logfile
DESC: enables logging to a file (bypassing syslog); expected value is a pathname (default: none,
console logging)

KEY: [ amqp_host | log_amqp_host | bgp_daemon_msglog_amqp_host ]
KEY: [ amqp_host | bgp_daemon_msglog_amqp_host ]
DESC: defines the AMQP/RabbitMQ server IP (default: localhost).

KEY: [ amqp_user | log_amqp_user | bgp_daemon_msglog_amqp_user ]
KEY: [ amqp_user | bgp_daemon_msglog_amqp_user ]
DESC: defines the username to use when connecting to the AMQP/RabbitMQ server (default: guest).

KEY: [ amqp_passwd | log_amqp_passwd | bgp_daemon_msglog_amqp_passwd ]
KEY: [ amqp_passwd | bgp_daemon_msglog_amqp_passwd ]
DESC: defines the password to use when connecting to the server (default: guest).

KEY: [ amqp_routing_key | log_amqp_routing_key | bgp_daemon_msglog_amqp_routing_key ]
KEY: [ amqp_routing_key | bgp_daemon_msglog_amqp_routing_key ]
DESC: Name of the AMQP routing key to attach to published data. Dynamic names are supported through
the use of variables, which are computed at the moment when data is purged to the backend.
Dynamic names are not supported for log_amqp_routing_key. The list of variables supported
by amqp_routing_key:
The list of variables supported by amqp_routing_key:

$peer_src_ip Value of the peer_src_ip primitive of the record being processed.

Expand All @@ -319,15 +318,14 @@ DESC: Name of the AMQP routing key to attach to published data. Dynamic names a

$peer_src_ip Value of the peer_src_ip primitive of the record being processed.

(default: amqp_routing_key: acct; log_amqp_routing_key, bgp_daemon_msglog_amqp_routing_key
no default value).
(default: amqp_routing_key: acct; bgp_daemon_msglog_amqp_routing_key no default value).

KEY: [ amqp_exchange | log_amqp_exchange | bgp_daemon_msglog_amqp_exchange ]
KEY: [ amqp_exchange | bgp_daemon_msglog_amqp_exchange ]
DESC: Name of the AMQP exchange to publish data (default: pmacct).

KEY: [ amqp_exchange_type | log_amqp_exchange_type | bgp_daemon_msglog_amqp_exchange_type ]
KEY: [ amqp_exchange_type | bgp_daemon_msglog_amqp_exchange_type ]
DESC: Type of the AMQP exchange to publish data. Currently only 'direct' and 'fanout' types are
supported. (default: direct).
supported (default: direct).

KEY: [ amqp_persistent_msg | bgp_daemon_msglog_amqp_persistent_msg ]
VALUES: [ true | false ]
Expand All @@ -337,7 +335,11 @@ DESC: Marks messages as persistent so that a queue content does not ge
still a short time window when RabbitMQ has accepted a message and hasn't saved it yet.
Also, RabbitMQ doesn't do fsync(2) for every message -- it may be just saved to cache and
not really written to the disk. The persistence guarantees aren't strong, but it is more
than enough for our simple task queue.". (default: false)
than enough for our simple task queue." (default: false).

KEY: bgp_daemon_msglog_amqp_retry
DESC: Defines the interval of time, in seconds, after which a connection to the RabbitMQ server
should be retried after a failure is detected. (default: 60).

KEY: pidfile (-F) [GLOBAL]
DESC: writes PID of Core process to the specified file. PIDs of the active plugins are written
Expand Down
12 changes: 5 additions & 7 deletions src/amqp_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void p_amqp_unset_last_fail(struct p_amqp_host *amqp_host)
if (amqp_host) amqp_host->last_fail = FALSE;
}

int p_amqp_connect(struct p_amqp_host *amqp_host, int type)
int p_amqp_connect(struct p_amqp_host *amqp_host)
{
amqp_host->conn = amqp_new_connection();

Expand Down Expand Up @@ -142,18 +142,16 @@ int p_amqp_connect(struct p_amqp_host *amqp_host, int type)
return SUCCESS;
}

int p_amqp_publish(struct p_amqp_host *amqp_host, char *json_str, int type)
int p_amqp_publish(struct p_amqp_host *amqp_host, char *json_str)
{
if (p_amqp_is_alive(amqp_host) == ERR) {
p_amqp_close(amqp_host, TRUE);
return ERR;
}

if (type == AMQP_PUBLISH_MSG) {
if (config.debug) Log(LOG_DEBUG, "DEBUG ( %s/%s ): publishing [E=%s RK=%s DM=%u]: %s\n", config.name,
config.type, amqp_host->exchange, amqp_host->routing_key,
amqp_host->msg_props.delivery_mode, json_str);
}
if (config.debug) Log(LOG_DEBUG, "DEBUG ( %s/%s ): publishing [E=%s RK=%s DM=%u]: %s\n", config.name,
config.type, amqp_host->exchange, amqp_host->routing_key,
amqp_host->msg_props.delivery_mode, json_str);

amqp_host->status = amqp_basic_publish(amqp_host->conn, 1, amqp_cstring_bytes(amqp_host->exchange),
amqp_cstring_bytes(amqp_host->routing_key), 0, 0, &amqp_host->msg_props,
Expand Down
12 changes: 5 additions & 7 deletions src/amqp_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@
#include <amqp_tcp_socket.h>

/* defines */
#define AMQP_PUBLISH_MSG 1
#define AMQP_PUBLISH_LOG 2

#define AMQP_DEFAULT_RETRY 60

/* structures */
Expand Down Expand Up @@ -69,14 +66,15 @@ EXT time_t p_amqp_get_last_fail(struct p_amqp_host *);
EXT void p_amqp_unset_routing_key(struct p_amqp_host *);
EXT void p_amqp_unset_last_fail(struct p_amqp_host *);

EXT int p_amqp_connect(struct p_amqp_host *, int);
EXT int p_amqp_publish(struct p_amqp_host *, char *, int);
EXT int p_amqp_connect(struct p_amqp_host *);
EXT int p_amqp_publish(struct p_amqp_host *, char *);
EXT void p_amqp_close(struct p_amqp_host *, int);
EXT int p_amqp_is_alive(struct p_amqp_host *);

/* global vars */
EXT struct p_amqp_host amqpp_amqp_host, log_amqp_host;
EXT struct p_amqp_host bgp_daemon_msglog_amqp_host, bgp_table_dump_amqp_host;
EXT struct p_amqp_host amqpp_amqp_host;
EXT struct p_amqp_host bgp_daemon_msglog_amqp_host;
EXT struct p_amqp_host bgp_table_dump_amqp_host;

static char rabbitmq_user[] = "guest";
static char rabbitmq_pwd[] = "guest";
Expand Down
6 changes: 3 additions & 3 deletions src/amqp_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ void amqp_cache_purge(struct chained_cache *queue[], int index)
memset(&empty_pmpls, 0, sizeof(struct pkt_mpls_primitives));
memset(empty_pcust, 0, config.cpptrs.len);

ret = p_amqp_connect(&amqpp_amqp_host, AMQP_PUBLISH_MSG);
ret = p_amqp_connect(&amqpp_amqp_host);
if (ret) return;

for (j = 0, stop = 0; (!stop) && P_preprocess_funcs[j]; j++)
Expand Down Expand Up @@ -316,15 +316,15 @@ void amqp_cache_purge(struct chained_cache *queue[], int index)
p_amqp_set_routing_key(&amqpp_amqp_host, dyn_amqp_routing_key);
}

ret = p_amqp_publish(&amqpp_amqp_host, json_str, AMQP_PUBLISH_MSG);
ret = p_amqp_publish(&amqpp_amqp_host, json_str);
free(json_str);
qn++;

if (ret) return;
}
}

p_amqp_close(&amqpp_amqp_host, AMQP_PUBLISH_MSG);
p_amqp_close(&amqpp_amqp_host, FALSE);

duration = time(NULL)-start;
Log(LOG_INFO, "INFO ( %s/%s ): *** Purging cache - END (PID: %u, QN: %u/%u, ET: %u) ***\n",
Expand Down
9 changes: 6 additions & 3 deletions src/bgp/bgp.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,10 @@ void skinny_bgp_daemon()
if (config.nfacctd_bgp_msglog_amqp_routing_key) {
#ifdef WITH_RABBITMQ
bgp_daemon_msglog_init_amqp_host();
p_amqp_connect(&bgp_daemon_msglog_amqp_host, AMQP_PUBLISH_MSG);
p_amqp_connect(&bgp_daemon_msglog_amqp_host);

if (!config.nfacctd_bgp_msglog_amqp_retry)
config.nfacctd_bgp_msglog_amqp_retry = AMQP_DEFAULT_RETRY;
#else
Log(LOG_WARNING, "WARN ( %s/core/BGP ): p_amqp_connect() not possible due to missing --enable-rabbitmq\n", config.name);
#endif
Expand Down Expand Up @@ -366,9 +369,9 @@ void skinny_bgp_daemon()
if (config.nfacctd_bgp_msglog_amqp_routing_key) {
time_t last_fail = p_amqp_get_last_fail(&bgp_daemon_msglog_amqp_host);

if (last_fail && (last_fail + AMQP_DEFAULT_RETRY < log_tstamp.tv_sec)) {
if (last_fail && (last_fail + config.nfacctd_bgp_msglog_amqp_retry < log_tstamp.tv_sec)) {
bgp_daemon_msglog_init_amqp_host();
p_amqp_connect(&bgp_daemon_msglog_amqp_host, AMQP_PUBLISH_MSG);
p_amqp_connect(&bgp_daemon_msglog_amqp_host);
}
}
#endif
Expand Down
6 changes: 3 additions & 3 deletions src/bgp/bgp_logdump.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ void bgp_peer_log_msg(struct bgp_node *route, struct bgp_info *ri, safi_t safi,

#ifdef WITH_RABBITMQ
if (config.nfacctd_bgp_msglog_amqp_routing_key) {
write_and_free_json_amqp(peer->log->amqp_host, obj, AMQP_PUBLISH_MSG);
write_and_free_json_amqp(peer->log->amqp_host, obj);
p_amqp_unset_routing_key(peer->log->amqp_host);
}
#endif
Expand Down Expand Up @@ -217,7 +217,7 @@ void bgp_peer_log_init(struct bgp_peer *peer, int output)

#ifdef WITH_RABBITMQ
if (config.nfacctd_bgp_msglog_amqp_routing_key) {
write_and_free_json_amqp(peer->log->amqp_host, obj, AMQP_PUBLISH_MSG);
write_and_free_json_amqp(peer->log->amqp_host, obj);
p_amqp_unset_routing_key(peer->log->amqp_host);
}
#endif
Expand Down Expand Up @@ -274,7 +274,7 @@ void bgp_peer_log_close(struct bgp_peer *peer, int output)

#ifdef WITH_RABBITMQ
if (config.nfacctd_bgp_msglog_amqp_routing_key) {
write_and_free_json_amqp(amqp_log_ptr, obj, AMQP_PUBLISH_MSG);
write_and_free_json_amqp(amqp_log_ptr, obj);
p_amqp_unset_routing_key(amqp_log_ptr);
}
#endif
Expand Down
8 changes: 0 additions & 8 deletions src/cfg.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,6 @@ struct configuration {
int active_plugins;
char *logfile;
FILE *logfile_fd;
char *log_amqp_host;
char *log_amqp_user;
char *log_amqp_passwd;
char *log_amqp_exchange;
char *log_amqp_exchange_type;
char *log_amqp_routing_key;
int log_amqp_persistent_msg;
int log_amqp_retry;
char *pidfile;
int networks_mask;
char *networks_file;
Expand Down
163 changes: 0 additions & 163 deletions src/cfg_handlers.c
Original file line number Diff line number Diff line change
Expand Up @@ -3789,169 +3789,6 @@ void cfg_set_aggregate(char *filename, u_int64_t registry[], u_int64_t input, ch
else registry[index] |= value;
}

int cfg_key_log_amqp_host(char *filename, char *name, char *value_ptr)
{
struct plugins_list_entry *list = plugins_list;
int changes = 0;

if (!name) for (; list; list = list->next, changes++) list->cfg.log_amqp_host = value_ptr;
else {
for (; list; list = list->next) {
if (!strcmp(name, list->name)) {
list->cfg.log_amqp_host = value_ptr;
changes++;
break;
}
}
}

return changes;
}

int cfg_key_log_amqp_user(char *filename, char *name, char *value_ptr)
{
struct plugins_list_entry *list = plugins_list;
int changes = 0;

if (!name) for (; list; list = list->next, changes++) list->cfg.log_amqp_user = value_ptr;
else {
for (; list; list = list->next) {
if (!strcmp(name, list->name)) {
list->cfg.log_amqp_user = value_ptr;
changes++;
break;
}
}
}

return changes;
}

int cfg_key_log_amqp_passwd(char *filename, char *name, char *value_ptr)
{
struct plugins_list_entry *list = plugins_list;
int changes = 0;

if (!name) for (; list; list = list->next, changes++) list->cfg.log_amqp_passwd = value_ptr;
else {
for (; list; list = list->next) {
if (!strcmp(name, list->name)) {
list->cfg.log_amqp_passwd = value_ptr;
changes++;
break;
}
}
}

return changes;
}

int cfg_key_log_amqp_exchange(char *filename, char *name, char *value_ptr)
{
struct plugins_list_entry *list = plugins_list;
int changes = 0;

if (!name) for (; list; list = list->next, changes++) list->cfg.log_amqp_exchange = value_ptr;
else {
for (; list; list = list->next) {
if (!strcmp(name, list->name)) {
list->cfg.log_amqp_exchange = value_ptr;
changes++;
break;
}
}
}

return changes;
}

int cfg_key_log_amqp_exchange_type(char *filename, char *name, char *value_ptr)
{
struct plugins_list_entry *list = plugins_list;
int changes = 0;

if (!name) for (; list; list = list->next, changes++) list->cfg.log_amqp_exchange_type = value_ptr;
else {
for (; list; list = list->next) {
if (!strcmp(name, list->name)) {
list->cfg.log_amqp_exchange_type = value_ptr;
changes++;
break;
}
}
}

return changes;
}

int cfg_key_log_amqp_routing_key(char *filename, char *name, char *value_ptr)
{
struct plugins_list_entry *list = plugins_list;
int changes = 0;

// XXX: support for dynamic routing keys?

if (!name) for (; list; list = list->next, changes++) list->cfg.log_amqp_routing_key = value_ptr;
else {
for (; list; list = list->next) {
if (!strcmp(name, list->name)) {
list->cfg.log_amqp_routing_key = value_ptr;
changes++;
break;
}
}
}

return changes;
}

int cfg_key_log_amqp_persistent_msg(char *filename, char *name, char *value_ptr)
{
struct plugins_list_entry *list = plugins_list;
int value, changes = 0;

value = parse_truefalse(value_ptr);
if (value < 0) return ERR;

if (!name) for (; list; list = list->next, changes++) list->cfg.log_amqp_persistent_msg = value;
else {
for (; list; list = list->next) {
if (!strcmp(name, list->name)) {
list->cfg.log_amqp_persistent_msg = value;
changes++;
break;
}
}
}

return changes;
}

int cfg_key_log_amqp_retry(char *filename, char *name, char *value_ptr)
{
struct plugins_list_entry *list = plugins_list;
int value, changes = 0;

value = atoi(value_ptr);
if (value <= 0) {
Log(LOG_ERR, "WARN ( %s ): 'log_amqp_retry' has to be > 0.\n", filename);
return ERR;
}

if (!name) for (; list; list = list->next, changes++) list->cfg.log_amqp_retry = value;
else {
for (; list; list = list->next) {
if (!strcmp(name, list->name)) {
list->cfg.log_amqp_retry = value;
changes++;
break;
}
}
}

return changes;
}

int cfg_key_nfacctd_bgp_msglog_amqp_host(char *filename, char *name, char *value_ptr)
{
struct plugins_list_entry *list = plugins_list;
Expand Down
8 changes: 0 additions & 8 deletions src/cfg_handlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,6 @@ EXT int parse_truefalse(char *);
EXT int cfg_key_debug(char *, char *, char *);
EXT int cfg_key_syslog(char *, char *, char *);
EXT int cfg_key_logfile(char *, char *, char *);
EXT int cfg_key_log_amqp_host(char *, char *, char *);
EXT int cfg_key_log_amqp_user(char *, char *, char *);
EXT int cfg_key_log_amqp_passwd(char *, char *, char *);
EXT int cfg_key_log_amqp_exchange(char *, char *, char *);
EXT int cfg_key_log_amqp_exchange_type(char *, char *, char *);
EXT int cfg_key_log_amqp_routing_key(char *, char *, char *);
EXT int cfg_key_log_amqp_persistent_msg(char *, char *, char *);
EXT int cfg_key_log_amqp_retry(char *, char *, char *);
EXT int cfg_key_pidfile(char *, char *, char *);
EXT int cfg_key_daemonize(char *, char *, char *);
EXT int cfg_key_proc_name(char *, char *, char *);
Expand Down
Loading

0 comments on commit 8b8da7b

Please sign in to comment.