Skip to content

Commit

Permalink
Merge pull request #11 from agoncear-mwb/main
Browse files Browse the repository at this point in the history
Added explicit key for using unsafe string reader
  • Loading branch information
auxten authored Jul 26, 2024
2 parents 82a5138 + 02f64ea commit f3d6a72
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 19 deletions.
38 changes: 28 additions & 10 deletions chdb/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ const (
INVALID
)

const sessionOptionKey = "session"
const udfPathOptionKey = "udfPath"
const driverTypeKey = "driverType"
const driverBufferSizeKey = "bufferSize"

const defaultBufferSize = 512
const (
sessionOptionKey = "session"
udfPathOptionKey = "udfPath"
driverTypeKey = "driverType"
useUnsafeStringReaderKey = "useUnsafeStringReader"
driverBufferSizeKey = "bufferSize"
defaultBufferSize = 512
)

func (d DriverType) String() string {
switch d {
Expand All @@ -44,7 +46,7 @@ func (d DriverType) String() string {
return ""
}

func (d DriverType) PrepareRows(result *chdbstable.LocalResult, buf []byte, bufSize int) (driver.Rows, error) {
func (d DriverType) PrepareRows(result *chdbstable.LocalResult, buf []byte, bufSize int, useUnsafe bool) (driver.Rows, error) {
switch d {
case ARROW:
reader, err := ipc.NewFileReader(bytes.NewReader(buf))
Expand All @@ -54,7 +56,11 @@ func (d DriverType) PrepareRows(result *chdbstable.LocalResult, buf []byte, bufS
return &arrowRows{localResult: result, reader: reader}, nil
case PARQUET:
reader := parquet.NewGenericReader[any](bytes.NewReader(buf))
return &parquetRows{localResult: result, reader: reader, bufferSize: bufSize, needNewBuffer: true}, nil
return &parquetRows{
localResult: result, reader: reader,
bufferSize: bufSize, needNewBuffer: true,
useUnsafeStringReader: useUnsafe,
}, nil
}
return nil, fmt.Errorf("Unsupported driver type")
}
Expand All @@ -79,6 +85,7 @@ type connector struct {
udfPath string
driverType DriverType
bufferSize int
useUnsafe bool
session *chdb.Session
}

Expand All @@ -87,7 +94,11 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
if c.driverType == INVALID {
return nil, fmt.Errorf("DriverType not supported")
}
cc := &conn{udfPath: c.udfPath, session: c.session, driverType: c.driverType, bufferSize: c.bufferSize}
cc := &conn{
udfPath: c.udfPath, session: c.session,
driverType: c.driverType, bufferSize: c.bufferSize,
useUnsafe: c.useUnsafe,
}
cc.SetupQueryFun()
return cc, nil
}
Expand Down Expand Up @@ -138,6 +149,12 @@ func NewConnect(opts map[string]string) (ret *connector, err error) {
} else {
ret.bufferSize = defaultBufferSize
}
useUnsafe, ok := opts[useUnsafeStringReaderKey]
if ok {
if strings.ToLower(useUnsafe) == "true" {
ret.useUnsafe = true
}
}

udfPath, ok := opts[udfPathOptionKey]
if ok {
Expand Down Expand Up @@ -170,6 +187,7 @@ type conn struct {
udfPath string
driverType DriverType
bufferSize int
useUnsafe bool
session *chdb.Session
QueryFun queryHandle
}
Expand Down Expand Up @@ -229,7 +247,7 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
if buf == nil {
return nil, fmt.Errorf("result is nil")
}
return c.driverType.PrepareRows(result, buf, c.bufferSize)
return c.driverType.PrepareRows(result, buf, c.bufferSize, c.useUnsafe)

}

Expand Down
28 changes: 19 additions & 9 deletions chdb/driver/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/parquet-go/parquet-go"
)

// bytesToString reinterprets data as a string without copying its bytes.
//
// WARNING: this is strictly unsafe. The returned string shares the memory
// that data refers to, so it can lead to undefined behavior if that memory
// goes out of scope or is modified while the string is still in use.
// Call it ONLY when both conditions are known to hold and avoiding the
// extra allocation actually matters.
func bytesToString(data []byte) string {
	// View the slice header through a *string: the string takes over the
	// slice's data pointer and length, and no bytes are copied.
	header := unsafe.Pointer(&data)
	asString := (*string)(header)
	return *asString
}
Expand All @@ -22,14 +24,15 @@ func getStringFromBytes(v parquet.Value) string {
}

type parquetRows struct {
localResult *chdbstable.LocalResult // result from clickhouse
reader *parquet.GenericReader[any] // parquet reader
curRecord parquet.Row // TODO: delete this?
buffer []parquet.Row // record buffer
bufferSize int // amount of records to preload into buffer
bufferIndex int64 // index in the current buffer
curRow int64 // row counter
needNewBuffer bool
localResult *chdbstable.LocalResult // result from clickhouse
reader *parquet.GenericReader[any] // parquet reader
curRecord parquet.Row // TODO: delete this?
buffer []parquet.Row // record buffer
bufferSize int // amount of records to preload into buffer
bufferIndex int64 // index in the current buffer
curRow int64 // row counter
needNewBuffer bool
useUnsafeStringReader bool
}

func (r *parquetRows) Columns() (out []string) {
Expand Down Expand Up @@ -100,7 +103,14 @@ func (r *parquetRows) Next(dest []driver.Value) error {
}
switch r.ColumnTypeDatabaseTypeName(columnIndex) {
case "STRING":
dest[columnIndex] = getStringFromBytes(curVal)
// If the user initialized the connection with the useUnsafeStringReader parameter, use the `getStringFromBytes` method.
// Otherwise, fall back to the traditional path and allocate a new string.
if r.useUnsafeStringReader {
dest[columnIndex] = getStringFromBytes(curVal)
} else {
dest[columnIndex] = string(curVal.ByteArray())
}

case "INT8", "INT(8,true)":
dest[columnIndex] = int8(curVal.Int32()) //check if this is correct
case "INT16", "INT(16,true)":
Expand Down

0 comments on commit f3d6a72

Please sign in to comment.