From 9c14f0ff120cf805a948f99366a96eb3e9d95aeb Mon Sep 17 00:00:00 2001 From: Ricardo Azpeitia Pimentel Date: Tue, 12 Sep 2023 14:59:00 -0600 Subject: [PATCH 1/7] Add streaming --- .gitignore | 1 + c_src/zstd_nif.c | 380 +++++++++++++++++++++++++++++++++++++++++++++-- src/zstd.erl | 51 +++++++ 3 files changed, 417 insertions(+), 15 deletions(-) diff --git a/.gitignore b/.gitignore index 227c3a2..e6374fb 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,4 @@ _build rebar3.crashdump priv/libzstd/libzstd.a priv/zstd_nif.so +.vscode/ diff --git a/c_src/zstd_nif.c b/c_src/zstd_nif.c index 0264929..facd2cb 100644 --- a/c_src/zstd_nif.c +++ b/c_src/zstd_nif.c @@ -3,9 +3,26 @@ #include #include +static const char* MODULE_NAME = "zstd"; +static const char* COMPRESSION_STREAM_NAME = "CStream"; +static const char* DECOMPRESSION_STREAM_NAME = "DStream"; +static char* COMPRESS_CONTEXT_KEY = "zstd_compress_context_key"; +static char* DECOMPRESS_CONTEXT_KEY = "zstd_decompress_context_key"; + ErlNifTSDKey zstdDecompressContextKey; ErlNifTSDKey zstdCompressContextKey; +static ErlNifResourceType *zstd_compression_stream_type = NULL; +static ErlNifResourceType *zstd_decompression_stream_type = NULL; + +static ERL_NIF_TERM zstd_atom_ok; +static ERL_NIF_TERM zstd_atom_error; +static ERL_NIF_TERM zstd_atom_invalid; +static ERL_NIF_TERM zstd_atom_enomem; +static ERL_NIF_TERM zstd_atom_eof; +static ERL_NIF_TERM zstd_atom_compression; +static ERL_NIF_TERM zstd_atom_decompression; + static ERL_NIF_TERM zstd_nif_compress(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { ErlNifBinary bin, ret_bin; size_t buff_size, compressed_size; @@ -25,14 +42,14 @@ static ERL_NIF_TERM zstd_nif_compress(ErlNifEnv* env, int argc, const ERL_NIF_TE buff_size = ZSTD_compressBound(bin.size); if(!enif_alloc_binary(buff_size, &ret_bin)) - return enif_make_atom(env, "error"); + return zstd_atom_error; compressed_size = ZSTD_compressCCtx(ctx, ret_bin.data, buff_size, bin.data, bin.size, compression_level); if(ZSTD_isError(compressed_size)) - return enif_make_atom(env, "error"); + return zstd_atom_error; if(!enif_realloc_binary(&ret_bin, compressed_size)) - return enif_make_atom(env, "error"); + return zstd_atom_error; return enif_make_binary(env, &ret_bin); } @@ -52,26 +69,359 @@ static ERL_NIF_TERM zstd_nif_decompress(ErlNifEnv* env, int argc, const ERL_NIF_ if(!enif_inspect_binary(env, argv[0], &bin)) return enif_make_badarg(env); - uncompressed_size = ZSTD_getDecompressedSize(bin.data, bin.size); + uncompressed_size = ZSTD_getFrameContentSize(bin.data, bin.size); outp = enif_make_new_binary(env, uncompressed_size, &out); if(ZSTD_decompressDCtx(ctx, outp, uncompressed_size, bin.data, bin.size) != uncompressed_size) - return enif_make_atom(env, "error"); + return zstd_atom_error; return out; } +static ERL_NIF_TERM zstd_nif_new_compression_stream(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { + /* create handle */ + ZSTD_CStream **handle = enif_alloc_resource( + zstd_compression_stream_type, + sizeof(ZSTD_CStream *) + ); + + /* create cstream stream */ + *handle = ZSTD_createCStream(); + + ERL_NIF_TERM res = enif_make_resource(env, handle); + enif_release_resource(handle); + return res; +} + +static ERL_NIF_TERM zstd_nif_new_decompression_stream(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { + /* create handle */ + ZSTD_DStream **handle = enif_alloc_resource( + zstd_decompression_stream_type, + sizeof(ZSTD_DStream *) + ); + + /* create dstream stream */ + *handle = ZSTD_createDStream(); + + ERL_NIF_TERM res = enif_make_resource(env, handle); + enif_release_resource(handle); + return res; +} + +static ERL_NIF_TERM zstd_nif_init_compression_stream(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { + int level = ZSTD_CLEVEL_DEFAULT; + size_t ret; + ZSTD_CStream **pzcs; + + /* extract the stream */ + if (!(enif_get_resource(env, argv[0], zstd_compression_stream_type, (void **)&pzcs))) + return enif_make_tuple2(env, zstd_atom_error, zstd_atom_invalid); + + /* extract the compression level if any */ + if ((argc == 2) && !(enif_get_int(env, argv[1], &level))) + return enif_make_badarg(env); + + /* initialize the stream */ + if (ZSTD_isError(ret = ZSTD_initCStream(*pzcs, level))) + return enif_make_tuple2(env, zstd_atom_error, enif_make_string(env, ZSTD_getErrorName(ret), ERL_NIF_LATIN1)); + + /* stream initialization successful */ + return zstd_atom_ok; +} + +static ERL_NIF_TERM zstd_nif_init_decompression_stream(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { + size_t ret; + ZSTD_DStream **pzcs; + + /* extract the stream */ + if (!(enif_get_resource(env, argv[0], zstd_decompression_stream_type, (void **)&pzcs))) + return enif_make_tuple2(env, zstd_atom_error, zstd_atom_invalid); + + /* initialize the stream */ + if (ZSTD_isError(ret = ZSTD_initDStream(*pzcs))) + return enif_make_tuple2(env, zstd_atom_error, enif_make_string(env, ZSTD_getErrorName(ret), ERL_NIF_LATIN1)); + + /* stream initialization successful */ + return zstd_atom_ok; +} + +static ERL_NIF_TERM zstd_nif_reset_compression_stream(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { + size_t ret; + size_t size = ZSTD_CONTENTSIZE_UNKNOWN; + ZSTD_CStream **pzcs; + + /* extract the stream */ + if (!(enif_get_resource(env, argv[0], zstd_compression_stream_type, (void **)&pzcs))) + return enif_make_tuple2(env, zstd_atom_error, zstd_atom_invalid); + + /* extract the pledged source size if any */ + if ((argc == 2) && !(enif_get_ulong(env, argv[1], &size))) + return enif_make_badarg(env); + + /* reset the stream */ + if (ZSTD_isError(ret = ZSTD_CCtx_reset(*pzcs, size))) + return enif_make_tuple2(env, zstd_atom_error, enif_make_string(env, ZSTD_getErrorName(ret), ERL_NIF_LATIN1)); + + /* stream resetting successful */ + return zstd_atom_ok; +} + +static ERL_NIF_TERM zstd_nif_reset_decompression_stream(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { + size_t ret; + ZSTD_DStream **pzcs; + + /* extract the stream */ + if (!(enif_get_resource(env, argv[0], zstd_decompression_stream_type, (void **)&pzcs))) + return enif_make_tuple2(env, zstd_atom_error, zstd_atom_invalid); + + /* reset the stream */ + if (ZSTD_isError(ret = ZSTD_DCtx_reset(*pzcs, ZSTD_reset_session_only))) + return enif_make_tuple2(env, zstd_atom_error, enif_make_string(env, ZSTD_getErrorName(ret), ERL_NIF_LATIN1)); + + /* stream resetting successful */ + return zstd_atom_ok; +} + +static ERL_NIF_TERM zstd_nif_flush_compression_stream(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { + size_t ret; + ErlNifBinary bin; + ZSTD_CStream **pzcs; + + /* extract the stream */ + if (!(enif_get_resource(env, argv[0], zstd_compression_stream_type, (void **)&pzcs))) + return enif_make_tuple2(env, zstd_atom_error, zstd_atom_invalid); + + /* allocate binary buffer */ + if (!(enif_alloc_binary(ZSTD_CStreamOutSize(), &bin))) + return enif_make_tuple2(env, zstd_atom_error, zstd_atom_enomem); + + /* output buffer */ + ZSTD_outBuffer outbuf = { + .pos = 0, + .dst = bin.data, + .size = bin.size, + }; + + /* reset the stream */ + if (ZSTD_isError(ret = ZSTD_endStream(*pzcs, &outbuf))) + { + enif_release_binary(&bin); + return enif_make_tuple2(env, zstd_atom_error, enif_make_string(env, ZSTD_getErrorName(ret), ERL_NIF_LATIN1)); + } + + /* transfer to binary object */ + ERL_NIF_TERM binary = enif_make_binary(env, &bin); + ERL_NIF_TERM result = binary; + + /* remove unused spaces */ + if (outbuf.pos < outbuf.size) + result = enif_make_sub_binary(env, binary, 0, outbuf.pos); + + /* construct the result tuple */ + return enif_make_tuple2(env, zstd_atom_ok, result); +} + +static ERL_NIF_TERM zstd_nif_compress_stream(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { + size_t ret; + ErlNifBinary in; + ErlNifBinary out; + ZSTD_CStream **pzcs; + + /* extract the stream */ + if (!(enif_get_resource(env, argv[0], zstd_compression_stream_type, (void **)&pzcs)) || + !(enif_inspect_iolist_as_binary(env, argv[1], &in))) + return enif_make_tuple2(env, zstd_atom_error, zstd_atom_invalid); + + /* all output binary buffer */ + if (!(enif_alloc_binary(ZSTD_compressBound(in.size), &out))) { + enif_release_binary(&in); + return enif_make_tuple2(env, zstd_atom_error, zstd_atom_enomem); + } + + /* input buffer */ + ZSTD_inBuffer inbuf = { + .pos = 0, + .src = in.data, + .size = in.size, + }; + + /* output buffer */ + ZSTD_outBuffer outbuf = { + .pos = 0, + .dst = out.data, + .size = out.size, + }; + + /* compress every chunk */ + while (inbuf.pos < inbuf.size) { + if (ZSTD_isError(ret = ZSTD_compressStream(*pzcs, &outbuf, &inbuf))) { + enif_release_binary(&in); + enif_release_binary(&out); + return enif_make_tuple2(env, zstd_atom_error, enif_make_string(env, ZSTD_getErrorName(ret), ERL_NIF_LATIN1)); + } + } + + /* transfer to binary object */ + ERL_NIF_TERM binary = enif_make_binary(env, &out); + ERL_NIF_TERM result = binary; + + /* remove unused spaces */ + if (outbuf.pos < outbuf.size) + result = enif_make_sub_binary(env, binary, 0, outbuf.pos); + + /* construct the result tuple */ + enif_release_binary(&in); + return enif_make_tuple2(env, zstd_atom_ok, result); +} + +static ERL_NIF_TERM zstd_nif_decompress_stream(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { + size_t ret; + ErlNifBinary in; + ErlNifBinary out; + ZSTD_DStream **pzds; + + /* extract the stream */ + if (!(enif_get_resource(env, argv[0], zstd_decompression_stream_type, (void **)&pzds)) || + !(enif_inspect_iolist_as_binary(env, argv[1], &in))) + return enif_make_tuple2(env, zstd_atom_error, zstd_atom_invalid); + + /* allocate output binary buffer */ + if (!(enif_alloc_binary(ZSTD_DStreamOutSize(), &out))) { + enif_release_binary(&in); + return enif_make_tuple2(env, zstd_atom_error, zstd_atom_enomem); + } + + /* input buffer */ + ZSTD_inBuffer inbuf = { + .pos = 0, + .src = in.data, + .size = in.size, + }; + + /* output buffer */ + ZSTD_outBuffer outbuf = { + .pos = 0, + .dst = out.data, + .size = out.size, + }; + + /* decompress every chunk */ + while (inbuf.pos < inbuf.size) { + /* enlarge output buffer as needed */ + if ((outbuf.size - outbuf.pos) < ZSTD_DStreamOutSize()) { + /* resize the output binary */ + if (!(enif_realloc_binary(&out, out.size + ZSTD_DStreamOutSize()))) { + enif_release_binary(&in); + enif_release_binary(&out); + return enif_make_tuple2(env, zstd_atom_error, zstd_atom_enomem); + } + + /* update buffer pointers */ + outbuf.dst = out.data; + outbuf.size = out.size; + } + + /* decompress one frame */ + if (ZSTD_isError(ret = ZSTD_decompressStream(*pzds, &outbuf, &inbuf))) { + enif_release_binary(&in); + enif_release_binary(&out); + return enif_make_tuple2(env, zstd_atom_error, enif_make_string(env, ZSTD_getErrorName(ret), ERL_NIF_LATIN1)); + } + } + + /* transfer to binary object */ + ERL_NIF_TERM binary = enif_make_binary(env, &out); + ERL_NIF_TERM result = binary; + + /* remove unused spaces */ + if (outbuf.pos < outbuf.size) + result = enif_make_sub_binary(env, binary, 0, outbuf.pos); + + /* construct the result tuple */ + enif_release_binary(&in); + return enif_make_tuple2(env, zstd_atom_ok, result); +} + + +static void zstd_compression_stream_destructor(ErlNifEnv *env, void *stream) { + ZSTD_CStream **handle = stream; + ZSTD_freeCStream(*handle); +} + +static void zstd_decompression_stream_destructor(ErlNifEnv *env, void *stream) { + ZSTD_DStream **handle = stream; + ZSTD_freeDStream(*handle); +} + +static int zstd_init(ErlNifEnv *env) { + // For compress and decompress + enif_tsd_key_create(COMPRESS_CONTEXT_KEY, &zstdCompressContextKey); + enif_tsd_key_create(DECOMPRESS_CONTEXT_KEY, &zstdDecompressContextKey); + + // Compression stream type + zstd_compression_stream_type = enif_open_resource_type( + env, + MODULE_NAME, + COMPRESSION_STREAM_NAME, + zstd_compression_stream_destructor, + ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, + NULL + ); + + // Decompression stream type + zstd_decompression_stream_type = enif_open_resource_type( + env, + MODULE_NAME, + DECOMPRESSION_STREAM_NAME, + zstd_decompression_stream_destructor, + ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, + NULL + ); + + // Create atoms + enif_make_existing_atom(env, "ok" , &zstd_atom_ok , ERL_NIF_LATIN1); + enif_make_existing_atom(env, "error" , &zstd_atom_error , ERL_NIF_LATIN1); + enif_make_existing_atom(env, "invalid" , &zstd_atom_invalid , ERL_NIF_LATIN1); + enif_make_existing_atom(env, "enomem", &zstd_atom_enomem, ERL_NIF_LATIN1); + enif_make_existing_atom(env, "eof", &zstd_atom_eof, ERL_NIF_LATIN1); + enif_make_existing_atom(env, "compression" , &zstd_atom_compression , ERL_NIF_LATIN1); + enif_make_existing_atom(env, "decompression" , &zstd_atom_decompression , ERL_NIF_LATIN1); + + /* should all be loaded */ + return !(zstd_compression_stream_type && zstd_decompression_stream_type); +} + +static int zstd_on_load(ErlNifEnv *env, void **priv, ERL_NIF_TERM info) { + return zstd_init(env); +} + +static int zstd_on_reload(ErlNifEnv *env, void **priv, ERL_NIF_TERM info) { + return zstd_init(env); +} + +static int zstd_on_upgrade(ErlNifEnv *env, void **priv, void **old, ERL_NIF_TERM info) { + return zstd_init(env); +} + static ErlNifFunc nif_funcs[] = { - {"compress", 2, zstd_nif_compress}, - {"decompress", 1, zstd_nif_decompress} -}; + { "compress" , 2, zstd_nif_compress , ERL_DIRTY_JOB_CPU_BOUND }, + { "decompress" , 1, zstd_nif_decompress , ERL_DIRTY_JOB_CPU_BOUND }, + + { "new_compression_stream" , 0, zstd_nif_new_compression_stream }, + { "new_decompression_stream" , 0, zstd_nif_new_decompression_stream }, + + { "compression_stream_init" , 1, zstd_nif_init_compression_stream , ERL_DIRTY_JOB_CPU_BOUND }, + { "compression_stream_init" , 2, zstd_nif_init_compression_stream , ERL_DIRTY_JOB_CPU_BOUND }, + { "decompression_stream_init" , 1, zstd_nif_init_decompression_stream , ERL_DIRTY_JOB_CPU_BOUND }, -static int load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) - { - enif_tsd_key_create("zstd_decompress_context_key", &zstdDecompressContextKey); - enif_tsd_key_create("zstd_compress_context_key", &zstdCompressContextKey); - return 0; - } + { "compression_stream_reset" , 2, zstd_nif_reset_compression_stream }, + { "compression_stream_reset" , 1, zstd_nif_reset_compression_stream }, + { "decompression_stream_reset" , 1, zstd_nif_reset_decompression_stream }, + + { "stream_flush" , 1, zstd_nif_flush_compression_stream , ERL_DIRTY_JOB_CPU_BOUND }, + { "stream_compress" , 2, zstd_nif_compress_stream , ERL_DIRTY_JOB_CPU_BOUND }, + { "stream_decompress" , 2, zstd_nif_decompress_stream , ERL_DIRTY_JOB_CPU_BOUND } +}; -ERL_NIF_INIT(zstd, nif_funcs, load, NULL, NULL, NULL); +ERL_NIF_INIT(zstd, nif_funcs, zstd_on_load, zstd_on_reload, zstd_on_upgrade, NULL); diff --git a/src/zstd.erl b/src/zstd.erl index 5da34b4..28db72a 100644 --- a/src/zstd.erl +++ b/src/zstd.erl @@ -2,6 +2,13 @@ -export([compress/1, compress/2]). -export([decompress/1]). +-export([new_compression_stream/0, new_decompression_stream/0, + compression_stream_init/1, compression_stream_init/2, + decompression_stream_init/1, + compression_stream_reset/2, compression_stream_reset/1, decompression_stream_reset/1, + stream_flush/1, + stream_compress/2, + stream_decompress/2]). -on_load init/0. @@ -21,6 +28,50 @@ compress(_, _) -> decompress(_) -> erlang:nif_error(?LINE). +-spec new_compression_stream() -> reference(). +new_compression_stream() -> + erlang:nif_error(?LINE). + +-spec new_decompression_stream() -> reference(). +new_decompression_stream() -> + erlang:nif_error(?LINE). + +-spec compression_stream_init(reference()) -> ok | {error, invalid | string()}. +compression_stream_init(_Ref) -> + erlang:nif_error(?LINE). + +-spec compression_stream_init(reference(), 0..22) -> ok | {error, invalid | string()}. +compression_stream_init(_Ref, _Level) -> + erlang:nif_error(?LINE). + +-spec decompression_stream_init(reference()) -> ok | {error, invalid | string()}. +decompression_stream_init(_Ref) -> + erlang:nif_error(?LINE). + +-spec compression_stream_reset(reference()) -> ok | {error, invalid | string()}. +compression_stream_reset(_Ref) -> + erlang:nif_error(?LINE). + +-spec compression_stream_reset(reference(), non_neg_integer()) -> ok | {error, invalid | string()}. +compression_stream_reset(_Ref, _Size) -> + erlang:nif_error(?LINE). + +-spec decompression_stream_reset(reference()) -> ok | {error, invalid | string()}. +decompression_stream_reset(_Ref) -> + erlang:nif_error(?LINE). + +-spec stream_flush(reference()) -> {ok, binary()} | {error, invalid | enomem | string()}. +stream_flush(_Ref) -> + erlang:nif_error(?LINE). + +-spec stream_compress(reference(), iodata()) -> {ok, binary()} | {error, invalid | enomem | string()}. +stream_compress(_Ref, _IOData) -> + erlang:nif_error(?LINE). + +-spec stream_decompress(reference(), iodata()) -> {ok, binary()} | {error, invalid | enomem | string()}. +stream_decompress(_Ref, _Binary) -> + erlang:nif_error(?LINE). + init() -> SoName = case code:priv_dir(?APPNAME) of From 21b16584d9afc76eac54d4990fde04e4c9aa38ab Mon Sep 17 00:00:00 2001 From: Ricardo Azpeitia Pimentel Date: Tue, 12 Sep 2023 16:08:50 -0600 Subject: [PATCH 2/7] Add examples --- README.md | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/README.md b/README.md index 9287a9f..707c069 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,47 @@ zstd:decompress(Binary). <<"Hello, World!">> ``` +``` +compress_file() -> + {ok, File} = file:open("test.txt.zst", [write, raw, delayed_write, sync]), + Bin = << <<"A">> || _ <- lists:seq(1, 1024 * 1024) >>, + CStream = zstd:new_compression_stream(), + ok = zstd:compression_stream_init(CStream), + {ok, ResC} = zstd:stream_compress(CStream, Bin), + file:write(File, ResC), + {ok, ResF} = zstd:stream_flush(CStream), + file:write(File, ResF), + file:close(File). + +decompress_file() -> + {ok, Data} = file:read_file("test.txt.zst"), + DStream = zstd:new_decompression_stream(), + ok = zstd:decompression_stream_init(DStream), + {ok, Bin} = zstd:stream_decompress(DStream, Data), + io:format("~s", [Bin]), + io:format("Size: ~p~n", [size(Bin)]). + +decompress_large_file() -> + DStream = zstd:new_decompression_stream(), + ok = zstd:decompression_stream_init(DStream), + + {ok, RFile} = file:open("somefile.log.ztd", [read, raw, binary, read_ahead]), + {ok, WFile} = file:open("somefile.log", [write, raw, delayed_write, sync]), + read_file(RFile, WFile, DStream), + file:close(RFile), + file:close(WFile). + +read_file(RFile, WFile, DStream) -> + case file:read(RFile, 1024 * 8) of + {ok, Data} -> + {ok, Bin} = zstd:stream_decompress(DStream, Data), + file:write(WFile, Bin), + read_file(RFile, WFile, DStream); + eof -> + ok + end. +``` + #### For Elixir ``` From d74f8403f320cfd560b4a8afd92444c3270fdd51 Mon Sep 17 00:00:00 2001 From: Ricardo Azpeitia Pimentel Date: Tue, 12 Sep 2023 16:11:32 -0600 Subject: [PATCH 3/7] Fix spaces --- c_src/zstd_nif.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/c_src/zstd_nif.c b/c_src/zstd_nif.c index facd2cb..7576073 100644 --- a/c_src/zstd_nif.c +++ b/c_src/zstd_nif.c @@ -405,11 +405,11 @@ static int zstd_on_upgrade(ErlNifEnv *env, void **priv, void **old, ERL_NIF_TERM } static ErlNifFunc nif_funcs[] = { - { "compress" , 2, zstd_nif_compress , ERL_DIRTY_JOB_CPU_BOUND }, - { "decompress" , 1, zstd_nif_decompress , ERL_DIRTY_JOB_CPU_BOUND }, + { "compress" , 2, zstd_nif_compress , ERL_DIRTY_JOB_CPU_BOUND }, + { "decompress" , 1, zstd_nif_decompress , ERL_DIRTY_JOB_CPU_BOUND }, - { "new_compression_stream" , 0, zstd_nif_new_compression_stream }, - { "new_decompression_stream" , 0, zstd_nif_new_decompression_stream }, + { "new_compression_stream" , 0, zstd_nif_new_compression_stream }, + { "new_decompression_stream" , 0, zstd_nif_new_decompression_stream }, { "compression_stream_init" , 1, zstd_nif_init_compression_stream , ERL_DIRTY_JOB_CPU_BOUND }, { "compression_stream_init" , 2, zstd_nif_init_compression_stream , ERL_DIRTY_JOB_CPU_BOUND }, From d268727971d53a5903d81b2ad2765a9f8effd682 Mon Sep 17 00:00:00 2001 From: Ricardo Azpeitia Pimentel Date: Tue, 12 Sep 2023 16:13:53 -0600 Subject: [PATCH 4/7] Format --- src/zstd.erl | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/zstd.erl b/src/zstd.erl index 28db72a..1080bec 100644 --- a/src/zstd.erl +++ b/src/zstd.erl @@ -2,13 +2,10 @@ -export([compress/1, compress/2]). -export([decompress/1]). --export([new_compression_stream/0, new_decompression_stream/0, - compression_stream_init/1, compression_stream_init/2, - decompression_stream_init/1, - compression_stream_reset/2, compression_stream_reset/1, decompression_stream_reset/1, - stream_flush/1, - stream_compress/2, - stream_decompress/2]). +-export([new_compression_stream/0, new_decompression_stream/0, compression_stream_init/1, + compression_stream_init/2, decompression_stream_init/1, compression_stream_reset/2, + compression_stream_reset/1, decompression_stream_reset/1, stream_flush/1, + stream_compress/2, stream_decompress/2]). -on_load init/0. @@ -52,7 +49,8 @@ decompression_stream_init(_Ref) -> compression_stream_reset(_Ref) -> erlang:nif_error(?LINE). --spec compression_stream_reset(reference(), non_neg_integer()) -> ok | {error, invalid | string()}. +-spec compression_stream_reset(reference(), non_neg_integer()) -> + ok | {error, invalid | string()}. compression_stream_reset(_Ref, _Size) -> erlang:nif_error(?LINE). @@ -64,11 +62,13 @@ decompression_stream_reset(_Ref) -> stream_flush(_Ref) -> erlang:nif_error(?LINE). --spec stream_compress(reference(), iodata()) -> {ok, binary()} | {error, invalid | enomem | string()}. +-spec stream_compress(reference(), iodata()) -> + {ok, binary()} | {error, invalid | enomem | string()}. stream_compress(_Ref, _IOData) -> erlang:nif_error(?LINE). --spec stream_decompress(reference(), iodata()) -> {ok, binary()} | {error, invalid | enomem | string()}. +-spec stream_decompress(reference(), iodata()) -> + {ok, binary()} | {error, invalid | enomem | string()}. stream_decompress(_Ref, _Binary) -> erlang:nif_error(?LINE). From 9e58cfaae2e8517418ff9242ec6f55a1e13fd1e7 Mon Sep 17 00:00:00 2001 From: Ricardo Azpeitia Pimentel Date: Tue, 12 Sep 2023 16:34:49 -0600 Subject: [PATCH 5/7] More space fixing --- c_src/zstd_nif.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/c_src/zstd_nif.c b/c_src/zstd_nif.c index 7576073..6997c61 100644 --- a/c_src/zstd_nif.c +++ b/c_src/zstd_nif.c @@ -405,8 +405,8 @@ static int zstd_on_upgrade(ErlNifEnv *env, void **priv, void **old, ERL_NIF_TERM } static ErlNifFunc nif_funcs[] = { - { "compress" , 2, zstd_nif_compress , ERL_DIRTY_JOB_CPU_BOUND }, - { "decompress" , 1, zstd_nif_decompress , ERL_DIRTY_JOB_CPU_BOUND }, + { "compress" , 2, zstd_nif_compress , ERL_DIRTY_JOB_CPU_BOUND }, + { "decompress" , 1, zstd_nif_decompress , ERL_DIRTY_JOB_CPU_BOUND }, { "new_compression_stream" , 0, zstd_nif_new_compression_stream }, { "new_decompression_stream" , 0, zstd_nif_new_decompression_stream }, From 5456b669836f83109efa84ddd248b5748a6f7d71 Mon Sep 17 00:00:00 2001 From: Ricardo Azpeitia Pimentel Date: Wed, 20 Sep 2023 00:43:58 -0600 Subject: [PATCH 6/7] Add testing --- test/zstd_tests.erl | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/test/zstd_tests.erl b/test/zstd_tests.erl index 4927a04..b3368a5 100644 --- a/test/zstd_tests.erl +++ b/test/zstd_tests.erl @@ -7,3 +7,16 @@ zstd_test() -> ?assertEqual(Data, zstd:decompress( zstd:compress(Data))). + +zstd_stream_test() -> + Bin = << <<"A">> || _ <- lists:seq(1, 1024 * 1024) >>, + CStream = zstd:new_compression_stream(), + ok = zstd:compression_stream_init(CStream), + {ok, CompressionBin} = zstd:stream_compress(CStream, Bin), + {ok, FlushBin} = zstd:stream_flush(CStream), + + DStream = zstd:new_decompression_stream(), + ok = zstd:decompression_stream_init(DStream), + {ok, DBin1} = zstd:stream_decompress(DStream, CompressionBin), + {ok, DBin2} = zstd:stream_decompress(DStream, FlushBin), + ?assertEqual(Bin, <>). \ No newline at end of file From d16c0a56ae192bf823c501ff7f90df719be0d278 Mon Sep 17 00:00:00 2001 From: Pablo Lopez Date: Wed, 20 Sep 2023 09:03:06 +0200 Subject: [PATCH 7/7] [streaming-] Applied format --- test/zstd_tests.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/zstd_tests.erl b/test/zstd_tests.erl index b3368a5..a67ea1b 100644 --- a/test/zstd_tests.erl +++ b/test/zstd_tests.erl @@ -19,4 +19,4 @@ zstd_stream_test() -> ok = zstd:decompression_stream_init(DStream), {ok, DBin1} = zstd:stream_decompress(DStream, CompressionBin), {ok, DBin2} = zstd:stream_decompress(DStream, FlushBin), - ?assertEqual(Bin, <>). \ No newline at end of file + ?assertEqual(Bin, <>).