Skip to content

Commit

Permalink
feat(statement-distribution):Implement FanIn approach to statement di…
Browse files Browse the repository at this point in the history
…stribution
  • Loading branch information
DanielDDHM committed Jan 10, 2025
1 parent 4e19d34 commit 6467711
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions dot/parachain/statement-distribution/statement_distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ type responderMessage struct {

func (*responderMessage) isMuxedMessage() {}

type reputationChangeMessage struct{}

func (*reputationChangeMessage) isMuxedMessage() {}

// Run just receives the ctx and a channel from the overseer to subsystem
func (s *StatementDistribution) Run(ctx context.Context, overseerToSubSystem <-chan any) {
// Inside the method Run, we spawn a goroutine to handle network incoming requests
Expand All @@ -43,10 +47,11 @@ func (s *StatementDistribution) Run(ctx context.Context, overseerToSubSystem <-c
defer reputationDelay.Stop()

for {
message := s.awaitMessageFrom(overseerToSubSystem, responderCh)
message := s.awaitMessageFrom(overseerToSubSystem, responderCh, reputationDelay.C)

switch innerMessage := message.(type) {
// Handle each muxed message type
case *reputationChangeMessage:
logger.Info("Reputation change triggered.")
default:
logger.Warn("Unhandled message type: " + fmt.Sprintf("%v", innerMessage))
}
Expand All @@ -55,12 +60,18 @@ func (s *StatementDistribution) Run(ctx context.Context, overseerToSubSystem <-c

func taskResponder(responderCh chan any) {}

// awaitMessageFrom waits for messages from either the overseerToSubSystem or responderCh
func (s *StatementDistribution) awaitMessageFrom(overseerToSubSystem <-chan any, responderCh chan any) MuxedMessage {
// awaitMessageFrom waits for messages from either the overseerToSubSystem, responderCh, or reputationDelay
func (s *StatementDistribution) awaitMessageFrom(
overseerToSubSystem <-chan any,
responderCh chan any,
reputationDelay <-chan time.Time,
) MuxedMessage {
select {
case msg := <-overseerToSubSystem:
return &overseerMessage{inner: msg}
case msg := <-responderCh:
return &responderMessage{inner: msg}
case <-reputationDelay:
return &reputationChangeMessage{}
}
}

0 comments on commit 6467711

Please sign in to comment.