From 8214744f3a2ec5a7da4d1c191458eb75e9c70cfe Mon Sep 17 00:00:00 2001 From: Brian Stafford Date: Thu, 5 Sep 2024 14:59:06 -0500 Subject: [PATCH] handle ws reconnect signal --- client/mm/libxc/binance.go | 36 +++++++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/client/mm/libxc/binance.go b/client/mm/libxc/binance.go index 0ac3394852..e0a25cb94f 100644 --- a/client/mm/libxc/binance.go +++ b/client/mm/libxc/binance.go @@ -1580,21 +1580,26 @@ func (bnc *binance) subscribeToAdditionalMarketDataStream(ctx context.Context, b return nil } +func (bnc *binance) streamsQuery() string { + bnc.booksMtx.RLock() + defer bnc.booksMtx.RUnlock() + streamNames := make([]string, 0, len(bnc.books)) + for mktID := range bnc.books { + streamNames = append(streamNames, marketDataStreamID(mktID)) + } + return strings.Join(streamNames, "/") +} + // connectToMarketDataStream is called when the first market is subscribed to. // It creates a connection to the market data stream and starts a goroutine // to reconnect every 12 hours, as Binance will close the stream every 24 // hours. Additional markets are subscribed to by calling // subscribeToAdditionalMarketDataStream. func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quoteID uint32) error { - newConnection := func() (comms.WsConn, *dex.ConnectionMaster, error) { - bnc.booksMtx.Lock() - streamNames := make([]string, 0, len(bnc.books)) - for mktID := range bnc.books { - streamNames = append(streamNames, marketDataStreamID(mktID)) - } - bnc.booksMtx.Unlock() + reconnectC := make(chan struct{}) - addr := fmt.Sprintf("%s/stream?streams=%s", bnc.wsURL, strings.Join(streamNames, "/")) + newConnection := func() (comms.WsConn, *dex.ConnectionMaster, error) { + addr := fmt.Sprintf("%s/stream?streams=%s", bnc.wsURL, bnc.streamsQuery()) // Need to send key but not signature conn, err := comms.NewWsConn(&comms.WsCfg{ URL: addr, @@ -1606,6 +1611,10 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote EchoPingData: true, ReconnectSync: func() { bnc.log.Debugf("Binance reconnected") + select { + case reconnectC <- struct{}{}: + default: + } }, ConnectEventFunc: func(cs comms.ConnectionStatus) {}, Logger: bnc.log.SubLogger("BNCBOOK"), @@ -1671,13 +1680,18 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote reconnectTimer := time.After(time.Hour * 12) for { select { - case <-reconnectTimer: - err = reconnect() - if err != nil { + case <-reconnectC: + if err := reconnect(); err != nil { bnc.log.Errorf("Error reconnecting: %v", err) reconnectTimer = time.After(time.Second * 30) continue } + case <-reconnectTimer: + if err := reconnect(); err != nil { + bnc.log.Errorf("Error refreshing connection: %v", err) + reconnectTimer = time.After(time.Second * 30) + continue + } reconnectTimer = time.After(time.Hour * 12) case <-ctx.Done(): bnc.marketStreamMtx.Lock()