From 297ea09db68de327a0a7dee571d48a38a181927a Mon Sep 17 00:00:00 2001 From: Andrey Kapitonov Date: Wed, 10 Aug 2022 23:50:37 +0300 Subject: [PATCH] V4: improve context transfer, options and fix race in redis buffer (#34) --- README.md | 12 ++++++---- bench/insert_redis_test.go | 6 +++++ bench/insert_simple_test.go | 37 ++++++++++++++++++++++++++++++ client.go | 6 ++--- example/cmd/advanced/main.go | 9 ++++++-- example/cmd/advanced_redis/main.go | 9 ++++++-- example/cmd/redis/main.go | 10 +++++--- example/cmd/redis_safe/main.go | 10 +++++--- example/cmd/redis_sql/main.go | 10 ++++---- example/cmd/simple/main.go | 10 +++++--- example/cmd/simple_2/main.go | 12 ++++++---- example/cmd/simple_safe/main.go | 10 +++++--- example/cmd/simple_sql/main.go | 10 ++++---- src/buffer/cxredis/buffer.go | 4 ++-- src/cx/support.go | 13 ++++++++++- src/db/cxnative/impl.go | 29 +++++++++++++++-------- src/db/cxsql/impl.go | 36 ++++++++++------------------- tests/client_impl_test.go | 12 +++++----- tests/integration_memory_test.go | 6 ++--- tests/integration_test.go | 14 ++++++----- 20 files changed, 177 insertions(+), 88 deletions(-) diff --git a/README.md b/README.md index 602914d..0bcb902 100644 --- a/README.md +++ b/README.md @@ -42,15 +42,16 @@ This is due to the fact that Clickhouse is designed so that it better processes import ( "database/sql" + "github.com/zikwall/clickhouse-buffer/v3/src/cx" "github.com/zikwall/clickhouse-buffer/v3/src/db/cxnative" "github.com/zikwall/clickhouse-buffer/v3/src/db/cxsql" ) // if you already have a connection to Clickhouse you can just use wrappers // with native interface -ch := cxnative.NewClickhouseWithConn(driver.Conn) +ch := cxnative.NewClickhouseWithConn(driver.Conn, &cx.RuntimeOptions{}) // or use database/sql interface -ch := cxsql.NewClickhouseWithConn(*sql.DB) +ch := cxsql.NewClickhouseWithConn(*sql.DB, &cx.RuntimeOptions{}) ``` ```go @@ -73,7 +74,7 @@ ch, conn, err := cxnative.NewClickhouse(ctx, &clickhouse.Options{ Method: clickhouse.CompressionLZ4, }, Debug: ctx.Bool("debug"), -}) +}, &cx.RuntimeOptions{}) // or with database/sql interface ch, conn, err := cxsql.NewClickhouse(ctx, &clickhouse.Options{ Addr: ctx.StringSlice("clickhouse-host"), @@ -90,7 +91,7 @@ ch, conn, err := cxsql.NewClickhouse(ctx, &clickhouse.Options{ Method: clickhouse.CompressionLZ4, }, Debug: ctx.Bool("debug"), -}, &cxsql.RuntimeOptions{}) +}, &cx.RuntimeOptions{}) ``` #### Create main data streamer client and write data @@ -111,10 +112,11 @@ buffer := cxmem.NewBuffer( ) // or use redis buffer := cxredis.NewBuffer( - contetx, *redis.Client, "bucket", client.Options().BatchSize(), + ctx, *redis.Client, "bucket", client.Options().BatchSize(), ) // create new writer api: table name with columns writeAPI := client.Writer( + ctx, cx.NewView("clickhouse_database.clickhouse_table", []string{"id", "uuid", "insert_ts"}), buffer, ) diff --git a/bench/insert_redis_test.go b/bench/insert_redis_test.go index a317e51..218cbf2 100644 --- a/bench/insert_redis_test.go +++ b/bench/insert_redis_test.go @@ -49,6 +49,7 @@ func BenchmarkInsertRedisObjects(b *testing.B) { log.Panicln(err) } writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), rxbuffer, ) @@ -69,6 +70,7 @@ func BenchmarkInsertRedisObjects(b *testing.B) { log.Panicln(err) } writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), rxbuffer, ) @@ -89,6 +91,7 @@ func BenchmarkInsertRedisObjects(b *testing.B) { log.Panicln(err) } writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), rxbuffer, ) @@ -140,6 +143,7 @@ func BenchmarkInsertRedisVectors(b *testing.B) { log.Panicln(err) } writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), rxbuffer, ) @@ -161,6 +165,7 @@ func BenchmarkInsertRedisVectors(b *testing.B) { log.Panicln(err) } writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), rxbuffer, ) @@ -182,6 +187,7 @@ func BenchmarkInsertRedisVectors(b *testing.B) { log.Panicln(err) } writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), rxbuffer, ) diff --git a/bench/insert_simple_test.go b/bench/insert_simple_test.go index d072bea..05b8dd9 100644 --- a/bench/insert_simple_test.go +++ b/bench/insert_simple_test.go @@ -66,6 +66,7 @@ func BenchmarkInsertSimplestPreallocateVectors(b *testing.B) { b.Run("1000000", func(b *testing.B) { client.Options().SetBatchSize(1000001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -78,6 +79,7 @@ func BenchmarkInsertSimplestPreallocateVectors(b *testing.B) { b.Run("100000", func(b *testing.B) { client.Options().SetBatchSize(100001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -90,6 +92,7 @@ func BenchmarkInsertSimplestPreallocateVectors(b *testing.B) { b.Run("10000", func(b *testing.B) { client.Options().SetBatchSize(10001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -102,6 +105,7 @@ func BenchmarkInsertSimplestPreallocateVectors(b *testing.B) { b.Run("1000", func(b *testing.B) { client.Options().SetBatchSize(1001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -114,6 +118,7 @@ func BenchmarkInsertSimplestPreallocateVectors(b *testing.B) { b.Run("100", func(b *testing.B) { client.Options().SetBatchSize(101) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -158,6 +163,7 @@ func BenchmarkInsertSimplestPreallocateObjects(b *testing.B) { b.Run("1000000", func(b *testing.B) { client.Options().SetBatchSize(1000001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -170,6 +176,7 @@ func BenchmarkInsertSimplestPreallocateObjects(b *testing.B) { b.Run("100000", func(b *testing.B) { client.Options().SetBatchSize(100001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -182,6 +189,7 @@ func BenchmarkInsertSimplestPreallocateObjects(b *testing.B) { b.Run("10000", func(b *testing.B) { client.Options().SetBatchSize(10001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -194,6 +202,7 @@ func BenchmarkInsertSimplestPreallocateObjects(b *testing.B) { b.Run("1000", func(b *testing.B) { client.Options().SetBatchSize(1001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -206,6 +215,7 @@ func BenchmarkInsertSimplestPreallocateObjects(b *testing.B) { b.Run("100", func(b *testing.B) { client.Options().SetBatchSize(101) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -250,6 +260,7 @@ func BenchmarkInsertSimplestObjects(b *testing.B) { b.Run("1000000", func(b *testing.B) { client.Options().SetBatchSize(1000001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -262,6 +273,7 @@ func BenchmarkInsertSimplestObjects(b *testing.B) { b.Run("100000", func(b *testing.B) { client.Options().SetBatchSize(100001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -274,6 +286,7 @@ func BenchmarkInsertSimplestObjects(b *testing.B) { b.Run("10000", func(b *testing.B) { client.Options().SetBatchSize(10001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -286,6 +299,7 @@ func BenchmarkInsertSimplestObjects(b *testing.B) { b.Run("1000", func(b *testing.B) { client.Options().SetBatchSize(1001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -298,6 +312,7 @@ func BenchmarkInsertSimplestObjects(b *testing.B) { b.Run("100", func(b *testing.B) { client.Options().SetBatchSize(101) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -342,6 +357,7 @@ func BenchmarkInsertSimplestObjectsJust(b *testing.B) { b.Run("10000000", func(b *testing.B) { client.Options().SetBatchSize(10000001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -352,6 +368,7 @@ func BenchmarkInsertSimplestObjectsJust(b *testing.B) { b.Run("1000000", func(b *testing.B) { client.Options().SetBatchSize(1000001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -362,6 +379,7 @@ func BenchmarkInsertSimplestObjectsJust(b *testing.B) { b.Run("100000", func(b *testing.B) { client.Options().SetBatchSize(100001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -372,6 +390,7 @@ func BenchmarkInsertSimplestObjectsJust(b *testing.B) { b.Run("10000", func(b *testing.B) { client.Options().SetBatchSize(10001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -382,6 +401,7 @@ func BenchmarkInsertSimplestObjectsJust(b *testing.B) { b.Run("1000", func(b *testing.B) { client.Options().SetBatchSize(1001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -392,6 +412,7 @@ func BenchmarkInsertSimplestObjectsJust(b *testing.B) { b.Run("100", func(b *testing.B) { client.Options().SetBatchSize(101) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -434,6 +455,7 @@ func BenchmarkInsertSimplestVectors(b *testing.B) { b.Run("1000000", func(b *testing.B) { client.Options().SetBatchSize(1000001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -446,6 +468,7 @@ func BenchmarkInsertSimplestVectors(b *testing.B) { b.Run("100000", func(b *testing.B) { client.Options().SetBatchSize(100001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -458,6 +481,7 @@ func BenchmarkInsertSimplestVectors(b *testing.B) { b.Run("10000", func(b *testing.B) { client.Options().SetBatchSize(10001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -470,6 +494,7 @@ func BenchmarkInsertSimplestVectors(b *testing.B) { b.Run("1000", func(b *testing.B) { client.Options().SetBatchSize(1001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -482,6 +507,7 @@ func BenchmarkInsertSimplestVectors(b *testing.B) { b.Run("100", func(b *testing.B) { client.Options().SetBatchSize(101) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -526,6 +552,7 @@ func BenchmarkInsertSimplestVectorsJust(b *testing.B) { b.Run("10000000", func(b *testing.B) { client.Options().SetBatchSize(10000001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -536,6 +563,7 @@ func BenchmarkInsertSimplestVectorsJust(b *testing.B) { b.Run("1000000", func(b *testing.B) { client.Options().SetBatchSize(1000001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -546,6 +574,7 @@ func BenchmarkInsertSimplestVectorsJust(b *testing.B) { b.Run("100000", func(b *testing.B) { client.Options().SetBatchSize(100001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -556,6 +585,7 @@ func BenchmarkInsertSimplestVectorsJust(b *testing.B) { b.Run("10000", func(b *testing.B) { client.Options().SetBatchSize(10001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -566,6 +596,7 @@ func BenchmarkInsertSimplestVectorsJust(b *testing.B) { b.Run("1000", func(b *testing.B) { client.Options().SetBatchSize(1001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -576,6 +607,7 @@ func BenchmarkInsertSimplestVectorsJust(b *testing.B) { b.Run("100", func(b *testing.B) { client.Options().SetBatchSize(101) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -618,6 +650,7 @@ func BenchmarkInsertSimplestEmptyVectors(b *testing.B) { b.Run("1000000", func(b *testing.B) { client.Options().SetBatchSize(1000001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -630,6 +663,7 @@ func BenchmarkInsertSimplestEmptyVectors(b *testing.B) { b.Run("100000", func(b *testing.B) { client.Options().SetBatchSize(100001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -642,6 +676,7 @@ func BenchmarkInsertSimplestEmptyVectors(b *testing.B) { b.Run("10000", func(b *testing.B) { client.Options().SetBatchSize(10001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -654,6 +689,7 @@ func BenchmarkInsertSimplestEmptyVectors(b *testing.B) { b.Run("1000", func(b *testing.B) { client.Options().SetBatchSize(1001) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) @@ -666,6 +702,7 @@ func BenchmarkInsertSimplestEmptyVectors(b *testing.B) { b.Run("100", func(b *testing.B) { client.Options().SetBatchSize(101) writeAPI = client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) diff --git a/client.go b/client.go index da710f5..e39c258 100644 --- a/client.go +++ b/client.go @@ -20,7 +20,7 @@ type Client interface { WriteBatch(context.Context, cx.View, *cx.Batch) error // Writer returns the asynchronous, non-blocking, Writer client. // Ensures using a single Writer instance for each table pair. - Writer(cx.View, cx.Buffer) Writer + Writer(context.Context, cx.View, cx.Buffer) Writer // WriterBlocking returns the synchronous, blocking, WriterBlocking client. // Ensures using a single WriterBlocking instance for each table pair. WriterBlocking(cx.View) WriterBlocking @@ -83,11 +83,11 @@ func (c *clientImpl) Options() *Options { // Writer returns the asynchronous, non-blocking, Writer client. // Ensures using a single Writer instance for each table pair. -func (c *clientImpl) Writer(view cx.View, buf cx.Buffer) Writer { +func (c *clientImpl) Writer(ctx context.Context, view cx.View, buf cx.Buffer) Writer { key := view.Name c.mu.Lock() if _, ok := c.writeAPIs[key]; !ok { - c.writeAPIs[key] = NewWriter(c.context, c, view, buf) + c.writeAPIs[key] = NewWriter(ctx, c, view, buf) } writer := c.writeAPIs[key] c.mu.Unlock() diff --git a/example/cmd/advanced/main.go b/example/cmd/advanced/main.go index df8eddc..7fae4dd 100644 --- a/example/cmd/advanced/main.go +++ b/example/cmd/advanced/main.go @@ -41,6 +41,8 @@ func main() { Method: clickhouse.CompressionLZ4, }, Debug: true, + }, &cx.RuntimeOptions{ + WriteTimeout: 15 * time.Second, }) if err != nil { log.Panicln(err) @@ -49,10 +51,13 @@ func main() { log.Panicln(err) } - client := clickhousebuffer.NewClientWithOptions(ctx, ch, - clickhousebuffer.DefaultOptions().SetDebugMode(true).SetFlushInterval(1000).SetBatchSize(10), + client := clickhousebuffer.NewClientWithOptions(ctx, ch, clickhousebuffer.DefaultOptions(). + SetDebugMode(true). + SetFlushInterval(1000). + SetBatchSize(10), ) writeAPI := client.Writer( + ctx, cx.NewView(tables.AdvancedTableName(), tables.AdvancedTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) diff --git a/example/cmd/advanced_redis/main.go b/example/cmd/advanced_redis/main.go index 4b4a342..d62e606 100644 --- a/example/cmd/advanced_redis/main.go +++ b/example/cmd/advanced_redis/main.go @@ -45,6 +45,8 @@ func main() { Method: clickhouse.CompressionLZ4, }, Debug: true, + }, &cx.RuntimeOptions{ + WriteTimeout: 15 * time.Second, }) if err != nil { log.Panicln(err) @@ -54,8 +56,10 @@ func main() { log.Panicln(err) } - client := clickhousebuffer.NewClientWithOptions(ctx, ch, - clickhousebuffer.DefaultOptions().SetDebugMode(true).SetFlushInterval(1000).SetBatchSize(10), + client := clickhousebuffer.NewClientWithOptions(ctx, ch, clickhousebuffer.DefaultOptions(). + SetDebugMode(true). + SetFlushInterval(1000). + SetBatchSize(10), ) rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{ Addr: redisHost, @@ -66,6 +70,7 @@ func main() { log.Panicln(err) } writeAPI := client.Writer( + ctx, cx.NewView(tables.AdvancedTableName(), tables.AdvancedTableColumns()), rxbuffer, ) diff --git a/example/cmd/redis/main.go b/example/cmd/redis/main.go index 428ce9b..77fbae1 100644 --- a/example/cmd/redis/main.go +++ b/example/cmd/redis/main.go @@ -44,6 +44,8 @@ func main() { Method: clickhouse.CompressionLZ4, }, Debug: true, + }, &cx.RuntimeOptions{ + WriteTimeout: 15 * time.Second, }) if err != nil { log.Panicln(err) @@ -51,8 +53,10 @@ func main() { if err := tables.CreateTableNative(ctx, conn); err != nil { log.Panicln(err) } - client := clickhousebuffer.NewClientWithOptions(ctx, ch, - clickhousebuffer.DefaultOptions().SetDebugMode(true).SetFlushInterval(1000).SetBatchSize(5), + client := clickhousebuffer.NewClientWithOptions(ctx, ch, clickhousebuffer.DefaultOptions(). + SetDebugMode(true). + SetFlushInterval(1000). + SetBatchSize(5), ) rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{ Addr: redisHost, @@ -61,7 +65,7 @@ func main() { if err != nil { log.Panicln(err) } - writeAPI := client.Writer(cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), rxbuffer) + writeAPI := client.Writer(ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), rxbuffer) wg := sync.WaitGroup{} wg.Add(1) go func() { diff --git a/example/cmd/redis_safe/main.go b/example/cmd/redis_safe/main.go index 79525f8..d30437b 100644 --- a/example/cmd/redis_safe/main.go +++ b/example/cmd/redis_safe/main.go @@ -44,6 +44,8 @@ func main() { Method: clickhouse.CompressionLZ4, }, Debug: true, + }, &cx.RuntimeOptions{ + WriteTimeout: 15 * time.Second, }) if err != nil { log.Panicln(err) @@ -51,8 +53,10 @@ func main() { if err := tables.CreateTableNative(ctx, conn); err != nil { log.Panicln(err) } - client := clickhousebuffer.NewClientWithOptions(ctx, ch, - clickhousebuffer.DefaultOptions().SetDebugMode(true).SetFlushInterval(1000).SetBatchSize(5), + client := clickhousebuffer.NewClientWithOptions(ctx, ch, clickhousebuffer.DefaultOptions(). + SetDebugMode(true). + SetFlushInterval(1000). + SetBatchSize(5), ) rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{ Addr: redisHost, @@ -61,7 +65,7 @@ func main() { if err != nil { log.Panicln(err) } - writeAPI := client.Writer(cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), rxbuffer) + writeAPI := client.Writer(ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), rxbuffer) wg := sync.WaitGroup{} wg.Add(1) go func() { diff --git a/example/cmd/redis_sql/main.go b/example/cmd/redis_sql/main.go index 0ab509a..6038b9d 100644 --- a/example/cmd/redis_sql/main.go +++ b/example/cmd/redis_sql/main.go @@ -44,15 +44,17 @@ func main() { Method: clickhouse.CompressionLZ4, }, Debug: true, - }, &cxsql.RuntimeOptions{}) + }, &cx.RuntimeOptions{}) if err != nil { log.Panicln(err) } if err := tables.CreateTableSQL(ctx, conn); err != nil { log.Panicln(err) } - client := clickhousebuffer.NewClientWithOptions(ctx, ch, - clickhousebuffer.DefaultOptions().SetDebugMode(true).SetFlushInterval(1000).SetBatchSize(5), + client := clickhousebuffer.NewClientWithOptions(ctx, ch, clickhousebuffer.DefaultOptions(). + SetDebugMode(true). + SetFlushInterval(1000). + SetBatchSize(5), ) rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{ Addr: redisHost, @@ -61,7 +63,7 @@ func main() { if err != nil { log.Panicln(err) } - writeAPI := client.Writer(cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), rxbuffer) + writeAPI := client.Writer(ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), rxbuffer) wg := sync.WaitGroup{} wg.Add(1) diff --git a/example/cmd/simple/main.go b/example/cmd/simple/main.go index d9a087b..de90883 100644 --- a/example/cmd/simple/main.go +++ b/example/cmd/simple/main.go @@ -41,6 +41,8 @@ func main() { Method: clickhouse.CompressionLZ4, }, Debug: true, + }, &cx.RuntimeOptions{ + WriteTimeout: 15 * time.Second, }) if err != nil { log.Panicln(err) @@ -48,11 +50,13 @@ func main() { if err := tables.CreateTableNative(ctx, conn); err != nil { log.Panicln(err) } - client := clickhousebuffer.NewClientWithOptions(ctx, ch, - clickhousebuffer.DefaultOptions().SetDebugMode(true).SetFlushInterval(1000).SetBatchSize(5), + client := clickhousebuffer.NewClientWithOptions(ctx, ch, clickhousebuffer.DefaultOptions(). + SetDebugMode(true). + SetFlushInterval(1000). + SetBatchSize(5), ) - writeAPI := client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) diff --git a/example/cmd/simple_2/main.go b/example/cmd/simple_2/main.go index 0a8ee66..1539797 100644 --- a/example/cmd/simple_2/main.go +++ b/example/cmd/simple_2/main.go @@ -40,6 +40,8 @@ func main() { Method: clickhouse.CompressionLZ4, }, Debug: true, + }, &cx.RuntimeOptions{ + WriteTimeout: 15 * time.Second, }) if err != nil { log.Panicln(err) @@ -47,22 +49,22 @@ func main() { if err := tables.CreateTableNative(ctx, conn); err != nil { log.Panicln(err) } - client := clickhousebuffer.NewClientWithOptions(ctx, ch, - clickhousebuffer.DefaultOptions().SetDebugMode(true).SetFlushInterval(1000).SetBatchSize(5), + client := clickhousebuffer.NewClientWithOptions(ctx, ch, clickhousebuffer.DefaultOptions(). + SetDebugMode(true). + SetFlushInterval(1000). + SetBatchSize(5), ) - writeAPI := client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) - int32s := []int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} for _, val := range int32s { writeAPI.WriteRow(&tables.ExampleTable{ ID: val, UUID: fmt.Sprintf("uuidf %d", val), InsertTS: time.Now(), }) } - <-time.After(time.Second * 2) client.Close() } diff --git a/example/cmd/simple_safe/main.go b/example/cmd/simple_safe/main.go index 190b158..998eb78 100644 --- a/example/cmd/simple_safe/main.go +++ b/example/cmd/simple_safe/main.go @@ -41,6 +41,8 @@ func main() { Method: clickhouse.CompressionLZ4, }, Debug: true, + }, &cx.RuntimeOptions{ + WriteTimeout: 15 * time.Second, }) if err != nil { log.Panicln(err) @@ -48,11 +50,13 @@ func main() { if err := tables.CreateTableNative(ctx, conn); err != nil { log.Panicln(err) } - client := clickhousebuffer.NewClientWithOptions(ctx, ch, - clickhousebuffer.DefaultOptions().SetDebugMode(true).SetFlushInterval(1000).SetBatchSize(5), + client := clickhousebuffer.NewClientWithOptions(ctx, ch, clickhousebuffer.DefaultOptions(). + SetDebugMode(true). + SetFlushInterval(1000). + SetBatchSize(5), ) - writeAPI := client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) diff --git a/example/cmd/simple_sql/main.go b/example/cmd/simple_sql/main.go index c6b80e4..e042c4b 100644 --- a/example/cmd/simple_sql/main.go +++ b/example/cmd/simple_sql/main.go @@ -41,18 +41,20 @@ func main() { Method: clickhouse.CompressionLZ4, }, Debug: true, - }, &cxsql.RuntimeOptions{}) + }, &cx.RuntimeOptions{}) if err != nil { log.Panicln(err) } if err := tables.CreateTableSQL(ctx, conn); err != nil { log.Panicln(err) } - client := clickhousebuffer.NewClientWithOptions(ctx, ch, - clickhousebuffer.DefaultOptions().SetDebugMode(true).SetFlushInterval(1000).SetBatchSize(5), + client := clickhousebuffer.NewClientWithOptions(ctx, ch, clickhousebuffer.DefaultOptions(). + SetDebugMode(true). + SetFlushInterval(1000). + SetBatchSize(5), ) - writeAPI := client.Writer( + ctx, cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), cxmem.NewBuffer(client.Options().BatchSize()), ) diff --git a/src/buffer/cxredis/buffer.go b/src/buffer/cxredis/buffer.go index 43d5eef..36f8f50 100644 --- a/src/buffer/cxredis/buffer.go +++ b/src/buffer/cxredis/buffer.go @@ -37,10 +37,10 @@ func (r *redisBuffer) Read() []cx.Vector { } func (r *redisBuffer) Len() int { - return int(r.size) + return int(atomic.LoadInt64(&r.size)) } func (r *redisBuffer) Flush() { r.client.LTrim(r.context, r.bucket, r.bufferSize, -1).Val() - atomic.CompareAndSwapInt64(&r.size, r.size, 0) + atomic.StoreInt64(&r.size, 0) } diff --git a/src/cx/support.go b/src/cx/support.go index 461c5c7..43411b5 100644 --- a/src/cx/support.go +++ b/src/cx/support.go @@ -9,7 +9,7 @@ import ( const defaultInsertDurationTimeout = time.Millisecond * 15000 // GetDefaultInsertDurationTimeout to get away from this decision in the near future -func GetDefaultInsertDurationTimeout() time.Duration { +func getDefaultInsertDurationTimeout() time.Duration { return defaultInsertDurationTimeout } @@ -54,3 +54,14 @@ func IsResendAvailable(err error) bool { } return true } + +type RuntimeOptions struct { + WriteTimeout time.Duration +} + +func (r *RuntimeOptions) GetWriteTimeout() time.Duration { + if r.WriteTimeout != 0 { + return r.WriteTimeout + } + return getDefaultInsertDurationTimeout() +} diff --git a/src/db/cxnative/impl.go b/src/db/cxnative/impl.go index 42fc99f..29262e3 100644 --- a/src/db/cxnative/impl.go +++ b/src/db/cxnative/impl.go @@ -50,16 +50,27 @@ func (c *clickhouseNative) Close() error { return c.conn.Close() } -func NewClickhouse(ctx context.Context, options *clickhouse.Options) (cx.Clickhouse, driver.Conn, error) { +func NewClickhouse( + ctx context.Context, + options *clickhouse.Options, + runtime *cx.RuntimeOptions, +) ( + cx.Clickhouse, + driver.Conn, + error, +) { conn, err := clickhouse.Open(options) if err != nil { return nil, nil, err } - ctx = clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{ - "max_block_size": 10, - }), clickhouse.WithProgress(func(p *clickhouse.Progress) { - fmt.Println("progress: ", p) - })) + ctx = clickhouse.Context(ctx, + clickhouse.WithSettings(clickhouse.Settings{ + "max_block_size": 10, + }), + clickhouse.WithProgress(func(p *clickhouse.Progress) { + fmt.Println("progress: ", p) + }), + ) if err := conn.Ping(ctx); err != nil { if exception, ok := err.(*clickhouse.Exception); ok { fmt.Printf("catch exception [%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace) @@ -68,13 +79,13 @@ func NewClickhouse(ctx context.Context, options *clickhouse.Options) (cx.Clickho } return &clickhouseNative{ conn: conn, - insertTimeout: cx.GetDefaultInsertDurationTimeout(), + insertTimeout: runtime.GetWriteTimeout(), }, conn, nil } -func NewClickhouseWithConn(conn driver.Conn) cx.Clickhouse { +func NewClickhouseWithConn(conn driver.Conn, runtime *cx.RuntimeOptions) cx.Clickhouse { return &clickhouseNative{ conn: conn, - insertTimeout: cx.GetDefaultInsertDurationTimeout(), + insertTimeout: runtime.GetWriteTimeout(), } } diff --git a/src/db/cxsql/impl.go b/src/db/cxsql/impl.go index 0af3dda..9a4361a 100644 --- a/src/db/cxsql/impl.go +++ b/src/db/cxsql/impl.go @@ -70,36 +70,24 @@ func (c *clickhouseSQL) Insert(ctx context.Context, view cx.View, rows []cx.Vect return affected, nil } -type RuntimeOptions struct { - MaxOpenConns int - MaxIdleConns int - ConnMaxLifetime time.Duration -} - func NewClickhouse( ctx context.Context, options *clickhouse.Options, - runtimeOpts *RuntimeOptions, + runtime *cx.RuntimeOptions, ) ( cx.Clickhouse, *sql.DB, error, ) { conn := clickhouse.OpenDB(options) - if runtimeOpts.MaxIdleConns == 0 { - conn.SetMaxIdleConns(runtimeOpts.MaxIdleConns) - } - if runtimeOpts.MaxOpenConns == 0 { - conn.SetMaxOpenConns(runtimeOpts.MaxOpenConns) - } - if runtimeOpts.ConnMaxLifetime == 0 { - conn.SetConnMaxLifetime(runtimeOpts.ConnMaxLifetime) - } - ctx = clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{ - "max_block_size": 10, - }), clickhouse.WithProgress(func(p *clickhouse.Progress) { - fmt.Println("progress: ", p) - })) + ctx = clickhouse.Context(ctx, + clickhouse.WithSettings(clickhouse.Settings{ + "max_block_size": 10, + }), + clickhouse.WithProgress(func(p *clickhouse.Progress) { + fmt.Println("progress: ", p) + }), + ) if err := conn.PingContext(ctx); err != nil { if exception, ok := err.(*clickhouse.Exception); ok { fmt.Printf("catch exception [%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace) @@ -108,13 +96,13 @@ func NewClickhouse( } return &clickhouseSQL{ conn: conn, - insertTimeout: cx.GetDefaultInsertDurationTimeout(), + insertTimeout: runtime.GetWriteTimeout(), }, conn, nil } -func NewClickhouseWithConn(conn *sql.DB) cx.Clickhouse { +func NewClickhouseWithConn(conn *sql.DB, runtime *cx.RuntimeOptions) cx.Clickhouse { return &clickhouseSQL{ conn: conn, - insertTimeout: cx.GetDefaultInsertDurationTimeout(), + insertTimeout: runtime.GetWriteTimeout(), } } diff --git a/tests/client_impl_test.go b/tests/client_impl_test.go index 2421e38..7747f2d 100644 --- a/tests/client_impl_test.go +++ b/tests/client_impl_test.go @@ -120,7 +120,7 @@ func TestClient(t *testing.T) { memoryBuffer := cxsyncmem.NewBuffer( client.Options().BatchSize(), ) - writeAPI := client.Writer(tableView, memoryBuffer) + writeAPI := client.Writer(ctx, tableView, memoryBuffer) writeAPI.WriteRow(RowMock{ id: 1, uuid: "1", insertTS: time.Now(), }) @@ -155,7 +155,7 @@ func TestClient(t *testing.T) { memoryBuffer := cxsyncmem.NewBuffer( client.Options().BatchSize(), ) - writeAPI := client.Writer(tableView, memoryBuffer) + writeAPI := client.Writer(ctx, tableView, memoryBuffer) var errors []error mu := &sync.RWMutex{} errorsCh := writeAPI.Errors() @@ -207,7 +207,7 @@ func TestClient(t *testing.T) { memoryBuffer := cxsyncmem.NewBuffer( client.Options().BatchSize(), ) - writeAPI := client.Writer(tableView, memoryBuffer) + writeAPI := client.Writer(ctx, tableView, memoryBuffer) var errors []error mu := &sync.RWMutex{} errorsCh := writeAPI.Errors() @@ -255,7 +255,7 @@ func TestClient(t *testing.T) { memoryBuffer := cxsyncmem.NewBuffer( client.Options().BatchSize(), ) - writeAPI := client.Writer(tableView, memoryBuffer) + writeAPI := client.Writer(ctx, tableView, memoryBuffer) var errors []error mu := &sync.RWMutex{} errorsCh := writeAPI.Errors() @@ -302,7 +302,7 @@ func TestClient(t *testing.T) { memoryBuffer := cxsyncmem.NewBuffer( client.Options().BatchSize(), ) - writeAPI := client.Writer(tableView, memoryBuffer) + writeAPI := client.Writer(ctx, tableView, memoryBuffer) writeAPI.WriteRow(RowMock{ id: 1, uuid: "1", insertTS: time.Now(), }) @@ -333,7 +333,7 @@ func TestClient(t *testing.T) { memoryBuffer := cxsyncmem.NewBuffer( client.Options().BatchSize(), ) - writeAPI := client.Writer(tableView, memoryBuffer) + writeAPI := client.Writer(ctx, tableView, memoryBuffer) var errors []error mu := &sync.RWMutex{} errorsCh := writeAPI.Errors() diff --git a/tests/integration_memory_test.go b/tests/integration_memory_test.go index d54c583..d1d051b 100644 --- a/tests/integration_memory_test.go +++ b/tests/integration_memory_test.go @@ -32,7 +32,7 @@ func TestMemory(t *testing.T) { client, memBuffer := useClientAndMemoryBuffer(ctx, clickhouse) defer client.Close() // STEP 5: Write own data to redis - writeAPI := useWriteAPI(client, memBuffer) + writeAPI := useWriteAPI(ctx, client, memBuffer) writeDataToBuffer(writeAPI) // STEP 6: Checks! if err = checksBuffer(memBuffer); err != nil { @@ -62,7 +62,7 @@ func TestSQLMemory(t *testing.T) { client, memBuffer := useClientAndMemoryBuffer(ctx, clickhouse) defer client.Close() // STEP 5: Write own data to redis - writeAPI := useWriteAPI(client, memBuffer) + writeAPI := useWriteAPI(ctx, client, memBuffer) writeDataToBuffer(writeAPI) // STEP 6: Checks! if err = checksBuffer(memBuffer); err != nil { @@ -92,7 +92,7 @@ func TestMemorySafe(t *testing.T) { client, memBuffer := useClientAndMemoryBuffer(ctx, clickhouse) defer client.Close() // STEP 5: Write own data to redis - writeAPI := useWriteAPI(client, memBuffer) + writeAPI := useWriteAPI(ctx, client, memBuffer) writeDataToBufferSafe(writeAPI) // STEP 6: Checks! if err = checksBuffer(memBuffer); err != nil { diff --git a/tests/integration_test.go b/tests/integration_test.go index eafaa3a..8b6e407 100644 --- a/tests/integration_test.go +++ b/tests/integration_test.go @@ -65,7 +65,7 @@ func TestNative(t *testing.T) { } defer client.Close() // STEP 5: Write own data to redis - writeAPI := useWriteAPI(client, redisBuffer) + writeAPI := useWriteAPI(ctx, client, redisBuffer) var errorsSlice []error mu := &sync.RWMutex{} errorsCh := writeAPI.Errors() @@ -128,7 +128,7 @@ func TestSQL(t *testing.T) { } defer client.Close() // STEP 5: Write own data to redis - writeAPI := useWriteAPI(client, redisBuffer) + writeAPI := useWriteAPI(ctx, client, redisBuffer) var errorsSlice []error mu := &sync.RWMutex{} errorsCh := writeAPI.Errors() @@ -336,7 +336,9 @@ func useOptions() *clickhouse.Options { } func useClickhousePool(ctx context.Context) (driver.Conn, cx.Clickhouse, error) { - nativeClickhouse, conn, err := cxnative.NewClickhouse(ctx, useOptions()) + nativeClickhouse, conn, err := cxnative.NewClickhouse(ctx, useOptions(), &cx.RuntimeOptions{ + WriteTimeout: 15 * time.Second, + }) if err != nil { return nil, nil, err } @@ -344,7 +346,7 @@ func useClickhousePool(ctx context.Context) (driver.Conn, cx.Clickhouse, error) } func useClickhouseSQLPool(ctx context.Context) (*sql.DB, cx.Clickhouse, error) { - sqlClickhouse, conn, err := cxsql.NewClickhouse(ctx, useOptions(), &cxsql.RuntimeOptions{}) + sqlClickhouse, conn, err := cxsql.NewClickhouse(ctx, useOptions(), &cx.RuntimeOptions{}) if err != nil { return nil, nil, err } @@ -374,8 +376,8 @@ func useClientAndRedisBuffer( return client, buf, nil } -func useWriteAPI(client clickhousebuffer.Client, buf cx.Buffer) clickhousebuffer.Writer { - writeAPI := client.Writer(cx.NewView(integrationTableName, []string{"id", "uuid", "insert_ts"}), buf) +func useWriteAPI(ctx context.Context, client clickhousebuffer.Client, buf cx.Buffer) clickhousebuffer.Writer { + writeAPI := client.Writer(ctx, cx.NewView(integrationTableName, []string{"id", "uuid", "insert_ts"}), buf) return writeAPI }