From b0e1f748f5f96d3a7f79edf2b959a01af347dfda Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alfonso=20Subiotto=20Marqu=C3=A9s?=
Date: Mon, 27 Nov 2023 17:21:21 +0100
Subject: [PATCH] GH-38728: [Go] ipc: put lz4 decompression buffers back into
 sync.Pool (#38729)

The lz4 decompressor was not calling Reset on the underlying reader in
its Close method. This could cause buffers not to be released back to
the pool, defeating the purpose of the sync.Pool in the lz4 package.
Additionally, a call to Close was missing in readDictionary.

Closes #38728

* Closes: #38728

Authored-by: Alfonso Subiotto Marques
Signed-off-by: Matt Topol
---
 go/arrow/ipc/compression.go |  4 +-
 go/arrow/ipc/file_reader.go |  1 +
 go/arrow/ipc/reader_test.go | 90 +++++++++++++++++++++++++++++++++++++
 3 files changed, 94 insertions(+), 1 deletion(-)

diff --git a/go/arrow/ipc/compression.go b/go/arrow/ipc/compression.go
index 8856b732f9c5d..06a9cf67cfb6b 100644
--- a/go/arrow/ipc/compression.go
+++ b/go/arrow/ipc/compression.go
@@ -104,7 +104,9 @@ type lz4Decompressor struct {
 	*lz4.Reader
 }
 
-func (z *lz4Decompressor) Close() {}
+func (z *lz4Decompressor) Close() {
+	z.Reader.Reset(nil)
+}
 
 func getDecompressor(codec flatbuf.CompressionType) decompressor {
 	switch codec {
diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go
index 1c7eb31799cfa..729225e794406 100644
--- a/go/arrow/ipc/file_reader.go
+++ b/go/arrow/ipc/file_reader.go
@@ -730,6 +730,7 @@ func readDictionary(memo *dictutils.Memo, meta *memory.Buffer, body ReadAtSeeker
 	bodyCompress := data.Compression(nil)
 	if bodyCompress != nil {
 		codec = getDecompressor(bodyCompress.Codec())
+		defer codec.Close()
 	}
 
 	id := md.Id()
diff --git a/go/arrow/ipc/reader_test.go b/go/arrow/ipc/reader_test.go
index f00f3bb3da476..42bb3fea3e963 100644
--- a/go/arrow/ipc/reader_test.go
+++ b/go/arrow/ipc/reader_test.go
@@ -18,6 +18,8 @@ package ipc
 
 import (
 	"bytes"
+	"fmt"
+	"io"
 	"testing"
 
 	"github.com/apache/arrow/go/v15/arrow"
@@ -93,3 +95,91 @@ func TestReaderCheckedAllocator(t *testing.T) {
 	_, err = reader.Read()
 	require.NoError(t, err)
 }
+
+func BenchmarkIPC(b *testing.B) {
+	alloc := memory.NewCheckedAllocator(memory.NewGoAllocator())
+	defer alloc.AssertSize(b, 0)
+
+	schema := arrow.NewSchema([]arrow.Field{
+		{
+			Name: "s",
+			Type: &arrow.DictionaryType{
+				ValueType: arrow.BinaryTypes.String,
+				IndexType: arrow.PrimitiveTypes.Int32,
+			},
+		},
+	}, nil)
+
+	rb := array.NewRecordBuilder(alloc, schema)
+	defer rb.Release()
+
+	bldr := rb.Field(0).(*array.BinaryDictionaryBuilder)
+	bldr.Append([]byte("foo"))
+	bldr.Append([]byte("bar"))
+	bldr.Append([]byte("baz"))
+
+	rec := rb.NewRecord()
+	defer rec.Release()
+
+	for _, codec := range []struct {
+		name        string
+		codecOption Option
+	}{
+		{
+			name: "plain",
+		},
+		{
+			name:        "zstd",
+			codecOption: WithZstd(),
+		},
+		{
+			name:        "lz4",
+			codecOption: WithLZ4(),
+		},
+	} {
+		options := []Option{WithSchema(schema), WithAllocator(alloc)}
+		if codec.codecOption != nil {
+			options = append(options, codec.codecOption)
+		}
+		b.Run(fmt.Sprintf("Writer/codec=%s", codec.name), func(b *testing.B) {
+			buf := new(bytes.Buffer)
+			for i := 0; i < b.N; i++ {
+				func() {
+					buf.Reset()
+					writer := NewWriter(buf, options...)
+					defer writer.Close()
+					if err := writer.Write(rec); err != nil {
+						b.Fatal(err)
+					}
+				}()
+			}
+		})
+
+		b.Run(fmt.Sprintf("Reader/codec=%s", codec.name), func(b *testing.B) {
+			buf := new(bytes.Buffer)
+			writer := NewWriter(buf, options...)
+			defer writer.Close()
+			require.NoError(b, writer.Write(rec))
+			bufBytes := buf.Bytes()
+
+			b.ResetTimer()
+			for i := 0; i < b.N; i++ {
+				func() {
+					reader, err := NewReader(bytes.NewReader(bufBytes), WithAllocator(alloc))
+					if err != nil {
+						b.Fatal(err)
+					}
+					defer reader.Release()
+					for {
+						if _, err := reader.Read(); err != nil {
+							if err == io.EOF {
+								break
+							}
+							b.Fatal(err)
+						}
+					}
+				}()
+			}
+		})
+	}
+}
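
For readers unfamiliar with the pooling pattern the fix restores, below is a minimal, self-contained Go sketch. It is hypothetical code, not Arrow or lz4 internals: bufPool, pooledReader, and decompressor are made-up names that stand in for the lz4 package's internal sync.Pool, lz4.Reader, and lz4Decompressor. The point is that a no-op Close never returns the pooled buffer, while a Close that resets the embedded reader does.

// Hedged illustration only; names are hypothetical and not part of Arrow
// or the lz4 package.
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool stands in for the sync.Pool the real lz4 package keeps internally
// for its block buffers.
var bufPool = sync.Pool{
	New: func() any { return new(bytes.Buffer) },
}

// pooledReader holds a buffer taken from bufPool while it is in use.
type pooledReader struct {
	buf *bytes.Buffer
}

func newPooledReader() *pooledReader {
	return &pooledReader{buf: bufPool.Get().(*bytes.Buffer)}
}

// Reset gives the buffer back to the pool, mirroring what resetting the
// embedded lz4.Reader achieves for its internal buffers.
func (r *pooledReader) Reset() {
	if r.buf != nil {
		r.buf.Reset()
		bufPool.Put(r.buf)
		r.buf = nil
	}
}

// decompressor plays the role of lz4Decompressor. Before the fix its Close
// was empty, so Reset was never called and the buffer never went back to
// the pool.
type decompressor struct {
	*pooledReader
}

func (d *decompressor) Close() {
	d.pooledReader.Reset()
}

func main() {
	d := &decompressor{pooledReader: newPooledReader()}
	d.Close()
	fmt.Println("buffer released back to the pool:", d.buf == nil)
}

The patch applies the same idea in two places: lz4Decompressor.Close now calls z.Reader.Reset(nil), and readDictionary defers codec.Close() so the dictionary-reading path also releases its decompressor.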