diff --git a/.gitignore b/.gitignore index 92382280..466563c0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ *# *~ *.o +*.log logjam-device logjam-proxy tester @@ -25,5 +26,7 @@ install-sh missing logjam-httpd logjam-tools-cov.tgz +logjam-graylog-forwarder checker -.DS_Store \ No newline at end of file +.DS_Store +compile diff --git a/Makefile.am b/Makefile.am index 6656f6b4..6e3308c0 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1,5 +1,5 @@ AUTOMAKE_OPTIONS = subdir-objects -ACLOCAL_AMFLAGS = ${ACLOCAL_FLAGS} +ACLOCAL_AMFLAGS = ${ACLOCAL_FLAGS} -I m4 AM_CPPFLAGS = $(OPTDIR_CPPFLAGS) $(DEPS_CFLAGS) -D_GNU_SOURCE AM_CFLAGS = --std=gnu99 -Wall -fno-fast-math @@ -9,7 +9,8 @@ LDADD = $(OPTDIR_LDFLAGS) $(DEPS_LIBS) -lm bin_PROGRAMS = \ logjam-device \ logjam-importer \ - logjam-httpd + logjam-httpd \ + logjam-graylog-forwarder noinst_PROGRAMS = \ test_publisher \ @@ -69,6 +70,27 @@ logjam_httpd_SOURCES = \ src/logjam-util.c \ src/logjam-util.h +logjam_graylog_forwarder_SOURCES = \ + src/logjam-graylog-forwarder.c \ + src/graylog-forwarder-common.c \ + src/graylog-forwarder-common.h \ + src/graylog-forwarder-controller.c \ + src/graylog-forwarder-controller.h \ + src/graylog-forwarder-parser.c \ + src/graylog-forwarder-parser.h \ + src/graylog-forwarder-subscriber.c \ + src/graylog-forwarder-subscriber.h \ + src/graylog-forwarder-writer.c \ + src/graylog-forwarder-writer.h \ + src/logjam-util.c \ + src/logjam-util.h \ + src/gelf-message.c \ + src/gelf-message.h \ + src/logjam-message.c \ + src/logjam-message.h \ + src/str-builder.c \ + src/str-builder.h + tester_SOURCES = src/tester.c test_publisher_SOURCES = src/test_publisher.c diff --git a/README.md b/README.md index ad44304c..5010fbc7 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ infrastructure for logjam (see https://github.com/skaes/logjam_app). src="https://scan.coverity.com/projects/3357/badge.svg"/> -Currently three daemons are provided: +Currently the following daemons are provided: ## logjam-device @@ -31,6 +31,12 @@ hasn't crashed once. A daemon which takes frontend performance data via HTTP GET requests and publishes it on on ZeroMQ PUB socket for the importer to pick up. +## logjam-graylog-forwarder + +A daemon which subscribes to PUB sockets of logjam-devices and +forwards GELF messages to a graylog GELF socket endpoint. + + ## Speed On my iMac, one logjam-device can forward 20K messages per second (4K diff --git a/configure.ac b/configure.ac index dd85f3c4..73fb2c7c 100644 --- a/configure.ac +++ b/configure.ac @@ -3,6 +3,8 @@ AC_INIT([logjam-tools], [0.1], [skaes@railsexpress.de], AC_PREREQ([2.59]) AM_INIT_AUTOMAKE([1.10 -Wall no-define foreign]) +AC_CONFIG_MACRO_DIR([m4]) + # Check for pkg-config program, used for configuring some libraries. # @@ -57,6 +59,8 @@ AS_IF([test "x$with_opt_dir" == "x"], AS_IF([test `uname -s` == "Linux"], AC_SUBST([OPT_PTHREAD_FLAGS], ["-pthread"]), AC_SUBST([OPT_PTHREAD_FLAGS])) -AC_CHECK_FUNCS(htonll ntohll) +AC_CHECK_DECLS([htonll, ntohll]) + +AX_CHECK_ZLIB AC_OUTPUT diff --git a/doc/logjam-gelf-mapping.md b/doc/logjam-gelf-mapping.md new file mode 100644 index 00000000..eb163102 --- /dev/null +++ b/doc/logjam-gelf-mapping.md @@ -0,0 +1,71 @@ +Logjam Request GELF Mapping +=========================== + +Logjam Request Message: + + +GELF Specification: + + +Ruby Logger to Syslog Level Mapping: + + +Mapping +------- + +``` +GELF field Logjam field +---------- ------------ + +Standard GELF fields: + +version 1.1 (fixed value) +host host +short_message action +full_message all entries from "lines" array as "severity-as-word timestamp logged-info", concatenated by "\n" +timestamp started_at as seconds since UNIX epoch, with optional decimal places for milliseconds +level highest log level used in "lines" array, mapped to syslog level as described in "Ruby Logger to Syslog Level Mapping" link above + +Additional fields: + +_app app-env -- first frame in logjam-device ZeroMQ message +_total_time total_time +_code code +_caller_id caller_id +_caller_action caller_action +_request_id request_id +_user_id user_id +_ip ip +_process_id process_id +_http_method request_info["method"] +_http_url request_info["url"] +_http_header_* all entries from request_info["headers"] as separate fields (normalize header name/key using lowercase letters and underscores) +``` + +Example +------- + +```json +{ + "version": "1.1", + "host": "jobs-3.api.fra1.xing.com", + "short_message": "Rest::Jobs::PostingsController#show", + "full_message": "INFO 2013-05-24T09:22:33.789958 Started GET \"/rest/jobs/postings/2196851?fields=created_at%2Cfunction%2Clinks&rid=internal\" for 10.4.9.24 at 2013-05-24 09:22:33 +0200\nINFO 2013-05-24T09:22:33.792732 Processing by Rest::Jobs::PostingsController#show as JSON\nINFO 2013-05-24T09:22:33.799331 Completed 404 Not Found in 10.8ms (Views: 0.313ms | ActiveRecord: 1.057ms(1q,0h) | API: 0.000(0) | Dalli: 0.000ms(0r,0m,0w,0c) | GC: 0.403(0) | HP: 0(2498680,2558,114017,591464) | REST: 0.000(0))", + "timestamp": 1369380153, + "level": 6, + "_app": "jobs-production", + "_total_time": 10.83065, + "_code": 404, + "_caller_id": "riaktivities-production-b02e866ec44211e293a70050569ed893", + "_caller_action": "Rest::Activities::Legacy::NewsFeedController#show", + "_request_id": "b3cb8de4c44211e2b166000c29e07312", + "_user_id": 0, + "_ip": "10.4.9.24", + "_process_id": 7057, + "_http_method": "GET", + "_http_url": "/rest/jobs/postings/2196851?fields=created_at%2Cfunction%2Clinks&rid=internal", + "_http_header_user_agent": "XING AG - RestCake/0.10.6", + "_http_header_accept_encoding": "deflate, gzip", + "_http_header_accept": "application/json", +} +``` diff --git a/m4/ax_check_zlib.m4 b/m4/ax_check_zlib.m4 new file mode 100644 index 00000000..ae5705f6 --- /dev/null +++ b/m4/ax_check_zlib.m4 @@ -0,0 +1,142 @@ +# =========================================================================== +# http://www.gnu.org/software/autoconf-archive/ax_check_zlib.html +# =========================================================================== +# +# SYNOPSIS +# +# AX_CHECK_ZLIB([action-if-found], [action-if-not-found]) +# +# DESCRIPTION +# +# This macro searches for an installed zlib library. If nothing was +# specified when calling configure, it searches first in /usr/local and +# then in /usr, /opt/local and /sw. If the --with-zlib=DIR is specified, +# it will try to find it in DIR/include/zlib.h and DIR/lib/libz.a. If +# --without-zlib is specified, the library is not searched at all. +# +# If either the header file (zlib.h) or the library (libz) is not found, +# shell commands 'action-if-not-found' is run. If 'action-if-not-found' is +# not specified, the configuration exits on error, asking for a valid zlib +# installation directory or --without-zlib. +# +# If both header file and library are found, shell commands +# 'action-if-found' is run. If 'action-if-found' is not specified, the +# default action appends '-I${ZLIB_HOME}/include' to CPFLAGS, appends +# '-L$ZLIB_HOME}/lib' to LDFLAGS, prepends '-lz' to LIBS, and calls +# AC_DEFINE(HAVE_LIBZ). You should use autoheader to include a definition +# for this symbol in a config.h file. Sample usage in a C/C++ source is as +# follows: +# +# #ifdef HAVE_LIBZ +# #include +# #endif /* HAVE_LIBZ */ +# +# LICENSE +# +# Copyright (c) 2008 Loic Dachary +# Copyright (c) 2010 Bastien Chevreux +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by the +# Free Software Foundation; either version 2 of the License, or (at your +# option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General +# Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program. If not, see . +# +# As a special exception, the respective Autoconf Macro's copyright owner +# gives unlimited permission to copy, distribute and modify the configure +# scripts that are the output of Autoconf when processing the Macro. You +# need not follow the terms of the GNU General Public License when using +# or distributing such scripts, even though portions of the text of the +# Macro appear in them. The GNU General Public License (GPL) does govern +# all other use of the material that constitutes the Autoconf Macro. +# +# This special exception to the GPL applies to versions of the Autoconf +# Macro released by the Autoconf Archive. When you make and distribute a +# modified version of the Autoconf Macro, you may extend this special +# exception to the GPL to apply to your modified version as well. + +#serial 14 + +AU_ALIAS([CHECK_ZLIB], [AX_CHECK_ZLIB]) +AC_DEFUN([AX_CHECK_ZLIB], +# +# Handle user hints +# +[AC_MSG_CHECKING(if zlib is wanted) +zlib_places="/usr/local /usr /opt/local /sw" +AC_ARG_WITH([zlib], +[ --with-zlib=DIR root directory path of zlib installation @<:@defaults to + /usr/local or /usr if not found in /usr/local@:>@ + --without-zlib to disable zlib usage completely], +[if test "$withval" != no ; then + AC_MSG_RESULT(yes) + if test -d "$withval" + then + zlib_places="$withval $zlib_places" + else + AC_MSG_WARN([Sorry, $withval does not exist, checking usual places]) + fi +else + zlib_places= + AC_MSG_RESULT(no) +fi], +[AC_MSG_RESULT(yes)]) + +# +# Locate zlib, if wanted +# +if test -n "${zlib_places}" +then + # check the user supplied or any other more or less 'standard' place: + # Most UNIX systems : /usr/local and /usr + # MacPorts / Fink on OSX : /opt/local respectively /sw + for ZLIB_HOME in ${zlib_places} ; do + if test -f "${ZLIB_HOME}/include/zlib.h"; then break; fi + ZLIB_HOME="" + done + + ZLIB_OLD_LDFLAGS=$LDFLAGS + ZLIB_OLD_CPPFLAGS=$CPPFLAGS + if test -n "${ZLIB_HOME}"; then + LDFLAGS="$LDFLAGS -L${ZLIB_HOME}/lib" + CPPFLAGS="$CPPFLAGS -I${ZLIB_HOME}/include" + fi + AC_LANG_SAVE + AC_LANG_C + AC_CHECK_LIB([z], [inflateEnd], [zlib_cv_libz=yes], [zlib_cv_libz=no]) + AC_CHECK_HEADER([zlib.h], [zlib_cv_zlib_h=yes], [zlib_cv_zlib_h=no]) + AC_LANG_RESTORE + if test "$zlib_cv_libz" = "yes" && test "$zlib_cv_zlib_h" = "yes" + then + # + # If both library and header were found, action-if-found + # + m4_ifblank([$1],[ + CPPFLAGS="$CPPFLAGS -I${ZLIB_HOME}/include" + LDFLAGS="$LDFLAGS -L${ZLIB_HOME}/lib" + LIBS="-lz $LIBS" + AC_DEFINE([HAVE_LIBZ], [1], + [Define to 1 if you have `z' library (-lz)]) + ],[ + # Restore variables + LDFLAGS="$ZLIB_OLD_LDFLAGS" + CPPFLAGS="$ZLIB_OLD_CPPFLAGS" + $1 + ]) + else + # + # If either header or library was not found, action-if-not-found + # + m4_default([$2],[ + AC_MSG_ERROR([either specify a valid zlib installation with --with-zlib=DIR or disable zlib usage with --without-zlib]) + ]) + fi +fi +]) diff --git a/src/gelf-message.c b/src/gelf-message.c new file mode 100644 index 00000000..70d952ce --- /dev/null +++ b/src/gelf-message.c @@ -0,0 +1,50 @@ +#include "gelf-message.h" + +gelf_message* gelf_message_new(const char *host, const char *short_message) +{ + json_object *json = json_object_new_object(); + json_object_object_add(json, "version", json_object_new_string("1.1")); + json_object_object_add(json, "host", json_object_new_string(host)); + json_object_object_add(json, "short_message", json_object_new_string(short_message)); + + return (gelf_message *) json; +} + +void gelf_message_add_string(gelf_message *msg, const char *key, const char *value) +{ + json_object *json = (json_object *) msg; + json_object_object_add(json, key, json_object_new_string(value)); +} + +void gelf_message_add_double(gelf_message *msg, const char *key, double value) +{ + json_object *json = (json_object *) msg; + json_object_object_add(json, key, json_object_new_double(value)); +} + +void gelf_message_add_int(gelf_message *msg, const char *key, int value) +{ + json_object *json = (json_object *) msg; + json_object_object_add(json, key, json_object_new_int(value)); +} + +void gelf_message_add_json_object(gelf_message *msg, const char *key, json_object *obj) +{ + json_object *json = (json_object *) msg; + // obj is now part of two container objects, so: increment reference count + json_object_get(obj); + json_object_object_add(json, key, obj); +} + +const char* gelf_message_to_string(const gelf_message *msg) +{ + json_object *json = (json_object *) msg; + return json_object_to_json_string_ext(json, JSON_C_TO_STRING_PLAIN); +} + +void gelf_message_destroy(gelf_message **msg) +{ + json_object *json = (json_object *) *msg; + json_object_put(json); + *msg = NULL; +} diff --git a/src/gelf-message.h b/src/gelf-message.h new file mode 100644 index 00000000..1830ce16 --- /dev/null +++ b/src/gelf-message.h @@ -0,0 +1,26 @@ +#ifndef __GELF_MESSAGE_H_INCLUDED__ +#define __GELF_MESSAGE_H_INCLUDED__ + +#include + +#define gelf_message_add_full_message(m,v) gelf_message_add_string(m, "full_message", v) +#define gelf_message_add_timestamp(m,v) gelf_message_add_double(m, "timestamp", v) +#define gelf_message_add_level(m,v) gelf_message_add_int(m, "level", v) + +typedef json_object gelf_message; + +gelf_message* gelf_message_new(const char *host, const char *short_message); + +void gelf_message_add_string(gelf_message *msg, const char *key, const char *value); + +void gelf_message_add_double(gelf_message *msg, const char *key, double value); + +void gelf_message_add_int(gelf_message *msg, const char *key, int value); + +void gelf_message_add_json_object(gelf_message *msg, const char *key, json_object *obj); + +const char* gelf_message_to_string(const gelf_message *msg); + +void gelf_message_destroy(gelf_message **msg); + +#endif diff --git a/src/graylog-forwarder-common.c b/src/graylog-forwarder-common.c new file mode 100644 index 00000000..cba4cd20 --- /dev/null +++ b/src/graylog-forwarder-common.c @@ -0,0 +1,25 @@ +#include "graylog-forwarder-common.h" + +bool dryrun = false; + +compressed_gelf_t * +compressed_gelf_new(Bytef *data, uLongf len) +{ + compressed_gelf_t *self = zmalloc(sizeof(*self)); + if (self) { + self->data = data; + self->len = len; + } + return self; +} + +void compressed_gelf_destroy(compressed_gelf_t **self_p) +{ + assert (self_p); + if (*self_p) { + compressed_gelf_t *self = *self_p; + free (self->data); + free (self); + *self_p = NULL; + } +} diff --git a/src/graylog-forwarder-common.h b/src/graylog-forwarder-common.h new file mode 100644 index 00000000..8ad1a1d2 --- /dev/null +++ b/src/graylog-forwarder-common.h @@ -0,0 +1,22 @@ +#ifndef __GRAYLOG_FORWARDER_COMMON_H_INCLUDED__ +#define __GRAYLOG_FORWARDER_COMMON_H_INCLUDED__ + +#include + +#include "logjam-util.h" + +#ifdef __cplusplus +extern "C" { +#endif + +extern bool dryrun; + +typedef struct { + Bytef *data; + uLongf len; +} compressed_gelf_t; + +extern compressed_gelf_t* compressed_gelf_new(Bytef *data, uLongf len); +extern void compressed_gelf_destroy(compressed_gelf_t **self_p); + +#endif diff --git a/src/graylog-forwarder-controller.c b/src/graylog-forwarder-controller.c new file mode 100644 index 00000000..30d506f0 --- /dev/null +++ b/src/graylog-forwarder-controller.c @@ -0,0 +1,105 @@ +#include "graylog-forwarder-controller.h" +#include "graylog-forwarder-subscriber.h" +#include "graylog-forwarder-parser.h" +#include "graylog-forwarder-writer.h" + +/* + * --- PIPE --- subscriber + * controller: --- PIPE --- parsers(NUM_PARSERS) + * --- PIPE --- writer +*/ + +// The controller creates all other threads/actors. + +#define NUM_PARSERS 8 + +typedef struct { + zconfig_t *config; + zactor_t *subscriber; + zactor_t *parsers[NUM_PARSERS]; + zactor_t *writer; +} controller_state_t; + + +static +bool controller_create_actors(controller_state_t *state) +{ + // create subscriber + state->subscriber = zactor_new(graylog_forwarder_subscriber, state->config); + + // create the parsers + for (size_t i=0; iparsers[i] = graylog_forwarder_parser_new(state->config, i); + } + + // create writer + state->writer = zactor_new(graylog_forwarder_writer, state->config); + + return !zsys_interrupted; +} + +static +void controller_destroy_actors(controller_state_t *state) +{ + zactor_destroy(&state->subscriber); + zactor_destroy(&state->writer); + for (size_t i=0; iparsers[i]); + } +} + +static +int send_tick_commands(zloop_t *loop, int timer_id, void *arg) +{ + controller_state_t *state = arg; + + // send tick commands to actors to let them print out their stats + zstr_send(state->subscriber, "tick"); + zstr_send(state->writer, "tick"); + + int rc = zloop_timer(loop, 1000, 1, send_tick_commands, state); + assert(rc != -1); + + return 0; +} + +int graylog_forwarder_run_controller_loop(zconfig_t* config) +{ + set_thread_name("graylog-forwarder-controller"); + + zsys_init(); + + controller_state_t state = {.config = config}; + bool start_up_complete = controller_create_actors(&state); + + if (!start_up_complete) + goto exit; + + // set up event loop + zloop_t *loop = zloop_new(); + assert(loop); + zloop_set_verbose(loop, 0); + + // send tick commands every second + int rc = zloop_timer(loop, 1000, 1, send_tick_commands, &state); + assert(rc != -1); + + if (!zsys_interrupted) { + // run the loop + zloop_start(loop); + printf("[I] graylog-forwarder-controller: shutting down\n"); + } + + // shutdown + zloop_destroy(&loop); + assert(loop == NULL); + + exit: + printf("[I] graylog-forwarder-controller: destroying actor threads\n"); + controller_destroy_actors(&state); + printf("[I] graylog-forwarder-controller: calling zsys_shutdown\n"); + zsys_shutdown(); + + printf("[I] graylog-forwarder-controller: terminated\n"); + return 0; +} diff --git a/src/graylog-forwarder-controller.h b/src/graylog-forwarder-controller.h new file mode 100644 index 00000000..f4f8628f --- /dev/null +++ b/src/graylog-forwarder-controller.h @@ -0,0 +1,16 @@ +#ifndef __LOGJAM_GRAYLOG_FORWARDER_CONTROLLER_H__ +#define __LOGJAM_GRAYLOG_FORWARDER_CONTROLLER_H__ + +#include "graylog-forwarder-common.h" + +#ifdef __cplusplus +extern "C" { +#endif + +extern int graylog_forwarder_run_controller_loop(zconfig_t* config); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/graylog-forwarder-parser.c b/src/graylog-forwarder-parser.c new file mode 100644 index 00000000..d09d70b0 --- /dev/null +++ b/src/graylog-forwarder-parser.c @@ -0,0 +1,162 @@ +#include "graylog-forwarder-parser.h" +#include "gelf-message.h" +#include "logjam-message.h" + +typedef struct { + size_t id; + char me[16]; + zconfig_t *config; + zsock_t *pipe; // actor commands + zsock_t *pull_socket; // incoming messages from subscriber + zsock_t *push_socket; // outgoing messages to writer +} parser_state_t; + +static int process_logjam_message(parser_state_t *state) +{ + logjam_message *logjam_msg = logjam_message_read(state->pull_socket); + + if (logjam_msg && !zsys_interrupted) { + // printf("[I] graylog-forwarder-parser [%zu]: process_logjam_message\n", state->id); + + gelf_message *gelf_msg = logjam_message_to_gelf (logjam_msg); + const char *gelf_data = gelf_message_to_string (gelf_msg); + + // printf("[D] GELF Format: %s\n", gelf_data); + + // compress GELF data + const Bytef *raw_data = (Bytef *)gelf_data; + uLong raw_len = strlen(gelf_data); + uLongf compressed_len = compressBound(raw_len); + Bytef *compressed_data = zmalloc(compressed_len); + int rc = compress(compressed_data, &compressed_len, raw_data, raw_len); + assert(rc == Z_OK); + + // printf("[D] GELF bytes uncompressed/compressed: %ld/%ld\n", raw_len, compressed_len); + + compressed_gelf_t *compressed_gelf = compressed_gelf_new(compressed_data, compressed_len); + + zmsg_t *msg = zmsg_new(); + assert(msg); + zmsg_addptr(msg, compressed_gelf); + zmsg_send(&msg, state->push_socket); + + gelf_message_destroy(&gelf_msg); + logjam_message_destroy (&logjam_msg); + + // we don't free gelf_data because it's owned by the json library + } + + return 0; +} + +static +zsock_t* parser_pull_socket_new() +{ + int rc; + zsock_t *socket = zsock_new(ZMQ_PULL); + assert(socket); + // connect socket, taking thread startup time into account + // TODO: this is a hack. better let controller coordinate this + for (int i=0; i<10; i++) { + rc = zsock_connect(socket, "inproc://graylog-forwarder-subscriber"); + if (rc == 0) break; + zclock_sleep(100); + } + log_zmq_error(rc); + assert(rc == 0); + return socket; +} + +static +zsock_t* parser_push_socket_new() +{ + zsock_t *socket = zsock_new(ZMQ_PUSH); + assert(socket); + int rc; + // connect socket, taking thread startup time into account + // TODO: this is a hack. better let controller coordinate this + for (int i=0; i<10; i++) { + rc = zsock_connect(socket, "inproc://graylog-forwarder-writer"); + if (rc == 0) break; + zclock_sleep(100); + } + return socket; +} + +static +parser_state_t* parser_state_new(zconfig_t* config, size_t id) +{ + parser_state_t *state = zmalloc(sizeof(*state)); + state->config = config; + state->id = id; + snprintf(state->me, 16, "parser[%zu]", id); + state->pull_socket = parser_pull_socket_new(); + state->push_socket = parser_push_socket_new(); + return state; +} + +static +void parser_state_destroy(parser_state_t **state_p) +{ + parser_state_t *state = *state_p; + // must not destroy the pipe, as it's owned by the actor + zsock_destroy(&state->pull_socket); + zsock_destroy(&state->push_socket); + free(state); + *state_p = NULL; +} + +static +void parser(zsock_t *pipe, void *args) +{ + parser_state_t *state = (parser_state_t*)args; + state->pipe = pipe; + set_thread_name(state->me); + size_t id = state->id; + + // signal readyiness after sockets have been created + zsock_signal(pipe, 0); + + zpoller_t *poller = zpoller_new(state->pipe, state->pull_socket, NULL); + assert(poller); + + while (!zsys_interrupted) { + // -1 == block until something is readable + void *socket = zpoller_wait(poller, -1); + zmsg_t *msg = NULL; + if (socket == state->pipe) { + msg = zmsg_recv(state->pipe); + char *cmd = zmsg_popstr(msg); + zmsg_destroy(&msg); + if (streq(cmd, "$TERM")) { + fprintf(stderr, "[D] graylog-forwarder-parser [%zu]: received $TERM command\n", id); + free(cmd); + break; + } else { + fprintf(stderr, "[E] graylog-forwarder-parser [%zu]: received unknown command: %s\n", id, cmd); + free(cmd); + assert(false); + } + } else if (socket == state->pull_socket) { + process_logjam_message(state); + } else { + // socket == NULL, probably interrupted by signal handler + break; + } + } + + printf("[I] graylog-forwarder-parser [%zu]: shutting down\n", id); + parser_state_destroy(&state); + printf("[I] graylog-forwarder-parser [%zu]: terminated\n", id); +} + +zactor_t* graylog_forwarder_parser_new(zconfig_t *config, size_t id) +{ + parser_state_t *state = parser_state_new(config, id); + return zactor_new(parser, state); +} + +void graylog_forwarder_parser_destroy(zactor_t **parser_p) +{ + zactor_destroy(parser_p); +} diff --git a/src/graylog-forwarder-parser.h b/src/graylog-forwarder-parser.h new file mode 100644 index 00000000..3dd168f1 --- /dev/null +++ b/src/graylog-forwarder-parser.h @@ -0,0 +1,17 @@ +#ifndef __LOGJAM_GRAYLOG_FORWARDER_PARSER_H_INCLUDED__ +#define __LOGJAM_GRAYLOG_FORWARDER_PARSER_H_INCLUDED__ + +#include "graylog-forwarder-common.h" + +#ifdef __cplusplus +extern "C" { +#endif + +extern zactor_t* graylog_forwarder_parser_new(zconfig_t *config, size_t id); +extern void graylog_forwarder_parser_destroy(zactor_t **parser_p); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/graylog-forwarder-subscriber.c b/src/graylog-forwarder-subscriber.c new file mode 100644 index 00000000..58532f6e --- /dev/null +++ b/src/graylog-forwarder-subscriber.c @@ -0,0 +1,159 @@ +#include "graylog-forwarder-subscriber.h" +#include "logjam-message.h" + +// actor state +typedef struct { + zsock_t *pipe; // actor commands + zsock_t *sub_socket; // incoming data from logjam devices + zsock_t *push_socket; // outgoing data for parsers + size_t message_count; // how many messages we have received since last tick +} subscriber_state_t; + +static +zsock_t* subscriber_sub_socket_new(zconfig_t * config) +{ + zsock_t *socket = zsock_new(ZMQ_SUB); + assert(socket); + + // set inbound high-water-mark + int high_water_mark = atoi(zconfig_resolve(config, "/logjam/high_water_mark", "10000")); + printf("[I] graylog-forwarder-subscriber: setting high-water-mark for inbound messages to %d\n", high_water_mark); + zsock_set_rcvhwm(socket, high_water_mark); + + // set subscription + char* logjam_subscription = zconfig_resolve(config, "/logjam/subscription", ""); + printf("[I] graylog-forwarder-subscriber: subscribing to %s\n", logjam_subscription); + zsock_set_subscribe(socket, logjam_subscription); + + // connect socket to endpoints + zconfig_t *endpoints = zconfig_locate(config, "/logjam/endpoints"); + assert(endpoints); + zconfig_t *endpoint = zconfig_child(endpoints); + while (endpoint) { + char *spec = zconfig_value(endpoint); + printf("[I] graylog-forwarder-subscriber: connecting SUB socket to logjam-device via %s\n", spec); + int rc = zsock_connect(socket, "%s", spec); + log_zmq_error(rc); + assert(rc == 0); + endpoint = zconfig_next(endpoint); + } + + return socket; +} + +static +zsock_t* subscriber_push_socket_new(zconfig_t* config) +{ + zsock_t *socket = zsock_new(ZMQ_PUSH); + assert(socket); + int rc = zsock_bind(socket, "inproc://graylog-forwarder-subscriber"); + assert(rc == 0); + return socket; +} + +static +subscriber_state_t* subscriber_state_new(zsock_t *pipe, zconfig_t* config) +{ + subscriber_state_t *state = zmalloc(sizeof(*state)); + state->pipe = pipe; + state->sub_socket = subscriber_sub_socket_new(config); + state->push_socket = subscriber_push_socket_new(config); + return state; +} + +static +void subscriber_state_destroy(subscriber_state_t **state_p) +{ + subscriber_state_t *state = *state_p; + zsock_destroy(&state->sub_socket); + zsock_destroy(&state->push_socket); + *state_p = NULL; +} + +static +int read_request_and_forward(zloop_t *loop, zsock_t *socket, void *callback_data) +{ + subscriber_state_t *state = callback_data; + zmsg_t *msg = zmsg_recv(socket); + + if (msg) { + state->message_count++; + int n = zmsg_size(msg); + if (n < 3 || n > 4) { + fprintf(stderr, "[E] graylog-forwarder-subscriber: dropped invalid message\n"); + my_zmsg_fprint(msg, "[E] FRAME= ", stderr); + zmsg_destroy(&msg); + return 0; + } + if (!output_socket_ready(state->push_socket, 0)) { + fprintf(stderr, "[W] graylog-forwarder-subscriber: push socket not ready. blocking!\n"); + } + if (!zsys_interrupted) { + zmsg_send(&msg, state->push_socket); + } else { + zmsg_destroy(&msg); + } + } + return 0; +} + +static +int actor_command(zloop_t *loop, zsock_t *socket, void *callback_data) +{ + int rc = 0; + subscriber_state_t *state = callback_data; + zmsg_t *msg = zmsg_recv(socket); + if (msg) { + char *cmd = zmsg_popstr(msg); + if (streq(cmd, "$TERM")) { + fprintf(stderr, "[D] graylog-forwarder-subscriber: received $TERM command\n"); + rc = -1; + } else if (streq(cmd, "tick")) { + printf("[I] graylog-forwarder-subscriber: received %zu messages\n", + state->message_count); + state->message_count = 0; + } else { + fprintf(stderr, "[E] graylog-forwarder-subscriber: received unknown actor command: %s\n", cmd); + } + free(cmd); + zmsg_destroy(&msg); + } + return rc; +} + +void graylog_forwarder_subscriber(zsock_t *pipe, void *args) +{ + set_thread_name("graylog-forwarder-subscriber"); + + int rc; + zconfig_t* config = args; + subscriber_state_t *state = subscriber_state_new(pipe, config); + + // signal readyiness after sockets have been created + zsock_signal(pipe, 0); + + // set up event loop + zloop_t *loop = zloop_new(); + assert(loop); + zloop_set_verbose(loop, 0); + + // setup handler for actor messages + rc = zloop_reader(loop, state->pipe, actor_command, state); + assert(rc == 0); + + // setup handler for the sub socket + rc = zloop_reader(loop, state->sub_socket, read_request_and_forward, state); + assert(rc == 0); + + // run the loop + fprintf(stdout, "[I] graylog-forwarder-subscriber: listening\n"); + zloop_start(loop); + fprintf(stdout, "[I] graylog-forwarder-subscriber: shutting down\n"); + + // shutdown + subscriber_state_destroy(&state); + zloop_destroy(&loop); + assert(loop == NULL); + + fprintf(stdout, "[I] graylog-forwarder-subscriber: terminated\n"); +} diff --git a/src/graylog-forwarder-subscriber.h b/src/graylog-forwarder-subscriber.h new file mode 100644 index 00000000..23b8ea76 --- /dev/null +++ b/src/graylog-forwarder-subscriber.h @@ -0,0 +1,16 @@ +#ifndef __LOGJAM_GRAYLOG_FORWARDER_SUBSCRIBER_H_INCLUDED__ +#define __LOGJAM_GRAYLOG_FORWARDER_SUBSCRIBER_H_INCLUDED__ + +#include "graylog-forwarder-common.h" + +#ifdef __cplusplus +extern "C" { +#endif + +extern void graylog_forwarder_subscriber(zsock_t *pipe, void *args); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/graylog-forwarder-writer.c b/src/graylog-forwarder-writer.c new file mode 100644 index 00000000..56838bcd --- /dev/null +++ b/src/graylog-forwarder-writer.c @@ -0,0 +1,148 @@ +#include "graylog-forwarder-common.h" +#include "graylog-forwarder-writer.h" +#include "gelf-message.h" + +typedef struct { + zsock_t *pipe; // actor commands + zsock_t *pull_socket; // incoming messages from parsers + zsock_t *push_socket; // outgoing GELF messages to graylog; the GELF ZeroMQ PULL device should connect to this (not bind) + size_t message_count; // how many messages we have sent since last tick +} writer_state_t; + +static void send_graylog_message(zmsg_t* msg, writer_state_t* state) +{ + compressed_gelf_t *compressed_gelf = zmsg_popptr(msg); + assert(compressed_gelf); + + if (dryrun) { + compressed_gelf_destroy(&compressed_gelf); + return; + } + + zmsg_t *out_msg = zmsg_new(); + assert(out_msg); + int rc = zmsg_addmem(out_msg, compressed_gelf->data, compressed_gelf->len); + assert(rc == 0); + + compressed_gelf_destroy(&compressed_gelf); + + if (!output_socket_ready(state->push_socket, 0)) { + fprintf(stderr, "[W] graylog-forwarder-writer: push socket not ready. blocking!\n"); + } + + zmsg_send(&out_msg, state->push_socket); + state->message_count++; +} + +static +zsock_t* writer_pull_socket_new() +{ + zsock_t *socket = zsock_new(ZMQ_PULL); + assert(socket); + int rc = zsock_bind(socket, "inproc://graylog-forwarder-writer"); + assert(rc == 0); + return socket; +} + +static +zsock_t* writer_push_socket_new(zconfig_t* config) +{ + zsock_t *socket = zsock_new(ZMQ_PUSH); + assert(socket); + + char* graylog_endpoint = zconfig_resolve(config, "/graylog/endpoint", NULL); + if (graylog_endpoint == NULL) { + fprintf(stderr, "[E] graylog-forwarder-writer: missing graylog endpoint configuration.\n"); + exit(1); + } + + // bind socket, taking thread startup time into account + // TODO: this is a hack. better let controller coordinate this + for (int i=0; i<10; i++) { + int rc = zsock_bind(socket, "%s", graylog_endpoint); + if (rc != -1) { + printf("[I] graylog-forwarder-writer: binding PUSH socket for graylog to %s\n", graylog_endpoint); + break; + } + zclock_sleep(100); + } + + // set outbound high-water-mark + int high_water_mark = atoi(zconfig_resolve(config, "/graylog/high_water_mark", "10000")); + printf("[I] graylog-forwarder-writer: setting high-water-mark for outbound messages to %d\n", high_water_mark); + zsock_set_sndhwm(socket, high_water_mark); + + return socket; +} + +static +writer_state_t* writer_state_new(zsock_t *pipe, zconfig_t* config) +{ + writer_state_t *state = zmalloc(sizeof(writer_state_t)); + state->pipe = pipe; + state->pull_socket = writer_pull_socket_new(); + state->push_socket = writer_push_socket_new(config); + return state; +} + +static +void writer_state_destroy(writer_state_t **state_p) +{ + writer_state_t *state = *state_p; + // must not destroy the pipe, as it's owned by the actor + zsock_destroy(&state->pull_socket); + zsock_destroy(&state->push_socket); + free(state); + *state_p = NULL; +} + +void graylog_forwarder_writer(zsock_t *pipe, void *args) +{ + set_thread_name("graylog-forwarder-writer[0]"); + + zconfig_t* config = args; + writer_state_t *state = writer_state_new(pipe, config); + // signal readyiness after sockets have been created + zsock_signal(pipe, 0); + + zpoller_t *poller = zpoller_new(state->pipe, state->pull_socket, NULL); + assert(poller); + + while (!zsys_interrupted) { + // printf("[D] writer [%zu]: polling\n", id); + // -1 == block until something is readable + void *socket = zpoller_wait(poller, -1); + zmsg_t *msg = NULL; + if (socket == state->pipe) { + msg = zmsg_recv(state->pipe); + char *cmd = zmsg_popstr(msg); + zmsg_destroy(&msg); + if (streq(cmd, "$TERM")) { + fprintf(stderr, "[D] graylog-forwarder-writer: received $TERM command\n"); + free(cmd); + break; + } + else if (streq(cmd, "tick")) { + printf("[I] graylog-forwarder-writer: sent %zu messages\n", + state->message_count); + state->message_count = 0; + } else { + fprintf(stderr, "[E] graylog-forwarder-writer: received unknown command: %s\n", cmd); + assert(false); + } + } else if (socket == state->pull_socket) { + msg = zmsg_recv(state->pull_socket); + if (msg != NULL) { + send_graylog_message(msg, state); + zmsg_destroy(&msg); + } + } else { + // msg == NULL, probably interrupted by signal handler + break; + } + } + + fprintf(stdout, "[I] graylog-forwarder-writer: shutting down\n"); + writer_state_destroy(&state); + fprintf(stdout, "[I] graylog-forwarder-writer: terminated\n"); +} diff --git a/src/graylog-forwarder-writer.h b/src/graylog-forwarder-writer.h new file mode 100644 index 00000000..0788f9a3 --- /dev/null +++ b/src/graylog-forwarder-writer.h @@ -0,0 +1,16 @@ +#ifndef __LOGJAM_GRAYLOG_FORWARDER_WRITER_H_INCLUDED__ +#define __LOGJAM_GRAYLOG_FORWARDER_WRITER_H_INCLUDED__ + +#include "graylog-forwarder-common.h" + +#ifdef __cplusplus +extern "C" { +#endif + +extern void graylog_forwarder_writer(zsock_t *pipe, void *args); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/importer-common.c b/src/importer-common.c index 1e7a421b..e56deb77 100644 --- a/src/importer-common.c +++ b/src/importer-common.c @@ -2,57 +2,6 @@ bool dryrun = false; -void dump_json_object(FILE *f, json_object *jobj) -{ - const char *json_str = json_object_to_json_string_ext(jobj, JSON_C_TO_STRING_PLAIN); - if (f == stderr) - fprintf(f, "[E] %s\n", json_str); - else - fprintf(f, "[I] %s\n", json_str); - // don't try to free the json string. it will crash. -} - -void my_zframe_fprint(zframe_t *self, const char *prefix, FILE *file) -{ - assert (self); - if (prefix) - fprintf (file, "%s", prefix); - byte *data = zframe_data (self); - size_t size = zframe_size (self); - - int is_bin = 0; - uint char_nbr; - for (char_nbr = 0; char_nbr < size; char_nbr++) - if (data [char_nbr] < 9 || data [char_nbr] > 127) - is_bin = 1; - - fprintf (file, "[%03d] ", (int) size); - size_t max_size = is_bin? 2048: 4096; - const char *ellipsis = ""; - if (size > max_size) { - size = max_size; - ellipsis = "..."; - } - for (char_nbr = 0; char_nbr < size; char_nbr++) { - if (is_bin) - fprintf (file, "%02X", (unsigned char) data [char_nbr]); - else - fprintf (file, "%c", data [char_nbr]); - } - fprintf (file, "%s\n", ellipsis); -} - -void my_zmsg_fprint(zmsg_t* self, const char* prefix, FILE* file) -{ - zframe_t *frame = zmsg_first(self); - int frame_nbr = 0; - while (frame && frame_nbr++ < 10) { - my_zframe_fprint(frame, prefix, file); - frame = zmsg_next(self); - } -} - - // utf8 conversion static char UTF8_DOT[4] = {0xE2, 0x80, 0xA4, '\0' }; static char UTF8_CURRENCY[3] = {0xC2, 0xA4, '\0'}; @@ -345,15 +294,3 @@ bool config_update_date_info() bool changed = strcmp(old_date, iso_date_today); return changed; } - -int set_thread_name(const char* name) -{ -#if defined(__linux__) - pthread_t self = pthread_self(); - return pthread_setname_np(self, name); -#elif defined(__APPLE__) - return pthread_setname_np(name); -#else - return 0; -#endif -} diff --git a/src/importer-common.h b/src/importer-common.h index a3a23f69..cc3108f2 100644 --- a/src/importer-common.h +++ b/src/importer-common.h @@ -1,15 +1,6 @@ #ifndef __LOGJAM_IMPORTER_COMMON_H_INCLUDED__ #define __LOGJAM_IMPORTER_COMMON_H_INCLUDED__ -#include -#include -#include -#include -#include -#include -#include -#include -#include #include #include #include "logjam-util.h" @@ -55,25 +46,6 @@ extern char iso_date_today[ISO_DATE_STR_LEN]; extern char iso_date_tomorrow[ISO_DATE_STR_LEN]; extern time_t time_last_tick; -extern void dump_json_object(FILE *f, json_object *jobj); -extern void my_zframe_fprint(zframe_t *self, const char *prefix, FILE *file); -extern void my_zmsg_fprint(zmsg_t* self, const char* prefix, FILE* file); - -static inline int zmsg_addptr(zmsg_t* msg, void* ptr) -{ - return zmsg_addmem(msg, &ptr, sizeof(void*)); -} - -static inline void* zmsg_popptr(zmsg_t* msg) -{ - zframe_t *frame = zmsg_pop(msg); - assert(frame); - assert(zframe_size(frame) == sizeof(void*)); - void *ptr = *((void **) zframe_data(frame)); - zframe_destroy(&frame); - return ptr; -} - extern int replace_dots_and_dollars(char *s); extern int copy_replace_dots_and_dollars(char* buffer, const char *s); extern int uri_replace_dots_and_dollars(char* buffer, const char *s); diff --git a/src/importer-parser.c b/src/importer-parser.c index 4438f9ef..6c73107d 100644 --- a/src/importer-parser.c +++ b/src/importer-parser.c @@ -159,37 +159,6 @@ processor_state_t* processor_create(zframe_t* stream_frame, parser_state_t* pars return p; } - -static -json_object* parse_json_body(zframe_t *body, json_tokener* tokener) -{ - char* json_data = (char*)zframe_data(body); - int json_data_len = (int)zframe_size(body); - json_tokener_reset(tokener); - json_object *jobj = json_tokener_parse_ex(tokener, json_data, json_data_len); - enum json_tokener_error jerr = json_tokener_get_error(tokener); - if (jerr != json_tokener_success) { - fprintf(stderr, "[E] parse_json_body: %s\n", json_tokener_error_desc(jerr)); - } else { - // const char *json_str_orig = zframe_strdup(body); - // printf("[D] %s\n", json_str_orig); - // free(json_str_orig); - // dump_json_object(stdout, jobj); - } - if (tokener->char_offset < json_data_len) // XXX shouldn't access internal fields - { - // Handle extra characters after parsed object as desired. - fprintf(stderr, "[W] parse_json_body: %s\n", "extranoeus data in message payload"); - my_zframe_fprint(body, "[W] MSGBODY=", stderr); - } - // if (strnlen(json_data, json_data_len) < json_data_len) { - // fprintf(stderr, "[W] parse_json_body: json payload has null bytes\ndata: %*s\n", json_data_len, json_data); - // dump_json_object(stdout, jobj); - // return NULL; - // } - return jobj; -} - static void parse_msg_and_forward_interesting_requests(zmsg_t *msg, parser_state_t *parser_state) { diff --git a/src/logjam-graylog-forwarder.c b/src/logjam-graylog-forwarder.c new file mode 100644 index 00000000..679473e2 --- /dev/null +++ b/src/logjam-graylog-forwarder.c @@ -0,0 +1,78 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "graylog-forwarder-common.h" +#include "graylog-forwarder-controller.h" + +// global config +static zconfig_t* config = NULL; +static zfile_t *config_file = NULL; +static char *config_file_name = "logjam.conf"; +static time_t config_file_last_modified = 0; +static char *config_file_digest = ""; + +static void print_usage(char * const *argv) +{ + fprintf(stderr, "usage: %s [-c config-file]\n", argv[0]); +} + +static void process_arguments(int argc, char * const *argv) +{ + char c; + opterr = 0; + while ((c = getopt(argc, argv, "c:n")) != -1) { + switch (c) { + case 'c': + config_file_name = optarg; + break; + case 'n': + dryrun = true; + break; + case '?': + if (optopt == 'c' ) + fprintf(stderr, "option -%c requires an argument.\n", optopt); + else if (isprint (optopt)) + fprintf(stderr, "unknown option `-%c'.\n", optopt); + else + fprintf(stderr, "unknown option character `\\x%x'.\n", optopt); + print_usage(argv); + exit(1); + default: + exit(1); + } + } +} + +static void config_file_init() +{ + config_file = zfile_new(NULL, config_file_name); + config_file_last_modified = zfile_modified(config_file); + config_file_digest = strdup(zfile_digest(config_file)); +} + +int main(int argc, char * const *argv) +{ + process_arguments(argc, argv); + + if (!zsys_file_exists(config_file_name)) { + fprintf(stderr, "[E] missing config file: %s\n", config_file_name); + exit(1); + } + + // load config + if (zsys_file_exists(config_file_name)) { + config_file_init(); + config = zconfig_load((char*)config_file_name); + } + + setvbuf(stdout, NULL, _IOLBF, 0); + setvbuf(stderr, NULL, _IOLBF, 0); + + return graylog_forwarder_run_controller_loop(config); +} diff --git a/src/logjam-message.c b/src/logjam-message.c new file mode 100644 index 00000000..dfef8f93 --- /dev/null +++ b/src/logjam-message.c @@ -0,0 +1,247 @@ +#include +#include +#include "logjam-util.h" +#include "gelf-message.h" +#include "str-builder.h" +#include "logjam-message.h" + +const char *LOG_LEVELS_NAMES[6] = { + "Debug", + "Info", + "Warn", + "Error", + "Fatal", + "Unknown" +}; + +const int SYSLOG_MAPPING[6] = { + 7 /* Debug */, + 6 /* Info */, + 5 /* Notice */, + 4 /* Warning */, + 3 /* Error */, + 1 /* Alert */ +}; + +struct _logjam_message { + zframe_t *frames[4]; + size_t size; +}; + +static inline void str_normalize(char *str) +{ + for (char *p = str; *p; ++p) { + *p = tolower(*p); + if (*p == '-') + *p = '_'; + } +} + +size_t logjam_message_size(logjam_message *msg) +{ + return msg->size; +} + +logjam_message* logjam_message_read(zsock_t *receiver) +{ + int i = 0, end_of_message = 0; + zframe_t *frame = NULL; + logjam_message *msg = (logjam_message *) zmalloc (sizeof (logjam_message)); + msg->size = 0; + + // read the message parts + while (!zsys_interrupted && !end_of_message) { + frame = zframe_recv (receiver); + // zframe_print(frame, "FRAME"); + + if (!zframe_more(frame)) { + end_of_message = 1; + } + + if (i>3) { + zframe_destroy (&frame); + } else { + msg->frames[i] = frame; + msg->size += zframe_size(frame); + } + i++; + } + + int error = 0; + if (i < 4) { + if (!zsys_interrupted) { + fprintf(stderr, "[E] received only %d message parts\n", i); + } + error = 1; + } else if (i > 4) { + fprintf(stderr, "[E] received more than 4 message parts\n"); + error = 1; + } + + if (error) { + for (int j = 0; j < i && j < 4; j++) { + zframe_destroy (&msg->frames[j]); + } + free (msg); + msg = NULL; + } + + return msg; +} + +gelf_message* logjam_message_to_gelf(logjam_message *logjam_msg) +{ + json_object *obj = NULL, *http_request = NULL, *lines = NULL; + const char *host = "Not found", *action = "Not found"; + char *str = NULL; + + // TODO: no need to allocate a new tokener for each message + json_tokener* tokener = json_tokener_new(); + json_object *request = parse_json_body(logjam_msg->frames[2], tokener); + + if (!request) { + json_tokener_free(tokener); + return NULL; + } + + // dump_json_object(stdout, request); + + if (json_object_object_get_ex (request, "host", &obj)) { + host = json_object_get_string (obj); + } + + if (json_object_object_get_ex (request, "action", &obj)) { + action = json_object_get_string (obj); + } + + gelf_message *gelf_msg = gelf_message_new (host, action); + + char *app_env = zframe_strdup (logjam_msg->frames[0]); + gelf_message_add_string (gelf_msg, "_app", app_env); + + double timestamp; + + // use logjam_agent's started_ms if available, current time as fallback + if (json_object_object_get_ex (request, "started_ms", &obj)) { + int64_t started_ms = json_object_get_int64(obj); + timestamp = started_ms / 1000.0; + } else { + timestamp = zclock_time() / 1000.0; + } + + gelf_message_add_double(gelf_msg, "timestamp", timestamp); + + if (json_object_object_get_ex (request, "code", &obj)) { + gelf_message_add_json_object (gelf_msg, "_code", obj); + } + + if (json_object_object_get_ex (request, "caller_id", &obj)) { + gelf_message_add_json_object (gelf_msg, "_caller_id", obj); + } + + if (json_object_object_get_ex (request, "caller_action", &obj)) { + gelf_message_add_json_object (gelf_msg, "_caller_action", obj); + } + + if (json_object_object_get_ex (request, "request_id", &obj)) { + gelf_message_add_json_object (gelf_msg, "_request_id", obj); + } + + if (json_object_object_get_ex (request, "ip", &obj)) { + gelf_message_add_json_object (gelf_msg, "_ip", obj); + } + + if (json_object_object_get_ex (request, "process_id", &obj)) { + gelf_message_add_json_object (gelf_msg, "_process_id", obj); + } + + if (json_object_object_get_ex (request, "user_id", &obj) + && json_object_get_type (obj) != json_type_null) { + gelf_message_add_json_object (gelf_msg, "_user_id", obj); + } + + if (json_object_object_get_ex (request, "total_time", &obj)) { + gelf_message_add_json_object (gelf_msg, "_total_time", obj); + } + + if (json_object_object_get_ex (request, "request_info", &http_request)) { + if (json_object_object_get_ex (http_request, "method", &obj)) { + gelf_message_add_json_object (gelf_msg, "_http_method", obj); + } + + if (json_object_object_get_ex (http_request, "url", &obj)) { + gelf_message_add_json_object (gelf_msg, "_http_url", obj); + } + + if (json_object_object_get_ex (http_request, "headers", &obj)) { + json_type jtype = json_object_get_type (obj); + if (jtype != json_type_object) { + fprintf(stderr, "[W] unexpected json data type for headers: %s; app: %s, action: %s\n", + json_type_to_name(jtype), + app_env, + action + ); + // dump_json_object(stderr, request); + } else { + char header[1024] = "_http_header_"; + json_object_object_foreach (obj, key, value) { + snprintf (header, 1024, "_http_header_%s", key); + str_normalize (header + 13); + gelf_message_add_json_object (gelf_msg, header, value); + } + } + } + } + + int level = 0; // Debug + if (json_object_object_get_ex (request, "severity", &obj)) { + level = json_object_get_int (obj); + } + + if (json_object_object_get_ex (request, "lines", &lines) + && json_object_get_type(lines) == json_type_array) { + int n_lines = json_object_array_length (lines); + + str_builder *sb = sb_new (1024*10); + for (int i = 0; i < n_lines; i++) { + json_object *line = json_object_array_get_idx (lines, i); + if (line && json_object_get_type (line) == json_type_array) { + obj = json_object_array_get_idx (line, 0); + int l = json_object_get_int (obj); + if (l > level) + level = l; + sb_append(sb, LOG_LEVELS_NAMES[l], strlen (LOG_LEVELS_NAMES[l])); + sb_append(sb, " ", 1); + + obj = json_object_array_get_idx (line, 1); + str = (char *) json_object_get_string(obj); + sb_append (sb, str, strlen (str)); + sb_append (sb, " ", 1); + + obj = json_object_array_get_idx (line, 2); + str = (char *) json_object_get_string (obj); + sb_append (sb, str, strlen (str)); + sb_append (sb, "\n", 1); + } + } + gelf_message_add_full_message (gelf_msg, sb_string(sb)); + sb_destroy (&sb); + } + + gelf_message_add_int (gelf_msg, "level", SYSLOG_MAPPING[level]); + + free (app_env); + json_object_put (request); + json_tokener_free (tokener); + + return gelf_msg; +} + +void logjam_message_destroy(logjam_message **msg) +{ + for (int i = 0; i < 4; i++) { + zframe_destroy (&(*msg)->frames[i]); + } + free (*msg); + *msg = NULL; +} diff --git a/src/logjam-message.h b/src/logjam-message.h new file mode 100644 index 00000000..841aa400 --- /dev/null +++ b/src/logjam-message.h @@ -0,0 +1,17 @@ +#ifndef __LOGJAM_MESSAGE_H_INCLUDED__ +#define __LOGJAM_MESSAGE_H_INCLUDED__ + +#include +#include "gelf-message.h" + +typedef struct _logjam_message logjam_message; + +logjam_message* logjam_message_read(zsock_t *receiver); + +gelf_message* logjam_message_to_gelf(logjam_message *logjam_msg); + +size_t logjam_message_size(logjam_message *msg); + +void logjam_message_destroy(logjam_message **msg); + +#endif diff --git a/src/logjam-util.c b/src/logjam-util.c index 049e250a..4063a74b 100644 --- a/src/logjam-util.c +++ b/src/logjam-util.c @@ -10,7 +10,7 @@ bool output_socket_ready(zsock_t *socket, int msecs) return rc != -1 && (items[0].revents & ZMQ_POLLOUT) != 0; } -#ifndef HAVE_HTONLL +#if !HAVE_DECL_HTONLL uint64_t htonll(uint64_t net_number) { uint64_t result = 0; @@ -22,7 +22,7 @@ uint64_t htonll(uint64_t net_number) } #endif -#ifndef HAVE_NTOHLL +#if !HAVE_DECL_NTOHLL uint64_t ntohll(uint64_t native_number) { uint64_t result = 0; @@ -34,6 +34,18 @@ uint64_t ntohll(uint64_t native_number) } #endif +int set_thread_name(const char* name) +{ +#if defined(__linux__) + pthread_t self = pthread_self(); + return pthread_setname_np(self, name); +#elif defined(__APPLE__) + return pthread_setname_np(name); +#else + return 0; +#endif +} + void dump_meta_info(msg_meta_t *meta) { printf("[D] meta(tag%hx version%hu device %u sequence: %" PRIu64 " created: %" PRIu64 ")\n", @@ -110,3 +122,83 @@ int publish_on_zmq_transport(zmq_msg_t *message_parts, void *publisher, msg_meta zmq_msg_close(&meta); return rc; } + + +json_object* parse_json_body(zframe_t *body, json_tokener* tokener) +{ + char* json_data = (char*)zframe_data(body); + int json_data_len = (int)zframe_size(body); + json_tokener_reset(tokener); + json_object *jobj = json_tokener_parse_ex(tokener, json_data, json_data_len); + enum json_tokener_error jerr = json_tokener_get_error(tokener); + if (jerr != json_tokener_success) { + fprintf(stderr, "[E] parse_json_body: %s\n", json_tokener_error_desc(jerr)); + } else { + // const char *json_str_orig = zframe_strdup(body); + // printf("[D] %s\n", json_str_orig); + // free(json_str_orig); + // dump_json_object(stdout, jobj); + } + if (tokener->char_offset < json_data_len) // XXX shouldn't access internal fields + { + // Handle extra characters after parsed object as desired. + fprintf(stderr, "[W] parse_json_body: %s\n", "extranoeus data in message payload"); + my_zframe_fprint(body, "[W] MSGBODY=", stderr); + } + // if (strnlen(json_data, json_data_len) < json_data_len) { + // fprintf(stderr, "[W] parse_json_body: json payload has null bytes\ndata: %*s\n", json_data_len, json_data); + // dump_json_object(stdout, jobj); + // return NULL; + // } + return jobj; +} + +void dump_json_object(FILE *f, json_object *jobj) +{ + const char *json_str = json_object_to_json_string_ext(jobj, JSON_C_TO_STRING_PLAIN); + if (f == stderr) + fprintf(f, "[E] %s\n", json_str); + else + fprintf(f, "[I] %s\n", json_str); + // don't try to free the json string. it will crash. +} + +void my_zframe_fprint(zframe_t *self, const char *prefix, FILE *file) +{ + assert (self); + if (prefix) + fprintf (file, "%s", prefix); + byte *data = zframe_data (self); + size_t size = zframe_size (self); + + int is_bin = 0; + uint char_nbr; + for (char_nbr = 0; char_nbr < size; char_nbr++) + if (data [char_nbr] < 9 || data [char_nbr] > 127) + is_bin = 1; + + fprintf (file, "[%03d] ", (int) size); + size_t max_size = is_bin? 2048: 4096; + const char *ellipsis = ""; + if (size > max_size) { + size = max_size; + ellipsis = "..."; + } + for (char_nbr = 0; char_nbr < size; char_nbr++) { + if (is_bin) + fprintf (file, "%02X", (unsigned char) data [char_nbr]); + else + fprintf (file, "%c", data [char_nbr]); + } + fprintf (file, "%s\n", ellipsis); +} + +void my_zmsg_fprint(zmsg_t* self, const char* prefix, FILE* file) +{ + zframe_t *frame = zmsg_first(self); + int frame_nbr = 0; + while (frame && frame_nbr++ < 10) { + my_zframe_fprint(frame, prefix, file); + frame = zmsg_next(self); + } +} diff --git a/src/logjam-util.h b/src/logjam-util.h index 2809cbe8..17686dcf 100644 --- a/src/logjam-util.h +++ b/src/logjam-util.h @@ -5,7 +5,15 @@ extern "C" { #endif +#include +#include #include +#include +#include +#include +#include +#include +#include #define META_INFO_VERSION 1 #define META_INFO_TAG 0xcabd @@ -20,14 +28,16 @@ typedef struct { uint64_t sequence_number; } msg_meta_t; -#ifndef HAVE_HTONLL +#if !HAVE_DECL_HTONLL extern uint64_t htonll(uint64_t net_number); #endif -#ifndef HAVE_NTOHLL +#if !HAVE_DECL_NTOHLL extern uint64_t ntohll(uint64_t native_number); #endif +extern int set_thread_name(const char* name); + extern void dump_meta_info(msg_meta_t *meta); extern void dump_meta_info_network_format(msg_meta_t *meta); @@ -63,6 +73,14 @@ extern bool output_socket_ready(zsock_t *socket, int msecs); extern int publish_on_zmq_transport(zmq_msg_t *message_parts, void *socket, msg_meta_t *msg_meta); +extern json_object* parse_json_body(zframe_t *body, json_tokener* tokener); + +extern void dump_json_object(FILE *f, json_object *jobj); + +extern void my_zframe_fprint(zframe_t *self, const char *prefix, FILE *file); + +extern void my_zmsg_fprint(zmsg_t* self, const char* prefix, FILE* file); + static inline void log_zmq_error(int rc) { if (rc != 0) { @@ -70,6 +88,21 @@ static inline void log_zmq_error(int rc) } } +static inline int zmsg_addptr(zmsg_t* msg, void* ptr) +{ + return zmsg_addmem(msg, &ptr, sizeof(void*)); +} + +static inline void* zmsg_popptr(zmsg_t* msg) +{ + zframe_t *frame = zmsg_pop(msg); + assert(frame); + assert(zframe_size(frame) == sizeof(void*)); + void *ptr = *((void **) zframe_data(frame)); + zframe_destroy(&frame); + return ptr; +} + #ifdef __cplusplus } #endif diff --git a/src/str-builder.c b/src/str-builder.c new file mode 100644 index 00000000..8b58633c --- /dev/null +++ b/src/str-builder.c @@ -0,0 +1,53 @@ +#include "logjam-util.h" +#include "str-builder.h" + +struct _str_builder { + char *str; + int size; + int pos; +}; + +str_builder* sb_new(size_t size) +{ + str_builder *sb = (str_builder *) zmalloc (sizeof (str_builder)); + assert (sb); + sb->str = (char *) zmalloc(size); + assert (sb->str); + sb->size = size; + sb->pos = 0; + + memset (sb->str, '\0', size); + + return sb; +} + +char *sb_string(str_builder *sb) +{ + return sb->str; +} + +void sb_append(str_builder *sb, const char* str, size_t length) +{ + int new_pos = sb->pos + length; + if (new_pos >= sb->size) { + int size = sb->size; + int new_size = 2*size; + while (new_size <= new_pos) + new_size *= 2; + // printf("[D] increasing string builder size from %d to %d\n", size, new_size); + sb->str = realloc(sb->str, new_size); + assert(sb->str); + memset (sb->str + size, '\0', size); + sb->size = new_size; + } + assert (new_pos < sb->size); + memcpy (sb->str + sb->pos, str, length); + sb->pos = new_pos; +} + +void sb_destroy(str_builder **sb) +{ + free ((*sb)->str); + free (*sb); + *sb = NULL; +} diff --git a/src/str-builder.h b/src/str-builder.h new file mode 100644 index 00000000..200cfb25 --- /dev/null +++ b/src/str-builder.h @@ -0,0 +1,14 @@ +#ifndef __STR_BUILDER_H_INCLUDED__ +#define __STR_BUILDER_H_INCLUDED__ + +typedef struct _str_builder str_builder; + +str_builder* sb_new(size_t size); + +char *sb_string(str_builder *sb); + +void sb_append(str_builder *sb, const char* str, size_t length); + +void sb_destroy(str_builder **sb); + +#endif