Skip to content

Commit

Permalink
Add try to write method (#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
zikwall authored Jun 11, 2022
1 parent 6b5fbc6 commit a122216
Show file tree
Hide file tree
Showing 7 changed files with 295 additions and 6 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,18 @@ type MyCustomDataView struct {
uuid string
insertTS time.Time
}
// and implement cxbuffer.Vectorable interface
// and implement cx.Vectorable interface
func (t *MyCustomDataView) Row() cx.Vector {
return cx.Vector{t.id, t.uuid, t.insertTS.Format(time.RFC822)}
}
// async write your data
writeAPI.WriteRow(&MyCustomDataView{
id: 1, uuid: "1", insertTS: time.Now(),
})
// or use a safe way (same as WriteRow, but safer)
writeAPI.TryWriteRow(&MyCustomDataView{
id: 1, uuid: "1", insertTS: time.Now(),
})
```

When using a non-blocking record, you can track errors through a special error channel
Expand Down
85 changes: 85 additions & 0 deletions example/cmd/redis_safe/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package main

import (
"context"
"fmt"
"log"
"os"
"sync"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/go-redis/redis/v8"

clickhousebuffer "github.com/zikwall/clickhouse-buffer/v3"
"github.com/zikwall/clickhouse-buffer/v3/example/pkg/tables"
"github.com/zikwall/clickhouse-buffer/v3/src/buffer/cxredis"
"github.com/zikwall/clickhouse-buffer/v3/src/cx"
"github.com/zikwall/clickhouse-buffer/v3/src/db/cxnative"
)

func main() {
hostname := os.Getenv("CLICKHOUSE_HOST")
username := os.Getenv("CLICKHOUSE_USER")
database := os.Getenv("CLICKHOUSE_DB")
password := os.Getenv("CLICKHOUSE_PASS")
redisHost := os.Getenv("REDIS_HOST")
redisPass := os.Getenv("REDIS_PASS")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ch, conn, err := cxnative.NewClickhouse(ctx, &clickhouse.Options{
Addr: []string{hostname},
Auth: clickhouse.Auth{
Database: database,
Username: username,
Password: password,
},
Settings: clickhouse.Settings{
"max_execution_time": 60,
},
DialTimeout: 5 * time.Second,
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
},
Debug: true,
})
if err != nil {
log.Panicln(err)
}
if err := tables.CreateTableNative(ctx, conn); err != nil {
log.Panicln(err)
}
client := clickhousebuffer.NewClientWithOptions(ctx, ch,
clickhousebuffer.DefaultOptions().SetDebugMode(true).SetFlushInterval(1000).SetBatchSize(5),
)
rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{
Addr: redisHost,
Password: redisPass,
}), "bucket", client.Options().BatchSize())
if err != nil {
log.Panicln(err)
}
writeAPI := client.Writer(cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), rxbuffer)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
errorsCh := writeAPI.Errors()
for err := range errorsCh {
log.Printf("clickhouse write error: %s\n", err.Error())
}
wg.Done()
}()

int32s := []int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
for _, val := range int32s {
writeAPI.TryWriteRow(&tables.ExampleTable{
ID: val, UUID: fmt.Sprintf("uuidf %d", val), InsertTS: time.Now(),
})
}

<-time.After(time.Second * 2)
client.Close()
wg.Wait()
}
80 changes: 80 additions & 0 deletions example/cmd/simple_safe/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package main

import (
"context"
"fmt"
"log"
"os"
"sync"
"time"

"github.com/ClickHouse/clickhouse-go/v2"

clickhousebuffer "github.com/zikwall/clickhouse-buffer/v3"
"github.com/zikwall/clickhouse-buffer/v3/example/pkg/tables"
"github.com/zikwall/clickhouse-buffer/v3/src/buffer/cxmem"
"github.com/zikwall/clickhouse-buffer/v3/src/cx"
"github.com/zikwall/clickhouse-buffer/v3/src/db/cxnative"
)

func main() {
hostname := os.Getenv("CLICKHOUSE_HOST")
username := os.Getenv("CLICKHOUSE_USER")
database := os.Getenv("CLICKHOUSE_DB")
password := os.Getenv("CLICKHOUSE_PASS")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ch, conn, err := cxnative.NewClickhouse(ctx, &clickhouse.Options{
Addr: []string{hostname},
Auth: clickhouse.Auth{
Database: database,
Username: username,
Password: password,
},
Settings: clickhouse.Settings{
"max_execution_time": 60,
},
DialTimeout: 5 * time.Second,
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
},
Debug: true,
})
if err != nil {
log.Panicln(err)
}
if err := tables.CreateTableNative(ctx, conn); err != nil {
log.Panicln(err)
}
client := clickhousebuffer.NewClientWithOptions(ctx, ch,
clickhousebuffer.DefaultOptions().SetDebugMode(true).SetFlushInterval(1000).SetBatchSize(5),
)

writeAPI := client.Writer(
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
cxmem.NewBuffer(client.Options().BatchSize()),
)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
errorsCh := writeAPI.Errors()
for err := range errorsCh {
log.Printf("clickhouse write error: %s\n", err.Error())
}
wg.Done()
}()

int32s := []int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
for _, val := range int32s {
writeAPI.TryWriteRow(&tables.ExampleTable{
ID: val, UUID: fmt.Sprintf("uuidf %d", val), InsertTS: time.Now(),
})
}

<-time.After(time.Second * 2)
client.Close()
wg.Wait()
}
51 changes: 50 additions & 1 deletion tests/client_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func TestClient(t *testing.T) {
}
})

// nolint:dupl // it's OK
t.Run("it should be successfully handle retry", func(t *testing.T) {
mock := &ClickhouseImplRetryMock{}
client := clickhousebuffer.NewClientWithOptions(ctx, mock,
Expand Down Expand Up @@ -240,6 +241,54 @@ func TestClient(t *testing.T) {
simulateWait(time.Millisecond * 350)
})

// nolint:dupl // it's OK
t.Run("[safe] it should be successfully handle retry", func(t *testing.T) {
mock := &ClickhouseImplRetryMock{}
client := clickhousebuffer.NewClientWithOptions(ctx, mock,
clickhousebuffer.DefaultOptions().
SetFlushInterval(10).
SetBatchSize(1).
SetDebugMode(true).
SetRetryIsEnabled(true),
)
defer client.Close()
memoryBuffer := cxsyncmem.NewBuffer(
client.Options().BatchSize(),
)
writeAPI := client.Writer(tableView, memoryBuffer)
var errors []error
mu := &sync.RWMutex{}
errorsCh := writeAPI.Errors()
// Create go proc for reading and storing errors
go func() {
for err := range errorsCh {
mu.Lock()
errors = append(errors, err)
mu.Unlock()
}
}()
writeAPI.TryWriteRow(RowMock{
id: 1, uuid: "1", insertTS: time.Now(),
})
simulateWait(time.Millisecond * 10)
atomic.StoreInt32(&mock.hasErr, 1)
simulateWait(time.Millisecond * 2000)
mu.RLock()
defer mu.RUnlock()
if len(errors) != 1 {
t.Fatalf("failed, expected to get one error, received %d", len(errors))
}
if memoryBuffer.Len() != 0 {
t.Fatal("failed, the buffer was expected to be cleared")
}
ok, nook, progress := client.RetryClient().Metrics()
fmt.Println("#3:", ok, nook, progress)
if ok != 1 || nook != 0 || progress != 0 {
t.Fatalf("failed, expect one successful and zero fail retries, expect %d and failed %d", ok, nook)
}
simulateWait(time.Millisecond * 350)
})

t.Run("it should be successfully handle retry without error channel", func(t *testing.T) {
mock := &ClickhouseImplRetryMock{}
client := clickhousebuffer.NewClientWithOptions(ctx, mock,
Expand Down Expand Up @@ -314,7 +363,7 @@ func TestClient(t *testing.T) {
if memoryBuffer.Len() != 0 {
t.Fatal("failed, the buffer was expected to be cleared")
}
simulateWait(time.Millisecond * 5000)
simulateWait(time.Millisecond * 2000)
ok, nook, progress := client.RetryClient().Metrics()
fmt.Println("#4:", ok, nook, progress)
if ok != 0 || nook != 0 || progress != 0 {
Expand Down
30 changes: 30 additions & 0 deletions tests/integration_memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,36 @@ func TestSQLMemory(t *testing.T) {
}
}

// nolint:dupl // it's OK
func TestMemorySafe(t *testing.T) {
var err error
log.Println("RUN INTEGRATION TEST WITH MEMORY AND NATIVE CLICKHOUSE [SAFE]")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// STEP 2: Create Clickhouse service
ch, clickhouse, err := useClickhousePool(ctx)
if err != nil {
t.Fatal(err)
}
// STEP 3: Drop and Create table under certain conditions
if err = beforeCheckTables(ctx, ch); err != nil {
t.Fatal(err)
}
// STEP 4: Create clickhouse client and buffer writer with redis buffer
client, memBuffer := useClientAndMemoryBuffer(ctx, clickhouse)
defer client.Close()
// STEP 5: Write own data to redis
writeAPI := useWriteAPI(client, memBuffer)
writeDataToBufferSafe(writeAPI)
// STEP 6: Checks!
if err = checksBuffer(memBuffer); err != nil {
t.Fatal(err)
}
if err = checksClickhouse(ctx, ch); err != nil {
t.Fatal(err)
}
}

func useClientAndMemoryBuffer(ctx context.Context, clickhouse cx.Clickhouse) (clickhousebuffer.Client, cx.Buffer) {
client := useCommonClient(ctx, clickhouse)
return client, cxsyncmem.NewBuffer(client.Options().BatchSize())
Expand Down
24 changes: 22 additions & 2 deletions tests/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestNative(t *testing.T) {
t.Fatal(err)
}
// we expect an exception from Clickhouse: code: 60, message: Table default.test_integration_xxx_xxx doesn't exist
<-time.After(600 * time.Millisecond)
<-time.After(1000 * time.Millisecond)
mu.RLock()
defer mu.RUnlock()
if len(errorsSlice) != 1 {
Expand Down Expand Up @@ -155,7 +155,7 @@ func TestSQL(t *testing.T) {
t.Fatal(err)
}
// we expect an exception from Clickhouse: code: 60, message: Table default.test_integration_xxx_xxx doesn't exist
<-time.After(600 * time.Millisecond)
<-time.After(1000 * time.Millisecond)
mu.RLock()
defer mu.RUnlock()
if len(errorsSlice) != 1 {
Expand Down Expand Up @@ -398,6 +398,26 @@ func writeDataToBuffer(writeAPI clickhousebuffer.Writer) {
<-time.After(50 * time.Millisecond)
}

func writeDataToBufferSafe(writeAPI clickhousebuffer.Writer) {
writeAPI.TryWriteRow(integrationRow{
id: 1, uuid: "1", insertTS: time.Now(),
})
writeAPI.TryWriteRow(integrationRow{
id: 2, uuid: "2", insertTS: time.Now(),
})
writeAPI.TryWriteRow(integrationRow{
id: 3, uuid: "3", insertTS: time.Now(),
})
writeAPI.TryWriteRow(integrationRow{
id: 4, uuid: "4", insertTS: time.Now(),
})
writeAPI.TryWriteRow(integrationRow{
id: 5, uuid: "5", insertTS: time.Now(),
})
// wait a bit
<-time.After(50 * time.Millisecond)
}

func checksBuffer(buf cx.Buffer) error {
// try read from redis buffer before flushing data in buffer
rows := buf.Read()
Expand Down
25 changes: 23 additions & 2 deletions write.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
type Writer interface {
// WriteRow writes asynchronously line protocol record into bucket.
WriteRow(vector cx.Vectorable)
// TryWriteRow same as WriteRow, but with Channel Closing Principle (Gracefully Close Channels)
TryWriteRow(vec cx.Vectorable)
// Errors returns a channel for reading errors which occurs during async writes.
Errors() <-chan error
// Close writer
Expand Down Expand Up @@ -61,10 +63,29 @@ func NewWriter(ctx context.Context, client Client, view cx.View, engine cx.Buffe

// WriteRow writes asynchronously line protocol record into bucket.
// WriteRow adds record into the buffer which is sent on the background when it reaches the batch size.
func (w *writer) WriteRow(rower cx.Vectorable) {
func (w *writer) WriteRow(vec cx.Vectorable) {
// maybe use atomic for check is closed
// atomic.LoadInt32(&w.isClosed) == 1
w.bufferCh <- rower.Row()
w.bufferCh <- vec.Row()
}

// TryWriteRow same as WriteRow, but with Channel Closing Principle (Gracefully Close Channels)
func (w *writer) TryWriteRow(vec cx.Vectorable) {
// the try-receive operation is to try to exit the goroutine as early as
// possible. For this specified example, it is not essential.
select {
case <-w.bufferStop:
return
default:
}
// even if bufferStop is closed, the first branch in the second select may be
// still not selected for some loops if to send to bufferCh is also unblocked.
// But this is acceptable for this example, so the first select block above can be omitted.
select {
case <-w.bufferStop:
return
case w.bufferCh <- vec.Row():
}
}

// Errors returns a channel for reading errors which occurs during async writes.
Expand Down

0 comments on commit a122216

Please sign in to comment.