Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed Aug 27, 2024
1 parent c2d0fcb commit 8d0d0ad
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 20 deletions.
41 changes: 34 additions & 7 deletions node/pkg/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"bytes"
"context"
"encoding/json"

"math"
"time"

"bisonai.com/miko/node/pkg/bus"
"bisonai.com/miko/node/pkg/chain/helper"
errorSentinel "bisonai.com/miko/node/pkg/error"
"bisonai.com/miko/node/pkg/raft"
Expand All @@ -16,7 +17,15 @@ import (
"github.com/rs/zerolog/log"
)

func NewAggregator(h host.Host, ps *pubsub.PubSub, topicString string, config Config, signHelper *helper.Signer, latestLocalAggregates *LatestLocalAggregates) (*Aggregator, error) {
func NewAggregator(
h host.Host,
ps *pubsub.PubSub,
topicString string,
config Config,
signHelper *helper.Signer,
latestLocalAggregates *LatestLocalAggregates,
mb *bus.MessageBus,
) (*Aggregator, error) {
if h == nil || ps == nil || topicString == "" {
return nil, errorSentinel.ErrAggregatorInvalidInitValue
}
Expand Down Expand Up @@ -53,6 +62,7 @@ func NewAggregator(h host.Host, ps *pubsub.PubSub, topicString string, config Co
RoundID: 1,
Signer: signHelper,
LatestLocalAggregates: latestLocalAggregates,
bus: mb,
}
aggregator.Raft.LeaderJob = aggregator.LeaderJob
aggregator.Raft.HandleCustomMessage = aggregator.HandleCustomMessage
Expand Down Expand Up @@ -102,16 +112,14 @@ func (n *Aggregator) HandleTriggerMessage(ctx context.Context, msg raft.Message)
return err
}
defer n.leaveOnlyLast10Entries(triggerMessage.RoundID)

n.mu.Lock()
defer n.mu.Unlock()
if msg.SentFrom != n.Raft.GetHostId() {
// follower can be changed into leader unexpectedly before recieving the message
// increase round id before checking the message sent from leader
// so that the next round will be triggered with larger round id
// prevents already handled message error

n.mu.Lock()
n.RoundID = max(triggerMessage.RoundID, n.RoundID)
n.mu.Unlock()
}

if triggerMessage.RoundID == 0 {
Expand Down Expand Up @@ -142,6 +150,7 @@ func (n *Aggregator) HandleTriggerMessage(ctx context.Context, msg raft.Message)
// if not enough messages collected from HandleSyncReplyMessage, it will hang in certain round
value = -1
} else {
n.roundLocalAggregate[triggerMessage.RoundID] = localAggregate.Value
value = localAggregate.Value
}

Expand Down Expand Up @@ -282,11 +291,29 @@ func (n *Aggregator) HandleProofMessage(ctx context.Context, msg raft.Message) e
concatProof := bytes.Join(n.roundProofs.proofs[proofMessage.RoundID], nil)
proof := Proof{ConfigID: n.ID, Round: proofMessage.RoundID, Proof: concatProof}

n.mu.Lock()
defer n.mu.Unlock()
if math.Abs(float64(n.roundLocalAggregate[proofMessage.RoundID]-globalAggregate.Value))/float64(globalAggregate.Value) > 0.3 {
log.Warn().Str("Player", "Aggregator").Str("Name", n.Name).Int32("roundId", proofMessage.RoundID).Int64("localAggregate", n.roundLocalAggregate[proofMessage.RoundID]).Int64("globalAggregate", globalAggregate.Value).Msg("local aggregate and global aggregate mismatch")
msg := bus.Message{
From: bus.AGGREGATOR,
To: bus.FETCHER,
Content: bus.MessageContent{
Command: bus.REFRESH_FETCHER_APP,
Args: nil,
},
}
err = n.bus.Publish(msg)
if err != nil {
log.Warn().Str("Player", "Aggregator").Err(err).Msg("failed to publish fetcher refresh bus message")
}
}

err := PublishGlobalAggregateAndProof(ctx, n.Name, globalAggregate, proof)
if err != nil {
log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to publish global aggregate and proof")
}

delete(n.roundLocalAggregate, proofMessage.RoundID)
}
return nil
}
Expand Down
10 changes: 5 additions & 5 deletions node/pkg/aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ func TestNewAggregator(t *testing.T) {
}
}()

_, err = NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap)
_, err = NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap, testItems.app.Bus)
if err != nil {
t.Fatal("error creating new node")
}
}

func TestNewAggregator_Error(t *testing.T) {
_, err := NewAggregator(nil, nil, "", Config{}, nil, nil)
_, err := NewAggregator(nil, nil, "", Config{}, nil, nil, nil)
assert.NotNil(t, err, "Expected error when creating new aggregator with nil parameters")
}

Expand All @@ -46,7 +46,7 @@ func TestLeaderJob(t *testing.T) {
}
}()

node, err := NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap)
node, err := NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap, testItems.app.Bus)
if err != nil {
t.Fatal("error creating new node")
}
Expand All @@ -70,7 +70,7 @@ func TestGetLatestRoundId(t *testing.T) {
}
}()

node, err := NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap)
node, err := NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap, testItems.app.Bus)
if err != nil {
t.Fatal("error creating new node")
}
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestPublishGlobalAggregateAndProof(t *testing.T) {
}
}()

node, err := NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap)
node, err := NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap, testItems.app.Bus)
if err != nil {
t.Fatal("error creating new node")
}
Expand Down
2 changes: 1 addition & 1 deletion node/pkg/aggregator/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (a *App) initializeLoadedAggregators(ctx context.Context, loadedConfigs []C
}

topicString := config.Name + "-global-aggregator-topic-" + strconv.Itoa(int(config.AggregateInterval))
tmpNode, err := NewAggregator(h, ps, topicString, config, signer, a.LatestLocalAggregates)
tmpNode, err := NewAggregator(h, ps, topicString, config, signer, a.LatestLocalAggregates, a.Bus)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion node/pkg/aggregator/globalaggregatebulkwriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestGlobalAggregateBulkWriterDataStore(t *testing.T) {
defer bulkWriter.Stop()
assert.NotEqual(t, nil, bulkWriter.ctx)

node, err := NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap)
node, err := NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap, testItems.app.Bus)
if err != nil {
t.Fatal("error creating new node")
}
Expand Down
13 changes: 8 additions & 5 deletions node/pkg/aggregator/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,12 @@ type Aggregator struct {
Raft *raft.Raft

LatestLocalAggregates *LatestLocalAggregates
roundTriggers *RoundTriggers
roundPrices *RoundPrices
roundPriceFixes *RoundPriceFixes
roundProofs *RoundProofs
roundLocalAggregate map[int32]int64

roundTriggers *RoundTriggers
roundPrices *RoundPrices
roundPriceFixes *RoundPriceFixes
roundProofs *RoundProofs

RoundID int32
Signer *helper.Signer
Expand All @@ -198,7 +200,8 @@ type Aggregator struct {
nodeCancel context.CancelFunc
isRunning bool

mu sync.RWMutex
mu sync.RWMutex
bus *bus.MessageBus
}

type PriceDataMessage struct {
Expand Down
1 change: 1 addition & 0 deletions node/pkg/error/sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ var (
ErrFetcherFailedToGetDexResultSlice = &CustomError{Service: Fetcher, Code: InternalError, Message: "Failed to get dex result slice"}
ErrFetcherFailedBigIntConvert = &CustomError{Service: Fetcher, Code: InternalError, Message: "Failed to convert to fetched data to big.Int"}
ErrFetcherFeedNotFound = &CustomError{Service: Fetcher, Code: InvalidInputError, Message: "Feed not found"}
ErrFetcherRefreshCooldown = &CustomError{Service: Fetcher, Code: InternalError, Message: "Failed to refresh data: cooldown period not over"}

ErrLibP2pEmptyNonLocalAddress = &CustomError{Service: Others, Code: InternalError, Message: "Host has no non-local addresses"}
ErrLibP2pAddressSplitFail = &CustomError{Service: Others, Code: InternalError, Message: "Failed to split address"}
Expand Down
8 changes: 7 additions & 1 deletion node/pkg/fetcher/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func New(bus *bus.MessageBus) *App {
},
FeedDataDumpChannel: make(chan *FeedData, DefaultFeedDataDumpChannelSize),
Bus: bus,
LastRefreshTime: time.Time{},
}
}

Expand Down Expand Up @@ -63,7 +64,7 @@ func (a *App) subscribe(ctx context.Context) {
}

func (a *App) handleMessage(ctx context.Context, msg bus.Message) {
if msg.From != bus.ADMIN {
if msg.From != bus.ADMIN || msg.From != bus.ACTIVATE_AGGREGATOR {
log.Debug().Str("Player", "Fetcher").Msg("fetcher received message from non-admin")
return
}
Expand Down Expand Up @@ -93,6 +94,11 @@ func (a *App) handleMessage(ctx context.Context, msg bus.Message) {
}
msg.Response <- bus.MessageResponse{Success: true}
case bus.REFRESH_FETCHER_APP:
if !a.LastRefreshTime.IsZero() && time.Since(a.LastRefreshTime) < RefreshCooldownInterval {
bus.HandleMessageError(errorSentinel.ErrFetcherRefreshCooldown, msg, "refresh on cooldown")
return
}
a.LastRefreshTime = time.Now()
err := a.stopAll(ctx)
if err != nil {
log.Error().Err(err).Str("Player", "Fetcher").Msg("failed to stop all fetchers")
Expand Down
2 changes: 2 additions & 0 deletions node/pkg/fetcher/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
DefaultLocalAggregateInterval = 200 * time.Millisecond
DefaultFeedDataDumpChannelSize = 20000
MaxOutlierRemovalRatio = 0.25
RefreshCooldownInterval = 5 * time.Minute
)

type Feed = types.Feed
Expand Down Expand Up @@ -91,6 +92,7 @@ type App struct {
LatestFeedDataMap *LatestFeedDataMap
Proxies []Proxy
FeedDataDumpChannel chan *FeedData
LastRefreshTime time.Time
}

type Definition struct {
Expand Down

0 comments on commit 8d0d0ad

Please sign in to comment.