Skip to content

Commit

Permalink
Joe/Buck review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
martonp committed Sep 12, 2023
1 parent 08aae3b commit 08a73db
Show file tree
Hide file tree
Showing 13 changed files with 115 additions and 224 deletions.
4 changes: 4 additions & 0 deletions client/core/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ func (c *Core) logNote(n Notification) {
logFun("notify: %v", n)
}

func (c *Core) Broadcast(n Notification) {
c.notify(n)
}

// notify sends a notification to all subscribers. If the notification is of
// sufficient severity, it is stored in the database.
func (c *Core) notify(n Notification) {
Expand Down
76 changes: 26 additions & 50 deletions client/mm/mm.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"encoding/hex"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -39,6 +37,7 @@ type clientCore interface {
User() *core.User
Login(pw []byte) error
OpenWallet(assetID uint32, appPW []byte) error
Broadcast(core.Notification)
}

var _ clientCore = (*core.Core)(nil)
Expand Down Expand Up @@ -166,7 +165,7 @@ type MarketMaker struct {
unsyncedOracle *priceOracle

runningBotsMtx sync.RWMutex
runningBots map[string]interface{}
runningBots map[MarketWithHost]interface{}

noteMtx sync.RWMutex
noteChans map[uint64]chan core.Notification
Expand All @@ -182,7 +181,7 @@ func NewMarketMaker(c clientCore, log dex.Logger) (*MarketMaker, error) {
log: log,
running: atomic.Bool{},
orders: make(map[order.OrderID]*orderInfo),
runningBots: make(map[string]interface{}),
runningBots: make(map[MarketWithHost]interface{}),
noteChans: make(map[uint64]chan core.Notification),
unsyncedOracle: newUnsyncedPriceOracle(log),
}, nil
Expand All @@ -195,49 +194,23 @@ func (m *MarketMaker) Running() bool {

// MarketWithHost represents a market on a specific dex server.
type MarketWithHost struct {
Host string `json:"host"`
Base uint32 `json:"base"`
Quote uint32 `json:"quote"`
Host string `json:"host"`
BaseID uint32 `json:"base"`
QuoteID uint32 `json:"quote"`
}

func (m *MarketWithHost) String() string {
return fmt.Sprintf("%s-%d-%d", m.Host, m.Base, m.Quote)
}

func parseMarketWithHost(mkt string) (*MarketWithHost, error) {
parts := strings.Split(mkt, "-")
if len(parts) != 3 {
return nil, fmt.Errorf("invalid market %s", mkt)
}
host := parts[0]
base64, err := strconv.ParseUint(parts[1], 10, 32)
if err != nil {
return nil, fmt.Errorf("invalid market %s", mkt)
}
quote64, err := strconv.ParseUint(parts[2], 10, 32)
if err != nil {
return nil, fmt.Errorf("invalid market %s", mkt)
}
return &MarketWithHost{
Host: host,
Base: uint32(base64),
Quote: uint32(quote64),
}, nil
return fmt.Sprintf("%s-%d-%d", m.Host, m.BaseID, m.QuoteID)
}

// RunningBots returns the markets on which a bot is running.
func (m *MarketMaker) RunningBots() []*MarketWithHost {
func (m *MarketMaker) RunningBots() []MarketWithHost {
m.runningBotsMtx.RLock()
defer m.runningBotsMtx.RUnlock()

mkts := make([]*MarketWithHost, 0, len(m.runningBots))
mkts := make([]MarketWithHost, 0, len(m.runningBots))
for mkt := range m.runningBots {
mktWithHost, err := parseMarketWithHost(mkt)
if err != nil {
m.log.Errorf("failed to parse market %s: %v", mkt, err)
continue
}
mkts = append(mkts, mktWithHost)
mkts = append(mkts, mkt)
}

return mkts
Expand Down Expand Up @@ -285,13 +258,13 @@ func priceOracleFromConfigs(ctx context.Context, cfgs []*BotConfig, log dex.Logg
return oracle, nil
}

func (m *MarketMaker) markBotAsRunning(id string, running bool) {
func (m *MarketMaker) markBotAsRunning(mkt MarketWithHost, running bool) {
m.runningBotsMtx.Lock()
defer m.runningBotsMtx.Unlock()
if running {
m.runningBots[id] = struct{}{}
m.runningBots[mkt] = struct{}{}
} else {
delete(m.runningBots, id)
delete(m.runningBots, mkt)
}

if len(m.runningBots) == 0 {
Expand All @@ -308,8 +281,11 @@ func (m *MarketMaker) MarketReport(base, quote uint32) (*MarketReport, error) {

m.syncedOracleMtx.RLock()
if m.syncedOracle != nil {
price, oracles, err := m.syncedOracle.GetOracleInfo(base, quote)
price, oracles, err := m.syncedOracle.getOracleInfo(base, quote)
m.syncedOracleMtx.RUnlock()
if err != nil && !errors.Is(err, errUnsyncedMarket) {
m.log.Errorf("failed to get oracle info for market %d-%d: %v", base, quote, err)
}
if err == nil {
return &MarketReport{
Price: price,
Expand All @@ -321,7 +297,7 @@ func (m *MarketMaker) MarketReport(base, quote uint32) (*MarketReport, error) {
}
m.syncedOracleMtx.RUnlock()

price, oracles, err := m.unsyncedOracle.GetOracleInfo(base, quote)
price, oracles, err := m.unsyncedOracle.getOracleInfo(base, quote)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -888,7 +864,7 @@ func (m *MarketMaker) Run(ctx context.Context, cfgs []*BotConfig, pw []byte) err
user := m.core.User()

startedMarketMaking = true
m.notify(newMMStartStopNote(true))
m.core.Broadcast(newMMStartStopNote(true))

wg := new(sync.WaitGroup)

Expand All @@ -915,15 +891,15 @@ func (m *MarketMaker) Run(ctx context.Context, cfgs []*BotConfig, pw []byte) err
case cfg.MMCfg != nil:
wg.Add(1)
go func(cfg *BotConfig) {
mkt := &MarketWithHost{cfg.Host, cfg.BaseAsset, cfg.QuoteAsset}
m.markBotAsRunning(mkt.String(), true)
mkt := MarketWithHost{cfg.Host, cfg.BaseAsset, cfg.QuoteAsset}
m.markBotAsRunning(mkt, true)
defer func() {
m.markBotAsRunning(mkt.String(), false)
m.markBotAsRunning(mkt, false)
}()

m.notify(newBotStartStopNote(cfg.Host, cfg.BaseAsset, cfg.QuoteAsset, true))
m.core.Broadcast(newBotStartStopNote(cfg.Host, cfg.BaseAsset, cfg.QuoteAsset, true))
defer func() {
m.notify(newBotStartStopNote(cfg.Host, cfg.BaseAsset, cfg.QuoteAsset, false))
m.core.Broadcast(newBotStartStopNote(cfg.Host, cfg.BaseAsset, cfg.QuoteAsset, false))
}()
logger := m.log.SubLogger(fmt.Sprintf("MarketMaker-%s-%d-%d", cfg.Host, cfg.BaseAsset, cfg.QuoteAsset))
mktID := dexMarketID(cfg.Host, cfg.BaseAsset, cfg.QuoteAsset)
Expand All @@ -932,7 +908,7 @@ func (m *MarketMaker) Run(ctx context.Context, cfgs []*BotConfig, pw []byte) err
baseFiatRate = user.FiatRates[cfg.BaseAsset]
quoteFiatRate = user.FiatRates[cfg.QuoteAsset]
}
RunBasicMarketMaker(m.ctx, cfg, m.wrappedCoreForBot(mktID), oracle, baseFiatRate, quoteFiatRate, logger, m.notify)
RunBasicMarketMaker(m.ctx, cfg, m.wrappedCoreForBot(mktID), oracle, baseFiatRate, quoteFiatRate, logger)
wg.Done()
}(cfg)
default:
Expand All @@ -944,7 +920,7 @@ func (m *MarketMaker) Run(ctx context.Context, cfgs []*BotConfig, pw []byte) err
wg.Wait()
m.log.Infof("All bots have stopped running.")
m.running.Store(false)
m.notify(newMMStartStopNote(false))
m.core.Broadcast(newMMStartStopNote(false))
}()

return nil
Expand Down
7 changes: 3 additions & 4 deletions client/mm/mm_basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func basisPrice(book dexOrderBook, oracle oracle, cfg *MarketMakingConfig, mkt *
var oracleWeighting, oraclePrice float64
if cfg.OracleWeighting != nil && *cfg.OracleWeighting > 0 {
oracleWeighting = *cfg.OracleWeighting
oraclePrice = oracle.GetMarketPrice(mkt.BaseID, mkt.QuoteID)
oraclePrice = oracle.getMarketPrice(mkt.BaseID, mkt.QuoteID)
if oraclePrice == 0 {
log.Warnf("no oracle price available for %s bot", mkt.Name)
}
Expand Down Expand Up @@ -854,8 +854,7 @@ func (m *basicMarketMaker) run() {
}

// RunBasicMarketMaker starts a basic market maker bot.
func RunBasicMarketMaker(ctx context.Context, cfg *BotConfig, c clientCore, oracle oracle, baseFiatRate, quoteFiatRate float64, log dex.Logger,
notify func(core.Notification)) {
func RunBasicMarketMaker(ctx context.Context, cfg *BotConfig, c clientCore, oracle oracle, baseFiatRate, quoteFiatRate float64, log dex.Logger) {
if cfg.MMCfg == nil {
// implies bug in caller
log.Errorf("No market making config provided. Exiting.")
Expand All @@ -864,7 +863,7 @@ func RunBasicMarketMaker(ctx context.Context, cfg *BotConfig, c clientCore, orac

err := cfg.MMCfg.Validate()
if err != nil {
notify(newValidationErrorNote(cfg.Host, cfg.BaseAsset, cfg.QuoteAsset, fmt.Sprintf("invalid market making config: %v", err)))
c.Broadcast(newValidationErrorNote(cfg.Host, cfg.BaseAsset, cfg.QuoteAsset, fmt.Sprintf("invalid market making config: %v", err)))
return
}

Expand Down
4 changes: 3 additions & 1 deletion client/mm/mm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ func (c *tCore) User() *core.User {
return nil
}

func (c *tCore) Broadcast(core.Notification) {}

var _ clientCore = (*tCore)(nil)

func tMaxOrderEstimate(lots uint64, swapFees, redeemFees uint64) *core.MaxOrderEstimate {
Expand Down Expand Up @@ -297,7 +299,7 @@ type tOracle struct {
marketPrice float64
}

func (o *tOracle) GetMarketPrice(base, quote uint32) float64 {
func (o *tOracle) getMarketPrice(base, quote uint32) float64 {
return o.marketPrice
}

Expand Down
94 changes: 4 additions & 90 deletions client/mm/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ package mm

import (
"fmt"
"sync/atomic"

"decred.org/dcrdex/client/core"
"decred.org/dcrdex/client/db"
"decred.org/dcrdex/dex"
)
Expand All @@ -18,96 +16,12 @@ const (
mmStartStop = "mmstartstop"
)

// NoteFeed contains a receiving channel for notifications.
type NoteFeed struct {
C <-chan core.Notification
closer func()
}

// ReturnFeed should be called when the channel is no longer needed.
func (f *NoteFeed) ReturnFeed() {
if f.closer != nil {
f.closer()
}
}

// NotificationFeed returns a new receiving channel for notifications. The
// channel has capacity 1024, and should be monitored for the lifetime of the
// Core. Blocking channels are silently ignored.
func (m *MarketMaker) NotificationFeed() *NoteFeed {
id, ch := m.notificationFeed()
return &NoteFeed{
C: ch,
closer: func() { m.returnFeed(id) },
}
}

func (m *MarketMaker) returnFeed(channelID uint64) {
m.noteMtx.Lock()
delete(m.noteChans, channelID)
m.noteMtx.Unlock()
}

func (m *MarketMaker) logNote(n core.Notification) {
if n.Subject() == "" && n.Details() == "" {
return
}

logFun := m.log.Warnf // default in case the Severity level is unknown to notify
switch n.Severity() {
case db.Data:
logFun = m.log.Tracef
case db.Poke:
logFun = m.log.Debugf
case db.Success:
logFun = m.log.Infof
case db.WarningLevel:
logFun = m.log.Warnf
case db.ErrorLevel:
logFun = m.log.Errorf
}

logFun("notify: %v", n)
}

// notify sends a notification to all subscribers. If the notification is of
// sufficient severity, it is stored in the database.
func (m *MarketMaker) notify(n core.Notification) {
m.logNote(n)

m.noteMtx.RLock()
for _, ch := range m.noteChans {
select {
case ch <- n:
default:
m.log.Errorf("blocking notification channel")
}
}
m.noteMtx.RUnlock()
}

var noteChanCounter uint64

func (m *MarketMaker) notificationFeed() (uint64, <-chan core.Notification) {
ch := make(chan core.Notification, 1024)
cid := atomic.AddUint64(&noteChanCounter, 1)
m.noteMtx.Lock()
m.noteChans[cid] = ch
m.noteMtx.Unlock()
return cid, ch
}

type botValidationErrorNote struct {
db.Notification
}

func newValidationErrorNote(host string, baseID, quoteID uint32, errorMsg string) *botValidationErrorNote {
func newValidationErrorNote(host string, baseID, quoteID uint32, errorMsg string) *db.Notification {
baseSymbol := dex.BipIDSymbol(baseID)
quoteSymbol := dex.BipIDSymbol(quoteID)
msg := fmt.Sprintf("%s-%s @ %s: %s", host, baseSymbol, quoteSymbol, errorMsg)
return &botValidationErrorNote{
Notification: db.NewNotification(validationNote, "", "Bot Config Validation Error", msg, db.ErrorLevel),
}
msg := fmt.Sprintf("%s-%s @ %s: %s", baseSymbol, quoteSymbol, host, errorMsg)
note := db.NewNotification(validationNote, "", "Bot Config Validation Error", msg, db.ErrorLevel)
return &note
}

type botStartStopNote struct {
Expand Down
Loading

0 comments on commit 08a73db

Please sign in to comment.