Reset the LZ4 decompressor on Close and close the dictionary codec.

compression.go: lz4Decompressor.Close was an empty method; it now calls
z.Reader.Reset(nil) on the embedded lz4.Reader.

file_reader.go: readDictionary now defers codec.Close() right after it
obtains a decompressor for a compressed dictionary body.

reader_test.go: adds BenchmarkIPC, which builds a single dictionary-encoded
string column ("foo", "bar", "baz") and runs Writer and Reader
sub-benchmarks with no codec, zstd and lz4. The benchmark uses a checked
allocator and asserts that its size is 0 when the benchmark returns.

diff --git a/go/arrow/ipc/compression.go b/go/arrow/ipc/compression.go
index 8856b732f9c5d..06a9cf67cfb6b 100644
--- a/go/arrow/ipc/compression.go
+++ b/go/arrow/ipc/compression.go
@@ -104,7 +104,9 @@ type lz4Decompressor struct {
 	*lz4.Reader
 }
 
-func (z *lz4Decompressor) Close() {}
+func (z *lz4Decompressor) Close() {
+	z.Reader.Reset(nil)
+}
 
 func getDecompressor(codec flatbuf.CompressionType) decompressor {
 	switch codec {
diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go
index 1c7eb31799cfa..729225e794406 100644
--- a/go/arrow/ipc/file_reader.go
+++ b/go/arrow/ipc/file_reader.go
@@ -730,6 +730,7 @@ func readDictionary(memo *dictutils.Memo, meta *memory.Buffer, body ReadAtSeeker
 	bodyCompress := data.Compression(nil)
 	if bodyCompress != nil {
 		codec = getDecompressor(bodyCompress.Codec())
+		defer codec.Close()
 	}
 
 	id := md.Id()
diff --git a/go/arrow/ipc/reader_test.go b/go/arrow/ipc/reader_test.go
index f00f3bb3da476..42bb3fea3e963 100644
--- a/go/arrow/ipc/reader_test.go
+++ b/go/arrow/ipc/reader_test.go
@@ -18,6 +18,8 @@ package ipc
 
 import (
 	"bytes"
+	"fmt"
+	"io"
 	"testing"
 
 	"github.com/apache/arrow/go/v15/arrow"
@@ -93,3 +95,91 @@ func TestReaderCheckedAllocator(t *testing.T) {
 	_, err = reader.Read()
 	require.NoError(t, err)
 }
+
+func BenchmarkIPC(b *testing.B) {
+	alloc := memory.NewCheckedAllocator(memory.NewGoAllocator())
+	defer alloc.AssertSize(b, 0)
+
+	schema := arrow.NewSchema([]arrow.Field{
+		{
+			Name: "s",
+			Type: &arrow.DictionaryType{
+				ValueType: arrow.BinaryTypes.String,
+				IndexType: arrow.PrimitiveTypes.Int32,
+			},
+		},
+	}, nil)
+
+	rb := array.NewRecordBuilder(alloc, schema)
+	defer rb.Release()
+
+	bldr := rb.Field(0).(*array.BinaryDictionaryBuilder)
+	bldr.Append([]byte("foo"))
+	bldr.Append([]byte("bar"))
+	bldr.Append([]byte("baz"))
+
+	rec := rb.NewRecord()
+	defer rec.Release()
+
+	for _, codec := range []struct {
+		name        string
+		codecOption Option
+	}{
+		{
+			name: "plain",
+		},
+		{
+			name:        "zstd",
+			codecOption: WithZstd(),
+		},
+		{
+			name:        "lz4",
+			codecOption: WithLZ4(),
+		},
+	} {
+		options := []Option{WithSchema(schema), WithAllocator(alloc)}
+		if codec.codecOption != nil {
+			options = append(options, codec.codecOption)
+		}
+		b.Run(fmt.Sprintf("Writer/codec=%s", codec.name), func(b *testing.B) {
+			buf := new(bytes.Buffer)
+			for i := 0; i < b.N; i++ {
+				func() {
+					buf.Reset()
+					writer := NewWriter(buf, options...)
+					defer writer.Close()
+					if err := writer.Write(rec); err != nil {
+						b.Fatal(err)
+					}
+				}()
+			}
+		})
+
+		b.Run(fmt.Sprintf("Reader/codec=%s", codec.name), func(b *testing.B) {
+			buf := new(bytes.Buffer)
+			writer := NewWriter(buf, options...)
+			defer writer.Close()
+			require.NoError(b, writer.Write(rec))
+			bufBytes := buf.Bytes()
+
+			b.ResetTimer()
+			for i := 0; i < b.N; i++ {
+				func() {
+					reader, err := NewReader(bytes.NewReader(bufBytes), WithAllocator(alloc))
+					if err != nil {
+						b.Fatal(err)
+					}
+					defer reader.Release()
+					for {
+						if _, err := reader.Read(); err != nil {
+							if err == io.EOF {
+								break
+							}
+							b.Fatal(err)
+						}
+					}
+				}()
+			}
+		})
+	}
+}