Skip to content

Commit

Permalink
V4: improve context transfer, options and fix race in redis buffer (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
zikwall authored Aug 10, 2022
1 parent 34f12d2 commit 297ea09
Show file tree
Hide file tree
Showing 20 changed files with 177 additions and 88 deletions.
12 changes: 7 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,16 @@ This is due to the fact that Clickhouse is designed so that it better processes
import (
"database/sql"

"github.com/zikwall/clickhouse-buffer/v3/src/cx"
"github.com/zikwall/clickhouse-buffer/v3/src/db/cxnative"
"github.com/zikwall/clickhouse-buffer/v3/src/db/cxsql"
)

// if you already have a connection to Clickhouse you can just use wrappers
// with native interface
ch := cxnative.NewClickhouseWithConn(driver.Conn)
ch := cxnative.NewClickhouseWithConn(driver.Conn, &cx.RuntimeOptions{})
// or use database/sql interface
ch := cxsql.NewClickhouseWithConn(*sql.DB)
ch := cxsql.NewClickhouseWithConn(*sql.DB, &cx.RuntimeOptions{})
```

```go
Expand All @@ -73,7 +74,7 @@ ch, conn, err := cxnative.NewClickhouse(ctx, &clickhouse.Options{
Method: clickhouse.CompressionLZ4,
},
Debug: ctx.Bool("debug"),
})
}, &cx.RuntimeOptions{})
// or with database/sql interface
ch, conn, err := cxsql.NewClickhouse(ctx, &clickhouse.Options{
Addr: ctx.StringSlice("clickhouse-host"),
Expand All @@ -90,7 +91,7 @@ ch, conn, err := cxsql.NewClickhouse(ctx, &clickhouse.Options{
Method: clickhouse.CompressionLZ4,
},
Debug: ctx.Bool("debug"),
}, &cxsql.RuntimeOptions{})
}, &cx.RuntimeOptions{})
```

#### Create main data streamer client and write data
Expand All @@ -111,10 +112,11 @@ buffer := cxmem.NewBuffer(
)
// or use redis
buffer := cxredis.NewBuffer(
contetx, *redis.Client, "bucket", client.Options().BatchSize(),
ctx, *redis.Client, "bucket", client.Options().BatchSize(),
)
// create new writer api: table name with columns
writeAPI := client.Writer(
ctx,
cx.NewView("clickhouse_database.clickhouse_table", []string{"id", "uuid", "insert_ts"}),
buffer,
)
Expand Down
6 changes: 6 additions & 0 deletions bench/insert_redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func BenchmarkInsertRedisObjects(b *testing.B) {
log.Panicln(err)
}
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
rxbuffer,
)
Expand All @@ -69,6 +70,7 @@ func BenchmarkInsertRedisObjects(b *testing.B) {
log.Panicln(err)
}
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
rxbuffer,
)
Expand All @@ -89,6 +91,7 @@ func BenchmarkInsertRedisObjects(b *testing.B) {
log.Panicln(err)
}
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
rxbuffer,
)
Expand Down Expand Up @@ -140,6 +143,7 @@ func BenchmarkInsertRedisVectors(b *testing.B) {
log.Panicln(err)
}
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
rxbuffer,
)
Expand All @@ -161,6 +165,7 @@ func BenchmarkInsertRedisVectors(b *testing.B) {
log.Panicln(err)
}
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
rxbuffer,
)
Expand All @@ -182,6 +187,7 @@ func BenchmarkInsertRedisVectors(b *testing.B) {
log.Panicln(err)
}
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
rxbuffer,
)
Expand Down
37 changes: 37 additions & 0 deletions bench/insert_simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func BenchmarkInsertSimplestPreallocateVectors(b *testing.B) {
b.Run("1000000", func(b *testing.B) {
client.Options().SetBatchSize(1000001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -78,6 +79,7 @@ func BenchmarkInsertSimplestPreallocateVectors(b *testing.B) {
b.Run("100000", func(b *testing.B) {
client.Options().SetBatchSize(100001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -90,6 +92,7 @@ func BenchmarkInsertSimplestPreallocateVectors(b *testing.B) {
b.Run("10000", func(b *testing.B) {
client.Options().SetBatchSize(10001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -102,6 +105,7 @@ func BenchmarkInsertSimplestPreallocateVectors(b *testing.B) {
b.Run("1000", func(b *testing.B) {
client.Options().SetBatchSize(1001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -114,6 +118,7 @@ func BenchmarkInsertSimplestPreallocateVectors(b *testing.B) {
b.Run("100", func(b *testing.B) {
client.Options().SetBatchSize(101)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand Down Expand Up @@ -158,6 +163,7 @@ func BenchmarkInsertSimplestPreallocateObjects(b *testing.B) {
b.Run("1000000", func(b *testing.B) {
client.Options().SetBatchSize(1000001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -170,6 +176,7 @@ func BenchmarkInsertSimplestPreallocateObjects(b *testing.B) {
b.Run("100000", func(b *testing.B) {
client.Options().SetBatchSize(100001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -182,6 +189,7 @@ func BenchmarkInsertSimplestPreallocateObjects(b *testing.B) {
b.Run("10000", func(b *testing.B) {
client.Options().SetBatchSize(10001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -194,6 +202,7 @@ func BenchmarkInsertSimplestPreallocateObjects(b *testing.B) {
b.Run("1000", func(b *testing.B) {
client.Options().SetBatchSize(1001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -206,6 +215,7 @@ func BenchmarkInsertSimplestPreallocateObjects(b *testing.B) {
b.Run("100", func(b *testing.B) {
client.Options().SetBatchSize(101)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand Down Expand Up @@ -250,6 +260,7 @@ func BenchmarkInsertSimplestObjects(b *testing.B) {
b.Run("1000000", func(b *testing.B) {
client.Options().SetBatchSize(1000001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -262,6 +273,7 @@ func BenchmarkInsertSimplestObjects(b *testing.B) {
b.Run("100000", func(b *testing.B) {
client.Options().SetBatchSize(100001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -274,6 +286,7 @@ func BenchmarkInsertSimplestObjects(b *testing.B) {
b.Run("10000", func(b *testing.B) {
client.Options().SetBatchSize(10001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -286,6 +299,7 @@ func BenchmarkInsertSimplestObjects(b *testing.B) {
b.Run("1000", func(b *testing.B) {
client.Options().SetBatchSize(1001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -298,6 +312,7 @@ func BenchmarkInsertSimplestObjects(b *testing.B) {
b.Run("100", func(b *testing.B) {
client.Options().SetBatchSize(101)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand Down Expand Up @@ -342,6 +357,7 @@ func BenchmarkInsertSimplestObjectsJust(b *testing.B) {
b.Run("10000000", func(b *testing.B) {
client.Options().SetBatchSize(10000001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -352,6 +368,7 @@ func BenchmarkInsertSimplestObjectsJust(b *testing.B) {
b.Run("1000000", func(b *testing.B) {
client.Options().SetBatchSize(1000001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -362,6 +379,7 @@ func BenchmarkInsertSimplestObjectsJust(b *testing.B) {
b.Run("100000", func(b *testing.B) {
client.Options().SetBatchSize(100001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -372,6 +390,7 @@ func BenchmarkInsertSimplestObjectsJust(b *testing.B) {
b.Run("10000", func(b *testing.B) {
client.Options().SetBatchSize(10001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -382,6 +401,7 @@ func BenchmarkInsertSimplestObjectsJust(b *testing.B) {
b.Run("1000", func(b *testing.B) {
client.Options().SetBatchSize(1001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -392,6 +412,7 @@ func BenchmarkInsertSimplestObjectsJust(b *testing.B) {
b.Run("100", func(b *testing.B) {
client.Options().SetBatchSize(101)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand Down Expand Up @@ -434,6 +455,7 @@ func BenchmarkInsertSimplestVectors(b *testing.B) {
b.Run("1000000", func(b *testing.B) {
client.Options().SetBatchSize(1000001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -446,6 +468,7 @@ func BenchmarkInsertSimplestVectors(b *testing.B) {
b.Run("100000", func(b *testing.B) {
client.Options().SetBatchSize(100001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -458,6 +481,7 @@ func BenchmarkInsertSimplestVectors(b *testing.B) {
b.Run("10000", func(b *testing.B) {
client.Options().SetBatchSize(10001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -470,6 +494,7 @@ func BenchmarkInsertSimplestVectors(b *testing.B) {
b.Run("1000", func(b *testing.B) {
client.Options().SetBatchSize(1001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -482,6 +507,7 @@ func BenchmarkInsertSimplestVectors(b *testing.B) {
b.Run("100", func(b *testing.B) {
client.Options().SetBatchSize(101)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand Down Expand Up @@ -526,6 +552,7 @@ func BenchmarkInsertSimplestVectorsJust(b *testing.B) {
b.Run("10000000", func(b *testing.B) {
client.Options().SetBatchSize(10000001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -536,6 +563,7 @@ func BenchmarkInsertSimplestVectorsJust(b *testing.B) {
b.Run("1000000", func(b *testing.B) {
client.Options().SetBatchSize(1000001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -546,6 +574,7 @@ func BenchmarkInsertSimplestVectorsJust(b *testing.B) {
b.Run("100000", func(b *testing.B) {
client.Options().SetBatchSize(100001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -556,6 +585,7 @@ func BenchmarkInsertSimplestVectorsJust(b *testing.B) {
b.Run("10000", func(b *testing.B) {
client.Options().SetBatchSize(10001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -566,6 +596,7 @@ func BenchmarkInsertSimplestVectorsJust(b *testing.B) {
b.Run("1000", func(b *testing.B) {
client.Options().SetBatchSize(1001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -576,6 +607,7 @@ func BenchmarkInsertSimplestVectorsJust(b *testing.B) {
b.Run("100", func(b *testing.B) {
client.Options().SetBatchSize(101)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand Down Expand Up @@ -618,6 +650,7 @@ func BenchmarkInsertSimplestEmptyVectors(b *testing.B) {
b.Run("1000000", func(b *testing.B) {
client.Options().SetBatchSize(1000001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -630,6 +663,7 @@ func BenchmarkInsertSimplestEmptyVectors(b *testing.B) {
b.Run("100000", func(b *testing.B) {
client.Options().SetBatchSize(100001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -642,6 +676,7 @@ func BenchmarkInsertSimplestEmptyVectors(b *testing.B) {
b.Run("10000", func(b *testing.B) {
client.Options().SetBatchSize(10001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -654,6 +689,7 @@ func BenchmarkInsertSimplestEmptyVectors(b *testing.B) {
b.Run("1000", func(b *testing.B) {
client.Options().SetBatchSize(1001)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand All @@ -666,6 +702,7 @@ func BenchmarkInsertSimplestEmptyVectors(b *testing.B) {
b.Run("100", func(b *testing.B) {
client.Options().SetBatchSize(101)
writeAPI = client.Writer(
ctx,
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)
Expand Down
Loading

0 comments on commit 297ea09

Please sign in to comment.