Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client/core: OrderBook clarify cache usage #5

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion client/core/bookie.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func newBookie(dc *dexConnection, base, quote uint32, binSizes []string, logger
} else {
dexAsset := dc.assets[assetID]
if dexAsset == nil {
dc.log.Errorf("DEX market has no %d asset. Is this even possible?", base)
dc.log.Errorf("DEX market has no %d asset. Is this even possible?", assetID)
return defaultUnitInfo("XYZ")
} else {
unitInfo := dexAsset.UnitInfo
Expand Down Expand Up @@ -716,6 +716,9 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message)
return fmt.Errorf("no order book found with market id '%s'", sp.MarketID)
}

// Since we don't subscribe to server feed here it's okay to use book.Reset,
// otherwise we'd have to use ResetBeforeSubscribe/ResetAfterSubscribe pair
// instead.
err = book.Reset(&msgjson.OrderBook{
MarketID: sp.MarketID,
Seq: sp.Seq, // forces seq reset, but should be in seq with previous
Expand Down
6 changes: 5 additions & 1 deletion client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -7901,6 +7901,10 @@ func (c *Core) handleReconnect(host string) {
return
}

// Ensuring we don't lose any updates notifications that server might
// send while we are in the process of resetting order book via booky.ResetAfterSubscribe.
booky.ResetBeforeSubscribe()

// Resubscribe since our old subscription was probably lost by the
// server when the connection dropped.
snap, err := dc.subscribe(mkt.base, mkt.quote)
Expand All @@ -7910,7 +7914,7 @@ func (c *Core) handleReconnect(host string) {
}

// Create a fresh OrderBook for the bookie.
err = booky.Reset(snap)
err = booky.ResetAfterSubscribe(snap)
if err != nil {
c.log.Errorf("handleReconnect: Failed to Sync market %q order book snapshot: %v", mkt.name, err)
}
Expand Down
39 changes: 36 additions & 3 deletions client/orderbook/orderbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,20 +220,55 @@ func (ob *OrderBook) processCachedNotes() error {
}
}

// TODO: this doesn't help actually, because we still might add another note
// to cache after we exit processCachedNotes func and will never handle it (it drops).
// Must be done with ob.noteQueueMtx locked, otherwise some notes might be sent
// to ob.noteQueue after we are done processing it. Such notes won't be processed
// until next processCachedNotes call when they'll be irrelevant and more
// importantly it means we won't apply them on top of server snapshot they were
// meant to be applied to.
ob.setSynced(true)

return nil
}
Comment on lines +225 to 233
Copy link
Owner Author

@norwnd norwnd Mar 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since cache isn't used (based on what I observed in the logs, and my current understanding of the relevant code), this fix is non-effectual really if #1 gets merged.

Moreover, we might not even need caching here really, so in the next PR I'll consider getting rid of it. But if OrderBook.noteQueue cache stays this is very much a necessary fix for it.

Copy link
Owner Author

@norwnd norwnd Mar 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thoughts on whether or not we need (to fix or reimplement) that caching there:

  • there is a 1024+128 buffer in comms package to hold incoming WS messages, it seems like enough of a buffer to hold those server update notifications off for a while (and probably we can easily increase these number if necessary, probably want to add some Tracef logs to monitor saturation in case it gets dangerously high during network delays and whatnot; on master I'm getting single-digits numbers for nextJob size during normal operation, don't imagine how this could get significantly worse unless we have ~x100 activity increase on DEX maybe)
  • ALL server notifications will be waiting behind one that got blocked (waiting on dc.booksMtx for example), it doesn't seems to be a big deal here cause we don't have to handle them in real time


// Sync updates a client tracked order book with an order book snapshot. It is
// an error if the the OrderBook is already synced.
// an error if the OrderBook is already synced because this method is meant to
// be used for initial sync (that includes server subscription request too) only.
func (ob *OrderBook) Sync(snapshot *msgjson.OrderBook) error {
if ob.isSynced() {
return fmt.Errorf("order book is already synced")
}
return ob.Reset(snapshot) // OK to use instead of ResetAfterSubscribe on initial sync.
}

// ResetBeforeSubscribe helps us to make sure we don't miss any order book update
// notifications after/during order book snapshot is taken. Once we send "subscribe"
// request ("orderbook" route) to the server we start receiving those update
// notifications immediately, thus in order to not miss any of those we must set
// OrderBook synched status to false, so that when update notification comes it is
// cached to be retried after order book sync/reset process is done. Note, doing it
// this way we might receive update notifications related to previous subscription
// (unless we somehow informed the server and got a confirmation back that we aren't
// interested in those notifications, before sending this new subscribe request - which
// we currently don't do), this isn't an issue though because we'll just drop them
// since they'll contain seq value <= seq value from order book snapshot (which means
// they are already present in order book snapshot).
func (ob *OrderBook) ResetBeforeSubscribe() {
ob.setSynced(false)
}

// ResetAfterSubscribe must be used instead of Reset when subscribing to server feed
// with "orderbook" route. It must be preceded by ResetBeforeSubscribe, see more
// details outlined in the comments there.
func (ob *OrderBook) ResetAfterSubscribe(snapshot *msgjson.OrderBook) error {
return ob.Reset(snapshot)
}

// Reset forcibly updates a client tracked order book with an order book
// snapshot. This resets the sequence.
// See ResetAfterSubscribe if you want to use Reset while re-subscribing to
// server order book feed with "orderbook" route.
// TODO: eliminate this and half of the mutexes!
func (ob *OrderBook) Reset(snapshot *msgjson.OrderBook) error {
// Don't use setSeq here, since this message is the seed and is not expected
Expand Down Expand Up @@ -295,8 +330,6 @@ func (ob *OrderBook) Reset(snapshot *msgjson.OrderBook) error {
return err
}

ob.setSynced(true)

return nil
}

Expand Down