(OraklNode) Refresh fetcher when global aggregate and local aggregate price differ by more than 30% #2242

Closed
wants to merge 8 commits into from
Changes from 7 commits
47 changes: 40 additions & 7 deletions node/pkg/aggregator/aggregator.go
@@ -4,9 +4,10 @@ import (
"bytes"
"context"
"encoding/json"

"math"
"time"

"bisonai.com/miko/node/pkg/bus"
"bisonai.com/miko/node/pkg/chain/helper"
errorSentinel "bisonai.com/miko/node/pkg/error"
"bisonai.com/miko/node/pkg/raft"
@@ -16,7 +17,15 @@ import (
"github.com/rs/zerolog/log"
)

func NewAggregator(h host.Host, ps *pubsub.PubSub, topicString string, config Config, signHelper *helper.Signer, latestLocalAggregates *LatestLocalAggregates) (*Aggregator, error) {
func NewAggregator(
h host.Host,
ps *pubsub.PubSub,
topicString string,
config Config,
signHelper *helper.Signer,
latestLocalAggregates *LatestLocalAggregates,
mb *bus.MessageBus,
) (*Aggregator, error) {
if h == nil || ps == nil || topicString == "" {
return nil, errorSentinel.ErrAggregatorInvalidInitValue
}
@@ -53,6 +62,9 @@ func NewAggregator(h host.Host, ps *pubsub.PubSub, topicString string, config Co
RoundID: 1,
Signer: signHelper,
LatestLocalAggregates: latestLocalAggregates,
bus: mb,

roundLocalAggregate: map[int32]int64{},
}
aggregator.Raft.LeaderJob = aggregator.LeaderJob
aggregator.Raft.HandleCustomMessage = aggregator.HandleCustomMessage
@@ -102,16 +114,14 @@ func (n *Aggregator) HandleTriggerMessage(ctx context.Context, msg raft.Message)
return err
}
defer n.leaveOnlyLast10Entries(triggerMessage.RoundID)

n.mu.Lock()
defer n.mu.Unlock()
Contributor:

There are two locks in this function, both of which are released when the function returns. I wonder if it'd give the same result to use a single lock 🤔

Collaborator (Author):

The scopes that n.mu.Lock() and n.roundTriggers.mu.Lock() cover are different, so a single lock probably wouldn't give the same result 😅

To be more precise, it might be better to have a separate mutex for the roundLocalAggregate value.

Contributor:

I thought n.mu.Lock() could do the job of n.roundTriggers.mu.Lock(), but it seems worth keeping both since it introduces more clarity to the logic.
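To make the lock-scope argument concrete, here is an editor's sketch (not code from this PR, with hypothetical names) of state split across two mutexes so that unrelated critical sections do not contend on a single wide lock:

```go
package main

import (
	"fmt"
	"sync"
)

// roundAggregates and roundTriggers each guard their own map. Writers to one
// never block writers to the other, which is the clarity benefit discussed above.
type roundAggregates struct {
	mu     sync.Mutex
	values map[int32]int64 // roundID -> local aggregate value
}

func (r *roundAggregates) set(round int32, v int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.values[round] = v
}

type roundTriggers struct {
	mu        sync.Mutex
	triggered map[int32]bool // roundID -> trigger already handled
}

func (t *roundTriggers) mark(round int32) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.triggered[round] = true
}

func main() {
	agg := &roundAggregates{values: map[int32]int64{}}
	trg := &roundTriggers{triggered: map[int32]bool{}}
	agg.set(1, 42)
	trg.mark(1)
	fmt.Println(agg.values[1], trg.triggered[1]) // 42 true
}
```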

if msg.SentFrom != n.Raft.GetHostId() {
// follower can be changed into leader unexpectedly before receiving the message
// increase round id before checking the message sent from leader
// so that the next round will be triggered with larger round id
// prevents already handled message error

n.mu.Lock()
n.RoundID = max(triggerMessage.RoundID, n.RoundID)
n.mu.Unlock()
}

if triggerMessage.RoundID == 0 {
@@ -142,6 +152,7 @@ func (n *Aggregator) HandleTriggerMessage(ctx context.Context, msg raft.Message)
// if not enough messages collected from HandleSyncReplyMessage, it will hang in certain round
value = -1
} else {
n.roundLocalAggregate[triggerMessage.RoundID] = localAggregate.Value
value = localAggregate.Value
Contributor:

n.roundLocalAggregate[triggerMessage.RoundID] = value
}

@@ -282,11 +293,33 @@ func (n *Aggregator) HandleProofMessage(ctx context.Context, msg raft.Message) e
concatProof := bytes.Join(n.roundProofs.proofs[proofMessage.RoundID], nil)
proof := Proof{ConfigID: n.ID, Round: proofMessage.RoundID, Proof: concatProof}

n.mu.Lock()
defer n.mu.Unlock()
defer delete(n.roundLocalAggregate, proofMessage.RoundID)
if localAggregate, ok := n.roundLocalAggregate[proofMessage.RoundID]; ok {
if math.Abs(float64(localAggregate-globalAggregate.Value))/float64(globalAggregate.Value) > 0.3 {
log.Warn().Str("Player", "Aggregator").Str("Name", n.Name).Int32("roundId", proofMessage.RoundID).Int64("localAggregate", localAggregate).Int64("globalAggregate", globalAggregate.Value).Msg("local aggregate and global aggregate mismatch")
msg := bus.Message{
From: bus.AGGREGATOR,
To: bus.FETCHER,
Content: bus.MessageContent{
Command: bus.REFRESH_FETCHER_APP,
Args: nil,
},
}
err = n.bus.Publish(msg)
if err != nil {
log.Warn().Str("Player", "Aggregator").Err(err).Msg("failed to publish fetcher refresh bus message")
}

return errorSentinel.ErrAggregatorGlobalLocalPriceMismatch
}
}

err := PublishGlobalAggregateAndProof(ctx, n.Name, globalAggregate, proof)
if err != nil {
log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to publish global aggregate and proof")
}

}
return nil
}
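For reference, the 30% threshold added in HandleProofMessage is a relative-deviation check against the global aggregate. A minimal standalone sketch follows (hypothetical helper name; the PR performs the check inline, and like the inline version it divides by the global value, so a zero global would need an extra guard):

```go
package main

import (
	"fmt"
	"math"
)

// exceedsDeviation reports whether local and global differ by more than
// threshold (e.g. 0.3 for 30%), relative to the global value.
func exceedsDeviation(local, global int64, threshold float64) bool {
	return math.Abs(float64(local-global))/float64(global) > threshold
}

func main() {
	fmt.Println(exceedsDeviation(70, 100, 0.3)) // false: exactly 30% is not strictly greater
	fmt.Println(exceedsDeviation(65, 100, 0.3)) // true: 35% deviation triggers the refresh path
}
```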
10 changes: 5 additions & 5 deletions node/pkg/aggregator/aggregator_test.go
@@ -23,14 +23,14 @@ func TestNewAggregator(t *testing.T) {
}
}()

_, err = NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap)
_, err = NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap, testItems.app.Bus)
if err != nil {
t.Fatal("error creating new node")
}
}

func TestNewAggregator_Error(t *testing.T) {
_, err := NewAggregator(nil, nil, "", Config{}, nil, nil)
_, err := NewAggregator(nil, nil, "", Config{}, nil, nil, nil)
assert.NotNil(t, err, "Expected error when creating new aggregator with nil parameters")
}

@@ -46,7 +46,7 @@ func TestLeaderJob(t *testing.T) {
}
}()

node, err := NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap)
node, err := NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap, testItems.app.Bus)
if err != nil {
t.Fatal("error creating new node")
}
@@ -70,7 +70,7 @@ func TestGetLatestRoundId(t *testing.T) {
}
}()

node, err := NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap)
node, err := NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap, testItems.app.Bus)
if err != nil {
t.Fatal("error creating new node")
}
@@ -98,7 +98,7 @@ func TestPublishGlobalAggregateAndProof(t *testing.T) {
}
}()

node, err := NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap)
node, err := NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap, testItems.app.Bus)
if err != nil {
t.Fatal("error creating new node")
}
2 changes: 1 addition & 1 deletion node/pkg/aggregator/app.go
@@ -127,7 +127,7 @@ func (a *App) initializeLoadedAggregators(ctx context.Context, loadedConfigs []C
}

topicString := config.Name + "-global-aggregator-topic-" + strconv.Itoa(int(config.AggregateInterval))
tmpNode, err := NewAggregator(h, ps, topicString, config, signer, a.LatestLocalAggregates)
tmpNode, err := NewAggregator(h, ps, topicString, config, signer, a.LatestLocalAggregates, a.Bus)
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion node/pkg/aggregator/globalaggregatebulkwriter_test.go
@@ -86,7 +86,7 @@ func TestGlobalAggregateBulkWriterDataStore(t *testing.T) {
defer bulkWriter.Stop()
assert.NotEqual(t, nil, bulkWriter.ctx)

node, err := NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap)
node, err := NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString, testItems.tmpData.config, testItems.signer, testItems.latestLocalAggMap, testItems.app.Bus)
if err != nil {
t.Fatal("error creating new node")
}
13 changes: 8 additions & 5 deletions node/pkg/aggregator/types.go
@@ -186,10 +186,12 @@ type Aggregator struct {
Raft *raft.Raft

LatestLocalAggregates *LatestLocalAggregates
roundTriggers *RoundTriggers
roundPrices *RoundPrices
roundPriceFixes *RoundPriceFixes
roundProofs *RoundProofs
roundLocalAggregate map[int32]int64

roundTriggers *RoundTriggers
roundPrices *RoundPrices
roundPriceFixes *RoundPriceFixes
roundProofs *RoundProofs

RoundID int32
Signer *helper.Signer
@@ -198,7 +200,8 @@ type Aggregator struct {
nodeCancel context.CancelFunc
isRunning bool

mu sync.RWMutex
mu sync.RWMutex
bus *bus.MessageBus
}

type PriceDataMessage struct {
2 changes: 2 additions & 0 deletions node/pkg/error/sentinel.go
@@ -92,6 +92,7 @@ var (
ErrAggregatorNotFound = &CustomError{Service: Aggregator, Code: InternalError, Message: "Aggregator not found"}
ErrAggregatorCancelNotFound = &CustomError{Service: Aggregator, Code: InternalError, Message: "Aggregator cancel function not found"}
ErrAggregatorEmptyProof = &CustomError{Service: Aggregator, Code: InternalError, Message: "Empty proof"}
ErrAggregatorGlobalLocalPriceMismatch = &CustomError{Service: Aggregator, Code: InternalError, Message: "Global and local price mismatch"}

ErrBootAPIDbPoolNotFound = &CustomError{Service: BootAPI, Code: InternalError, Message: "db pool not found"}

@@ -156,6 +157,7 @@ var (
ErrFetcherFailedToGetDexResultSlice = &CustomError{Service: Fetcher, Code: InternalError, Message: "Failed to get dex result slice"}
ErrFetcherFailedBigIntConvert = &CustomError{Service: Fetcher, Code: InternalError, Message: "Failed to convert to fetched data to big.Int"}
ErrFetcherFeedNotFound = &CustomError{Service: Fetcher, Code: InvalidInputError, Message: "Feed not found"}
ErrFetcherRefreshCooldown = &CustomError{Service: Fetcher, Code: InternalError, Message: "Failed to refresh data: cooldown period not over"}

ErrLibP2pEmptyNonLocalAddress = &CustomError{Service: Others, Code: InternalError, Message: "Host has no non-local addresses"}
ErrLibP2pAddressSplitFail = &CustomError{Service: Others, Code: InternalError, Message: "Failed to split address"}
8 changes: 7 additions & 1 deletion node/pkg/fetcher/app.go
@@ -25,6 +25,7 @@ func New(bus *bus.MessageBus) *App {
},
FeedDataDumpChannel: make(chan *FeedData, DefaultFeedDataDumpChannelSize),
Bus: bus,
LastRefreshTime: time.Time{},
}
}

@@ -63,7 +64,7 @@ func (a *App) subscribe(ctx context.Context) {
}

func (a *App) handleMessage(ctx context.Context, msg bus.Message) {
if msg.From != bus.ADMIN {
if msg.From != bus.ADMIN && msg.From != bus.AGGREGATOR {
log.Debug().Str("Player", "Fetcher").Msg("fetcher received message from non-admin")
return
}
@@ -93,6 +94,11 @@ func (a *App) handleMessage(ctx context.Context, msg bus.Message) {
}
msg.Response <- bus.MessageResponse{Success: true}
case bus.REFRESH_FETCHER_APP:
if !a.LastRefreshTime.IsZero() && time.Since(a.LastRefreshTime) < RefreshCooldownInterval {
bus.HandleMessageError(errorSentinel.ErrFetcherRefreshCooldown, msg, "refresh on cooldown")
return
}
a.LastRefreshTime = time.Now()
err := a.stopAll(ctx)
if err != nil {
log.Error().Err(err).Str("Player", "Fetcher").Msg("failed to stop all fetchers")
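The REFRESH_FETCHER_APP handler above rejects refreshes that arrive within RefreshCooldownInterval of the previous one. A minimal sketch of that guard (the constant's real value is not shown in this diff, so one minute below is only a placeholder, and all names are illustrative):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// refreshCooldown is a placeholder; the real RefreshCooldownInterval is
// defined elsewhere in the fetcher package and may differ.
const refreshCooldown = time.Minute

var errCooldown = errors.New("refresh on cooldown")

type refresher struct {
	lastRefresh time.Time
}

// tryRefresh refuses to run again until the cooldown has elapsed since the
// last successful refresh.
func (r *refresher) tryRefresh() error {
	if !r.lastRefresh.IsZero() && time.Since(r.lastRefresh) < refreshCooldown {
		return errCooldown
	}
	r.lastRefresh = time.Now()
	// ... stop and restart fetchers here ...
	return nil
}

func main() {
	r := &refresher{}
	fmt.Println(r.tryRefresh()) // <nil>: first refresh succeeds
	fmt.Println(r.tryRefresh()) // refresh on cooldown: second call within the interval is rejected
}
```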
126 changes: 126 additions & 0 deletions node/pkg/fetcher/app_test.go
@@ -6,6 +6,7 @@ import (
"testing"
"time"

"bisonai.com/miko/node/pkg/bus"
"bisonai.com/miko/node/pkg/db"
"github.com/stretchr/testify/assert"
)
@@ -113,3 +114,128 @@ func TestAppRun(t *testing.T) {
}
assert.Greater(t, len(localAggregateResult), 0)
}

func TestAppRefresh(t *testing.T) {
ctx := context.Background()
clean, testItems, err := setup(ctx)
if err != nil {
t.Fatalf("error setting up test: %v", err)
}
defer func() {
if cleanupErr := clean(); cleanupErr != nil {
t.Logf("Cleanup failed: %v", cleanupErr)
}
}()

app := testItems.app

err = app.initialize(ctx)
if err != nil {
t.Fatalf("error initializing fetcher: %v", err)
}

err = app.Run(ctx)
if err != nil {
t.Fatalf("error running fetcher: %v", err)
}
for _, fetcher := range app.Fetchers {
assert.True(t, fetcher.isRunning)
}
for _, localAggregator := range app.LocalAggregators {
assert.True(t, localAggregator.isRunning)
}
assert.True(t, app.FeedDataBulkWriter.isRunning)

time.Sleep(WAIT_SECONDS)

resp := make(chan bus.MessageResponse)
err = testItems.messageBus.Publish(
bus.Message{
From: bus.AGGREGATOR,
To: bus.FETCHER,
Content: bus.MessageContent{
Command: bus.REFRESH_FETCHER_APP,
Args: nil,
},
Response: resp,
},
)
if err != nil {
t.Fatalf("error publishing message: %v", err)
}

result := <-resp
assert.Equal(t, result.Success, true)
}

func TestAppRefreshCooldown(t *testing.T) {
ctx := context.Background()
clean, testItems, err := setup(ctx)
if err != nil {
t.Fatalf("error setting up test: %v", err)
}
defer func() {
if cleanupErr := clean(); cleanupErr != nil {
t.Logf("Cleanup failed: %v", cleanupErr)
}
}()

app := testItems.app

err = app.initialize(ctx)
if err != nil {
t.Fatalf("error initializing fetcher: %v", err)
}

err = app.Run(ctx)
if err != nil {
t.Fatalf("error running fetcher: %v", err)
}
for _, fetcher := range app.Fetchers {
assert.True(t, fetcher.isRunning)
}
for _, localAggregator := range app.LocalAggregators {
assert.True(t, localAggregator.isRunning)
}
assert.True(t, app.FeedDataBulkWriter.isRunning)

time.Sleep(WAIT_SECONDS)

resp := make(chan bus.MessageResponse)
err = testItems.messageBus.Publish(
bus.Message{
From: bus.AGGREGATOR,
To: bus.FETCHER,
Content: bus.MessageContent{
Command: bus.REFRESH_FETCHER_APP,
Args: nil,
},
Response: resp,
},
)
if err != nil {
t.Fatalf("error publishing message: %v", err)
}

result := <-resp
assert.Equal(t, result.Success, true)

resp2 := make(chan bus.MessageResponse)
err = testItems.messageBus.Publish(
bus.Message{
From: bus.AGGREGATOR,
To: bus.FETCHER,
Content: bus.MessageContent{
Command: bus.REFRESH_FETCHER_APP,
Args: nil,
},
Response: resp2,
},
)
if err != nil {
t.Fatalf("error publishing message: %v", err)
}

result2 := <-resp2
assert.False(t, result2.Success)
}