From f03d4bdc81a325130f1a3dca5f0acefbc0e85ddf Mon Sep 17 00:00:00 2001 From: Paolo Lucente Date: Thu, 29 May 2014 22:56:04 +0000 Subject: [PATCH] * Introducing AMQP support for dumps of BGP tables (bgp_table_dump feature) so that data can now be transported via a RabbitMQ message exchange to (remote) consumers. This feature is in addition to the existing file method. Configurable AMQP parameters are: host, username, password, exchange, exchange type and routing key (ie. bgp_table_dump_amqp_host, bgp_table_dump_amqp_user, etc.) --- CONFIG-KEYS | 20 +++++----- src/bgp/bgp.c | 23 +++++++---- src/bgp/bgp_logdump.c | 90 +++++++++++++++++++++++++++++++++++-------- src/pmacct-build.h | 2 +- src/util.c | 30 +-------------- src/util.h | 1 - 6 files changed, 104 insertions(+), 62 deletions(-) diff --git a/CONFIG-KEYS b/CONFIG-KEYS index b941a17ee..2a2e07a02 100644 --- a/CONFIG-KEYS +++ b/CONFIG-KEYS @@ -292,16 +292,16 @@ KEY: logfile DESC: enables logging to a file (bypassing syslog); expected value is a pathname (default: none, console logging) -KEY: [ amqp_host | bgp_daemon_msglog_amqp_host ] +KEY: [ amqp_host | bgp_daemon_msglog_amqp_host | bgp_table_dump_amqp_host ] DESC: defines the AMQP/RabbitMQ server IP (default: localhost). -KEY: [ amqp_user | bgp_daemon_msglog_amqp_user ] +KEY: [ amqp_user | bgp_daemon_msglog_amqp_user | bgp_table_dump_amqp_user ] DESC: defines the username to use when connecting to the AMQP/RabbitMQ server (default: guest). -KEY: [ amqp_passwd | bgp_daemon_msglog_amqp_passwd ] +KEY: [ amqp_passwd | bgp_daemon_msglog_amqp_passwd | bgp_table_dump_amqp_passwd ] DESC: defines the password to use when connecting to the server (default: guest). -KEY: [ amqp_routing_key | bgp_daemon_msglog_amqp_routing_key ] +KEY: [ amqp_routing_key | bgp_daemon_msglog_amqp_routing_key | bgp_table_dump_amqp_routing_key ] DESC: Name of the AMQP routing key to attach to published data. Dynamic names are supported through the use of variables, which are computed at the moment when data is purged to the backend. The list of variables supported by amqp_routing_key: @@ -314,20 +314,22 @@ DESC: Name of the AMQP routing key to attach to published data. Dynamic names a $post_tag2 Configured value of post_tag2. - The list of variables supported by bgp_daemon_msglog_amqp_routing_key: + Variables supported by bgp_daemon_msglog_amqp_routing_key, bgp_table_dump_amqp_routing_key: $peer_src_ip Value of the peer_src_ip primitive of the record being processed. - (default: amqp_routing_key: acct; bgp_daemon_msglog_amqp_routing_key no default value). + (default: amqp_routing_key: acct) -KEY: [ amqp_exchange | bgp_daemon_msglog_amqp_exchange ] +KEY: [ amqp_exchange | bgp_daemon_msglog_amqp_exchange | bgp_table_dump_amqp_exchange ] DESC: Name of the AMQP exchange to publish data (default: pmacct). -KEY: [ amqp_exchange_type | bgp_daemon_msglog_amqp_exchange_type ] +KEY: [ amqp_exchange_type | bgp_daemon_msglog_amqp_exchange_type | + bgp_table_dump_amqp_exchange_type ] DESC: Type of the AMQP exchange to publish data. Currently only 'direct' and 'fanout' types are supported (default: direct). -KEY: [ amqp_persistent_msg | bgp_daemon_msglog_amqp_persistent_msg ] +KEY: [ amqp_persistent_msg | bgp_daemon_msglog_amqp_persistent_msg | + bgp_table_dump_amqp_persistent_msg ] VALUES: [ true | false ] DESC: Marks messages as persistent so that a queue content does not get lost if RabbitMQ restarts. Note from RabbitMQ docs: "Marking messages as persistent doesn't fully guarantee that a diff --git a/src/bgp/bgp.c b/src/bgp/bgp.c index d9c036711..2d7453b99 100644 --- a/src/bgp/bgp.c +++ b/src/bgp/bgp.c @@ -164,6 +164,11 @@ void skinny_bgp_daemon() } } + if (config.bgp_table_dump_file && config.bgp_table_dump_amqp_routing_key) { + Log(LOG_ERR, "ERROR ( %s/core/BGP ): bgp_table_dump_file and bgp_table_dump_amqp_routing_key are mutually exclusive. Terminating thread.\n", config.name); + exit_all(1); + } + if (!config.bgp_table_peer_buckets) config.bgp_table_peer_buckets = DEFAULT_BGP_INFO_HASH; if (!config.bgp_table_per_peer_buckets) config.bgp_table_per_peer_buckets = DEFAULT_BGP_INFO_PER_PEER_HASH; @@ -281,7 +286,8 @@ void skinny_bgp_daemon() Log(LOG_WARNING, "WARN ( %s/core/BGP ): bgp_daemon_msglog_output set to json but will produce no output (missing --enable-jansson).\n", config.name); #endif - if (!config.bgp_table_dump_output && config.bgp_table_dump_file) + if (!config.bgp_table_dump_output && (config.bgp_table_dump_file || + config.bgp_table_dump_amqp_routing_key)) #ifdef WITH_JANSSON config.bgp_table_dump_output = PRINT_OUTPUT_JSON; #else @@ -304,7 +310,7 @@ void skinny_bgp_daemon() } else { config.bgp_table_dump_file = NULL; - Log(LOG_WARNING, "WARN ( %s/core/BGP ): 'bgp_table_dump_file' ignored due to invalid 'bgp_table_dump_refresh_time'.\n", config.name); + Log(LOG_WARNING, "WARN ( %s/core/BGP ): Invalid 'bgp_table_dump_refresh_time'.\n", config.name); } bgp_table_dump_init_amqp_host(); @@ -319,7 +325,7 @@ void skinny_bgp_daemon() select_fd++; memcpy(&read_descs, &bkp_read_descs, sizeof(bkp_read_descs)); - if (config.bgp_table_dump_file) { + if (config.bgp_table_dump_file || config.bgp_table_dump_amqp_routing_key) { int delta; calc_refresh_timeout_sec(dump_refresh_deadline, log_tstamp.tv_sec, &delta); @@ -353,12 +359,12 @@ void skinny_bgp_daemon() } } - if (config.nfacctd_bgp_msglog_file || config.bgp_table_dump_file || - config.nfacctd_bgp_msglog_amqp_routing_key) { + if (config.nfacctd_bgp_msglog_file || config.nfacctd_bgp_msglog_amqp_routing_key || + config.bgp_table_dump_file || config.bgp_table_dump_amqp_routing_key) { gettimeofday(&log_tstamp, NULL); compose_timestamp(log_tstamp_str, SRVBUFLEN, &log_tstamp, TRUE); - if (config.bgp_table_dump_file) { + if (config.bgp_table_dump_file || config.bgp_table_dump_amqp_routing_key) { while (log_tstamp.tv_sec > dump_refresh_deadline) { bgp_handle_dump_event(); dump_refresh_deadline += config.bgp_table_dump_refresh_time; @@ -366,7 +372,7 @@ void skinny_bgp_daemon() } #ifdef WITH_RABBITMQ - if (config.nfacctd_bgp_msglog_amqp_routing_key) { + if (config.nfacctd_bgp_msglog_amqp_routing_key) { time_t last_fail = p_amqp_get_last_fail(&bgp_daemon_msglog_amqp_host); if (last_fail && (last_fail + config.nfacctd_bgp_msglog_amqp_retry < log_tstamp.tv_sec)) { @@ -1998,7 +2004,8 @@ void bgp_peer_close(struct bgp_peer *peer) bgp_info_delete()/bgp_route_next() in bgp_peer_info_delete() that require some more testing in the field. */ - if (config.bgp_table_dump_file) bgp_peer_info_delete(peer); + if (config.bgp_table_dump_file || config.bgp_table_dump_amqp_routing_key) + bgp_peer_info_delete(peer); if (config.nfacctd_bgp_msglog_file || config.nfacctd_bgp_msglog_amqp_routing_key) bgp_peer_log_close(peer, config.nfacctd_bgp_msglog_output); diff --git a/src/bgp/bgp_logdump.c b/src/bgp/bgp_logdump.c index 3a036d1df..2ea491fcb 100644 --- a/src/bgp/bgp_logdump.c +++ b/src/bgp/bgp_logdump.c @@ -40,7 +40,8 @@ void bgp_peer_log_msg(struct bgp_node *route, struct bgp_info *ri, safi_t safi, struct bgp_attr *attr = ri->attr; #ifdef WITH_RABBITMQ - if (config.nfacctd_bgp_msglog_amqp_routing_key) + if (config.nfacctd_bgp_msglog_amqp_routing_key || + config.bgp_table_dump_amqp_routing_key) p_amqp_set_routing_key(peer->log->amqp_host, peer->log->filename); #endif @@ -137,7 +138,8 @@ void bgp_peer_log_msg(struct bgp_node *route, struct bgp_info *ri, safi_t safi, write_and_free_json(peer->log->fd, obj); #ifdef WITH_RABBITMQ - if (config.nfacctd_bgp_msglog_amqp_routing_key) { + if (config.nfacctd_bgp_msglog_amqp_routing_key || + config.bgp_table_dump_amqp_routing_key) { write_and_free_json_amqp(peer->log->amqp_host, obj); p_amqp_unset_routing_key(peer->log->amqp_host); } @@ -341,7 +343,12 @@ void bgp_peer_dump_init(struct bgp_peer *peer, int output) { char event_type[] = "dump_init"; - if (!peer || !peer->log || !peer->log->fd) return; + if (!peer || !peer->log) return; + +#ifdef WITH_RABBITMQ + if (config.bgp_table_dump_amqp_routing_key) + p_amqp_set_routing_key(peer->log->amqp_host, peer->log->filename); +#endif if (output == PRINT_OUTPUT_JSON) { #ifdef WITH_JANSSON @@ -361,7 +368,15 @@ void bgp_peer_dump_init(struct bgp_peer *peer, int output) json_object_update_missing(obj, kv); json_decref(kv); - write_and_free_json(peer->log->fd, obj); + if (config.bgp_table_dump_file) + write_and_free_json(peer->log->fd, obj); + +#ifdef WITH_RABBITMQ + if (config.bgp_table_dump_amqp_routing_key) { + write_and_free_json_amqp(peer->log->amqp_host, obj); + p_amqp_unset_routing_key(peer->log->amqp_host); + } +#endif #endif } } @@ -370,7 +385,12 @@ void bgp_peer_dump_close(struct bgp_peer *peer, int output) { char event_type[] = "dump_close"; - if (!peer || !peer->log || !peer->log->fd) return; + if (!peer || !peer->log) return; + +#ifdef WITH_RABBITMQ + if (config.bgp_table_dump_amqp_routing_key) + p_amqp_set_routing_key(peer->log->amqp_host, peer->log->filename); +#endif if (output == PRINT_OUTPUT_JSON) { #ifdef WITH_JANSSON @@ -390,7 +410,15 @@ void bgp_peer_dump_close(struct bgp_peer *peer, int output) json_object_update_missing(obj, kv); json_decref(kv); - write_and_free_json(peer->log->fd, obj); + if (config.bgp_table_dump_file) + write_and_free_json(peer->log->fd, obj); + +#ifdef WITH_RABBITMQ + if (config.bgp_table_dump_amqp_routing_key) { + write_and_free_json_amqp(peer->log->amqp_host, obj); + p_amqp_unset_routing_key(peer->log->amqp_host); + } +#endif #endif } } @@ -410,7 +438,9 @@ void bgp_handle_dump_event() time_t start; /* pre-flight check */ - if (!config.bgp_table_dump_file || !config.bgp_table_dump_refresh_time) return; + if ((!config.bgp_table_dump_file && !config.bgp_table_dump_amqp_routing_key) || + !config.bgp_table_dump_refresh_time) + return; switch (ret = fork()) { case 0: /* Child */ @@ -422,6 +452,14 @@ void bgp_handle_dump_event() memset(current_filename, 0, sizeof(current_filename)); memset(&peer_log, 0, sizeof(struct bgp_peer_log)); +#ifdef WITH_RABBITMQ + if (config.bgp_table_dump_amqp_routing_key) { + bgp_table_dump_init_amqp_host(); + ret = p_amqp_connect(&bgp_table_dump_amqp_host); + if (ret) exit(ret); + } +#endif + dumper_pid = getpid(); Log(LOG_INFO, "INFO ( %s/core/BGP ): *** Dumping BGP tables - START (PID: %u) ***\n", config.name, dumper_pid); start = time(NULL); @@ -432,18 +470,37 @@ void bgp_handle_dump_event() peer = &peers[peers_idx]; peer->log = &peer_log; /* abusing struct bgp_peer a bit, but we are in a child */ - bgp_peer_log_dynname(current_filename, SRVBUFLEN, config.bgp_table_dump_file, peer); + if (config.bgp_table_dump_file) + bgp_peer_log_dynname(current_filename, SRVBUFLEN, config.bgp_table_dump_file, peer); + + if (config.bgp_table_dump_amqp_routing_key) + bgp_peer_log_dynname(current_filename, SRVBUFLEN, config.bgp_table_dump_amqp_routing_key, peer); + strftime_same(current_filename, SRVBUFLEN, tmpbuf, &log_tstamp.tv_sec); /* we close last_filename and open current_filename in case they differ; we are safe with this approach until $peer_src_ip is the only variable - supported as part of bgp_table_dump_file configuration directive. + supported as part of bgp_table_dump_file configuration directive. */ - if (strcmp(last_filename, current_filename)) { - if (saved_peer && saved_peer->log && strlen(last_filename)) fclose(saved_peer->log->fd); - peer->log->fd = open_logfile(current_filename, "w"); + if (config.bgp_table_dump_file) { + if (strcmp(last_filename, current_filename)) { + if (saved_peer && saved_peer->log && strlen(last_filename)) fclose(saved_peer->log->fd); + peer->log->fd = open_logfile(current_filename, "w"); + } + } + +#ifdef WITH_RABBITMQ + /* + a bit pedantic maybe but should come at little cost and emulating + bgp_table_dump_file behaviour will work + */ + if (config.bgp_table_dump_amqp_routing_key) { + peer->log->amqp_host = &bgp_table_dump_amqp_host; + strcpy(peer->log->filename, current_filename); } +#endif + bgp_peer_dump_init(peer, config.bgp_table_dump_output); for (afi = AFI_IP; afi < AFI_MAX; afi++) { @@ -476,6 +533,11 @@ void bgp_handle_dump_event() } } +#ifdef WITH_RABBITMQ + if (config.bgp_table_dump_amqp_routing_key) + p_amqp_close(&bgp_table_dump_amqp_host, FALSE); +#endif + duration = time(NULL)-start; Log(LOG_INFO, "INFO ( %s/core/BGP ): *** Dumping BGP tables - END (PID: %u, TABLES: %u ET: %u) ***\n", config.name, dumper_pid, tables_num, duration); @@ -483,7 +545,7 @@ void bgp_handle_dump_event() exit(0); default: /* Parent */ if (ret == -1) { /* Something went wrong */ - Log(LOG_WARNING, "WARN ( %s/core/BGP ): Unable to fork DB writer: %s\n", config.name, strerror(errno)); + Log(LOG_WARNING, "WARN ( %s/core/BGP ): Unable to fork BGP table dump writer: %s\n", config.name, strerror(errno)); } break; @@ -504,7 +566,6 @@ void bgp_daemon_msglog_init_amqp_host() p_amqp_set_user(&bgp_daemon_msglog_amqp_host, config.nfacctd_bgp_msglog_amqp_user); p_amqp_set_passwd(&bgp_daemon_msglog_amqp_host, config.nfacctd_bgp_msglog_amqp_passwd); p_amqp_set_exchange(&bgp_daemon_msglog_amqp_host, config.nfacctd_bgp_msglog_amqp_exchange); -// p_amqp_set_routing_key(&bgp_daemon_msglog_amqp_host, config.nfacctd_bgp_msglog_amqp_routing_key); p_amqp_set_exchange_type(&bgp_daemon_msglog_amqp_host, config.nfacctd_bgp_msglog_amqp_exchange_type); p_amqp_set_host(&bgp_daemon_msglog_amqp_host, config.nfacctd_bgp_msglog_amqp_host); p_amqp_set_persistent_msg(&bgp_daemon_msglog_amqp_host, config.nfacctd_bgp_msglog_amqp_persistent_msg); @@ -529,7 +590,6 @@ void bgp_table_dump_init_amqp_host() p_amqp_set_user(&bgp_table_dump_amqp_host, config.bgp_table_dump_amqp_user); p_amqp_set_passwd(&bgp_table_dump_amqp_host, config.bgp_table_dump_amqp_passwd); p_amqp_set_exchange(&bgp_table_dump_amqp_host, config.bgp_table_dump_amqp_exchange); - p_amqp_set_routing_key(&bgp_table_dump_amqp_host, config.bgp_table_dump_amqp_routing_key); p_amqp_set_exchange_type(&bgp_table_dump_amqp_host, config.bgp_table_dump_amqp_exchange_type); p_amqp_set_host(&bgp_table_dump_amqp_host, config.bgp_table_dump_amqp_host); p_amqp_set_persistent_msg(&bgp_table_dump_amqp_host, config.bgp_table_dump_amqp_persistent_msg); diff --git a/src/pmacct-build.h b/src/pmacct-build.h index db18b19fd..482485402 100644 --- a/src/pmacct-build.h +++ b/src/pmacct-build.h @@ -1 +1 @@ -#define PMACCT_BUILD "20140528-00" +#define PMACCT_BUILD "20140529-00" diff --git a/src/util.c b/src/util.c index 738f3de9b..b8b859a96 100644 --- a/src/util.c +++ b/src/util.c @@ -2072,6 +2072,8 @@ void write_and_free_json(FILE *f, void *obj) char *tmpbuf = NULL; json_t *json_obj = (json_t *) obj; + if (!f) return; + tmpbuf = json_dumps(json_obj, 0); json_decref(json_obj); @@ -2121,34 +2123,6 @@ void write_and_free_json_amqp(void *amqp_log, void *obj) } #endif -#ifdef WITH_JANSSON -char *compose_log_json(char *msg) -{ - char *tmpbuf = NULL, empty_string[] = ""; - json_t *obj = json_object(), *kv; - - if (msg) kv = json_pack("{ss}", "log", msg); - else kv = json_pack("{ss}", "log", empty_string); - - json_object_update_missing(obj, kv); - json_decref(kv); - - tmpbuf = json_dumps(obj, 0); - json_decref(obj); - - return tmpbuf; -} -#else -char *compose_log_json(char *msg) -{ - config.log_amqp_routing_key = NULL; /* force to screen if no other logging method is selected */ - - if (config.debug) Log(LOG_DEBUG, "DEBUG ( %s/%s ): compose_log_json(): JSON object not created due to missing --enable-jansson\n", config.name, config.type); - - return NULL; -} -#endif - void compose_timestamp(char *buf, int buflen, struct timeval *tv, int usec) { char tmpbuf[SRVBUFLEN]; diff --git a/src/util.h b/src/util.h index 9cbf0d5a5..66fe56abd 100644 --- a/src/util.h +++ b/src/util.h @@ -104,7 +104,6 @@ EXT char *compose_json(u_int64_t, u_int64_t, u_int8_t, struct pkt_primitives *, EXT void compose_timestamp(char *, int, struct timeval *, int); EXT void write_and_free_json(FILE *, void *); EXT void write_and_free_json_amqp(void *, void *); -EXT char *compose_log_json(char *); EXT struct packet_ptrs *copy_packet_ptrs(struct packet_ptrs *); EXT void free_packet_ptrs(struct packet_ptrs *);