diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index 7dfa1df6fdf..c3b729ce8f1 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -37,6 +37,16 @@ #include #include +// Glob support +#ifndef _MSC_VER +#include +#endif + +#ifdef _WIN32 +#include +#include +#define PATH_MAX MAX_PATH +#endif #define CALYPTIA_H_PROJECT "X-Project-Token" #define CALYPTIA_H_CTYPE "Content-Type" @@ -79,9 +89,9 @@ struct flb_in_calyptia_fleet_config { flb_sds_t cloud_port; flb_sds_t fleet_url; + flb_sds_t fleet_files_url; struct flb_input_instance *ins; /* plugin instance */ - struct flb_config *config; /* Fluent Bit context */ /* Networking */ struct flb_upstream *u; @@ -89,6 +99,36 @@ struct flb_in_calyptia_fleet_config { int collect_fd; }; +static int get_calyptia_files(struct flb_in_calyptia_fleet_config *ctx, + struct flb_connection *u_conn, + const char *url, + time_t timestamp); + +static int fleet_cur_chdir(struct flb_in_calyptia_fleet_config *ctx); + +#ifndef FLB_SYSTEM_WINDOWS + +static int is_link(const char *path) { + struct stat st = { 0 }; + + if (lstat(path, &st) != 0) { + return -1; + } + + if ((st.st_mode & S_IFMT) == S_IFLNK) { + return FLB_TRUE; + } + + return FLB_FALSE; +} +#else +/* symlinks are too difficult to use on win32 so we skip their use entirely. */ +static int is_link(const char *path) { + return FLB_FALSE; +} +#endif + + static char *find_case_header(struct flb_http_client *cli, const char *header) { char *ptr; @@ -119,7 +159,7 @@ static char *find_case_header(struct flb_http_client *cli, const char *header) if (strncasecmp(ptr, header, strlen(header)) == 0) { if (ptr[strlen(header)] == ':' && ptr[strlen(header)+1] == ' ') { - return ptr; + return ptr; } } } @@ -184,41 +224,53 @@ struct reload_ctx { flb_sds_t cfg_path; }; -static flb_sds_t fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx, char *fname) +static flb_sds_t generate_base_fleet_directory(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t *fleet_dir) { - flb_sds_t cfgname; + if (fleet_dir == NULL) { + return NULL; + } - cfgname = flb_sds_create_size(4096); + if (*fleet_dir == NULL) { + *fleet_dir = flb_sds_create_size(4096); + if (*fleet_dir == NULL) { + return NULL; + } + } if (ctx->fleet_name != NULL) { - flb_sds_printf(&cfgname, - "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s.ini", - ctx->config_dir, ctx->machine_id, ctx->fleet_name, fname); - } - else { - flb_sds_printf(&cfgname, - "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s.ini", - ctx->config_dir, ctx->machine_id, ctx->fleet_id, fname); + return flb_sds_printf(fleet_dir, "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s", + ctx->config_dir, ctx->machine_id, ctx->fleet_name); } - - return cfgname; + return flb_sds_printf(fleet_dir, "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s", + ctx->config_dir, ctx->machine_id, ctx->fleet_id); } -static flb_sds_t new_fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx) +static flb_sds_t fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx, char *fname) { - return fleet_config_filename(ctx, "new"); -} + flb_sds_t cfgname = NULL; + flb_sds_t ret; -static flb_sds_t cur_fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx) -{ - return fleet_config_filename(ctx, "cur"); -} + if (ctx == NULL || fname == NULL) { + return NULL; + } -static flb_sds_t old_fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx) -{ - return fleet_config_filename(ctx, "old"); + if (generate_base_fleet_directory(ctx, &cfgname) == NULL) { + return NULL; + } + + ret = flb_sds_printf(&cfgname, PATH_SEPARATOR "%s.conf", fname); + if (ret == NULL) { + flb_sds_destroy(cfgname); + return NULL; + } + + return cfgname; } +#define new_fleet_config_filename(a) fleet_config_filename((a), "new") +#define cur_fleet_config_filename(a) fleet_config_filename((a), "cur") +#define old_fleet_config_filename(a) fleet_config_filename((a), "old") + static flb_sds_t time_fleet_config_filename(struct flb_in_calyptia_fleet_config *ctx, time_t t) { char s_last_modified[32]; @@ -233,11 +285,19 @@ static int is_new_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct int ret = FLB_FALSE; + if (cfg == NULL) { + return FLB_FALSE; + } + if (cfg->conf_path_file == NULL) { return FLB_FALSE; } cfgnewname = new_fleet_config_filename(ctx); + if (cfgnewname == NULL) { + flb_plg_error(ctx->ins, "unable to allocate configuration name"); + return FLB_FALSE; + } if (strcmp(cfgnewname, cfg->conf_path_file) == 0) { ret = FLB_TRUE; @@ -253,12 +313,19 @@ static int is_cur_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_sds_t cfgcurname; int ret = FLB_FALSE; + if (cfg == NULL) { + return FLB_FALSE; + } if (cfg->conf_path_file == NULL) { return FLB_FALSE; } cfgcurname = cur_fleet_config_filename(ctx); + if (cfgcurname == NULL) { + flb_plg_error(ctx->ins, "unable to allocate configuration name"); + return FLB_FALSE; + } if (strcmp(cfgcurname, cfg->conf_path_file) == 0) { ret = FLB_TRUE; @@ -304,6 +371,10 @@ static int is_timestamped_fleet_config(struct flb_in_calyptia_fleet_config *ctx, char *end; long val; + if (cfg == NULL) { + return FLB_FALSE; + } + if (cfg->conf_path_file == NULL) { return FLB_FALSE; } @@ -325,7 +396,7 @@ static int is_timestamped_fleet_config(struct flb_in_calyptia_fleet_config *ctx, return FLB_FALSE; } - if (strcmp(end, ".ini") == 0) { + if (strcmp(end, ".conf") == 0) { return FLB_TRUE; } @@ -334,6 +405,10 @@ static int is_timestamped_fleet_config(struct flb_in_calyptia_fleet_config *ctx, static int is_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg) { + if (cfg == NULL) { + return FLB_FALSE; + } + if (cfg->conf_path_file == NULL) { return FLB_FALSE; } @@ -351,6 +426,11 @@ static int exists_new_fleet_config(struct flb_in_calyptia_fleet_config *ctx) cfgnewname = new_fleet_config_filename(ctx); + if (cfgnewname == NULL) { + flb_plg_error(ctx->ins, "unable to allocate configuration name"); + return FLB_FALSE; + } + ret = access(cfgnewname, F_OK) == 0 ? FLB_TRUE : FLB_FALSE; flb_sds_destroy(cfgnewname); @@ -364,16 +444,43 @@ static int exists_cur_fleet_config(struct flb_in_calyptia_fleet_config *ctx) cfgcurname = cur_fleet_config_filename(ctx); + if (cfgcurname == NULL) { + flb_plg_error(ctx->ins, "unable to allocate configuration name"); + return FLB_FALSE; + } + ret = access(cfgcurname, F_OK) == 0 ? FLB_TRUE : FLB_FALSE; flb_sds_destroy(cfgcurname); return ret; } +static int exists_old_fleet_config(struct flb_in_calyptia_fleet_config *ctx) +{ + flb_sds_t cfgoldname; + int ret = FLB_FALSE; + + + cfgoldname = old_fleet_config_filename(ctx); + if (cfgoldname == NULL) { + flb_plg_error(ctx->ins, "unable to allocate configuration name"); + return FLB_FALSE; + } + + ret = access(cfgoldname, F_OK) == 0 ? FLB_TRUE : FLB_FALSE; + + flb_sds_destroy(cfgoldname); + return ret; +} + static void *do_reload(void *data) { struct reload_ctx *reload = (struct reload_ctx *)data; + if (reload == NULL) { + return NULL; + } + /* avoid reloading the current configuration... just use our new one! */ flb_context_set(reload->flb); reload->flb->config->enable_hot_reload = FLB_TRUE; @@ -384,28 +491,37 @@ static void *do_reload(void *data) #ifndef FLB_SYSTEM_WINDOWS kill(getpid(), SIGHUP); #else - GenerateConsoleCtrlEvent(1 /* CTRL_BREAK_EVENT_1 */, 0); + GenerateConsoleCtrlEvent(1 /* CTRL_BREAK_EVENT_1 */, 0); #endif return NULL; } -static int test_config_is_valid(flb_sds_t cfgpath) +static int test_config_is_valid(struct flb_in_calyptia_fleet_config *ctx, + flb_sds_t cfgpath) { struct flb_cf *conf; int ret = FLB_FALSE; + if (cfgpath == NULL) { + return FLB_FALSE; + } conf = flb_cf_create(); if (conf == NULL) { + flb_plg_debug(ctx->ins, "unable to create conf during validation test: %s", + cfgpath); goto config_init_error; - } + } conf = flb_cf_create_from_file(conf, cfgpath); if (conf == NULL) { + flb_plg_debug(ctx->ins, + "unable to create conf from file during validation test: %s", + cfgpath); goto cf_create_from_file_error; - } + } ret = FLB_TRUE; @@ -415,6 +531,71 @@ static int test_config_is_valid(flb_sds_t cfgpath) return ret; } +static int parse_config_name_timestamp(struct flb_in_calyptia_fleet_config *ctx, + const char *cfgpath, + long *config_timestamp) +{ + char *ext = NULL; + long timestamp; + char realname[4096] = {0}; + char *fname; + ssize_t len; + + if (ctx == NULL || config_timestamp == NULL || cfgpath == NULL) { + return FLB_FALSE; + } + + switch (is_link(cfgpath)) { + case FLB_TRUE: + len = readlink(cfgpath, realname, sizeof(realname)); + + if (len > sizeof(realname)) { + return FLB_FALSE; + } + break; + case FLB_FALSE: + strncpy(realname, cfgpath, sizeof(realname)-1); + break; + default: + flb_errno(); + return FLB_FALSE; + } + + fname = basename(realname); + + flb_plg_debug(ctx->ins, "parsing configuration timestamp from path: %s", fname); + + errno = 0; + timestamp = strtol(fname, &ext, 10); + + if ((errno == ERANGE && (timestamp == LONG_MAX || timestamp == LONG_MIN)) || + (errno != 0 && timestamp == 0)) { + flb_errno(); + return FLB_FALSE; + } + + /* unable to parse the timstamp */ + if (errno == ERANGE) { + return FLB_FALSE; + } + + *config_timestamp = timestamp; + + return FLB_TRUE; +} + +static int parse_config_timestamp(struct flb_in_calyptia_fleet_config *ctx, + long *config_timestamp) +{ + flb_ctx_t *flb_ctx = flb_context_get(); + + if (ctx == NULL || config_timestamp == NULL) { + return FLB_FALSE; + } + + return parse_config_name_timestamp(ctx, flb_ctx->config->conf_path_file, config_timestamp); +} + static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cfgpath) { struct reload_ctx *reload; @@ -422,6 +603,14 @@ static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cf pthread_attr_t ptha; flb_ctx_t *flb = flb_context_get(); + if (parse_config_name_timestamp(ctx, cfgpath, &ctx->config_timestamp) != FLB_TRUE) { + return FLB_FALSE; + } + + reload = flb_calloc(1, sizeof(struct reload_ctx)); + reload->flb = flb; + reload->cfg_path = cfgpath; + if (ctx->collect_fd > 0) { flb_input_collector_pause(ctx->collect_fd, ctx->ins); } @@ -441,9 +630,9 @@ static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cf * otherwise flb_reload errors out with: * [error] [reload] given flb context is NULL */ - flb_plg_info(ctx->ins, "loading configuration from %s.", cfgpath); + flb_plg_info(ctx->ins, "loading configuration from %s.", reload->cfg_path); - if (test_config_is_valid(cfgpath) == FLB_FALSE) { + if (test_config_is_valid(ctx, reload->cfg_path) == FLB_FALSE) { flb_plg_error(ctx->ins, "unable to load configuration."); if (ctx->collect_fd > 0) { @@ -454,9 +643,10 @@ static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cf return FLB_FALSE; } - reload = flb_calloc(1, sizeof(struct reload_ctx)); - reload->flb = flb; - reload->cfg_path = cfgpath; + if (fleet_cur_chdir(ctx) == -1) { + flb_errno(); + flb_plg_error(ctx->ins, "unable to change to configuration directory"); + } pthread_attr_init(&ptha); pthread_attr_setdetachstate(&ptha, PTHREAD_CREATE_DETACHED); @@ -474,6 +664,53 @@ static char *tls_setting_string(int use_tls) return "Off"; } +static msgpack_object *msgpack_lookup_map_key(msgpack_object *obj, const char *keyname) +{ + int idx; + msgpack_object_kv *cur; + msgpack_object_str *key; + + if (obj == NULL || keyname == NULL) { + return NULL; + } + + if (obj->type != MSGPACK_OBJECT_MAP) { + return NULL; + } + + for (idx = 0; idx < obj->via.map.size; idx++) { + cur = &obj->via.map.ptr[idx]; + if (cur->key.type != MSGPACK_OBJECT_STR) { + continue; + } + + key = &cur->key.via.str; + + if (strncmp(key->ptr, keyname, key->size) == 0) { + return &cur->val; + } + } + + return NULL; +} + +static msgpack_object *msgpack_lookup_array_offset(msgpack_object *obj, size_t offset) +{ + if (obj == NULL) { + return NULL; + } + + if (obj->type != MSGPACK_OBJECT_ARRAY) { + return NULL; + } + + if (obj->via.array.size <= offset) { + return NULL; + } + + return &obj->via.array.ptr[offset]; +} + static flb_sds_t parse_api_key_json(struct flb_in_calyptia_fleet_config *ctx, char *payload, size_t size) { @@ -483,10 +720,12 @@ static flb_sds_t parse_api_key_json(struct flb_in_calyptia_fleet_config *ctx, struct flb_pack_state pack_state; size_t off = 0; msgpack_unpacked result; - msgpack_object_kv *cur; - msgpack_object_str *key; - flb_sds_t project_id; - int idx = 0; + msgpack_object *projectID; + flb_sds_t project_id = NULL; + + if (ctx == NULL || payload == NULL) { + return NULL; + } /* Initialize packer */ flb_pack_state_init(&pack_state); @@ -497,49 +736,36 @@ static flb_sds_t parse_api_key_json(struct flb_in_calyptia_fleet_config *ctx, flb_pack_state_reset(&pack_state); /* Handle exceptions */ - if (ret == FLB_ERR_JSON_PART) { - flb_plg_warn(ctx->ins, "JSON data is incomplete, skipping"); - return NULL; - } - else if (ret == FLB_ERR_JSON_INVAL) { + if (ret == FLB_ERR_JSON_PART || ret == FLB_ERR_JSON_INVAL || ret == -1) { flb_plg_warn(ctx->ins, "invalid JSON message, skipping"); return NULL; } - else if (ret == -1) { - return NULL; - } msgpack_unpacked_init(&result); while (msgpack_unpack_next(&result, pack, out_size, &off) == MSGPACK_UNPACK_SUCCESS) { + projectID = msgpack_lookup_map_key(&result.data, "ProjectID"); - if (result.data.type == MSGPACK_OBJECT_MAP) { - for (idx = 0; idx < result.data.via.map.size; idx++) { - cur = &result.data.via.map.ptr[idx]; - key = &cur->key.via.str; - - if (strncmp(key->ptr, "ProjectID", key->size) == 0) { - - if (cur->val.type != MSGPACK_OBJECT_STR) { - flb_plg_error(ctx->ins, "unable to find fleet by name"); - msgpack_unpacked_destroy(&result); - return NULL; - } - - project_id = flb_sds_create_len(cur->val.via.str.ptr, - cur->val.via.str.size); - msgpack_unpacked_destroy(&result); - flb_free(pack); + if (projectID == NULL) { + flb_plg_error(ctx->ins, "unable to find fleet by name"); + msgpack_unpacked_destroy(&result); + return NULL; + } - return project_id; - } - } + if (projectID->type != MSGPACK_OBJECT_STR) { + flb_plg_error(ctx->ins, "invalid fleet ID"); + msgpack_unpacked_destroy(&result); + return NULL; } + + project_id = flb_sds_create_len(projectID->via.str.ptr, + projectID->via.str.size); + break; } msgpack_unpacked_destroy(&result); flb_free(pack); - return NULL; + return project_id; } static ssize_t parse_fleet_search_json(struct flb_in_calyptia_fleet_config *ctx, @@ -551,10 +777,12 @@ static ssize_t parse_fleet_search_json(struct flb_in_calyptia_fleet_config *ctx, struct flb_pack_state pack_state; size_t off = 0; msgpack_unpacked result; - msgpack_object_array *results; - msgpack_object_kv *cur; - msgpack_object_str *key; - int idx = 0; + msgpack_object *map; + msgpack_object *fleet; + + if (ctx == NULL || payload == NULL) { + return -1; + } /* Initialize packer */ flb_pack_state_init(&pack_state); @@ -565,48 +793,31 @@ static ssize_t parse_fleet_search_json(struct flb_in_calyptia_fleet_config *ctx, flb_pack_state_reset(&pack_state); /* Handle exceptions */ - if (ret == FLB_ERR_JSON_PART) { - flb_plg_warn(ctx->ins, "JSON data is incomplete, skipping"); - return -1; - } - else if (ret == FLB_ERR_JSON_INVAL) { + if (ret == FLB_ERR_JSON_PART || ret == FLB_ERR_JSON_INVAL || ret == -1) { flb_plg_warn(ctx->ins, "invalid JSON message, skipping"); return -1; } - else if (ret == -1) { - return -1; - } msgpack_unpacked_init(&result); while (msgpack_unpack_next(&result, pack, out_size, &off) == MSGPACK_UNPACK_SUCCESS) { + map = msgpack_lookup_array_offset(&result.data, 0); + if (map == NULL) { + break; + } - if (result.data.type == MSGPACK_OBJECT_ARRAY) { - results = &result.data.via.array; - - if (results->ptr[0].type == MSGPACK_OBJECT_MAP) { - - for (idx = 0; idx < results->ptr[0].via.map.size; idx++) { - cur = &results->ptr[0].via.map.ptr[idx]; - key = &cur->key.via.str; - - if (strncasecmp(key->ptr, "id", key->size) == 0) { - - if (cur->val.type != MSGPACK_OBJECT_STR) { - flb_plg_error(ctx->ins, "unable to find fleet by name"); - msgpack_unpacked_destroy(&result); - return -1; - } + fleet = msgpack_lookup_map_key(map, "ID"); + if (fleet == NULL) { + flb_plg_error(ctx->ins, "unable to find fleet by name"); + break; + } - ctx->fleet_id_found = 1; - ctx->fleet_id = flb_sds_create_len(cur->val.via.str.ptr, - cur->val.via.str.size); - break; - } - break; - } - break; - } + if (fleet->type != MSGPACK_OBJECT_STR) { + flb_plg_error(ctx->ins, "unable to find fleet by name"); + break; } + + ctx->fleet_id = flb_sds_create_len(fleet->via.str.ptr, fleet->via.str.size); + break; } msgpack_unpacked_destroy(&result); @@ -619,25 +830,23 @@ static ssize_t parse_fleet_search_json(struct flb_in_calyptia_fleet_config *ctx, return 0; } -static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ctx, - struct flb_connection *u_conn, - struct flb_config *config) +static flb_sds_t get_project_id_from_api_key(struct flb_in_calyptia_fleet_config *ctx) { - struct flb_http_client *client; - flb_sds_t url; - flb_sds_t project_id; - unsigned char token[512] = {0}; unsigned char encoded[256]; - size_t elen; - size_t tlen; + unsigned char token[512] = {0}; char *api_token_sep; - size_t b_sent; + size_t tlen; + size_t elen; int ret; + if (ctx == NULL) { + return NULL; + } + api_token_sep = strchr(ctx->api_key, '.'); if (api_token_sep == NULL) { - return -1; + return NULL; } elen = api_token_sep-ctx->api_key; @@ -645,35 +854,40 @@ static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ct if (elen > sizeof(encoded)) { flb_plg_error(ctx->ins, "API Token is too large"); - return -1; + return NULL; } memset(encoded, '=', sizeof(encoded)); memcpy(encoded, ctx->api_key, api_token_sep-ctx->api_key); ret = flb_base64_decode(token, sizeof(token)-1, &tlen, - encoded, elen); + encoded, elen); if (ret != 0) { - return ret; + return NULL; } - project_id = parse_api_key_json(ctx, (char *)token, tlen); + return parse_api_key_json(ctx, (char *)token, tlen); +} - if (project_id == NULL) { - return -1; - } +static struct flb_http_client *fleet_http_do(struct flb_in_calyptia_fleet_config *ctx, + struct flb_connection *u_conn, + flb_sds_t url) +{ + struct flb_http_client *client; + size_t b_sent; + int ret = -1; - url = flb_sds_create_size(4096); - flb_sds_printf(&url, "/v1/search?project_id=%s&resource=fleet&term=%s", - project_id, ctx->fleet_name); + if (ctx == NULL || u_conn == NULL || url == NULL) { + return NULL; + } client = flb_http_client(u_conn, FLB_HTTP_GET, url, NULL, 0, ctx->ins->host.name, ctx->ins->host.port, NULL, 0); if (!client) { flb_plg_error(ctx->ins, "unable to create http client"); - return -1; + goto http_client_error; } flb_http_buffer_size(client, 8192); @@ -686,64 +900,71 @@ static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ct if (ret != 0) { flb_plg_error(ctx->ins, "http do error"); - flb_http_client_destroy(client); - return -1; + goto http_do_error; } if (client->resp.status != 200) { flb_plg_error(ctx->ins, "search http status code error: %d", client->resp.status); - flb_http_client_destroy(client); - return -1; + goto http_do_error; } if (client->resp.payload_size <= 0) { flb_plg_error(ctx->ins, "empty response"); flb_http_client_destroy(client); - return -1; + goto http_do_error; } - if (parse_fleet_search_json(ctx, client->resp.payload, client->resp.payload_size) == -1) { - flb_plg_error(ctx->ins, "unable to find fleet: %s", ctx->fleet_name); - flb_http_client_destroy(client); + return client; + +http_do_error: + flb_http_client_destroy(client); +http_client_error: + return NULL; +} + +static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ctx, + struct flb_connection *u_conn, + struct flb_config *config) +{ + struct flb_http_client *client; + flb_sds_t url; + flb_sds_t project_id; + + if (ctx == NULL || u_conn == NULL || config == NULL) { return -1; } - if (ctx->fleet_id == NULL) { - flb_http_client_destroy(client); + project_id = get_project_id_from_api_key(ctx); + + if (project_id == NULL) { return -1; } - flb_http_client_destroy(client); - return 0; -} - -#ifdef FLB_SYSTEM_WINDOWS -#define link(a, b) CreateHardLinkA(b, a, 0) + url = flb_sds_create_size(4096); + flb_sds_printf(&url, "/v1/search?project_id=%s&resource=fleet&term=%s", + project_id, ctx->fleet_name); -ssize_t readlink(const char *path, char *realpath, size_t srealpath) { - HANDLE hFile; - DWORD ret; + client = fleet_http_do(ctx, u_conn, url); - hFile = CreateFile(path, GENERIC_READ, FILE_SHARE_READ, NULL, OPEN_EXISTING, - FILE_ATTRIBUTE_NORMAL, NULL); + if (!client) { + return -1; + } - if (hFile == INVALID_HANDLE_VALUE) { + if (parse_fleet_search_json(ctx, client->resp.payload, client->resp.payload_size) == -1) { + flb_plg_error(ctx->ins, "unable to find fleet: %s", ctx->fleet_name); + flb_http_client_destroy(client); return -1; } - ret = GetFinalPathNameByHandleA(hFile, realpath, srealpath, VOLUME_NAME_NT); + flb_http_client_destroy(client); - if (ret < srealpath) { - CloseHandle(hFile); + if (ctx->fleet_id == NULL) { return -1; } - CloseHandle(hFile); - return ret; + return 0; } -#endif - #ifdef FLB_SYSTEM_WINDOWS #define _mkdir(a, b) mkdir(a) #else @@ -779,82 +1000,721 @@ static int __mkdir(const char *dir, int perms) { for (ptr = tmp + 3; *ptr; ptr++) { #endif - if (*ptr == PATH_SEPARATOR[0]) { - *ptr = 0; - if (access(tmp, F_OK) != 0) { - ret = _mkdir(tmp, perms); - if (ret != 0) { - return ret; - } - } - *ptr = PATH_SEPARATOR[0]; - } + if (*ptr == PATH_SEPARATOR[0]) { + *ptr = 0; + if (access(tmp, F_OK) != 0) { + ret = _mkdir(tmp, perms); + if (ret != 0) { + return ret; + } + } + *ptr = PATH_SEPARATOR[0]; + } + } + + return _mkdir(tmp, perms); +} + +static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx, + struct flb_connection *u_conn, + flb_sds_t url, + const char *hdr, + const char *dst, + time_t *time_last_modified) +{ + struct flb_http_client *client; + size_t len; + FILE *fp; + int ret = -1; + const char *fbit_last_modified; + struct flb_tm tm_last_modified = { 0 }; + int fbit_last_modified_len; + time_t last_modified; + flb_sds_t fname; + + if (ctx == NULL || u_conn == NULL || url == NULL) { + return -1; + } + + client = fleet_http_do(ctx, u_conn, url); + + if (client == NULL) { + return -1; + } + + ret = case_header_lookup(client, "Last-modified", strlen("Last-modified"), + &fbit_last_modified, &fbit_last_modified_len); + + if (ret < 0) { + goto client_error; + } + + if (dst == NULL) { + flb_strptime(fbit_last_modified, "%a, %d %B %Y %H:%M:%S GMT", &tm_last_modified); + last_modified = mktime(&tm_last_modified.tm); + + fname = time_fleet_config_filename(ctx, last_modified); + } else { + fname = flb_sds_create_len(dst, strlen(dst)); + } + + if (access(fname, F_OK) == 0) { + ret = 0; + goto file_exists; + } + + if (fname == NULL) { + goto client_error; + } + + fp = fopen(fname, "w+"); + + if (fp == NULL) { + goto client_error; + } + + if (hdr != NULL) { + len = fwrite(hdr, strlen(hdr), 1, fp); + if (len < 1) { + flb_plg_error(ctx->ins, "truncated write: %s", dst); + goto file_error; + } + } + + len = fwrite(client->resp.payload, client->resp.payload_size, 1, fp); + if (len < 1) { + flb_plg_error(ctx->ins, "truncated write: %s", dst); + goto file_error; + } + + if (time_last_modified) { + *time_last_modified = last_modified; + } + + ret = 1; + +file_error: + fclose(fp); +client_error: + flb_sds_destroy(fname); +file_exists: + flb_http_client_destroy(client); + return ret; +} + +#ifndef _WIN32 +static struct cfl_array *read_glob(const char *path) +{ + int ret = -1; + int ret_glb = -1; + glob_t glb; + size_t idx; + struct cfl_array *list; + + + ret_glb = glob(path, GLOB_NOSORT, NULL, &glb); + + if (ret_glb != 0) { + switch(ret_glb){ + case GLOB_NOSPACE: + flb_warn("[%s] glob: [%s] no space", __FUNCTION__, path); + break; + case GLOB_NOMATCH: + flb_warn("[%s] glob: [%s] no match", __FUNCTION__, path); + break; + case GLOB_ABORTED: + flb_warn("[%s] glob: [%s] aborted", __FUNCTION__, path); + break; + default: + flb_warn("[%s] glob: [%s] other error", __FUNCTION__, path); + } + return NULL; + } + + list = cfl_array_create(glb.gl_pathc); + for (idx = 0; idx < glb.gl_pathc; idx++) { + ret = cfl_array_append_string(list, glb.gl_pathv[idx]); + if (ret < 0) { + cfl_array_destroy(list); + return NULL; + } + } + + globfree(&glb); + return list; +} +#else +static char *dirname(char *path) +{ + char *ptr; + + + ptr = strrchr(path, '\\'); + + if (ptr == NULL) { + return path; + } + *ptr++='\0'; + return path; +} + +static struct cfl_array *read_glob_win(const char *path, struct cfl_array *list) +{ + char *star, *p0, *p1; + char pattern[MAX_PATH]; + char buf[MAX_PATH]; + int ret; + struct stat st; + HANDLE hnd; + WIN32_FIND_DATA data; + + if (strlen(path) > MAX_PATH - 1) { + flb_error("path too long: %s", path); + return NULL; + } + + star = strchr(path, '*'); + + if (star == NULL) { + flb_error("path has no wild card: %s", path); + return NULL; + } + + /* + * C:\data\tmp\input_*.conf + * 0<-----| + */ + p0 = star; + while (path <= p0 && *p0 != '\\') { + p0--; + } + + /* + * C:\data\tmp\input_*.conf + * |---->1 + */ + p1 = star; + while (*p1 && *p1 != '\\') { + p1++; + } + + memcpy(pattern, path, (p1 - path)); + pattern[p1 - path] = '\0'; + + hnd = FindFirstFileA(pattern, &data); + + if (hnd == INVALID_HANDLE_VALUE) { + flb_error("unable to open valid handle for: %s", path); + return NULL; + } + + if (list == NULL) { + list = cfl_array_create(3); + + if (list == NULL) { + flb_error("unable to allocate array"); + FindClose(hnd); + return NULL; + } + + /* cfl_array_resizable is hardcoded to return 0. */ + if (cfl_array_resizable(list, FLB_TRUE) != 0) { + flb_error("unable to make array resizable"); + FindClose(hnd); + cfl_array_destroy(list); + return NULL; + } + } + + do { + /* Ignore the current and parent dirs */ + if (!strcmp(".", data.cFileName) || !strcmp("..", data.cFileName)) { + continue; + } + + /* Avoid an infinite loop */ + if (strchr(data.cFileName, '*')) { + continue; + } + + /* Create a path (prefix + filename + suffix) */ + memcpy(buf, path, p0 - path + 1); + buf[p0 - path + 1] = '\0'; + + if (FAILED(StringCchCatA(buf, MAX_PATH, data.cFileName))) { + continue; + } + + if (FAILED(StringCchCatA(buf, MAX_PATH, p1))) { + continue; + } + + if (strchr(p1, '*')) { + if (read_glob_win(path, list) == NULL) { + cfl_array_destroy(list); + FindClose(hnd); + return NULL; + } + continue; + } + + ret = stat(buf, &st); + + if (ret == 0 && (st.st_mode & S_IFMT) == S_IFREG) { + cfl_array_append_string(list, buf); + } + } while (FindNextFileA(hnd, &data) != 0); + + FindClose(hnd); + return list; +} + +static struct cfl_array *read_glob(const char *path) +{ + return read_glob_win(path, NULL); +} + +#endif + +static int cfl_array_qsort_conf_files(const void *arg_a, const void *arg_b) +{ + struct cfl_variant *var_a = (struct cfl_variant *)*(void **)arg_a; + struct cfl_variant *var_b = (struct cfl_variant *)*(void **)arg_b; + + if (var_a == NULL && var_b == NULL) { + return 0; + } + else if (var_a == NULL) { + return -1; + } + else if (var_b == NULL) { + return 1; + } + else if (var_a->type != CFL_VARIANT_STRING && + var_b->type != CFL_VARIANT_STRING) { + return 0; + } + else if (var_a->type != CFL_VARIANT_STRING) { + return -1; + } + else if (var_b->type != CFL_VARIANT_STRING) { + return 1; + } + + return strcmp(var_a->data.as_string, var_b->data.as_string); +} + +static int calyptia_config_delete_old_dir(const char *cfgpath) +{ + flb_sds_t cfg_glob; + char *ext; + struct cfl_array *files; + int idx; + + if (cfgpath == NULL) { + return FLB_FALSE; + } + + ext = strrchr(cfgpath, '.'); + if (ext == NULL) { + return FLB_FALSE; + } + + cfg_glob = flb_sds_create_len(cfgpath, ext - cfgpath); + if (cfg_glob == NULL) { + return FLB_FALSE; + } + + if (flb_sds_cat_safe(&cfg_glob, PATH_SEPARATOR "*", strlen(PATH_SEPARATOR "*")) != 0) { + flb_sds_destroy(cfg_glob); + return FLB_FALSE; + } + + files = read_glob(cfg_glob); + + if (files != NULL) { + for (idx = 0; idx < ((ssize_t)files->entry_count); idx++) { + unlink(files->entries[idx]->data.as_string); + } + cfl_array_destroy(files); + } + + /* attempt to delete the main directory */ + ext = strrchr(cfg_glob, PATH_SEPARATOR[0]); + if (ext) { + *ext = '\0'; + rmdir(cfg_glob); + } + + flb_sds_destroy(cfg_glob); + + return FLB_TRUE; +} + +static int calyptia_config_delete_old(struct flb_in_calyptia_fleet_config *ctx) +{ + struct cfl_array *inis; + flb_sds_t glob_files = NULL; + int idx; + + if (ctx == NULL) { + return -1; + } + + if (generate_base_fleet_directory(ctx, &glob_files) == NULL) { + flb_sds_destroy(glob_files); + return -1; + } + + if (flb_sds_cat_safe(&glob_files, PATH_SEPARATOR "*.conf", strlen(PATH_SEPARATOR "*.conf")) != 0) { + flb_sds_destroy(glob_files); + return -1; + } + + inis = read_glob(glob_files); + if (inis == NULL) { + flb_sds_destroy(glob_files); + return -1; + } + + qsort(inis->entries, inis->entry_count, + sizeof(struct cfl_variant *), + cfl_array_qsort_conf_files); + + for (idx = 0; idx < (((ssize_t)inis->entry_count) -1 - 3); idx++) { + unlink(inis->entries[idx]->data.as_string); + calyptia_config_delete_old_dir(inis->entries[idx]->data.as_string); + } + + cfl_array_destroy(inis); + flb_sds_destroy(glob_files); + + return 0; +} + +static flb_sds_t calyptia_config_get_newest(struct flb_in_calyptia_fleet_config *ctx) +{ + struct cfl_array *inis; + flb_sds_t glob_conf_files = NULL; + flb_sds_t cfgnewname = NULL; + + if (ctx == NULL) { + return NULL; + } + + if (generate_base_fleet_directory(ctx, &glob_conf_files) == NULL) { + flb_plg_error(ctx->ins, "unable to generate fleet directory name"); + flb_sds_destroy(glob_conf_files); + return NULL; + } + + if (flb_sds_cat_safe(&glob_conf_files, PATH_SEPARATOR "*.conf", strlen(PATH_SEPARATOR "*.conf")) != 0) { + flb_plg_error(ctx->ins, "unable to concatenate fleet glob"); + flb_sds_destroy(glob_conf_files); + return NULL; + } + + inis = read_glob(glob_conf_files); + if (inis == NULL) { + flb_plg_error(ctx->ins, "unable to read fleet directory for conf files: %s", + glob_conf_files); + flb_sds_destroy(glob_conf_files); + return NULL; + } + + qsort(inis->entries, inis->entry_count, + sizeof(struct cfl_variant *), + cfl_array_qsort_conf_files); + + cfgnewname = flb_sds_create_len(inis->entries[inis->entry_count-1]->data.as_string, + strlen(inis->entries[inis->entry_count-1]->data.as_string)); + + cfl_array_destroy(inis); + flb_sds_destroy(glob_conf_files); + + return cfgnewname; +} + +#ifndef FLB_SYSTEM_WINDOWS + +static int calyptia_config_add(struct flb_in_calyptia_fleet_config *ctx, + const char *cfgname) +{ + int rc = FLB_FALSE; + + flb_sds_t cfgnewname = NULL; + flb_sds_t cfgoldname = NULL; + flb_sds_t cfgcurname = NULL; + + cfgnewname = new_fleet_config_filename(ctx); + cfgcurname = cur_fleet_config_filename(ctx); + cfgoldname = old_fleet_config_filename(ctx); + + if (cfgnewname == NULL || cfgcurname == NULL || cfgoldname == NULL) { + goto error; + } + + if (exists_new_fleet_config(ctx) == FLB_TRUE) { + + if (rename(cfgnewname, cfgoldname)) { + goto error; + } + } + else if (exists_cur_fleet_config(ctx) == FLB_TRUE) { + + if (rename(cfgcurname, cfgoldname)) { + goto error; + } + } + + if (symlink(cfgname, cfgnewname)) { + flb_plg_error(ctx->ins, "unable to create new configuration symlink."); + goto error; + } + + rc = FLB_TRUE; + +error: + if (cfgnewname) { + flb_sds_destroy(cfgnewname); + } + + if (cfgcurname) { + flb_sds_destroy(cfgcurname); + } + + if (cfgoldname) { + flb_sds_destroy(cfgoldname); + } + + return rc; +} + +static int calyptia_config_commit(struct flb_in_calyptia_fleet_config *ctx) +{ + int rc = FLB_FALSE; + flb_sds_t cfgnewname = NULL; + flb_sds_t cfgcurname = NULL; + flb_sds_t cfgoldname = NULL; + + cfgnewname = new_fleet_config_filename(ctx); + cfgcurname = cur_fleet_config_filename(ctx); + cfgoldname = old_fleet_config_filename(ctx); + + if (cfgnewname == NULL || + cfgcurname == NULL || + cfgoldname == NULL) { + goto error; + } + + if (exists_old_fleet_config(ctx) == FLB_TRUE) { + unlink(cfgoldname); + } + + if (exists_cur_fleet_config(ctx) == FLB_TRUE) { + if (rename(cfgcurname, cfgoldname)) { + goto error; + } + } + + if (exists_new_fleet_config(ctx) == FLB_TRUE) { + if (rename(cfgnewname, cfgcurname)) { + goto error; + } + } + + calyptia_config_delete_old(ctx); + rc = FLB_TRUE; + +error: + if (cfgnewname) { + flb_sds_destroy(cfgnewname); + } + + if (cfgcurname) { + flb_sds_destroy(cfgcurname); + } + + if (cfgoldname) { + flb_sds_destroy(cfgoldname); + } + + return rc; +} + +static int calyptia_config_rollback(struct flb_in_calyptia_fleet_config *ctx, + const char *cfgname) +{ + int rc = FLB_TRUE; + flb_sds_t cfgnewname; + flb_sds_t cfgcurname; + flb_sds_t cfgoldname; + + cfgnewname = new_fleet_config_filename(ctx); + cfgcurname = cur_fleet_config_filename(ctx); + cfgoldname = old_fleet_config_filename(ctx); + + if (cfgnewname == NULL || cfgcurname == NULL || cfgoldname == NULL) { + goto error; + } + + if (exists_new_fleet_config(ctx) == FLB_TRUE) { + unlink(cfgnewname); + } + + if (exists_old_fleet_config(ctx) == FLB_TRUE) { + rename(cfgoldname, cfgcurname); + } + + rc = FLB_TRUE; + +error: + if (cfgnewname) { + flb_sds_destroy(cfgnewname); + } + + if (cfgcurname) { + flb_sds_destroy(cfgcurname); + } + + if (cfgoldname) { + flb_sds_destroy(cfgoldname); } - return _mkdir(tmp, perms); + return rc; +} +#else +static int calyptia_config_add(struct flb_in_calyptia_fleet_config *ctx, + const char *cfgname) +{ + return FLB_TRUE; } -static int create_fleet_directory(struct flb_in_calyptia_fleet_config *ctx) +static int calyptia_config_commit(struct flb_in_calyptia_fleet_config *ctx) { - flb_sds_t myfleetdir; + calyptia_config_delete_old(ctx); + return FLB_TRUE; +} - flb_plg_debug(ctx->ins, "checking for configuration directory=%s", ctx->config_dir); - if (access(ctx->config_dir, F_OK) != 0) { - if (__mkdir(ctx->config_dir, 0700) != 0) { - flb_plg_error(ctx->ins, "unable to create fleet config directory"); - return -1; - } - } +static int calyptia_config_rollback(struct flb_in_calyptia_fleet_config *ctx, + const char *cfgname) +{ + unlink(cfgname); + return FLB_TRUE; +} +#endif + +static int get_calyptia_fleet_config(struct flb_in_calyptia_fleet_config *ctx, + struct flb_connection *u_conn) +{ + flb_sds_t cfgname; + flb_sds_t cfgnewname; + flb_sds_t header; + time_t time_last_modified; + int ret = -1; - myfleetdir = flb_sds_create_size(256); + if (ctx->fleet_url == NULL) { + ctx->fleet_url = flb_sds_create_size(4096); + flb_sds_printf(&ctx->fleet_url, "/v1/fleets/%s/config?format=ini", ctx->fleet_id); + } - if (ctx->fleet_name != NULL) { - flb_sds_printf(&myfleetdir, "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s", - ctx->config_dir, ctx->machine_id, ctx->fleet_name); + if (ctx->fleet_files_url == NULL) { + ctx->fleet_files_url = flb_sds_create_size(4096); + flb_sds_printf(&ctx->fleet_files_url, "/v1/fleets/%s/files", ctx->fleet_id); + } + + header = flb_sds_create_size(4096); + + if (ctx->fleet_name == NULL) { + flb_sds_printf(&header, + "[CUSTOM]\n" + " Name calyptia\n" + " api_key %s\n" + " fleet_id %s\n" + " add_label fleet_id %s\n" + " machine_id %s\n" + " fleet.config_dir %s\n" + " calyptia_host %s\n" + " calyptia_port %d\n" + " calyptia_tls %s\n", + ctx->api_key, + ctx->fleet_id, + ctx->fleet_id, + ctx->machine_id, + ctx->config_dir, + ctx->ins->host.name, + ctx->ins->host.port, + tls_setting_string(ctx->ins->use_tls) + ); } else { - flb_sds_printf(&myfleetdir, "%s" PATH_SEPARATOR "%s" PATH_SEPARATOR "%s", - ctx->config_dir, ctx->machine_id, ctx->fleet_id); - } + flb_sds_printf(&header, + "[CUSTOM]\n" + " Name calyptia\n" + " api_key %s\n" + " fleet_name %s\n" + " fleet_id %s\n" + " add_label fleet_id %s\n" + " machine_id %s\n" + " fleet.config_dir %s\n" + " calyptia_host %s\n" + " calyptia_port %d\n" + " calyptia_tls %s\n", + ctx->api_key, + ctx->fleet_name, + ctx->fleet_id, + ctx->fleet_id, + ctx->machine_id, + ctx->config_dir, + ctx->ins->host.name, + ctx->ins->host.port, + tls_setting_string(ctx->ins->use_tls) + ); + } + + /* create the base file. */ + ret = get_calyptia_file(ctx, u_conn, ctx->fleet_url, header, + NULL, &time_last_modified); + + /* new file created! */ + if (ret == 1) { + get_calyptia_files(ctx, u_conn, ctx->fleet_files_url, time_last_modified); + + cfgname = time_fleet_config_filename(ctx, time_last_modified); + + if (calyptia_config_add(ctx, cfgname) == FLB_FALSE) { + flb_plg_error(ctx->ins, "unable to add config: %s", cfgname); + flb_sds_destroy(cfgname); + return -1; + } - flb_plg_debug(ctx->ins, "checking for fleet directory=%s", myfleetdir); - if (access(myfleetdir, F_OK) != 0) { - if (__mkdir(myfleetdir, 0700) !=0) { - flb_plg_error(ctx->ins, "unable to create fleet specific directory"); +#ifndef FLB_SYSTEM_WINDOWS + flb_sds_destroy(cfgname); + cfgnewname = new_fleet_config_filename(ctx); + if (execute_reload(ctx, cfgnewname) == FLB_FALSE) { + calyptia_config_rollback(ctx, cfgname); + flb_sds_destroy(cfgname); return -1; } +#else + if (execute_reload(ctx, cfgname) == FLB_FALSE) { + calyptia_config_rollback(ctx, cfgname); + return -1; + } +#endif } - flb_sds_destroy(myfleetdir); return 0; } /* cb_collect callback */ static int in_calyptia_fleet_collect(struct flb_input_instance *ins, - struct flb_config *config, + struct flb_config *config, void *in_context) { struct flb_in_calyptia_fleet_config *ctx = in_context; struct flb_connection *u_conn; - struct flb_http_client *client; - flb_sds_t cfgname; - flb_sds_t cfgnewname; - flb_sds_t cfgoldname; - flb_sds_t cfgcurname; - flb_sds_t header = NULL; - flb_sds_t hdr; - FILE *cfgfp; - const char *fbit_last_modified; - int fbit_last_modified_len; - struct flb_tm tm_last_modified = { 0 }; - time_t time_last_modified; - char *data = NULL; - size_t b_sent; int ret = -1; -#ifdef FLB_SYSTEM_WINDOWS - DWORD err; - LPSTR lpMsg; -#endif u_conn = flb_upstream_conn_get(ctx->u); @@ -869,214 +1729,106 @@ static int in_calyptia_fleet_collect(struct flb_input_instance *ins, if (get_calyptia_fleet_id_by_name(ctx, u_conn, config) == -1) { flb_plg_error(ctx->ins, "unable to find fleet: %s", ctx->fleet_name); goto conn_error; - } - } - - if (ctx->fleet_url == NULL) { - ctx->fleet_url = flb_sds_create_size(4096); - flb_sds_printf(&ctx->fleet_url, "/v1/fleets/%s/config?format=ini", ctx->fleet_id); + } } - client = flb_http_client(u_conn, FLB_HTTP_GET, ctx->fleet_url, - NULL, 0, - ctx->ins->host.name, ctx->ins->host.port, NULL, 0); - - if (!client) { - flb_plg_error(ins, "unable to create http client"); - goto client_error; - } - - flb_http_buffer_size(client, 8192); + ret = get_calyptia_fleet_config(ctx, u_conn); - flb_http_add_header(client, - CALYPTIA_H_PROJECT, sizeof(CALYPTIA_H_PROJECT) - 1, - ctx->api_key, flb_sds_len(ctx->api_key)); +conn_error: + FLB_INPUT_RETURN(ret); +} - ret = flb_http_do(client, &b_sent); +static int create_fleet_directory(struct flb_in_calyptia_fleet_config *ctx) +{ + flb_sds_t myfleetdir = NULL; - if (ret != 0) { - flb_plg_error(ins, "http do error"); - goto http_error; + if (access(ctx->config_dir, F_OK) != 0) { + if (__mkdir(ctx->config_dir, 0700) != 0) { + return -1; + } } - if (client->resp.status != 200) { - flb_plg_error(ins, "http status code error: %d", client->resp.status); - goto http_error; + if (generate_base_fleet_directory(ctx, &myfleetdir) == NULL) { + flb_sds_destroy(myfleetdir); + return -1; } - if (client->resp.payload_size <= 0) { - flb_plg_error(ins, "empty response"); - goto http_error; + if (access(myfleetdir, F_OK) != 0) { + if (__mkdir(myfleetdir, 0700) !=0) { + return -1; + } } - /* copy and NULL terminate the payload */ - data = flb_sds_create_size(client->resp.payload_size + 1); + flb_sds_destroy(myfleetdir); + return 0; +} - if (!data) { - goto http_error; - } - memcpy(data, client->resp.payload, client->resp.payload_size); - data[client->resp.payload_size] = '\0'; +static flb_sds_t fleet_gendir(struct flb_in_calyptia_fleet_config *ctx, time_t timestamp) +{ + flb_sds_t fleetdir = NULL; + flb_sds_t fleetcurdir; - ret = case_header_lookup(client, "Last-modified", strlen("Last-modified"), - &fbit_last_modified, &fbit_last_modified_len); - if (ret == -1) { - flb_plg_error(ctx->ins, "unable to get last-modified header"); - goto payload_error; + if (generate_base_fleet_directory(ctx, &fleetdir) == NULL) { + return NULL; } - flb_strptime(fbit_last_modified, "%a, %d %B %Y %H:%M:%S GMT", &tm_last_modified); - time_last_modified = mktime(&tm_last_modified.tm); + fleetcurdir = flb_sds_create_size(strlen(fleetdir) + 32); - cfgname = time_fleet_config_filename(ctx, time_last_modified); - - if (access(cfgname, F_OK) == -1 && errno == ENOENT) { - if (create_fleet_directory(ctx) != 0) { - flb_plg_error(ctx->ins, "unable to create fleet directories"); - goto http_error; - } - cfgfp = fopen(cfgname, "w+"); + if (fleetcurdir == NULL) { + flb_sds_destroy(fleetdir); + return NULL; + } - if (cfgfp == NULL) { - flb_plg_error(ctx->ins, "unable to open configuration file: %s", cfgname); - flb_sds_destroy(cfgname); - goto payload_error; - } + if (flb_sds_printf(&fleetcurdir, "%s" PATH_SEPARATOR "%ld", fleetdir, timestamp) == NULL) { + flb_sds_destroy(fleetdir); + flb_sds_destroy(fleetcurdir); + return NULL; + } - header = flb_sds_create_size(4096); - - if (ctx->fleet_name == NULL) { - hdr = flb_sds_printf(&header, - "[CUSTOM]\n" - " Name calyptia\n" - " api_key %s\n" - " fleet_id %s\n" - " add_label fleet_id %s\n" - " fleet.config_dir %s\n" - " calyptia_host %s\n" - " calyptia_port %d\n" - " calyptia_tls %s\n", - ctx->api_key, - ctx->fleet_id, - ctx->fleet_id, - ctx->config_dir, - ctx->ins->host.name, - ctx->ins->host.port, - tls_setting_string(ctx->ins->use_tls) - ); - } - else { - hdr = flb_sds_printf(&header, - "[CUSTOM]\n" - " Name calyptia\n" - " api_key %s\n" - " fleet_name %s\n" - " fleet_id %s\n" - " add_label fleet_id %s\n" - " fleet.config_dir %s\n" - " calyptia_host %s\n" - " calyptia_port %d\n" - " calyptia_tls %s\n", - ctx->api_key, - ctx->fleet_name, - ctx->fleet_id, - ctx->fleet_id, - ctx->config_dir, - ctx->ins->host.name, - ctx->ins->host.port, - tls_setting_string(ctx->ins->use_tls) - ); - } - if (hdr == NULL) { - fclose(cfgfp); - flb_sds_destroy(cfgname); - goto header_error; - } - if (ctx->machine_id) { - hdr = flb_sds_printf(&header, " machine_id %s\n", ctx->machine_id); - if (hdr == NULL) { - fclose(cfgfp); - flb_sds_destroy(cfgname); - goto header_error; - } - } - fwrite(header, strlen(header), 1, cfgfp); - flb_sds_destroy(header); - header = NULL; - fwrite(data, client->resp.payload_size, 1, cfgfp); - fclose(cfgfp); + flb_sds_destroy(fleetdir); - cfgnewname = new_fleet_config_filename(ctx); + return fleetcurdir; +} - if (exists_new_fleet_config(ctx) == FLB_TRUE) { - cfgoldname = old_fleet_config_filename(ctx); - rename(cfgnewname, cfgoldname); - unlink(cfgnewname); - flb_sds_destroy(cfgoldname); - } +static int fleet_mkdir(struct flb_in_calyptia_fleet_config *ctx, time_t timestamp) +{ + flb_sds_t fleetcurdir; - if (!link(cfgname, cfgnewname)) { -#ifdef FLB_SYSTEM_WINDOWS - err = GetLastError(); - FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER, - NULL, err, 0, &lpMsg, 0, NULL); - flb_plg_error(ctx->ins, "unable to create hard link: %s", lpMsg); -#else - flb_errno(); -#endif - } + fleetcurdir = fleet_gendir(ctx, timestamp); - flb_sds_destroy(cfgnewname); + if (fleetcurdir == NULL) { + return -1; } - if (ctx->config_timestamp < time_last_modified) { - flb_plg_debug(ctx->ins, "new configuration is newer than current: %ld < %ld", - ctx->config_timestamp, time_last_modified); + __mkdir(fleetcurdir, 0700); + flb_sds_destroy(fleetcurdir); - if (execute_reload(ctx, cfgname) == FLB_FALSE) { - cfgoldname = old_fleet_config_filename(ctx); - cfgcurname = cur_fleet_config_filename(ctx); - rename(cfgoldname, cfgcurname); - flb_sds_destroy(cfgcurname); - flb_sds_destroy(cfgoldname); - flb_sds_destroy(cfgname); - goto reload_error; - } - } - else { - flb_sds_destroy(cfgname); - } + return 0; +} + +static int fleet_cur_chdir(struct flb_in_calyptia_fleet_config *ctx) +{ + flb_sds_t fleetcurdir; + int ret; - ret = 0; + fleetcurdir = fleet_gendir(ctx, ctx->config_timestamp); + flb_plg_info(ctx->ins, "changing to config dir: %s", fleetcurdir); -reload_error: -header_error: - if (header) { - flb_sds_destroy(header); - } -payload_error: - if (data) { - flb_sds_destroy(data); + if (fleetcurdir == NULL) { + return -1; } -http_error: - flb_plg_debug(ctx->ins, "freeing http client in fleet collect"); - flb_http_client_destroy(client); -client_error: - flb_upstream_conn_release(u_conn); -conn_error: - FLB_INPUT_RETURN(ret); + ret = chdir(fleetcurdir); + flb_sds_destroy(fleetcurdir); + + return ret; } static int load_fleet_config(struct flb_in_calyptia_fleet_config *ctx) { flb_ctx_t *flb_ctx = flb_context_get(); - char *fname; - char *ext; - long timestamp; - char realname[4096]; - ssize_t len; + flb_sds_t cfgnewname = NULL; if (create_fleet_directory(ctx) != 0) { flb_plg_error(ctx->ins, "unable to create fleet directories"); @@ -1085,6 +1837,7 @@ static int load_fleet_config(struct flb_in_calyptia_fleet_config *ctx) /* check if we are already using the fleet configuration file. */ if (is_fleet_config(ctx, flb_ctx->config) == FLB_FALSE) { + flb_plg_debug(ctx->ins, "loading configuration file"); /* check which one and load it */ if (exists_cur_fleet_config(ctx) == FLB_TRUE) { return execute_reload(ctx, cur_fleet_config_filename(ctx)); @@ -1092,43 +1845,183 @@ static int load_fleet_config(struct flb_in_calyptia_fleet_config *ctx) else if (exists_new_fleet_config(ctx) == FLB_TRUE) { return execute_reload(ctx, new_fleet_config_filename(ctx)); } + else { + cfgnewname = calyptia_config_get_newest(ctx); + + if (cfgnewname != NULL) { + flb_plg_debug(ctx->ins, "loading newest configuration: %s", cfgnewname); + return execute_reload(ctx, cfgnewname); + } + else { + flb_plg_warn(ctx->ins, "unable to find latest configuration file"); + } + } } else { - if (is_new_fleet_config(ctx, flb_ctx->config) || is_cur_fleet_config(ctx, flb_ctx->config)) { - len = readlink(flb_ctx->config->conf_path_file, realname, sizeof(realname)); + flb_plg_debug(ctx->ins, "we are already using a configuration file: %s", + flb_ctx->config->conf_path_file); + parse_config_timestamp(ctx, &ctx->config_timestamp); + } - if (len > sizeof(realname)) { - return FLB_FALSE; - } + return FLB_FALSE; +} - fname = basename(realname); - } - else { - fname = basename(flb_ctx->config->conf_path_file); - } +static int create_fleet_file(flb_sds_t fleetdir, + const char *name, + int nlen, + const char *b64_content, + int blen) +{ + flb_sds_t fname; + flb_sds_t dst; + size_t dlen = 2 * blen; + FILE *fp; + int ret; - if (fname == NULL) { - return FLB_FALSE; - } + fname = flb_sds_create_size(strlen(fleetdir) + nlen + 2); + if (fname == NULL) { + return -1; + } - errno = 0; - timestamp = strtol(fname, &ext, 10); + if (flb_sds_cat_safe(&fname, fleetdir, strlen(fleetdir)) < 0) { + flb_sds_destroy(fname); + return -1; + } - if ((errno == ERANGE && (timestamp == LONG_MAX || timestamp == LONG_MIN)) || - (errno != 0 && timestamp == 0)) { - flb_errno(); - return FLB_FALSE; + if (flb_sds_cat_safe(&fname, "/", 1) < 0) { + flb_sds_destroy(fname); + return -1; + } + + if (flb_sds_cat_safe(&fname, name, nlen) < 0) { + flb_sds_destroy(fname); + return -1; + } + + fp = fopen(fname, "w+"); + if (fp == NULL) { + return -1; + } + + dst = flb_sds_create_size(dlen); + ret = flb_base64_decode((unsigned char *)dst, dlen, &dlen, + (unsigned char *)b64_content, blen); + + if (ret != 0) { + return -1; + } + + fwrite(dst, dlen, 1, fp); + + fclose(fp); + flb_sds_destroy(dst); + flb_sds_destroy(fname); + + return 0; +} + +static int create_fleet_files(struct flb_in_calyptia_fleet_config *ctx, + char *payload, size_t size, time_t timestamp) +{ + int ret; + int out_size; + char *pack; + struct flb_pack_state pack_state; + size_t off = 0; + int idx; + flb_sds_t fleetdir; + msgpack_unpacked result; + msgpack_object *map; + msgpack_object *name; + msgpack_object *contents; + + /* Initialize packer */ + flb_pack_state_init(&pack_state); + + /* Pack JSON as msgpack */ + ret = flb_pack_json_state(payload, size, + &pack, &out_size, &pack_state); + flb_pack_state_reset(&pack_state); + + /* Handle exceptions */ + if (ret == FLB_ERR_JSON_PART || ret == FLB_ERR_JSON_INVAL || ret == -1) { + flb_plg_warn(ctx->ins, "invalid JSON message, skipping"); + return -1; + } + + fleetdir = fleet_gendir(ctx, timestamp); + + msgpack_unpacked_init(&result); + while (msgpack_unpack_next(&result, pack, out_size, &off) == MSGPACK_UNPACK_SUCCESS) { + if (result.data.type != MSGPACK_OBJECT_ARRAY) { + continue; } + for (idx = 0; idx < result.data.via.array.size; idx++) { + map = msgpack_lookup_array_offset(&result.data, idx); - /* unable to parse the timstamp */ - if (errno == ERANGE) { - return FLB_FALSE; + if (map == NULL) { + return -1; + } + + name = msgpack_lookup_map_key(map, "name"); + if (name == NULL) { + return -1; + } + if (name->type != MSGPACK_OBJECT_STR) { + return -1; + } + + contents = msgpack_lookup_map_key(map, "contents"); + if (contents == NULL) { + return -1; + } + if (contents->type != MSGPACK_OBJECT_STR) { + return -1; + } + + create_fleet_file(fleetdir, + name->via.str.ptr, + name->via.str.size, + contents->via.str.ptr, + contents->via.str.size); } + } + + msgpack_unpacked_destroy(&result); + flb_free(pack); + + return 0; +} + +static int get_calyptia_files(struct flb_in_calyptia_fleet_config *ctx, + struct flb_connection *u_conn, + const char *url, + time_t timestamp) +{ + struct flb_http_client *client; + int ret = -1; - ctx->config_timestamp = timestamp; + if (ctx == NULL || u_conn == NULL || url == NULL) { + return -1; } - return FLB_FALSE; + client = fleet_http_do(ctx, u_conn, ctx->fleet_files_url); + + if (client == NULL) { + return -1; + } + + fleet_mkdir(ctx, timestamp); + ret = create_fleet_files(ctx, client->resp.payload, client->resp.payload_size, timestamp); + if (ret != 0) { + goto file_error; + } + + ret = 1; + +file_error: + flb_http_client_destroy(client); + return ret; } static int in_calyptia_fleet_init(struct flb_input_instance *in, @@ -1224,6 +2117,10 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, return 0; } + if (is_fleet_config(ctx, config)) { + calyptia_config_commit(ctx); + } + /* Set our collector based on time */ ret = flb_input_set_collector_time(in, in_calyptia_fleet_collect,