From 12515b17768cccfb39c9761c97ed0950186d9ceb Mon Sep 17 00:00:00 2001 From: Dennis Kovalenko Date: Wed, 2 Aug 2023 09:56:08 +0400 Subject: [PATCH] Use custom allocators for zstd (#566 -> #1138) gpdb uses zstd to compress workfiles, but it can't control memory allocation within zstd. Meanwhile, zstd allocates as much memory as it needs which results in ambiguous error message in a case of memory exhaustion and can make OOM killer stop gpdb processes with force. This patch forces zstd to use our custom allocator which can keep track of allocated memory. Since this zstd feature is experimental and its api can change, we also had to link zstd statically. (cherry picked from commit 7185bf85ea8f4abdfdeb08d5a53180ea2f389728) --- Changes were adapted to GPDB 7. Autoconf check was adjusted to behave the same way as the existing code. Both PRs from the original task were integrated into this commit. --- configure | 44 ++++++++++++++++++++++++++++++ configure.in | 3 +- gpcontrib/zstd/Makefile | 2 -- src/backend/storage/file/buffile.c | 26 ++++++++++++++++-- 4 files changed, 70 insertions(+), 5 deletions(-) diff --git a/configure b/configure index ccfdcbd78cf0..b6a215adb4e9 100755 --- a/configure +++ b/configure @@ -9718,6 +9718,50 @@ else ZSTD_LIBS=$pkg_cv_ZSTD_LIBS { $as_echo "$as_me:${as_lineno-$LINENO}: result: yes" >&5 $as_echo "yes" >&6; } + { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compressCCtx in -l:libzstd.a" >&5 +$as_echo_n "checking for ZSTD_compressCCtx in -l:libzstd.a... " >&6; } +if ${ac_cv_lib__libzstd_a_ZSTD_compressCCtx+:} false; then : + $as_echo_n "(cached) " >&6 +else + ac_check_lib_save_LIBS=$LIBS +LIBS="-l:libzstd.a $LIBS" +cat confdefs.h - <<_ACEOF >conftest.$ac_ext +/* end confdefs.h. */ + +/* Override any GCC internal prototype to avoid an error. + Use char because int might match the return type of a GCC + builtin and then its argument prototype would still apply. */ +#ifdef __cplusplus +extern "C" +#endif +char ZSTD_compressCCtx (); +int +main () +{ +return ZSTD_compressCCtx (); + ; + return 0; +} +_ACEOF +if ac_fn_c_try_link "$LINENO"; then : + ac_cv_lib__libzstd_a_ZSTD_compressCCtx=yes +else + ac_cv_lib__libzstd_a_ZSTD_compressCCtx=no +fi +rm -f core conftest.err conftest.$ac_objext \ + conftest$ac_exeext conftest.$ac_ext +LIBS=$ac_check_lib_save_LIBS +fi +{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib__libzstd_a_ZSTD_compressCCtx" >&5 +$as_echo "$ac_cv_lib__libzstd_a_ZSTD_compressCCtx" >&6; } +if test "x$ac_cv_lib__libzstd_a_ZSTD_compressCCtx" = xyes; then : + cat >>confdefs.h <<_ACEOF +#define HAVE_LIB_LIBZSTD_A 1 +_ACEOF + + LIBS="-l:libzstd.a $LIBS" + +fi fi fi diff --git a/configure.in b/configure.in index 8b65a463e85f..49903d16e931 100644 --- a/configure.in +++ b/configure.in @@ -1148,7 +1148,8 @@ AC_SUBST(with_zstd) if test "$with_zstd" = yes; then dnl zstd_errors.h was renamed from error_public.h in v1.1.1 - PKG_CHECK_MODULES([ZSTD], [libzstd >= 1.4.0]) + PKG_CHECK_MODULES([ZSTD], [libzstd >= 1.4.0], + [AC_CHECK_LIB([:libzstd.a], [ZSTD_compressCCtx], [])]) fi # diff --git a/gpcontrib/zstd/Makefile b/gpcontrib/zstd/Makefile index 0b7a9016f1b0..b3141c865e30 100644 --- a/gpcontrib/zstd/Makefile +++ b/gpcontrib/zstd/Makefile @@ -2,8 +2,6 @@ PG_CONFIG = pg_config MODULE_big = gp_zstd_compression OBJS = zstd_compression.o -CFLAGS_SL += -lzstd -LDFLAGS_SL += -lzstd REGRESS = zstd_column_compression compression_zstd zstd_abort_leak AOCO_zstd AORO_zstd diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c index 00f61fec3b84..c2471e334207 100644 --- a/src/backend/storage/file/buffile.c +++ b/src/backend/storage/file/buffile.c @@ -42,6 +42,7 @@ #include "postgres.h" #ifdef USE_ZSTD +#define ZSTD_STATIC_LINKING_ONLY #include #endif @@ -55,6 +56,7 @@ #include "utils/resowner.h" #include "storage/gp_compress.h" +#include "utils/gp_alloc.h" #include "utils/faultinjector.h" #include "utils/workfile_mgr.h" @@ -1277,6 +1279,26 @@ BufFilePledgeSequential(BufFile *buffile) #define BUFFILE_ZSTD_COMPRESSION_LEVEL 1 +static void * +zstdCustomAlloc(void *opaque, size_t size) +{ + (void) opaque; + return MemoryContextAlloc(TopMemoryContext, size); +} + +static void +zstdCustomFree(void *opaque, void *address) +{ + (void) opaque; + pfree(address); +} + +static ZSTD_customMem zstdCustomMem = +{ + .customAlloc = zstdCustomAlloc, + .customFree = zstdCustomFree +}; + /* * Temporary buffer used during compression. It's used only within the * functions, so we can allocate this once and reuse it for all files. @@ -1317,7 +1339,7 @@ BufFileStartCompression(BufFile *file) CurrentResourceOwner = file->resowner; file->zstd_context = zstd_alloc_context(); - file->zstd_context->cctx = ZSTD_createCStream(); + file->zstd_context->cctx = ZSTD_createCStream_advanced(zstdCustomMem); if (!file->zstd_context->cctx) elog(ERROR, "out of memory"); ret = ZSTD_initCStream(file->zstd_context->cctx, BUFFILE_ZSTD_COMPRESSION_LEVEL); @@ -1424,7 +1446,7 @@ BufFileEndCompression(BufFile *file) file->uncompressed_bytes, BufFileSize(file)); /* Done writing. Initialize for reading */ - file->zstd_context->dctx = ZSTD_createDStream(); + file->zstd_context->dctx = ZSTD_createDStream_advanced(zstdCustomMem); if (!file->zstd_context->dctx) elog(ERROR, "out of memory"); ret = ZSTD_initDStream(file->zstd_context->dctx);