From 90feee0a78a3c7d1a8f7f7f6d0c656d27f44d6c8 Mon Sep 17 00:00:00 2001 From: mm2175 <30251661+mm2175@users.noreply.github.com> Date: Sun, 9 Jan 2022 15:23:04 +0800 Subject: [PATCH] fix status concurrent issue and add mssing fields --- go/channel.go | 4 ++-- go/client.go | 36 +++++++++++++++++++++++++----------- go/response_spot.go | 2 ++ 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/go/channel.go b/go/channel.go index e1d45bd..aa28951 100644 --- a/go/channel.go +++ b/go/channel.go @@ -135,7 +135,7 @@ func (ws *WsService) baseSubscribe(event string, channel string, payload []strin func (ws *WsService) readMsg() { ws.once.Do(func() { go func() { - defer ws.Client.Close() + defer ws.Close() for { select { @@ -157,7 +157,7 @@ func (ws *WsService) readMsg() { var rawTrade UpdateMsg if err := json.Unmarshal(message, &rawTrade); err != nil { - ws.Logger.Printf("Unmarshal err:%s, body:%s", err.Error(), string(message)) + ws.Logger.Printf("Unmarshal err: %s, body: %s", err.Error(), string(message)) continue } diff --git a/go/client.go b/go/client.go index 633cd3d..29694f7 100644 --- a/go/client.go +++ b/go/client.go @@ -7,16 +7,15 @@ import ( "os" "strings" "sync" + "sync/atomic" "time" mapset "github.com/deckarep/golang-set" "github.com/gorilla/websocket" ) -type status int - const ( - disconnected status = iota + disconnected uint64 = iota connected reconnecting ) @@ -30,7 +29,7 @@ type WsService struct { msgChs *sync.Map // business chan calls *sync.Map conf *ConnConf - status status + status atomic.Uint64 clientMu *sync.Mutex } @@ -108,10 +107,11 @@ func NewWsService(ctx context.Context, logger *log.Logger, conf *ConnConf) (*WsS calls: new(sync.Map), msgChs: new(sync.Map), once: new(sync.Once), - status: connected, clientMu: new(sync.Mutex), } + ws.status.Store(connected) + go ws.activePing() return ws, nil @@ -172,7 +172,7 @@ func (ws *WsService) GetConnConf() *ConnConf { func (ws *WsService) reconnect() error { // avoid repeated reconnection - if ws.status == reconnecting { + if ws.status.Load() == reconnecting { return nil } @@ -183,7 +183,7 @@ func (ws *WsService) reconnect() error { ws.Client.Close() } - ws.status = reconnecting + ws.status.Store(reconnecting) stop := false retry := 0 @@ -204,7 +204,7 @@ func (ws *WsService) reconnect() error { } } - ws.status = connected + ws.status.Store(connected) // resubscribe after reconnect ws.conf.subscribeMsg.Range(func(key, value interface{}) bool { @@ -298,6 +298,20 @@ func (ws *WsService) GetConnection() *websocket.Conn { return ws.Client } +func (ws *WsService) IsConnected() bool { + return ws.status.Load() == connected +} + +func (ws *WsService) Close() { + ws.mu.Lock() + defer ws.mu.Unlock() + if ws.Client != nil { + if err := ws.Client.Close(); err != nil { + ws.Logger.Printf("close err: %s", err.Error()) + } + } +} + func (ws *WsService) activePing() { du, err := time.ParseDuration(ws.conf.PingInterval) if err != nil { @@ -325,7 +339,7 @@ func (ws *WsService) activePing() { return true }) - if ws.status != connected { + if ws.status.Load() != connected { continue } @@ -339,12 +353,12 @@ func (ws *WsService) activePing() { } } -var statusString = map[status]string{ +var statusString = map[uint64]string{ disconnected: "disconnected", connected: "connected", reconnecting: "reconnecting", } func (ws *WsService) Status() string { - return statusString[ws.status] + return statusString[ws.status.Load()] } diff --git a/go/response_spot.go b/go/response_spot.go index 60fad39..1945a36 100644 --- a/go/response_spot.go +++ b/go/response_spot.go @@ -162,6 +162,8 @@ type OrderMsg struct { FillPrice string `json:"fill_price,omitempty"` // Total filled in quote currency FilledTotal string `json:"filled_total,omitempty"` + // Average deal price + AvgDealPrice string `json:"avg_deal_price,omitempty"` // Fee deducted Fee string `json:"fee,omitempty"` // Fee currency unit