Skip to content

Commit

Permalink
Add the yyjson library
Browse files Browse the repository at this point in the history
The existing JSON generation code uses sprintf and allocates quite a bit.
The open source yyjson library, via limiting allocation, copying, and
specialized type pretty-printing code, claims to be able to hit gigabytes-
per-second rates both with encode and decode.

Switching to this code in the storage plugin critical section more than
doubles the per-process workload we can handle at our site.
  • Loading branch information
evanmcc committed Jan 29, 2025
1 parent 2902a3c commit dcfc34f
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 4 deletions.
5 changes: 5 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ AS_IF([test "x$HAVE_LIBJANSSON" = xno],
[AC_MSG_ERROR([libjansson or jansson.h not found])])
AM_CONDITIONAL([HAVE_LIBJANSSON], [test "x$HAVE_LIBJANSSON" = xyes])

AC_LIB_HAVE_LINKFLAGS([yyjson], [], [#include <yyjson.h>])
AS_IF([test "x$HAVE_LIBYYJSON" = xno],
[AC_MSG_ERROR([libyyjson or yyjson.h not found])])
AM_CONDITIONAL([HAVE_LIBYYJSON], [test "x$HAVE_LIBYYJSON" = xyes])

dnl change sharedstatedir default
test "$sharedstatedir" = '${prefix}/com' && sharedstatedir='${prefix}/var/lib'

Expand Down
2 changes: 1 addition & 1 deletion ldms/src/ldmsd/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ ldmsd_SOURCES = ldmsd.c ldmsd_config.c \
ldmsd_LDADD = ../core/libldms.la libldmsd_request.la libldmsd_stream.la \
$(LZAP) $(LMMALLOC) $(LOVIS_UTIL) $(LCOLL) $(LJSON_UTIL) $(LTLIBJANSSON) \
$(LOVIS_EVENT) $(LOVIS_EV) -lpthread $(LOVIS_CTRL) -lm -ldl \
$(LOVIS_LOG)
$(LOVIS_LOG) $(LTLIBYYJSON)
ldmsd_CFLAGS = $(AM_CFLAGS)
ldmsd_LDFLAGS = $(AM_LDFLAGS) -rdynamic -pthread

Expand Down
22 changes: 22 additions & 0 deletions ldms/src/ldmsd/ldmsd.h
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,28 @@ int ldmsd_row_to_json_array(ldmsd_row_t row, char **str, int *len);
*/
int ldmsd_row_to_json_object(ldmsd_row_t row, char **str, int *len);

/**
* Create a JSON text object from an ldmsd_row_t using the yyjson
* library.
*
* The user is responsible for freeing the allocated memory
* returned in \c str.
*
* The output format is in the form of JSON object as follows
* \code
* { "COL_1_NAME":COL_1_VAL, "COL_2_NAME":COL_2_VAL, ...,
* "COL_N_NAME":COL_N_VAL }
* \endcode
*
* \param row The row handle.
* \param [out] str The output C string containing JSON object for the \c row.
* \param [out] len The strlen() of \c *str.
*
* \retval 0 If succeded.
* \retval errno If there is an error.
*/
int ldmsd_row_to_json_object_jansson(ldmsd_row_t row, char **str, int *len);

/**
* Create an Avro schema definition from an ldmsd_row_t
*
Expand Down
135 changes: 135 additions & 0 deletions ldms/src/ldmsd/ldmsd_decomp.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
#include <openssl/sha.h>

#include <jansson.h>
#include <yyjson.h>
#include "coll/rbt.h"

#include "ldmsd.h"
Expand Down Expand Up @@ -712,6 +713,140 @@ int ldmsd_row_to_json_object(ldmsd_row_t row, char **str, int *len)
return rc;
}

bool yyjson_add_by_type(yyjson_mut_doc *doc, yyjson_mut_val *root, ldmsd_col_t col)
{
int rc, i;
char buf[2];
yyjson_mut_val *arr = NULL;

switch (col->type) {
case LDMS_V_S8:
return yyjson_mut_obj_add_int(doc, root, col->name, col->mval->v_s8);
break;
case LDMS_V_U8:
return yyjson_mut_obj_add_int(doc, root, col->name, col->mval->v_u8);
break;
case LDMS_V_S16:
return yyjson_mut_obj_add_int(doc, root, col->name, col->mval->v_s16);
break;
case LDMS_V_U16:
return yyjson_mut_obj_add_int(doc, root, col->name, col->mval->v_u16);
break;
case LDMS_V_S32:
return yyjson_mut_obj_add_int(doc, root, col->name, col->mval->v_s32);
break;
case LDMS_V_U32:
return yyjson_mut_obj_add_int(doc, root, col->name, col->mval->v_u32);
break;
case LDMS_V_S64:
return yyjson_mut_obj_add_int(doc, root, col->name, col->mval->v_u64);
break;
case LDMS_V_U64:
return yyjson_mut_obj_add_int(doc, root, col->name, col->mval->v_u64);
break;
case LDMS_V_F32:
return yyjson_mut_obj_add_real(doc, root, col->name, col->mval->v_f);
break;
case LDMS_V_D64:
return yyjson_mut_obj_add_real(doc, root, col->name, col->mval->v_d);
break;
case LDMS_V_CHAR:
buf[0] = col->mval->v_char;
buf[1] = '\0';
return yyjson_mut_obj_add_strcpy(doc, root, col->name, buf);
break;
case LDMS_V_CHAR_ARRAY:
return yyjson_mut_obj_add_strcpy(doc, root, col->name, (char *)col->mval->a_char);
break;
case LDMS_V_S8_ARRAY:
arr = yyjson_mut_arr_with_sint8(doc, col->mval->a_s8, col->array_len);
return yyjson_mut_obj_add_val(doc, root, col->name, arr);
break;
case LDMS_V_U8_ARRAY:
arr = yyjson_mut_arr_with_sint16(doc, col->mval->a_s16, col->array_len);
return yyjson_mut_obj_add_val(doc, root, col->name, arr);
break;
case LDMS_V_S16_ARRAY:
arr = yyjson_mut_arr_with_sint32(doc, col->mval->a_s32, col->array_len);
return yyjson_mut_obj_add_val(doc, root, col->name, arr);
break;
case LDMS_V_U16_ARRAY:
arr = yyjson_mut_arr_with_sint64(doc, col->mval->a_s64, col->array_len);
return yyjson_mut_obj_add_val(doc, root, col->name, arr);
break;
case LDMS_V_S32_ARRAY:
arr = yyjson_mut_arr_with_uint8(doc, col->mval->a_u8, col->array_len);
return yyjson_mut_obj_add_val(doc, root, col->name, arr);
break;
case LDMS_V_U32_ARRAY:
arr = yyjson_mut_arr_with_uint16(doc, col->mval->a_u16, col->array_len);
return yyjson_mut_obj_add_val(doc, root, col->name, arr);
break;
case LDMS_V_S64_ARRAY:
arr = yyjson_mut_arr_with_uint32(doc, col->mval->a_u32, col->array_len);
return yyjson_mut_obj_add_val(doc, root, col->name, arr);
break;
case LDMS_V_U64_ARRAY:
arr = yyjson_mut_arr_with_uint64(doc, col->mval->a_u64, col->array_len);
return yyjson_mut_obj_add_val(doc, root, col->name, arr);
break;
case LDMS_V_F32_ARRAY:
arr = yyjson_mut_arr_with_float(doc, col->mval->a_f, col->array_len);
return yyjson_mut_obj_add_val(doc, root, col->name, arr);
break;
case LDMS_V_D64_ARRAY:
arr = yyjson_mut_arr_with_double(doc, col->mval->a_d, col->array_len);
return yyjson_mut_obj_add_val(doc, root, col->name, arr);
break;
case LDMS_V_TIMESTAMP:
char *ts_buf = alloca(128);
snprintf(ts_buf, 128, "%u.%06u", col->mval->v_ts.sec,
col->mval->v_ts.usec);
return yyjson_mut_obj_add_strcpy(doc, root, col->name, ts_buf);
break;
case LDMS_V_LAST+1:
goto err;
break;
}

err:
return NULL;
}

int ldmsd_row_to_json_object_yyjson(ldmsd_row_t row, char **out_str, int *out_len)
{
ldmsd_col_t col;
int i, rc = -2;
size_t size = 0;

yyjson_mut_doc *doc = yyjson_mut_doc_new(NULL);
yyjson_mut_val *root = yyjson_mut_obj(doc);
yyjson_mut_doc_set_root(doc, root);
yyjson_mut_val * val = NULL;

if(doc == NULL || root == NULL) {
goto err_0;
}
for (i = 0; i < row->col_count; i++) {
col = &row->cols[i];
bool good = yyjson_add_by_type(doc, root, col);
if (!good) {
rc = -5;
goto err_0;
}
}
char *json = yyjson_mut_write(doc, 0, &size);
if (size == 0) {
return -3;
}
*out_str = json;
*out_len = size;
yyjson_mut_doc_free(doc);
return 0;
err_0:
return rc;
}

static const char *col_type_str(enum ldms_value_type type)
{
static char *type_str[] = {
Expand Down
17 changes: 14 additions & 3 deletions ldms/src/store/avro_kafka/store_avro_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ typedef struct aks_handle_s
serdes_t *serdes; /* The serdes handle */
enum {
AKS_ENCODING_JSON = 1,
AKS_ENCODING_AVRO = 2
AKS_ENCODING_AVRO = 2,
AKS_ENCODING_YYJSON = 3
} encoding;
char *topic_fmt; /* Format to use to create topic name from row */
char *topic_name;
Expand All @@ -60,7 +61,7 @@ typedef struct aks_handle_s
static const char *_help_str =
" config name=store_avro_kafka [path=JSON_FILE]\n"
" encoding=MODE"
" MODE is one of JSON or AVRO (default)."
" MODE is one of JSON or YYJSON or AVRO (default)."
" kafka_conf=PATH"
" Path to a file in Apache Kafka format containing key/value\n"
" pairs defining Kafka configuration properties. See\n"
Expand Down Expand Up @@ -372,6 +373,8 @@ static int config(struct ldmsd_plugin *self, struct attr_value_list *kwl,
g_serdes_encoding = AKS_ENCODING_AVRO;
} else if (0 == strcasecmp(encoding, "json")) {
g_serdes_encoding = AKS_ENCODING_JSON;
} else if (0 == strcasecmp(encoding, "yyjson")) {
g_serdes_encoding = AKS_ENCODING_YYJSON;
} else {
LOG_ERROR("Ignoring unrecognized serialization encoding '%s'\n", encoding);
}
Expand Down Expand Up @@ -887,7 +890,8 @@ static char *get_topic_name(aks_handle_t sh, ldms_set_t set, ldmsd_row_t row)
topic = str_cat_s(str, row->schema_name);
break;
case 'F': /* Format */
if (sh->encoding == AKS_ENCODING_JSON)
if (sh->encoding == AKS_ENCODING_JSON ||
sh->encoding == AKS_ENCODING_YYJSON)
topic = str_cat_s(str, "json");
else
topic = str_cat_s(str, "avro");
Expand Down Expand Up @@ -1033,6 +1037,13 @@ commit_rows(ldmsd_strgp_t strgp, ldms_set_t set, ldmsd_row_list_t row_list,
}
ser_buf_size = (size_t)ser_size;
break;
case AKS_ENCODING_YYJSON:
/* Encode row as a JSON text object using yyjson*/
rc = ldmsd_row_to_json_object_yyjson(row, (char **)&ser_buf, &ser_size);
if (rc < 0) {
LOG_ERROR("Failed to serialize row as YYJSON object, error: %d", rc);
continue;
}
default:
assert(0 == "Invalid/unsupported serialization encoding");
}
Expand Down

0 comments on commit dcfc34f

Please sign in to comment.