Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(statement-distribution):Implement FanIn approach to statement distribution #4406

Draft
wants to merge 3 commits into
base: feat/parachain
Choose a base branch
from

Conversation

DanielDDHM
Copy link

Changes

Implement FanIn approach to statement distribution

Tests

Unit Tests: Core logic is tested in isolation using testify for assertions

go test -tags integration github.com/ChainSafe/gossamer

Issues

@DanielDDHM DanielDDHM changed the base branch from development to feat/parachain December 12, 2024 19:00
@DanielDDHM
Copy link
Author

DanielDDHM commented Dec 14, 2024

@EclesioMeloJunior Currently, if i follow the implementation in pseudo code, we need to require passing all channels explicitly as function arguments, which can make the code less flexible and harder to maintain as new channels are added or removed.
Wouldn't it be more efficient and scalable to use a dynamic approach, such as passing a map of channels (map[string]<-chan any>), as shown below?
This way, we avoid modifying the function signature whenever channels change, and the system becomes more modular:

in run

func (s StatementDistribution) Run(
	ctx context.Context,
	channels map[string]<-chan any,
) {
	muxedChannel := FanIn(ctx, channels)

	for {
		select {
		case muxedMsg := <-muxedChannel:
			err := s.processMuxedMessage(muxedMsg)
			if err != nil {
				logger.Errorf("error processing muxed message: %v", err)
			}
		case <-ctx.Done():
			logger.Infof("shutting down: %v", ctx.Err())
			return
		}
	}
}

fanIn Function

func FanIn(
    ctx context.Context,
    channels map[string]<-chan any,
) <-chan MuxedMessage {
    output := make(chan MuxedMessage)

    go func() {
        defer close(output)
        for {
            select {
            case <-ctx.Done():
                return
            default:
                // Iterate through channels
                for source, ch := range channels {
                    select {
                    case msg, ok := <-ch:
                        if ok {
                            output <- MuxedMessage{Source: source, Message: msg}
                        }
                    default:
                        // Non-blocking check
                    }
                }
            }
        }
    }()

    return output
}

what do you think?

@EclesioMeloJunior
Copy link
Member

@DanielDDHM using the for loop is too much given that the amount of channels don't change, also the Run method only receives the overseer channel the other channels are created inside the Run method.

Also you should define a timer to trigger the reputation aggregator to send the reputation changes.

So what you can do is, instead of the FanIn function, have all the channels in the Run method select statement, that should keep things simple.

Btw, I've changed the task to don't include the V1 related channels given that they are deprecated and will be removed soon.

@DanielDDHM
Copy link
Author

@DanielDDHM using the for loop is too much given that the amount of channels don't change, also the Run method only receives the overseer channel the other channels are created inside the Run method.

Also you should define a timer to trigger the reputation aggregator to send the reputation changes.

So what you can do is, instead of the FanIn function, have all the channels in the Run method select statement, that should keep things simple.

Btw, I've changed the task to don't include the V1 related channels given that they are deprecated and will be removed soon.

@EclesioMeloJunior could you take a look on the new approach?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

feat(statement-distribution) Implement FanIn approach to statement distribution
2 participants