Skip to content

Commit

Permalink
in_elasticsearch: refer the plugin provided tag
Browse files Browse the repository at this point in the history
This PR makes in_elasticsearch honor the tag setting.

Signed-off-by: Hiroshi Hatake <[email protected]>
  • Loading branch information
cosmo0920 authored May 12, 2023
1 parent 46ce5a1 commit 03c879f
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 24 deletions.
39 changes: 23 additions & 16 deletions plugins/in_elasticsearch/in_elasticsearch_bulk_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -704,26 +704,33 @@ int in_elasticsearch_bulk_prot_handle(struct flb_in_elasticsearch *ctx,
uri[diff] = '\0';
}

/* Compose the query string using the URI */
len = strlen(uri);

if (len == 1) {
tag = NULL; /* use default tag */
}
else {
tag = flb_sds_create_size(len);
if (!tag) {
mk_mem_free(uri);
/* Refer the tag at first*/
if (ctx->ins->tag && !ctx->ins->tag_default) {
tag = flb_sds_create(ctx->ins->tag);
if (tag == NULL) {
return -1;
}
}
else {
/* Compose the query string using the URI */
len = strlen(uri);

/* New tag skipping the URI '/' */
flb_sds_cat(tag, uri + 1, len - 1);
if (len == 1) {
tag = NULL; /* use default tag */
}
else {
/* New tag skipping the URI '/' */
tag = flb_sds_create_len(&uri[1], len - 1);
if (!tag) {
mk_mem_free(uri);
return -1;
}

/* Sanitize, only allow alphanum chars */
for (i = 0; i < flb_sds_len(tag); i++) {
if (!isalnum(tag[i]) && tag[i] != '_' && tag[i] != '.') {
tag[i] = '_';
/* Sanitize, only allow alphanum chars */
for (i = 0; i < flb_sds_len(tag); i++) {
if (!isalnum(tag[i]) && tag[i] != '_' && tag[i] != '.') {
tag[i] = '_';
}
}
}
}
Expand Down
37 changes: 29 additions & 8 deletions tests/runtime/in_elasticsearch.c
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ void flb_test_in_elasticsearch_version()
test_ctx_destroy(ctx);
}

void flb_test_in_elasticsearch(char *write_op, int port)
void flb_test_in_elasticsearch(char *write_op, int port, char *tag)
{
struct flb_lib_out_cb cb_data;
struct test_ctx *ctx;
Expand Down Expand Up @@ -314,12 +314,27 @@ void flb_test_in_elasticsearch(char *write_op, int port)
"port", sport,
NULL);
TEST_CHECK(ret == 0);
if (tag != NULL) {
ret = flb_input_set(ctx->flb, ctx->i_ffd,
"tag", tag,
NULL);
TEST_CHECK(ret == 0);
}

ret = flb_output_set(ctx->flb, ctx->o_ffd,
"match", "*",
"format", "json",
NULL);
TEST_CHECK(ret == 0);
if (tag != NULL) {
ret = flb_output_set(ctx->flb, ctx->o_ffd,
"match", tag,
"format", "json",
NULL);
TEST_CHECK(ret == 0);
}
else {
ret = flb_output_set(ctx->flb, ctx->o_ffd,
"match", "*",
"format", "json",
NULL);
TEST_CHECK(ret == 0);
}

/* Start the engine */
ret = flb_start(ctx->flb);
Expand Down Expand Up @@ -363,12 +378,12 @@ void flb_test_in_elasticsearch(char *write_op, int port)

void flb_test_in_elasticsearch_index_op()
{
flb_test_in_elasticsearch("index", 9202);
flb_test_in_elasticsearch("index", 9202, NULL);
}

void flb_test_in_elasticsearch_create_op()
{
flb_test_in_elasticsearch("create", 9203);
flb_test_in_elasticsearch("create", 9203, NULL);
}

void flb_test_in_elasticsearch_invalid(char *write_op, int status, char *expected_op, int port)
Expand Down Expand Up @@ -793,6 +808,11 @@ void flb_test_in_elasticsearch_tag_key()
test_ctx_destroy(ctx);
}

void flb_test_in_elasticsearch_index_op_with_plugin_tag()
{
flb_test_in_elasticsearch("index", 9210, "es.index");
}

TEST_LIST = {
{"version", flb_test_in_elasticsearch_version},
{"index_op", flb_test_in_elasticsearch_index_op},
Expand All @@ -802,6 +822,7 @@ TEST_LIST = {
{"nonexistent_op", flb_test_in_elasticsearch_nonexistent_op},
{"multi_ops", flb_test_in_elasticsearch_multi_ops},
{"multi_ops_gzip", flb_test_in_elasticsearch_multi_ops_gzip},
{"index_op_with_plugin_tag", flb_test_in_elasticsearch_index_op_with_plugin_tag},
{"node_info", flb_test_in_elasticsearch_node_info},
{"tag_key", flb_test_in_elasticsearch_tag_key},
{NULL, NULL}
Expand Down

0 comments on commit 03c879f

Please sign in to comment.