fix: fix wrong implementation for redis sub
nick-bisonai committed Aug 24, 2024
1 parent 6f0794f commit e59cbf9
Showing 12 changed files with 47 additions and 45 deletions.
2 changes: 1 addition & 1 deletion node/pkg/aggregator/aggregator.go
@@ -282,7 +282,7 @@ func (n *Aggregator) HandleProofMessage(ctx context.Context, msg raft.Message) e
concatProof := bytes.Join(n.roundProofs.proofs[proofMessage.RoundID], nil)
proof := Proof{ConfigID: n.ID, Round: proofMessage.RoundID, Proof: concatProof}

- err := PublishGlobalAggregateAndProof(ctx, globalAggregate, proof)
+ err := PublishGlobalAggregateAndProof(ctx, n.Name, globalAggregate, proof)
if err != nil {
log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to publish global aggregate and proof")
}
4 changes: 2 additions & 2 deletions node/pkg/aggregator/aggregator_test.go
@@ -121,12 +121,12 @@ func TestPublishGlobalAggregateAndProof(t *testing.T) {
}

ch := make(chan SubmissionData)
- err = db.Subscribe(ctx, keys.SubmissionDataStreamKey(node.ID), ch)
+ err = db.Subscribe(ctx, keys.SubmissionDataStreamKeyV2(node.Name), ch)
if err != nil {
t.Fatal("error subscribing to stream")
}

- err = PublishGlobalAggregateAndProof(ctx, testItems.tmpData.globalAggregate, proof)
+ err = PublishGlobalAggregateAndProof(ctx, "test_pair", testItems.tmpData.globalAggregate, proof)
if err != nil {
t.Fatal("error publishing global aggregate and proof")
}
6 changes: 3 additions & 3 deletions node/pkg/aggregator/app.go
@@ -59,12 +59,12 @@ func (a *App) setGlobalAggregateBulkWriter(configs []Config) {
a.stopGlobalAggregateBulkWriter()
}

- configIds := make([]int32, len(configs))
+ configNames := make([]string, len(configs))
for i, config := range configs {
- configIds[i] = config.ID
+ configNames[i] = config.Name
}

- a.GlobalAggregateBulkWriter = NewGlobalAggregateBulkWriter(WithConfigIds(configIds))
+ a.GlobalAggregateBulkWriter = NewGlobalAggregateBulkWriter(WithConfigNames(configNames))
}

func (a *App) startGlobalAggregateBulkWriter(ctx context.Context) {
24 changes: 12 additions & 12 deletions node/pkg/aggregator/globalaggregatebulkwriter.go
@@ -14,7 +14,7 @@ bulk insert proofs and aggregates into pgsql
*/

type GlobalAggregateBulkWriter struct {
- ReceiveChannels map[int32]chan SubmissionData
+ ReceiveChannels map[string]chan SubmissionData
Buffer chan SubmissionData

LatestDataUpdateInterval time.Duration
@@ -30,7 +30,7 @@ const DefaultBufferSize = 2000
type GlobalAggregateBulkWriterConfig struct {
PgsqlBulkInsertInterval time.Duration
BufferSize int
- ConfigIds []int32
+ ConfigNames []string
}

type GlobalAggregateBulkWriterOption func(*GlobalAggregateBulkWriterConfig)
@@ -47,9 +47,9 @@ func WithBufferSize(size int) GlobalAggregateBulkWriterOption {
}
}

- func WithConfigIds(configIds []int32) GlobalAggregateBulkWriterOption {
+ func WithConfigNames(configNames []string) GlobalAggregateBulkWriterOption {
return func(config *GlobalAggregateBulkWriterConfig) {
- config.ConfigIds = configIds
+ config.ConfigNames = configNames
}
}

@@ -63,14 +63,14 @@ func NewGlobalAggregateBulkWriter(opts ...GlobalAggregateBulkWriterOption) *Glob
}

result := &GlobalAggregateBulkWriter{
- ReceiveChannels: make(map[int32]chan SubmissionData, len(config.ConfigIds)),
+ ReceiveChannels: make(map[string]chan SubmissionData, len(config.ConfigNames)),
Buffer: make(chan SubmissionData, config.BufferSize),

PgsqlBulkInsertInterval: config.PgsqlBulkInsertInterval,
}

- for _, configId := range config.ConfigIds {
- result.ReceiveChannels[configId] = make(chan SubmissionData)
+ for _, configName := range config.ConfigNames {
+ result.ReceiveChannels[configName] = make(chan SubmissionData)
}

return result
@@ -102,21 +102,21 @@ func (s *GlobalAggregateBulkWriter) Stop() {
}

func (s *GlobalAggregateBulkWriter) receive(ctx context.Context) {
- for id := range s.ReceiveChannels {
- go s.receiveEach(ctx, id)
+ for name := range s.ReceiveChannels {
+ go s.receiveEach(ctx, name)
}
}

- func (s *GlobalAggregateBulkWriter) receiveEach(ctx context.Context, configId int32) {
- err := db.Subscribe(ctx, keys.SubmissionDataStreamKey(configId), s.ReceiveChannels[configId])
+ func (s *GlobalAggregateBulkWriter) receiveEach(ctx context.Context, configName string) {
+ err := db.Subscribe(ctx, keys.SubmissionDataStreamKeyV2(configName), s.ReceiveChannels[configName])
if err != nil {
log.Error().Err(err).Str("Player", "Aggregator").Msg("failed to subscribe to submission stream")
}
for {
select {
case <-ctx.Done():
return
- case data := <-s.ReceiveChannels[configId]:
+ case data := <-s.ReceiveChannels[configName]:
s.Buffer <- data
}
}
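The bulk writer above subscribes once per config name and fans every per-name channel into a single shared buffer that the pgsql bulk insert later drains. A stand-alone sketch of that fan-in pattern, using simplified stand-in types and pair names rather than the package's real ones:

package main

import (
	"context"
	"fmt"
	"time"
)

// submission is a simplified stand-in for the package's SubmissionData.
type submission struct {
	name  string
	value int64
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	// One receive channel per config name, mirroring ReceiveChannels map[string]chan SubmissionData.
	receive := map[string]chan submission{
		"test_pair":    make(chan submission),
		"another_pair": make(chan submission),
	}
	buffer := make(chan submission, 2000) // mirrors DefaultBufferSize

	// receiveEach-style goroutines: forward each per-name stream into the shared buffer.
	for name := range receive {
		go func(ch chan submission) {
			for {
				select {
				case <-ctx.Done():
					return
				case data := <-ch:
					buffer <- data
				}
			}
		}(receive[name])
	}

	receive["test_pair"] <- submission{name: "test_pair", value: 123}
	fmt.Println(<-buffer) // the bulk insert loop would drain this buffer in batches
}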
10 changes: 5 additions & 5 deletions node/pkg/aggregator/globalaggregatebulkwriter_test.go
@@ -22,7 +22,7 @@ func TestNewGlobalAggregateBulkWriter(t *testing.T) {
}
}()

- _ = NewGlobalAggregateBulkWriter(WithConfigIds([]int32{testItems.tmpData.config.ID}))
+ _ = NewGlobalAggregateBulkWriter(WithConfigNames([]string{testItems.tmpData.config.Name}))
if err != nil {
t.Fatal("error creating new node")
}
@@ -40,7 +40,7 @@ func TestGlobalAggregateBulkWriterStart(t *testing.T) {
}
}()

- bulkWriter := NewGlobalAggregateBulkWriter(WithConfigIds([]int32{testItems.tmpData.config.ID}))
+ bulkWriter := NewGlobalAggregateBulkWriter(WithConfigNames([]string{testItems.tmpData.config.Name}))

bulkWriter.Start(ctx)

@@ -59,7 +59,7 @@ func TestGlobalAggregateBulkWriterStop(t *testing.T) {
}
}()

- bulkWriter := NewGlobalAggregateBulkWriter(WithConfigIds([]int32{testItems.tmpData.config.ID}))
+ bulkWriter := NewGlobalAggregateBulkWriter(WithConfigNames([]string{testItems.tmpData.config.Name}))

bulkWriter.Start(ctx)

@@ -80,7 +80,7 @@ func TestGlobalAggregateBulkWriterDataStore(t *testing.T) {
}
}()

- bulkWriter := NewGlobalAggregateBulkWriter(WithConfigIds([]int32{testItems.tmpData.globalAggregate.ConfigID}))
+ bulkWriter := NewGlobalAggregateBulkWriter(WithConfigNames([]string{testItems.tmpData.config.Name}))

bulkWriter.Start(ctx)
defer bulkWriter.Stop()
@@ -109,7 +109,7 @@ func TestGlobalAggregateBulkWriterDataStore(t *testing.T) {
}

time.Sleep(time.Millisecond * 50)
- err = PublishGlobalAggregateAndProof(ctx, testItems.tmpData.globalAggregate, proof)
+ err = PublishGlobalAggregateAndProof(ctx, "test_pair", testItems.tmpData.globalAggregate, proof)
if err != nil {
t.Fatal("error publishing global aggregate and proof")
}
5 changes: 2 additions & 3 deletions node/pkg/aggregator/utils.go
@@ -18,16 +18,15 @@ func FilterNegative(values []int64) []int64 {
return result
}

- func PublishGlobalAggregateAndProof(ctx context.Context, globalAggregate GlobalAggregate, proof Proof) error {
+ func PublishGlobalAggregateAndProof(ctx context.Context, name string, globalAggregate GlobalAggregate, proof Proof) error {
if globalAggregate.Value == 0 || globalAggregate.Timestamp.IsZero() {
return nil
}
data := SubmissionData{
GlobalAggregate: globalAggregate,
Proof: proof,
}

- return db.Publish(ctx, keys.SubmissionDataStreamKey(globalAggregate.ConfigID), data)
+ return db.Publish(ctx, keys.SubmissionDataStreamKeyV2(name), data)
}

func getLatestRoundId(ctx context.Context, configId int32) (int32, error) {
8 changes: 2 additions & 6 deletions node/pkg/common/keys/keys.go
@@ -1,9 +1,5 @@
package keys

- import (
- "strconv"
- )
-
- func SubmissionDataStreamKey(configId int32) string {
- return "submissionDataStream:" + strconv.Itoa(int(configId))
+ func SubmissionDataStreamKeyV2(name string) string {
+ return "submissionDataSteram:" + name
}
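Every change in this commit converges on one point: submission data is now keyed by the config name (the V2 key) rather than the numeric config ID, so the aggregator that publishes and the bulk writer / DAL collector that subscribe all build the same Redis stream key. A minimal, self-contained sketch of the two key helpers and of why the two sides must agree; the map is only a hypothetical stand-in for the Redis stream, and nothing below is part of the repository:

package main

import "fmt"

// Old key, removed by this commit: derived from the numeric config ID.
func submissionDataStreamKey(configID int32) string {
	return fmt.Sprintf("submissionDataStream:%d", configID)
}

// New key, added by this commit: derived from the config name.
func submissionDataStreamKeyV2(name string) string {
	return "submissionDataSteram:" + name // prefix copied verbatim from keys.go above
}

func main() {
	fmt.Println(submissionDataStreamKey(42))            // submissionDataStream:42
	fmt.Println(submissionDataStreamKeyV2("test_pair")) // submissionDataSteram:test_pair

	// Publisher (aggregator) and subscribers (bulk writer, DAL collector) must
	// derive the same key; with V2 both sides build it from the pair name.
	stream := map[string][]string{} // hypothetical stand-in for the Redis stream
	key := submissionDataStreamKeyV2("test_pair")
	stream[key] = append(stream[key], `{"round":1}`)
	fmt.Println(stream[submissionDataStreamKeyV2("test_pair")]) // [{"round":1}]
}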
13 changes: 10 additions & 3 deletions node/pkg/dal/collector/collector.go
@@ -107,17 +107,24 @@ func NewCollector(ctx context.Context, configs []Config) (*Collector, error) {
collector.OutgoingStream[config.ID] = make(chan *dalcommon.OutgoingSubmissionData, 1000)
collector.Symbols[config.ID] = config.Name
collector.FeedHashes[config.ID] = crypto.Keccak256([]byte(config.Name))
- redisTopics = append(redisTopics, keys.SubmissionDataStreamKey(config.ID))
+ redisTopics = append(redisTopics, keys.SubmissionDataStreamKeyV2(config.Name))
}

- baseRediscribe, err := db.NewRediscribe(ctx, db.WithRedisHost(baseRedisHost), db.WithRedisPort(baseRedisPort), db.WithRedisTopics(redisTopics), db.WithRedisRouter(collector.redisRouter))
+ baseRediscribe, err := db.NewRediscribe(
+ ctx,
+ db.WithRedisHost(baseRedisHost),
+ db.WithRedisPort(baseRedisPort),
+ db.WithRedisTopics(redisTopics),
+ db.WithRedisRouter(collector.redisRouter))
if err != nil {
return nil, err
}
collector.baseRediscribe = baseRediscribe

if subRedisHost != "" && subRedisPort != "" {
- subRediscribe, err := db.NewRediscribe(ctx, db.WithRedisHost(subRedisHost), db.WithRedisPort(subRedisPort), db.WithRedisTopics(redisTopics), db.WithRedisRouter(collector.redisRouter))
+ subRediscribe, err := db.NewRediscribe(
+ ctx,
+ db.WithRedisHost(subRedisHost), db.WithRedisPort(subRedisPort), db.WithRedisTopics(redisTopics), db.WithRedisRouter(collector.redisRouter))
if err != nil {
return nil, err
}
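In the collector above, the same name-based topics are registered against the base Redis and, only when a secondary host and port are configured, against a sub Redis as well, with both feeding the same router. A condensed, hypothetical sketch of that conditional wiring; the types and constructor below are stand-ins, not the repository's db package:

package main

import "fmt"

// rediscribe is a stand-in for the subscriber handle returned by db.NewRediscribe.
type rediscribe struct {
	host, port string
	topics     []string
}

func newRediscribe(host, port string, topics []string) *rediscribe {
	return &rediscribe{host: host, port: port, topics: topics}
}

// setupSubscribers mirrors the collector's logic: always subscribe on the base
// Redis, and add the secondary ("sub") Redis only when both host and port are set.
func setupSubscribers(topics []string, baseHost, basePort, subHost, subPort string) []*rediscribe {
	subs := []*rediscribe{newRediscribe(baseHost, basePort, topics)}
	if subHost != "" && subPort != "" {
		subs = append(subs, newRediscribe(subHost, subPort, topics))
	}
	return subs
}

func main() {
	topics := []string{"submissionDataSteram:test_pair"} // V2 key format from keys.go above
	subs := setupSubscribers(topics, "base-redis", "6379", "", "")
	fmt.Println(len(subs)) // 1: no sub Redis configured in this example
}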
12 changes: 6 additions & 6 deletions node/pkg/dal/tests/api_test.go
@@ -96,7 +96,7 @@ func TestApiGetLatestAll(t *testing.T) {
t.Fatalf("error generating sample submission data: %v", err)
}

- err = testPublishData(ctx, *sampleSubmissionData)
+ err = testPublishData(ctx, "test-aggregate", *sampleSubmissionData)
if err != nil {
t.Fatalf("error publishing sample submission data: %v", err)
}
@@ -168,7 +168,7 @@ func TestApiGetLatest(t *testing.T) {
t.Fatalf("error generating sample submission data: %v", err)
}

- err = testPublishData(ctx, *sampleSubmissionData)
+ err = testPublishData(ctx, "test-aggregate", *sampleSubmissionData)
if err != nil {
t.Fatalf("error publishing sample submission data: %v", err)
}
@@ -209,7 +209,7 @@ func TestApiGetLatestTransposeAll(t *testing.T) {
t.Fatalf("error generating sample submission data: %v", err)
}

- err = testPublishData(ctx, *sampleSubmissionData)
+ err = testPublishData(ctx, "test-aggregate", *sampleSubmissionData)
if err != nil {
t.Fatalf("error publishing sample submission data: %v", err)
}
@@ -257,7 +257,7 @@ func TestApiGetLatestTranspose(t *testing.T) {
t.Fatalf("error generating sample submission data: %v", err)
}

- err = testPublishData(ctx, *sampleSubmissionData)
+ err = testPublishData(ctx, "test-aggregate", *sampleSubmissionData)
if err != nil {
t.Fatalf("error publishing sample submission data: %v", err)
}
@@ -327,7 +327,7 @@ func TestApiWebsocket(t *testing.T) {
t.Fatalf("error generating sample submission data: %v", err)
}

- err = testPublishData(ctx, *sampleSubmissionData)
+ err = testPublishData(ctx, "test-aggregate", *sampleSubmissionData)
if err != nil {
t.Fatalf("error publishing sample submission data: %v", err)
}
@@ -473,7 +473,7 @@
}

// Publish data
- err = testPublishData(ctx, *expectedData)
+ err = testPublishData(ctx, "test-aggregate", *expectedData)
if err != nil {
t.Fatalf("error publishing sample submission data: %v", err)
}
2 changes: 1 addition & 1 deletion node/pkg/dal/tests/collector_test.go
@@ -85,7 +85,7 @@ func TestCollectorStream(t *testing.T) {
}

log.Debug().Msg("Publishing data")
- err = testPublishData(ctx, *sampleSubmissionData)
+ err = testPublishData(ctx, "test-aggregate", *sampleSubmissionData)
if err != nil {
t.Fatalf("error publishing data: %v", err)
}
4 changes: 2 additions & 2 deletions node/pkg/dal/tests/main_test.go
@@ -36,8 +36,8 @@ type TestItems struct {
StatsApp *stats.StatsApp
}

- func testPublishData(ctx context.Context, submissionData aggregator.SubmissionData) error {
- return db.Publish(ctx, keys.SubmissionDataStreamKey(submissionData.GlobalAggregate.ConfigID), submissionData)
+ func testPublishData(ctx context.Context, name string, submissionData aggregator.SubmissionData) error {
+ return db.Publish(ctx, keys.SubmissionDataStreamKeyV2(name), submissionData)
}

func generateSampleSubmissionData(configId int32, value int64, timestamp time.Time, round int32, symbol string) (*aggregator.SubmissionData, error) {
2 changes: 1 addition & 1 deletion node/pkg/reporter/app_test.go
@@ -61,7 +61,7 @@ func TestWsDataHandling(t *testing.T) {
t.Fatalf("error generating sample submission data: %v", err)
}

- err = db.Publish(ctx, keys.SubmissionDataStreamKey(sampleSubmissionData.GlobalAggregate.ConfigID), sampleSubmissionData)
+ err = db.Publish(ctx, keys.SubmissionDataStreamKeyV2("test-aggregate"), sampleSubmissionData)
if err != nil {
t.Fatalf("error publishing sample submission data: %v", err)
}
