Skip to content

Commit

Permalink
#65 remove hardcode from Block
Browse files Browse the repository at this point in the history
  • Loading branch information
kshvakov committed Aug 31, 2017
1 parent c5ddef1 commit 7204389
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 71 deletions.
26 changes: 25 additions & 1 deletion lib/column/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,31 @@ func (array *Array) Write(encoder *binary.Encoder, v interface{}) error {
return fmt.Errorf("do not use Write method for Array(T) column")
}

func (array *Array) ReadArray(decoder *binary.Decoder, ln int) (interface{}, error) {
func (array *Array) ReadArray(decoder *binary.Decoder, rows int) (_ []interface{}, err error) {
var (
values = make([]interface{}, rows)
offsets = make([]uint64, rows)
)
for i := 0; i < rows; i++ {
offset, err := decoder.UInt64()
if err != nil {
return nil, err
}
offsets[i] = offset
}
for n, offset := range offsets {
ln := offset
if n != 0 {
ln = ln - offsets[n-1]
}
if values[n], err = array.read(decoder, int(ln)); err != nil {
return nil, err
}
}
return values, nil
}

func (array *Array) read(decoder *binary.Decoder, ln int) (interface{}, error) {
slice := reflect.MakeSlice(array.valueOf.Type(), 0, ln)
for i := 0; i < ln; i++ {
value, err := array.column.Read(decoder)
Expand Down
25 changes: 25 additions & 0 deletions lib/column/nullable.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,31 @@ func (null *Nullable) Write(encoder *binary.Encoder, v interface{}) error {
return nil
}

func (null *Nullable) ReadNull(decoder *binary.Decoder, rows int) (_ []interface{}, err error) {
var (
isNull byte
value interface{}
nulls = make([]byte, rows)
values = make([]interface{}, rows)
)
for i := 0; i < rows; i++ {
if isNull, err = decoder.ReadByte(); err != nil {
return nil, err
}
nulls[i] = isNull
}
for i, isNull := range nulls {
switch value, err = null.column.Read(decoder); true {
case err != nil:
return nil, err
case isNull == 0:
values[i] = value
default:
values[i] = nil
}
}
return values, nil
}
func (null *Nullable) WriteNull(nulls, encoder *binary.Encoder, v interface{}) error {
if v == nil {
if _, err := nulls.Write([]byte{1}); err != nil {
Expand Down
72 changes: 21 additions & 51 deletions lib/data/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (
type Block struct {
Values [][]interface{}
Columns []column.Column
Buffers []*buffer
NumRows uint64
NumColumns uint64
offsets []uint64
buffers []*buffer
info blockInfo
}

Expand Down Expand Up @@ -73,44 +73,12 @@ func (block *Block) Read(serverInfo *ServerInfo, decoder *binary.Decoder) (err e
block.Columns = append(block.Columns, c)
switch column := c.(type) {
case *column.Array:
offsets := make([]uint64, block.NumRows)
for row := 0; row < int(block.NumRows); row++ {
offset, err := decoder.UInt64()
if err != nil {
return err
}
offsets[row] = offset
}
for n, offset := range offsets {
ln := offset
if n != 0 {
ln = ln - offsets[n-1]
}
if value, err = column.ReadArray(decoder, int(ln)); err != nil {
return err
}
block.Values[i] = append(block.Values[i], value)
if block.Values[i], err = column.ReadArray(decoder, int(block.NumRows)); err != nil {
return err
}
case *column.Nullable:
var (
isNull byte
nulls = make([]byte, block.NumRows)
)
for i := 0; i < int(block.NumRows); i++ {
if isNull, err = decoder.ReadByte(); err != nil {
return err
}
nulls[i] = isNull
}
for _, isNull := range nulls {
switch value, err = column.Read(decoder); true {
case err != nil:
return err
case isNull == 0:
block.Values[i] = append(block.Values[i], value)
default:
block.Values[i] = append(block.Values[i], nil)
}
if block.Values[i], err = column.ReadNull(decoder, int(block.NumRows)); err != nil {
return err
}
default:
for row := 0; row < int(block.NumRows); row++ {
Expand All @@ -135,20 +103,20 @@ func (block *Block) AppendRow(args []driver.Value) error {
for num, c := range block.Columns {
switch column := c.(type) {
case *column.Array:
ln, err := column.WriteArray(block.Buffers[num].Column, args[num])
ln, err := column.WriteArray(block.buffers[num].Column, args[num])
if err != nil {
return err
}
block.offsets[num] += ln
if err := block.Buffers[num].Offset.UInt64(block.offsets[num]); err != nil {
if err := block.buffers[num].Offset.UInt64(block.offsets[num]); err != nil {
return err
}
case *column.Nullable:
if err := column.WriteNull(block.Buffers[num].Offset, block.Buffers[num].Column, args[num]); err != nil {
if err := column.WriteNull(block.buffers[num].Offset, block.buffers[num].Column, args[num]); err != nil {
return err
}
default:
if err := column.Write(block.Buffers[num].Column, args[num]); err != nil {
if err := column.Write(block.buffers[num].Column, args[num]); err != nil {
return err
}
}
Expand All @@ -157,15 +125,15 @@ func (block *Block) AppendRow(args []driver.Value) error {
}

func (block *Block) Reserve() {
if len(block.Buffers) == 0 {
block.Buffers = make([]*buffer, len(block.Columns))
if len(block.buffers) == 0 {
block.buffers = make([]*buffer, len(block.Columns))
block.offsets = make([]uint64, len(block.Columns))
for i := 0; i < len(block.Columns); i++ {
var (
offsetBuffer = wb.New(wb.InitialSize)
columnBuffer = wb.New(wb.InitialSize)
)
block.Buffers[i] = &buffer{
block.buffers[i] = &buffer{
Offset: binary.NewEncoder(offsetBuffer),
Column: binary.NewEncoder(columnBuffer),
offsetBuffer: offsetBuffer,
Expand All @@ -178,11 +146,13 @@ func (block *Block) Reserve() {
func (block *Block) Reset() {
block.NumRows = 0
block.NumColumns = 0
for _, buffer := range block.Buffers {
for _, buffer := range block.buffers {
buffer.reset()
}
block.offsets = nil
block.Buffers = nil
{
block.offsets = nil
block.buffers = nil
}
}

func (block *Block) Write(serverInfo *ServerInfo, encoder *binary.Encoder) error {
Expand All @@ -199,8 +169,8 @@ func (block *Block) Write(serverInfo *ServerInfo, encoder *binary.Encoder) error
for i, column := range block.Columns {
encoder.String(column.Name())
encoder.String(column.CHType())
if len(block.Buffers) == len(block.Columns) {
if _, err := block.Buffers[i].WriteTo(encoder); err != nil {
if len(block.buffers) == len(block.Columns) {
if _, err := block.buffers[i].WriteTo(encoder); err != nil {
return err
}
}
Expand Down Expand Up @@ -282,6 +252,6 @@ func (buf *buffer) WriteTo(w io.Writer) (int64, error) {
}

func (buf *buffer) reset() {
buf.offsetBuffer.Free()
buf.columnBuffer.Free()
buf.offsetBuffer.Reset()
buf.columnBuffer.Reset()
}
30 changes: 15 additions & 15 deletions lib/data/block_write_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,66 +9,66 @@ import (
)

func (block *Block) WriteDate(c int, v time.Time) error {
return block.Buffers[c].Column.UInt16(uint16(v.Unix() / 24 / 3600))
return block.buffers[c].Column.UInt16(uint16(v.Unix() / 24 / 3600))
}

func (block *Block) WriteDateTime(c int, v time.Time) error {
return block.Buffers[c].Column.UInt32(uint32(v.Unix()))
return block.buffers[c].Column.UInt32(uint32(v.Unix()))
}

func (block *Block) WriteUInt8(c int, v uint8) error {
return block.Buffers[c].Column.UInt8(v)
return block.buffers[c].Column.UInt8(v)
}

func (block *Block) WriteUInt16(c int, v uint16) error {
return block.Buffers[c].Column.UInt16(v)
return block.buffers[c].Column.UInt16(v)
}

func (block *Block) WriteUInt32(c int, v uint32) error {
return block.Buffers[c].Column.UInt32(v)
return block.buffers[c].Column.UInt32(v)
}

func (block *Block) WriteUInt64(c int, v uint64) error {
return block.Buffers[c].Column.UInt64(v)
return block.buffers[c].Column.UInt64(v)
}

func (block *Block) WriteFloat32(c int, v float32) error {
return block.Buffers[c].Column.Float32(v)
return block.buffers[c].Column.Float32(v)
}

func (block *Block) WriteFloat64(c int, v float64) error {
return block.Buffers[c].Column.Float64(v)
return block.buffers[c].Column.Float64(v)
}

func (block *Block) WriteBytes(c int, v []byte) error {
if err := block.Buffers[c].Column.Uvarint(uint64(len(v))); err != nil {
if err := block.buffers[c].Column.Uvarint(uint64(len(v))); err != nil {
return err
}
if _, err := block.Buffers[c].Column.Write(v); err != nil {
if _, err := block.buffers[c].Column.Write(v); err != nil {
return err
}
return nil
}

func (block *Block) WriteString(c int, v string) error {
if err := block.Buffers[c].Column.Uvarint(uint64(len(v))); err != nil {
if err := block.buffers[c].Column.Uvarint(uint64(len(v))); err != nil {
return err
}
if _, err := block.Buffers[c].Column.Write(binary.Str2Bytes(v)); err != nil {
if _, err := block.buffers[c].Column.Write(binary.Str2Bytes(v)); err != nil {
return err
}
return nil
}

func (block *Block) WriteFixedString(c int, v []byte) error {
return block.Columns[c].Write(block.Buffers[c].Column, v)
return block.Columns[c].Write(block.buffers[c].Column, v)
}

func (block *Block) WriteArray(c int, v *types.Array) error {
ln, err := block.Columns[c].(*column.Array).WriteArray(block.Buffers[c].Column, v)
ln, err := block.Columns[c].(*column.Array).WriteArray(block.buffers[c].Column, v)
if err != nil {
return err
}
block.offsets[c] += ln
return block.Buffers[c].Offset.UInt64(block.offsets[c])
return block.buffers[c].Offset.UInt64(block.offsets[c])
}
2 changes: 1 addition & 1 deletion lib/types/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (array *Array) Value() (driver.Value, error) {
encoder.Uvarint(uint64(ln))
for i := 0; i < ln; i++ {
if err := array.column.Write(encoder, v.Index(i).Interface()); err != nil {
buff.Free()
buff.Reset()
return nil, err
}
}
Expand Down
6 changes: 3 additions & 3 deletions lib/writebuffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ func (wb *WriteBuffer) WriteTo(w io.Writer) (int64, error) {
for _, chunk := range wb.chunks {
ln, err := w.Write(chunk)
if err != nil {
wb.Free()
wb.Reset()
return 0, err
}
size += int64(ln)
}
wb.Free()
wb.Reset()
return size, nil
}

Expand Down Expand Up @@ -90,7 +90,7 @@ func (wb *WriteBuffer) calcCap(dataSize int) int {
return max(dataSize, cap(wb.chunks[len(wb.chunks)-1])*2)
}

func (wb *WriteBuffer) Free() {
func (wb *WriteBuffer) Reset() {
if len(wb.chunks) == 0 {
return
}
Expand Down

0 comments on commit 7204389

Please sign in to comment.