diff --git a/client/mm/libxc/binance.go b/client/mm/libxc/binance.go index 3974ff7cac..de90cd72ea 100644 --- a/client/mm/libxc/binance.go +++ b/client/mm/libxc/binance.go @@ -69,6 +69,8 @@ type binanceOrderBook struct { baseConversionFactor uint64 quoteConversionFactor uint64 log dex.Logger + + connectedChan chan bool } func newBinanceOrderBook( @@ -86,6 +88,7 @@ func newBinanceOrderBook( quoteConversionFactor: quoteConversionFactor, log: log, getSnapshot: getSnapshot, + connectedChan: make(chan bool), } } @@ -161,7 +164,7 @@ func (b *binanceOrderBook) Connect(ctx context.Context) (*sync.WaitGroup, error resyncChan := make(chan struct{}, 1) - desync := func() { + desync := func(resync bool) { // clear the sync cache, set the special ID, trigger a book refresh. syncMtx.Lock() defer syncMtx.Unlock() @@ -170,7 +173,9 @@ func (b *binanceOrderBook) Connect(ctx context.Context) (*sync.WaitGroup, error if updateID != updateIDUnsynced { b.synced.Store(false) updateID = updateIDUnsynced - resyncChan <- struct{}{} + if resync { + resyncChan <- struct{}{} + } } } @@ -265,7 +270,7 @@ func (b *binanceOrderBook) Connect(ctx context.Context) (*sync.WaitGroup, error case update := <-b.updateQueue: if !processUpdate(update) { b.log.Tracef("Bad %s update with ID %d", b.mktID, update.LastUpdateID) - desync() + desync(true) } case <-ctx.Done(): return @@ -288,6 +293,13 @@ func (b *binanceOrderBook) Connect(ctx context.Context) (*sync.WaitGroup, error if retry != nil { // don't hammer continue } + case connected := <-b.connectedChan: + if !connected { + b.log.Debugf("Unsyncing %s orderbook due to disconnect.", b.mktID, retryFrequency) + desync(false) + retry = nil + continue + } case <-ctx.Done(): return } @@ -296,8 +308,8 @@ func (b *binanceOrderBook) Connect(ctx context.Context) (*sync.WaitGroup, error b.log.Infof("Synced %s orderbook", b.mktID) retry = nil } else { - b.log.Infof("Failed to sync %s orderbook. Trying again in %s", b.mktID, retryFrequency) - desync() // Clears the syncCache + b.log.Infof("Failed to sync %s orderbook. Trying again in %s", b.mktID, retryFrequency) + desync(false) // Clears the syncCache retry = time.After(retryFrequency) } } @@ -1742,10 +1754,27 @@ out: // subscribeToAdditionalMarketDataStream. func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quoteID uint32) error { reconnectC := make(chan struct{}) + checkSubsC := make(chan struct{}) newConnection := func() (comms.WsConn, *dex.ConnectionMaster, error) { addr := fmt.Sprintf("%s/stream?streams=%s", bnc.wsURL, strings.Join(bnc.streams(), "/")) // Need to send key but not signature + connectEventFunc := func(cs comms.ConnectionStatus) { + if cs != comms.Disconnected && cs != comms.Connected { + return + } + // If disconnected, set all books to unsynced so bots + // will not place new orders. + connected := cs == comms.Connected + bnc.booksMtx.RLock() + defer bnc.booksMtx.RUnlock() + for _, b := range bnc.books { + select { + case b.connectedChan <- connected: + default: + } + } + } conn, err := comms.NewWsConn(&comms.WsCfg{ URL: addr, // Binance Docs: The websocket server will send a ping frame every 3 @@ -1757,11 +1786,11 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote ReconnectSync: func() { bnc.log.Debugf("Binance reconnected") select { - case reconnectC <- struct{}{}: + case checkSubsC <- struct{}{}: default: } }, - ConnectEventFunc: func(cs comms.ConnectionStatus) {}, + ConnectEventFunc: connectEventFunc, Logger: bnc.log.SubLogger("BNCBOOK"), RawHandler: bnc.handleMarketDataNote, }) @@ -1849,6 +1878,11 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote bnc.log.Errorf("Error checking subscriptions: %v", err) } checkSubs = time.After(checkSubsInterval) + case <-checkSubsC: + if err := bnc.checkSubs(ctx); err != nil { + bnc.log.Errorf("Error checking subscriptions: %v", err) + } + checkSubs = time.After(checkSubsInterval) case <-ctx.Done(): bnc.marketStreamMtx.Lock() bnc.marketStream = nil