diff --git a/lib/column/array.go b/lib/column/array.go index 07c5b4a0be..8cfa7d0215 100644 --- a/lib/column/array.go +++ b/lib/column/array.go @@ -26,7 +26,31 @@ func (array *Array) Write(encoder *binary.Encoder, v interface{}) error { return fmt.Errorf("do not use Write method for Array(T) column") } -func (array *Array) ReadArray(decoder *binary.Decoder, ln int) (interface{}, error) { +func (array *Array) ReadArray(decoder *binary.Decoder, rows int) (_ []interface{}, err error) { + var ( + values = make([]interface{}, rows) + offsets = make([]uint64, rows) + ) + for i := 0; i < rows; i++ { + offset, err := decoder.UInt64() + if err != nil { + return nil, err + } + offsets[i] = offset + } + for n, offset := range offsets { + ln := offset + if n != 0 { + ln = ln - offsets[n-1] + } + if values[n], err = array.read(decoder, int(ln)); err != nil { + return nil, err + } + } + return values, nil +} + +func (array *Array) read(decoder *binary.Decoder, ln int) (interface{}, error) { slice := reflect.MakeSlice(array.valueOf.Type(), 0, ln) for i := 0; i < ln; i++ { value, err := array.column.Read(decoder) diff --git a/lib/column/nullable.go b/lib/column/nullable.go index 79a8ef9406..9b049005d1 100644 --- a/lib/column/nullable.go +++ b/lib/column/nullable.go @@ -25,6 +25,31 @@ func (null *Nullable) Write(encoder *binary.Encoder, v interface{}) error { return nil } +func (null *Nullable) ReadNull(decoder *binary.Decoder, rows int) (_ []interface{}, err error) { + var ( + isNull byte + value interface{} + nulls = make([]byte, rows) + values = make([]interface{}, rows) + ) + for i := 0; i < rows; i++ { + if isNull, err = decoder.ReadByte(); err != nil { + return nil, err + } + nulls[i] = isNull + } + for i, isNull := range nulls { + switch value, err = null.column.Read(decoder); true { + case err != nil: + return nil, err + case isNull == 0: + values[i] = value + default: + values[i] = nil + } + } + return values, nil +} func (null *Nullable) WriteNull(nulls, encoder *binary.Encoder, v interface{}) error { if v == nil { if _, err := nulls.Write([]byte{1}); err != nil { diff --git a/lib/data/block.go b/lib/data/block.go index 3708c59c51..62b679bdb9 100644 --- a/lib/data/block.go +++ b/lib/data/block.go @@ -14,10 +14,10 @@ import ( type Block struct { Values [][]interface{} Columns []column.Column - Buffers []*buffer NumRows uint64 NumColumns uint64 offsets []uint64 + buffers []*buffer info blockInfo } @@ -73,44 +73,12 @@ func (block *Block) Read(serverInfo *ServerInfo, decoder *binary.Decoder) (err e block.Columns = append(block.Columns, c) switch column := c.(type) { case *column.Array: - offsets := make([]uint64, block.NumRows) - for row := 0; row < int(block.NumRows); row++ { - offset, err := decoder.UInt64() - if err != nil { - return err - } - offsets[row] = offset - } - for n, offset := range offsets { - ln := offset - if n != 0 { - ln = ln - offsets[n-1] - } - if value, err = column.ReadArray(decoder, int(ln)); err != nil { - return err - } - block.Values[i] = append(block.Values[i], value) + if block.Values[i], err = column.ReadArray(decoder, int(block.NumRows)); err != nil { + return err } case *column.Nullable: - var ( - isNull byte - nulls = make([]byte, block.NumRows) - ) - for i := 0; i < int(block.NumRows); i++ { - if isNull, err = decoder.ReadByte(); err != nil { - return err - } - nulls[i] = isNull - } - for _, isNull := range nulls { - switch value, err = column.Read(decoder); true { - case err != nil: - return err - case isNull == 0: - block.Values[i] = append(block.Values[i], value) - default: - block.Values[i] = append(block.Values[i], nil) - } + if block.Values[i], err = column.ReadNull(decoder, int(block.NumRows)); err != nil { + return err } default: for row := 0; row < int(block.NumRows); row++ { @@ -135,20 +103,20 @@ func (block *Block) AppendRow(args []driver.Value) error { for num, c := range block.Columns { switch column := c.(type) { case *column.Array: - ln, err := column.WriteArray(block.Buffers[num].Column, args[num]) + ln, err := column.WriteArray(block.buffers[num].Column, args[num]) if err != nil { return err } block.offsets[num] += ln - if err := block.Buffers[num].Offset.UInt64(block.offsets[num]); err != nil { + if err := block.buffers[num].Offset.UInt64(block.offsets[num]); err != nil { return err } case *column.Nullable: - if err := column.WriteNull(block.Buffers[num].Offset, block.Buffers[num].Column, args[num]); err != nil { + if err := column.WriteNull(block.buffers[num].Offset, block.buffers[num].Column, args[num]); err != nil { return err } default: - if err := column.Write(block.Buffers[num].Column, args[num]); err != nil { + if err := column.Write(block.buffers[num].Column, args[num]); err != nil { return err } } @@ -157,15 +125,15 @@ func (block *Block) AppendRow(args []driver.Value) error { } func (block *Block) Reserve() { - if len(block.Buffers) == 0 { - block.Buffers = make([]*buffer, len(block.Columns)) + if len(block.buffers) == 0 { + block.buffers = make([]*buffer, len(block.Columns)) block.offsets = make([]uint64, len(block.Columns)) for i := 0; i < len(block.Columns); i++ { var ( offsetBuffer = wb.New(wb.InitialSize) columnBuffer = wb.New(wb.InitialSize) ) - block.Buffers[i] = &buffer{ + block.buffers[i] = &buffer{ Offset: binary.NewEncoder(offsetBuffer), Column: binary.NewEncoder(columnBuffer), offsetBuffer: offsetBuffer, @@ -178,11 +146,13 @@ func (block *Block) Reserve() { func (block *Block) Reset() { block.NumRows = 0 block.NumColumns = 0 - for _, buffer := range block.Buffers { + for _, buffer := range block.buffers { buffer.reset() } - block.offsets = nil - block.Buffers = nil + { + block.offsets = nil + block.buffers = nil + } } func (block *Block) Write(serverInfo *ServerInfo, encoder *binary.Encoder) error { @@ -199,8 +169,8 @@ func (block *Block) Write(serverInfo *ServerInfo, encoder *binary.Encoder) error for i, column := range block.Columns { encoder.String(column.Name()) encoder.String(column.CHType()) - if len(block.Buffers) == len(block.Columns) { - if _, err := block.Buffers[i].WriteTo(encoder); err != nil { + if len(block.buffers) == len(block.Columns) { + if _, err := block.buffers[i].WriteTo(encoder); err != nil { return err } } @@ -282,6 +252,6 @@ func (buf *buffer) WriteTo(w io.Writer) (int64, error) { } func (buf *buffer) reset() { - buf.offsetBuffer.Free() - buf.columnBuffer.Free() + buf.offsetBuffer.Reset() + buf.columnBuffer.Reset() } diff --git a/lib/data/block_write_column.go b/lib/data/block_write_column.go index 9c7fb5b079..0ddb2a4d4f 100644 --- a/lib/data/block_write_column.go +++ b/lib/data/block_write_column.go @@ -9,66 +9,66 @@ import ( ) func (block *Block) WriteDate(c int, v time.Time) error { - return block.Buffers[c].Column.UInt16(uint16(v.Unix() / 24 / 3600)) + return block.buffers[c].Column.UInt16(uint16(v.Unix() / 24 / 3600)) } func (block *Block) WriteDateTime(c int, v time.Time) error { - return block.Buffers[c].Column.UInt32(uint32(v.Unix())) + return block.buffers[c].Column.UInt32(uint32(v.Unix())) } func (block *Block) WriteUInt8(c int, v uint8) error { - return block.Buffers[c].Column.UInt8(v) + return block.buffers[c].Column.UInt8(v) } func (block *Block) WriteUInt16(c int, v uint16) error { - return block.Buffers[c].Column.UInt16(v) + return block.buffers[c].Column.UInt16(v) } func (block *Block) WriteUInt32(c int, v uint32) error { - return block.Buffers[c].Column.UInt32(v) + return block.buffers[c].Column.UInt32(v) } func (block *Block) WriteUInt64(c int, v uint64) error { - return block.Buffers[c].Column.UInt64(v) + return block.buffers[c].Column.UInt64(v) } func (block *Block) WriteFloat32(c int, v float32) error { - return block.Buffers[c].Column.Float32(v) + return block.buffers[c].Column.Float32(v) } func (block *Block) WriteFloat64(c int, v float64) error { - return block.Buffers[c].Column.Float64(v) + return block.buffers[c].Column.Float64(v) } func (block *Block) WriteBytes(c int, v []byte) error { - if err := block.Buffers[c].Column.Uvarint(uint64(len(v))); err != nil { + if err := block.buffers[c].Column.Uvarint(uint64(len(v))); err != nil { return err } - if _, err := block.Buffers[c].Column.Write(v); err != nil { + if _, err := block.buffers[c].Column.Write(v); err != nil { return err } return nil } func (block *Block) WriteString(c int, v string) error { - if err := block.Buffers[c].Column.Uvarint(uint64(len(v))); err != nil { + if err := block.buffers[c].Column.Uvarint(uint64(len(v))); err != nil { return err } - if _, err := block.Buffers[c].Column.Write(binary.Str2Bytes(v)); err != nil { + if _, err := block.buffers[c].Column.Write(binary.Str2Bytes(v)); err != nil { return err } return nil } func (block *Block) WriteFixedString(c int, v []byte) error { - return block.Columns[c].Write(block.Buffers[c].Column, v) + return block.Columns[c].Write(block.buffers[c].Column, v) } func (block *Block) WriteArray(c int, v *types.Array) error { - ln, err := block.Columns[c].(*column.Array).WriteArray(block.Buffers[c].Column, v) + ln, err := block.Columns[c].(*column.Array).WriteArray(block.buffers[c].Column, v) if err != nil { return err } block.offsets[c] += ln - return block.Buffers[c].Offset.UInt64(block.offsets[c]) + return block.buffers[c].Offset.UInt64(block.offsets[c]) } diff --git a/lib/types/array.go b/lib/types/array.go index 36d8f56160..ec7f8de86a 100644 --- a/lib/types/array.go +++ b/lib/types/array.go @@ -55,7 +55,7 @@ func (array *Array) Value() (driver.Value, error) { encoder.Uvarint(uint64(ln)) for i := 0; i < ln; i++ { if err := array.column.Write(encoder, v.Index(i).Interface()); err != nil { - buff.Free() + buff.Reset() return nil, err } } diff --git a/lib/writebuffer/buffer.go b/lib/writebuffer/buffer.go index 3d6f0940e0..5c43a67247 100644 --- a/lib/writebuffer/buffer.go +++ b/lib/writebuffer/buffer.go @@ -43,12 +43,12 @@ func (wb *WriteBuffer) WriteTo(w io.Writer) (int64, error) { for _, chunk := range wb.chunks { ln, err := w.Write(chunk) if err != nil { - wb.Free() + wb.Reset() return 0, err } size += int64(ln) } - wb.Free() + wb.Reset() return size, nil } @@ -90,7 +90,7 @@ func (wb *WriteBuffer) calcCap(dataSize int) int { return max(dataSize, cap(wb.chunks[len(wb.chunks)-1])*2) } -func (wb *WriteBuffer) Free() { +func (wb *WriteBuffer) Reset() { if len(wb.chunks) == 0 { return }