From 88670ff8433e0e7304df7c2611c9dd8710c4d67d Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 28 Aug 2024 12:21:33 +0900 Subject: [PATCH] test: fix test --- .../fetcher/localaggregatebulkwriter_test.go | 47 ++----------------- node/pkg/fetcher/utils_test.go | 2 + 2 files changed, 7 insertions(+), 42 deletions(-) diff --git a/node/pkg/fetcher/localaggregatebulkwriter_test.go b/node/pkg/fetcher/localaggregatebulkwriter_test.go index 222d14b90..cf4d74fa1 100644 --- a/node/pkg/fetcher/localaggregatebulkwriter_test.go +++ b/node/pkg/fetcher/localaggregatebulkwriter_test.go @@ -20,52 +20,15 @@ func TestLocalAggregateBulkWriter(t *testing.T) { t.Logf("Cleanup failed: %v", cleanupErr) } }() - app := testItems.app - // get configs, initialize channel, and start localAggregators - configs, err := app.getConfigs(ctx) - if err != nil { - t.Fatalf("error getting configs: %v", err) - } + go testItems.app.Run(ctx) - localAggregatesChannel := make(chan *LocalAggregate, LocalAggregatesChannelSize) - app.LocalAggregators = make(map[int32]*LocalAggregator, len(configs)) - app.LocalAggregateBulkWriter = NewLocalAggregateBulkWriter(DefaultLocalAggregateInterval) - app.LocalAggregateBulkWriter.localAggregatesChannel = localAggregatesChannel - - feedData := []*FeedData{} - for _, config := range configs { - localAggregatorFeeds, getFeedsErr := app.getFeeds(ctx, config.ID) - if getFeedsErr != nil { - t.Fatalf("error getting configs: %v", getFeedsErr) - } - app.LocalAggregators[config.ID] = NewLocalAggregator(config, localAggregatorFeeds, localAggregatesChannel, testItems.messageBus, app.LatestFeedDataMap) - for _, feed := range localAggregatorFeeds { - feedData = append(feedData, &FeedData{FeedID: feed.ID, Value: DUMMY_FEED_VALUE, Timestamp: nil, Volume: DUMMY_FEED_VALUE}) - } - } - err = app.startAllLocalAggregators(ctx) - if err != nil { - t.Fatalf("error starting localAggregators: %v", err) - } + time.Sleep(DefaultLocalAggregateInterval * 20) - err = app.LatestFeedDataMap.SetLatestFeedData(feedData) - if err != nil { - t.Fatalf("error setting latest feed data: %v", err) - } - - data := <-localAggregatesChannel - assert.Equal(t, DUMMY_FEED_VALUE, float64(data.Value)) - - go app.LocalAggregateBulkWriter.Run(ctx) - - time.Sleep(DefaultLocalAggregateInterval * 4) - - pgsqlData, pgsqlErr := db.QueryRow[LocalAggregate](ctx, "SELECT * FROM local_aggregates WHERE config_id = @config_id", map[string]any{ - "config_id": data.ConfigID, - }) + pgsqlData, pgsqlErr := db.QueryRows[LocalAggregate](ctx, "SELECT * FROM local_aggregates", nil) if pgsqlErr != nil { t.Fatalf("error getting local aggregate from pgsql: %v", pgsqlErr) } - assert.Equal(t, float64(pgsqlData.Value), DUMMY_FEED_VALUE) + assert.Greater(t, len(pgsqlData), 0) + } diff --git a/node/pkg/fetcher/utils_test.go b/node/pkg/fetcher/utils_test.go index 25ec7dcb3..c883ba3e6 100644 --- a/node/pkg/fetcher/utils_test.go +++ b/node/pkg/fetcher/utils_test.go @@ -117,6 +117,8 @@ func TestCopyFeedData(t *testing.T) { } }() + testItems.app.initialize(ctx) + feeds := testItems.insertedFeeds feedData := []*FeedData{}