Skip to content

Commit

Permalink
refactor feed info service.
Browse files Browse the repository at this point in the history
  • Loading branch information
jppade committed May 13, 2022
1 parent 0b7e4d4 commit ad98424
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 81 deletions.
2 changes: 1 addition & 1 deletion cmd/services/feedInfoService/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ module github.com/diadata-org/diadata/cmd/services/feedInfoService
go 1.14

require (
github.com/diadata-org/diadata v1.4.1-rc-163
github.com/diadata-org/diadata v1.4.1-rc-164
github.com/sirupsen/logrus v1.8.1
)
200 changes: 123 additions & 77 deletions cmd/services/feedInfoService/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ import (
)

var (
datastore *models.DB
relDB *models.RelDB
err error
basetokenMap = make(map[string]dia.Asset)
assets []dia.Asset
)
Expand All @@ -31,14 +28,19 @@ const (
NUM_RANGES = "24"
)

type timeRange struct {
tLeft time.Time
tRight time.Time
}

func main() {

datastore, err = models.NewDataStore()
datastore, err := models.NewDataStore()
if err != nil {
log.Errorln("NewDataStore", err)
}

relDB, err = models.NewRelDataStore()
relDB, err := models.NewRelDataStore()
if err != nil {
log.Errorln("NewRelDataStore:", err)
}
Expand All @@ -58,62 +60,150 @@ func main() {
ticker := time.NewTicker(COLLECTION_FREQUENCY_SECONDS * time.Second)

// Initial run.
updateStats(assets, time.Now(), numRanges, datastore, relDB)
for _, asset := range assets {
updateStatsPerAsset(asset, time.Now(), numRanges, datastore, relDB)
}
log.Info("...update done.")

for {
select {
case tFinal := <-ticker.C:
updateStats(assets, tFinal, numRanges, datastore, relDB)
for _, asset := range assets {
updateStatsPerAsset(asset, tFinal, numRanges, datastore, relDB)
}
log.Info("...update done.")
}
}

}

func updateStats(assets []dia.Asset, tFinal time.Time, numRanges int, datastore *models.DB, relDB *models.RelDB) {
tInit := tFinal.Add(-time.Duration(LOOKBACK_SECONDS * time.Second))
func updateStatsPerAsset(asset dia.Asset, tFinal time.Time, numRanges int, datastore *models.DB, relDB *models.RelDB) {

tInit := tFinal.Add(-time.Duration(LOOKBACK_SECONDS * time.Second))
// Make time ranges for batching the trades getter.
starttimes, endtimes := utils.MakeTimeRanges(tInit, tFinal, numRanges)

for _, asset := range assets {
var (
pairMap = make(map[string]struct{})
pairExchangeMap = make(map[string]struct{})
pairExchangeVolMap = make(map[dia.Pair]map[string]float64)
aggVolumes []dia.AggregatedVolume
tradesDistribution dia.TradesDistribution
tradesCount []int
binMap = make(map[timeRange]int)
binDuration = time.Duration(BIN_DURATION_SECONDS * time.Second)
)

// Constant trades distribution params.
tradesDistribution.Asset = asset
tradesDistribution.Timestamp = tFinal
tradesDistribution.Threshold = BIN_THRESHOLD
tradesDistribution.SizeBinSeconds = int64(BIN_DURATION_SECONDS)
tradesDistribution.TimeRangeSeconds = int64(LOOKBACK_SECONDS)

// Make bin map, mapping a time bin of size BIN_DURATION_SECONDS to the number of trades in this bin.
leftTime := tInit
for leftTime.Before(tFinal) {
tr := timeRange{
tLeft: leftTime,
tRight: leftTime.Add(binDuration),
}
binMap[tr] = 0
leftTime = leftTime.Add(binDuration)
}

for i := range starttimes {

// 1. Fetch trades for @asset in batches.
log.Infof("collect trades for: %s - %s ...", asset.Address, asset.Blockchain)
trades, err := datastore.GetTradesByExchangesBatchedFull(asset, []string{}, true, starttimes, endtimes)
trades, err := datastore.GetTradesByExchangesFull(asset, []string{}, true, starttimes[i], endtimes[i])
if err != nil {
log.Fatal("GetTradesByExchangesBatched: ", err)
log.Warnf("GetTradesByExchangesBatched in time range %v -- %v: %v", starttimes[i], endtimes[i], err)
}
log.Info("...done collecting trades.")

// 2. Get volumes per exchange and per pair.
aggVolumes := computePairStats(asset, trades, tFinal)
for _, pv := range aggVolumes {
err = relDB.SetAggregatedVolume(pv)
if err != nil {
log.Errorf("SetAggregatedVolume for %s - %s: %v", asset.Address, asset.Blockchain, err)
pairMap, pairExchangeMap, pairExchangeVolMap = computePairStats(asset, trades, pairMap, pairExchangeMap, pairExchangeVolMap, relDB)

// 3. Get statistics on trades' frequency and distribution
binMap = computeBinMap(trades, binMap)

}

// Get trades statistics from binMap.
for _, value := range binMap {
tradesCount = append(tradesCount, value)
tradesDistribution.NumTradesTotal += value
if value < BIN_THRESHOLD {
tradesDistribution.NumLowBins += 1
}
}
tradesDistribution.AvgNumPerBin = average(tradesCount)
tradesDistribution.StdDeviation = math.Sqrt(variance(tradesCount))

err := relDB.SetTradesDistribution(tradesDistribution)
if err != nil {
log.Errorf("set trades distributionfor %s - %s: %v", asset.Address, asset.Blockchain, err)
}

// Fill pairVolumes slice, i.e. sort by exchange and pair.
for pair, exchangeMap := range pairExchangeVolMap {
for exchange, value := range exchangeMap {
pairVolume := dia.AggregatedVolume{
Pair: pair,
Exchange: exchange,
Volume: value,
Timestamp: tFinal,
TimeRangeSeconds: LOOKBACK_SECONDS,
}
aggVolumes = append(aggVolumes, pairVolume)
}
}

// 3. Get statistics on trades' frequency and distribution
binDuration := time.Duration(BIN_DURATION_SECONDS * time.Second)
starttime := tInit
tradesFreq := computeTradesFrequency(asset, trades, binDuration, starttime, tFinal)
err = relDB.SetTradesDistribution(tradesFreq)
for _, pv := range aggVolumes {
err = relDB.SetAggregatedVolume(pv)
if err != nil {
log.Error("set trades distributionfor %s - %s: %v", asset.Address, asset.Blockchain, err)
log.Errorf("SetAggregatedVolume for %s - %s: %v", asset.Address, asset.Blockchain, err)
}
}
}

// computeBinMap returns the mapping of a time bin of size BIN_DURATION_SECONDS to the number of trades in this bin.
func computeBinMap(trades []dia.Trade, binMap map[timeRange]int) map[timeRange]int {
for _, trade := range trades {
binMap = mapTradeToBin(trade, binMap)
}
return binMap
}

// computePairStats takes a slice of trades with @asset as quotetoken asset. It returns a slice of aggregated volumes.
func computePairStats(asset dia.Asset, trades []dia.Trade, timestamp time.Time) (pairVolumes []dia.AggregatedVolume) {
var (
pairMap = make(map[string]struct{})
pairExchangeMap = make(map[string]struct{})
pairExchangeVolMap = make(map[dia.Pair]map[string]float64)
)
// mapTradeToBin maps a trade to the corresponding time bin and increments the counter.
func mapTradeToBin(trade dia.Trade, binMap map[timeRange]int) map[timeRange]int {
for key := range binMap {
if inBin(trade, key) {
binMap[key] += 1
return binMap
}
}
return binMap
}

func inBin(trade dia.Trade, tr timeRange) bool {
if !trade.Time.Before(tr.tLeft) && !trade.Time.After(tr.tRight) {
return true
}
return false
}

func computePairStats(
asset dia.Asset,
trades []dia.Trade,
pairMap map[string]struct{},
pairExchangeMap map[string]struct{},
pairExchangeVolMap map[dia.Pair]map[string]float64,
relDB *models.RelDB,
) (
map[string]struct{},
map[string]struct{},
map[dia.Pair]map[string]float64,
) {

quotetoken, err := relDB.GetAsset(asset.Address, asset.Blockchain)
if err != nil {
Expand Down Expand Up @@ -149,51 +239,7 @@ func computePairStats(asset dia.Asset, trades []dia.Trade, timestamp time.Time)
}
}

// Fill pairVolumes slice, i.e. sort by exchange and pair.
for pair, exchangeMap := range pairExchangeVolMap {
for exchange, value := range exchangeMap {
pairVolume := dia.AggregatedVolume{
Pair: pair,
Exchange: exchange,
Volume: value,
Timestamp: timestamp,
TimeRangeSeconds: LOOKBACK_SECONDS,
}
pairVolumes = append(pairVolumes, pairVolume)
}
}
return
}

// computeTradesFrequency returns statistics on the trades of @asset in the given time-range, assuming a @binDuration.
func computeTradesFrequency(asset dia.Asset, trades []dia.Trade, binDuration time.Duration, starttime time.Time, endtime time.Time) (tradesDistribution dia.TradesDistribution) {

var tradesCount []int
tradesCount = append(tradesCount, 0)
var binCount int
var lowBinCount int
for _, trade := range trades {
if trade.Time.Before(starttime.Add(binDuration)) {
tradesCount[binCount] += 1
} else {
if tradesCount[binCount] < int(BIN_THRESHOLD) {
lowBinCount++
}
tradesCount = append(tradesCount, 0)
binCount++
starttime = starttime.Add(binDuration)
}
}
tradesDistribution.Asset = asset
tradesDistribution.NumTradesTotal = len(trades)
tradesDistribution.NumLowBins = lowBinCount
tradesDistribution.Threshold = BIN_THRESHOLD
tradesDistribution.SizeBinSeconds = int64(BIN_DURATION_SECONDS)
tradesDistribution.AvgNumPerBin = average(tradesCount)
tradesDistribution.StdDeviation = math.Sqrt(variance(tradesCount))
tradesDistribution.TimeRangeSeconds = int64(LOOKBACK_SECONDS)
tradesDistribution.Timestamp = endtime
return
return pairMap, pairExchangeMap, pairExchangeVolMap
}

// getBasetoken fetches basetoken info either from local cache if in there or from postgres if not.
Expand Down
1 change: 1 addition & 0 deletions pkg/model/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Datastore interface {
GetLastTrades(asset dia.Asset, exchange string, maxTrades int, fullAsset bool) ([]dia.Trade, error)
GetAllTrades(t time.Time, maxTrades int) ([]dia.Trade, error)
GetTradesByExchanges(symbol dia.Asset, exchange []string, startTime, endTime time.Time) ([]dia.Trade, error)
GetTradesByExchangesFull(asset dia.Asset, exchanges []string, returnBasetoken bool, startTime, endTime time.Time) ([]dia.Trade, error)
GetTradesByExchangesBatched(asset dia.Asset, exchanges []string, startTimes, endTimes []time.Time) ([]dia.Trade, error)
GetTradesByExchangesBatchedFull(asset dia.Asset, exchanges []string, returnBasetoken bool, startTimes, endTimes []time.Time) ([]dia.Trade, error)
GetOldTradesFromInflux(table string, exchange string, verified bool, timeInit, timeFinal time.Time) ([]dia.Trade, error)
Expand Down
8 changes: 5 additions & 3 deletions pkg/model/trades.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ func parseTrade(row []interface{}, fullBasetoken bool) *dia.Trade {
}

func (datastore *DB) GetTradesByExchanges(asset dia.Asset, exchanges []string, startTime, endTime time.Time) ([]dia.Trade, error) {
return datastore.GetTradesByExchangesFull(asset, exchanges, false, startTime, endTime)
}

func (datastore *DB) GetTradesByExchangesFull(asset dia.Asset, exchanges []string, returnBasetoken bool, startTime, endTime time.Time) ([]dia.Trade, error) {
var r []dia.Trade
subQuery := ""
if len(exchanges) > 0 {
Expand All @@ -110,21 +114,19 @@ func (datastore *DB) GetTradesByExchanges(asset dia.Asset, exchanges []string, s
}
query := fmt.Sprintf("SELECT time,estimatedUSDPrice,verified,foreignTradeID,pair,price,symbol,volume,verified,basetokenblockchain,basetokenaddress FROM %s WHERE quotetokenaddress='%s' and quotetokenblockchain='%s' %s AND estimatedUSDPrice > 0 AND time >= %d AND time <= %d ", influxDbTradesTable, asset.Address, asset.Blockchain, subQuery, startTime.UnixNano(), endTime.UnixNano())

log.Infoln("GetTradesByExchanges Query", query)
res, err := queryInfluxDB(datastore.influxClient, query)
if err != nil {
return r, err
}

if len(res) > 0 && len(res[0].Series) > 0 {
for _, row := range res[0].Series[0].Values {
t := parseTrade(row, false)
t := parseTrade(row, returnBasetoken)
if t != nil {
r = append(r, *t)
}
}
} else {
log.Errorf("Empty response GetTradesByExchanges for %s \n", asset.Symbol)
return nil, fmt.Errorf("no trades found")
}
return r, nil
Expand Down

0 comments on commit ad98424

Please sign in to comment.