Skip to content

Commit

Permalink
refactor(broadcast): simplified manager interface
Browse files Browse the repository at this point in the history
  • Loading branch information
aleksander-vedvik committed May 3, 2024
1 parent bafd2ce commit cc75006
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 200 deletions.
4 changes: 1 addition & 3 deletions broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type broadcastServer struct {
view RawConfiguration
createBroadcaster func(m BroadcastMetadata, o *BroadcastOrchestrator) Broadcaster
orchestrator *BroadcastOrchestrator
manager broadcast.BroadcastManager
manager broadcast.Manager
logger *slog.Logger
metrics *broadcast.Metric
}
Expand Down Expand Up @@ -62,7 +62,6 @@ func newBroadcastServer(logger *slog.Logger, withMetrics bool) *broadcastServer
//}

func (srv *broadcastServer) stop() {
//srv.state.Prune()
srv.manager.Close()
}

Expand Down Expand Up @@ -91,6 +90,5 @@ func (srv *broadcastServer) addAddr(lis net.Listener) {
h := fnv.New32a()
_, _ = h.Write([]byte(srv.addr))
srv.id = h.Sum32()
//srv.router.AddAddr(srv.id, srv.addr)
srv.manager.AddAddr(srv.id, srv.addr)
}
55 changes: 29 additions & 26 deletions broadcast/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,40 +9,38 @@ import (
"google.golang.org/protobuf/reflect/protoreflect"
)

type BroadcastManager interface {
type Manager interface {
Process(Content) error
ProcessBroadcast(uint64, protoreflect.ProtoMessage, string, ...BroadcastOptions)
ProcessSendToClient(uint64, protoreflect.ProtoMessage, error)
Broadcast(uint64, protoreflect.ProtoMessage, string, ...BroadcastOptions)
SendToClient(uint64, protoreflect.ProtoMessage, error)
NewBroadcastID() uint64
AddAddr(id uint32, addr string)
AddServerHandler(method string, handler ServerHandler)
AddClientHandler(method string)
AddHandler(method string, handler any)
Close() error
ResetState()
GetStats() Metrics
}

type broadcastManager struct {
type manager struct {
state *BroadcastState
router *BroadcastRouter
metrics *Metric
logger *slog.Logger
}

func NewBroadcastManager(logger *slog.Logger, m *Metric, createClient func(addr string, dialOpts []grpc.DialOption) (*Client, error)) BroadcastManager {
router := NewRouter(logger, m, createClient)
func NewBroadcastManager(logger *slog.Logger, m *Metric, createClient func(addr string, dialOpts []grpc.DialOption) (*Client, error)) Manager {
state := NewState(logger, m)
for _, shard := range state.shards {
go shard.run(router, state.reqTTL, state.sendBuffer)
}
return &broadcastManager{
router := NewRouter(logger, m, createClient, state)
state.RunShards(router)
return &manager{
state: state,
router: router,
logger: logger,
metrics: m,
}
}

func (mgr *broadcastManager) Process(msg Content) error {
func (mgr *manager) Process(msg Content) error {
_, shardID, _, _ := DecodeBroadcastID(msg.BroadcastID)
shardID = shardID % NumShards
shard := mgr.state.shards[shardID]
Expand All @@ -63,7 +61,7 @@ func (mgr *broadcastManager) Process(msg Content) error {
}
}

func (mgr *broadcastManager) ProcessBroadcast(broadcastID uint64, req protoreflect.ProtoMessage, method string, opts ...BroadcastOptions) {
func (mgr *manager) Broadcast(broadcastID uint64, req protoreflect.ProtoMessage, method string, opts ...BroadcastOptions) {
var options BroadcastOptions
if len(opts) > 0 {
options = opts[0]
Expand All @@ -82,7 +80,7 @@ func (mgr *broadcastManager) ProcessBroadcast(broadcastID uint64, req protorefle
}
}

func (mgr *broadcastManager) ProcessSendToClient(broadcastID uint64, resp protoreflect.ProtoMessage, err error) {
func (mgr *manager) SendToClient(broadcastID uint64, resp protoreflect.ProtoMessage, err error) {
_, shardID, _, _ := DecodeBroadcastID(broadcastID)
shardID = shardID % NumShards
shard := mgr.state.shards[shardID]
Expand All @@ -98,31 +96,36 @@ func (mgr *broadcastManager) ProcessSendToClient(broadcastID uint64, resp protor
}
}

func (mgr *broadcastManager) NewBroadcastID() uint64 {
func (mgr *manager) NewBroadcastID() uint64 {
return mgr.state.snowflake.NewBroadcastID()
}

func (mgr *broadcastManager) AddAddr(id uint32, addr string) {
func (mgr *manager) AddAddr(id uint32, addr string) {
mgr.router.id = id
mgr.router.addr = addr
mgr.state.snowflake = NewSnowflake(addr)
}

func (mgr *broadcastManager) AddServerHandler(method string, handler ServerHandler) {
mgr.router.serverHandlers[method] = handler
func (mgr *manager) AddHandler(method string, handler any) {
switch h := handler.(type) {
case ServerHandler:
mgr.router.serverHandlers[method] = h
default:
// only needs to know whether the handler exists. routing is done
// client-side using the provided metadata in the request.
mgr.router.clientHandlers[method] = struct{}{}
}
}

func (mgr *broadcastManager) AddClientHandler(method string) {
// only needs to know whether the handler exists. routing is done
// client-side using the provided metadata in the request.
mgr.router.clientHandlers[method] = struct{}{}
func (mgr *manager) Close() error {
return mgr.state.Close()
}

func (mgr *broadcastManager) Close() error {
return mgr.state.Close()
func (mgr *manager) ResetState() {
mgr.state.reset()
}

func (mgr *broadcastManager) GetStats() Metrics {
func (mgr *manager) GetStats() Metrics {
time.Sleep(5 * time.Second)
m := mgr.state.getStats()
return Metrics{
Expand Down
9 changes: 4 additions & 5 deletions broadcast/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ type BroadcastRouter struct {
methodsConversion map[string]uint16

Check failure on line 30 in broadcast/router.go

View workflow job for this annotation

GitHub Actions / lint

field `methodsConversion` is unused (unused)
serverHandlers map[string]ServerHandler // handlers on other servers
clientHandlers map[string]struct{} // specifies what handlers a client has implemented. Used only for BroadcastCalls.
clients map[string]*Client
createClient func(addr string, dialOpts []grpc.DialOption) (*Client, error)
dialOpts []grpc.DialOption
dialTimeout time.Duration
Expand All @@ -39,7 +38,7 @@ type BroadcastRouter struct {
state *BroadcastState
}

func NewRouter(logger *slog.Logger, metrics *Metric, createClient func(addr string, dialOpts []grpc.DialOption) (*Client, error), dialOpts ...grpc.DialOption) *BroadcastRouter {
func NewRouter(logger *slog.Logger, metrics *Metric, createClient func(addr string, dialOpts []grpc.DialOption) (*Client, error), state *BroadcastState, dialOpts ...grpc.DialOption) *BroadcastRouter {
if len(dialOpts) <= 0 {
dialOpts = []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
Expand All @@ -48,12 +47,12 @@ func NewRouter(logger *slog.Logger, metrics *Metric, createClient func(addr stri
return &BroadcastRouter{
serverHandlers: make(map[string]ServerHandler),
clientHandlers: make(map[string]struct{}),
clients: make(map[string]*Client),
createClient: createClient,
dialOpts: dialOpts,
dialTimeout: 3 * time.Second,
logger: logger,
metrics: metrics,
state: state,
}
}

Expand Down Expand Up @@ -94,14 +93,14 @@ func (r *BroadcastRouter) routeClientReply(broadcastID uint64, addr, method stri
func (r *BroadcastRouter) getClient(addr string) (*Client, error) {
r.mut.Lock()
defer r.mut.Unlock()
if client, ok := r.clients[addr]; ok {
if client, ok := r.state.clients[addr]; ok {
return client, nil
}
client, err := r.createClient(addr, r.dialOpts)
if err != nil {
return nil, err
}
r.clients[addr] = client
r.state.clients[addr] = client
return client, nil
}

Expand Down
Loading

0 comments on commit cc75006

Please sign in to comment.