Skip to content

Commit

Permalink
refactor(broadcast): improved organization
Browse files Browse the repository at this point in the history
  • Loading branch information
aleksander-vedvik committed May 11, 2024
1 parent e7ab4e1 commit 4ff4d75
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 389 deletions.
186 changes: 140 additions & 46 deletions broadcast.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package gorums

import (
"fmt"
"context"
"hash/fnv"
"log/slog"
"net"
"strings"
"sync"
"time"

"github.com/relab/gorums/broadcast"
"github.com/relab/gorums/ordering"
"google.golang.org/protobuf/reflect/protoreflect"
)

type broadcastServer struct {
Expand All @@ -20,66 +24,24 @@ type broadcastServer struct {
orchestrator *BroadcastOrchestrator
manager broadcast.Manager
logger *slog.Logger
metrics *broadcast.Metric
}

func (srv *Server) PrintStats() {
fmt.Println(srv.broadcastSrv.metrics.GetStats())
srv.broadcastSrv.metrics.Reset()
}

func (srv *Server) GetStats() broadcast.Metrics {
//m := srv.broadcastSrv.metrics.GetStats()
//srv.broadcastSrv.metrics.Reset()
//return m
return srv.broadcastSrv.manager.GetStats()
}

func newBroadcastServer(logger *slog.Logger, withMetrics bool) *broadcastServer {
var m *broadcast.Metric = nil
if withMetrics {
//m = broadcast.NewMetric()
}
func newBroadcastServer(logger *slog.Logger) *broadcastServer {
srv := &broadcastServer{
logger: logger,
metrics: m,
logger: logger,
}
srv.manager = broadcast.NewBroadcastManager(logger, m, createClient, srv.canceler)
srv.manager = broadcast.NewBroadcastManager(logger, createClient, srv.canceler)
return srv
}

//func newBroadcastServer(logger *slog.Logger, withMetrics bool) *broadcastServer {
//var m *broadcast.Metric = nil
//if withMetrics {
//m = broadcast.NewMetric()
//}
//router := broadcast.NewRouter(logger, m, createClient)
//return &broadcastServer{
//router: router,
//state: broadcast.NewState(logger, router, m),
//logger: logger,
//metrics: m,
//}
//}

func (srv *broadcastServer) stop() {
srv.manager.Close()
}

//type BroadcastState interface {
//Prune() error
//Process(broadcast.Content) error
//ProcessBroadcast(uint64, protoreflect.ProtoMessage, string)
//ProcessSendToClient(uint64, protoreflect.ProtoMessage, error)
//NewBroadcastID() uint64
//}

//type BroadcastRouter interface {
//AddAddr(id uint32, addr string)
//AddServerHandler(method string, handler broadcast.ServerHandler)
//AddClientHandler(method string, handler broadcast.ClientHandler)
//}

type Snowflake interface {
NewBroadcastID() uint64
}
Expand All @@ -93,3 +55,135 @@ func (srv *broadcastServer) addAddr(lis net.Listener) {
srv.id = h.Sum32()
srv.manager.AddAddr(srv.id, srv.addr)
}

const (
BroadcastID string = "broadcastID"
)

type BroadcastHandlerFunc func(method string, req protoreflect.ProtoMessage, broadcastID uint64, options ...broadcast.BroadcastOptions)
type BroadcastForwardHandlerFunc func(req protoreflect.ProtoMessage, method string, broadcastID uint64, forwardAddr, originAddr string)
type BroadcastServerHandlerFunc func(method string, req protoreflect.ProtoMessage, options ...broadcast.BroadcastOptions)
type BroadcastSendToClientHandlerFunc func(broadcastID uint64, resp protoreflect.ProtoMessage, err error)
type CancelHandlerFunc func(broadcastID uint64, srvAddrs []string)
type DoneHandlerFunc func(broadcastID uint64)

type defaultImplementationFunc[T protoreflect.ProtoMessage, V protoreflect.ProtoMessage] func(ServerCtx, T) (V, error)
type clientImplementationFunc[T protoreflect.ProtoMessage, V protoreflect.ProtoMessage] func(context.Context, T, uint64) (V, error)

type implementationFunc[T protoreflect.ProtoMessage, V Broadcaster] func(ServerCtx, T, V)

func CancelFunc(ServerCtx, protoreflect.ProtoMessage, Broadcaster) {}

const Cancellation string = "cancel"

// The BroadcastOrchestrator is used as a container for all
// broadcast handlers. The BroadcastHandler takes in a method
// and schedules it for broadcasting. SendToClientHandler works
// similarly but it sends the message to the calling client.
//
// It is necessary to use an orchestrator to hide certain
// implementation details, such as internal methods on the
// broadcast struct. The BroadcastOrchestrator will thus
// be an unimported field in the broadcast struct in the
// generated code.
type BroadcastOrchestrator struct {
BroadcastHandler BroadcastHandlerFunc
ForwardHandler BroadcastForwardHandlerFunc
SendToClientHandler BroadcastSendToClientHandlerFunc
ServerBroadcastHandler BroadcastServerHandlerFunc
CancelHandler CancelHandlerFunc
DoneHandler DoneHandlerFunc
}

func NewBroadcastOrchestrator(srv *Server) *BroadcastOrchestrator {
return &BroadcastOrchestrator{
BroadcastHandler: srv.broadcastSrv.broadcastHandler,
ForwardHandler: srv.broadcastSrv.forwardHandler,
ServerBroadcastHandler: srv.broadcastSrv.serverBroadcastHandler,
SendToClientHandler: srv.broadcastSrv.sendToClientHandler,
CancelHandler: srv.broadcastSrv.cancelHandler,
DoneHandler: srv.broadcastSrv.doneHandler,
}
}

type BroadcastOption func(*broadcast.BroadcastOptions)

func WithSubset(srvAddrs ...string) BroadcastOption {
return func(b *broadcast.BroadcastOptions) {
b.ServerAddresses = srvAddrs
}
}

func WithGossip(percentage float32, ttl int) BroadcastOption {
return func(b *broadcast.BroadcastOptions) {
b.GossipPercentage = percentage
b.TTL = ttl
}
}

func WithTTL(ttl int) BroadcastOption {
return func(b *broadcast.BroadcastOptions) {
b.TTL = ttl
}
}

func WithDeadline(deadline time.Time) BroadcastOption {
return func(b *broadcast.BroadcastOptions) {
b.Deadline = deadline
}
}

func WithoutSelf() BroadcastOption {
return func(b *broadcast.BroadcastOptions) {
b.SkipSelf = true
}
}

func WithoutUniquenessChecks() BroadcastOption {
return func(b *broadcast.BroadcastOptions) {
b.OmitUniquenessChecks = true
}
}

func WithRelationToRequest(broadcastID uint64) BroadcastOption {
return func(b *broadcast.BroadcastOptions) {
b.RelatedToReq = broadcastID
}
}

func NewBroadcastOptions() broadcast.BroadcastOptions {
return broadcast.BroadcastOptions{
ServerAddresses: make([]string, 0), // to prevent nil errors
}
}

type Broadcaster interface{}

type BroadcastMetadata struct {
BroadcastID uint64
IsBroadcastClient bool // type of sender, could be: Client or Server
SenderAddr string // address of last hop
OriginAddr string // address of the origin
OriginMethod string // the first method called by the origin
Method string // the current method
Digest []byte // digest of original message sent by client
}

func newBroadcastMetadata(md *ordering.Metadata) BroadcastMetadata {
if md == nil {
return BroadcastMetadata{}
}
tmp := strings.Split(md.Method, ".")
m := ""
if len(tmp) >= 1 {
m = tmp[len(tmp)-1]
}
return BroadcastMetadata{
BroadcastID: md.BroadcastMsg.BroadcastID,
IsBroadcastClient: md.BroadcastMsg.IsBroadcastClient,
SenderAddr: md.BroadcastMsg.SenderAddr,
OriginAddr: md.BroadcastMsg.OriginAddr,
OriginMethod: md.BroadcastMsg.OriginMethod,
Method: m,
}
}
20 changes: 9 additions & 11 deletions broadcast/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,20 @@ type Manager interface {
}

type manager struct {
state *BroadcastState
router *BroadcastRouter
metrics *Metric
logger *slog.Logger
state *BroadcastState
router *BroadcastRouter
logger *slog.Logger
}

func NewBroadcastManager(logger *slog.Logger, m *Metric, createClient func(addr string, dialOpts []grpc.DialOption) (*Client, error), canceler func(broadcastID uint64, srvAddrs []string)) Manager {
router := NewRouter(logger, m, createClient, canceler)
state := NewState(logger, m, router)
func NewBroadcastManager(logger *slog.Logger, createClient func(addr string, dialOpts []grpc.DialOption) (*Client, error), canceler func(broadcastID uint64, srvAddrs []string)) Manager {
router := NewRouter(logger, createClient, canceler)
state := NewState(logger, router)
router.registerState(state)
state.RunShards()
return &manager{
state: state,
router: router,
logger: logger,
metrics: m,
state: state,
router: router,
logger: logger,
}
}

Expand Down
Loading

0 comments on commit 4ff4d75

Please sign in to comment.