diff --git a/include/fluent-bit/flb_mp.h b/include/fluent-bit/flb_mp.h index 28a216d82c9..88d249ccab4 100644 --- a/include/fluent-bit/flb_mp.h +++ b/include/fluent-bit/flb_mp.h @@ -85,4 +85,7 @@ void flb_mp_accessor_set_active(struct flb_mp_accessor *mpa, int status); int flb_mp_accessor_set_active_by_pattern(struct flb_mp_accessor *mpa, const char *pattern, int status); +struct cfl_object *flb_mp_object_to_cfl(msgpack_object *o); +int flb_mp_cfl_to_msgpack(struct cfl_object *obj, char **out_buf, size_t *out_size); + #endif diff --git a/src/flb_mp.c b/src/flb_mp.c index 77f53fefe4a..e4ce9dcb60d 100644 --- a/src/flb_mp.c +++ b/src/flb_mp.c @@ -677,7 +677,6 @@ void flb_mp_accessor_destroy(struct flb_mp_accessor *mpa) { struct mk_list *tmp; struct mk_list *head; - struct flb_record_accessor *ra; struct flb_mp_accessor_ra *mp_ra; if (!mpa) { @@ -697,3 +696,321 @@ void flb_mp_accessor_destroy(struct flb_mp_accessor *mpa) flb_free(mpa); } + + +static int mp_object_to_cfl(void **ptr, msgpack_object *o) +{ + int i; + int ret = -1; + struct cfl_array *array; + struct cfl_kvlist *kvlist; + void *var; + msgpack_object key; + msgpack_object val; + + switch(o->type) { + case MSGPACK_OBJECT_NIL: + var = cfl_variant_create_from_null(); + if (!var) { + return -1; + } + *ptr = var; + ret = CFL_OBJECT_VARIANT; + break; + case MSGPACK_OBJECT_BOOLEAN: + var = cfl_variant_create_from_bool(o->via.boolean); + if (!var) { + return -1; + } + *ptr = var; + ret = CFL_OBJECT_VARIANT; + break; + case MSGPACK_OBJECT_POSITIVE_INTEGER: + var = cfl_variant_create_from_uint64(o->via.u64); + if (!var) { + return -1; + } + *ptr = var; + ret = CFL_OBJECT_VARIANT; + break; + case MSGPACK_OBJECT_NEGATIVE_INTEGER: + var = cfl_variant_create_from_int64(o->via.i64); + if (!var) { + return -1; + } + *ptr = var; + ret = CFL_OBJECT_VARIANT; + break; + case MSGPACK_OBJECT_FLOAT32: + case MSGPACK_OBJECT_FLOAT64: + var = cfl_variant_create_from_double(o->via.f64); + if (!var) { + return -1; + } + *ptr = var; + ret = CFL_OBJECT_VARIANT; + break; + case MSGPACK_OBJECT_STR: + var = cfl_variant_create_from_string_s((char *) o->via.str.ptr, + o->via.str.size); + if (!var) { + return -1; + } + *ptr = var; + ret = CFL_OBJECT_VARIANT; + break; + case MSGPACK_OBJECT_BIN: + var = cfl_variant_create_from_bytes((char *) o->via.str.ptr, + o->via.str.size); + if (!var) { + return -1; + } + + *ptr = var; + ret = CFL_OBJECT_VARIANT; + break; + case MSGPACK_OBJECT_EXT: + /* we do not pack extension type content */ + *ptr = NULL; + ret = CFL_OBJECT_NONE; + break; + case MSGPACK_OBJECT_ARRAY: + array = cfl_array_create(o->via.map.size); + if (!array) { + return -1; + } + ret = 0; + + for (i = 0; i < o->via.map.size; i++) { + ret = mp_object_to_cfl((void *) &var, &o->via.array.ptr[i]); + if (ret == CFL_OBJECT_KVLIST) { + ret = cfl_array_append_kvlist(array, var); + } + else if (ret == CFL_OBJECT_VARIANT) { + ret = cfl_array_append(array, var); + } + else if (ret == CFL_OBJECT_ARRAY) { + ret = cfl_array_append_array(array, var); + } + else { + ret = -1; + break; + } + } + + if (ret == -1) { + cfl_array_destroy(array); + return -1; + } + + *ptr = array; + ret = CFL_OBJECT_ARRAY; + break; + case MSGPACK_OBJECT_MAP: + kvlist = cfl_kvlist_create(); + if (!kvlist) { + return -1; + } + + ret = 0; + for (i = 0; i < o->via.map.size; i++) { + key = o->via.map.ptr[i].key; + val = o->via.map.ptr[i].val; + + /* force key type to be strin, otherwise just abort */ + if (key.type != MSGPACK_OBJECT_STR) { + ret = -1; + break; + } + + /* key variant is ready, now we need the value variant */ + ret = mp_object_to_cfl((void *) &var, &val); + if (ret == -1) { + break; + } + + if (ret == CFL_OBJECT_KVLIST) { + ret = cfl_kvlist_insert_kvlist_s(kvlist, + (char *) key.via.str.ptr, key.via.str.size, + var); + } + else if (ret == CFL_OBJECT_VARIANT) { + ret = cfl_kvlist_insert_s(kvlist, + (char *) key.via.str.ptr, key.via.str.size, + var); + } + else if (ret == CFL_OBJECT_ARRAY) { + ret = cfl_kvlist_insert_array_s(kvlist, + (char *) key.via.str.ptr, key.via.str.size, + var); + } + else { + ret = -1; + break; + } + } + + if (ret == -1) { + cfl_kvlist_destroy(kvlist); + return -1; + } + + *ptr = kvlist; + ret = CFL_OBJECT_KVLIST; + break; + default: + break; + } + + return ret; +} + + +/* Convert a msgpack object to a cfl_object */ +struct cfl_object *flb_mp_object_to_cfl(msgpack_object *o) +{ + int ret; + void *out = NULL; + struct cfl_object *obj; + + /* For now, only allow to convert to map (kvlist) or array */ + if (o->type != MSGPACK_OBJECT_MAP && o->type != MSGPACK_OBJECT_ARRAY) { + return NULL; + } + + obj = cfl_object_create(); + if (!obj) { + return NULL; + } + + ret = mp_object_to_cfl(&out, o); + if (ret < 0) { + cfl_object_destroy(obj); + return NULL; + } + + ret = cfl_object_set(obj, ret, out); + if (ret == -1) { + if (ret == CFL_OBJECT_KVLIST) { + cfl_kvlist_destroy(out); + } + else if (ret == CFL_OBJECT_ARRAY) { + cfl_array_destroy(out); + } + cfl_object_destroy(obj); + return NULL; + } + + return obj; +} + +static int mp_cfl_to_msgpack(struct cfl_variant *var, + msgpack_sbuffer *mp_sbuf, msgpack_packer *mp_pck) +{ + int i; + int ret; + struct cfl_list *head; + struct cfl_kvpair *kv; + struct cfl_kvlist *kvlist; + struct cfl_variant *variant; + struct flb_mp_map_header mh; + + switch (var->type) { + case CFL_VARIANT_BOOL: + if (var->data.as_bool) { + msgpack_pack_true(mp_pck); + } + else { + msgpack_pack_false(mp_pck); + } + break; + case CFL_VARIANT_INT: + msgpack_pack_int64(mp_pck, var->data.as_int64); + break; + case CFL_VARIANT_UINT: + msgpack_pack_uint64(mp_pck, var->data.as_uint64); + break; + case CFL_VARIANT_DOUBLE: + msgpack_pack_double(mp_pck, var->data.as_double); + break; + case CFL_VARIANT_NULL: + msgpack_pack_nil(mp_pck); + break; + case CFL_VARIANT_REFERENCE: + /* we don't save references */ + break; + case CFL_VARIANT_STRING: + msgpack_pack_str(mp_pck, cfl_sds_len(var->data.as_string)); + msgpack_pack_str_body(mp_pck, + var->data.as_string, cfl_sds_len(var->data.as_string)); + break; + case CFL_VARIANT_BYTES: + msgpack_pack_bin(mp_pck, cfl_sds_len(var->data.as_bytes)); + msgpack_pack_bin_body(mp_pck, + var->data.as_bytes, cfl_sds_len(var->data.as_bytes)); + break; + case CFL_VARIANT_ARRAY: + msgpack_pack_array(mp_pck, var->data.as_array->entry_count); + for (i = 0; i < var->data.as_array->entry_count; i++) { + variant = var->data.as_array->entries[i]; + ret = mp_cfl_to_msgpack(variant, mp_sbuf, mp_pck); + if (ret == -1) { + return -1; + } + } + break; + case CFL_VARIANT_KVLIST: + kvlist = var->data.as_kvlist; + flb_mp_map_header_init(&mh, mp_pck); + cfl_list_foreach(head, &kvlist->list) { + kv = cfl_list_entry(head, struct cfl_kvpair, _head); + + flb_mp_map_header_append(&mh); + + /* key */ + msgpack_pack_str(mp_pck, cfl_sds_len(kv->key)); + msgpack_pack_str_body(mp_pck, kv->key, cfl_sds_len(kv->key)); + + /* value */ + ret = mp_cfl_to_msgpack(kv->val, mp_sbuf, mp_pck); + if (ret == -1) { + return -1; + } + } + flb_mp_map_header_end(&mh); + break; + } + + return 0; +} + +/* Convert a CFL Object and serialize it content in a msgpack buffer */ +int flb_mp_cfl_to_msgpack(struct cfl_object *obj, char **out_buf, size_t *out_size) +{ + int ret; + msgpack_sbuffer mp_sbuf; + msgpack_packer mp_pck; + + if (!obj) { + return -1; + } + + /* unitialized CFL object ? */ + if (obj->type == CFL_OBJECT_NONE) { + return -1; + } + + /* initialize msgpack buffer */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + ret = mp_cfl_to_msgpack(obj->variant, &mp_sbuf, &mp_pck); + if (ret == -1) { + return -1; + } + + *out_buf = mp_sbuf.data; + *out_size = mp_sbuf.size; + + return 0; +} diff --git a/tests/internal/mp.c b/tests/internal/mp.c index 5c1e17c224f..6651d8200e0 100644 --- a/tests/internal/mp.c +++ b/tests/internal/mp.c @@ -326,7 +326,7 @@ void remove_subkey_keys(char *list[], int list_size, int index_start) void test_keys_remove_subkey_keys() { char *list[] = {"$kubernetes[2]['annotations']['fluentbit.io/tag']", - "$kubernetes[2]['a']", + "$kubernetes[2]['a']", "$kubernetes"}; char *list2[] = {"$kubernetes[2]['annotations']['fluentbit.io/tag']", "$kubernetes", @@ -334,7 +334,7 @@ void test_keys_remove_subkey_keys() int size = sizeof(list)/sizeof(char*); int i; - + for (i=0; i> original JSON in msgpack:\n"); + flb_pack_print(buf, size); + + /* Unpack msgpack to get a msgpack object, and convert it to CFL */ + /* Unpack the content */ + msgpack_unpacked_init(&result); + msgpack_unpack_next(&result, buf, size, &off); + map = result.data; + + obj = flb_mp_object_to_cfl(&map); + TEST_CHECK(obj != NULL); + + /* Print the CFL object*/ + printf("\n>> CFL print:\n"); + cfl_object_print(stdout, obj); + + /* Convert back the CFL Object to a msgpack buffer. */ + ret = flb_mp_cfl_to_msgpack(obj, &out_buf, &out_size); + TEST_CHECK(ret == 0); + + /* Print msgpack */ + + printf("\n>> CFL to msgpack content:\n"); + flb_pack_print(out_buf, out_size); + + /* + * Convert buf (msgpack 1 buffer) to JSON, and compare the strings + * generated by out_buf (msgpack 2 buffer). They must match. + */ + buf1 = flb_msgpack_raw_to_json_sds(buf, size); + buf2 = flb_msgpack_raw_to_json_sds(out_buf, out_size); + + ret = strcmp(buf1, buf2); + printf("\n>> Compare JSON buf1 v/s JSON buf2 (ret=%i):\n", ret); + TEST_CHECK(ret == 0); + printf("[buf1]: %s\n", buf1); + printf("[buf2]: %s\n\n", buf2); + + flb_sds_destroy(buf1); + flb_sds_destroy(buf2); + + msgpack_unpacked_destroy(&result); + flb_free(buf); + flb_free(out_buf); + + cfl_object_destroy(obj); +} + TEST_LIST = { {"count" , test_count}, {"map_header" , test_map_header}, {"accessor_keys_remove" , test_accessor_keys_remove}, {"accessor_keys_remove_subkey_key" , test_keys_remove_subkey_key}, {"accessor_keys_remove_subkey_keys" , test_keys_remove_subkey_keys}, + {"object_to_cfl_to_msgpack" , test_object_to_cfl_to_msgpack}, { 0 } };