Merge branch 'apache:master' into develop
justinwwhuang committed Aug 7, 2023
2 parents 1943f4a + ee932fa commit 74846b8
Showing 4 changed files with 157 additions and 65 deletions.
@@ -22,6 +22,7 @@ import (
"encoding/binary"
"strconv"
"strings"
"sync"
"time"
"unsafe"

@@ -35,13 +36,28 @@
byteOrder = binary.BigEndian
heartbeatRsp = []byte{0x00, 0x00, 0x00, 0x01, 0x01}
heartbeatRspLen = len(heartbeatRsp)
reqPool *sync.Pool
batchPool *sync.Pool
)

const (
msgTypeBatch uint8 = 5
msgTypeHeartbeat uint8 = 1
)

func init() {
reqPool = &sync.Pool{
New: func() interface{} {
return &sendDataReq{}
},
}
batchPool = &sync.Pool{
New: func() interface{} {
return &batchReq{}
},
}
}

type heartbeatReq struct {
}

@@ -68,6 +84,7 @@ func (h heartbeatReq) encode(buffer *bytes.Buffer) []byte {

type batchCallback func()
type batchReq struct {
pool *sync.Pool
workerID string
batchID string
groupID string
@@ -112,6 +129,10 @@ func (b *batchReq) done(err error) {
b.metrics.observeTime(errorCode, time.Since(b.batchTime).Milliseconds())
b.metrics.observeSize(errorCode, b.dataSize)
}

if b.pool != nil {
b.pool.Put(b)
}
}

func (b *batchReq) encode() []byte {
@@ -299,6 +320,7 @@ func (b *batchRsp) decode(input []byte) {
}

type sendDataReq struct {
pool *sync.Pool
ctx context.Context
msg Message
callback Callback
@@ -328,12 +350,21 @@ func (s *sendDataReq) done(err error, errCode string) {

s.metrics.incMessage(errCode)
}

if s.pool != nil {
s.pool.Put(s)
}
}

type closeReq struct {
doneCh chan struct{}
}

type sendFailedBatchReq struct {
batch *batchReq
retry bool
}

func getWorkerIndex(workerID string) int {
if workerID == "" {
return -1
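The pattern introduced above is the standard sync.Pool lifecycle: take an object from the pool, overwrite every field so no state leaks from its previous use, and return it to the pool exactly once when the request completes in done(). A minimal, self-contained sketch of that lifecycle, using illustrative names (request, acquire) that are not part of the SDK:

package main

import (
	"fmt"
	"sync"
)

// request stands in for sendDataReq/batchReq.
type request struct {
	pool    *sync.Pool
	payload string
}

var requestPool = &sync.Pool{
	New: func() interface{} { return &request{} },
}

// acquire gets a pooled object and resets every field,
// mirroring `*req = sendDataReq{...}` in doSendAsync() later in this diff.
func acquire(payload string) *request {
	r := requestPool.Get().(*request)
	*r = request{pool: requestPool, payload: payload}
	return r
}

// done returns the object to its pool, mirroring batchReq.done() and sendDataReq.done().
func (r *request) done() {
	if r.pool != nil {
		r.pool.Put(r)
	}
}

func main() {
	r := acquire("hello")
	fmt.Println(r.payload)
	r.done() // r must not be used after this point
}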
@@ -58,7 +58,7 @@ var (
errNewConnFailed = &errNo{code: 10006, strCode: "10006", message: "new conn failed"}
errConnWriteFailed = &errNo{code: 10007, strCode: "10007", message: "conn write failed"}
errConnReadFailed = &errNo{code: 10008, strCode: "10008", message: "conn read failed"}
errLogToLong = &errNo{code: 10009, strCode: "10009", message: "input log is too long"} //nolint:unused
errLogTooLong = &errNo{code: 10009, strCode: "10009", message: "input log is too long"} //nolint:unused
errBadLog = &errNo{code: 10010, strCode: "10010", message: "input log is invalid"}
errServerError = &errNo{code: 10011, strCode: "10011", message: "server error"}
errServerPanic = &errNo{code: 10012, strCode: "10012", message: "server panic"}
@@ -101,31 +101,31 @@ func getErrorCode(err error) string {
}

type worker struct {
client *client // parent client
index int // worker id
indexStr string // worker id string
options *Options // config options
state atomic.Int32 // worker state
log logger.Logger // debug logger
conn atomic.Value // connection used to send data
cmdChan chan interface{} // command channel
dataChan chan *sendDataReq // data channel
dataSemaphore syncx.Semaphore // semaphore used to handle message queueing
pendingBatches map[string]*batchReq // pending batches
unackedBatches map[string]*batchReq // sent but not acknowledged batches
sendFailedBatches chan *batchReq // send failed batches channel
retryBatches chan *batchReq // retry batches channel
responseBatches chan batchRsp // batch response channel
batchTimeoutTicker *time.Ticker // batch timeout ticker
sendTimeoutTicker *time.Ticker // send timeout ticker
heartbeatTicker *time.Ticker // heartbeat ticker
mapCleanTicker *time.Ticker // map clean ticker, clean the unackedBatches map periodically
updateConnTicker *time.Ticker // update connection ticker, change connection periodically
unackedBatchCount int // sent but not acknowledged batches counter, used to clean the unackedBatches map periodically
metrics *metrics // metrics
bufferPool bufferpool.BufferPool // buffer pool
bytePool bufferpool.BytePool // byte pool
stop bool // stop the worker
client *client // parent client
index int // worker id
indexStr string // worker id string
options *Options // config options
state atomic.Int32 // worker state
log logger.Logger // debug logger
conn atomic.Value // connection used to send data
cmdChan chan interface{} // command channel
dataChan chan *sendDataReq // data channel
dataSemaphore syncx.Semaphore // semaphore used to handle message queueing
pendingBatches map[string]*batchReq // pending batches
unackedBatches map[string]*batchReq // sent but not acknowledged batches
sendFailedBatches chan sendFailedBatchReq // send failed batches channel
retryBatches chan *batchReq // retry batches channel
responseBatches chan batchRsp // batch response channel
batchTimeoutTicker *time.Ticker // batch timeout ticker
sendTimeoutTicker *time.Ticker // send timeout ticker
heartbeatTicker *time.Ticker // heartbeat ticker
mapCleanTicker *time.Ticker // map clean ticker, clean the unackedBatches map periodically
updateConnTicker *time.Ticker // update connection ticker, change connection periodically
unackedBatchCount int // sent but not acknowledged batches counter, used to clean the unackedBatches map periodically
metrics *metrics // metrics
bufferPool bufferpool.BufferPool // buffer pool
bytePool bufferpool.BytePool // byte pool
stop bool // stop the worker
}

func newWorker(cli *client, index int, opts *Options) (*worker, error) {
@@ -144,7 +144,7 @@ func newWorker(cli *client, index int, opts *Options) (*worker, error) {
dataSemaphore: syncx.NewSemaphore(int32(opts.MaxPendingMessages)),
pendingBatches: make(map[string]*batchReq),
unackedBatches: make(map[string]*batchReq),
sendFailedBatches: make(chan *batchReq, opts.MaxPendingMessages),
sendFailedBatches: make(chan sendFailedBatchReq, opts.MaxPendingMessages),
retryBatches: make(chan *batchReq, opts.MaxPendingMessages),
responseBatches: make(chan batchRsp, opts.MaxPendingMessages),
batchTimeoutTicker: time.NewTicker(opts.BatchingMaxPublishDelay),
@@ -244,7 +244,9 @@ func (w *worker) start() {
}

func (w *worker) doSendAsync(ctx context.Context, msg Message, callback Callback, flushImmediately bool) {
req := &sendDataReq{
req := reqPool.Get().(*sendDataReq)
*req = sendDataReq{
pool: reqPool,
ctx: ctx,
msg: msg,
callback: callback,
@@ -301,8 +303,12 @@ func (w *worker) send(ctx context.Context, msg Message) error {
}, true)

// wait for send done
<-doneCh
return err
select {
case <-ctx.Done():
return ctx.Err()
case <-doneCh:
return err
}
}

func (w *worker) sendAsync(ctx context.Context, msg Message, callback Callback) {
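The new select in send() means a synchronous send no longer blocks indefinitely on doneCh: it returns ctx.Err() as soon as the caller's context is cancelled or times out, even if the callback has not fired yet. A small, self-contained sketch of the same wait pattern (waitDone and the surrounding names are illustrative, not SDK API):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitDone mirrors the new wait logic in worker.send().
func waitDone(ctx context.Context, doneCh <-chan struct{}, sendErr *error) error {
	select {
	case <-ctx.Done():
		// the caller timed out or was cancelled; release it immediately,
		// even though the async callback may still fire later
		return ctx.Err()
	case <-doneCh:
		return *sendErr
	}
}

func main() {
	doneCh := make(chan struct{})
	sendErr := errors.New("not finished")

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	// the "callback" never closes doneCh in this sketch, so waitDone
	// returns ctx.Err() after the 50ms deadline instead of hanging
	fmt.Println(waitDone(ctx, doneCh, &sendErr))
}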
@@ -315,7 +321,9 @@ func (w *worker) handleSendData(req *sendDataReq) {
batch, ok := w.pendingBatches[req.msg.StreamID]
if !ok {
streamID := req.msg.StreamID
batch = &batchReq{
batch = batchPool.Get().(*batchReq)
*batch = batchReq{
pool: batchPool,
workerID: w.indexStr,
batchID: util.SnowFlakeID(),
groupID: w.options.GroupID,
@@ -353,7 +361,7 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
b.encode()

//error callback
onErr := func(e error) {
onErr := func(c gnet.Conn, e error, inCallback bool) {
defer func() {
if rec := recover(); rec != nil {
w.log.Error("panic:", rec)
@@ -371,15 +379,19 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
return
}

// network error, switch to a new connection
w.updateConn(c, errConnWriteFailed)

// important: when AsyncWrite() succeeds, the batch is put into w.unackedBatches; now that it has failed, we need
// to delete it from w.unackedBatches. Since onErr() is called concurrently from a different goroutine, we cannot delete it
// from this callback directly (that would panic), so we put it into the w.sendFailedBatches channel, and it will be
// deleted in handleSendFailed() one by one
w.sendFailedBatches <- b
// deleted and retried in handleSendFailed() one by one
if inCallback {
w.sendFailedBatches <- sendFailedBatchReq{batch: b, retry: retryOnFail}
return
}

// network error, change a new connection
w.updateConn(errConnWriteFailed)
// put the batch to the retry channel
// in the same goroutine, retry it directly
if retryOnFail {
// w.retryBatches <- b
w.backoffRetry(context.Background(), b)
@@ -393,13 +405,13 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
conn := w.getConn()
err := conn.AsyncWrite(b.buffer.Bytes(), func(c gnet.Conn, e error) error {
if e != nil {
onErr(e) //error callback
onErr(c, e, true) //error callback
}
return nil
})

if err != nil {
onErr(err) //error callback
onErr(conn, err, false) //error callback
return
}

@@ -411,9 +423,14 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
w.unackedBatches[b.batchID] = b
}

func (w *worker) handleSendFailed(b *batchReq) {
// send failed, delete the batch from unackedBatches
delete(w.unackedBatches, b.batchID)
func (w *worker) handleSendFailed(b sendFailedBatchReq) {
// send failed, delete the batch from unackedBatches; when it is retried, it will be pushed back
delete(w.unackedBatches, b.batch.batchID)
if b.retry {
w.backoffRetry(context.Background(), b.batch)
} else {
b.batch.done(errConnWriteFailed)
}
}

func (w *worker) backoffRetry(ctx context.Context, batch *batchReq) {
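onErr now distinguishes where it runs. Inside the gnet AsyncWrite callback, which executes on a different goroutine, it only forwards a sendFailedBatchReq so that unackedBatches is mutated exclusively by the worker goroutine in handleSendFailed(); on the worker goroutine itself it retries (or fails) the batch directly. A self-contained sketch of that single-goroutine hand-off, using stripped-down stand-in types:

package main

import "fmt"

type batchReq struct{ batchID string }

type sendFailedBatchReq struct {
	batch *batchReq
	retry bool
}

type worker struct {
	unackedBatches    map[string]*batchReq
	sendFailedBatches chan sendFailedBatchReq
}

// handleSendFailed mirrors the new version above: the map is only touched
// here, on the worker's own goroutine.
func (w *worker) handleSendFailed(b sendFailedBatchReq) {
	delete(w.unackedBatches, b.batch.batchID)
	if b.retry {
		fmt.Println("retrying batch", b.batch.batchID)
	} else {
		fmt.Println("dropping batch", b.batch.batchID)
	}
}

func main() {
	w := &worker{
		unackedBatches:    map[string]*batchReq{"b1": {batchID: "b1"}},
		sendFailedBatches: make(chan sendFailedBatchReq, 1),
	}

	// what the AsyncWrite error callback does instead of touching the map:
	w.sendFailedBatches <- sendFailedBatchReq{batch: w.unackedBatches["b1"], retry: true}

	// the worker goroutine drains the channel and mutates the map safely
	w.handleSendFailed(<-w.sendFailedBatches)
	fmt.Println("unacked batches left:", len(w.unackedBatches))
}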
@@ -532,25 +549,25 @@ func (w *worker) handleSendHeartbeat() {
bb := w.bufferPool.Get()
bytes := hb.encode(bb)

onErr := func(e error) {
onErr := func(c gnet.Conn, e error) {
w.metrics.incError(errConnWriteFailed.getStrCode())
w.log.Error("send heartbeat failed, err:", e)
w.updateConn(errConnWriteFailed)
w.updateConn(c, errConnWriteFailed)
}

// very important: because we use gnet, we must call AsyncWrite to send data from goroutines other than the gnet.OnTraffic() callback
conn := w.getConn()
err := conn.AsyncWrite(bytes, func(c gnet.Conn, e error) error {
if e != nil {
onErr(e)
onErr(c, e)
}
// recycle the buffer
w.bufferPool.Put(bb)
return nil
})

if err != nil {
onErr(err)
onErr(conn, err)
// recycle the buffer
w.bufferPool.Put(bb)
}
@@ -695,10 +712,10 @@ func (w *worker) handleClose(req *closeReq) {
}

func (w *worker) handleUpdateConn() {
w.updateConn(nil)
w.updateConn(nil, nil)
}

func (w *worker) updateConn(err error) {
func (w *worker) updateConn(old gnet.Conn, err error) {
w.log.Debug("worker[", w.index, "] updateConn")
newConn, newErr := w.client.getConn()
if newErr != nil {
@@ -707,10 +724,18 @@
return
}

oldConn := w.getConn()
oldConn := old
if oldConn == nil {
oldConn = w.getConn()
}

w.client.putConn(oldConn, err)
w.setConn(newConn)
w.metrics.incUpdateConn(getErrorCode(err))
ok := w.casConn(oldConn, newConn)
if ok {
w.metrics.incUpdateConn(getErrorCode(err))
} else {
w.client.putConn(newConn, nil)
}
}

func (w *worker) setConn(conn gnet.Conn) {
@@ -721,6 +746,10 @@ func (w *worker) getConn() gnet.Conn {
return w.conn.Load().(gnet.Conn)
}

func (w *worker) casConn(oldConn, newConn gnet.Conn) bool {
return w.conn.CompareAndSwap(oldConn, newConn)
}

func (w *worker) setState(state workerState) {
w.state.Swap(int32(state))
}
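updateConn now installs the new connection with CompareAndSwap rather than an unconditional store: if another path has already replaced the connection, the freshly acquired one is handed back to the client's pool instead of overwriting the newer value. atomic.Value gained CompareAndSwap in Go 1.17, which the module's go directive (raised to 1.19 below) satisfies. A self-contained sketch of the guard, using a stand-in connection type:

package main

import (
	"fmt"
	"sync/atomic"
)

type fakeConn struct{ id int } // stand-in for gnet.Conn

func main() {
	var conn atomic.Value
	c1 := &fakeConn{id: 1}
	conn.Store(c1)

	c2 := &fakeConn{id: 2}
	c3 := &fakeConn{id: 3}

	// the first updater wins the race: c1 -> c2
	fmt.Println(conn.CompareAndSwap(c1, c2)) // true

	// a later updater still holding the stale c1 loses; like updateConn,
	// it should release its new connection back to the pool instead
	if !conn.CompareAndSwap(c1, c3) {
		fmt.Println("lost the race, releasing the unused connection")
	}

	fmt.Println("active conn id:", conn.Load().(*fakeConn).id) // 2
}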
30 changes: 16 additions & 14 deletions inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/go.mod
@@ -16,35 +16,37 @@

module github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang

go 1.17
go 1.19

require (
github.com/bwmarrin/snowflake v0.3.0
github.com/go-resty/resty/v2 v2.7.0
github.com/gofrs/uuid v4.4.0+incompatible
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c
github.com/panjf2000/gnet/v2 v2.2.9
github.com/prometheus/client_golang v1.14.0
github.com/panjf2000/gnet/v2 v2.3.1
github.com/prometheus/client_golang v1.16.0
github.com/zentures/cityhash v0.0.0-20131128155616-cdd6a94144ab
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.8.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
golang.org/x/net v0.13.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.10.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
)

require (
github.com/klauspost/compress v1.15.15
github.com/klauspost/compress v1.16.7
go.uber.org/atomic v1.11.0
go.uber.org/zap v1.24.0 // indirect
go.uber.org/zap v1.25.0 // indirect
)
