Skip to content

Commit

Permalink
binance: Check market depth subs.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoeGruffins committed Sep 25, 2024
1 parent 0790451 commit ab9457b
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 5 deletions.
26 changes: 26 additions & 0 deletions client/cmd/testbinance/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,11 @@ func (f *fakeBinance) handleAccountSubscription(w http.ResponseWriter, r *http.R
}()
}

type listSubsResp struct {
ID uint64 `json:"id"`
Result []string `json:"result"`
}

func (f *fakeBinance) handleMarketStream(w http.ResponseWriter, r *http.Request) {
streamsStr := r.URL.Query().Get("streams")
if streamsStr == "" {
Expand Down Expand Up @@ -587,6 +592,25 @@ func (f *fakeBinance) handleMarketStream(w http.ResponseWriter, r *http.Request)
f.cleanMarkets()
}

listSubscriptions := func(id uint64) {
f.marketsMtx.Lock()
defer f.marketsMtx.Unlock()
var streams []string
for mktID := range cl.markets {
streams = append(streams, fmt.Sprintf("%s@depth", mktID))
}
resp := listSubsResp{
ID: id,
Result: streams,
}
b, err := json.Marshal(resp)
if err != nil {
log.Errorf("LIST_SUBSCRIBE marshal error: %v", err)
}
cl.WSLink.SendRaw(b)
f.cleanMarkets()
}

conn, cm := f.newWSLink(w, r, func(b []byte) {
var req bntypes.StreamSubscription
if err := json.Unmarshal(b, &req); err != nil {
Expand All @@ -598,6 +622,8 @@ func (f *fakeBinance) handleMarketStream(w http.ResponseWriter, r *http.Request)
subscribe(req.Params)
case "UNSUBSCRIBE":
unsubscribe(req.Params)
case "LIST_SUBSCRIPTIONS":
listSubscriptions(req.ID)
}
})
if conn == nil {
Expand Down
129 changes: 124 additions & 5 deletions client/mm/libxc/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,9 @@ type binance struct {
marketStreamMtx sync.RWMutex
marketStream comms.WsConn

marketStreamRespsMtx sync.Mutex
marketStreamResps map[uint64]chan<- []string

booksMtx sync.RWMutex
books map[string]*binanceOrderBook

Expand Down Expand Up @@ -525,6 +528,7 @@ func newBinance(cfg *CEXConfig, binanceUS bool) *binance {
tradeUpdaters: make(map[int]chan *Trade),
tradeIDNoncePrefix: encode.RandomBytes(10),
reconnectChan: make(chan struct{}),
marketStreamResps: make(map[uint64]chan<- []string),
}

bnc.markets.Store(make(map[string]*bntypes.Market))
Expand Down Expand Up @@ -1547,8 +1551,27 @@ func (bnc *binance) handleMarketDataNote(b []byte) {
bnc.log.Errorf("Error unmarshaling book note: %v", err)
return
}
if note == nil || note.Data == nil {
bnc.log.Debugf("No data in market data update: %s", string(b))
if note == nil {
bnc.log.Debugf("Market data update does not parse to a note: %s", string(b))
return
}

if note.Data == nil {
var waitingResp bool
bnc.marketStreamRespsMtx.Lock()
if ch, exists := bnc.marketStreamResps[note.ID]; exists {
waitingResp = true
timeout := time.After(time.Second * 5)
select {
case ch <- note.Result:
case <-timeout:
bnc.log.Errorf("Noone waiting for market stream result id %d", note.ID)
}
}
bnc.marketStreamRespsMtx.Unlock()
if !waitingResp {
bnc.log.Debugf("No data in market data update: %s", string(b))
}
return
}

Expand Down Expand Up @@ -1616,14 +1639,99 @@ func (bnc *binance) subscribeToAdditionalMarketDataStream(ctx context.Context, b
return nil
}

func (bnc *binance) streamsQuery() string {
func (bnc *binance) streams() []string {
bnc.booksMtx.RLock()
defer bnc.booksMtx.RUnlock()
streamNames := make([]string, 0, len(bnc.books))
for mktID := range bnc.books {
streamNames = append(streamNames, marketDataStreamID(mktID))
}
return strings.Join(streamNames, "/")
return streamNames
}

// checkSubs will query binance for current market subscriptions and compare
// that to what subscriptions we should have. If there is a discrepancy a
// warning is logged and the market subbed or unsubbed.
func (bnc *binance) checkSubs(ctx context.Context) error {
bnc.marketStreamMtx.Lock()
defer bnc.marketStreamMtx.Unlock()
streams := bnc.streams()
if len(streams) == 0 {
return nil
}

method := "LIST_SUBSCRIPTIONS"
id := atomic.AddUint64(&subscribeID, 1)

resp := make(chan []string, 1)
bnc.marketStreamRespsMtx.Lock()
bnc.marketStreamResps[id] = resp
bnc.marketStreamRespsMtx.Unlock()

defer func() {
bnc.marketStreamRespsMtx.Lock()
delete(bnc.marketStreamResps, id)
bnc.marketStreamRespsMtx.Unlock()
}()

req := &bntypes.StreamSubscription{
Method: method,
ID: id,
}

b, err := json.Marshal(req)
if err != nil {
return fmt.Errorf("error marshaling subscription stream request: %w", err)
}

bnc.log.Debugf("Sending %v", method)
if err := bnc.marketStream.SendRaw(b); err != nil {
return fmt.Errorf("error sending subscription stream request: %w", err)
}

timeout := time.After(time.Second * 5)
var subs []string
select {
case subs = <-resp:
case <-timeout:
return fmt.Errorf("market stream result id %d did not come.", id)
case <-ctx.Done():
return nil
}

var sub []string
unsub := make([]string, len(subs))
for i, s := range subs {
unsub[i] = strings.ToLower(s)
}

out:
for _, us := range streams {
for i, them := range unsub {
if us == them {
unsub[i] = unsub[len(unsub)-1]
unsub = unsub[:len(unsub)-1]
continue out
}
}
sub = append(sub, us)
}

for _, s := range sub {
bnc.log.Warnf("Subbing to previously unsubbed stream %s", s)
if err := bnc.subUnsubDepth(true, s); err != nil {
bnc.log.Errorf("Error subscribing to %s: %v", s, err)
}
}

for _, s := range unsub {
bnc.log.Warnf("Unsubbing to previously subbed stream %s", s)
if err := bnc.subUnsubDepth(false, s); err != nil {
bnc.log.Errorf("Error unsubscribing to %s: %v", s, err)
}
}

return nil
}

// connectToMarketDataStream is called when the first market is subscribed to.
Expand All @@ -1635,7 +1743,7 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote
reconnectC := make(chan struct{})

newConnection := func() (comms.WsConn, *dex.ConnectionMaster, error) {
addr := fmt.Sprintf("%s/stream?streams=%s", bnc.wsURL, bnc.streamsQuery())
addr := fmt.Sprintf("%s/stream?streams=%s", bnc.wsURL, strings.Join(bnc.streams(), "/"))
// Need to send key but not signature
conn, err := comms.NewWsConn(&comms.WsCfg{
URL: addr,
Expand Down Expand Up @@ -1712,22 +1820,33 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote
return nil
}

checkSubsInterval := time.Minute
checkSubs := time.After(checkSubsInterval)
reconnectTimer := time.After(time.Hour * 12)
for {
select {
case <-reconnectC:
if err := reconnect(); err != nil {
bnc.log.Errorf("Error reconnecting: %v", err)
reconnectTimer = time.After(time.Second * 30)
checkSubs = make(<-chan time.Time)
continue
}
checkSubs = time.After(checkSubsInterval)
case <-reconnectTimer:
if err := reconnect(); err != nil {
bnc.log.Errorf("Error refreshing connection: %v", err)
reconnectTimer = time.After(time.Second * 30)
checkSubs = make(<-chan time.Time)
continue
}
reconnectTimer = time.After(time.Hour * 12)
checkSubs = time.After(checkSubsInterval)
case <-checkSubs:
if err := bnc.checkSubs(ctx); err != nil {
bnc.log.Errorf("Error checking subscriptions: %v", err)
}
checkSubs = time.After(checkSubsInterval)
case <-ctx.Done():
bnc.marketStreamMtx.Lock()
bnc.marketStream = nil
Expand Down
2 changes: 2 additions & 0 deletions client/mm/libxc/bntypes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ type BookUpdate struct {
type BookNote struct {
StreamName string `json:"stream"`
Data *BookUpdate `json:"data"`
ID uint64 `json:"id"`
Result []string `json:"result"`
}

type WSBalance struct {
Expand Down

0 comments on commit ab9457b

Please sign in to comment.