From b7808067b824613491c76f94a6c3e1cb9c3a4d5a Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 20 May 2024 16:15:16 +0900 Subject: [PATCH 01/31] aws: Distinguish parquet compression method Currently, it should be handled on out_s3. So, it is just for used to specify the parquet format. Signed-off-by: Hiroshi Hatake --- include/fluent-bit/aws/flb_aws_compress.h | 1 + src/aws/flb_aws_compress.c | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/include/fluent-bit/aws/flb_aws_compress.h b/include/fluent-bit/aws/flb_aws_compress.h index e1cf9222377..b423c312e70 100644 --- a/include/fluent-bit/aws/flb_aws_compress.h +++ b/include/fluent-bit/aws/flb_aws_compress.h @@ -24,6 +24,7 @@ #define FLB_AWS_COMPRESS_NONE 0 #define FLB_AWS_COMPRESS_GZIP 1 #define FLB_AWS_COMPRESS_ARROW 2 +#define FLB_AWS_COMPRESS_PARQUET 3 /* * Get compression type from compression keyword. The return value is used to identify diff --git a/src/aws/flb_aws_compress.c b/src/aws/flb_aws_compress.c index a06d181193f..6e1b28f4aff 100644 --- a/src/aws/flb_aws_compress.c +++ b/src/aws/flb_aws_compress.c @@ -55,6 +55,11 @@ static const struct compression_option compression_options[] = { &out_s3_compress_arrow }, #endif + { + FLB_AWS_COMPRESS_PARQUET, + "parquet", + NULL /* Currently, it is implemented in out_s3 specific source */ + }, { 0 } }; From c5e74ab0a08519c956b1768181f3ddc1f885bbea Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 20 May 2024 16:41:53 +0900 Subject: [PATCH 02/31] out_s3: Initial support for parquet format with columnify Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.c | 469 ++++++++++++++++++++++++++++++- plugins/out_s3/s3.h | 15 + plugins/out_s3/s3_win32_compat.h | 64 +++++ 3 files changed, 544 insertions(+), 4 deletions(-) create mode 100644 plugins/out_s3/s3_win32_compat.h diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 3b1edafbdab..e2911cce922 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -33,12 +33,15 @@ #include #include 
#include +#include #include +#include #include #include "s3.h" #include "s3_store.h" +#include "s3_win32_compat.h" #define DEFAULT_S3_PORT 443 #define DEFAULT_S3_INSECURE_PORT 80 @@ -493,6 +496,22 @@ static void s3_context_destroy(struct flb_s3 *ctx) flb_sds_destroy(ctx->seq_index_file); } + if (ctx->parquet_compression) { + flb_sds_destroy(ctx->parquet_compression); + } + + if (ctx->parquet_record_type) { + flb_sds_destroy(ctx->parquet_record_type); + } + + if (ctx->parquet_schema_type) { + flb_sds_destroy(ctx->parquet_schema_type); + } + + if (ctx->parquet_schema_file) { + flb_sds_destroy(ctx->parquet_schema_file); + } + /* Remove uploads */ mk_list_foreach_safe(head, tmp, &ctx->uploads) { m_upload = mk_list_entry(head, struct multipart_upload, _head); @@ -514,6 +533,7 @@ static int cb_s3_init(struct flb_output_instance *ins, struct flb_config *config, void *data) { int ret; + int i; flb_sds_t tmp_sds; char *role_arn = NULL; char *session_name; @@ -526,6 +546,8 @@ static int cb_s3_init(struct flb_output_instance *ins, struct flb_split_entry *tok; struct mk_list *split; int list_size; + FILE *cmdp = NULL; + char buf[32]; ctx = flb_calloc(1, sizeof(struct flb_s3)); if (!ctx) { @@ -657,13 +679,118 @@ static int cb_s3_init(struct flb_output_instance *ins, "use_put_object must be enabled when Apache Arrow is enabled"); return -1; } + if (ctx->use_put_object == FLB_FALSE && ctx->compression == FLB_AWS_COMPRESS_PARQUET) { + flb_plg_error(ctx->ins, + "use_put_object must be enabled when parquet is enabled"); + return -1; + } ctx->compression = ret; } + /* Parquet */ + ctx->parquet_compression = NULL; + ctx->parquet_record_type = NULL; + ctx->parquet_schema_type = NULL; + ctx->parquet_schema_file = NULL; + + if (ctx->compression == FLB_AWS_COMPRESS_PARQUET) { + cmdp = flb_popen(DEFAULT_PARQUET_COMMAND_EXISTENCE, "r"); + if (cmdp == NULL) { + flb_plg_error(ctx->ins, "command %s failed", DEFAULT_PARQUET_COMMAND_EXISTENCE); + return -1; + } + flb_pclose(cmdp); + + tmp = 
flb_output_get_property("parquet.compression", ins); + if (!tmp) { + ctx->parquet_compression = \ + flb_sds_create_len(DEFAULT_PARQUET_COMPRESSION_FORMAT_UPCASES, + strlen(DEFAULT_PARQUET_COMPRESSION_FORMAT_UPCASES)); + flb_plg_debug(ctx->ins, "parquet.compression format is %s", + DEFAULT_PARQUET_COMPRESSION_FORMAT_UPCASES); + } + else { + if (strncasecmp(tmp, "uncompressed", 12) == 0 || + strncasecmp(tmp, "snappy", 6) == 0 || + strncasecmp(tmp, "gzip", 4) == 0 || + strncasecmp(tmp, "zstd", 4) == 0) { + flb_plg_info(ctx->ins, "parquet.compression format is %s", tmp); + } + else if (strncasecmp(tmp, "lzo", 3) == 0 || + strncasecmp(tmp, "brotli", 6) == 0 || + strncasecmp(tmp, "lz4", 3) == 0) { + flb_plg_info(ctx->ins, "unsupported parquet.compression format %s", tmp); + } + else { + flb_plg_error(ctx->ins, "unknown parquet.compression format %s", tmp); + return -1; + } + for (i = 0; i < strlen(tmp) || i < sizeof(buf); i++) { + buf[i] = toupper(tmp[i]); + } + + ctx->parquet_compression = flb_sds_create_len(buf, strlen(buf)); + } + + tmp = flb_output_get_property("parquet.record_type", ins); + if (!tmp) { + flb_plg_info(ctx->ins, "parquet.record_type format is %s", + DEFAULT_PARQUET_RECORD_TYPE); + ctx->parquet_record_type = \ + flb_sds_create_len(DEFAULT_PARQUET_RECORD_TYPE, + strlen(DEFAULT_PARQUET_RECORD_TYPE)); + } + else { + if (strncasecmp(tmp, "json", 4) == 0) { + tmp = "jsonl"; /* json should be interpreted as jsonl */ + flb_plg_info(ctx->ins, "parquet.record_type format is %s", tmp); + } + else if (strncasecmp(tmp, "msgpack", 7) == 0 || + strncasecmp(tmp, "jsonl", 5) == 0) { + flb_plg_info(ctx->ins, "parquet.record_type format is %s", tmp); + } + else { + flb_plg_error(ctx->ins, "unknown parquet.record_type format %s", tmp); + return -1; + } + ctx->parquet_record_type = flb_sds_create_len(tmp, strlen(tmp)); + } + + tmp = flb_output_get_property("parquet.schema_type", ins); + if (!tmp) { + flb_plg_info(ctx->ins, "parquet.schema_type format is %s", + 
DEFAULT_PARQUET_SCHEMA_TYPE); + ctx->parquet_schema_type = \ + flb_sds_create_len(DEFAULT_PARQUET_SCHEMA_TYPE, + strlen(DEFAULT_PARQUET_SCHEMA_TYPE)); + } + else { + if (strncasecmp(tmp, "msgpack", 7) == 0 || + strncasecmp(tmp, "json", 4) == 0) { + flb_plg_info(ctx->ins, "parquet.record_type format is %s", tmp); + } + else { + flb_plg_error(ctx->ins, "unknown parquet.record_type format %s", tmp); + return -1; + } + ctx->parquet_schema_type = flb_sds_create_len(tmp, strlen(tmp)); + } + + tmp = flb_output_get_property("parquet.schema_file", ins); + if (!tmp) { + flb_plg_error(ctx->ins, "parquet.schema_file is missing"); + return -1; + } + ctx->parquet_schema_file = flb_sds_create_len(tmp, strlen(tmp)); + } + tmp = flb_output_get_property("content_type", ins); if (tmp) { ctx->content_type = (char *) tmp; } + if (ctx->compression == FLB_AWS_COMPRESS_PARQUET) { + ctx->content_type = (char *) "application/octet-stream"; + } if (ctx->use_put_object == FLB_FALSE) { /* upload_chunk_size */ if (ctx->upload_chunk_size <= 0) { @@ -972,6 +1099,266 @@ static int cb_s3_init(struct flb_output_instance *ins, return 0; } +static int s3_compress_parquet(struct flb_s3 *ctx, + char *body, size_t body_size, + void **payload_buf, size_t *payload_size) +{ + int ret = 0; + int result = 0; + char *template_in = "out_s3-body-XXXXXX"; + char *template_out = "out_s3-parquet-XXXXXX"; + char infile[32]; + char outfile[32]; + FILE *write_ptr = NULL; + FILE *read_ptr = NULL; + flb_sds_t parquet_cmd = NULL; + size_t bytes; + flb_sds_t tmp; + flb_sds_t amount_page = NULL; + flb_sds_t amount_row_group = NULL; + FILE *cmdp = NULL; + size_t parquet_size = 0; + struct stat stbuf; + int fdout = -1; + flb_sds_t parquet_buf; + + parquet_cmd = flb_sds_create_size(384); + if (parquet_cmd == NULL) { + goto error; + } + + amount_page = flb_sds_create_size(16); + if (amount_page == NULL) { + goto error; + } + + amount_row_group = flb_sds_create_size(16); + if (amount_row_group == NULL) { + goto error; + } + + 
strncpy(infile, template_in, 32); + if (!mkstemp(infile)) { + ret = -2; + goto error; + } + + strncpy(outfile, template_out, 32); + if (!mkstemp(outfile)) { + ret = -2; + goto error; + } + + write_ptr = fopen(infile, "wb"); + if (write_ptr == NULL) { + ret = -3; + goto error; + } + + read_ptr = fopen(outfile, "rb"); + if (read_ptr == NULL) { + ret = -3; + goto error; + } + + fdout = open(outfile, O_RDONLY); + if (fdout == -1) { + ret = -3; + goto error; + } + + bytes = fwrite(body, body_size, 1, write_ptr); + if (bytes == -1) { + ret = -4; + goto error; + } + + result = flb_sds_cat_safe(&parquet_cmd, + DEFAULT_PARQUET_COMMAND, strlen(DEFAULT_PARQUET_COMMAND)); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(&parquet_cmd, + " -parquetCompressionCodec ", 26); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(&parquet_cmd, + ctx->parquet_compression, + flb_sds_len(ctx->parquet_compression)); + if (result < 0) { + ret = -1; + goto error; + } + + + result = flb_sds_cat_safe(&parquet_cmd, + " -parquetPageSize ", 18); + if (result < 0) { + ret = -1; + goto error; + } + + tmp = flb_sds_printf(&amount_page, "%zu", ctx->parquet_page_size); + if (!tmp) { + flb_errno(); + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(&parquet_cmd, + amount_page, strlen(amount_page)); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(&parquet_cmd, + " -parquetRowGroupSize ", 22); + if (result < 0) { + ret = -1; + goto error; + } + + tmp = flb_sds_printf(&amount_row_group, "%zu", ctx->parquet_row_group_size); + if (!tmp) { + flb_errno(); + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(&parquet_cmd, + amount_row_group, strlen(amount_row_group)); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(&parquet_cmd, + " -recordType ", 13); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(&parquet_cmd, + ctx->parquet_record_type, + 
flb_sds_len(ctx->parquet_record_type)); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(&parquet_cmd, + " -schemaType ", 13); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(&parquet_cmd, + ctx->parquet_schema_type, + flb_sds_len(ctx->parquet_schema_type)); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(&parquet_cmd, + " -schemaFile ", 13); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(&parquet_cmd, + ctx->parquet_schema_file, + flb_sds_len(ctx->parquet_schema_file)); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(&parquet_cmd, + " -output ", 9); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(&parquet_cmd, + outfile, strlen(outfile)); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(&parquet_cmd, " ", 1); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(&parquet_cmd, infile, strlen(infile)); + if (result < 0) { + ret = -1; + goto error; + } + + cmdp = flb_popen(parquet_cmd, "r"); + if (cmdp == NULL) { + flb_plg_error(ctx->ins, "command %s failed", DEFAULT_PARQUET_COMMAND_EXISTENCE); + return -1; + } + + fclose(write_ptr); + flb_pclose(cmdp); + + if (fstat(fdout, &stbuf) == -1) { + ret = -4; + goto error; + } + parquet_size = stbuf.st_size; + parquet_buf = flb_sds_create_size(parquet_size); + fread(parquet_buf, parquet_size, 1, read_ptr); + fclose(read_ptr); + + *payload_buf = parquet_buf; + *payload_size = parquet_size; + + flb_sds_destroy(parquet_cmd); + flb_sds_destroy(amount_page); + flb_sds_destroy(amount_row_group); + + return 0; + +error: + if (write_ptr != NULL) { + fclose(write_ptr); + } + if (read_ptr != NULL) { + fclose(read_ptr); + } + if (fdout == -1) { + close(fdout); + } + if (parquet_cmd != NULL) { + flb_sds_destroy(parquet_cmd); + } + if (amount_page != NULL) { + flb_sds_destroy(amount_page); + 
} + if (amount_row_group != NULL) { + flb_sds_destroy(amount_row_group); + } + + return ret; +} + /* * return value is one of FLB_OK, FLB_RETRY, FLB_ERROR * @@ -1003,7 +1390,19 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, file_first_log_time = chunk->first_log_time; } - if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { + if (ctx->compression == FLB_AWS_COMPRESS_PARQUET) { + ret = s3_compress_parquet(ctx, body, body_size, &payload_buf, &payload_size); + if (ret == -1) { + flb_plg_error(ctx->ins, "Failed to compress data with %s", DEFAULT_PARQUET_COMMAND); + return FLB_RETRY; + } + else { + preCompress_size = body_size; + body = (void *) payload_buf; + body_size = payload_size; + } + } + else if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { /* Map payload */ ret = flb_aws_compression_compress(ctx->compression, body, body_size, &payload_buf, &payload_size); if (ret == -1) { @@ -1071,6 +1470,9 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, if (ctx->compression == FLB_AWS_COMPRESS_GZIP) { flb_free(payload_buf); } + else if (ctx->compression == FLB_AWS_COMPRESS_PARQUET) { + flb_sds_destroy(payload_buf); + } if (ret < 0) { /* re-add chunk to list */ if (chunk) { @@ -1225,7 +1627,21 @@ static int put_all_chunks(struct flb_s3 *ctx) return -1; } - if (ctx->compression != FLB_AWS_COMPRESS_NONE) { + if (ctx->compression == FLB_AWS_COMPRESS_PARQUET) { + ret = s3_compress_parquet(ctx, buffer, buffer_size, &payload_buf, &payload_size); + if (ret == -1) { + flb_plg_error(ctx->ins, "Failed to compress data with %s", DEFAULT_PARQUET_COMMAND); + return FLB_RETRY; + } + else { + flb_plg_info(ctx->ins, "Pre-compression chunk size is %zu, After compression, chunk is %zu bytes", buffer_size, payload_size); + flb_free(buffer); + + buffer = (void *) payload_buf; + buffer_size = payload_size; + } + } + else if (ctx->compression != FLB_AWS_COMPRESS_NONE) { /* Map payload */ ret = flb_aws_compression_compress(ctx->compression, buffer, buffer_size, 
&payload_buf, &payload_size); if (ret == -1) { @@ -1242,7 +1658,12 @@ static int put_all_chunks(struct flb_s3 *ctx) ret = s3_put_object(ctx, (const char *) fsf->meta_buf, chunk->create_time, buffer, buffer_size); - flb_free(buffer); + if (ctx->compression == FLB_AWS_COMPRESS_PARQUET) { + flb_sds_destroy(buffer); + } + else { + flb_free(buffer); + } if (ret < 0) { s3_store_file_unlock(chunk); chunk->failures += 1; @@ -2371,10 +2792,50 @@ static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_STR, "compression", NULL, 0, FLB_FALSE, 0, - "Compression type for S3 objects. 'gzip' and 'arrow' are the supported values. " + "Compression type for S3 objects. 'gzip', 'arrow', and 'parquet' " + "are the supported values. " "'arrow' is only an available if Apache Arrow was enabled at compile time. " + "'parquet' is only an available if columify command is installed on a system. " "Defaults to no compression. " "If 'gzip' is selected, the Content-Encoding HTTP Header will be set to 'gzip'." + "If 'parquet' is selected, the Content-Encoding HTTP Header will be set to 'octet-stream'." + }, + { + FLB_CONFIG_MAP_STR, "parquet.compression", "snappy", + 0, FLB_TRUE, offsetof(struct flb_s3, parquet_compression), + "Compression type for Parquet format. 'uncompressed', 'snappy', 'gzip', " + "'zstd' are the supported values. " + "'lzo', 'brotli', 'lz4' are not supported for now. " + "Defaults to snappy. " + }, + { + FLB_CONFIG_MAP_SIZE, "parquet.page_size", "8192", + 0, FLB_TRUE, offsetof(struct flb_s3, parquet_page_size), + "Page size of parquet" + "Defaults to 8192. " + }, + { + FLB_CONFIG_MAP_SIZE, "parquet.row_group_size", "134217728", /* 128 * 1024 * 1024 */ + 0, FLB_TRUE, offsetof(struct flb_s3, parquet_row_group_size), + "File row group size of parquet" + "Defaults to 134217728 (= 128 * 1024 * 1024). " + }, + { + FLB_CONFIG_MAP_STR, "parquet.record_type", "json", + 0, FLB_FALSE, 0, + "Record type for parquet objects. 'json' and 'msgpack' are the supported values. 
" + "Defaults to msgpack. " + }, + { + FLB_CONFIG_MAP_STR, "parquet.schema_type", "avro", + 0, FLB_FALSE, 0, + "Record type for parquet objects. 'avro' and 'bigquery' are the supported values. " + "Defaults to avro. " + }, + { + FLB_CONFIG_MAP_STR, "parquet.schema_file", NULL, + 0, FLB_FALSE, 0, + "Schema file for parquet objects. " }, { FLB_CONFIG_MAP_STR, "content_type", NULL, diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index e51d39f2419..4550be60973 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -47,6 +47,13 @@ #define DEFAULT_UPLOAD_TIMEOUT 3600 +#define DEFAULT_PARQUET_COMPRESSION_FORMAT "snappy" +#define DEFAULT_PARQUET_COMPRESSION_FORMAT_UPCASES "SNAPPY" +#define DEFAULT_PARQUET_RECORD_TYPE "jsonl" +#define DEFAULT_PARQUET_SCHEMA_TYPE "avro" +#define DEFAULT_PARQUET_COMMAND "columnify" +#define DEFAULT_PARQUET_COMMAND_EXISTENCE "columnify -h" + /* * If we see repeated errors on an upload/chunk, we will discard it * This saves us from scenarios where something goes wrong and an upload can @@ -147,6 +154,14 @@ struct flb_s3 { struct flb_fstore_stream *stream_upload; /* multipart upload stream */ struct flb_fstore_stream *stream_metadata; /* s3 metadata stream */ + /* Parquet */ + flb_sds_t parquet_compression; + size_t parquet_page_size; + size_t parquet_row_group_size; + flb_sds_t parquet_record_type; + flb_sds_t parquet_schema_type; + flb_sds_t parquet_schema_file; + /* * used to track that unset buffers were found on startup that have not * been sent diff --git a/plugins/out_s3/s3_win32_compat.h b/plugins/out_s3/s3_win32_compat.h new file mode 100644 index 00000000000..4fa7ca5ee53 --- /dev/null +++ b/plugins/out_s3/s3_win32_compat.h @@ -0,0 +1,64 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance 
with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef FLB_S3_WIN32_COMPAT_H +#define FLB_S3_WIN32_COMPAT_H + +/* + * Because Windows has to do everything differently, call _popen() and + * _pclose() instead of the POSIX popen() and pclose() functions. + * + * flb_pclose() has different return value semantics on Windows vs non-windows + * targets because it propagates the pclose() or _pclose() return value + * directly. You MUST use the FLB_WIFEXITED(), FLB_WEXITSTATUS(), + * FLB_WIFSIGNALED() and FLB_WTERMSIG() macros to consume the return value, + * rather than the underlying POSIX macros or manual bit-shifts. + */ +#if !defined(FLB_SYSTEM_WINDOWS) +static inline FILE* flb_popen(const char *command, const char *type) { + return popen(command, type); +} +static inline int flb_pclose(FILE *stream) { + return pclose(stream); +} +#define FLB_PCLOSE pclose +#else +static inline FILE* flb_popen(const char *command, const char *type) { + return _popen(command, type); +} +/* + * flb_pclose() has the same return value on Windows as win32 _pclose(), rather + * than posix pclose(). The process exit code is not bit-shifted to the high + * byte. + * + * The MSVC docs for _pclose() at + * https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/pclose?view=msvc-170 + * are misleading; they say that "The format of the return value is the same as + * for _cwait, except the low-order and high-order bytes are swapped." 
But + * _cwait isn't documented as having any meaningful return on success, the + * process exit code is meant to be in its "termstat" out parameter per + * https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/cwait?view=msvc-170 + * The return code of _pclose() actually appears to be the process exit code + * without the bit-shift that waitpid() applies. + */ +static inline int flb_pclose(FILE *stream) { + return _pclose(stream); +} +#endif + +#endif /* FLB_S3_WIN32_COMPAT_H */ From bf5baa91d4f836ef695706818ba4f96c13aea655 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 21 May 2024 14:07:00 +0900 Subject: [PATCH 03/31] out_s3: Address verification for schema types Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index e2911cce922..74d034f7374 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -765,12 +765,12 @@ static int cb_s3_init(struct flb_output_instance *ins, strlen(DEFAULT_PARQUET_SCHEMA_TYPE)); } else { - if (strncasecmp(tmp, "msgpack", 7) == 0 || - strncasecmp(tmp, "json", 4) == 0) { - flb_plg_info(ctx->ins, "parquet.record_type format is %s", tmp); + if (strncasecmp(tmp, "avro", 4) == 0 || + strncasecmp(tmp, "bigquery", 8) == 0) { + flb_plg_info(ctx->ins, "parquet.schema_type format is %s", tmp); } else { - flb_plg_error(ctx->ins, "unknown parquet.record_type format %s", tmp); + flb_plg_error(ctx->ins, "unknown parquet.schema_type format %s", tmp); return -1; } ctx->parquet_schema_type = flb_sds_create_len(tmp, strlen(tmp)); From 797d7c16f601fae5bbd770c9e555313460a4c07a Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 21 May 2024 14:22:53 +0900 Subject: [PATCH 04/31] out_s3: Extract building columnify command prodedure as a function Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.c | 192 ++++++++++++++++++++++++-------------------- 1 file changed, 107 insertions(+), 85 deletions(-) diff 
--git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 74d034f7374..45754365385 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -1099,33 +1099,16 @@ static int cb_s3_init(struct flb_output_instance *ins, return 0; } -static int s3_compress_parquet(struct flb_s3 *ctx, - char *body, size_t body_size, - void **payload_buf, size_t *payload_size) +static int build_columnify_command(struct flb_s3 *ctx, + char *infile, + char *outfile, + flb_sds_t *cmd_buf) { - int ret = 0; - int result = 0; - char *template_in = "out_s3-body-XXXXXX"; - char *template_out = "out_s3-parquet-XXXXXX"; - char infile[32]; - char outfile[32]; - FILE *write_ptr = NULL; - FILE *read_ptr = NULL; - flb_sds_t parquet_cmd = NULL; - size_t bytes; - flb_sds_t tmp; + int ret = -1; + int result; + flb_sds_t tmp = NULL; flb_sds_t amount_page = NULL; flb_sds_t amount_row_group = NULL; - FILE *cmdp = NULL; - size_t parquet_size = 0; - struct stat stbuf; - int fdout = -1; - flb_sds_t parquet_buf; - - parquet_cmd = flb_sds_create_size(384); - if (parquet_cmd == NULL) { - goto error; - } amount_page = flb_sds_create_size(16); if (amount_page == NULL) { @@ -1137,57 +1120,21 @@ static int s3_compress_parquet(struct flb_s3 *ctx, goto error; } - strncpy(infile, template_in, 32); - if (!mkstemp(infile)) { - ret = -2; - goto error; - } - - strncpy(outfile, template_out, 32); - if (!mkstemp(outfile)) { - ret = -2; - goto error; - } - - write_ptr = fopen(infile, "wb"); - if (write_ptr == NULL) { - ret = -3; - goto error; - } - - read_ptr = fopen(outfile, "rb"); - if (read_ptr == NULL) { - ret = -3; - goto error; - } - - fdout = open(outfile, O_RDONLY); - if (fdout == -1) { - ret = -3; - goto error; - } - - bytes = fwrite(body, body_size, 1, write_ptr); - if (bytes == -1) { - ret = -4; - goto error; - } - - result = flb_sds_cat_safe(&parquet_cmd, + result = flb_sds_cat_safe(cmd_buf, DEFAULT_PARQUET_COMMAND, strlen(DEFAULT_PARQUET_COMMAND)); if (result < 0) { ret = -1; goto error; } - result = 
flb_sds_cat_safe(&parquet_cmd, + result = flb_sds_cat_safe(cmd_buf, " -parquetCompressionCodec ", 26); if (result < 0) { ret = -1; goto error; } - result = flb_sds_cat_safe(&parquet_cmd, + result = flb_sds_cat_safe(cmd_buf, ctx->parquet_compression, flb_sds_len(ctx->parquet_compression)); if (result < 0) { @@ -1196,7 +1143,7 @@ static int s3_compress_parquet(struct flb_s3 *ctx, } - result = flb_sds_cat_safe(&parquet_cmd, + result = flb_sds_cat_safe(cmd_buf, " -parquetPageSize ", 18); if (result < 0) { ret = -1; @@ -1210,14 +1157,14 @@ static int s3_compress_parquet(struct flb_s3 *ctx, goto error; } - result = flb_sds_cat_safe(&parquet_cmd, + result = flb_sds_cat_safe(cmd_buf, amount_page, strlen(amount_page)); if (result < 0) { ret = -1; goto error; } - result = flb_sds_cat_safe(&parquet_cmd, + result = flb_sds_cat_safe(cmd_buf, " -parquetRowGroupSize ", 22); if (result < 0) { ret = -1; @@ -1231,21 +1178,21 @@ static int s3_compress_parquet(struct flb_s3 *ctx, goto error; } - result = flb_sds_cat_safe(&parquet_cmd, + result = flb_sds_cat_safe(cmd_buf, amount_row_group, strlen(amount_row_group)); if (result < 0) { ret = -1; goto error; } - result = flb_sds_cat_safe(&parquet_cmd, + result = flb_sds_cat_safe(cmd_buf, " -recordType ", 13); if (result < 0) { ret = -1; goto error; } - result = flb_sds_cat_safe(&parquet_cmd, + result = flb_sds_cat_safe(cmd_buf, ctx->parquet_record_type, flb_sds_len(ctx->parquet_record_type)); if (result < 0) { @@ -1253,14 +1200,14 @@ static int s3_compress_parquet(struct flb_s3 *ctx, goto error; } - result = flb_sds_cat_safe(&parquet_cmd, + result = flb_sds_cat_safe(cmd_buf, " -schemaType ", 13); if (result < 0) { ret = -1; goto error; } - result = flb_sds_cat_safe(&parquet_cmd, + result = flb_sds_cat_safe(cmd_buf, ctx->parquet_schema_type, flb_sds_len(ctx->parquet_schema_type)); if (result < 0) { @@ -1268,14 +1215,14 @@ static int s3_compress_parquet(struct flb_s3 *ctx, goto error; } - result = flb_sds_cat_safe(&parquet_cmd, + result = 
flb_sds_cat_safe(cmd_buf, " -schemaFile ", 13); if (result < 0) { ret = -1; goto error; } - result = flb_sds_cat_safe(&parquet_cmd, + result = flb_sds_cat_safe(cmd_buf, ctx->parquet_schema_file, flb_sds_len(ctx->parquet_schema_file)); if (result < 0) { @@ -1283,32 +1230,115 @@ static int s3_compress_parquet(struct flb_s3 *ctx, goto error; } - result = flb_sds_cat_safe(&parquet_cmd, + result = flb_sds_cat_safe(cmd_buf, " -output ", 9); if (result < 0) { ret = -1; goto error; } - result = flb_sds_cat_safe(&parquet_cmd, + result = flb_sds_cat_safe(cmd_buf, outfile, strlen(outfile)); if (result < 0) { ret = -1; goto error; } - result = flb_sds_cat_safe(&parquet_cmd, " ", 1); + result = flb_sds_cat_safe(cmd_buf, " ", 1); if (result < 0) { ret = -1; goto error; } - result = flb_sds_cat_safe(&parquet_cmd, infile, strlen(infile)); + result = flb_sds_cat_safe(cmd_buf, infile, strlen(infile)); if (result < 0) { ret = -1; goto error; } + flb_sds_destroy(amount_page); + flb_sds_destroy(amount_row_group); + + return 0; + +error: + if (amount_page != NULL) { + flb_sds_destroy(amount_page); + } + if (amount_row_group != NULL) { + flb_sds_destroy(amount_row_group); + } + + return ret; +} + + +static int s3_compress_parquet(struct flb_s3 *ctx, + char *body, size_t body_size, + void **payload_buf, size_t *payload_size) +{ + int ret = 0; + char *template_in = "out_s3-body-XXXXXX"; + char *template_out = "out_s3-parquet-XXXXXX"; + char infile[32]; + char outfile[32]; + FILE *write_ptr = NULL; + FILE *read_ptr = NULL; + flb_sds_t parquet_cmd = NULL; + size_t bytes; + FILE *cmdp = NULL; + size_t parquet_size = 0; + struct stat stbuf; + int fdout = -1; + flb_sds_t parquet_buf; + + parquet_cmd = flb_sds_create_size(384); + if (parquet_cmd == NULL) { + goto error; + } + + strncpy(infile, template_in, 32); + if (!mkstemp(infile)) { + ret = -2; + goto error; + } + + strncpy(outfile, template_out, 32); + if (!mkstemp(outfile)) { + ret = -2; + goto error; + } + + write_ptr = fopen(infile, 
"wb"); + if (write_ptr == NULL) { + ret = -3; + goto error; + } + + read_ptr = fopen(outfile, "rb"); + if (read_ptr == NULL) { + ret = -3; + goto error; + } + + fdout = open(outfile, O_RDONLY); + if (fdout == -1) { + ret = -3; + goto error; + } + + bytes = fwrite(body, body_size, 1, write_ptr); + if (bytes == -1) { + ret = -4; + goto error; + } + + ret = build_columnify_command(ctx, infile, outfile, &parquet_cmd); + if (ret != 0) { + ret = -1; + goto error; + } + cmdp = flb_popen(parquet_cmd, "r"); if (cmdp == NULL) { flb_plg_error(ctx->ins, "command %s failed", DEFAULT_PARQUET_COMMAND_EXISTENCE); @@ -1331,8 +1361,6 @@ static int s3_compress_parquet(struct flb_s3 *ctx, *payload_size = parquet_size; flb_sds_destroy(parquet_cmd); - flb_sds_destroy(amount_page); - flb_sds_destroy(amount_row_group); return 0; @@ -1349,12 +1377,6 @@ static int s3_compress_parquet(struct flb_s3 *ctx, if (parquet_cmd != NULL) { flb_sds_destroy(parquet_cmd); } - if (amount_page != NULL) { - flb_sds_destroy(amount_page); - } - if (amount_row_group != NULL) { - flb_sds_destroy(amount_row_group); - } return ret; } From f69f6461f625c8f308ee6ace2c47a5e1d6615b84 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 21 May 2024 14:31:23 +0900 Subject: [PATCH 05/31] out_s3: Sweep temporary files if not neccesary Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.c | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 45754365385..c4f4fa3d2a8 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -1344,7 +1344,6 @@ static int s3_compress_parquet(struct flb_s3 *ctx, flb_plg_error(ctx->ins, "command %s failed", DEFAULT_PARQUET_COMMAND_EXISTENCE); return -1; } - fclose(write_ptr); flb_pclose(cmdp); @@ -1354,7 +1353,18 @@ static int s3_compress_parquet(struct flb_s3 *ctx, } parquet_size = stbuf.st_size; parquet_buf = flb_sds_create_size(parquet_size); + fread(parquet_buf, parquet_size, 1, read_ptr); + + /* Tweardown for 
temporary files */ + if (unlink(infile) != 0) { + ret = -6; + flb_plg_warn(ctx->ins, "unlink %s is failed", infile); + } + if (unlink(outfile) != 0) { + ret = -6; + flb_plg_warn(ctx->ins, "unlink %s is failed", outfile); + } fclose(read_ptr); *payload_buf = parquet_buf; From 601748b68864e28b538d944ef50145ef5eee79d5 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 21 May 2024 14:53:46 +0900 Subject: [PATCH 06/31] out_s3: Use flb_sds_create for simplicity Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index c4f4fa3d2a8..ab0f96d262a 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -704,8 +704,7 @@ static int cb_s3_init(struct flb_output_instance *ins, tmp = flb_output_get_property("parquet.compression", ins); if (!tmp) { ctx->parquet_compression = \ - flb_sds_create_len(DEFAULT_PARQUET_COMPRESSION_FORMAT_UPCASES, - strlen(DEFAULT_PARQUET_COMPRESSION_FORMAT_UPCASES)); + flb_sds_create(DEFAULT_PARQUET_COMPRESSION_FORMAT_UPCASES); flb_plg_debug(ctx->ins, "parquet.compression format is %s", DEFAULT_PARQUET_COMPRESSION_FORMAT_UPCASES); } From bf301fe792fd881c245350291b415d7be49c206d Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 21 May 2024 15:59:42 +0900 Subject: [PATCH 07/31] out_s3: Plug undefined reference of mkstemp on Windows Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.c | 4 ++-- plugins/out_s3/s3_win32_compat.h | 9 +++++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index ab0f96d262a..31f2ca3ac04 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -1297,13 +1297,13 @@ static int s3_compress_parquet(struct flb_s3 *ctx, } strncpy(infile, template_in, 32); - if (!mkstemp(infile)) { + if (!flb_mkstemp(infile)) { ret = -2; goto error; } strncpy(outfile, template_out, 32); - if (!mkstemp(outfile)) { + if (!flb_mkstemp(outfile)) { ret = -2; goto 
error; } diff --git a/plugins/out_s3/s3_win32_compat.h b/plugins/out_s3/s3_win32_compat.h index 4fa7ca5ee53..20e3c258d18 100644 --- a/plugins/out_s3/s3_win32_compat.h +++ b/plugins/out_s3/s3_win32_compat.h @@ -36,6 +36,10 @@ static inline FILE* flb_popen(const char *command, const char *type) { static inline int flb_pclose(FILE *stream) { return pclose(stream); } +static inline int flb_mkstemp(char *template) { + return mkstemp(template); +} + #define FLB_PCLOSE pclose #else static inline FILE* flb_popen(const char *command, const char *type) { @@ -59,6 +63,11 @@ static inline FILE* flb_popen(const char *command, const char *type) { static inline int flb_pclose(FILE *stream) { return _pclose(stream); } + +static inline int flb_mkstemp(char *template) { + return _mktemp_s(template, strlen(template)); +} + #endif #endif /* FLB_S3_WIN32_COMPAT_H */ From 557ba6c52f5ac7b66606d0c7aca1a59f59f1c976 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 21 May 2024 16:58:08 +0900 Subject: [PATCH 08/31] out_s3: Fix wrongly inverted conditions Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 31f2ca3ac04..0cb630d2a38 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -1365,6 +1365,9 @@ static int s3_compress_parquet(struct flb_s3 *ctx, flb_plg_warn(ctx->ins, "unlink %s is failed", outfile); } fclose(read_ptr); + if (fdout != -1) { + close(fdout); + } *payload_buf = parquet_buf; *payload_size = parquet_size; @@ -1380,7 +1383,7 @@ static int s3_compress_parquet(struct flb_s3 *ctx, if (read_ptr != NULL) { fclose(read_ptr); } - if (fdout == -1) { + if (fdout != -1) { close(fdout); } if (parquet_cmd != NULL) { From dfff61a2e2c4c6ff88b6a00c30afff9feeb86f9d Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 21 May 2024 18:19:06 +0900 Subject: [PATCH 09/31] out_s3: Align return value Signed-off-by: Hiroshi Hatake --- 
plugins/out_s3/s3_win32_compat.h | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/plugins/out_s3/s3_win32_compat.h b/plugins/out_s3/s3_win32_compat.h index 20e3c258d18..6d287a946b7 100644 --- a/plugins/out_s3/s3_win32_compat.h +++ b/plugins/out_s3/s3_win32_compat.h @@ -65,7 +65,11 @@ static inline int flb_pclose(FILE *stream) { } static inline int flb_mkstemp(char *template) { - return _mktemp_s(template, strlen(template)); + if (_mktemp_s(template, strlen(template) + 1) != 0) { + return -1; + } + + return 0; } #endif From f1f369bacb7448c3115dde9319dc0061cccbc242 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 21 May 2024 18:24:23 +0900 Subject: [PATCH 10/31] out_s3: Use fileno instead of using open to obtain fd Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.c | 8 +------- plugins/out_s3/s3_win32_compat.h | 2 ++ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 0cb630d2a38..84e18f69c30 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -1320,7 +1320,7 @@ static int s3_compress_parquet(struct flb_s3 *ctx, goto error; } - fdout = open(outfile, O_RDONLY); + fdout = fileno(read_ptr); if (fdout == -1) { ret = -3; goto error; @@ -1365,9 +1365,6 @@ static int s3_compress_parquet(struct flb_s3 *ctx, flb_plg_warn(ctx->ins, "unlink %s is failed", outfile); } fclose(read_ptr); - if (fdout != -1) { - close(fdout); - } *payload_buf = parquet_buf; *payload_size = parquet_size; @@ -1383,9 +1380,6 @@ static int s3_compress_parquet(struct flb_s3 *ctx, if (read_ptr != NULL) { fclose(read_ptr); } - if (fdout != -1) { - close(fdout); - } if (parquet_cmd != NULL) { flb_sds_destroy(parquet_cmd); } diff --git a/plugins/out_s3/s3_win32_compat.h b/plugins/out_s3/s3_win32_compat.h index 6d287a946b7..f6bca522da1 100644 --- a/plugins/out_s3/s3_win32_compat.h +++ b/plugins/out_s3/s3_win32_compat.h @@ -72,6 +72,8 @@ static inline int flb_mkstemp(char *template) { return 0; } +#define 
fileno _fileno + #endif #endif /* FLB_S3_WIN32_COMPAT_H */ From 1950716dbf091959c4a2f7a16db838207b8d9e93 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 21 May 2024 18:48:27 +0900 Subject: [PATCH 11/31] out_s3: Use flb_sds_create for fixed strings Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.c | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 84e18f69c30..9656f6fdcbf 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -736,8 +736,7 @@ static int cb_s3_init(struct flb_output_instance *ins, flb_plg_info(ctx->ins, "parquet.record_type format is %s", DEFAULT_PARQUET_RECORD_TYPE); ctx->parquet_record_type = \ - flb_sds_create_len(DEFAULT_PARQUET_RECORD_TYPE, - strlen(DEFAULT_PARQUET_RECORD_TYPE)); + flb_sds_create(DEFAULT_PARQUET_RECORD_TYPE); } else { if (strncasecmp(tmp, "json", 4) == 0) { @@ -760,8 +759,7 @@ static int cb_s3_init(struct flb_output_instance *ins, flb_plg_info(ctx->ins, "parquet.schema_type format is %s", DEFAULT_PARQUET_SCHEMA_TYPE); ctx->parquet_schema_type = \ - flb_sds_create_len(DEFAULT_PARQUET_SCHEMA_TYPE, - strlen(DEFAULT_PARQUET_SCHEMA_TYPE)); + flb_sds_create(DEFAULT_PARQUET_SCHEMA_TYPE); } else { if (strncasecmp(tmp, "avro", 4) == 0 || From 3ece0ecfd85dd84c76c9325ca7f125cf78012706 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 21 May 2024 19:30:37 +0900 Subject: [PATCH 12/31] out_s3: Migrate Windows API based command executions Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.c | 128 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 128 insertions(+) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 9656f6fdcbf..efddca96136 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -1269,7 +1269,134 @@ static int build_columnify_command(struct flb_s3 *ctx, return ret; } +#if defined(FLB_SYSTEM_WINDOWS) +static int s3_compress_parquet(struct flb_s3 *ctx, + char *body, size_t body_size, + void **payload_buf, 
size_t *payload_size) +{ + int ret = 0; + char *template_in = "out_s3-body-XXXXXX"; + char *template_out = "out_s3-parquet-XXXXXX"; + char infile[32]; + char outfile[32]; + HANDLE wh = NULL; + HANDLE rh = NULL; + BOOL result = FALSE; + flb_sds_t parquet_cmd = NULL; + DWORD bytes; + FILE *cmdp = NULL; + size_t parquet_size = 0; + struct stat stbuf; + int fdout = -1; + flb_sds_t parquet_buf; + LPVOID lpBuffer; + + parquet_cmd = flb_sds_create_size(256); + if (parquet_cmd == NULL) { + goto error; + } + + strncpy(infile, template_in, 32); + if (_mktemp_s(infile, sizeof(infile)) != 0) { + flb_errno(); + ret = -2; + goto error; + } + + strncpy(outfile, template_out, 32); + if (_mktemp_s(outfile, sizeof(outfile)) != 0) { + flb_errno(); + ret = -2; + goto error; + } + + wh = CreateFileA(infile, + GENERIC_WRITE, + FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, + NULL, + CREATE_ALWAYS, + 0, + NULL); + if (wh == INVALID_HANDLE_VALUE) { + ret = -3; + goto error; + } + + rh = CreateFileA(outfile, + GENERIC_READ, + FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, + NULL, + CREATE_ALWAYS, + 0, + NULL); + if (rh == INVALID_HANDLE_VALUE) { + ret = -4; + goto error; + } + + fdout = _open_osfhandle((intptr_t) rh, _O_RDONLY); + if (fdout == -1) { + ret = -3; + goto error; + } + result = WriteFile(wh, body, body_size, &bytes, NULL); + if (!result) { + ret = -5; + goto error; + } + CloseHandle(wh); + + ret = build_columnify_command(ctx, infile, outfile, &parquet_cmd); + if (ret != 0) { + ret = -1; + goto error; + } + + cmdp = _popen(parquet_cmd, "r"); + if (cmdp == NULL) { + flb_plg_error(ctx->ins, "command %s failed", DEFAULT_PARQUET_COMMAND_EXISTENCE); + return -1; + } + _pclose(cmdp); + + if (fstat(fdout, &stbuf) == -1) { + ret = -6; + goto error; + } + parquet_size = stbuf.st_size; + parquet_buf = flb_sds_create_size(parquet_size); + + result = ReadFile(rh, parquet_buf, parquet_size, &bytes, NULL); + if (!result) { + ret = -5; + goto error; + } + + CloseHandle(rh); + 
+ *payload_buf = parquet_buf; + *payload_size = parquet_size; + + flb_sds_destroy(parquet_cmd); + + return 0; + +error: + if (wh != NULL) { + CloseHandle(wh); + } + if (rh != NULL) { + CloseHandle(rh); + } + if (parquet_cmd != NULL) { + flb_sds_destroy(parquet_cmd); + } + + return ret; +} + +#else static int s3_compress_parquet(struct flb_s3 *ctx, char *body, size_t body_size, void **payload_buf, size_t *payload_size) @@ -1384,6 +1511,7 @@ static int s3_compress_parquet(struct flb_s3 *ctx, return ret; } +#endif /* * return value is one of FLB_OK, FLB_RETRY, FLB_ERROR From f6d518071fdbd95576134e0028381eb097b6e1e5 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 21 May 2024 19:46:34 +0900 Subject: [PATCH 13/31] out_s3: Tweak non Windows side of external command executions part Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.c | 17 ++++++++++------- plugins/out_s3/s3_win32_compat.h | 14 +------------- 2 files changed, 11 insertions(+), 20 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index efddca96136..06bcdefb629 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -1416,19 +1416,21 @@ static int s3_compress_parquet(struct flb_s3 *ctx, int fdout = -1; flb_sds_t parquet_buf; - parquet_cmd = flb_sds_create_size(384); + parquet_cmd = flb_sds_create_size(256); if (parquet_cmd == NULL) { goto error; } strncpy(infile, template_in, 32); - if (!flb_mkstemp(infile)) { + if (mkstemp(infile) == -1) { + flb_errno(); ret = -2; goto error; } strncpy(outfile, template_out, 32); - if (!flb_mkstemp(outfile)) { + if (mkstemp(outfile) == -1) { + flb_errno(); ret = -2; goto error; } @@ -1441,7 +1443,7 @@ static int s3_compress_parquet(struct flb_s3 *ctx, read_ptr = fopen(outfile, "rb"); if (read_ptr == NULL) { - ret = -3; + ret = -4; goto error; } @@ -1453,9 +1455,10 @@ static int s3_compress_parquet(struct flb_s3 *ctx, bytes = fwrite(body, body_size, 1, write_ptr); if (bytes == -1) { - ret = -4; + ret = -5; goto error; } + fclose(write_ptr); 
ret = build_columnify_command(ctx, infile, outfile, &parquet_cmd); if (ret != 0) { @@ -1468,11 +1471,10 @@ static int s3_compress_parquet(struct flb_s3 *ctx, flb_plg_error(ctx->ins, "command %s failed", DEFAULT_PARQUET_COMMAND_EXISTENCE); return -1; } - fclose(write_ptr); flb_pclose(cmdp); if (fstat(fdout, &stbuf) == -1) { - ret = -4; + ret = -6; goto error; } parquet_size = stbuf.st_size; @@ -1489,6 +1491,7 @@ static int s3_compress_parquet(struct flb_s3 *ctx, ret = -6; flb_plg_warn(ctx->ins, "unlink %s is failed", outfile); } + fclose(read_ptr); *payload_buf = parquet_buf; diff --git a/plugins/out_s3/s3_win32_compat.h b/plugins/out_s3/s3_win32_compat.h index f6bca522da1..f41c1110a6d 100644 --- a/plugins/out_s3/s3_win32_compat.h +++ b/plugins/out_s3/s3_win32_compat.h @@ -36,12 +36,10 @@ static inline FILE* flb_popen(const char *command, const char *type) { static inline int flb_pclose(FILE *stream) { return pclose(stream); } -static inline int flb_mkstemp(char *template) { - return mkstemp(template); -} #define FLB_PCLOSE pclose #else + static inline FILE* flb_popen(const char *command, const char *type) { return _popen(command, type); } @@ -64,16 +62,6 @@ static inline int flb_pclose(FILE *stream) { return _pclose(stream); } -static inline int flb_mkstemp(char *template) { - if (_mktemp_s(template, strlen(template) + 1) != 0) { - return -1; - } - - return 0; -} - -#define fileno _fileno - #endif #endif /* FLB_S3_WIN32_COMPAT_H */ From bd94333a7d95df09a61f700d00e688455f24f436 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 21 May 2024 19:56:20 +0900 Subject: [PATCH 14/31] out_s3: Cleanup temporary files on Windows Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.c | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 06bcdefb629..283c2b30849 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -1375,6 +1375,15 @@ static int s3_compress_parquet(struct flb_s3 *ctx, CloseHandle(rh); + if 
(!DeleteFileA(infile)) { + ret = -6; + flb_plg_error(ctx->ins, "DeleteFileA for %s failed", infile); + } + if (!DeleteFileA(outfile)) { + ret = -6; + flb_plg_error(ctx->ins, "DeleteFileA for %s failed", outfile); + } + *payload_buf = parquet_buf; *payload_size = parquet_size; From f97eabbe875819c1ac8ada586338753869abd5d4 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 21 May 2024 21:01:03 +0900 Subject: [PATCH 15/31] out_s3: Validate fread return value Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 283c2b30849..2702f6f8ec1 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -1489,7 +1489,11 @@ static int s3_compress_parquet(struct flb_s3 *ctx, parquet_size = stbuf.st_size; parquet_buf = flb_sds_create_size(parquet_size); - fread(parquet_buf, parquet_size, 1, read_ptr); + bytes = fread(parquet_buf, parquet_size, 1, read_ptr); + if (bytes == -1) { + ret = -5; + goto error; + } /* Tweardown for temporary files */ if (unlink(infile) != 0) { From faf48c1e4b403aa9f7a84aaf99f57dc4c9421de1 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 22 May 2024 16:34:41 +0900 Subject: [PATCH 16/31] out_s3: Create temporary files under tmpdir Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.c | 89 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 76 insertions(+), 13 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 2702f6f8ec1..e2c418487b7 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -1406,15 +1406,80 @@ static int s3_compress_parquet(struct flb_s3 *ctx, } #else +static const char *get_tmpdir() +{ + const char* tmp; +#ifdef __ANDROID__ + tmp = "/data/local/tmp"; +#else + tmp = "/tmp"; +#endif + return tmp; +} + +static int create_tmpfile(char *file_path, char *template, size_t template_len) +{ + int ret; + int result; + flb_sds_t path_buf; + const char *tmpdir; + size_t 
tmpdir_len; + + path_buf = flb_sds_create_size(PATH_MAX); + if (path_buf == NULL) { + goto error; + } + + tmpdir = get_tmpdir(); + tmpdir_len = strlen(tmpdir); + + result = flb_sds_cat_safe(&path_buf, tmpdir, tmpdir_len); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(&path_buf, "/", 1); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(&path_buf, template, template_len); + if (result < 0) { + ret = -1; + goto error; + } + + strncpy(file_path, path_buf, flb_sds_len(path_buf)); + if (mkstemp(file_path) == -1) { + flb_errno(); + ret = -2; + goto error; + } + + flb_sds_destroy(path_buf); + + return 0; + +error: + if (path_buf != NULL) { + flb_sds_destroy(path_buf); + } + + return ret; +} + static int s3_compress_parquet(struct flb_s3 *ctx, char *body, size_t body_size, void **payload_buf, size_t *payload_size) { int ret = 0; - char *template_in = "out_s3-body-XXXXXX"; - char *template_out = "out_s3-parquet-XXXXXX"; - char infile[32]; - char outfile[32]; + int result; + char *template_in_suffix = "out_s3-body-XXXXXX"; + char *template_out_suffix = "out_s3-parquet-XXXXXX"; + char infile[PATH_MAX] = {0}; + char outfile[PATH_MAX] = {0}; FILE *write_ptr = NULL; FILE *read_ptr = NULL; flb_sds_t parquet_cmd = NULL; @@ -1430,17 +1495,15 @@ static int s3_compress_parquet(struct flb_s3 *ctx, goto error; } - strncpy(infile, template_in, 32); - if (mkstemp(infile) == -1) { - flb_errno(); - ret = -2; + result = create_tmpfile(infile, template_in_suffix, strlen(template_in_suffix)); + if (result < 0) { + ret = -1; goto error; } - strncpy(outfile, template_out, 32); - if (mkstemp(outfile) == -1) { - flb_errno(); - ret = -2; + result = create_tmpfile(outfile, template_out_suffix, strlen(template_out_suffix)); + if (result < 0) { + ret = -1; goto error; } @@ -1495,7 +1558,7 @@ static int s3_compress_parquet(struct flb_s3 *ctx, goto error; } - /* Tweardown for temporary files */ + /* Teardown for temporary files */ if 
(unlink(infile) != 0) { ret = -6; flb_plg_warn(ctx->ins, "unlink %s is failed", infile); From 3c4c1484ad7aeed6524fb0e768c22cb48d049584 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 22 May 2024 17:32:43 +0900 Subject: [PATCH 17/31] out_s3: Create temporary files under tempdir on Windows Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.c | 54 ++++++++++++++++++++++++++++----------------- 1 file changed, 34 insertions(+), 20 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index e2c418487b7..d6100b38285 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -1275,10 +1275,8 @@ static int s3_compress_parquet(struct flb_s3 *ctx, void **payload_buf, size_t *payload_size) { int ret = 0; - char *template_in = "out_s3-body-XXXXXX"; - char *template_out = "out_s3-parquet-XXXXXX"; - char infile[32]; - char outfile[32]; + char *template_in_prefix = "body"; + char *template_out_prefix = "parquet"; HANDLE wh = NULL; HANDLE rh = NULL; BOOL result = FALSE; @@ -1289,28 +1287,44 @@ static int s3_compress_parquet(struct flb_s3 *ctx, struct stat stbuf; int fdout = -1; flb_sds_t parquet_buf; - LPVOID lpBuffer; + TCHAR tmp_path[MAX_PATH]; + TCHAR in_temp_file[MAX_PATH]; + TCHAR out_temp_file[MAX_PATH]; parquet_cmd = flb_sds_create_size(256); if (parquet_cmd == NULL) { goto error; } - strncpy(infile, template_in, 32); - if (_mktemp_s(infile, sizeof(infile)) != 0) { - flb_errno(); - ret = -2; + bytes = GetTempPathA(MAX_PATH, + tmp_path); + if (bytes > MAX_PATH || bytes == 0) { + flb_plg_error(ctx->ins, "GetTempPath failed"); + ret = GetLastError(); goto error; } - strncpy(outfile, template_out, 32); - if (_mktemp_s(outfile, sizeof(outfile)) != 0) { - flb_errno(); - ret = -2; + bytes = GetTempFileNameA(tmp_path, + TEXT(template_in_prefix), + 0, /* create unique name only */ + in_temp_file); + if (bytes == 0) { + flb_plg_error(ctx->ins, "GetFileName failed"); + ret = GetLastError(); goto error; } - wh = CreateFileA(infile, + bytes = 
GetTempFileNameA(tmp_path, + TEXT(template_out_prefix), + 0, /* create unique name only */ + out_temp_file); + if (bytes == 0) { + flb_plg_error(ctx->ins, "GetFileName failed"); + ret = GetLastError(); + goto error; + } + + wh = CreateFileA((LPTSTR)in_temp_file, GENERIC_WRITE, FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, NULL, @@ -1322,7 +1336,7 @@ static int s3_compress_parquet(struct flb_s3 *ctx, goto error; } - rh = CreateFileA(outfile, + rh = CreateFileA((LPTSTR)out_temp_file, GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, NULL, @@ -1347,7 +1361,7 @@ static int s3_compress_parquet(struct flb_s3 *ctx, } CloseHandle(wh); - ret = build_columnify_command(ctx, infile, outfile, &parquet_cmd); + ret = build_columnify_command(ctx, in_temp_file, out_temp_file, &parquet_cmd); if (ret != 0) { ret = -1; goto error; @@ -1375,13 +1389,13 @@ static int s3_compress_parquet(struct flb_s3 *ctx, CloseHandle(rh); - if (!DeleteFileA(infile)) { + if (!DeleteFileA((LPTSTR)in_temp_file)) { ret = -6; - flb_plg_error(ctx->ins, "DeleteFileA for %s failed", infile); + flb_plg_error(ctx->ins, "DeleteFileA for %s failed", (LPTSTR)in_temp_file); } - if (!DeleteFileA(outfile)) { + if (!DeleteFileA((LPTSTR)out_temp_file)) { ret = -6; - flb_plg_error(ctx->ins, "DeleteFileA for %s failed", outfile); + flb_plg_error(ctx->ins, "DeleteFileA for %s failed", (LPTSTR)out_temp_file); } *payload_buf = parquet_buf; From 95b900b46ca5ac1bcc05909f30472e59dad2854c Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 22 May 2024 19:50:38 +0900 Subject: [PATCH 18/31] out_s3: Eliminate stdout output for existence checking of columnify on Windows Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index 4550be60973..b57f5b5de10 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -52,7 +52,11 @@ #define DEFAULT_PARQUET_RECORD_TYPE "jsonl" #define 
DEFAULT_PARQUET_SCHEMA_TYPE "avro" #define DEFAULT_PARQUET_COMMAND "columnify" +#if defined(FLB_SYSTEM_WINDOWS) +#define DEFAULT_PARQUET_COMMAND_EXISTENCE "where columnify" +#else #define DEFAULT_PARQUET_COMMAND_EXISTENCE "columnify -h" +#endif /* * If we see repeated errors on an upload/chunk, we will discard it From 484b39aa50cdb55f9333bf3c9ce8ff504d198507 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 23 May 2024 14:35:30 +0900 Subject: [PATCH 19/31] out_s3: Suppress commandline outputs for existence check Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index b57f5b5de10..e62066d4428 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -55,7 +55,7 @@ #if defined(FLB_SYSTEM_WINDOWS) #define DEFAULT_PARQUET_COMMAND_EXISTENCE "where columnify" #else -#define DEFAULT_PARQUET_COMMAND_EXISTENCE "columnify -h" +#define DEFAULT_PARQUET_COMMAND_EXISTENCE "columnify -h > /dev/null 2>&1" #endif /* From 363e29d850430b58d9f26da856265e3ba4fcd54e Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 23 May 2024 14:40:24 +0900 Subject: [PATCH 20/31] compat: in_exec: out_s3: Move common functions for compatibility to compat Signed-off-by: Hiroshi Hatake --- include/fluent-bit/flb_compat.h | 45 ++++++++++++++++- plugins/in_exec/in_exec_win32_compat.h | 42 ---------------- plugins/out_s3/s3.c | 1 - plugins/out_s3/s3_win32_compat.h | 67 -------------------------- 4 files changed, 44 insertions(+), 111 deletions(-) delete mode 100644 plugins/out_s3/s3_win32_compat.h diff --git a/include/fluent-bit/flb_compat.h b/include/fluent-bit/flb_compat.h index 2e7f28273a2..15fbb01a935 100644 --- a/include/fluent-bit/flb_compat.h +++ b/include/fluent-bit/flb_compat.h @@ -130,6 +130,29 @@ static inline int usleep(LONGLONG usec) // Convert into 100ns unit. 
return nanosleep(usec * 10); } + +static inline FILE* flb_popen(const char *command, const char *type) { + return _popen(command, type); +} +/* + * flb_pclose() has the same return value on Windows as win32 _pclose(), rather + * than posix pclose(). The process exit code is not bit-shifted to the high + * byte. + * + * The MSVC docs for _pclose() at + * https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/pclose?view=msvc-170 + * are misleading; they say that "The format of the return value is the same as + * for _cwait, except the low-order and high-order bytes are swapped." But + * _cwait isn't documented as having any meaningful return on success, the + * process exit code is meant to be in its "termstat" out parameter per + * https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/cwait?view=msvc-170 + * The return code of _pclose() actually appears to be the process exit code + * without the bit-shift that waitpid() applies. + */ +static inline int flb_pclose(FILE *stream) { + return _pclose(stream); +} + #else #include #include @@ -140,7 +163,27 @@ static inline int usleep(LONGLONG usec) #include #define FLB_DIRCHAR '/' -#endif + +/* + * Because Windows has to do everything differently, call _popen() and + * _pclose() instead of the POSIX popen() and pclose() functions. + * + * flb_pclose() has different return value semantics on Windows vs non-windows + * targets because it propagates the pclose() or _pclose() return value + * directly. You MUST use the FLB_WIFEXITED(), FLB_WEXITSTATUS(), + * FLB_WIFSIGNALED() and FLB_WTERMSIG() macros to consume the return value, + * rather than the underlying POSIX macros or manual bit-shifts. 
+ */ +static inline FILE* flb_popen(const char *command, const char *type) { + return popen(command, type); +} +static inline int flb_pclose(FILE *stream) { + return pclose(stream); +} + +#define FLB_PCLOSE pclose + +#endif /* FLB_SYSTEM_WINDOWS */ #ifdef FLB_HAVE_UNIX_SOCKET #include diff --git a/plugins/in_exec/in_exec_win32_compat.h b/plugins/in_exec/in_exec_win32_compat.h index 9e1dc3ab05b..4c225a40f0e 100644 --- a/plugins/in_exec/in_exec_win32_compat.h +++ b/plugins/in_exec/in_exec_win32_compat.h @@ -49,46 +49,4 @@ #define FLB_WTERMSIG(status) (-1) #endif -/* - * Because Windows has to do everything differently, call _popen() and - * _pclose() instead of the POSIX popen() and pclose() functions. - * - * flb_pclose() has different return value semantics on Windows vs non-windows - * targets because it propagates the pclose() or _pclose() return value - * directly. You MUST use the FLB_WIFEXITED(), FLB_WEXITSTATUS(), - * FLB_WIFSIGNALED() and FLB_WTERMSIG() macros to consume the return value, - * rather than the underlying POSIX macros or manual bit-shifts. - */ -#if !defined(FLB_SYSTEM_WINDOWS) -static inline FILE* flb_popen(const char *command, const char *type) { - return popen(command, type); -} -static inline int flb_pclose(FILE *stream) { - return pclose(stream); -} -#define FLB_PCLOSE pclose -#else -static inline FILE* flb_popen(const char *command, const char *type) { - return _popen(command, type); -} -/* - * flb_pclose() has the same return value on Windows as win32 _pclose(), rather - * than posix pclose(). The process exit code is not bit-shifted to the high - * byte. - * - * The MSVC docs for _pclose() at - * https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/pclose?view=msvc-170 - * are misleading; they say that "The format of the return value is the same as - * for _cwait, except the low-order and high-order bytes are swapped." 
But - * _cwait isn't documented as having any meaningful return on success, the - * process exit code is meant to be in its "termstat" out parameter per - * https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/cwait?view=msvc-170 - * The return code of _pclose() actually appears to be the process exit code - * without the bit-shift that waitpid() applies. - */ -static inline int flb_pclose(FILE *stream) { - return _pclose(stream); -} -#endif - #endif /* FLB_IN_EXEC_WIN32_COMPAT_H */ diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index d6100b38285..18ea9ea1d6a 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -41,7 +41,6 @@ #include "s3.h" #include "s3_store.h" -#include "s3_win32_compat.h" #define DEFAULT_S3_PORT 443 #define DEFAULT_S3_INSECURE_PORT 80 diff --git a/plugins/out_s3/s3_win32_compat.h b/plugins/out_s3/s3_win32_compat.h deleted file mode 100644 index f41c1110a6d..00000000000 --- a/plugins/out_s3/s3_win32_compat.h +++ /dev/null @@ -1,67 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2024 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef FLB_S3_WIN32_COMPAT_H -#define FLB_S3_WIN32_COMPAT_H - -/* - * Because Windows has to do everything differently, call _popen() and - * _pclose() instead of the POSIX popen() and pclose() functions. 
- * - * flb_pclose() has different return value semantics on Windows vs non-windows - * targets because it propagates the pclose() or _pclose() return value - * directly. You MUST use the FLB_WIFEXITED(), FLB_WEXITSTATUS(), - * FLB_WIFSIGNALED() and FLB_WTERMSIG() macros to consume the return value, - * rather than the underlying POSIX macros or manual bit-shifts. - */ -#if !defined(FLB_SYSTEM_WINDOWS) -static inline FILE* flb_popen(const char *command, const char *type) { - return popen(command, type); -} -static inline int flb_pclose(FILE *stream) { - return pclose(stream); -} - -#define FLB_PCLOSE pclose -#else - -static inline FILE* flb_popen(const char *command, const char *type) { - return _popen(command, type); -} -/* - * flb_pclose() has the same return value on Windows as win32 _pclose(), rather - * than posix pclose(). The process exit code is not bit-shifted to the high - * byte. - * - * The MSVC docs for _pclose() at - * https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/pclose?view=msvc-170 - * are misleading; they say that "The format of the return value is the same as - * for _cwait, except the low-order and high-order bytes are swapped." But - * _cwait isn't documented as having any meaningful return on success, the - * process exit code is meant to be in its "termstat" out parameter per - * https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/cwait?view=msvc-170 - * The return code of _pclose() actually appears to be the process exit code - * without the bit-shift that waitpid() applies. 
- */ -static inline int flb_pclose(FILE *stream) { - return _pclose(stream); -} - -#endif - -#endif /* FLB_S3_WIN32_COMPAT_H */ From b9b13189e6fab4992f920392b45b115906ad601a Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 23 May 2024 15:29:37 +0900 Subject: [PATCH 21/31] out_s3: FIx a wrong indent Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 18ea9ea1d6a..e8e673c16de 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -1392,7 +1392,7 @@ static int s3_compress_parquet(struct flb_s3 *ctx, ret = -6; flb_plg_error(ctx->ins, "DeleteFileA for %s failed", (LPTSTR)in_temp_file); } - if (!DeleteFileA((LPTSTR)out_temp_file)) { + if (!DeleteFileA((LPTSTR)out_temp_file)) { ret = -6; flb_plg_error(ctx->ins, "DeleteFileA for %s failed", (LPTSTR)out_temp_file); } From 2288b4d77fd8996e068e42fa3bf70be150d6e71a Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 24 May 2024 12:39:56 +0900 Subject: [PATCH 22/31] out_s3: Tweak for issues when writing a doc Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.c | 40 ++++++++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index e8e673c16de..201c0ed38a2 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -79,6 +79,13 @@ static struct flb_aws_header content_type_header = { .val_len = 0, }; +static struct flb_aws_header octetstream_type_header = { + .key = "Content-Type", + .key_len = 12, + .val = "application/octet-stream", + .val_len = 24, +}; + static struct flb_aws_header canned_acl_header = { .key = "x-amz-acl", .key_len = 9, @@ -167,7 +174,11 @@ int create_headers(struct flb_s3 *ctx, char *body_md5, return -1; } - if (ctx->content_type != NULL) { + if (ctx->compression == FLB_AWS_COMPRESS_PARQUET) { + s3_headers[n] = octetstream_type_header; + n++; + } + else if (ctx->content_type != NULL) { 
s3_headers[n] = content_type_header; s3_headers[n].val = ctx->content_type; s3_headers[n].val_len = strlen(ctx->content_type); @@ -536,7 +547,7 @@ static int cb_s3_init(struct flb_output_instance *ins, flb_sds_t tmp_sds; char *role_arn = NULL; char *session_name; - const char *tmp; + const char *tmp = NULL; struct flb_s3 *ctx = NULL; struct flb_aws_client_generator *generator; (void) config; @@ -546,7 +557,7 @@ static int cb_s3_init(struct flb_output_instance *ins, struct mk_list *split; int list_size; FILE *cmdp = NULL; - char buf[32]; + char buf[32] = {0}; ctx = flb_calloc(1, sizeof(struct flb_s3)); if (!ctx) { @@ -701,13 +712,7 @@ static int cb_s3_init(struct flb_output_instance *ins, flb_pclose(cmdp); tmp = flb_output_get_property("parquet.compression", ins); - if (!tmp) { - ctx->parquet_compression = \ - flb_sds_create(DEFAULT_PARQUET_COMPRESSION_FORMAT_UPCASES); - flb_plg_debug(ctx->ins, "parquet.compression format is %s", - DEFAULT_PARQUET_COMPRESSION_FORMAT_UPCASES); - } - else { + if (tmp != NULL) { if (strncasecmp(tmp, "uncompressed", 12) == 0 || strncasecmp(tmp, "snappy", 6) == 0 || strncasecmp(tmp, "gzip", 4) == 0 || @@ -723,12 +728,18 @@ static int cb_s3_init(struct flb_output_instance *ins, flb_plg_error(ctx->ins, "unknown parquet.compression format %s", tmp); return -1; } - for (i = 0; i < strlen(tmp) || i < sizeof(buf); i++) { + for (i = 0; i < strlen(tmp); i++) { buf[i] = toupper(tmp[i]); } ctx->parquet_compression = flb_sds_create_len(buf, strlen(buf)); } + else { + ctx->parquet_compression = \ + flb_sds_create(DEFAULT_PARQUET_COMPRESSION_FORMAT_UPCASES); + flb_plg_debug(ctx->ins, "parquet.compression format is %s", + DEFAULT_PARQUET_COMPRESSION_FORMAT_UPCASES); + } tmp = flb_output_get_property("parquet.record_type", ins); if (!tmp) { @@ -784,9 +795,6 @@ static int cb_s3_init(struct flb_output_instance *ins, if (tmp) { ctx->content_type = (char *) tmp; } - if (ctx->compression == FLB_AWS_COMPRESS_PARQUET) { - ctx->content_type = (char *) 
"application/octet-stream"; - } if (ctx->use_put_object == FLB_FALSE) { /* upload_chunk_size */ if (ctx->upload_chunk_size <= 0) { @@ -3044,7 +3052,7 @@ static struct flb_config_map config_map[] = { "'parquet' is only an available if columify command is installed on a system. " "Defaults to no compression. " "If 'gzip' is selected, the Content-Encoding HTTP Header will be set to 'gzip'." - "If 'parquet' is selected, the Content-Encoding HTTP Header will be set to 'octet-stream'." + "If 'parquet' is selected, the Content-Type HTTP Header will be set to 'application/octet-stream'." }, { FLB_CONFIG_MAP_STR, "parquet.compression", "snappy", @@ -3070,7 +3078,7 @@ static struct flb_config_map config_map[] = { FLB_CONFIG_MAP_STR, "parquet.record_type", "json", 0, FLB_FALSE, 0, "Record type for parquet objects. 'json' and 'msgpack' are the supported values. " - "Defaults to msgpack. " + "Defaults to json. " }, { FLB_CONFIG_MAP_STR, "parquet.schema_type", "avro", From 7be69519fef06980cec0338453ced82577c76c71 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 3 Jun 2024 16:55:31 +0900 Subject: [PATCH 23/31] out_s3: Add check for sds allocations Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.c | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 201c0ed38a2..da9c3bd5671 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -733,10 +733,20 @@ static int cb_s3_init(struct flb_output_instance *ins, } ctx->parquet_compression = flb_sds_create_len(buf, strlen(buf)); + if (ctx->parquet_compression == NULL) { + flb_plg_error(ctx->ins, "Failed to create parquet compression type"); + flb_errno(); + return -1; + } } else { ctx->parquet_compression = \ flb_sds_create(DEFAULT_PARQUET_COMPRESSION_FORMAT_UPCASES); + if (ctx->parquet_compression == NULL) { + flb_plg_error(ctx->ins, "Failed to create parquet compression type"); + flb_errno(); + return -1; + } flb_plg_debug(ctx->ins, 
"parquet.compression format is %s", DEFAULT_PARQUET_COMPRESSION_FORMAT_UPCASES); } @@ -747,6 +757,11 @@ static int cb_s3_init(struct flb_output_instance *ins, DEFAULT_PARQUET_RECORD_TYPE); ctx->parquet_record_type = \ flb_sds_create(DEFAULT_PARQUET_RECORD_TYPE); + if (ctx->parquet_record_type == NULL) { + flb_plg_error(ctx->ins, "Failed to create parquet record type"); + flb_errno(); + return -1; + } } else { if (strncasecmp(tmp, "json", 4) == 0) { @@ -762,6 +777,11 @@ static int cb_s3_init(struct flb_output_instance *ins, return -1; } ctx->parquet_record_type = flb_sds_create_len(tmp, strlen(tmp)); + if (ctx->parquet_record_type == NULL) { + flb_plg_error(ctx->ins, "Failed to create parquet record type"); + flb_errno(); + return -1; + } } tmp = flb_output_get_property("parquet.schema_type", ins); @@ -770,6 +790,11 @@ static int cb_s3_init(struct flb_output_instance *ins, DEFAULT_PARQUET_SCHEMA_TYPE); ctx->parquet_schema_type = \ flb_sds_create(DEFAULT_PARQUET_SCHEMA_TYPE); + if (ctx->parquet_schema_type == NULL) { + flb_plg_error(ctx->ins, "Failed to create parquet schema type"); + flb_errno(); + return -1; + } } else { if (strncasecmp(tmp, "avro", 4) == 0 || @@ -781,6 +806,11 @@ static int cb_s3_init(struct flb_output_instance *ins, return -1; } ctx->parquet_schema_type = flb_sds_create_len(tmp, strlen(tmp)); + if (ctx->parquet_schema_type == NULL) { + flb_plg_error(ctx->ins, "Failed to create parquet schema type"); + flb_errno(); + return -1; + } } tmp = flb_output_get_property("parquet.schema_file", ins); @@ -789,6 +819,11 @@ static int cb_s3_init(struct flb_output_instance *ins, return -1; } ctx->parquet_schema_file = flb_sds_create_len(tmp, strlen(tmp)); + if (ctx->parquet_schema_file == NULL) { + flb_plg_error(ctx->ins, "Failed to create parquet schema file"); + flb_errno(); + return -1; + } } tmp = flb_output_get_property("content_type", ins); From 2c4ee80a19559471a4d898bf3b21f8b8387c3d80 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 3 Jun 2024 
16:58:30 +0900 Subject: [PATCH 24/31] out_s3: Use _CHECK suffix to clarify meanings Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.c | 8 ++++---- plugins/out_s3/s3.h | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index da9c3bd5671..2a9a8094889 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -704,9 +704,9 @@ static int cb_s3_init(struct flb_output_instance *ins, ctx->parquet_schema_file = NULL; if (ctx->compression == FLB_AWS_COMPRESS_PARQUET) { - cmdp = flb_popen(DEFAULT_PARQUET_COMMAND_EXISTENCE, "r"); + cmdp = flb_popen(DEFAULT_PARQUET_COMMAND_CHECK, "r"); if (cmdp == NULL) { - flb_plg_error(ctx->ins, "command %s failed", DEFAULT_PARQUET_COMMAND_EXISTENCE); + flb_plg_error(ctx->ins, "command %s failed", DEFAULT_PARQUET_COMMAND_CHECK); return -1; } flb_pclose(cmdp); @@ -1411,7 +1411,7 @@ static int s3_compress_parquet(struct flb_s3 *ctx, cmdp = _popen(parquet_cmd, "r"); if (cmdp == NULL) { - flb_plg_error(ctx->ins, "command %s failed", DEFAULT_PARQUET_COMMAND_EXISTENCE); + flb_plg_error(ctx->ins, "command %s failed", DEFAULT_PARQUET_COMMAND_CHECK); return -1; } _pclose(cmdp); @@ -1596,7 +1596,7 @@ static int s3_compress_parquet(struct flb_s3 *ctx, cmdp = flb_popen(parquet_cmd, "r"); if (cmdp == NULL) { - flb_plg_error(ctx->ins, "command %s failed", DEFAULT_PARQUET_COMMAND_EXISTENCE); + flb_plg_error(ctx->ins, "command %s failed", DEFAULT_PARQUET_COMMAND_CHECK); return -1; } flb_pclose(cmdp); diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index e62066d4428..41c7e062ca7 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -53,9 +53,9 @@ #define DEFAULT_PARQUET_SCHEMA_TYPE "avro" #define DEFAULT_PARQUET_COMMAND "columnify" #if defined(FLB_SYSTEM_WINDOWS) -#define DEFAULT_PARQUET_COMMAND_EXISTENCE "where columnify" +#define DEFAULT_PARQUET_COMMAND_CHECK "where columnify" #else -#define DEFAULT_PARQUET_COMMAND_EXISTENCE "columnify -h > /dev/null 2>&1" +#define 
DEFAULT_PARQUET_COMMAND_CHECK "columnify -h > /dev/null 2>&1" #endif /* From 4bf6b7da2a0f6ac319f41d990b1d4eeb5512ae7e Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 3 Jun 2024 19:13:16 +0900 Subject: [PATCH 25/31] out_s3: Make temporary directory for parquet processing to be configurable Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.c | 33 ++++++++++++++------------------- plugins/out_s3/s3.h | 7 +++++++ 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 2a9a8094889..4f7f7420f07 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -1462,34 +1462,23 @@ static int s3_compress_parquet(struct flb_s3 *ctx, } #else -static const char *get_tmpdir() -{ - const char* tmp; -#ifdef __ANDROID__ - tmp = "/data/local/tmp"; -#else - tmp = "/tmp"; -#endif - return tmp; -} - -static int create_tmpfile(char *file_path, char *template, size_t template_len) +static int create_tmpfile(struct flb_s3 *ctx, char *file_path, char *template, size_t template_len) { int ret; int result; flb_sds_t path_buf; - const char *tmpdir; - size_t tmpdir_len; + const char *process_dir; + size_t process_dir_len; path_buf = flb_sds_create_size(PATH_MAX); if (path_buf == NULL) { goto error; } - tmpdir = get_tmpdir(); - tmpdir_len = strlen(tmpdir); + process_dir = ctx->parquet_process_dir; + process_dir_len = flb_sds_len(ctx->parquet_process_dir); - result = flb_sds_cat_safe(&path_buf, tmpdir, tmpdir_len); + result = flb_sds_cat_safe(&path_buf, process_dir, process_dir_len); if (result < 0) { ret = -1; goto error; @@ -1551,13 +1540,13 @@ static int s3_compress_parquet(struct flb_s3 *ctx, goto error; } - result = create_tmpfile(infile, template_in_suffix, strlen(template_in_suffix)); + result = create_tmpfile(ctx, infile, template_in_suffix, strlen(template_in_suffix)); if (result < 0) { ret = -1; goto error; } - result = create_tmpfile(outfile, template_out_suffix, strlen(template_out_suffix)); + result = 
create_tmpfile(ctx, outfile, template_out_suffix, strlen(template_out_suffix)); if (result < 0) { ret = -1; goto error; @@ -3126,6 +3115,12 @@ static struct flb_config_map config_map[] = { 0, FLB_FALSE, 0, "Schema file for parquet objects. " }, + { + FLB_CONFIG_MAP_STR, "parquet.process_dir", DEFAULT_PARQUET_PROCESS_DIR, + 0, FLB_TRUE, offsetof(struct flb_s3, parquet_process_dir), + "Specify a temporary directory for processing parquet objects. " + "This paramater is effective for non Windows platforms. " + }, { FLB_CONFIG_MAP_STR, "content_type", NULL, 0, FLB_FALSE, 0, diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index 41c7e062ca7..4dfe13f00e1 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -58,6 +58,12 @@ #define DEFAULT_PARQUET_COMMAND_CHECK "columnify -h > /dev/null 2>&1" #endif +#ifdef __ANDROID__ +#define DEFAULT_PARQUET_PROCESS_DIR "/data/local/tmp" +#else +#define DEFAULT_PARQUET_PROCESS_DIR "/tmp" +#endif + /* * If we see repeated errors on an upload/chunk, we will discard it * This saves us from scenarios where something goes wrong and an upload can @@ -165,6 +171,7 @@ struct flb_s3 { flb_sds_t parquet_record_type; flb_sds_t parquet_schema_type; flb_sds_t parquet_schema_file; + flb_sds_t parquet_process_dir; /* * used to track that unset buffers were found on startup that have not From 9c7a476a341dc16424b08f49d0729f6f31f43c14 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 4 Jun 2024 14:21:31 +0900 Subject: [PATCH 26/31] out_s3: Use separated temporary directory against ordinary S3 store_dir Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index 4dfe13f00e1..8c4e3536cf2 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -59,9 +59,9 @@ #endif #ifdef __ANDROID__ -#define DEFAULT_PARQUET_PROCESS_DIR "/data/local/tmp" +#define DEFAULT_PARQUET_PROCESS_DIR "/data/local/tmp/parquet/s3" #else -#define 
DEFAULT_PARQUET_PROCESS_DIR "/tmp" +#define DEFAULT_PARQUET_PROCESS_DIR "/tmp/parquet/s3" #endif /* From ff4be15aee98dda80edd9bfa2dba7e72234e7bc6 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 4 Jun 2024 15:13:45 +0900 Subject: [PATCH 27/31] out_s3: Handle creating nested directories for processing parquet objects Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.c | 98 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 97 insertions(+), 1 deletion(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 4f7f7420f07..ca2cbac3be8 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -1311,6 +1311,94 @@ static int build_columnify_command(struct flb_s3 *ctx, return ret; } +static int s3_is_dir(const char *dir) +{ + int ret; + struct stat st; + + if (!dir) { + errno = EINVAL; + return -1; + } + + if (strlen(dir) == 0) { + errno = EINVAL; + return -1; + } + + ret = stat(dir, &st); + if (ret == -1) { + return -1; + } + + if (st.st_mode & S_IFDIR) { + return 0; + } + + errno = EINVAL; + + return -1; +} + +static int s3_mkdir(struct flb_s3 *ctx, const char *dir, mode_t mode) +{ + struct stat st; + char *dup_dir = NULL; +#ifdef FLB_SYSTEM_MACOS + char *parent_dir = NULL; +#endif + + int ret; + + if (!stat(dir, &st)) { + return 0; + } + +#if FLB_SYSTEM_MACOS + dup_dir = strdup(dir); + if (!dup_dir) { + return -1; + } + + /* macOS's dirname(3) should return current directory when slash + * charachter is not included in passed string. + * And note that macOS's dirname(3) does not modify passed string. 
+ */ + parent_dir = dirname(dup_dir); + if (stat(parent_dir, &st) == 0 && strncmp(parent_dir, ".", 1)) { + if (S_ISDIR (st.st_mode)) { + flb_plg_debug(ctx->ins, "creating directory %s", dup_dir); + ret = mkdir(dup_dir, mode); + free(dup_dir); + return ret; + } + } + + ret = s3_mkdir(ctx, dirname(dup_dir), mode); + if (ret != 0) { + free(dup_dir); + return ret; + } + flb_plg_debug(ctx->ins, "creating directory %s", dup_dir); + ret = mkdir(dup_dir, mode); + free(dup_dir); + return ret; +#else + dup_dir = strdup(dir); + if (!dup_dir) { + return -1; + } + ret = s3_mkdir(ctx, dirname(dup_dir), mode); + free(dup_dir); + if (ret != 0) { + return ret; + } + flb_plg_debug(ctx->ins, "creating directory %s", dir); + return mkdir(dir, mode); +#endif +} + + #if defined(FLB_SYSTEM_WINDOWS) static int s3_compress_parquet(struct flb_s3 *ctx, char *body, size_t body_size, @@ -1350,7 +1438,7 @@ static int s3_compress_parquet(struct flb_s3 *ctx, TEXT(template_in_prefix), 0, /* create unique name only */ in_temp_file); - if (bytes == 0) { + if (bytes > MAX_PATH || bytes == 0) { flb_plg_error(ctx->ins, "GetFileName failed"); ret = GetLastError(); goto error; @@ -1475,6 +1563,14 @@ static int create_tmpfile(struct flb_s3 *ctx, char *file_path, char *template, s goto error; } + ret = s3_is_dir(ctx->parquet_process_dir); + if (ret == -1) { + flb_plg_debug(ctx->ins, "creating process dir %s.", ctx->parquet_process_dir); + if (s3_mkdir(ctx, ctx->parquet_process_dir, 0755) == -1) { + flb_plg_error(ctx->ins, "ensuring existence of process dir %s is failed.", ctx->parquet_process_dir); + } + } + process_dir = ctx->parquet_process_dir; process_dir_len = flb_sds_len(ctx->parquet_process_dir); From a7ae867fe5f45e4f2572e56bb91a5b215376614a Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 4 Jun 2024 16:51:32 +0900 Subject: [PATCH 28/31] out_s3: windows: Handle nested directories for processing parquet objects Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.c | 76 
++++++++++++++++++++++++++++++++++++++++----- plugins/out_s3/s3.h | 5 +++ 2 files changed, 74 insertions(+), 7 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index ca2cbac3be8..bb66143ab93 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -1400,6 +1400,64 @@ static int s3_mkdir(struct flb_s3 *ctx, const char *dir, mode_t mode) #if defined(FLB_SYSTEM_WINDOWS) +static flb_sds_t create_parquest_processing_dir(struct flb_s3 *ctx) +{ + int ret = 0; + DWORD bytes; + BOOL result = FALSE; + flb_sds_t path_buf = NULL; + TCHAR work_dir[MAX_PATH]; + TCHAR tmp_path[MAX_PATH]; + + path_buf = flb_sds_create_size(PATH_MAX); + if (path_buf == NULL) { + goto error; + } + + bytes = GetTempPathA(MAX_PATH, + tmp_path); + if (bytes > MAX_PATH || bytes == 0) { + flb_plg_error(ctx->ins, "GetTempPath failed"); + ret = GetLastError(); + goto error; + } + + result = flb_sds_cat_safe(&path_buf, tmp_path, strlen(tmp_path)); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(&path_buf, ctx->parquet_process_dir, + flb_sds_len(ctx->parquet_process_dir)); + if (result < 0) { + ret = -1; + goto error; + } + + ret = s3_is_dir(path_buf); + if (ret == -1) { + if (_fullpath(work_dir, path_buf, MAX_PATH) == NULL) { + ret = -1; + goto error; + } + + if (SHCreateDirectoryExA(NULL, work_dir, NULL) != ERROR_SUCCESS) { + ret = -1; + goto error; + } + } + + return path_buf; + +error: + if (path_buf != NULL) { + flb_sds_destroy(path_buf); + } + + return NULL; +} + static int s3_compress_parquet(struct flb_s3 *ctx, char *body, size_t body_size, void **payload_buf, size_t *payload_size) @@ -1418,6 +1476,7 @@ static int s3_compress_parquet(struct flb_s3 *ctx, int fdout = -1; flb_sds_t parquet_buf; TCHAR tmp_path[MAX_PATH]; + flb_sds_t path_buf = NULL; TCHAR in_temp_file[MAX_PATH]; TCHAR out_temp_file[MAX_PATH]; @@ -1426,25 +1485,24 @@ static int s3_compress_parquet(struct flb_s3 *ctx, goto error; } - bytes = GetTempPathA(MAX_PATH, - tmp_path); - if 
(bytes > MAX_PATH || bytes == 0) { - flb_plg_error(ctx->ins, "GetTempPath failed"); + path_buf = create_parquest_processing_dir(ctx); + if (path_buf == NULL) { + flb_plg_error(ctx->ins, "create processing parquet directory failed"); ret = GetLastError(); goto error; } - bytes = GetTempFileNameA(tmp_path, + bytes = GetTempFileNameA(path_buf, TEXT(template_in_prefix), 0, /* create unique name only */ in_temp_file); - if (bytes > MAX_PATH || bytes == 0) { + if (bytes == 0) { flb_plg_error(ctx->ins, "GetFileName failed"); ret = GetLastError(); goto error; } - bytes = GetTempFileNameA(tmp_path, + bytes = GetTempFileNameA(path_buf, TEXT(template_out_prefix), 0, /* create unique name only */ out_temp_file); @@ -1532,6 +1590,7 @@ static int s3_compress_parquet(struct flb_s3 *ctx, *payload_size = parquet_size; flb_sds_destroy(parquet_cmd); + flb_sds_destroy(path_buf); return 0; @@ -1545,6 +1604,9 @@ static int s3_compress_parquet(struct flb_s3 *ctx, if (parquet_cmd != NULL) { flb_sds_destroy(parquet_cmd); } + if (path_buf != NULL) { + flb_sds_destroy(path_buf); + } return ret; } diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index 8c4e3536cf2..465426b0c81 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -61,7 +61,12 @@ #ifdef __ANDROID__ #define DEFAULT_PARQUET_PROCESS_DIR "/data/local/tmp/parquet/s3" #else +#if defined(FLB_SYSTEM_WINDOWS) +/* The prefix of process dir will be obtained by GetTempPathA */ +#define DEFAULT_PARQUET_PROCESS_DIR "parquet\\s3" +#else #define DEFAULT_PARQUET_PROCESS_DIR "/tmp/parquet/s3" +#endif /* FLB_SYSTEM_WINDOWS */ #endif /* From 552b9295e8cde009a662c69b124dd2e6c308e329 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 4 Jun 2024 19:47:06 +0900 Subject: [PATCH 29/31] out_s3: Unify nested directory creations Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.c | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 
bb66143ab93..b8cd1722b11 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -1344,6 +1344,9 @@ static int s3_mkdir(struct flb_s3 *ctx, const char *dir, mode_t mode) { struct stat st; char *dup_dir = NULL; +#ifdef FLB_SYSTEM_WINDOWS + char path[PATH_MAX]; +#endif #ifdef FLB_SYSTEM_MACOS char *parent_dir = NULL; #endif @@ -1354,7 +1357,19 @@ static int s3_mkdir(struct flb_s3 *ctx, const char *dir, mode_t mode) return 0; } -#if FLB_SYSTEM_MACOS +#ifdef FLB_SYSTEM_WINDOWS + (void) mode; + + if (_fullpath(path, dir, MAX_PATH) == NULL) { + return -1; + } + + if (SHCreateDirectoryExA(NULL, path, NULL) != ERROR_SUCCESS) { + return -1; + } + + return 0; +#elif FLB_SYSTEM_MACOS dup_dir = strdup(dir); if (!dup_dir) { return -1; @@ -1437,13 +1452,10 @@ static flb_sds_t create_parquest_processing_dir(struct flb_s3 *ctx) ret = s3_is_dir(path_buf); if (ret == -1) { - if (_fullpath(work_dir, path_buf, MAX_PATH) == NULL) { - ret = -1; - goto error; - } - - if (SHCreateDirectoryExA(NULL, work_dir, NULL) != ERROR_SUCCESS) { - ret = -1; + flb_plg_debug(ctx->ins, "creating process dir %s.", ctx->parquet_process_dir); + if (s3_mkdir(ctx, ctx->parquet_process_dir, 0755) == -1) { + flb_plg_error(ctx->ins, "ensuring existence of process dir %s is failed.", + ctx->parquet_process_dir); goto error; } } @@ -1629,7 +1641,9 @@ static int create_tmpfile(struct flb_s3 *ctx, char *file_path, char *template, s if (ret == -1) { flb_plg_debug(ctx->ins, "creating process dir %s.", ctx->parquet_process_dir); if (s3_mkdir(ctx, ctx->parquet_process_dir, 0755) == -1) { - flb_plg_error(ctx->ins, "ensuring existence of process dir %s is failed.", ctx->parquet_process_dir); + flb_plg_error(ctx->ins, "ensuring existence of process dir %s is failed.", + ctx->parquet_process_dir); + goto error; } } From 7ad5161ea8b18218941caf32022023008c47fbd6 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 4 Jun 2024 19:59:38 +0900 Subject: [PATCH 30/31] out_s3: windows: Link neccessary libraries for 
SHCreateDirectoryExA Signed-off-by: Hiroshi Hatake --- plugins/out_s3/CMakeLists.txt | 10 ++++++++++ plugins/out_s3/s3.c | 4 ++++ 2 files changed, 14 insertions(+) diff --git a/plugins/out_s3/CMakeLists.txt b/plugins/out_s3/CMakeLists.txt index 94e04861707..6492658ab72 100644 --- a/plugins/out_s3/CMakeLists.txt +++ b/plugins/out_s3/CMakeLists.txt @@ -3,4 +3,14 @@ set(src s3_store.c s3_multipart.c) +set(libs "") + +if(${CMAKE_SYSTEM_NAME} MATCHES "Windows") + set(libs + ${libs} + Shell32.lib + Shlwapi.lib) +endif() + FLB_PLUGIN(out_s3 "${src}" "") +target_link_libraries(flb-plugin-out_s3 ${libs}) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index b8cd1722b11..1e321542660 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -39,6 +39,10 @@ #include +#ifdef FLB_SYSTEM_WINDOWS +#include +#endif + #include "s3.h" #include "s3_store.h" From ec656f12b694288f773fd7eeb31c33f1762a1bfa Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 19 Jun 2024 15:55:06 +0900 Subject: [PATCH 31/31] out_s3: Extract parquet related functions into s3_parquet.c Signed-off-by: Hiroshi Hatake --- plugins/out_s3/CMakeLists.txt | 3 +- plugins/out_s3/s3.c | 682 +------------------------------- plugins/out_s3/s3.h | 24 +- plugins/out_s3/s3_parquet.c | 705 ++++++++++++++++++++++++++++++++++ plugins/out_s3/s3_parquet.h | 53 +++ 5 files changed, 764 insertions(+), 703 deletions(-) create mode 100644 plugins/out_s3/s3_parquet.c create mode 100644 plugins/out_s3/s3_parquet.h diff --git a/plugins/out_s3/CMakeLists.txt b/plugins/out_s3/CMakeLists.txt index 6492658ab72..928d6a53d6f 100644 --- a/plugins/out_s3/CMakeLists.txt +++ b/plugins/out_s3/CMakeLists.txt @@ -1,7 +1,8 @@ set(src s3.c s3_store.c - s3_multipart.c) + s3_multipart.c + s3_parquet.c) set(libs "") diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 1e321542660..c49398ed3e1 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -33,18 +33,13 @@ #include #include #include -#include #include -#include 
#include -#ifdef FLB_SYSTEM_WINDOWS -#include -#endif - #include "s3.h" #include "s3_store.h" +#include "s3_parquet.h" #define DEFAULT_S3_PORT 443 #define DEFAULT_S3_INSECURE_PORT 80 @@ -1142,677 +1137,6 @@ static int cb_s3_init(struct flb_output_instance *ins, return 0; } -static int build_columnify_command(struct flb_s3 *ctx, - char *infile, - char *outfile, - flb_sds_t *cmd_buf) -{ - int ret = -1; - int result; - flb_sds_t tmp = NULL; - flb_sds_t amount_page = NULL; - flb_sds_t amount_row_group = NULL; - - amount_page = flb_sds_create_size(16); - if (amount_page == NULL) { - goto error; - } - - amount_row_group = flb_sds_create_size(16); - if (amount_row_group == NULL) { - goto error; - } - - result = flb_sds_cat_safe(cmd_buf, - DEFAULT_PARQUET_COMMAND, strlen(DEFAULT_PARQUET_COMMAND)); - if (result < 0) { - ret = -1; - goto error; - } - - result = flb_sds_cat_safe(cmd_buf, - " -parquetCompressionCodec ", 26); - if (result < 0) { - ret = -1; - goto error; - } - - result = flb_sds_cat_safe(cmd_buf, - ctx->parquet_compression, - flb_sds_len(ctx->parquet_compression)); - if (result < 0) { - ret = -1; - goto error; - } - - - result = flb_sds_cat_safe(cmd_buf, - " -parquetPageSize ", 18); - if (result < 0) { - ret = -1; - goto error; - } - - tmp = flb_sds_printf(&amount_page, "%zu", ctx->parquet_page_size); - if (!tmp) { - flb_errno(); - ret = -1; - goto error; - } - - result = flb_sds_cat_safe(cmd_buf, - amount_page, strlen(amount_page)); - if (result < 0) { - ret = -1; - goto error; - } - - result = flb_sds_cat_safe(cmd_buf, - " -parquetRowGroupSize ", 22); - if (result < 0) { - ret = -1; - goto error; - } - - tmp = flb_sds_printf(&amount_row_group, "%zu", ctx->parquet_row_group_size); - if (!tmp) { - flb_errno(); - ret = -1; - goto error; - } - - result = flb_sds_cat_safe(cmd_buf, - amount_row_group, strlen(amount_row_group)); - if (result < 0) { - ret = -1; - goto error; - } - - result = flb_sds_cat_safe(cmd_buf, - " -recordType ", 13); - if (result < 0) { - ret 
= -1; - goto error; - } - - result = flb_sds_cat_safe(cmd_buf, - ctx->parquet_record_type, - flb_sds_len(ctx->parquet_record_type)); - if (result < 0) { - ret = -1; - goto error; - } - - result = flb_sds_cat_safe(cmd_buf, - " -schemaType ", 13); - if (result < 0) { - ret = -1; - goto error; - } - - result = flb_sds_cat_safe(cmd_buf, - ctx->parquet_schema_type, - flb_sds_len(ctx->parquet_schema_type)); - if (result < 0) { - ret = -1; - goto error; - } - - result = flb_sds_cat_safe(cmd_buf, - " -schemaFile ", 13); - if (result < 0) { - ret = -1; - goto error; - } - - result = flb_sds_cat_safe(cmd_buf, - ctx->parquet_schema_file, - flb_sds_len(ctx->parquet_schema_file)); - if (result < 0) { - ret = -1; - goto error; - } - - result = flb_sds_cat_safe(cmd_buf, - " -output ", 9); - if (result < 0) { - ret = -1; - goto error; - } - - result = flb_sds_cat_safe(cmd_buf, - outfile, strlen(outfile)); - if (result < 0) { - ret = -1; - goto error; - } - - result = flb_sds_cat_safe(cmd_buf, " ", 1); - if (result < 0) { - ret = -1; - goto error; - } - - result = flb_sds_cat_safe(cmd_buf, infile, strlen(infile)); - if (result < 0) { - ret = -1; - goto error; - } - - flb_sds_destroy(amount_page); - flb_sds_destroy(amount_row_group); - - return 0; - -error: - if (amount_page != NULL) { - flb_sds_destroy(amount_page); - } - if (amount_row_group != NULL) { - flb_sds_destroy(amount_row_group); - } - - return ret; -} - -static int s3_is_dir(const char *dir) -{ - int ret; - struct stat st; - - if (!dir) { - errno = EINVAL; - return -1; - } - - if (strlen(dir) == 0) { - errno = EINVAL; - return -1; - } - - ret = stat(dir, &st); - if (ret == -1) { - return -1; - } - - if (st.st_mode & S_IFDIR) { - return 0; - } - - errno = EINVAL; - - return -1; -} - -static int s3_mkdir(struct flb_s3 *ctx, const char *dir, mode_t mode) -{ - struct stat st; - char *dup_dir = NULL; -#ifdef FLB_SYSTEM_WINDOWS - char path[PATH_MAX]; -#endif -#ifdef FLB_SYSTEM_MACOS - char *parent_dir = NULL; -#endif - - int 
ret; - - if (!stat(dir, &st)) { - return 0; - } - -#ifdef FLB_SYSTEM_WINDOWS - (void) mode; - - if (_fullpath(path, dir, MAX_PATH) == NULL) { - return -1; - } - - if (SHCreateDirectoryExA(NULL, path, NULL) != ERROR_SUCCESS) { - return -1; - } - - return 0; -#elif FLB_SYSTEM_MACOS - dup_dir = strdup(dir); - if (!dup_dir) { - return -1; - } - - /* macOS's dirname(3) should return current directory when slash - * charachter is not included in passed string. - * And note that macOS's dirname(3) does not modify passed string. - */ - parent_dir = dirname(dup_dir); - if (stat(parent_dir, &st) == 0 && strncmp(parent_dir, ".", 1)) { - if (S_ISDIR (st.st_mode)) { - flb_plg_debug(ctx->ins, "creating directory %s", dup_dir); - ret = mkdir(dup_dir, mode); - free(dup_dir); - return ret; - } - } - - ret = s3_mkdir(ctx, dirname(dup_dir), mode); - if (ret != 0) { - free(dup_dir); - return ret; - } - flb_plg_debug(ctx->ins, "creating directory %s", dup_dir); - ret = mkdir(dup_dir, mode); - free(dup_dir); - return ret; -#else - dup_dir = strdup(dir); - if (!dup_dir) { - return -1; - } - ret = s3_mkdir(ctx, dirname(dup_dir), mode); - free(dup_dir); - if (ret != 0) { - return ret; - } - flb_plg_debug(ctx->ins, "creating directory %s", dir); - return mkdir(dir, mode); -#endif -} - - -#if defined(FLB_SYSTEM_WINDOWS) -static flb_sds_t create_parquest_processing_dir(struct flb_s3 *ctx) -{ - int ret = 0; - DWORD bytes; - BOOL result = FALSE; - flb_sds_t path_buf = NULL; - TCHAR work_dir[MAX_PATH]; - TCHAR tmp_path[MAX_PATH]; - - path_buf = flb_sds_create_size(PATH_MAX); - if (path_buf == NULL) { - goto error; - } - - bytes = GetTempPathA(MAX_PATH, - tmp_path); - if (bytes > MAX_PATH || bytes == 0) { - flb_plg_error(ctx->ins, "GetTempPath failed"); - ret = GetLastError(); - goto error; - } - - result = flb_sds_cat_safe(&path_buf, tmp_path, strlen(tmp_path)); - if (result < 0) { - ret = -1; - goto error; - } - - result = flb_sds_cat_safe(&path_buf, ctx->parquet_process_dir, - 
flb_sds_len(ctx->parquet_process_dir)); - if (result < 0) { - ret = -1; - goto error; - } - - ret = s3_is_dir(path_buf); - if (ret == -1) { - flb_plg_debug(ctx->ins, "creating process dir %s.", ctx->parquet_process_dir); - if (s3_mkdir(ctx, ctx->parquet_process_dir, 0755) == -1) { - flb_plg_error(ctx->ins, "ensuring existence of process dir %s is failed.", - ctx->parquet_process_dir); - goto error; - } - } - - return path_buf; - -error: - if (path_buf != NULL) { - flb_sds_destroy(path_buf); - } - - return NULL; -} - -static int s3_compress_parquet(struct flb_s3 *ctx, - char *body, size_t body_size, - void **payload_buf, size_t *payload_size) -{ - int ret = 0; - char *template_in_prefix = "body"; - char *template_out_prefix = "parquet"; - HANDLE wh = NULL; - HANDLE rh = NULL; - BOOL result = FALSE; - flb_sds_t parquet_cmd = NULL; - DWORD bytes; - FILE *cmdp = NULL; - size_t parquet_size = 0; - struct stat stbuf; - int fdout = -1; - flb_sds_t parquet_buf; - TCHAR tmp_path[MAX_PATH]; - flb_sds_t path_buf = NULL; - TCHAR in_temp_file[MAX_PATH]; - TCHAR out_temp_file[MAX_PATH]; - - parquet_cmd = flb_sds_create_size(256); - if (parquet_cmd == NULL) { - goto error; - } - - path_buf = create_parquest_processing_dir(ctx); - if (path_buf == NULL) { - flb_plg_error(ctx->ins, "create processing parquet directory failed"); - ret = GetLastError(); - goto error; - } - - bytes = GetTempFileNameA(path_buf, - TEXT(template_in_prefix), - 0, /* create unique name only */ - in_temp_file); - if (bytes == 0) { - flb_plg_error(ctx->ins, "GetFileName failed"); - ret = GetLastError(); - goto error; - } - - bytes = GetTempFileNameA(path_buf, - TEXT(template_out_prefix), - 0, /* create unique name only */ - out_temp_file); - if (bytes == 0) { - flb_plg_error(ctx->ins, "GetFileName failed"); - ret = GetLastError(); - goto error; - } - - wh = CreateFileA((LPTSTR)in_temp_file, - GENERIC_WRITE, - FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, - NULL, - CREATE_ALWAYS, - 0, - NULL); - if 
(wh == INVALID_HANDLE_VALUE) { - ret = -3; - goto error; - } - - rh = CreateFileA((LPTSTR)out_temp_file, - GENERIC_READ, - FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, - NULL, - CREATE_ALWAYS, - 0, - NULL); - if (rh == INVALID_HANDLE_VALUE) { - ret = -4; - goto error; - } - - fdout = _open_osfhandle((intptr_t) rh, _O_RDONLY); - if (fdout == -1) { - ret = -3; - goto error; - } - - result = WriteFile(wh, body, body_size, &bytes, NULL); - if (!result) { - ret = -5; - goto error; - } - CloseHandle(wh); - - ret = build_columnify_command(ctx, in_temp_file, out_temp_file, &parquet_cmd); - if (ret != 0) { - ret = -1; - goto error; - } - - cmdp = _popen(parquet_cmd, "r"); - if (cmdp == NULL) { - flb_plg_error(ctx->ins, "command %s failed", DEFAULT_PARQUET_COMMAND_CHECK); - return -1; - } - _pclose(cmdp); - - if (fstat(fdout, &stbuf) == -1) { - ret = -6; - goto error; - } - parquet_size = stbuf.st_size; - parquet_buf = flb_sds_create_size(parquet_size); - - result = ReadFile(rh, parquet_buf, parquet_size, &bytes, NULL); - if (!result) { - ret = -5; - goto error; - } - - CloseHandle(rh); - - if (!DeleteFileA((LPTSTR)in_temp_file)) { - ret = -6; - flb_plg_error(ctx->ins, "DeleteFileA for %s failed", (LPTSTR)in_temp_file); - } - if (!DeleteFileA((LPTSTR)out_temp_file)) { - ret = -6; - flb_plg_error(ctx->ins, "DeleteFileA for %s failed", (LPTSTR)out_temp_file); - } - - *payload_buf = parquet_buf; - *payload_size = parquet_size; - - flb_sds_destroy(parquet_cmd); - flb_sds_destroy(path_buf); - - return 0; - -error: - if (wh != NULL) { - CloseHandle(wh); - } - if (rh != NULL) { - CloseHandle(rh); - } - if (parquet_cmd != NULL) { - flb_sds_destroy(parquet_cmd); - } - if (path_buf != NULL) { - flb_sds_destroy(path_buf); - } - - return ret; -} - -#else -static int create_tmpfile(struct flb_s3 *ctx, char *file_path, char *template, size_t template_len) -{ - int ret; - int result; - flb_sds_t path_buf; - const char *process_dir; - size_t process_dir_len; - - path_buf = 
flb_sds_create_size(PATH_MAX); - if (path_buf == NULL) { - goto error; - } - - ret = s3_is_dir(ctx->parquet_process_dir); - if (ret == -1) { - flb_plg_debug(ctx->ins, "creating process dir %s.", ctx->parquet_process_dir); - if (s3_mkdir(ctx, ctx->parquet_process_dir, 0755) == -1) { - flb_plg_error(ctx->ins, "ensuring existence of process dir %s is failed.", - ctx->parquet_process_dir); - goto error; - } - } - - process_dir = ctx->parquet_process_dir; - process_dir_len = flb_sds_len(ctx->parquet_process_dir); - - result = flb_sds_cat_safe(&path_buf, process_dir, process_dir_len); - if (result < 0) { - ret = -1; - goto error; - } - - result = flb_sds_cat_safe(&path_buf, "/", 1); - if (result < 0) { - ret = -1; - goto error; - } - - result = flb_sds_cat_safe(&path_buf, template, template_len); - if (result < 0) { - ret = -1; - goto error; - } - - strncpy(file_path, path_buf, flb_sds_len(path_buf)); - if (mkstemp(file_path) == -1) { - flb_errno(); - ret = -2; - goto error; - } - - flb_sds_destroy(path_buf); - - return 0; - -error: - if (path_buf != NULL) { - flb_sds_destroy(path_buf); - } - - return ret; -} - -static int s3_compress_parquet(struct flb_s3 *ctx, - char *body, size_t body_size, - void **payload_buf, size_t *payload_size) -{ - int ret = 0; - int result; - char *template_in_suffix = "out_s3-body-XXXXXX"; - char *template_out_suffix = "out_s3-parquet-XXXXXX"; - char infile[PATH_MAX] = {0}; - char outfile[PATH_MAX] = {0}; - FILE *write_ptr = NULL; - FILE *read_ptr = NULL; - flb_sds_t parquet_cmd = NULL; - size_t bytes; - FILE *cmdp = NULL; - size_t parquet_size = 0; - struct stat stbuf; - int fdout = -1; - flb_sds_t parquet_buf; - - parquet_cmd = flb_sds_create_size(256); - if (parquet_cmd == NULL) { - goto error; - } - - result = create_tmpfile(ctx, infile, template_in_suffix, strlen(template_in_suffix)); - if (result < 0) { - ret = -1; - goto error; - } - - result = create_tmpfile(ctx, outfile, template_out_suffix, strlen(template_out_suffix)); - if (result 
< 0) { - ret = -1; - goto error; - } - - write_ptr = fopen(infile, "wb"); - if (write_ptr == NULL) { - ret = -3; - goto error; - } - - read_ptr = fopen(outfile, "rb"); - if (read_ptr == NULL) { - ret = -4; - goto error; - } - - fdout = fileno(read_ptr); - if (fdout == -1) { - ret = -3; - goto error; - } - - bytes = fwrite(body, body_size, 1, write_ptr); - if (bytes == -1) { - ret = -5; - goto error; - } - fclose(write_ptr); - - ret = build_columnify_command(ctx, infile, outfile, &parquet_cmd); - if (ret != 0) { - ret = -1; - goto error; - } - - cmdp = flb_popen(parquet_cmd, "r"); - if (cmdp == NULL) { - flb_plg_error(ctx->ins, "command %s failed", DEFAULT_PARQUET_COMMAND_CHECK); - return -1; - } - flb_pclose(cmdp); - - if (fstat(fdout, &stbuf) == -1) { - ret = -6; - goto error; - } - parquet_size = stbuf.st_size; - parquet_buf = flb_sds_create_size(parquet_size); - - bytes = fread(parquet_buf, parquet_size, 1, read_ptr); - if (bytes == -1) { - ret = -5; - goto error; - } - - /* Teardown for temporary files */ - if (unlink(infile) != 0) { - ret = -6; - flb_plg_warn(ctx->ins, "unlink %s is failed", infile); - } - if (unlink(outfile) != 0) { - ret = -6; - flb_plg_warn(ctx->ins, "unlink %s is failed", outfile); - } - - fclose(read_ptr); - - *payload_buf = parquet_buf; - *payload_size = parquet_size; - - flb_sds_destroy(parquet_cmd); - - return 0; - -error: - if (write_ptr != NULL) { - fclose(write_ptr); - } - if (read_ptr != NULL) { - fclose(read_ptr); - } - if (parquet_cmd != NULL) { - flb_sds_destroy(parquet_cmd); - } - - return ret; -} -#endif - /* * return value is one of FLB_OK, FLB_RETRY, FLB_ERROR * @@ -1845,7 +1169,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk, } if (ctx->compression == FLB_AWS_COMPRESS_PARQUET) { - ret = s3_compress_parquet(ctx, body, body_size, &payload_buf, &payload_size); + ret = flb_s3_parquet_compress(ctx, body, body_size, &payload_buf, &payload_size); if (ret == -1) { flb_plg_error(ctx->ins, "Failed to compress 
data with %s", DEFAULT_PARQUET_COMMAND); return FLB_RETRY; @@ -2082,7 +1406,7 @@ static int put_all_chunks(struct flb_s3 *ctx) } if (ctx->compression == FLB_AWS_COMPRESS_PARQUET) { - ret = s3_compress_parquet(ctx, buffer, buffer_size, &payload_buf, &payload_size); + ret = flb_s3_parquet_compress(ctx, buffer, buffer_size, &payload_buf, &payload_size); if (ret == -1) { flb_plg_error(ctx->ins, "Failed to compress data with %s", DEFAULT_PARQUET_COMMAND); return FLB_RETRY; diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index 465426b0c81..bfa785172e6 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -43,32 +43,10 @@ #define MAX_FILE_SIZE_STR "50,000,000,000" /* Allowed max file size 1 GB for publishing to S3 */ -#define MAX_FILE_SIZE_PUT_OBJECT 1000000000 +#define MAX_FILE_SIZE_PUT_OBJECT 1000000000 #define DEFAULT_UPLOAD_TIMEOUT 3600 -#define DEFAULT_PARQUET_COMPRESSION_FORMAT "snappy" -#define DEFAULT_PARQUET_COMPRESSION_FORMAT_UPCASES "SNAPPY" -#define DEFAULT_PARQUET_RECORD_TYPE "jsonl" -#define DEFAULT_PARQUET_SCHEMA_TYPE "avro" -#define DEFAULT_PARQUET_COMMAND "columnify" -#if defined(FLB_SYSTEM_WINDOWS) -#define DEFAULT_PARQUET_COMMAND_CHECK "where columnify" -#else -#define DEFAULT_PARQUET_COMMAND_CHECK "columnify -h > /dev/null 2>&1" -#endif - -#ifdef __ANDROID__ -#define DEFAULT_PARQUET_PROCESS_DIR "/data/local/tmp/parquet/s3" -#else -#if defined(FLB_SYSTEM_WINDOWS) -/* The prefix of process dir will be obtained by GetTempPathA */ -#define DEFAULT_PARQUET_PROCESS_DIR "parquet\\s3" -#else -#define DEFAULT_PARQUET_PROCESS_DIR "/tmp/parquet/s3" -#endif /* FLB_SYSTEM_WINDOWS */ -#endif - /* * If we see repeated errors on an upload/chunk, we will discard it * This saves us from scenarios where something goes wrong and an upload can diff --git a/plugins/out_s3/s3_parquet.c b/plugins/out_s3/s3_parquet.c new file mode 100644 index 00000000000..6d4d983d125 --- /dev/null +++ b/plugins/out_s3/s3_parquet.c @@ -0,0 +1,705 @@ +/* -*- Mode: C; tab-width: 
4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include + +#ifdef FLB_SYSTEM_WINDOWS +#include +#endif + +#include "s3.h" +#include "s3_store.h" +#include "s3_parquet.h" + +static int build_columnify_command(struct flb_s3 *ctx, + char *infile, + char *outfile, + flb_sds_t *cmd_buf) +{ + int ret = -1; + int result; + flb_sds_t tmp = NULL; + flb_sds_t amount_page = NULL; + flb_sds_t amount_row_group = NULL; + + amount_page = flb_sds_create_size(16); + if (amount_page == NULL) { + goto error; + } + + amount_row_group = flb_sds_create_size(16); + if (amount_row_group == NULL) { + goto error; + } + + result = flb_sds_cat_safe(cmd_buf, + DEFAULT_PARQUET_COMMAND, strlen(DEFAULT_PARQUET_COMMAND)); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(cmd_buf, + " -parquetCompressionCodec ", 26); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(cmd_buf, + ctx->parquet_compression, + flb_sds_len(ctx->parquet_compression)); + if (result < 0) { + ret = -1; + goto error; + } + + + result = flb_sds_cat_safe(cmd_buf, + " -parquetPageSize ", 18); + if (result < 0) { + ret = -1; + goto error; + } + + tmp = flb_sds_printf(&amount_page, "%zu", ctx->parquet_page_size); + if (!tmp) { + flb_errno(); + ret = -1; + goto error; + } + 
+ result = flb_sds_cat_safe(cmd_buf, + amount_page, strlen(amount_page)); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(cmd_buf, + " -parquetRowGroupSize ", 22); + if (result < 0) { + ret = -1; + goto error; + } + + tmp = flb_sds_printf(&amount_row_group, "%zu", ctx->parquet_row_group_size); + if (!tmp) { + flb_errno(); + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(cmd_buf, + amount_row_group, strlen(amount_row_group)); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(cmd_buf, + " -recordType ", 13); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(cmd_buf, + ctx->parquet_record_type, + flb_sds_len(ctx->parquet_record_type)); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(cmd_buf, + " -schemaType ", 13); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(cmd_buf, + ctx->parquet_schema_type, + flb_sds_len(ctx->parquet_schema_type)); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(cmd_buf, + " -schemaFile ", 13); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(cmd_buf, + ctx->parquet_schema_file, + flb_sds_len(ctx->parquet_schema_file)); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(cmd_buf, + " -output ", 9); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(cmd_buf, + outfile, strlen(outfile)); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(cmd_buf, " ", 1); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(cmd_buf, infile, strlen(infile)); + if (result < 0) { + ret = -1; + goto error; + } + + flb_sds_destroy(amount_page); + flb_sds_destroy(amount_row_group); + + return 0; + +error: + if (amount_page != NULL) { + flb_sds_destroy(amount_page); + } + if (amount_row_group != NULL) { + 
flb_sds_destroy(amount_row_group); + } + + return ret; +} + +static int s3_is_dir(const char *dir) +{ + int ret; + struct stat st; + + if (!dir) { + errno = EINVAL; + return -1; + } + + if (strlen(dir) == 0) { + errno = EINVAL; + return -1; + } + + ret = stat(dir, &st); + if (ret == -1) { + return -1; + } + + if (st.st_mode & S_IFDIR) { + return 0; + } + + errno = EINVAL; + + return -1; +} + +static int s3_mkdir(struct flb_s3 *ctx, const char *dir, mode_t mode) +{ + struct stat st; + char *dup_dir = NULL; +#ifdef FLB_SYSTEM_WINDOWS + char path[PATH_MAX]; +#endif +#ifdef FLB_SYSTEM_MACOS + char *parent_dir = NULL; +#endif + + int ret; + + if (!stat(dir, &st)) { + return 0; + } + +#ifdef FLB_SYSTEM_WINDOWS + (void) mode; + + if (_fullpath(path, dir, MAX_PATH) == NULL) { + return -1; + } + + if (SHCreateDirectoryExA(NULL, path, NULL) != ERROR_SUCCESS) { + return -1; + } + + return 0; +#elif FLB_SYSTEM_MACOS + dup_dir = strdup(dir); + if (!dup_dir) { + return -1; + } + + /* macOS's dirname(3) should return current directory when slash + * charachter is not included in passed string. + * And note that macOS's dirname(3) does not modify passed string. 
+ */ + parent_dir = dirname(dup_dir); + if (stat(parent_dir, &st) == 0 && strncmp(parent_dir, ".", 1)) { + if (S_ISDIR (st.st_mode)) { + flb_plg_debug(ctx->ins, "creating directory %s", dup_dir); + ret = mkdir(dup_dir, mode); + free(dup_dir); + return ret; + } + } + + ret = s3_mkdir(ctx, dirname(dup_dir), mode); + if (ret != 0) { + free(dup_dir); + return ret; + } + flb_plg_debug(ctx->ins, "creating directory %s", dup_dir); + ret = mkdir(dup_dir, mode); + free(dup_dir); + return ret; +#else + dup_dir = strdup(dir); + if (!dup_dir) { + return -1; + } + ret = s3_mkdir(ctx, dirname(dup_dir), mode); + free(dup_dir); + if (ret != 0) { + return ret; + } + flb_plg_debug(ctx->ins, "creating directory %s", dir); + return mkdir(dir, mode); +#endif +} + + +#if defined(FLB_SYSTEM_WINDOWS) +static flb_sds_t create_parquest_processing_dir(struct flb_s3 *ctx) +{ + int ret = 0; + DWORD bytes; + BOOL result = FALSE; + flb_sds_t path_buf = NULL; + TCHAR work_dir[MAX_PATH]; + TCHAR tmp_path[MAX_PATH]; + + path_buf = flb_sds_create_size(PATH_MAX); + if (path_buf == NULL) { + goto error; + } + + bytes = GetTempPathA(MAX_PATH, + tmp_path); + if (bytes > MAX_PATH || bytes == 0) { + flb_plg_error(ctx->ins, "GetTempPath failed"); + ret = GetLastError(); + goto error; + } + + result = flb_sds_cat_safe(&path_buf, tmp_path, strlen(tmp_path)); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(&path_buf, ctx->parquet_process_dir, + flb_sds_len(ctx->parquet_process_dir)); + if (result < 0) { + ret = -1; + goto error; + } + + ret = s3_is_dir(path_buf); + if (ret == -1) { + flb_plg_debug(ctx->ins, "creating process dir %s.", ctx->parquet_process_dir); + if (s3_mkdir(ctx, ctx->parquet_process_dir, 0755) == -1) { + flb_plg_error(ctx->ins, "ensuring existence of process dir %s is failed.", + ctx->parquet_process_dir); + goto error; + } + } + + return path_buf; + +error: + if (path_buf != NULL) { + flb_sds_destroy(path_buf); + } + + return NULL; +} + +int 
flb_s3_parquet_compress(struct flb_s3 *ctx, + char *body, size_t body_size, + void **payload_buf, size_t *payload_size) +{ + int ret = 0; + char *template_in_prefix = "body"; + char *template_out_prefix = "parquet"; + HANDLE wh = NULL; + HANDLE rh = NULL; + BOOL result = FALSE; + flb_sds_t parquet_cmd = NULL; + DWORD bytes; + FILE *cmdp = NULL; + size_t parquet_size = 0; + struct stat stbuf; + int fdout = -1; + flb_sds_t parquet_buf; + TCHAR tmp_path[MAX_PATH]; + flb_sds_t path_buf = NULL; + TCHAR in_temp_file[MAX_PATH]; + TCHAR out_temp_file[MAX_PATH]; + + parquet_cmd = flb_sds_create_size(256); + if (parquet_cmd == NULL) { + goto error; + } + + path_buf = create_parquest_processing_dir(ctx); + if (path_buf == NULL) { + flb_plg_error(ctx->ins, "create processing parquet directory failed"); + ret = GetLastError(); + goto error; + } + + bytes = GetTempFileNameA(path_buf, + TEXT(template_in_prefix), + 0, /* create unique name only */ + in_temp_file); + if (bytes == 0) { + flb_plg_error(ctx->ins, "GetFileName failed"); + ret = GetLastError(); + goto error; + } + + bytes = GetTempFileNameA(path_buf, + TEXT(template_out_prefix), + 0, /* create unique name only */ + out_temp_file); + if (bytes == 0) { + flb_plg_error(ctx->ins, "GetFileName failed"); + ret = GetLastError(); + goto error; + } + + wh = CreateFileA((LPTSTR)in_temp_file, + GENERIC_WRITE, + FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, + NULL, + CREATE_ALWAYS, + 0, + NULL); + if (wh == INVALID_HANDLE_VALUE) { + ret = -3; + goto error; + } + + rh = CreateFileA((LPTSTR)out_temp_file, + GENERIC_READ, + FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, + NULL, + CREATE_ALWAYS, + 0, + NULL); + if (rh == INVALID_HANDLE_VALUE) { + ret = -4; + goto error; + } + + fdout = _open_osfhandle((intptr_t) rh, _O_RDONLY); + if (fdout == -1) { + ret = -3; + goto error; + } + + result = WriteFile(wh, body, body_size, &bytes, NULL); + if (!result) { + ret = -5; + goto error; + } + CloseHandle(wh); + + ret = 
build_columnify_command(ctx, in_temp_file, out_temp_file, &parquet_cmd); + if (ret != 0) { + ret = -1; + goto error; + } + + cmdp = _popen(parquet_cmd, "r"); + if (cmdp == NULL) { + flb_plg_error(ctx->ins, "command %s failed", DEFAULT_PARQUET_COMMAND_CHECK); + return -1; + } + _pclose(cmdp); + + if (fstat(fdout, &stbuf) == -1) { + ret = -6; + goto error; + } + parquet_size = stbuf.st_size; + parquet_buf = flb_sds_create_size(parquet_size); + + result = ReadFile(rh, parquet_buf, parquet_size, &bytes, NULL); + if (!result) { + ret = -5; + goto error; + } + + CloseHandle(rh); + + if (!DeleteFileA((LPTSTR)in_temp_file)) { + ret = -6; + flb_plg_error(ctx->ins, "DeleteFileA for %s failed", (LPTSTR)in_temp_file); + } + if (!DeleteFileA((LPTSTR)out_temp_file)) { + ret = -6; + flb_plg_error(ctx->ins, "DeleteFileA for %s failed", (LPTSTR)out_temp_file); + } + + *payload_buf = parquet_buf; + *payload_size = parquet_size; + + flb_sds_destroy(parquet_cmd); + flb_sds_destroy(path_buf); + + return 0; + +error: + if (wh != NULL) { + CloseHandle(wh); + } + if (rh != NULL) { + CloseHandle(rh); + } + if (parquet_cmd != NULL) { + flb_sds_destroy(parquet_cmd); + } + if (path_buf != NULL) { + flb_sds_destroy(path_buf); + } + + return ret; +} + +#else +static int create_tmpfile(struct flb_s3 *ctx, char *file_path, char *template, size_t template_len) +{ + int ret; + int result; + flb_sds_t path_buf; + const char *process_dir; + size_t process_dir_len; + + path_buf = flb_sds_create_size(PATH_MAX); + if (path_buf == NULL) { + goto error; + } + + ret = s3_is_dir(ctx->parquet_process_dir); + if (ret == -1) { + flb_plg_debug(ctx->ins, "creating process dir %s.", ctx->parquet_process_dir); + if (s3_mkdir(ctx, ctx->parquet_process_dir, 0755) == -1) { + flb_plg_error(ctx->ins, "ensuring existence of process dir %s is failed.", + ctx->parquet_process_dir); + goto error; + } + } + + process_dir = ctx->parquet_process_dir; + process_dir_len = flb_sds_len(ctx->parquet_process_dir); + + result = 
flb_sds_cat_safe(&path_buf, process_dir, process_dir_len); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(&path_buf, "/", 1); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(&path_buf, template, template_len); + if (result < 0) { + ret = -1; + goto error; + } + + strncpy(file_path, path_buf, flb_sds_len(path_buf)); + if (mkstemp(file_path) == -1) { + flb_errno(); + ret = -2; + goto error; + } + + flb_sds_destroy(path_buf); + + return 0; + +error: + if (path_buf != NULL) { + flb_sds_destroy(path_buf); + } + + return ret; +} + +int flb_s3_parquet_compress(struct flb_s3 *ctx, + char *body, size_t body_size, + void **payload_buf, size_t *payload_size) +{ + int ret = 0; + int result; + char *template_in_suffix = "out_s3-body-XXXXXX"; + char *template_out_suffix = "out_s3-parquet-XXXXXX"; + char infile[PATH_MAX] = {0}; + char outfile[PATH_MAX] = {0}; + FILE *write_ptr = NULL; + FILE *read_ptr = NULL; + flb_sds_t parquet_cmd = NULL; + size_t bytes; + FILE *cmdp = NULL; + size_t parquet_size = 0; + struct stat stbuf; + int fdout = -1; + flb_sds_t parquet_buf; + + parquet_cmd = flb_sds_create_size(256); + if (parquet_cmd == NULL) { + goto error; + } + + result = create_tmpfile(ctx, infile, template_in_suffix, strlen(template_in_suffix)); + if (result < 0) { + ret = -1; + goto error; + } + + result = create_tmpfile(ctx, outfile, template_out_suffix, strlen(template_out_suffix)); + if (result < 0) { + ret = -1; + goto error; + } + + write_ptr = fopen(infile, "wb"); + if (write_ptr == NULL) { + ret = -3; + goto error; + } + + read_ptr = fopen(outfile, "rb"); + if (read_ptr == NULL) { + ret = -4; + goto error; + } + + fdout = fileno(read_ptr); + if (fdout == -1) { + ret = -3; + goto error; + } + + bytes = fwrite(body, body_size, 1, write_ptr); + if (bytes == -1) { + ret = -5; + goto error; + } + fclose(write_ptr); + + ret = build_columnify_command(ctx, infile, outfile, &parquet_cmd); + if (ret != 0) { + ret = -1; 
+ goto error; + } + + cmdp = flb_popen(parquet_cmd, "r"); + if (cmdp == NULL) { + flb_plg_error(ctx->ins, "command %s failed", DEFAULT_PARQUET_COMMAND_CHECK); + return -1; + } + flb_pclose(cmdp); + + if (fstat(fdout, &stbuf) == -1) { + ret = -6; + goto error; + } + parquet_size = stbuf.st_size; + parquet_buf = flb_sds_create_size(parquet_size); + + bytes = fread(parquet_buf, parquet_size, 1, read_ptr); + if (bytes == -1) { + ret = -5; + goto error; + } + + /* Teardown for temporary files */ + if (unlink(infile) != 0) { + ret = -6; + flb_plg_warn(ctx->ins, "unlink %s is failed", infile); + } + if (unlink(outfile) != 0) { + ret = -6; + flb_plg_warn(ctx->ins, "unlink %s is failed", outfile); + } + + fclose(read_ptr); + + *payload_buf = parquet_buf; + *payload_size = parquet_size; + + flb_sds_destroy(parquet_cmd); + + return 0; + +error: + if (write_ptr != NULL) { + fclose(write_ptr); + } + if (read_ptr != NULL) { + fclose(read_ptr); + } + if (parquet_cmd != NULL) { + flb_sds_destroy(parquet_cmd); + } + + return ret; +} +#endif diff --git a/plugins/out_s3/s3_parquet.h b/plugins/out_s3/s3_parquet.h new file mode 100644 index 00000000000..af7a5f9513a --- /dev/null +++ b/plugins/out_s3/s3_parquet.h @@ -0,0 +1,53 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
 * ==========
 * Copyright (C) 2015-2024 The Fluent Bit Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef FLB_OUT_S3_PARQUET
#define FLB_OUT_S3_PARQUET

/* NOTE(review): the original '#include' header name was lost in extraction;
 * the prototype below needs size_t, so <stddef.h> is the minimal
 * reconstruction — confirm against the upstream patch. */
#include <stddef.h>

/* Defaults for the columnify(1) invocation */
#define DEFAULT_PARQUET_COMPRESSION_FORMAT         "snappy"
#define DEFAULT_PARQUET_COMPRESSION_FORMAT_UPCASES "SNAPPY"
#define DEFAULT_PARQUET_RECORD_TYPE                "jsonl"
#define DEFAULT_PARQUET_SCHEMA_TYPE                "avro"
#define DEFAULT_PARQUET_COMMAND                    "columnify"

/* Probe used at init time to verify columnify is installed */
#if defined(FLB_SYSTEM_WINDOWS)
#define DEFAULT_PARQUET_COMMAND_CHECK "where columnify"
#else
#define DEFAULT_PARQUET_COMMAND_CHECK "columnify -h > /dev/null 2>&1"
#endif

/* Scratch directory for the temporary input/output files */
#ifdef __ANDROID__
#define DEFAULT_PARQUET_PROCESS_DIR "/data/local/tmp/parquet/s3"
#else
#if defined(FLB_SYSTEM_WINDOWS)
/* The prefix of process dir will be obtained by GetTempPathA */
#define DEFAULT_PARQUET_PROCESS_DIR "parquet\\s3"
#else
#define DEFAULT_PARQUET_PROCESS_DIR "/tmp/parquet/s3"
#endif /* FLB_SYSTEM_WINDOWS */
#endif

struct flb_s3;

/*
 * Convert 'body' (body_size bytes) to parquet by shelling out to columnify.
 * On success returns 0 and sets *payload_buf (an flb_sds_t the caller must
 * free with flb_sds_destroy) and *payload_size. Returns -1 on failure.
 */
int flb_s3_parquet_compress(struct flb_s3 *ctx,
                            char *body, size_t body_size,
                            void **payload_buf, size_t *payload_size);

#endif