From adef014da6fd9c56e84197ce13a1a5e3a24d0147 Mon Sep 17 00:00:00 2001 From: JoeGruff Date: Mon, 23 Sep 2024 15:22:58 +0900 Subject: [PATCH] binance: Desync books on disconnect. --- client/mm/libxc/binance.go | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/client/mm/libxc/binance.go b/client/mm/libxc/binance.go index 6488944f90..c91664148c 100644 --- a/client/mm/libxc/binance.go +++ b/client/mm/libxc/binance.go @@ -69,6 +69,8 @@ type binanceOrderBook struct { baseConversionFactor uint64 quoteConversionFactor uint64 log dex.Logger + + connectedChan chan bool } func newBinanceOrderBook( @@ -86,6 +88,7 @@ func newBinanceOrderBook( quoteConversionFactor: quoteConversionFactor, log: log, getSnapshot: getSnapshot, + connectedChan: make(chan bool), } } @@ -288,6 +291,13 @@ func (b *binanceOrderBook) Connect(ctx context.Context) (*sync.WaitGroup, error if retry != nil { // don't hammer continue } + case connected := <-b.connectedChan: + if !connected { + b.log.Debugf("Unsyncing %s orderbook due to disconnect.", b.mktID, retryFrequency) + desync() + retry = nil + continue + } case <-ctx.Done(): return } @@ -296,7 +306,7 @@ func (b *binanceOrderBook) Connect(ctx context.Context) (*sync.WaitGroup, error b.log.Infof("Synced %s orderbook", b.mktID) retry = nil } else { - b.log.Infof("Failed to sync %s orderbook. Trying again in %s", b.mktID, retryFrequency) + b.log.Infof("Failed to sync %s orderbook. Trying again in %s", b.mktID, retryFrequency) desync() // Clears the syncCache retry = time.After(retryFrequency) } @@ -1745,6 +1755,19 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote newConnection := func() (comms.WsConn, *dex.ConnectionMaster, error) { addr := fmt.Sprintf("%s/stream?streams=%s", bnc.wsURL, strings.Join(bnc.streams(), "/")) // Need to send key but not signature + connectEventFunc := func(cs comms.ConnectionStatus) { + if cs != comms.Disconnected && cs != comms.Connected { + return + } + // If disconnected, set all books to unsynced so bots + // will not place new orders. + connected := cs == comms.Connected + bnc.booksMtx.RLock() + defer bnc.booksMtx.RLock() + for _, b := range bnc.books { + b.connectedChan <- connected + } + } conn, err := comms.NewWsConn(&comms.WsCfg{ URL: addr, // Binance Docs: The websocket server will send a ping frame every 3 @@ -1759,7 +1782,7 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote default: } }, - ConnectEventFunc: func(cs comms.ConnectionStatus) {}, + ConnectEventFunc: connectEventFunc, Logger: bnc.log.SubLogger("BNCBOOK"), RawHandler: bnc.handleMarketDataNote, })