From a122216e0999177ed95d6a89117c366d7729e85e Mon Sep 17 00:00:00 2001
From: Andrey Kapitonov
Date: Sat, 11 Jun 2022 12:20:34 +0300
Subject: [PATCH] Add try to write method (#25)

---
 README.md                        |  6 ++-
 example/cmd/redis_safe/main.go   | 85 ++++++++++++++++++++++++++++++++
 example/cmd/simple_safe/main.go  | 80 ++++++++++++++++++++++++++++++
 tests/client_impl_test.go        | 51 ++++++++++++++++++-
 tests/integration_memory_test.go | 30 +++++++++++
 tests/integration_test.go        | 24 ++++++++-
 write.go                         | 25 +++++++++-
 7 files changed, 295 insertions(+), 6 deletions(-)
 create mode 100644 example/cmd/redis_safe/main.go
 create mode 100644 example/cmd/simple_safe/main.go

diff --git a/README.md b/README.md
index ecb21d1..6002e1b 100644
--- a/README.md
+++ b/README.md
@@ -125,7 +125,7 @@ type MyCustomDataView struct {
 	uuid     string
 	insertTS time.Time
 }
-// and implement cxbuffer.Vectorable interface
+// and implement cx.Vectorable interface
 func (t *MyCustomDataView) Row() cx.Vector {
 	return cx.Vector{t.id, t.uuid, t.insertTS.Format(time.RFC822)}
 }
@@ -133,6 +133,10 @@ func (t *MyCustomDataView) Row() cx.Vector {
 writeAPI.WriteRow(&MyCustomDataView{
 	id: 1, uuid: "1", insertTS: time.Now(),
 })
+// or use the safe variant: like WriteRow, but it will not write to a writer that is already stopped
+writeAPI.TryWriteRow(&MyCustomDataView{
+	id: 1, uuid: "1", insertTS: time.Now(),
+})
 ```

 When using a non-blocking record, you can track errors through a special error channel

diff --git a/example/cmd/redis_safe/main.go b/example/cmd/redis_safe/main.go
new file mode 100644
index 0000000..79525f8
--- /dev/null
+++ b/example/cmd/redis_safe/main.go
@@ -0,0 +1,85 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"log"
+	"os"
+	"sync"
+	"time"
+
+	"github.com/ClickHouse/clickhouse-go/v2"
+	"github.com/go-redis/redis/v8"
+
+	clickhousebuffer "github.com/zikwall/clickhouse-buffer/v3"
+	"github.com/zikwall/clickhouse-buffer/v3/example/pkg/tables"
+	"github.com/zikwall/clickhouse-buffer/v3/src/buffer/cxredis"
+	"github.com/zikwall/clickhouse-buffer/v3/src/cx"
+	"github.com/zikwall/clickhouse-buffer/v3/src/db/cxnative"
+)
+
+func main() {
+	hostname := os.Getenv("CLICKHOUSE_HOST")
+	username := os.Getenv("CLICKHOUSE_USER")
+	database := os.Getenv("CLICKHOUSE_DB")
+	password := os.Getenv("CLICKHOUSE_PASS")
+	redisHost := os.Getenv("REDIS_HOST")
+	redisPass := os.Getenv("REDIS_PASS")
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	ch, conn, err := cxnative.NewClickhouse(ctx, &clickhouse.Options{
+		Addr: []string{hostname},
+		Auth: clickhouse.Auth{
+			Database: database,
+			Username: username,
+			Password: password,
+		},
+		Settings: clickhouse.Settings{
+			"max_execution_time": 60,
+		},
+		DialTimeout: 5 * time.Second,
+		Compression: &clickhouse.Compression{
+			Method: clickhouse.CompressionLZ4,
+		},
+		Debug: true,
+	})
+	if err != nil {
+		log.Panicln(err)
+	}
+	if err := tables.CreateTableNative(ctx, conn); err != nil {
+		log.Panicln(err)
+	}
+	client := clickhousebuffer.NewClientWithOptions(ctx, ch,
+		clickhousebuffer.DefaultOptions().SetDebugMode(true).SetFlushInterval(1000).SetBatchSize(5),
+	)
+	rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{
+		Addr:     redisHost,
+		Password: redisPass,
+	}), "bucket", client.Options().BatchSize())
+	if err != nil {
+		log.Panicln(err)
+	}
+	writeAPI := client.Writer(cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), rxbuffer)
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		errorsCh := writeAPI.Errors()
+		for err := range errorsCh {
+			log.Printf("clickhouse write error: %s\n", err.Error())
+		}
+		wg.Done()
+	}()
+
+	int32s := []int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
+	for _, val := range int32s {
+		writeAPI.TryWriteRow(&tables.ExampleTable{
+			ID: val, UUID: fmt.Sprintf("uuidf %d", val), InsertTS: time.Now(),
+		})
+	}
+
+	<-time.After(time.Second * 2)
+	client.Close()
+	wg.Wait()
+}
diff --git a/example/cmd/simple_safe/main.go b/example/cmd/simple_safe/main.go
new file mode 100644
index 0000000..190b158
--- /dev/null
+++ b/example/cmd/simple_safe/main.go
@@ -0,0 +1,80 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"log"
+	"os"
+	"sync"
+	"time"
+
+	"github.com/ClickHouse/clickhouse-go/v2"
+
+	clickhousebuffer "github.com/zikwall/clickhouse-buffer/v3"
+	"github.com/zikwall/clickhouse-buffer/v3/example/pkg/tables"
+	"github.com/zikwall/clickhouse-buffer/v3/src/buffer/cxmem"
+	"github.com/zikwall/clickhouse-buffer/v3/src/cx"
+	"github.com/zikwall/clickhouse-buffer/v3/src/db/cxnative"
+)
+
+func main() {
+	hostname := os.Getenv("CLICKHOUSE_HOST")
+	username := os.Getenv("CLICKHOUSE_USER")
+	database := os.Getenv("CLICKHOUSE_DB")
+	password := os.Getenv("CLICKHOUSE_PASS")
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	ch, conn, err := cxnative.NewClickhouse(ctx, &clickhouse.Options{
+		Addr: []string{hostname},
+		Auth: clickhouse.Auth{
+			Database: database,
+			Username: username,
+			Password: password,
+		},
+		Settings: clickhouse.Settings{
+			"max_execution_time": 60,
+		},
+		DialTimeout: 5 * time.Second,
+		Compression: &clickhouse.Compression{
+			Method: clickhouse.CompressionLZ4,
+		},
+		Debug: true,
+	})
+	if err != nil {
+		log.Panicln(err)
+	}
+	if err := tables.CreateTableNative(ctx, conn); err != nil {
+		log.Panicln(err)
+	}
+	client := clickhousebuffer.NewClientWithOptions(ctx, ch,
+		clickhousebuffer.DefaultOptions().SetDebugMode(true).SetFlushInterval(1000).SetBatchSize(5),
+	)
+
+	writeAPI := client.Writer(
+		cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
+		cxmem.NewBuffer(client.Options().BatchSize()),
+	)
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		errorsCh := writeAPI.Errors()
+		for err := range errorsCh {
+			log.Printf("clickhouse write error: %s\n", err.Error())
+		}
+		wg.Done()
+	}()
+
+	int32s := []int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
+	for _, val := range int32s {
+		writeAPI.TryWriteRow(&tables.ExampleTable{
+			ID: val, UUID: fmt.Sprintf("uuidf %d", val), InsertTS: time.Now(),
+		})
+	}
+
+	<-time.After(time.Second * 2)
+	client.Close()
+	wg.Wait()
+}
diff --git a/tests/client_impl_test.go b/tests/client_impl_test.go
index f6a3078..2421e38 100644
--- a/tests/client_impl_test.go
+++ b/tests/client_impl_test.go
@@ -193,6 +193,7 @@ func TestClient(t *testing.T) {
 		}
 	})
 
+	// nolint:dupl // it's OK
 	t.Run("it should be successfully handle retry", func(t *testing.T) {
 		mock := &ClickhouseImplRetryMock{}
 		client := clickhousebuffer.NewClientWithOptions(ctx, mock,
@@ -240,6 +241,54 @@ func TestClient(t *testing.T) {
 		simulateWait(time.Millisecond * 350)
 	})
 
+	// nolint:dupl // it's OK
+	t.Run("[safe] it should successfully handle retry", func(t *testing.T) {
+		mock := &ClickhouseImplRetryMock{}
+		client := clickhousebuffer.NewClientWithOptions(ctx, mock,
+			clickhousebuffer.DefaultOptions().
+				SetFlushInterval(10).
+				SetBatchSize(1).
+				SetDebugMode(true).
+				SetRetryIsEnabled(true),
+		)
+		defer client.Close()
+		memoryBuffer := cxsyncmem.NewBuffer(
+			client.Options().BatchSize(),
+		)
+		writeAPI := client.Writer(tableView, memoryBuffer)
+		var errors []error
+		mu := &sync.RWMutex{}
+		errorsCh := writeAPI.Errors()
+		// Create a goroutine that reads and stores errors
+		go func() {
+			for err := range errorsCh {
+				mu.Lock()
+				errors = append(errors, err)
+				mu.Unlock()
+			}
+		}()
+		writeAPI.TryWriteRow(RowMock{
+			id: 1, uuid: "1", insertTS: time.Now(),
+		})
+		simulateWait(time.Millisecond * 10)
+		atomic.StoreInt32(&mock.hasErr, 1)
+		simulateWait(time.Millisecond * 2000)
+		mu.RLock()
+		defer mu.RUnlock()
+		if len(errors) != 1 {
+			t.Fatalf("failed, expected to get one error, received %d", len(errors))
+		}
+		if memoryBuffer.Len() != 0 {
+			t.Fatal("failed, the buffer was expected to be cleared")
+		}
+		ok, nook, progress := client.RetryClient().Metrics()
+		fmt.Println("#3:", ok, nook, progress)
+		if ok != 1 || nook != 0 || progress != 0 {
+			t.Fatalf("failed, expected one successful and zero failed retries, got %d successful and %d failed", ok, nook)
+		}
+		simulateWait(time.Millisecond * 350)
+	})
+
 	t.Run("it should be successfully handle retry without error channel", func(t *testing.T) {
 		mock := &ClickhouseImplRetryMock{}
 		client := clickhousebuffer.NewClientWithOptions(ctx, mock,
@@ -314,7 +363,7 @@ func TestClient(t *testing.T) {
 		if memoryBuffer.Len() != 0 {
 			t.Fatal("failed, the buffer was expected to be cleared")
 		}
-		simulateWait(time.Millisecond * 5000)
+		simulateWait(time.Millisecond * 2000)
 		ok, nook, progress := client.RetryClient().Metrics()
 		fmt.Println("#4:", ok, nook, progress)
 		if ok != 0 || nook != 0 || progress != 0 {
diff --git a/tests/integration_memory_test.go b/tests/integration_memory_test.go
index b9778c5..d54c583 100644
--- a/tests/integration_memory_test.go
+++ b/tests/integration_memory_test.go
@@ -73,6 +73,36 @@ func TestSQLMemory(t *testing.T) {
 	}
 }
 
+// nolint:dupl // it's OK
+func TestMemorySafe(t *testing.T) {
+	var err error
+	log.Println("RUN INTEGRATION TEST WITH MEMORY AND NATIVE CLICKHOUSE [SAFE]")
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	// STEP 2: Create Clickhouse service
+	ch, clickhouse, err := useClickhousePool(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// STEP 3: Drop and Create table under certain conditions
+	if err = beforeCheckTables(ctx, ch); err != nil {
+		t.Fatal(err)
+	}
+	// STEP 4: Create clickhouse client and buffer writer with memory buffer
+	client, memBuffer := useClientAndMemoryBuffer(ctx, clickhouse)
+	defer client.Close()
+	// STEP 5: Write data to the memory buffer
+	writeAPI := useWriteAPI(client, memBuffer)
+	writeDataToBufferSafe(writeAPI)
+	// STEP 6: Checks!
+	if err = checksBuffer(memBuffer); err != nil {
+		t.Fatal(err)
+	}
+	if err = checksClickhouse(ctx, ch); err != nil {
+		t.Fatal(err)
+	}
+}
+
 func useClientAndMemoryBuffer(ctx context.Context, clickhouse cx.Clickhouse) (clickhousebuffer.Client, cx.Buffer) {
 	client := useCommonClient(ctx, clickhouse)
 	return client, cxsyncmem.NewBuffer(client.Options().BatchSize())
diff --git a/tests/integration_test.go b/tests/integration_test.go
index b350cb0..b869fc9 100644
--- a/tests/integration_test.go
+++ b/tests/integration_test.go
@@ -92,7 +92,7 @@ func TestNative(t *testing.T) {
 		t.Fatal(err)
 	}
 	// we expect an exception from Clickhouse: code: 60, message: Table default.test_integration_xxx_xxx doesn't exist
-	<-time.After(600 * time.Millisecond)
+	<-time.After(1000 * time.Millisecond)
 	mu.RLock()
 	defer mu.RUnlock()
 	if len(errorsSlice) != 1 {
@@ -155,7 +155,7 @@ func TestSQL(t *testing.T) {
 		t.Fatal(err)
 	}
 	// we expect an exception from Clickhouse: code: 60, message: Table default.test_integration_xxx_xxx doesn't exist
-	<-time.After(600 * time.Millisecond)
+	<-time.After(1000 * time.Millisecond)
 	mu.RLock()
 	defer mu.RUnlock()
 	if len(errorsSlice) != 1 {
@@ -398,6 +398,26 @@ func writeDataToBuffer(writeAPI clickhousebuffer.Writer) {
 	<-time.After(50 * time.Millisecond)
 }
 
+func writeDataToBufferSafe(writeAPI clickhousebuffer.Writer) {
+	writeAPI.TryWriteRow(integrationRow{
+		id: 1, uuid: "1", insertTS: time.Now(),
+	})
+	writeAPI.TryWriteRow(integrationRow{
+		id: 2, uuid: "2", insertTS: time.Now(),
+	})
+	writeAPI.TryWriteRow(integrationRow{
+		id: 3, uuid: "3", insertTS: time.Now(),
+	})
+	writeAPI.TryWriteRow(integrationRow{
+		id: 4, uuid: "4", insertTS: time.Now(),
+	})
+	writeAPI.TryWriteRow(integrationRow{
+		id: 5, uuid: "5", insertTS: time.Now(),
+	})
+	// wait a bit
+	<-time.After(50 * time.Millisecond)
+}
+
 func checksBuffer(buf cx.Buffer) error {
 	// try read from redis buffer before flushing data in buffer
 	rows := buf.Read()
diff --git a/write.go b/write.go
index 2a33d63..5385c2e 100644
--- a/write.go
+++ b/write.go
@@ -15,6 +15,8 @@ import (
 type Writer interface {
 	// WriteRow writes asynchronously line protocol record into bucket.
 	WriteRow(vector cx.Vectorable)
+	// TryWriteRow is the same as WriteRow, but it gives up once the writer is stopped (channel closing principle: gracefully close channels)
+	TryWriteRow(vec cx.Vectorable)
 	// Errors returns a channel for reading errors which occurs during async writes.
 	Errors() <-chan error
 	// Close writer
@@ -61,10 +63,29 @@ func NewWriter(ctx context.Context, client Client, view cx.View, engine cx.Buffe
 // WriteRow writes asynchronously line protocol record into bucket.
 // WriteRow adds record into the buffer which is sent on the background when it reaches the batch size.
-func (w *writer) WriteRow(rower cx.Vectorable) {
+func (w *writer) WriteRow(vec cx.Vectorable) {
 	// maybe use atomic for check is closed
 	// atomic.LoadInt32(&w.isClosed) == 1
-	w.bufferCh <- rower.Row()
+	w.bufferCh <- vec.Row()
+}
+
+// TryWriteRow is the same as WriteRow, but it gives up once the writer is stopped (channel closing principle: gracefully close channels)
+func (w *writer) TryWriteRow(vec cx.Vectorable) {
+	// The non-blocking try-receive lets us return as early as possible when the
+	// writer is already stopped; strictly speaking, it is only an optimization.
+	select {
+	case <-w.bufferStop:
+		return
+	default:
+	}
+	// Even if bufferStop is already closed, the select below may still pick the
+	// send branch for a few iterations while sending to bufferCh does not block.
+	// That is acceptable here, and it is also why the try-receive above could be omitted.
+	select {
+	case <-w.bufferStop:
+		return
+	case w.bufferCh <- vec.Row():
+	}
 }
 
 // Errors returns a channel for reading errors which occurs during async writes.
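
Reviewer note: TryWriteRow is an instance of the try-send idiom from the "gracefully
close channels" pattern the comments cite. Shutdown appears to be signalled by closing
a dedicated stop channel (bufferStop) rather than the data channel itself, since a send
on a closed channel panics; every send then races against that closing signal. Below is
a minimal, runnable sketch of the same idiom under those assumptions; trySend, data and
stop are illustrative names only, not part of this library.

package main

import "fmt"

// trySend attempts to enqueue v and reports whether the value was accepted.
// A dedicated stop channel carries the shutdown signal, so producers never
// send on a closed data channel.
func trySend(data chan int, stop chan struct{}, v int) bool {
	// Fast path: give up immediately if shutdown has already been signalled.
	select {
	case <-stop:
		return false
	default:
	}
	// Race the send against the shutdown signal; if stop is closed while the
	// send is blocked, the first branch unblocks and we give up cleanly.
	// (When both branches are ready, Go picks one at random, so a few sends
	// may still slip through right after shutdown; the patch notes the same caveat.)
	select {
	case <-stop:
		return false
	case data <- v:
		return true
	}
}

func main() {
	data := make(chan int, 1)
	stop := make(chan struct{})

	fmt.Println(trySend(data, stop, 1)) // true: room in the buffer
	close(stop)                         // signal shutdown
	fmt.Println(trySend(data, stop, 2)) // false: writer stopped
}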