GH-43015: [Go] Reuse column compressors in IPC module (#43016)
This changes the IPC writer to reuse lz4 or zstd compressors instead of
instantiating new ones on each call to Write. Prior to this commit, every
Write call instantiated as many compressors as the configured number of
concurrent compression workers, even though the compressors are reusable
across calls.
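For illustration, a minimal sketch of the caching pattern (the `writer`, `compressor`, and `zstdCompressor` names below are stand-ins, not the IPC module's real internals): one compressor slot per configured worker, created lazily on first use and reused by every later Write.

```go
package main

import "fmt"

// compressor stands in for the IPC module's internal compressor interface;
// the concrete type here is hypothetical and only illustrates the caching
// pattern, not the real lz4/zstd implementations.
type compressor interface {
	Name() string
}

type zstdCompressor struct{}

func (zstdCompressor) Name() string { return "zstd" }

// writer caches one compressor per concurrent compression worker so that
// repeated Write calls reuse them instead of allocating new ones each time.
type writer struct {
	compressors []compressor
}

func newWriter(workers int) *writer {
	return &writer{compressors: make([]compressor, workers)}
}

// getCompressor lazily creates the compressor for a worker slot on first
// use and hands back the cached instance on every later call.
func (w *writer) getCompressor(id int) compressor {
	if w.compressors[id] == nil {
		w.compressors[id] = zstdCompressor{}
	}
	return w.compressors[id]
}

func main() {
	w := newWriter(2)
	fmt.Println(w.getCompressor(0).Name()) // created on first use
	fmt.Println(w.getCompressor(0).Name()) // reused afterwards
}
```

The actual change wires the same idea through `FileWriter`, `Writer`, and `recordEncoder` via a shared `compressors []compressor` slice, as shown in the diff below.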

### Are these changes tested?
I have added no tests and have not yet closely verified the patch.
There's no functional change to test and I want to get agreement on the
approach before solidifying it. I have not tested the lz4 change at all.
* GitHub Issue: #43015
wkalt authored Jun 25, 2024
1 parent d27ddda commit b0d8d82
Showing 4 changed files with 51 additions and 11 deletions.
6 changes: 5 additions & 1 deletion go/arrow/ipc/file_writer.go
@@ -278,6 +278,7 @@ type FileWriter struct {
mapper dictutils.Mapper
codec flatbuf.CompressionType
compressNP int
compressors []compressor
minSpaceSavings *float64

// map of the last written dictionaries by id
@@ -302,6 +303,7 @@ func NewFileWriter(w io.WriteSeeker, opts ...Option) (*FileWriter, error) {
codec: cfg.codec,
compressNP: cfg.compressNP,
minSpaceSavings: cfg.minSpaceSavings,
compressors: make([]compressor, cfg.compressNP),
}

pos, err := f.w.Seek(0, io.SeekCurrent)
@@ -345,7 +347,9 @@ func (f *FileWriter) Write(rec arrow.Record) error {
const allow64b = true
var (
data = Payload{msg: MessageRecordBatch}
enc = newRecordEncoder(f.mem, 0, kMaxNestingDepth, allow64b, f.codec, f.compressNP, f.minSpaceSavings)
enc = newRecordEncoder(
f.mem, 0, kMaxNestingDepth, allow64b, f.codec, f.compressNP, f.minSpaceSavings, f.compressors,
)
)
defer data.Release()

6 changes: 5 additions & 1 deletion go/arrow/ipc/ipc.go
@@ -79,6 +79,7 @@ func newConfig(opts ...Option) *config {
alloc: memory.NewGoAllocator(),
codec: -1, // uncompressed
ensureNativeEndian: true,
compressNP: 1,
}

for _, opt := range opts {
@@ -132,9 +133,12 @@ func WithZstd() Option {
// WithCompressConcurrency specifies a number of goroutines to spin up for
// concurrent compression of the body buffers when writing compressed IPC records.
// If n <= 1 then compression will be done serially without goroutine
// parallelization. Default is 0.
// parallelization. Default is 1.
func WithCompressConcurrency(n int) Option {
return func(cfg *config) {
if n <= 0 {
n = 1
}
cfg.compressNP = n
}
}
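As a usage sketch of the option (the arrow module major version in the import path is an assumption; use the release you build against):

```go
package main

import (
	"bytes"

	"github.com/apache/arrow/go/v17/arrow"
	"github.com/apache/arrow/go/v17/arrow/ipc"
)

func main() {
	schema := arrow.NewSchema(
		[]arrow.Field{{Name: "x", Type: arrow.PrimitiveTypes.Int64}},
		nil,
	)

	var buf bytes.Buffer
	// Request four compression workers; with this change the writer also
	// keeps four cached compressors and reuses them across Write calls.
	w := ipc.NewWriter(&buf,
		ipc.WithSchema(schema),
		ipc.WithZstd(),
		ipc.WithCompressConcurrency(4),
	)
	defer w.Close()
}
```

Passing n <= 0 now falls back to 1 (serial compression), matching the updated default.
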
45 changes: 38 additions & 7 deletions go/arrow/ipc/writer.go
@@ -86,6 +86,7 @@ type Writer struct {
mapper dictutils.Mapper
codec flatbuf.CompressionType
compressNP int
compressors []compressor
minSpaceSavings *float64

// map of the last written dictionaries by id
@@ -107,6 +108,7 @@ func NewWriterWithPayloadWriter(pw PayloadWriter, opts ...Option) *Writer {
compressNP: cfg.compressNP,
minSpaceSavings: cfg.minSpaceSavings,
emitDictDeltas: cfg.emitDictDeltas,
compressors: make([]compressor, cfg.compressNP),
}
}

@@ -120,6 +122,8 @@ func NewWriter(w io.Writer, opts ...Option) *Writer {
schema: cfg.schema,
codec: cfg.codec,
emitDictDeltas: cfg.emitDictDeltas,
compressNP: cfg.compressNP,
compressors: make([]compressor, cfg.compressNP),
}
}

@@ -170,7 +174,16 @@ func (w *Writer) Write(rec arrow.Record) (err error) {
const allow64b = true
var (
data = Payload{msg: MessageRecordBatch}
enc = newRecordEncoder(w.mem, 0, kMaxNestingDepth, allow64b, w.codec, w.compressNP, w.minSpaceSavings)
enc = newRecordEncoder(
w.mem,
0,
kMaxNestingDepth,
allow64b,
w.codec,
w.compressNP,
w.minSpaceSavings,
w.compressors,
)
)
defer data.Release()

@@ -310,17 +323,28 @@ type recordEncoder struct {
allow64b bool
codec flatbuf.CompressionType
compressNP int
compressors []compressor
minSpaceSavings *float64
}

func newRecordEncoder(mem memory.Allocator, startOffset, maxDepth int64, allow64b bool, codec flatbuf.CompressionType, compressNP int, minSpaceSavings *float64) *recordEncoder {
func newRecordEncoder(
mem memory.Allocator,
startOffset,
maxDepth int64,
allow64b bool,
codec flatbuf.CompressionType,
compressNP int,
minSpaceSavings *float64,
compressors []compressor,
) *recordEncoder {
return &recordEncoder{
mem: mem,
start: startOffset,
depth: maxDepth,
allow64b: allow64b,
codec: codec,
compressNP: compressNP,
compressors: compressors,
minSpaceSavings: minSpaceSavings,
}
}
@@ -340,6 +364,13 @@ func (w *recordEncoder) reset() {
w.fields = make([]fieldMetadata, 0)
}

func (w *recordEncoder) getCompressor(id int) compressor {
if w.compressors[id] == nil {
w.compressors[id] = getCompressor(w.codec)
}
return w.compressors[id]
}

func (w *recordEncoder) compressBodyBuffers(p *Payload) error {
compress := func(idx int, codec compressor) error {
if p.body[idx] == nil || p.body[idx].Len() == 0 {
@@ -378,7 +409,7 @@ func (w *recordEncoder) compressBodyBuffers(p *Payload) error {
}

if w.compressNP <= 1 {
codec := getCompressor(w.codec)
codec := w.getCompressor(0)
for idx := range p.body {
if err := compress(idx, codec); err != nil {
return err
@@ -395,11 +426,11 @@ func (w *recordEncoder) compressBodyBuffers(p *Payload) error {
)
defer cancel()

for i := 0; i < w.compressNP; i++ {
for workerID := 0; workerID < w.compressNP; workerID++ {
wg.Add(1)
go func() {
go func(id int) {
defer wg.Done()
codec := getCompressor(w.codec)
codec := w.getCompressor(id)
for {
select {
case idx, ok := <-ch:
@@ -418,7 +449,7 @@ func (w *recordEncoder) compressBodyBuffers(p *Payload) error {
return
}
}
}()
}(workerID)
}

for idx := range p.body {
5 changes: 3 additions & 2 deletions go/arrow/ipc/writer_test.go
@@ -193,7 +193,8 @@ func TestWriteWithCompressionAndMinSavings(t *testing.T) {
}

for _, codec := range []flatbuf.CompressionType{flatbuf.CompressionTypeLZ4_FRAME, flatbuf.CompressionTypeZSTD} {
enc := newRecordEncoder(mem, 0, 5, true, codec, 1, nil)
compressors := []compressor{getCompressor(codec)}
enc := newRecordEncoder(mem, 0, 5, true, codec, 1, nil, compressors)
var payload Payload
require.NoError(t, enc.encode(&payload, batch))
assert.Len(t, payload.body, 2)
@@ -205,7 +206,7 @@ func TestWriteWithCompressionAndMinSavings(t *testing.T) {
assert.Greater(t, compressedSize, int64(0))
expectedSavings := 1.0 - float64(compressedSize)/float64(uncompressedSize)

compressEncoder := newRecordEncoder(mem, 0, 5, true, codec, 1, &expectedSavings)
compressEncoder := newRecordEncoder(mem, 0, 5, true, codec, 1, &expectedSavings, compressors)
payload.Release()
payload.body = payload.body[:0]
require.NoError(t, compressEncoder.encode(&payload, batch))
