-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathwrite_options.go
149 lines (129 loc) · 3.63 KB
/
write_options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package clickhousebuffer
import (
"github.com/zikwall/clickhouse-buffer/v4/src/cx"
"github.com/zikwall/clickhouse-buffer/v4/src/retry"
)
// Options holds write configuration properties.
//
// All fields are unexported; they are set through the Set* methods on Options
// or through the With* Option callbacks passed to NewOptions.
type Options struct {
	// Maximum number of rows sent to the server in a single request.
	// DefaultOptions sets 5000; NewOptions sets 2000.
	batchSize uint
	// Interval, in ms, after which the buffer is flushed if it has not already
	// been written (by reaching the batch size).
	// DefaultOptions sets 1000; NewOptions sets 2000.
	flushInterval uint
	// Debug mode switch (false unless set via SetDebugMode or WithDebugMode).
	isDebug bool
	// Whether retry (resending of undelivered messages) is enabled.
	isRetryEnabled bool
	// Custom cx.Logger implementation. Neither NewOptions nor DefaultOptions
	// assigns one, so it stays at its zero value unless SetLogger or
	// WithLogger is used.
	logger cx.Logger
	// Custom retry.Queueable implementation. Neither NewOptions nor
	// DefaultOptions assigns one, so it stays at its zero value unless
	// SetQueueEngine or WithRetryQueueEngine is used.
	queue retry.Queueable
}
// BatchSize reports the configured batch size, i.e. the value currently
// stored in the batchSize field.
func (o *Options) BatchSize() uint {
	return o.batchSize
}
// SetBatchSize stores batchSize as the number of rows sent in a single
// request and returns the receiver so calls can be chained.
//
// The field is written without any locking, so this must NOT be used while
// other goroutines may be reading the options. Removing this setter would be
// good practice, but it is kept because it is convenient for testing.
func (o *Options) SetBatchSize(batchSize uint) *Options {
	o.batchSize = batchSize
	return o
}
// FlushInterval reports the configured flush interval in milliseconds, i.e.
// the value currently stored in the flushInterval field.
func (o *Options) FlushInterval() uint {
	return o.flushInterval
}
// SetFlushInterval stores flushIntervalMs as the interval, in ms, after which
// the buffer is flushed if it has not already been written, and returns the
// receiver so calls can be chained.
//
// The field is written without any locking, so this must NOT be used while
// other goroutines may be reading the options. Removing this setter would be
// good practice, but it is kept because it is convenient for testing.
func (o *Options) SetFlushInterval(flushIntervalMs uint) *Options {
	o.flushInterval = flushIntervalMs
	return o
}
// For concurrent use, a mutex-guarded setter could be implemented along these
// lines (Options currently has no mx field; one would have to be added):
//
//	func (o *Options) ConcurrentlySetFlushInterval(flushIntervalMs uint) *Options {
//		o.mx.Lock()
//		o.flushInterval = flushIntervalMs
//		o.mx.Unlock()
//		return o
//	}
// SetDebugMode switches debug mode (for logs and errors) on or off and
// returns the receiver so calls can be chained.
//
// Deprecated: use WithDebugMode function with NewOptions
func (o *Options) SetDebugMode(isDebug bool) *Options {
	o.isDebug = isDebug
	return o
}
// SetRetryIsEnabled enables or disables resending of undelivered messages
// and returns the receiver so calls can be chained.
//
// Deprecated: use WithRetry function with NewOptions
func (o *Options) SetRetryIsEnabled(enabled bool) *Options {
	o.isRetryEnabled = enabled
	return o
}
// SetLogger stores a custom cx.Logger implementation in the options and
// returns the receiver so calls can be chained.
//
// Deprecated: use WithLogger function with NewOptions
func (o *Options) SetLogger(logger cx.Logger) *Options {
	o.logger = logger
	return o
}
// SetQueueEngine stores a custom retry.Queueable implementation in the
// options and returns the receiver so calls can be chained.
//
// Deprecated: use WithRetryQueueEngine function with NewOptions
func (o *Options) SetQueueEngine(queue retry.Queueable) *Options {
	o.queue = queue
	return o
}
// WithBatchSize returns an Option that sets the batch size (number of rows
// sent in a single request) to size.
func WithBatchSize(size uint) Option {
	return func(opts *Options) {
		opts.batchSize = size
	}
}
// WithFlushInterval returns an Option that sets the flush interval, in ms,
// to interval.
func WithFlushInterval(interval uint) Option {
	return func(opts *Options) {
		opts.flushInterval = interval
	}
}
// WithDebugMode returns an Option that switches debug mode on or off
// according to isDebug.
func WithDebugMode(isDebug bool) Option {
	return func(opts *Options) {
		opts.isDebug = isDebug
	}
}
// WithRetry returns an Option that enables or disables retry (resending of
// undelivered messages) according to enabled.
func WithRetry(enabled bool) Option {
	return func(opts *Options) {
		opts.isRetryEnabled = enabled
	}
}
// WithLogger returns an Option that stores the given cx.Logger
// implementation in the options.
func WithLogger(logger cx.Logger) Option {
	return func(opts *Options) {
		opts.logger = logger
	}
}
// WithRetryQueueEngine returns an Option that stores the given
// retry.Queueable implementation in the options.
func WithRetryQueueEngine(queue retry.Queueable) Option {
	return func(opts *Options) {
		opts.queue = queue
	}
}
// Option is a callback that mutates an Options value. NewOptions applies the
// callbacks it is given, in order, on top of its defaults.
type Option func(*Options)
// NewOptions returns an Options object with the ability to set your own
// parameters.
//
// It starts from a batch size of 2000 rows and a flush interval of 2000 ms
// and then applies the given Option callbacks in order, so a later callback
// overrides an earlier one that touches the same field. Note that these
// starting values differ from the deprecated DefaultOptions (5000 rows,
// 1000 ms).
//
// Nil callbacks are skipped rather than invoked, so a conditionally built
// option list containing nil entries does not cause a nil-function panic.
func NewOptions(options ...Option) *Options {
	o := &Options{
		batchSize:     2000,
		flushInterval: 2000,
	}
	for _, option := range options {
		if option == nil {
			// Calling a nil func value panics at run time; ignore it instead.
			continue
		}
		option(o)
	}
	return o
}
// DefaultOptions returns a fresh Options object carrying the default values:
// a batch size of 5000 rows and a flush interval of 1000 ms. All other
// fields are left at their zero values.
//
// Deprecated: use NewOptions function with Option callbacks
func DefaultOptions() *Options {
	defaults := new(Options)
	defaults.batchSize = 5000
	defaults.flushInterval = 1000
	return defaults
}