From 6dc9e7bfdc49c4b1d1775be22bca6dabe2173a2e Mon Sep 17 00:00:00 2001 From: norwnd Date: Thu, 9 Mar 2023 05:58:36 +0300 Subject: [PATCH 1/2] client/core: OrderBook clarify cache usage --- client/core/bookie.go | 3 +++ client/core/core.go | 6 +++++- client/orderbook/orderbook.go | 37 ++++++++++++++++++++++++++++++++--- 3 files changed, 42 insertions(+), 4 deletions(-) diff --git a/client/core/bookie.go b/client/core/bookie.go index 7f9dea9750..b2c34df8e5 100644 --- a/client/core/bookie.go +++ b/client/core/bookie.go @@ -716,6 +716,9 @@ func handleTradeSuspensionMsg(c *Core, dc *dexConnection, msg *msgjson.Message) return fmt.Errorf("no order book found with market id '%s'", sp.MarketID) } + // Since we don't subscribe to server feed here it's okay to use book.Reset, + // otherwise we'd have to use ResetBeforeSubscribe/ResetAfterSubscribe pair + // instead. err = book.Reset(&msgjson.OrderBook{ MarketID: sp.MarketID, Seq: sp.Seq, // forces seq reset, but should be in seq with previous diff --git a/client/core/core.go b/client/core/core.go index 84b3222e46..fa735753e0 100644 --- a/client/core/core.go +++ b/client/core/core.go @@ -7901,6 +7901,10 @@ func (c *Core) handleReconnect(host string) { return } + // Ensuring we don't lose any updates notifications that server might + // send while we are in the process of resetting order book via booky.ResetAfterSubscribe. + booky.ResetBeforeSubscribe() + // Resubscribe since our old subscription was probably lost by the // server when the connection dropped. snap, err := dc.subscribe(mkt.base, mkt.quote) @@ -7910,7 +7914,7 @@ func (c *Core) handleReconnect(host string) { } // Create a fresh OrderBook for the bookie. - err = booky.Reset(snap) + err = booky.ResetAfterSubscribe(snap) if err != nil { c.log.Errorf("handleReconnect: Failed to Sync market %q order book snapshot: %v", mkt.name, err) } diff --git a/client/orderbook/orderbook.go b/client/orderbook/orderbook.go index c111244844..e8493e2f4c 100644 --- a/client/orderbook/orderbook.go +++ b/client/orderbook/orderbook.go @@ -220,20 +220,53 @@ func (ob *OrderBook) processCachedNotes() error { } } + // Must be done with ob.noteQueueMtx locked, otherwise some notes might be sent + // to ob.noteQueue after we are done processing it. Such notes won't be processed + // until next processCachedNotes call when they'll be irrelevant and more + // importantly it means we won't apply them on top of server snapshot they were + // meant to be applied to. + ob.setSynced(true) + return nil } // Sync updates a client tracked order book with an order book snapshot. It is -// an error if the the OrderBook is already synced. +// an error if the OrderBook is already synced because this method is meant to +// be used for initial sync (that includes server subscription request too) only. func (ob *OrderBook) Sync(snapshot *msgjson.OrderBook) error { if ob.isSynced() { return fmt.Errorf("order book is already synced") } + return ob.Reset(snapshot) // OK to use instead of ResetAfterSubscribe on initial sync. +} + +// ResetBeforeSubscribe helps us to make sure we don't miss any order book update +// notifications after/during order book snapshot is taken. Once we send "subscribe" +// request ("orderbook" route) to the server we start receiving those update +// notifications immediately, thus in order to not miss any of those we must set +// OrderBook synched status to false, so that when update notification comes it is +// cached to be retried after order book sync/reset process is done. Note, doing it +// this way we might receive update notifications related to previous subscription +// (unless we somehow informed the server and got a confirmation back that we aren't +// interested in those notifications, before sending this new subscribe request - which +// we currently don't do), this isn't an issue though because we'll just drop them +// since they'll contain seq value <= seq value from order book snapshot (which means +// they are already present in order book snapshot). +func (ob *OrderBook) ResetBeforeSubscribe() { + ob.setSynced(false) +} + +// ResetAfterSubscribe must be used instead of Reset when subscribing to server feed +// with "orderbook" route. It must be preceded by ResetBeforeSubscribe, see more +// details outlined in the comments there. +func (ob *OrderBook) ResetAfterSubscribe(snapshot *msgjson.OrderBook) error { return ob.Reset(snapshot) } // Reset forcibly updates a client tracked order book with an order book // snapshot. This resets the sequence. +// See ResetAfterSubscribe if you want to use Reset while re-subscribing to +// server order book feed with "orderbook" route. // TODO: eliminate this and half of the mutexes! func (ob *OrderBook) Reset(snapshot *msgjson.OrderBook) error { // Don't use setSeq here, since this message is the seed and is not expected @@ -295,8 +328,6 @@ func (ob *OrderBook) Reset(snapshot *msgjson.OrderBook) error { return err } - ob.setSynced(true) - return nil } From 3a44923080132ef8249c81cc4bd876109ada60df Mon Sep 17 00:00:00 2001 From: norwnd Date: Sat, 11 Mar 2023 10:54:29 +0300 Subject: [PATCH 2/2] Revising further --- client/core/bookie.go | 2 +- client/orderbook/orderbook.go | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/client/core/bookie.go b/client/core/bookie.go index b2c34df8e5..2fa3dfb278 100644 --- a/client/core/bookie.go +++ b/client/core/bookie.go @@ -147,7 +147,7 @@ func newBookie(dc *dexConnection, base, quote uint32, binSizes []string, logger } else { dexAsset := dc.assets[assetID] if dexAsset == nil { - dc.log.Errorf("DEX market has no %d asset. Is this even possible?", base) + dc.log.Errorf("DEX market has no %d asset. Is this even possible?", assetID) return defaultUnitInfo("XYZ") } else { unitInfo := dexAsset.UnitInfo diff --git a/client/orderbook/orderbook.go b/client/orderbook/orderbook.go index e8493e2f4c..8e7eb2918a 100644 --- a/client/orderbook/orderbook.go +++ b/client/orderbook/orderbook.go @@ -220,6 +220,8 @@ func (ob *OrderBook) processCachedNotes() error { } } + // TODO: this doesn't help actually, because we still might add another note + // to cache after we exit processCachedNotes func and will never handle it (it drops). // Must be done with ob.noteQueueMtx locked, otherwise some notes might be sent // to ob.noteQueue after we are done processing it. Such notes won't be processed // until next processCachedNotes call when they'll be irrelevant and more