From be94dcef5597522b352506d86c36ad4a9e502793 Mon Sep 17 00:00:00 2001 From: justinwwhuang Date: Sun, 6 Aug 2023 15:16:36 +0800 Subject: [PATCH 1/5] [INLONG-8629][Agent] Fix sending invalid data to dataproxy failed blocks normal data sending (#8630) --- .../agent/plugin/sinks/SenderManager.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java index 3447eb00588..554d6f59166 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java @@ -168,6 +168,21 @@ public void Stop() { shutdown = true; resendExecutorService.shutdown(); sender.close(); + cleanResendQueue(); + } + + private void cleanResendQueue() { + while (!resendQueue.isEmpty()) { + try { + AgentSenderCallback callback = resendQueue.poll(1, TimeUnit.SECONDS); + if (callback != null) { + MemoryManager.getInstance() + .release(AGENT_GLOBAL_WRITER_PERMIT, (int) callback.batchMessage.getTotalSize()); + } + } catch (InterruptedException e) { + LOGGER.error("clean resend queue error{}", e.getMessage()); + } + } } private AgentMetricItem getMetricItem(Map otherDimensions) { @@ -222,6 +237,10 @@ private void sendBatchWithRetryCount(BatchProxyMessage batchMessage, int retry) boolean suc = false; while (!suc) { try { + if (!resendQueue.isEmpty()) { + AgentUtils.silenceSleepInMs(retrySleepTime); + continue; + } sender.asyncSendMessage(new AgentSenderCallback(batchMessage, retry), batchMessage.getDataList(), batchMessage.getGroupId(), batchMessage.getStreamId(), batchMessage.getDataTime(), SEQUENTIAL_ID.getNextUuid(), maxSenderTimeout, TimeUnit.SECONDS, From ed9009a640319fbb2929f80ca022fbf4e4463a9a Mon Sep 17 00:00:00 2001 From: gunli <24350715@qq.com> Date: Sun, 6 Aug 2023 15:54:43 +0800 Subject: [PATCH 2/5] [INLONG-8639][SDK] Improve send failed logic (#8640) Co-authored-by: gunli --- .../dataproxy-sdk-golang/dataproxy/request.go | 5 + .../dataproxy-sdk-golang/dataproxy/worker.go | 115 +++++++++++------- 2 files changed, 73 insertions(+), 47 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go index f45204c8097..ac89aba1421 100755 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go @@ -334,6 +334,11 @@ type closeReq struct { doneCh chan struct{} } +type sendFailedBatchReq struct { + batch *batchReq + retry bool +} + func getWorkerIndex(workerID string) int { if workerID == "" { return -1 diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go index 0734c06c567..186a8e3be70 100755 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go @@ -58,7 +58,7 @@ var ( errNewConnFailed = &errNo{code: 10006, strCode: "10006", message: "new conn failed"} errConnWriteFailed = &errNo{code: 10007, strCode: "10007", message: "conn write failed"} errConnReadFailed = &errNo{code: 10008, strCode: "10008", message: "conn read failed"} - errLogToLong = &errNo{code: 10009, strCode: "10009", message: "input log is too long"} //nolint:unused + errLogTooLong = &errNo{code: 10009, strCode: "10009", message: "input log is too long"} //nolint:unused errBadLog = &errNo{code: 10010, strCode: "10010", message: "input log is invalid"} errServerError = &errNo{code: 10011, strCode: "10011", message: "server error"} errServerPanic = &errNo{code: 10012, strCode: "10012", message: "server panic"} @@ -101,31 +101,31 @@ func getErrorCode(err error) string { } type worker struct { - client *client // parent client - index int // worker id - indexStr string // worker id string - options *Options // config options - state atomic.Int32 // worker state - log logger.Logger // debug logger - conn atomic.Value // connection used to send data - cmdChan chan interface{} // command channel - dataChan chan *sendDataReq // data channel - dataSemaphore syncx.Semaphore // semaphore used to handle message queueing - pendingBatches map[string]*batchReq // pending batches - unackedBatches map[string]*batchReq // sent but not acknowledged batches - sendFailedBatches chan *batchReq // send failed batches channel - retryBatches chan *batchReq // retry batches channel - responseBatches chan batchRsp // batch response channel - batchTimeoutTicker *time.Ticker // batch timeout ticker - sendTimeoutTicker *time.Ticker // send timeout ticker - heartbeatTicker *time.Ticker // heartbeat ticker - mapCleanTicker *time.Ticker // map clean ticker, clean the unackedBatches map periodically - updateConnTicker *time.Ticker // update connection ticker, change connection periodically - unackedBatchCount int // sent but not acknowledged batches counter, used to clean the unackedBatches map periodically - metrics *metrics // metrics - bufferPool bufferpool.BufferPool // buffer pool - bytePool bufferpool.BytePool // byte pool - stop bool // stop the worker + client *client // parent client + index int // worker id + indexStr string // worker id string + options *Options // config options + state atomic.Int32 // worker state + log logger.Logger // debug logger + conn atomic.Value // connection used to send data + cmdChan chan interface{} // command channel + dataChan chan *sendDataReq // data channel + dataSemaphore syncx.Semaphore // semaphore used to handle message queueing + pendingBatches map[string]*batchReq // pending batches + unackedBatches map[string]*batchReq // sent but not acknowledged batches + sendFailedBatches chan sendFailedBatchReq // send failed batches channel + retryBatches chan *batchReq // retry batches channel + responseBatches chan batchRsp // batch response channel + batchTimeoutTicker *time.Ticker // batch timeout ticker + sendTimeoutTicker *time.Ticker // send timeout ticker + heartbeatTicker *time.Ticker // heartbeat ticker + mapCleanTicker *time.Ticker // map clean ticker, clean the unackedBatches map periodically + updateConnTicker *time.Ticker // update connection ticker, change connection periodically + unackedBatchCount int // sent but not acknowledged batches counter, used to clean the unackedBatches map periodically + metrics *metrics // metrics + bufferPool bufferpool.BufferPool // buffer pool + bytePool bufferpool.BytePool // byte pool + stop bool // stop the worker } func newWorker(cli *client, index int, opts *Options) (*worker, error) { @@ -144,7 +144,7 @@ func newWorker(cli *client, index int, opts *Options) (*worker, error) { dataSemaphore: syncx.NewSemaphore(int32(opts.MaxPendingMessages)), pendingBatches: make(map[string]*batchReq), unackedBatches: make(map[string]*batchReq), - sendFailedBatches: make(chan *batchReq, opts.MaxPendingMessages), + sendFailedBatches: make(chan sendFailedBatchReq, opts.MaxPendingMessages), retryBatches: make(chan *batchReq, opts.MaxPendingMessages), responseBatches: make(chan batchRsp, opts.MaxPendingMessages), batchTimeoutTicker: time.NewTicker(opts.BatchingMaxPublishDelay), @@ -353,7 +353,7 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) { b.encode() //error callback - onErr := func(e error) { + onErr := func(c gnet.Conn, e error, inCallback bool) { defer func() { if rec := recover(); rec != nil { w.log.Error("panic:", rec) @@ -371,15 +371,19 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) { return } + // network error, change a new connection + w.updateConn(c, errConnWriteFailed) + // important:when AsyncWrite() call succeed, the batch will be put into w.unackedBatches,now it failed, we need // to delete from w.unackedBatches, as onErr() is call concurrently in different goroutine, we can not delete it // from this callback directly, or will be panic, so we put into the w.sendFailedBatches channel, and it will be - // deleted in handleSendFailed() one by one - w.sendFailedBatches <- b + // deleted and retried in handleSendFailed() one by one + if inCallback { + w.sendFailedBatches <- sendFailedBatchReq{batch: b, retry: retryOnFail} + return + } - // network error, change a new connection - w.updateConn(errConnWriteFailed) - // put the batch to the retry channel + // in a same goroutine, retry it directly if retryOnFail { // w.retryBatches <- b w.backoffRetry(context.Background(), b) @@ -393,13 +397,13 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) { conn := w.getConn() err := conn.AsyncWrite(b.buffer.Bytes(), func(c gnet.Conn, e error) error { if e != nil { - onErr(e) //error callback + onErr(c, e, true) //error callback } return nil }) if err != nil { - onErr(err) //error callback + onErr(conn, err, false) //error callback return } @@ -411,9 +415,14 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) { w.unackedBatches[b.batchID] = b } -func (w *worker) handleSendFailed(b *batchReq) { - // send failed, delete the batch from unackedBatches - delete(w.unackedBatches, b.batchID) +func (w *worker) handleSendFailed(b sendFailedBatchReq) { + // send failed, delete the batch from unackedBatches, when retried, it will be pushed back + delete(w.unackedBatches, b.batch.batchID) + if b.retry { + w.backoffRetry(context.Background(), b.batch) + } else { + b.batch.done(errConnWriteFailed) + } } func (w *worker) backoffRetry(ctx context.Context, batch *batchReq) { @@ -532,17 +541,17 @@ func (w *worker) handleSendHeartbeat() { bb := w.bufferPool.Get() bytes := hb.encode(bb) - onErr := func(e error) { + onErr := func(c gnet.Conn, e error) { w.metrics.incError(errConnWriteFailed.getStrCode()) w.log.Error("send heartbeat failed, err:", e) - w.updateConn(errConnWriteFailed) + w.updateConn(c, errConnWriteFailed) } // very important:'cause we use gnet, we must call AsyncWrite to send data in goroutines that are different from gnet.OnTraffic() callback conn := w.getConn() err := conn.AsyncWrite(bytes, func(c gnet.Conn, e error) error { if e != nil { - onErr(e) + onErr(c, e) } // recycle the buffer w.bufferPool.Put(bb) @@ -550,7 +559,7 @@ func (w *worker) handleSendHeartbeat() { }) if err != nil { - onErr(err) + onErr(conn, err) // recycle the buffer w.bufferPool.Put(bb) } @@ -695,10 +704,10 @@ func (w *worker) handleClose(req *closeReq) { } func (w *worker) handleUpdateConn() { - w.updateConn(nil) + w.updateConn(nil, nil) } -func (w *worker) updateConn(err error) { +func (w *worker) updateConn(old gnet.Conn, err error) { w.log.Debug("worker[", w.index, "] updateConn") newConn, newErr := w.client.getConn() if newErr != nil { @@ -707,10 +716,18 @@ func (w *worker) updateConn(err error) { return } - oldConn := w.getConn() + oldConn := old + if oldConn == nil { + oldConn = w.getConn() + } + w.client.putConn(oldConn, err) - w.setConn(newConn) - w.metrics.incUpdateConn(getErrorCode(err)) + ok := w.casConn(oldConn, newConn) + if ok { + w.metrics.incUpdateConn(getErrorCode(err)) + } else { + w.client.putConn(newConn, nil) + } } func (w *worker) setConn(conn gnet.Conn) { @@ -721,6 +738,10 @@ func (w *worker) getConn() gnet.Conn { return w.conn.Load().(gnet.Conn) } +func (w *worker) casConn(oldConn, newConn gnet.Conn) bool { + return w.conn.CompareAndSwap(oldConn, newConn) +} + func (w *worker) setState(state workerState) { w.state.Swap(int32(state)) } From 80121518cbe985836dc3a9f442f565c700a20279 Mon Sep 17 00:00:00 2001 From: gunli <24350715@qq.com> Date: Sun, 6 Aug 2023 15:55:18 +0800 Subject: [PATCH 3/5] [INLONG-8635][SDK] Update dependency packages and required Go version (#8636) Co-authored-by: gunli --- .../dataproxy-sdk-golang/go.mod | 30 ++++++++++--------- .../dataproxy-sdk-golang/go.sum | 30 +++++++++++++++++++ 2 files changed, 46 insertions(+), 14 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/go.mod b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/go.mod index ed05f2eebd0..685b7c14bbf 100755 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/go.mod +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/go.mod @@ -16,35 +16,37 @@ module github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang -go 1.17 +go 1.19 require ( github.com/bwmarrin/snowflake v0.3.0 github.com/go-resty/resty/v2 v2.7.0 github.com/gofrs/uuid v4.4.0+incompatible github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c - github.com/panjf2000/gnet/v2 v2.2.9 - github.com/prometheus/client_golang v1.14.0 + github.com/panjf2000/gnet/v2 v2.3.1 + github.com/prometheus/client_golang v1.16.0 github.com/zentures/cityhash v0.0.0-20131128155616-cdd6a94144ab ) require ( github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.1.2 // indirect - github.com/golang/protobuf v1.5.2 // indirect - github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect - github.com/prometheus/client_model v0.3.0 // indirect - github.com/prometheus/common v0.37.0 // indirect - github.com/prometheus/procfs v0.8.0 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect + github.com/prometheus/client_model v0.4.0 // indirect + github.com/prometheus/common v0.44.0 // indirect + github.com/prometheus/procfs v0.11.1 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/net v0.10.0 // indirect - golang.org/x/sys v0.8.0 // indirect - google.golang.org/protobuf v1.28.1 // indirect + golang.org/x/net v0.13.0 // indirect + golang.org/x/sync v0.3.0 // indirect + golang.org/x/sys v0.10.0 // indirect + google.golang.org/protobuf v1.31.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect ) require ( - github.com/klauspost/compress v1.15.15 + github.com/klauspost/compress v1.16.7 go.uber.org/atomic v1.11.0 - go.uber.org/zap v1.24.0 // indirect + go.uber.org/zap v1.25.0 // indirect ) diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/go.sum b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/go.sum index bb479cc0cd8..2e3815e77d8 100755 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/go.sum +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/go.sum @@ -40,6 +40,7 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -50,6 +51,8 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= @@ -107,6 +110,8 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -148,6 +153,8 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw= github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= @@ -156,6 +163,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= +github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= @@ -169,6 +178,8 @@ github.com/panjf2000/ants/v2 v2.7.1 h1:qBy5lfSdbxvrR0yUnZfaEDjf0FlCw4ufsbcsxmE7r github.com/panjf2000/ants/v2 v2.7.1/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8= github.com/panjf2000/gnet/v2 v2.2.9 h1:rmIkaXYtMb2dkgaedojb1uEM2NgVM0jdrnmSNq7F/Vk= github.com/panjf2000/gnet/v2 v2.2.9/go.mod h1:Q34YBnJNDFLsVBC4TiGD3uN+imoXrunFnecs/4FYcx4= +github.com/panjf2000/gnet/v2 v2.3.1 h1:J7vHkNxwsevVIw3u/6LCXgcnpGBk5iKqhQ2RMblGodc= +github.com/panjf2000/gnet/v2 v2.3.1/go.mod h1:Ik5lTy2nmBg9Uvjfcf2KRYs+EXVNOLyxPHpFOFlqu+M= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -182,18 +193,24 @@ github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqr github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY= github.com/prometheus/client_golang v1.14.0 h1:nJdhIvne2eSX/XRAFV9PcvFFRbrjbcTUj0VP62TMhnw= github.com/prometheus/client_golang v1.14.0/go.mod h1:8vpkKitgIVNcqrRBWh1C4TIUQgYNtG/XQE4E/Zae36Y= +github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8= +github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4= github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w= +github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUooCfx1yqY= +github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc= github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= github.com/prometheus/common v0.37.0 h1:ccBbHCgIiT9uSoFY0vX8H3zsNR5eLt17/RQLUvn8pXE= github.com/prometheus/common v0.37.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJFhYO5B3mfA= +github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY= +github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= @@ -201,6 +218,8 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo= github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= +github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI= +github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= @@ -237,6 +256,7 @@ go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= @@ -244,6 +264,8 @@ go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN8 go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= +go.uber.org/zap v1.25.0 h1:4Hvk6GtkucQ790dqmj7l1eEnRdKm3k3ZUrUMS2d5+5c= +go.uber.org/zap v1.25.0/go.mod h1:JIAUzQIH94IC4fOJQm7gMmBJP5k7wQfdcnYdPoEXJYk= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -322,6 +344,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.13.0 h1:Nvo8UFsZ8X3BhAC9699Z1j7XQ3rsZnUUm7jfBEk1ueY= +golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -342,6 +366,8 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -388,6 +414,8 @@ golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -528,6 +556,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= From eaa1a04e0ab0c7d009c024dce3fe3af7ce8b0bb6 Mon Sep 17 00:00:00 2001 From: gunli <24350715@qq.com> Date: Sun, 6 Aug 2023 15:55:49 +0800 Subject: [PATCH 4/5] [INLONG-8631][SDK] Handle context.Done() in Send() (#8632) Co-authored-by: gunli --- .../dataproxy-sdk-golang/dataproxy/worker.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go index 186a8e3be70..7edfc0c6dcf 100755 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go @@ -301,8 +301,12 @@ func (w *worker) send(ctx context.Context, msg Message) error { }, true) // wait for send done - <-doneCh - return err + select { + case <-ctx.Done(): + return ctx.Err() + case <-doneCh: + return err + } } func (w *worker) sendAsync(ctx context.Context, msg Message, callback Callback) { From ee932fa35b675bbc8168777416cc31a1721eefa4 Mon Sep 17 00:00:00 2001 From: gunli <24350715@qq.com> Date: Sun, 6 Aug 2023 15:56:24 +0800 Subject: [PATCH 5/5] [INLONG-8637][SDK] Pool data request and batch request (#8638) Co-authored-by: gunli --- .../dataproxy-sdk-golang/dataproxy/request.go | 26 +++++++++++++++++++ .../dataproxy-sdk-golang/dataproxy/worker.go | 8 ++++-- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go index ac89aba1421..d93b0af006b 100755 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go @@ -22,6 +22,7 @@ import ( "encoding/binary" "strconv" "strings" + "sync" "time" "unsafe" @@ -35,6 +36,8 @@ var ( byteOrder = binary.BigEndian heartbeatRsp = []byte{0x00, 0x00, 0x00, 0x01, 0x01} heartbeatRspLen = len(heartbeatRsp) + reqPool *sync.Pool + batchPool *sync.Pool ) const ( @@ -42,6 +45,19 @@ const ( msgTypeHeartbeat uint8 = 1 ) +func init() { + reqPool = &sync.Pool{ + New: func() interface{} { + return &sendDataReq{} + }, + } + batchPool = &sync.Pool{ + New: func() interface{} { + return &batchReq{} + }, + } +} + type heartbeatReq struct { } @@ -68,6 +84,7 @@ func (h heartbeatReq) encode(buffer *bytes.Buffer) []byte { type batchCallback func() type batchReq struct { + pool *sync.Pool workerID string batchID string groupID string @@ -112,6 +129,10 @@ func (b *batchReq) done(err error) { b.metrics.observeTime(errorCode, time.Since(b.batchTime).Milliseconds()) b.metrics.observeSize(errorCode, b.dataSize) } + + if b.pool != nil { + b.pool.Put(b) + } } func (b *batchReq) encode() []byte { @@ -299,6 +320,7 @@ func (b *batchRsp) decode(input []byte) { } type sendDataReq struct { + pool *sync.Pool ctx context.Context msg Message callback Callback @@ -328,6 +350,10 @@ func (s *sendDataReq) done(err error, errCode string) { s.metrics.incMessage(errCode) } + + if s.pool != nil { + s.pool.Put(s) + } } type closeReq struct { diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go index 7edfc0c6dcf..eb4a8348ac5 100755 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go @@ -244,7 +244,9 @@ func (w *worker) start() { } func (w *worker) doSendAsync(ctx context.Context, msg Message, callback Callback, flushImmediately bool) { - req := &sendDataReq{ + req := reqPool.Get().(*sendDataReq) + *req = sendDataReq{ + pool: reqPool, ctx: ctx, msg: msg, callback: callback, @@ -319,7 +321,9 @@ func (w *worker) handleSendData(req *sendDataReq) { batch, ok := w.pendingBatches[req.msg.StreamID] if !ok { streamID := req.msg.StreamID - batch = &batchReq{ + batch = batchPool.Get().(*batchReq) + *batch = batchReq{ + pool: batchPool, workerID: w.indexStr, batchID: util.SnowFlakeID(), groupID: w.options.GroupID,