Skip to content

Commit

Permalink
add p_stream config and dynamic namespace action
Browse files Browse the repository at this point in the history
Signed-off-by: AdheipSingh <[email protected]>
  • Loading branch information
AdheipSingh committed Dec 4, 2024
1 parent 845697f commit 7c87878
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 61 deletions.
138 changes: 77 additions & 61 deletions plugins/out_parseable/parseable.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ static void cb_parseable_flush(struct flb_event_chunk *event_chunk,
struct flb_http_client *client;
struct flb_connection *u_conn;
flb_sds_t body;
flb_sds_t x_p_stream_value = NULL;
int ret;
size_t b_sent;

Expand All @@ -85,62 +86,83 @@ static void cb_parseable_flush(struct flb_event_chunk *event_chunk,
msgpack_sbuffer_init(&sbuf);
msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);

/*
* Pack an empty map of original size + 1 because we are appending one
* additional keyval pair later
*/
/* Pack original key-value pairs */
msgpack_pack_map(&pk, p->via.map.size + 1);

/* Pack the original keyval pairs in first */
for (int i = 0; i < p->via.map.size; i++) {
msgpack_pack_object(&pk, p->via.map.ptr[i].key);
msgpack_pack_object(&pk, p->via.map.ptr[i].val);
}

/* Append one more keyval pair to this log */
/* Append one more key-value pair */
msgpack_pack_str_with_body(&pk, "source", 6);
msgpack_pack_str_with_body(&pk, "fluent bit parseable plugin", 25);

/* Convert from msgpack serialization to JSON serialization for sending through HTTP */
/* Convert from msgpack to JSON */
body = flb_msgpack_raw_to_json_sds(sbuf.data, sbuf.size);
flb_plg_info(ctx->ins, "Body content: %s", body);

/* Free up buffer as we don't need it anymore */
msgpack_sbuffer_destroy(&sbuf);
/* Retrieve the namespace_header value from the body (assuming body is a JSON object) */
// Assuming you have a way to parse JSON or MsgPack (like searching by key)
// For simplicity, let's assume "namespace_header" is a key in the JSON object
/* Convert from msgpack serialization to JSON serialization for sending through HTTP */
/* Copy the body to another variable to avoid mutating the original */
flb_sds_t body_copy = flb_sds_create(body);
if (body_copy == NULL) {
flb_plg_error(ctx->ins, "Failed to create a copy of the body");
flb_sds_destroy(body);
return NULL; // Handle the error appropriately
}

/* Retrieve the namespace_name value from the body copy */
flb_sds_t namespace_name = flb_sds_create_size(256); // Dynamic string
if (body_copy != NULL) {
// Search for the "namespace_name" field in the JSON string
char *namespace_name_value = strstr(body_copy, "\"namespace_name\":\"");
if (namespace_name_value != NULL) {
namespace_name_value += strlen("\"namespace_name\":\"");
char *end_quote = strchr(namespace_name_value, '\"');
if (end_quote != NULL) {
*end_quote = '\0'; // Null-terminate the extracted value
namespace_name = flb_sds_printf(&namespace_name, "%s", namespace_name_value);
flb_plg_info(ctx->ins, "Namespace name extracted value: %s", namespace_name_value);
/* Determine the value of the X-P-Stream header */
if (ctx->p_stream && strcmp(ctx->p_stream, "$NAMESPACE") == 0) {
/* Extract namespace_name from the body */
flb_sds_t body_copy = flb_sds_create(body);
if (body_copy == NULL) {
flb_plg_error(ctx->ins, "Failed to create a copy of the body");
flb_sds_destroy(body);
msgpack_unpacked_destroy(&result);
FLB_OUTPUT_RETURN(FLB_ERROR);
}

flb_sds_t namespace_name = flb_sds_create_size(256); // Dynamic string
if (body_copy != NULL) {
char *namespace_name_value = strstr(body_copy, "\"namespace_name\":\"");
if (namespace_name_value != NULL) {
namespace_name_value += strlen("\"namespace_name\":\"");
char *end_quote = strchr(namespace_name_value, '\"');
if (end_quote != NULL) {
*end_quote = '\0'; // Null-terminate the extracted value
namespace_name = flb_sds_printf(&namespace_name, "%s", namespace_name_value);
flb_plg_info(ctx->ins, "Namespace name extracted value: %s", namespace_name_value);
}
}
}
}
flb_sds_destroy(body_copy);

if (!namespace_name || flb_sds_len(namespace_name) == 0) {
flb_plg_error(ctx->ins, "Failed to extract namespace_name from the body");
flb_sds_destroy(body);
flb_sds_destroy(namespace_name);
msgpack_unpacked_destroy(&result);
FLB_OUTPUT_RETURN(FLB_ERROR);
}

x_p_stream_value = namespace_name;
}
else if (ctx->p_stream) {
/* Use the user-specified stream directly */
x_p_stream_value = flb_sds_create(ctx->p_stream);
if (!x_p_stream_value) {
flb_plg_error(ctx->ins, "Failed to set X-P-Stream header to the specified stream: %s", ctx->p_stream);
flb_sds_destroy(body);
msgpack_unpacked_destroy(&result);
FLB_OUTPUT_RETURN(FLB_ERROR);
}
}
else {
flb_plg_error(ctx->ins, "P_Stream is not set. Cannot determine the value for X-P-Stream.");
flb_sds_destroy(body);
msgpack_unpacked_destroy(&result);
FLB_OUTPUT_RETURN(FLB_ERROR);
}

/* Get upstream connection */
u_conn = flb_upstream_conn_get(ctx->upstream);
if (!u_conn) {
flb_plg_error(ctx->ins, "connection initialization error");
flb_sds_destroy(body);
flb_sds_destroy(x_p_stream_value);
msgpack_unpacked_destroy(&result);
FLB_OUTPUT_RETURN(FLB_ERROR);
}
Expand All @@ -155,55 +177,34 @@ static void cb_parseable_flush(struct flb_event_chunk *event_chunk,
if (!client) {
flb_plg_error(ctx->ins, "could not create HTTP client");
flb_sds_destroy(body);
flb_sds_destroy(x_p_stream_value);
flb_upstream_conn_release(u_conn);
msgpack_unpacked_destroy(&result);
FLB_OUTPUT_RETURN(FLB_ERROR);
}

// Handle cases where namespace_name is empty
if (flb_sds_len(namespace_name) == 0) {
namespace_name = flb_sds_cat(namespace_name, "default-stream", 13);
flb_plg_info(ctx->ins, "Namespace name not found, using default: %s", namespace_name);
}

flb_plg_info(ctx->ins, "Namespace name to be passed is: %s", namespace_name);

// Add namespace_name to the HTTP header
/* Add HTTP headers */
flb_http_add_header(client, "Content-Type", 12, "application/json", 16);
flb_plg_info(ctx->ins, "Adding Header: Content-Type: application/json");

flb_http_add_header(client, "X-P-Stream", 10, namespace_name, flb_sds_len(namespace_name));
flb_plg_info(ctx->ins, "Adding Header: X-P-Stream: %s", namespace_name);

flb_http_basic_auth(client, "admin", "admin");
flb_plg_info(ctx->ins, "Adding Header: Authorization: Basic <hidden>");

flb_http_add_header(client, "X-P-Stream", 10, x_p_stream_value, flb_sds_len(x_p_stream_value));
flb_http_basic_auth(client, ctx->p_username, ctx->p_password);

/* Perform request */
ret = flb_http_do(client, &b_sent);
flb_plg_info(ctx->ins, "HTTP request http_do=%i, HTTP Status: %i",
ret, client->resp.status);

/* Log the HTTP request details */
flb_plg_info(ctx->ins, "HTTP Request:");
flb_plg_info(ctx->ins, " Method: POST");
flb_plg_info(ctx->ins, " URL: %s", client->uri);
flb_plg_info(ctx->ins, " Host: %s:%d", ctx->p_server, ctx->p_port);
flb_plg_info(ctx->ins, " Payload: %s", body ? body : "(null)");


/* Free up resources */

/* Clean up resources */
flb_sds_destroy(body);
flb_sds_destroy(x_p_stream_value);
flb_http_client_destroy(client);
flb_upstream_conn_release(u_conn);
flb_sds_destroy(namespace_name);

}
msgpack_unpacked_destroy(&result);

FLB_OUTPUT_RETURN(FLB_OK);
}


static int cb_parseable_exit(void *data, struct flb_config *config)
{
struct flb_out_parseable *ctx = data;
Expand All @@ -227,6 +228,21 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_out_parseable, p_server),
"The host of the server to send logs to."
},
{
FLB_CONFIG_MAP_STR, "P_Username", NULL,
0, FLB_TRUE, offsetof(struct flb_out_parseable, p_username),
"The parseable server username."
},
{
FLB_CONFIG_MAP_STR, "P_Password", NULL,
0, FLB_TRUE, offsetof(struct flb_out_parseable, p_password),
"The parseable server password."
},
{
FLB_CONFIG_MAP_STR, "P_Stream", NULL,
0, FLB_TRUE, offsetof(struct flb_out_parseable, p_stream),
"The stream name to send logs to. using $NAMESPACE will dynamically create namespace."
},
{
FLB_CONFIG_MAP_INT, "P_Port", 0,
0, FLB_TRUE, offsetof(struct flb_out_parseable, p_port),
Expand Down
3 changes: 3 additions & 0 deletions plugins/out_parseable/parseable.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
struct flb_out_parseable {
flb_sds_t p_server;
int p_port;
flb_sds_t p_username;
flb_sds_t p_password;
flb_sds_t p_stream;
struct flb_upstream *upstream;
struct flb_output_instance *ins;
};
Expand Down

0 comments on commit 7c87878

Please sign in to comment.