diff --git a/plugins/in_http/http_prot.c b/plugins/in_http/http_prot.c index c4fa507a609..6cea47572ef 100644 --- a/plugins/in_http/http_prot.c +++ b/plugins/in_http/http_prot.c @@ -608,6 +608,107 @@ static ssize_t parse_payload_msgpack(struct flb_http *ctx, flb_sds_t tag, return 0; } +static ssize_t parse_payload_msgpack_ng(flb_sds_t tag, + struct flb_http_request *request) +{ + int ret = FLB_EVENT_ENCODER_SUCCESS; + struct flb_time tm; + size_t offset = 0; + msgpack_unpacked result; + msgpack_object *record; + msgpack_object *metadata; + msgpack_object *data; + struct flb_http *ctx; + flb_sds_t tag_from_record = NULL; + char *payload; + size_t size; + + ctx = (struct flb_http *) request->stream->user_data; + payload = (char *) request->body; + size = cfl_sds_len(request->body); + + msgpack_unpacked_init(&result); + + while (ret == FLB_EVENT_ENCODER_SUCCESS && + msgpack_unpack_next(&result, payload, size, &offset) == MSGPACK_UNPACK_SUCCESS) { + + if (result.data.type != MSGPACK_OBJECT_ARRAY) { + msgpack_unpacked_destroy(&result); + return -1; + } + + record = &result.data; + metadata = &record->via.array.ptr[0]; + data = &record->via.array.ptr[1]; + + if (ctx->tag_key) { + tag_from_record = tag_key(ctx, data); + } + + ret = flb_log_event_encoder_begin_record(&ctx->log_encoder); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + msgpack_unpacked_destroy(&result); + return -1; + } + + ret = flb_time_msgpack_to_time(&tm, &metadata->via.array.ptr[0]); + + if (ret == -1) { + msgpack_unpacked_destroy(&result); + return -1; + } + + ret = flb_log_event_encoder_set_timestamp( + &ctx->log_encoder, + &tm); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + msgpack_unpacked_destroy(&result); + return -1; + } + + ret = flb_log_event_encoder_set_body_from_msgpack_object(&ctx->log_encoder, data); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + msgpack_unpacked_destroy(&result); + return -1; + } + + ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + msgpack_unpacked_destroy(&result); + return -1; + } + + if (tag_from_record) { + ret = flb_input_log_append(ctx->ins, tag_from_record, + flb_sds_len(tag_from_record), + ctx->log_encoder.output_buffer, + ctx->log_encoder.output_length); + } + else if (tag) { + ret = flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), + ctx->log_encoder.output_buffer, + ctx->log_encoder.output_length); + } + else { + ret = flb_input_log_append(ctx->ins, NULL, 0, + ctx->log_encoder.output_buffer, + ctx->log_encoder.output_length); + } + + if (ret != 0) { + msgpack_unpacked_destroy(&result); + return -1; + } + + flb_log_event_encoder_reset(&ctx->log_encoder); + } + + msgpack_unpacked_destroy(&result); + return 0; +} + static int process_payload(struct flb_http *ctx, struct http_conn *conn, flb_sds_t tag, struct mk_http_session *session, @@ -1159,8 +1260,8 @@ static ssize_t parse_payload_urlencoded_ng(flb_sds_t tag, } static int process_payload_ng(flb_sds_t tag, - struct flb_http_request *request, - struct flb_http_response *response) + struct flb_http_request *request, + struct flb_http_response *response) { int type; @@ -1179,6 +1280,10 @@ static int process_payload_ng(flb_sds_t tag, type = HTTP_CONTENT_URLENCODED; } + if (strcasecmp(request->content_type, "application/msgpack") == 0) { + type = HTTP_CONTENT_MSGPACK; + } + if (type == -1) { send_response_ng(response, 400, "error: invalid 'Content-Type'\n"); return -1; @@ -1195,6 +1300,10 @@ static int process_payload_ng(flb_sds_t tag, } else if (type == HTTP_CONTENT_URLENCODED) { parse_payload_urlencoded_ng(tag, request); } + else if (type == HTTP_CONTENT_MSGPACK) { + parse_payload_msgpack_ng(tag, request); + } + return 0; } diff --git a/tests/runtime/in_http.c b/tests/runtime/in_http.c index 69db455352e..13a62157de2 100644 --- a/tests/runtime/in_http.c +++ b/tests/runtime/in_http.c @@ -356,6 +356,78 @@ void flb_test_msgpack_legacy() test_ctx_destroy(ctx); } +void flb_test_msgpack() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c; + int ret; + int num; + size_t b_sent; + char buf[] = "\xdd\x00\x00\x00\x02\xdd\x00\x00" + "\x00\x02\xd7\x00\x65\xd3\x9c\x63" + "\x19\x36\xb8\xd5\x80\x81\xa7\x6d" + "\x65\x73\x73\x61\x67\x65\xa5\x64" + "\x75\x6d\x6d\x79\xbe"; + + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"message\":\"dummy\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ctx->httpc = http_client_ctx_create(); + TEST_CHECK(ctx->httpc != NULL); + + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/", buf, sizeof(buf), + "127.0.0.1", 9880, NULL, 0); + ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE), + MSGPACK_CONTENT_TYPE, strlen(MSGPACK_CONTENT_TYPE)); + TEST_CHECK(ret == 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("http_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(b_sent > 0)){ + TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent); + } + else if (!TEST_CHECK(c->resp.status == 201)) { + TEST_MSG("http response code error. expect: 201, got: %d\n", c->resp.status); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + void flb_test_http_successful_response_code(char *response_code) { struct flb_lib_out_cb cb_data; @@ -515,6 +587,7 @@ void flb_test_http_tag_key() TEST_LIST = { {"http", flb_test_http}, {"msgpack_legacy", flb_test_msgpack_legacy}, + {"msgpack", flb_test_msgpack}, {"successful_response_code_200", flb_test_http_successful_response_code_200}, {"successful_response_code_204", flb_test_http_successful_response_code_204}, {"tag_key", flb_test_http_tag_key},