Skip to content

Commit

Permalink
handle ws reconnect signal
Browse files Browse the repository at this point in the history
  • Loading branch information
buck54321 committed Sep 5, 2024
1 parent ae2fbd0 commit 61a6f8c
Showing 1 changed file with 25 additions and 11 deletions.
36 changes: 25 additions & 11 deletions client/mm/libxc/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -1579,21 +1579,26 @@ func (bnc *binance) subscribeToAdditionalMarketDataStream(ctx context.Context, b
return nil
}

func (bnc *binance) streamsQuery() string {
bnc.booksMtx.RLock()
defer bnc.booksMtx.RUnlock()
streamNames := make([]string, 0, len(bnc.books))
for mktID := range bnc.books {
streamNames = append(streamNames, marketDataStreamID(mktID))
}
return strings.Join(streamNames, "/")
}

// connectToMarketDataStream is called when the first market is subscribed to.
// It creates a connection to the market data stream and starts a goroutine
// to reconnect every 12 hours, as Binance will close the stream every 24
// hours. Additional markets are subscribed to by calling
// subscribeToAdditionalMarketDataStream.
func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quoteID uint32) error {
newConnection := func() (comms.WsConn, *dex.ConnectionMaster, error) {
bnc.booksMtx.Lock()
streamNames := make([]string, 0, len(bnc.books))
for mktID := range bnc.books {
streamNames = append(streamNames, marketDataStreamID(mktID))
}
bnc.booksMtx.Unlock()
reconnectC := make(chan struct{})

addr := fmt.Sprintf("%s/stream?streams=%s", bnc.wsURL, strings.Join(streamNames, "/"))
newConnection := func() (comms.WsConn, *dex.ConnectionMaster, error) {
addr := fmt.Sprintf("%s/stream?streams=%s", bnc.wsURL, bnc.streamsQuery())
// Need to send key but not signature
conn, err := comms.NewWsConn(&comms.WsCfg{
URL: addr,
Expand All @@ -1604,6 +1609,10 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote
PingWait: time.Minute * 4,
ReconnectSync: func() {
bnc.log.Debugf("Binance reconnected")
select {
case reconnectC <- struct{}{}:
default:
}
},
ConnectEventFunc: func(cs comms.ConnectionStatus) {},
Logger: bnc.log.SubLogger("BNCBOOK"),
Expand Down Expand Up @@ -1669,13 +1678,18 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote
reconnectTimer := time.After(time.Hour * 12)
for {
select {
case <-reconnectTimer:
err = reconnect()
if err != nil {
case <-reconnectC:
if err := reconnect(); err != nil {
bnc.log.Errorf("Error reconnecting: %v", err)
reconnectTimer = time.After(time.Second * 30)
continue
}
case <-reconnectTimer:
if err := reconnect(); err != nil {
bnc.log.Errorf("Error refreshing connection: %v", err)
reconnectTimer = time.After(time.Second * 30)
continue
}
reconnectTimer = time.After(time.Hour * 12)
case <-ctx.Done():
bnc.marketStreamMtx.Lock()
Expand Down

0 comments on commit 61a6f8c

Please sign in to comment.