Skip to content

Commit

Permalink
test: fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed Aug 28, 2024
1 parent 2b09a1c commit 88670ff
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 42 deletions.
47 changes: 5 additions & 42 deletions node/pkg/fetcher/localaggregatebulkwriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,52 +20,15 @@ func TestLocalAggregateBulkWriter(t *testing.T) {
t.Logf("Cleanup failed: %v", cleanupErr)
}
}()
app := testItems.app

// get configs, initialize channel, and start localAggregators
configs, err := app.getConfigs(ctx)
if err != nil {
t.Fatalf("error getting configs: %v", err)
}
go testItems.app.Run(ctx)

localAggregatesChannel := make(chan *LocalAggregate, LocalAggregatesChannelSize)
app.LocalAggregators = make(map[int32]*LocalAggregator, len(configs))
app.LocalAggregateBulkWriter = NewLocalAggregateBulkWriter(DefaultLocalAggregateInterval)
app.LocalAggregateBulkWriter.localAggregatesChannel = localAggregatesChannel

feedData := []*FeedData{}
for _, config := range configs {
localAggregatorFeeds, getFeedsErr := app.getFeeds(ctx, config.ID)
if getFeedsErr != nil {
t.Fatalf("error getting configs: %v", getFeedsErr)
}
app.LocalAggregators[config.ID] = NewLocalAggregator(config, localAggregatorFeeds, localAggregatesChannel, testItems.messageBus, app.LatestFeedDataMap)
for _, feed := range localAggregatorFeeds {
feedData = append(feedData, &FeedData{FeedID: feed.ID, Value: DUMMY_FEED_VALUE, Timestamp: nil, Volume: DUMMY_FEED_VALUE})
}
}
err = app.startAllLocalAggregators(ctx)
if err != nil {
t.Fatalf("error starting localAggregators: %v", err)
}
time.Sleep(DefaultLocalAggregateInterval * 20)

err = app.LatestFeedDataMap.SetLatestFeedData(feedData)
if err != nil {
t.Fatalf("error setting latest feed data: %v", err)
}

data := <-localAggregatesChannel
assert.Equal(t, DUMMY_FEED_VALUE, float64(data.Value))

go app.LocalAggregateBulkWriter.Run(ctx)

time.Sleep(DefaultLocalAggregateInterval * 4)

pgsqlData, pgsqlErr := db.QueryRow[LocalAggregate](ctx, "SELECT * FROM local_aggregates WHERE config_id = @config_id", map[string]any{
"config_id": data.ConfigID,
})
pgsqlData, pgsqlErr := db.QueryRows[LocalAggregate](ctx, "SELECT * FROM local_aggregates", nil)
if pgsqlErr != nil {
t.Fatalf("error getting local aggregate from pgsql: %v", pgsqlErr)
}
assert.Equal(t, float64(pgsqlData.Value), DUMMY_FEED_VALUE)
assert.Greater(t, len(pgsqlData), 0)

}
2 changes: 2 additions & 0 deletions node/pkg/fetcher/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ func TestCopyFeedData(t *testing.T) {
}
}()

testItems.app.initialize(ctx)

feeds := testItems.insertedFeeds
feedData := []*FeedData{}

Expand Down

0 comments on commit 88670ff

Please sign in to comment.