From e186da29fd5d056b59497441f3e78450b930bbdd Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 28 Aug 2024 00:00:45 +0900 Subject: [PATCH 1/8] wip --- node/pkg/aggregator/aggregator.go | 41 +++++++++++++++---- node/pkg/aggregator/aggregator_test.go | 10 ++--- node/pkg/aggregator/app.go | 2 +- .../globalaggregatebulkwriter_test.go | 2 +- node/pkg/aggregator/types.go | 13 +++--- node/pkg/error/sentinel.go | 1 + node/pkg/fetcher/app.go | 8 +++- node/pkg/fetcher/types.go | 2 + 8 files changed, 59 insertions(+), 20 deletions(-) diff --git a/node/pkg/aggregator/aggregator.go b/node/pkg/aggregator/aggregator.go index 82eba9cba..f8bc7cf43 100644 --- a/node/pkg/aggregator/aggregator.go +++ b/node/pkg/aggregator/aggregator.go @@ -4,9 +4,10 @@ import ( "bytes" "context" "encoding/json" - + "math" "time" + "bisonai.com/miko/node/pkg/bus" "bisonai.com/miko/node/pkg/chain/helper" errorSentinel "bisonai.com/miko/node/pkg/error" "bisonai.com/miko/node/pkg/raft" @@ -16,7 +17,15 @@ import ( "github.com/rs/zerolog/log" ) -func NewAggregator(h host.Host, ps *pubsub.PubSub, topicString string, config Config, signHelper *helper.Signer, latestLocalAggregates *LatestLocalAggregates) (*Aggregator, error) { +func NewAggregator( + h host.Host, + ps *pubsub.PubSub, + topicString string, + config Config, + signHelper *helper.Signer, + latestLocalAggregates *LatestLocalAggregates, + mb *bus.MessageBus, +) (*Aggregator, error) { if h == nil || ps == nil || topicString == "" { return nil, errorSentinel.ErrAggregatorInvalidInitValue } @@ -53,6 +62,7 @@ func NewAggregator(h host.Host, ps *pubsub.PubSub, topicString string, config Co RoundID: 1, Signer: signHelper, LatestLocalAggregates: latestLocalAggregates, + bus: mb, } aggregator.Raft.LeaderJob = aggregator.LeaderJob aggregator.Raft.HandleCustomMessage = aggregator.HandleCustomMessage @@ -102,16 +112,14 @@ func (n *Aggregator) HandleTriggerMessage(ctx context.Context, msg raft.Message) return err } defer n.leaveOnlyLast10Entries(triggerMessage.RoundID) - + n.mu.Lock() + defer n.mu.Unlock() if msg.SentFrom != n.Raft.GetHostId() { // follower can be changed into leader unexpectedly before recieving the message // increase round id before checking the message sent from leader // so that the next round will be triggered with larger round id // prevents already handled message error - - n.mu.Lock() n.RoundID = max(triggerMessage.RoundID, n.RoundID) - n.mu.Unlock() } if triggerMessage.RoundID == 0 { @@ -142,6 +150,7 @@ func (n *Aggregator) HandleTriggerMessage(ctx context.Context, msg raft.Message) // if not enough messages collected from HandleSyncReplyMessage, it will hang in certain round value = -1 } else { + n.roundLocalAggregate[triggerMessage.RoundID] = localAggregate.Value value = localAggregate.Value } @@ -282,11 +291,29 @@ func (n *Aggregator) HandleProofMessage(ctx context.Context, msg raft.Message) e concatProof := bytes.Join(n.roundProofs.proofs[proofMessage.RoundID], nil) proof := Proof{ConfigID: n.ID, Round: proofMessage.RoundID, Proof: concatProof} + n.mu.Lock() + defer n.mu.Unlock() + if math.Abs(float64(n.roundLocalAggregate[proofMessage.RoundID]-globalAggregate.Value))/float64(globalAggregate.Value) > 0.3 { + log.Warn().Str("Player", "Aggregator").Str("Name", n.Name).Int32("roundId", proofMessage.RoundID).Int64("localAggregate", n.roundLocalAggregate[proofMessage.RoundID]).Int64("globalAggregate", globalAggregate.Value).Msg("local aggregate and global aggregate mismatch") + msg := bus.Message{ + From: bus.AGGREGATOR, + To: bus.FETCHER, + Content: bus.MessageContent{ + Command: bus.REFRESH_FETCHER_APP, + Args: nil, + }, + } + err = n.bus.Publish(msg) + if err != nil { + log.Warn().Str("Player", "Aggregator").Err(err).Msg("failed to publish fetcher refresh bus message") + } + } + err := PublishGlobalAggregateAndProof(ctx, n.Name, globalAggregate, proof) if err != nil { log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to publish global aggregate and proof") } - + delete(n.roundLocalAggregate, proofMessage.RoundID) } return nil } diff --git a/node/pkg/aggregator/aggregator_test.go b/node/pkg/aggregator/aggregator_test.go index e84121c44..0acfca560 100644 --- a/node/pkg/aggregator/aggregator_test.go +++ b/node/pkg/aggregator/aggregator_test.go @@ -23,14 +23,14 @@ func TestNewAggregator(t *testing.T) { } }() - _, err = NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap) + _, err = NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap, testItems.app.Bus) if err != nil { t.Fatal("error creating new node") } } func TestNewAggregator_Error(t *testing.T) { - _, err := NewAggregator(nil, nil, "", Config{}, nil, nil) + _, err := NewAggregator(nil, nil, "", Config{}, nil, nil, nil) assert.NotNil(t, err, "Expected error when creating new aggregator with nil parameters") } @@ -46,7 +46,7 @@ func TestLeaderJob(t *testing.T) { } }() - node, err := NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap) + node, err := NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap, testItems.app.Bus) if err != nil { t.Fatal("error creating new node") } @@ -70,7 +70,7 @@ func TestGetLatestRoundId(t *testing.T) { } }() - node, err := NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap) + node, err := NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap, testItems.app.Bus) if err != nil { t.Fatal("error creating new node") } @@ -98,7 +98,7 @@ func TestPublishGlobalAggregateAndProof(t *testing.T) { } }() - node, err := NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap) + node, err := NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap, testItems.app.Bus) if err != nil { t.Fatal("error creating new node") } diff --git a/node/pkg/aggregator/app.go b/node/pkg/aggregator/app.go index bacd063e0..6f2912a26 100644 --- a/node/pkg/aggregator/app.go +++ b/node/pkg/aggregator/app.go @@ -127,7 +127,7 @@ func (a *App) initializeLoadedAggregators(ctx context.Context, loadedConfigs []C } topicString := config.Name + "-global-aggregator-topic-" + strconv.Itoa(int(config.AggregateInterval)) - tmpNode, err := NewAggregator(h, ps, topicString, config, signer, a.LatestLocalAggregates) + tmpNode, err := NewAggregator(h, ps, topicString, config, signer, a.LatestLocalAggregates, a.Bus) if err != nil { return err } diff --git a/node/pkg/aggregator/globalaggregatebulkwriter_test.go b/node/pkg/aggregator/globalaggregatebulkwriter_test.go index 2dbd409c6..52e40aab3 100644 --- a/node/pkg/aggregator/globalaggregatebulkwriter_test.go +++ b/node/pkg/aggregator/globalaggregatebulkwriter_test.go @@ -86,7 +86,7 @@ func TestGlobalAggregateBulkWriterDataStore(t *testing.T) { defer bulkWriter.Stop() assert.NotEqual(t, nil, bulkWriter.ctx) - node, err := NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap) + node, err := NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap, testItems.app.Bus) if err != nil { t.Fatal("error creating new node") } diff --git a/node/pkg/aggregator/types.go b/node/pkg/aggregator/types.go index fcd689ecb..b350f0794 100644 --- a/node/pkg/aggregator/types.go +++ b/node/pkg/aggregator/types.go @@ -186,10 +186,12 @@ type Aggregator struct { Raft *raft.Raft LatestLocalAggregates *LatestLocalAggregates - roundTriggers *RoundTriggers - roundPrices *RoundPrices - roundPriceFixes *RoundPriceFixes - roundProofs *RoundProofs + roundLocalAggregate map[int32]int64 + + roundTriggers *RoundTriggers + roundPrices *RoundPrices + roundPriceFixes *RoundPriceFixes + roundProofs *RoundProofs RoundID int32 Signer *helper.Signer @@ -198,7 +200,8 @@ type Aggregator struct { nodeCancel context.CancelFunc isRunning bool - mu sync.RWMutex + mu sync.RWMutex + bus *bus.MessageBus } type PriceDataMessage struct { diff --git a/node/pkg/error/sentinel.go b/node/pkg/error/sentinel.go index 0b04dd2e8..1b267a38e 100644 --- a/node/pkg/error/sentinel.go +++ b/node/pkg/error/sentinel.go @@ -156,6 +156,7 @@ var ( ErrFetcherFailedToGetDexResultSlice = &CustomError{Service: Fetcher, Code: InternalError, Message: "Failed to get dex result slice"} ErrFetcherFailedBigIntConvert = &CustomError{Service: Fetcher, Code: InternalError, Message: "Failed to convert to fetched data to big.Int"} ErrFetcherFeedNotFound = &CustomError{Service: Fetcher, Code: InvalidInputError, Message: "Feed not found"} + ErrFetcherRefreshCooldown = &CustomError{Service: Fetcher, Code: InternalError, Message: "Failed to refresh data: cooldown period not over"} ErrLibP2pEmptyNonLocalAddress = &CustomError{Service: Others, Code: InternalError, Message: "Host has no non-local addresses"} ErrLibP2pAddressSplitFail = &CustomError{Service: Others, Code: InternalError, Message: "Failed to split address"} diff --git a/node/pkg/fetcher/app.go b/node/pkg/fetcher/app.go index 40649e7de..5aa80bd9d 100644 --- a/node/pkg/fetcher/app.go +++ b/node/pkg/fetcher/app.go @@ -25,6 +25,7 @@ func New(bus *bus.MessageBus) *App { }, FeedDataDumpChannel: make(chan *FeedData, DefaultFeedDataDumpChannelSize), Bus: bus, + LastRefreshTime: time.Time{}, } } @@ -63,7 +64,7 @@ func (a *App) subscribe(ctx context.Context) { } func (a *App) handleMessage(ctx context.Context, msg bus.Message) { - if msg.From != bus.ADMIN { + if msg.From != bus.ADMIN || msg.From != bus.ACTIVATE_AGGREGATOR { log.Debug().Str("Player", "Fetcher").Msg("fetcher received message from non-admin") return } @@ -93,6 +94,11 @@ func (a *App) handleMessage(ctx context.Context, msg bus.Message) { } msg.Response <- bus.MessageResponse{Success: true} case bus.REFRESH_FETCHER_APP: + if !a.LastRefreshTime.IsZero() && time.Since(a.LastRefreshTime) < RefreshCooldownInterval { + bus.HandleMessageError(errorSentinel.ErrFetcherRefreshCooldown, msg, "refresh on cooldown") + return + } + a.LastRefreshTime = time.Now() err := a.stopAll(ctx) if err != nil { log.Error().Err(err).Str("Player", "Fetcher").Msg("failed to stop all fetchers") diff --git a/node/pkg/fetcher/types.go b/node/pkg/fetcher/types.go index 09c3f1f18..398afa7af 100644 --- a/node/pkg/fetcher/types.go +++ b/node/pkg/fetcher/types.go @@ -25,6 +25,7 @@ const ( DefaultLocalAggregateInterval = 200 * time.Millisecond DefaultFeedDataDumpChannelSize = 20000 MaxOutlierRemovalRatio = 0.25 + RefreshCooldownInterval = 5 * time.Minute ) type Feed = types.Feed @@ -91,6 +92,7 @@ type App struct { LatestFeedDataMap *LatestFeedDataMap Proxies []Proxy FeedDataDumpChannel chan *FeedData + LastRefreshTime time.Time } type Definition struct { From 9dceb87d6c1ee7f9a78fcbe58658c70959f9fd8f Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 28 Aug 2024 00:02:30 +0900 Subject: [PATCH 2/8] fix: bus msg skip condition --- node/pkg/fetcher/app.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/pkg/fetcher/app.go b/node/pkg/fetcher/app.go index 5aa80bd9d..1ab220f3d 100644 --- a/node/pkg/fetcher/app.go +++ b/node/pkg/fetcher/app.go @@ -64,7 +64,7 @@ func (a *App) subscribe(ctx context.Context) { } func (a *App) handleMessage(ctx context.Context, msg bus.Message) { - if msg.From != bus.ADMIN || msg.From != bus.ACTIVATE_AGGREGATOR { + if msg.From != bus.ADMIN && msg.From != bus.ACTIVATE_AGGREGATOR { log.Debug().Str("Player", "Fetcher").Msg("fetcher received message from non-admin") return } From 51e3baedba96e601a8c8611a87c09cd4b33dbe40 Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 28 Aug 2024 00:12:01 +0900 Subject: [PATCH 3/8] fix: check if exists before comparison --- node/pkg/aggregator/aggregator.go | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/node/pkg/aggregator/aggregator.go b/node/pkg/aggregator/aggregator.go index f8bc7cf43..e8b69f53b 100644 --- a/node/pkg/aggregator/aggregator.go +++ b/node/pkg/aggregator/aggregator.go @@ -293,19 +293,21 @@ func (n *Aggregator) HandleProofMessage(ctx context.Context, msg raft.Message) e n.mu.Lock() defer n.mu.Unlock() - if math.Abs(float64(n.roundLocalAggregate[proofMessage.RoundID]-globalAggregate.Value))/float64(globalAggregate.Value) > 0.3 { - log.Warn().Str("Player", "Aggregator").Str("Name", n.Name).Int32("roundId", proofMessage.RoundID).Int64("localAggregate", n.roundLocalAggregate[proofMessage.RoundID]).Int64("globalAggregate", globalAggregate.Value).Msg("local aggregate and global aggregate mismatch") - msg := bus.Message{ - From: bus.AGGREGATOR, - To: bus.FETCHER, - Content: bus.MessageContent{ - Command: bus.REFRESH_FETCHER_APP, - Args: nil, - }, - } - err = n.bus.Publish(msg) - if err != nil { - log.Warn().Str("Player", "Aggregator").Err(err).Msg("failed to publish fetcher refresh bus message") + if localAggregate, ok := n.roundLocalAggregate[proofMessage.RoundID]; ok { + if math.Abs(float64(localAggregate-globalAggregate.Value))/float64(globalAggregate.Value) > 0.3 { + log.Warn().Str("Player", "Aggregator").Str("Name", n.Name).Int32("roundId", proofMessage.RoundID).Int64("localAggregate", localAggregate).Int64("globalAggregate", globalAggregate.Value).Msg("local aggregate and global aggregate mismatch") + msg := bus.Message{ + From: bus.AGGREGATOR, + To: bus.FETCHER, + Content: bus.MessageContent{ + Command: bus.REFRESH_FETCHER_APP, + Args: nil, + }, + } + err = n.bus.Publish(msg) + if err != nil { + log.Warn().Str("Player", "Aggregator").Err(err).Msg("failed to publish fetcher refresh bus message") + } } } From 2f121d989697ab3d54b8258a49b0d691e96864e5 Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 28 Aug 2024 11:10:40 +0900 Subject: [PATCH 4/8] fix: minor fixes --- node/pkg/aggregator/aggregator.go | 4 ++++ node/pkg/error/sentinel.go | 1 + node/pkg/fetcher/app.go | 2 +- 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/node/pkg/aggregator/aggregator.go b/node/pkg/aggregator/aggregator.go index e8b69f53b..fb85a6179 100644 --- a/node/pkg/aggregator/aggregator.go +++ b/node/pkg/aggregator/aggregator.go @@ -63,6 +63,8 @@ func NewAggregator( Signer: signHelper, LatestLocalAggregates: latestLocalAggregates, bus: mb, + + roundLocalAggregate: map[int32]int64{}, } aggregator.Raft.LeaderJob = aggregator.LeaderJob aggregator.Raft.HandleCustomMessage = aggregator.HandleCustomMessage @@ -308,6 +310,8 @@ func (n *Aggregator) HandleProofMessage(ctx context.Context, msg raft.Message) e if err != nil { log.Warn().Str("Player", "Aggregator").Err(err).Msg("failed to publish fetcher refresh bus message") } + + return errorSentinel.ErrAggregatorGlobalLocalPriceMismatch } } diff --git a/node/pkg/error/sentinel.go b/node/pkg/error/sentinel.go index 1b267a38e..46aa659b2 100644 --- a/node/pkg/error/sentinel.go +++ b/node/pkg/error/sentinel.go @@ -92,6 +92,7 @@ var ( ErrAggregatorNotFound = &CustomError{Service: Aggregator, Code: InternalError, Message: "Aggregator not found"} ErrAggregatorCancelNotFound = &CustomError{Service: Aggregator, Code: InternalError, Message: "Aggregator cancel function not found"} ErrAggregatorEmptyProof = &CustomError{Service: Aggregator, Code: InternalError, Message: "Empty proof"} + ErrAggregatorGlobalLocalPriceMismatch = &CustomError{Service: Aggregator, Code: InternalError, Message: "Global and local price mismatch"} ErrBootAPIDbPoolNotFound = &CustomError{Service: BootAPI, Code: InternalError, Message: "db pool not found"} diff --git a/node/pkg/fetcher/app.go b/node/pkg/fetcher/app.go index 1ab220f3d..aef075110 100644 --- a/node/pkg/fetcher/app.go +++ b/node/pkg/fetcher/app.go @@ -64,7 +64,7 @@ func (a *App) subscribe(ctx context.Context) { } func (a *App) handleMessage(ctx context.Context, msg bus.Message) { - if msg.From != bus.ADMIN && msg.From != bus.ACTIVATE_AGGREGATOR { + if msg.From != bus.ADMIN && msg.From != bus.AGGREGATOR { log.Debug().Str("Player", "Fetcher").Msg("fetcher received message from non-admin") return } From 2b09a1c0813cea9d6039eebbab2284ba5f8ea9e4 Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 28 Aug 2024 11:53:23 +0900 Subject: [PATCH 5/8] test: add test --- node/pkg/fetcher/app_test.go | 126 ++++++++++++++++++++++++++++++++++ node/pkg/fetcher/main_test.go | 2 +- 2 files changed, 127 insertions(+), 1 deletion(-) diff --git a/node/pkg/fetcher/app_test.go b/node/pkg/fetcher/app_test.go index 904fb7ce5..9a38fe254 100644 --- a/node/pkg/fetcher/app_test.go +++ b/node/pkg/fetcher/app_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "bisonai.com/miko/node/pkg/bus" "bisonai.com/miko/node/pkg/db" "github.com/stretchr/testify/assert" ) @@ -113,3 +114,128 @@ func TestAppRun(t *testing.T) { } assert.Greater(t, len(localAggregateResult), 0) } + +func TestAppRefresh(t *testing.T) { + ctx := context.Background() + clean, testItems, err := setup(ctx) + if err != nil { + t.Fatalf("error setting up test: %v", err) + } + defer func() { + if cleanupErr := clean(); cleanupErr != nil { + t.Logf("Cleanup failed: %v", cleanupErr) + } + }() + + app := testItems.app + + err = app.initialize(ctx) + if err != nil { + t.Fatalf("error initializing fetcher: %v", err) + } + + err = app.Run(ctx) + if err != nil { + t.Fatalf("error running fetcher: %v", err) + } + for _, fetcher := range app.Fetchers { + assert.True(t, fetcher.isRunning) + } + for _, localAggregator := range app.LocalAggregators { + assert.True(t, localAggregator.isRunning) + } + assert.True(t, app.FeedDataBulkWriter.isRunning) + + time.Sleep(WAIT_SECONDS) + + resp := make(chan bus.MessageResponse) + err = testItems.messageBus.Publish( + bus.Message{ + From: bus.AGGREGATOR, + To: bus.FETCHER, + Content: bus.MessageContent{ + Command: bus.REFRESH_FETCHER_APP, + Args: nil, + }, + Response: resp, + }, + ) + if err != nil { + t.Fatalf("error publishing message: %v", err) + } + + result := <-resp + assert.Equal(t, result.Success, true) +} + +func TestAppRefreshCooldown(t *testing.T) { + ctx := context.Background() + clean, testItems, err := setup(ctx) + if err != nil { + t.Fatalf("error setting up test: %v", err) + } + defer func() { + if cleanupErr := clean(); cleanupErr != nil { + t.Logf("Cleanup failed: %v", cleanupErr) + } + }() + + app := testItems.app + + err = app.initialize(ctx) + if err != nil { + t.Fatalf("error initializing fetcher: %v", err) + } + + err = app.Run(ctx) + if err != nil { + t.Fatalf("error running fetcher: %v", err) + } + for _, fetcher := range app.Fetchers { + assert.True(t, fetcher.isRunning) + } + for _, localAggregator := range app.LocalAggregators { + assert.True(t, localAggregator.isRunning) + } + assert.True(t, app.FeedDataBulkWriter.isRunning) + + time.Sleep(WAIT_SECONDS) + + resp := make(chan bus.MessageResponse) + err = testItems.messageBus.Publish( + bus.Message{ + From: bus.AGGREGATOR, + To: bus.FETCHER, + Content: bus.MessageContent{ + Command: bus.REFRESH_FETCHER_APP, + Args: nil, + }, + Response: resp, + }, + ) + if err != nil { + t.Fatalf("error publishing message: %v", err) + } + + result := <-resp + assert.Equal(t, result.Success, true) + + resp2 := make(chan bus.MessageResponse) + err = testItems.messageBus.Publish( + bus.Message{ + From: bus.AGGREGATOR, + To: bus.FETCHER, + Content: bus.MessageContent{ + Command: bus.REFRESH_FETCHER_APP, + Args: nil, + }, + Response: resp2, + }, + ) + if err != nil { + t.Fatalf("error publishing message: %v", err) + } + + result2 := <-resp2 + assert.False(t, result2.Success) +} diff --git a/node/pkg/fetcher/main_test.go b/node/pkg/fetcher/main_test.go index 2eb89fb06..b17825871 100644 --- a/node/pkg/fetcher/main_test.go +++ b/node/pkg/fetcher/main_test.go @@ -262,7 +262,7 @@ func cleanup(ctx context.Context, testItems *TestItems) func() error { return err } - err = testItems.app.stopAllFetchers(ctx) + err = testItems.app.stopAll(ctx) if err != nil { return err } From 88670ff8433e0e7304df7c2611c9dd8710c4d67d Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 28 Aug 2024 12:21:33 +0900 Subject: [PATCH 6/8] test: fix test --- .../fetcher/localaggregatebulkwriter_test.go | 47 ++----------------- node/pkg/fetcher/utils_test.go | 2 + 2 files changed, 7 insertions(+), 42 deletions(-) diff --git a/node/pkg/fetcher/localaggregatebulkwriter_test.go b/node/pkg/fetcher/localaggregatebulkwriter_test.go index 222d14b90..cf4d74fa1 100644 --- a/node/pkg/fetcher/localaggregatebulkwriter_test.go +++ b/node/pkg/fetcher/localaggregatebulkwriter_test.go @@ -20,52 +20,15 @@ func TestLocalAggregateBulkWriter(t *testing.T) { t.Logf("Cleanup failed: %v", cleanupErr) } }() - app := testItems.app - // get configs, initialize channel, and start localAggregators - configs, err := app.getConfigs(ctx) - if err != nil { - t.Fatalf("error getting configs: %v", err) - } + go testItems.app.Run(ctx) - localAggregatesChannel := make(chan *LocalAggregate, LocalAggregatesChannelSize) - app.LocalAggregators = make(map[int32]*LocalAggregator, len(configs)) - app.LocalAggregateBulkWriter = NewLocalAggregateBulkWriter(DefaultLocalAggregateInterval) - app.LocalAggregateBulkWriter.localAggregatesChannel = localAggregatesChannel - - feedData := []*FeedData{} - for _, config := range configs { - localAggregatorFeeds, getFeedsErr := app.getFeeds(ctx, config.ID) - if getFeedsErr != nil { - t.Fatalf("error getting configs: %v", getFeedsErr) - } - app.LocalAggregators[config.ID] = NewLocalAggregator(config, localAggregatorFeeds, localAggregatesChannel, testItems.messageBus, app.LatestFeedDataMap) - for _, feed := range localAggregatorFeeds { - feedData = append(feedData, &FeedData{FeedID: feed.ID, Value: DUMMY_FEED_VALUE, Timestamp: nil, Volume: DUMMY_FEED_VALUE}) - } - } - err = app.startAllLocalAggregators(ctx) - if err != nil { - t.Fatalf("error starting localAggregators: %v", err) - } + time.Sleep(DefaultLocalAggregateInterval * 20) - err = app.LatestFeedDataMap.SetLatestFeedData(feedData) - if err != nil { - t.Fatalf("error setting latest feed data: %v", err) - } - - data := <-localAggregatesChannel - assert.Equal(t, DUMMY_FEED_VALUE, float64(data.Value)) - - go app.LocalAggregateBulkWriter.Run(ctx) - - time.Sleep(DefaultLocalAggregateInterval * 4) - - pgsqlData, pgsqlErr := db.QueryRow[LocalAggregate](ctx, "SELECT * FROM local_aggregates WHERE config_id = @config_id", map[string]any{ - "config_id": data.ConfigID, - }) + pgsqlData, pgsqlErr := db.QueryRows[LocalAggregate](ctx, "SELECT * FROM local_aggregates", nil) if pgsqlErr != nil { t.Fatalf("error getting local aggregate from pgsql: %v", pgsqlErr) } - assert.Equal(t, float64(pgsqlData.Value), DUMMY_FEED_VALUE) + assert.Greater(t, len(pgsqlData), 0) + } diff --git a/node/pkg/fetcher/utils_test.go b/node/pkg/fetcher/utils_test.go index 25ec7dcb3..c883ba3e6 100644 --- a/node/pkg/fetcher/utils_test.go +++ b/node/pkg/fetcher/utils_test.go @@ -117,6 +117,8 @@ func TestCopyFeedData(t *testing.T) { } }() + testItems.app.initialize(ctx) + feeds := testItems.insertedFeeds feedData := []*FeedData{} From c66f079742e0f4f7a8d94652178957090e66a0a8 Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 28 Aug 2024 12:54:26 +0900 Subject: [PATCH 7/8] fix: deferred round local aggregate removal --- node/pkg/aggregator/aggregator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/pkg/aggregator/aggregator.go b/node/pkg/aggregator/aggregator.go index fb85a6179..68fbcf8cc 100644 --- a/node/pkg/aggregator/aggregator.go +++ b/node/pkg/aggregator/aggregator.go @@ -295,6 +295,7 @@ func (n *Aggregator) HandleProofMessage(ctx context.Context, msg raft.Message) e n.mu.Lock() defer n.mu.Unlock() + defer delete(n.roundLocalAggregate, proofMessage.RoundID) if localAggregate, ok := n.roundLocalAggregate[proofMessage.RoundID]; ok { if math.Abs(float64(localAggregate-globalAggregate.Value))/float64(globalAggregate.Value) > 0.3 { log.Warn().Str("Player", "Aggregator").Str("Name", n.Name).Int32("roundId", proofMessage.RoundID).Int64("localAggregate", localAggregate).Int64("globalAggregate", globalAggregate.Value).Msg("local aggregate and global aggregate mismatch") @@ -319,7 +320,6 @@ func (n *Aggregator) HandleProofMessage(ctx context.Context, msg raft.Message) e if err != nil { log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to publish global aggregate and proof") } - delete(n.roundLocalAggregate, proofMessage.RoundID) } return nil } From 1a5d6b3f2f6f6c459392a6d65c92b7fd13c18b3b Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 28 Aug 2024 15:44:57 +0900 Subject: [PATCH 8/8] fix: update based on feedbacks --- node/pkg/aggregator/aggregator.go | 6 +++--- node/pkg/aggregator/types.go | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/node/pkg/aggregator/aggregator.go b/node/pkg/aggregator/aggregator.go index 68fbcf8cc..957627be3 100644 --- a/node/pkg/aggregator/aggregator.go +++ b/node/pkg/aggregator/aggregator.go @@ -26,7 +26,7 @@ func NewAggregator( latestLocalAggregates *LatestLocalAggregates, mb *bus.MessageBus, ) (*Aggregator, error) { - if h == nil || ps == nil || topicString == "" { + if h == nil || ps == nil || topicString == "" || mb == nil { return nil, errorSentinel.ErrAggregatorInvalidInitValue } @@ -152,8 +152,8 @@ func (n *Aggregator) HandleTriggerMessage(ctx context.Context, msg raft.Message) // if not enough messages collected from HandleSyncReplyMessage, it will hang in certain round value = -1 } else { - n.roundLocalAggregate[triggerMessage.RoundID] = localAggregate.Value value = localAggregate.Value + n.roundLocalAggregate[triggerMessage.RoundID] = value } return n.PublishPriceDataMessage(ctx, triggerMessage.RoundID, value, triggerMessage.Timestamp) @@ -297,7 +297,7 @@ func (n *Aggregator) HandleProofMessage(ctx context.Context, msg raft.Message) e defer n.mu.Unlock() defer delete(n.roundLocalAggregate, proofMessage.RoundID) if localAggregate, ok := n.roundLocalAggregate[proofMessage.RoundID]; ok { - if math.Abs(float64(localAggregate-globalAggregate.Value))/float64(globalAggregate.Value) > 0.3 { + if math.Abs(float64(localAggregate-globalAggregate.Value))/float64(globalAggregate.Value) > GLOBAL_AGGREGATE_ERR_THRESHOLD { log.Warn().Str("Player", "Aggregator").Str("Name", n.Name).Int32("roundId", proofMessage.RoundID).Int64("localAggregate", localAggregate).Int64("globalAggregate", globalAggregate.Value).Msg("local aggregate and global aggregate mismatch") msg := bus.Message{ From: bus.AGGREGATOR, diff --git a/node/pkg/aggregator/types.go b/node/pkg/aggregator/types.go index b350f0794..e07331be1 100644 --- a/node/pkg/aggregator/types.go +++ b/node/pkg/aggregator/types.go @@ -14,7 +14,8 @@ import ( ) const ( - AGREEMENT_QUORUM = 0.5 + GLOBAL_AGGREGATE_ERR_THRESHOLD = 0.3 + AGREEMENT_QUORUM = 0.5 Trigger raft.MessageType = "trigger" PriceData raft.MessageType = "priceData"