Skip to content

Commit

Permalink
mp: new API to convert between msgpack and CFL object
Browse files Browse the repository at this point in the history
Looking forward to improve how do we manipulate msgpack buffers,
we aim to operate on top of the new CFL object structure and
reduce the number of serializations required, plus decrease
the complexity of the logic in certain plugins that requires
several modifcation operations.

This patch implements 2 new API calls and test case for:

- Convert from MessagePack Object (unpacked) to CFL object
- Convert from CFL object to MessagePack buffer (serialized)

In the next patches we will extend Log processors by using this
new interface.

Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Feb 26, 2024
1 parent 7c120da commit 90e0cdc
Show file tree
Hide file tree
Showing 3 changed files with 418 additions and 3 deletions.
3 changes: 3 additions & 0 deletions include/fluent-bit/flb_mp.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,7 @@ void flb_mp_accessor_set_active(struct flb_mp_accessor *mpa, int status);
int flb_mp_accessor_set_active_by_pattern(struct flb_mp_accessor *mpa,
const char *pattern, int status);

struct cfl_object *flb_mp_object_to_cfl(msgpack_object *o);
int flb_mp_cfl_to_msgpack(struct cfl_object *obj, char **out_buf, size_t *out_size);

#endif
319 changes: 318 additions & 1 deletion src/flb_mp.c
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,6 @@ void flb_mp_accessor_destroy(struct flb_mp_accessor *mpa)
{
struct mk_list *tmp;
struct mk_list *head;
struct flb_record_accessor *ra;
struct flb_mp_accessor_ra *mp_ra;

if (!mpa) {
Expand All @@ -697,3 +696,321 @@ void flb_mp_accessor_destroy(struct flb_mp_accessor *mpa)

flb_free(mpa);
}


static int mp_object_to_cfl(void **ptr, msgpack_object *o)
{
int i;
int ret = -1;
struct cfl_array *array;
struct cfl_kvlist *kvlist;
void *var;
msgpack_object key;
msgpack_object val;

switch(o->type) {
case MSGPACK_OBJECT_NIL:
var = cfl_variant_create_from_null();
if (!var) {
return -1;
}
*ptr = var;
ret = CFL_OBJECT_VARIANT;
break;
case MSGPACK_OBJECT_BOOLEAN:
var = cfl_variant_create_from_bool(o->via.boolean);
if (!var) {
return -1;
}
*ptr = var;
ret = CFL_OBJECT_VARIANT;
break;
case MSGPACK_OBJECT_POSITIVE_INTEGER:
var = cfl_variant_create_from_uint64(o->via.u64);
if (!var) {
return -1;
}
*ptr = var;
ret = CFL_OBJECT_VARIANT;
break;
case MSGPACK_OBJECT_NEGATIVE_INTEGER:
var = cfl_variant_create_from_int64(o->via.i64);
if (!var) {
return -1;
}
*ptr = var;
ret = CFL_OBJECT_VARIANT;
break;
case MSGPACK_OBJECT_FLOAT32:
case MSGPACK_OBJECT_FLOAT64:
var = cfl_variant_create_from_double(o->via.f64);
if (!var) {
return -1;
}
*ptr = var;
ret = CFL_OBJECT_VARIANT;
break;
case MSGPACK_OBJECT_STR:
var = cfl_variant_create_from_string_s((char *) o->via.str.ptr,
o->via.str.size);
if (!var) {
return -1;
}
*ptr = var;
ret = CFL_OBJECT_VARIANT;
break;
case MSGPACK_OBJECT_BIN:
var = cfl_variant_create_from_bytes((char *) o->via.str.ptr,
o->via.str.size);
if (!var) {
return -1;
}

*ptr = var;
ret = CFL_OBJECT_VARIANT;
break;
case MSGPACK_OBJECT_EXT:
/* we do not pack extension type content */
*ptr = NULL;
ret = CFL_OBJECT_NONE;
break;
case MSGPACK_OBJECT_ARRAY:
array = cfl_array_create(o->via.map.size);
if (!array) {
return -1;
}
ret = 0;

for (i = 0; i < o->via.map.size; i++) {
ret = mp_object_to_cfl((void *) &var, &o->via.array.ptr[i]);
if (ret == CFL_OBJECT_KVLIST) {
ret = cfl_array_append_kvlist(array, var);
}
else if (ret == CFL_OBJECT_VARIANT) {
ret = cfl_array_append(array, var);
}
else if (ret == CFL_OBJECT_ARRAY) {
ret = cfl_array_append_array(array, var);
}
else {
ret = -1;
break;
}
}

if (ret == -1) {
cfl_array_destroy(array);
return -1;
}

*ptr = array;
ret = CFL_OBJECT_ARRAY;
break;
case MSGPACK_OBJECT_MAP:
kvlist = cfl_kvlist_create();
if (!kvlist) {
return -1;
}

ret = 0;
for (i = 0; i < o->via.map.size; i++) {
key = o->via.map.ptr[i].key;
val = o->via.map.ptr[i].val;

/* force key type to be strin, otherwise just abort */
if (key.type != MSGPACK_OBJECT_STR) {
ret = -1;
break;
}

/* key variant is ready, now we need the value variant */
ret = mp_object_to_cfl((void *) &var, &val);
if (ret == -1) {
break;
}

if (ret == CFL_OBJECT_KVLIST) {
ret = cfl_kvlist_insert_kvlist_s(kvlist,
(char *) key.via.str.ptr, key.via.str.size,
var);
}
else if (ret == CFL_OBJECT_VARIANT) {
ret = cfl_kvlist_insert_s(kvlist,
(char *) key.via.str.ptr, key.via.str.size,
var);
}
else if (ret == CFL_OBJECT_ARRAY) {
ret = cfl_kvlist_insert_array_s(kvlist,
(char *) key.via.str.ptr, key.via.str.size,
var);
}
else {
ret = -1;
break;
}
}

if (ret == -1) {
cfl_kvlist_destroy(kvlist);
return -1;
}

*ptr = kvlist;
ret = CFL_OBJECT_KVLIST;
break;
default:
break;
}

return ret;
}


/* Convert a msgpack object to a cfl_object */
struct cfl_object *flb_mp_object_to_cfl(msgpack_object *o)
{
int ret;
void *out = NULL;
struct cfl_object *obj;

/* For now, only allow to convert to map (kvlist) or array */
if (o->type != MSGPACK_OBJECT_MAP && o->type != MSGPACK_OBJECT_ARRAY) {
return NULL;
}

obj = cfl_object_create();
if (!obj) {
return NULL;
}

ret = mp_object_to_cfl(&out, o);
if (ret < 0) {
cfl_object_destroy(obj);
return NULL;
}

ret = cfl_object_set(obj, ret, out);
if (ret == -1) {
if (ret == CFL_OBJECT_KVLIST) {
cfl_kvlist_destroy(out);
}
else if (ret == CFL_OBJECT_ARRAY) {
cfl_array_destroy(out);
}
cfl_object_destroy(obj);
return NULL;
}

return obj;
}

static int mp_cfl_to_msgpack(struct cfl_variant *var,
msgpack_sbuffer *mp_sbuf, msgpack_packer *mp_pck)
{
int i;
int ret;
struct cfl_list *head;
struct cfl_kvpair *kv;
struct cfl_kvlist *kvlist;
struct cfl_variant *variant;
struct flb_mp_map_header mh;

switch (var->type) {
case CFL_VARIANT_BOOL:
if (var->data.as_bool) {
msgpack_pack_true(mp_pck);
}
else {
msgpack_pack_false(mp_pck);
}
break;
case CFL_VARIANT_INT:
msgpack_pack_int64(mp_pck, var->data.as_int64);
break;
case CFL_VARIANT_UINT:
msgpack_pack_uint64(mp_pck, var->data.as_uint64);
break;
case CFL_VARIANT_DOUBLE:
msgpack_pack_double(mp_pck, var->data.as_double);
break;
case CFL_VARIANT_NULL:
msgpack_pack_nil(mp_pck);
break;
case CFL_VARIANT_REFERENCE:
/* we don't save references */
break;
case CFL_VARIANT_STRING:
msgpack_pack_str(mp_pck, cfl_sds_len(var->data.as_string));
msgpack_pack_str_body(mp_pck,
var->data.as_string, cfl_sds_len(var->data.as_string));
break;
case CFL_VARIANT_BYTES:
msgpack_pack_bin(mp_pck, cfl_sds_len(var->data.as_bytes));
msgpack_pack_bin_body(mp_pck,
var->data.as_bytes, cfl_sds_len(var->data.as_bytes));
break;
case CFL_VARIANT_ARRAY:
msgpack_pack_array(mp_pck, var->data.as_array->entry_count);
for (i = 0; i < var->data.as_array->entry_count; i++) {
variant = var->data.as_array->entries[i];
ret = mp_cfl_to_msgpack(variant, mp_sbuf, mp_pck);
if (ret == -1) {
return -1;
}
}
break;
case CFL_VARIANT_KVLIST:
kvlist = var->data.as_kvlist;
flb_mp_map_header_init(&mh, mp_pck);
cfl_list_foreach(head, &kvlist->list) {
kv = cfl_list_entry(head, struct cfl_kvpair, _head);

flb_mp_map_header_append(&mh);

/* key */
msgpack_pack_str(mp_pck, cfl_sds_len(kv->key));
msgpack_pack_str_body(mp_pck, kv->key, cfl_sds_len(kv->key));

/* value */
ret = mp_cfl_to_msgpack(kv->val, mp_sbuf, mp_pck);
if (ret == -1) {
return -1;
}
}
flb_mp_map_header_end(&mh);
break;
}

return 0;
}

/* Convert a CFL Object and serialize it content in a msgpack buffer */
int flb_mp_cfl_to_msgpack(struct cfl_object *obj, char **out_buf, size_t *out_size)
{
int ret;
msgpack_sbuffer mp_sbuf;
msgpack_packer mp_pck;

if (!obj) {
return -1;
}

/* unitialized CFL object ? */
if (obj->type == CFL_OBJECT_NONE) {
return -1;
}

/* initialize msgpack buffer */
msgpack_sbuffer_init(&mp_sbuf);
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);

ret = mp_cfl_to_msgpack(obj->variant, &mp_sbuf, &mp_pck);
if (ret == -1) {
return -1;
}

*out_buf = mp_sbuf.data;
*out_size = mp_sbuf.size;

return 0;
}
Loading

0 comments on commit 90e0cdc

Please sign in to comment.