From 9e091090e79d111862ca506212cb6b226d75233e Mon Sep 17 00:00:00 2001 From: aleksander-vedvik Date: Wed, 8 May 2024 18:38:35 +0200 Subject: [PATCH] feat(broadcast): enabled cancellation --- broadcast.go | 5 +- broadcast/manager.go | 35 ++++++--- broadcast/request.go | 49 +++++++----- broadcast/request_test.go | 68 +++++++++-------- broadcast/router.go | 22 ++++-- broadcast/shard.go | 42 ++++++++-- broadcast/shard_test.go | 32 ++++---- broadcast/state.go | 10 ++- broadcastTypes.go | 7 ++ channel.go | 4 +- cmd/protoc-gen-gorums/dev/server.go | 4 + .../dev/zorums_qspec_gorums.pb.go | 2 +- .../gengorums/template_static.go | 4 + handler.go | 29 ++++++- ordering/ordering.pb.go | 76 +++++++------------ ordering/ordering.proto | 8 +- tests/broadcast/broadcast_gorums.pb.go | 6 +- tests/broadcast/broadcast_test.go | 36 ++++----- tests/broadcast/server.go | 1 + 19 files changed, 278 insertions(+), 162 deletions(-) diff --git a/broadcast.go b/broadcast.go index 3d50ad87..15a9bd6e 100644 --- a/broadcast.go +++ b/broadcast.go @@ -40,11 +40,12 @@ func newBroadcastServer(logger *slog.Logger, withMetrics bool) *broadcastServer if withMetrics { //m = broadcast.NewMetric() } - return &broadcastServer{ - manager: broadcast.NewBroadcastManager(logger, m, createClient), + srv := &broadcastServer{ logger: logger, metrics: m, } + srv.manager = broadcast.NewBroadcastManager(logger, m, createClient, srv.canceler) + return srv } //func newBroadcastServer(logger *slog.Logger, withMetrics bool) *broadcastServer { diff --git a/broadcast/manager.go b/broadcast/manager.go index ee7a26d5..7f9d7655 100644 --- a/broadcast/manager.go +++ b/broadcast/manager.go @@ -1,6 +1,7 @@ package broadcast import ( + "context" "errors" "log/slog" "time" @@ -10,9 +11,10 @@ import ( ) type Manager interface { - Process(Content) error + Process(Content) (error, context.Context) Broadcast(uint64, protoreflect.ProtoMessage, string, ...BroadcastOptions) SendToClient(uint64, protoreflect.ProtoMessage, error) + Cancel(uint64, []string) NewBroadcastID() uint64 AddAddr(id uint32, addr string) AddHandler(method string, handler any) @@ -28,9 +30,9 @@ type manager struct { logger *slog.Logger } -func NewBroadcastManager(logger *slog.Logger, m *Metric, createClient func(addr string, dialOpts []grpc.DialOption) (*Client, error)) Manager { +func NewBroadcastManager(logger *slog.Logger, m *Metric, createClient func(addr string, dialOpts []grpc.DialOption) (*Client, error), canceler func(broadcastID uint64, srvAddrs []string)) Manager { state := NewState(logger, m) - router := NewRouter(logger, m, createClient, state) + router := NewRouter(logger, m, createClient, state, canceler) state.RunShards(router) return &manager{ state: state, @@ -40,24 +42,24 @@ func NewBroadcastManager(logger *slog.Logger, m *Metric, createClient func(addr } } -func (mgr *manager) Process(msg Content) error { +func (mgr *manager) Process(msg Content) (error, context.Context) { _, shardID, _, _ := DecodeBroadcastID(msg.BroadcastID) shardID = shardID % NumShards shard := mgr.state.shards[shardID] // we only need a single response - receiveChan := make(chan error, 1) + receiveChan := make(chan shardResponse, 1) msg.ReceiveChan = receiveChan select { case <-shard.ctx.Done(): - return errors.New("shard is down") + return errors.New("shard is down"), nil case shard.sendChan <- msg: } select { case <-shard.ctx.Done(): - return errors.New("shard is down") - case err := <-receiveChan: - return err + return errors.New("shard is down"), nil + case resp := <-receiveChan: + return resp.err, resp.reqCtx } } @@ -96,6 +98,21 @@ func (mgr *manager) SendToClient(broadcastID uint64, resp protoreflect.ProtoMess } } +func (mgr *manager) Cancel(broadcastID uint64, srvAddrs []string) { + _, shardID, _, _ := DecodeBroadcastID(broadcastID) + shardID = shardID % NumShards + shard := mgr.state.shards[shardID] + select { + case shard.broadcastChan <- Msg{ + Cancellation: &cancellation{ + srvAddrs: srvAddrs, + }, + BroadcastID: broadcastID, + }: + case <-shard.ctx.Done(): + } +} + func (mgr *manager) NewBroadcastID() uint64 { return mgr.state.snowflake.NewBroadcastID() } diff --git a/broadcast/request.go b/broadcast/request.go index 1652f3b6..d9b23694 100644 --- a/broadcast/request.go +++ b/broadcast/request.go @@ -8,13 +8,14 @@ import ( ) type BroadcastRequest struct { - broadcastChan chan Msg - sendChan chan Content - ctx context.Context - cancelFunc context.CancelFunc - started time.Time - ended time.Time - //sync.Once + broadcastChan chan Msg + sendChan chan Content + ctx context.Context + cancelFunc context.CancelFunc + started time.Time + ended time.Time + cancellationCtx context.Context + cancellationCtxCancel context.CancelFunc // should only be called by the shard } // func (req *BroadcastRequest) handle(router *BroadcastRouter, broadcastID uint64, msg Content, metrics *Metric) { @@ -35,6 +36,7 @@ func (req *BroadcastRequest) handle(router Router, broadcastID uint64, msg Conte //metrics.RemoveGoroutine(broadcastID, "req") //} req.cancelFunc() + req.cancellationCtxCancel() }() for { select { @@ -47,6 +49,10 @@ func (req *BroadcastRequest) handle(router Router, broadcastID uint64, msg Conte if broadcastID != bMsg.BroadcastID { continue } + if bMsg.Cancellation != nil { + _ = router.Send(broadcastID, "", "", bMsg.Cancellation) + return + } if bMsg.Broadcast { // check if msg has already been broadcasted for this method if alreadyBroadcasted(methods, bMsg.Method) { @@ -67,11 +73,6 @@ func (req *BroadcastRequest) handle(router Router, broadcastID uint64, msg Conte return } // QuorumCall if origin addr is empty. - //if !msg.hasReceivedClientRequest() { - //// Has not received client request - //continue - //} - // Has received client request err := msg.send(bMsg.Reply.Response, bMsg.Reply.Err) if err != nil { // add response if not already done @@ -89,7 +90,15 @@ func (req *BroadcastRequest) handle(router Router, broadcastID uint64, msg Conte } case new := <-req.sendChan: if new.BroadcastID != broadcastID { - new.ReceiveChan <- BroadcastIDErr{} + new.ReceiveChan <- shardResponse{ + err: BroadcastIDErr{}, + } + continue + } + if msg.IsCancellation { + new.ReceiveChan <- shardResponse{ + err: nil, + } continue } if msg.OriginAddr == "" && new.OriginAddr != "" { @@ -107,15 +116,21 @@ func (req *BroadcastRequest) handle(router Router, broadcastID uint64, msg Conte if sent && !msg.isBroadcastCall() { err := msg.send(respMsg, respErr) if err != nil { - new.ReceiveChan <- err - // should return here? + new.ReceiveChan <- shardResponse{ + err: err, + } return } //new.ReceiveChan <- errors.New("req is done and should be returned immediately to client") - new.ReceiveChan <- AlreadyProcessedErr{} + new.ReceiveChan <- shardResponse{ + err: AlreadyProcessedErr{}, + } return } - new.ReceiveChan <- nil + new.ReceiveChan <- shardResponse{ + err: nil, + reqCtx: req.cancellationCtx, + } } } } diff --git a/broadcast/request_test.go b/broadcast/request_test.go index eb95b4d6..5af28480 100644 --- a/broadcast/request_test.go +++ b/broadcast/request_test.go @@ -47,7 +47,7 @@ func TestHandleBroadcastOption(t *testing.T) { in: Content{ BroadcastID: broadcastID, IsBroadcastClient: false, - ReceiveChan: make(chan error), + ReceiveChan: make(chan shardResponse), }, out: nil, }, @@ -55,7 +55,7 @@ func TestHandleBroadcastOption(t *testing.T) { in: Content{ BroadcastID: snowflake.NewBroadcastID(), IsBroadcastClient: false, - ReceiveChan: make(chan error), + ReceiveChan: make(chan shardResponse), }, out: BroadcastIDErr{}, }, @@ -63,7 +63,7 @@ func TestHandleBroadcastOption(t *testing.T) { in: Content{ BroadcastID: broadcastID, IsBroadcastClient: false, - ReceiveChan: make(chan error), + ReceiveChan: make(chan shardResponse), }, out: nil, }, @@ -72,29 +72,33 @@ func TestHandleBroadcastOption(t *testing.T) { msg := Content{ BroadcastID: broadcastID, OriginMethod: "testMethod", - ReceiveChan: make(chan error), + ReceiveChan: make(chan shardResponse), } router := &mockRouter{ returnError: false, } + cancelCtx, cancelCancel := context.WithTimeout(context.Background(), 1*time.Minute) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() + defer cancelCancel() req := &BroadcastRequest{ - ctx: ctx, - cancelFunc: cancel, - sendChan: make(chan Content), - broadcastChan: make(chan Msg, 5), - started: time.Now(), + ctx: ctx, + cancelFunc: cancel, + sendChan: make(chan Content), + broadcastChan: make(chan Msg, 5), + started: time.Now(), + cancellationCtx: cancelCtx, + cancellationCtxCancel: cancelCancel, } go req.handle(router, msg.BroadcastID, msg) for _, tt := range tests { req.sendChan <- tt.in - err := <-tt.in.ReceiveChan - if err != tt.out { - t.Fatalf("wrong error returned.\n\tgot: %v, want: %v", tt.out, err) + resp := <-tt.in.ReceiveChan + if resp.err != tt.out { + t.Fatalf("wrong error returned.\n\tgot: %v, want: %v", tt.out, resp.err) } } @@ -122,13 +126,13 @@ func TestHandleBroadcastOption(t *testing.T) { BroadcastID: broadcastID, IsBroadcastClient: true, SendFn: func(resp protoreflect.ProtoMessage, err error) {}, - ReceiveChan: make(chan error), + ReceiveChan: make(chan shardResponse), } req.sendChan <- clientMsg - err := <-clientMsg.ReceiveChan + resp := <-clientMsg.ReceiveChan expectedErr := AlreadyProcessedErr{} - if err != expectedErr { - t.Fatalf("wrong error returned.\n\tgot: %v, want: %v", err, expectedErr) + if resp.err != expectedErr { + t.Fatalf("wrong error returned.\n\tgot: %v, want: %v", resp.err, expectedErr) } select { @@ -150,7 +154,7 @@ func TestHandleBroadcastCall(t *testing.T) { in: Content{ BroadcastID: broadcastID, IsBroadcastClient: false, - ReceiveChan: make(chan error, 1), + ReceiveChan: make(chan shardResponse, 1), }, out: nil, }, @@ -158,7 +162,7 @@ func TestHandleBroadcastCall(t *testing.T) { in: Content{ BroadcastID: snowflake.NewBroadcastID(), IsBroadcastClient: false, - ReceiveChan: make(chan error, 1), + ReceiveChan: make(chan shardResponse, 1), }, out: BroadcastIDErr{}, }, @@ -166,7 +170,7 @@ func TestHandleBroadcastCall(t *testing.T) { in: Content{ BroadcastID: broadcastID, IsBroadcastClient: false, - ReceiveChan: make(chan error, 1), + ReceiveChan: make(chan shardResponse, 1), }, out: nil, }, @@ -177,7 +181,7 @@ func TestHandleBroadcastCall(t *testing.T) { IsBroadcastClient: false, OriginAddr: "127.0.0.1:8080", OriginMethod: "testMethod", - ReceiveChan: make(chan error), + ReceiveChan: make(chan shardResponse), } router := &mockRouter{ @@ -185,21 +189,25 @@ func TestHandleBroadcastCall(t *testing.T) { } ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + cancelCtx, cancelCancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() + defer cancelCancel() req := &BroadcastRequest{ - ctx: ctx, - cancelFunc: cancel, - sendChan: make(chan Content), - broadcastChan: make(chan Msg, 5), - started: time.Now(), + ctx: ctx, + cancelFunc: cancel, + sendChan: make(chan Content), + broadcastChan: make(chan Msg, 5), + started: time.Now(), + cancellationCtx: cancelCtx, + cancellationCtxCancel: cancelCancel, } go req.handle(router, msg.BroadcastID, msg) for _, tt := range tests { req.sendChan <- tt.in - err := <-tt.in.ReceiveChan - if err != tt.out { - t.Fatalf("wrong error returned.\n\tgot: %v, want: %v", tt.out, err) + resp := <-tt.in.ReceiveChan + if resp.err != tt.out { + t.Fatalf("wrong error returned.\n\tgot: %v, want: %v", tt.out, resp.err) } } @@ -228,7 +236,7 @@ func TestHandleBroadcastCall(t *testing.T) { IsBroadcastClient: true, OriginAddr: "127.0.0.1:8080", OriginMethod: "testMethod", - ReceiveChan: make(chan error), + ReceiveChan: make(chan shardResponse), } select { case <-req.ctx.Done(): @@ -263,7 +271,7 @@ func BenchmarkHandle(b *testing.B) { IsBroadcastClient: true, SendFn: sendFn, OriginMethod: originMethod, - ReceiveChan: make(chan error, 1), + ReceiveChan: make(chan shardResponse, 1), } ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) diff --git a/broadcast/router.go b/broadcast/router.go index 91ee98f4..1d67d9ef 100644 --- a/broadcast/router.go +++ b/broadcast/router.go @@ -31,6 +31,7 @@ type BroadcastRouter struct { serverHandlers map[string]ServerHandler // handlers on other servers clientHandlers map[string]struct{} // specifies what handlers a client has implemented. Used only for BroadcastCalls. createClient func(addr string, dialOpts []grpc.DialOption) (*Client, error) + canceler func(broadcastID uint64, srvAddrs []string) dialOpts []grpc.DialOption dialTimeout time.Duration logger *slog.Logger @@ -38,7 +39,7 @@ type BroadcastRouter struct { state *BroadcastState } -func NewRouter(logger *slog.Logger, metrics *Metric, createClient func(addr string, dialOpts []grpc.DialOption) (*Client, error), state *BroadcastState, dialOpts ...grpc.DialOption) *BroadcastRouter { +func NewRouter(logger *slog.Logger, metrics *Metric, createClient func(addr string, dialOpts []grpc.DialOption) (*Client, error), state *BroadcastState, canceler func(broadcastID uint64, srvAddrs []string), dialOpts ...grpc.DialOption) *BroadcastRouter { if len(dialOpts) <= 0 { dialOpts = []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), @@ -48,6 +49,7 @@ func NewRouter(logger *slog.Logger, metrics *Metric, createClient func(addr stri serverHandlers: make(map[string]ServerHandler), clientHandlers: make(map[string]struct{}), createClient: createClient, + canceler: canceler, dialOpts: dialOpts, dialTimeout: 3 * time.Second, logger: logger, @@ -62,6 +64,9 @@ func (r *BroadcastRouter) Send(broadcastID uint64, addr, method string, req any) return r.routeBroadcast(broadcastID, addr, method, val) case *reply: return r.routeClientReply(broadcastID, addr, method, val) + case *cancellation: + r.canceler(broadcastID, val.srvAddrs) + return nil } return errors.New("wrong req type") } @@ -105,11 +110,12 @@ func (r *BroadcastRouter) getClient(addr string) (*Client, error) { } type Msg struct { - Broadcast bool - BroadcastID uint64 - Msg *broadcastMsg - Method string - Reply *reply + Broadcast bool + BroadcastID uint64 + Msg *broadcastMsg + Method string + Reply *reply + Cancellation *cancellation //receiveChan chan error } @@ -150,3 +156,7 @@ func (r *reply) getResponse() protoreflect.ProtoMessage { func (r *reply) getError() error { return r.Err } + +type cancellation struct { + srvAddrs []string +} diff --git a/broadcast/shard.go b/broadcast/shard.go index d9a704bd..daf02398 100644 --- a/broadcast/shard.go +++ b/broadcast/shard.go @@ -2,7 +2,6 @@ package broadcast import ( "context" - "errors" "time" ) @@ -66,24 +65,49 @@ func (s *shard) run(router Router, reqTTL time.Duration, sendBuffer int) { select { case <-req.ctx.Done(): s.metrics.droppedMsgs++ - msg.ReceiveChan <- AlreadyProcessedErr{} + msg.ReceiveChan <- shardResponse{ + err: AlreadyProcessedErr{}, + } default: } + if msg.IsCancellation { + msg.ReceiveChan <- shardResponse{ + err: nil, + } + continue + } if !msg.IsBroadcastClient { // no need to send it to the broadcast request goroutine. // the first request should contain all info needed // except for the routing info given in the client req. - msg.ReceiveChan <- nil + msg.ReceiveChan <- shardResponse{ + err: nil, + reqCtx: req.cancellationCtx, + } continue } + // msg.Ctx will correspond to the streamCtx between the client and this server. + // We can thus listen to it and signal a cancellation if the client goes offline + // or cancels the request. We also have to listen to the req.ctx to prevent leaking + // the goroutine. + go func() { + select { + case <-req.ctx.Done(): + case <-msg.Ctx.Done(): + } + req.cancellationCtxCancel() + }() // must check if the req is done to prevent deadlock select { case <-req.ctx.Done(): s.metrics.droppedMsgs++ - msg.ReceiveChan <- AlreadyProcessedErr{} + msg.ReceiveChan <- shardResponse{ + err: AlreadyProcessedErr{}, + } case req.sendChan <- msg: } } else { + msg.Ctx, msg.CancelCtx = context.WithCancel(context.Background()) // check size of s.reqs. If too big, then perform necessary cleanup. // should only affect the current shard and not the others. ctx, cancel := context.WithTimeout(s.ctx, reqTTL) @@ -100,15 +124,19 @@ func (s *shard) run(router Router, reqTTL time.Duration, sendBuffer int) { sendChan: make(chan Content), // this channel can be buffered because there is no receive // channel. The result is simply ignored. - broadcastChan: make(chan Msg, sendBuffer), - started: time.Now(), + broadcastChan: make(chan Msg, sendBuffer), + started: time.Now(), + cancellationCtx: msg.Ctx, + cancellationCtxCancel: msg.CancelCtx, } s.reqs[msg.BroadcastID] = req go req.handle(router, msg.BroadcastID, msg) select { case <-req.ctx.Done(): s.metrics.droppedMsgs++ - msg.ReceiveChan <- errors.New("req is done") + msg.ReceiveChan <- shardResponse{ + err: AlreadyProcessedErr{}, + } case req.sendChan <- msg: } } diff --git a/broadcast/shard_test.go b/broadcast/shard_test.go index dc8553bb..c161f681 100644 --- a/broadcast/shard_test.go +++ b/broadcast/shard_test.go @@ -59,7 +59,7 @@ func TestShard(t *testing.T) { OriginAddr: "127.0.0.1:8080", OriginMethod: "testMethod", IsBroadcastClient: true, - ReceiveChan: make(chan error), + ReceiveChan: make(chan shardResponse), }, out: nil, }, @@ -69,7 +69,7 @@ func TestShard(t *testing.T) { OriginAddr: "127.0.0.1:8080", OriginMethod: "testMethod", IsBroadcastClient: false, - ReceiveChan: make(chan error), + ReceiveChan: make(chan shardResponse), }, out: nil, }, @@ -79,7 +79,7 @@ func TestShard(t *testing.T) { OriginAddr: "127.0.0.1:8080", OriginMethod: "testMethod", IsBroadcastClient: false, - ReceiveChan: make(chan error), + ReceiveChan: make(chan shardResponse), }, out: nil, }, @@ -87,9 +87,9 @@ func TestShard(t *testing.T) { for _, tt := range tests { shard.sendChan <- tt.in - err := <-tt.in.ReceiveChan - if err != tt.out { - t.Fatalf("wrong error returned.\n\tgot: %v, want: %v", tt.out, err) + resp := <-tt.in.ReceiveChan + if resp.err != tt.out { + t.Fatalf("wrong error returned.\n\tgot: %v, want: %v", tt.out, resp.err) } } @@ -106,7 +106,8 @@ func TestShard(t *testing.T) { OriginAddr: "127.0.0.1:8080", OriginMethod: "testMethod", IsBroadcastClient: true, - ReceiveChan: make(chan error, 1), + ReceiveChan: make(chan shardResponse, 1), + Ctx: context.Background(), } shard.sendChan <- clientMsg @@ -117,13 +118,14 @@ func TestShard(t *testing.T) { OriginAddr: "127.0.0.1:8080", OriginMethod: "testMethod", IsBroadcastClient: true, - ReceiveChan: make(chan error, 1), + ReceiveChan: make(chan shardResponse, 1), + Ctx: context.Background(), } // this will panic if the request sendChan is closed shard.sendChan <- msgShouldBeDropped select { - case err := <-msgShouldBeDropped.ReceiveChan: - if err == nil { + case resp := <-msgShouldBeDropped.ReceiveChan: + if resp.err == nil { t.Fatalf("the request should have been stopped. SendToClient has been called.") } case <-time.After(3 * time.Second): @@ -131,8 +133,8 @@ func TestShard(t *testing.T) { } select { - case err := <-clientMsg.ReceiveChan: - if err == nil { + case resp := <-clientMsg.ReceiveChan: + if resp.err == nil { t.Fatalf("the request should have been stopped. SendToClient has been called.") } case <-time.After(3 * time.Second): @@ -167,7 +169,8 @@ func BenchmarkShard(b *testing.B) { IsBroadcastClient: false, OriginAddr: originAddr, OriginMethod: originMethod, - ReceiveChan: make(chan error, 1), + ReceiveChan: make(chan shardResponse, 1), + Ctx: context.Background(), } msgs[i] = msg shard.sendChan <- msg @@ -192,7 +195,8 @@ func BenchmarkShard(b *testing.B) { IsBroadcastClient: true, OriginAddr: originAddr, OriginMethod: originMethod, - ReceiveChan: make(chan error, 1), + ReceiveChan: make(chan shardResponse, 1), + Ctx: context.Background(), } } else { msg = msgs[i%10] diff --git a/broadcast/state.go b/broadcast/state.go index a1ad7bdb..06482005 100644 --- a/broadcast/state.go +++ b/broadcast/state.go @@ -127,13 +127,21 @@ func (state *BroadcastState) getStats() shardMetrics { return m } +type shardResponse struct { + err error + reqCtx context.Context +} + type Content struct { BroadcastID uint64 IsBroadcastClient bool + IsCancellation bool OriginAddr string OriginMethod string - ReceiveChan chan error + ReceiveChan chan shardResponse SendFn func(resp protoreflect.ProtoMessage, err error) + Ctx context.Context + CancelCtx context.CancelFunc } func (c Content) send(resp protoreflect.ProtoMessage, err error) error { diff --git a/broadcastTypes.go b/broadcastTypes.go index 2d4304a9..a2c85903 100644 --- a/broadcastTypes.go +++ b/broadcastTypes.go @@ -26,12 +26,17 @@ type BroadcastHandlerFunc func(method string, req protoreflect.ProtoMessage, bro type BroadcastForwardHandlerFunc func(req RequestTypes, method string, broadcastID uint64, forwardAddr, originAddr string) type BroadcastServerHandlerFunc func(method string, req RequestTypes, options ...broadcast.BroadcastOptions) type BroadcastSendToClientHandlerFunc func(broadcastID uint64, resp protoreflect.ProtoMessage, err error) +type CancelHandlerFunc func(broadcastID uint64, srvAddrs []string) type defaultImplementationFunc[T RequestTypes, V ResponseTypes] func(ServerCtx, T) (V, error) type clientImplementationFunc[T protoreflect.ProtoMessage, V protoreflect.ProtoMessage] func(context.Context, T, uint64) (V, error) type implementationFunc[T RequestTypes, V Broadcaster] func(ServerCtx, T, V) +func CancelFunc(ServerCtx, RequestTypes, Broadcaster) {} + +const Cancellation string = "cancel" + // The BroadcastOrchestrator is used as a container for all // broadcast handlers. The BroadcastHandler takes in a method // and schedules it for broadcasting. SendToClientHandler works @@ -47,6 +52,7 @@ type BroadcastOrchestrator struct { ForwardHandler BroadcastForwardHandlerFunc SendToClientHandler BroadcastSendToClientHandlerFunc ServerBroadcastHandler BroadcastServerHandlerFunc + CancelHandler CancelHandlerFunc } func NewBroadcastOrchestrator(srv *Server) *BroadcastOrchestrator { @@ -55,6 +61,7 @@ func NewBroadcastOrchestrator(srv *Server) *BroadcastOrchestrator { ForwardHandler: srv.broadcastSrv.forwardHandler, ServerBroadcastHandler: srv.broadcastSrv.serverBroadcastHandler, SendToClientHandler: srv.broadcastSrv.sendToClientHandler, + CancelHandler: srv.broadcastSrv.cancelHandler, } } diff --git a/channel.go b/channel.go index e590e362..ecdd2b8f 100644 --- a/channel.go +++ b/channel.go @@ -199,7 +199,9 @@ func (c *channel) sendMsg(req request) (err error) { // false alarm default: // trigger reconnect + c.streamMut.Lock() c.cancelStream() + c.streamMut.Unlock() } } }() @@ -235,14 +237,12 @@ func (c *channel) sender() { } // return error if stream is broken if c.streamBroken.get() { - //slog.Info("channel: stream is broken") c.routeResponse(req.msg.Metadata.MessageID, response{nid: c.node.ID(), err: streamDownErr}) continue } // else try to send message err := c.sendMsg(req) if err != nil { - //slog.Info("channel: well this is weird", "err", err) // return the error c.routeResponse(req.msg.Metadata.MessageID, response{nid: c.node.ID(), err: err}) } diff --git a/cmd/protoc-gen-gorums/dev/server.go b/cmd/protoc-gen-gorums/dev/server.go index 95cc1513..a1c42f5b 100644 --- a/cmd/protoc-gen-gorums/dev/server.go +++ b/cmd/protoc-gen-gorums/dev/server.go @@ -88,6 +88,10 @@ func (b *Broadcast) SendToClient(resp protoreflect.ProtoMessage, err error) { b.orchestrator.SendToClientHandler(b.metadata.BroadcastID, resp, err) } +func (b *Broadcast) Cancel() { + b.orchestrator.CancelHandler(b.metadata.BroadcastID, b.srvAddrs) +} + func (srv *Server) SendToClient(resp protoreflect.ProtoMessage, err error, broadcastID uint64) { srv.SendToClientHandler(resp, err, broadcastID) } diff --git a/cmd/protoc-gen-gorums/dev/zorums_qspec_gorums.pb.go b/cmd/protoc-gen-gorums/dev/zorums_qspec_gorums.pb.go index 816e895f..151cf810 100644 --- a/cmd/protoc-gen-gorums/dev/zorums_qspec_gorums.pb.go +++ b/cmd/protoc-gen-gorums/dev/zorums_qspec_gorums.pb.go @@ -86,7 +86,7 @@ type QuorumSpec interface { BroadcastWithClientHandler2QF(in *Request, replies []*Response) (*Response, bool) // BroadcastWithClientHandlerAndBroadcastOptionQF is the quorum function for the BroadcastWithClientHandlerAndBroadcastOption - // broadcast call method. The in parameter is the request object + // broadcastcall call method. The in parameter is the request object // supplied to the BroadcastWithClientHandlerAndBroadcastOption method at call time, and may or may not // be used by the quorum function. If the in parameter is not needed // you should implement your quorum function with '_ *Request'. diff --git a/cmd/protoc-gen-gorums/gengorums/template_static.go b/cmd/protoc-gen-gorums/gengorums/template_static.go index 021667b9..2daf4c7c 100644 --- a/cmd/protoc-gen-gorums/gengorums/template_static.go +++ b/cmd/protoc-gen-gorums/gengorums/template_static.go @@ -247,6 +247,10 @@ func (b *Broadcast) SendToClient(resp protoreflect.ProtoMessage, err error) { b.orchestrator.SendToClientHandler(b.metadata.BroadcastID, resp, err) } +func (b *Broadcast) Cancel() { + b.orchestrator.CancelHandler(b.metadata.BroadcastID, b.srvAddrs) +} + func (srv *Server) SendToClient(resp protoreflect.ProtoMessage, err error, broadcastID uint64) { srv.SendToClientHandler(resp, err, broadcastID) } diff --git a/handler.go b/handler.go index 08dd1ba7..c10e215d 100644 --- a/handler.go +++ b/handler.go @@ -55,8 +55,12 @@ func BroadcastHandler[T RequestTypes, V Broadcaster](impl implementationFunc[T, msg := broadcast.Content{} createRequest(&msg, ctx, in, finished) - //err := srv.broadcastSrv.state.Process(msg) - err := srv.broadcastSrv.manager.Process(msg) + var err error + // we are not interested in the server context as this is tied to the previous hop. + // instead we want to check whether the client has cancelled the broadcast request + // and if so, we return a cancelled context. This enables the implementer to listen + // for cancels and do proper actions. + err, ctx.Context = srv.broadcastSrv.manager.Process(msg) if err != nil { //if srv.broadcastSrv.metrics != nil { //srv.broadcastSrv.metrics.AddDropped(false) @@ -84,9 +88,13 @@ func createRequest(msg *broadcast.Content, ctx ServerCtx, in *Message, finished msg.IsBroadcastClient = in.Metadata.BroadcastMsg.IsBroadcastClient msg.OriginAddr = in.Metadata.BroadcastMsg.OriginAddr msg.OriginMethod = in.Metadata.BroadcastMsg.OriginMethod + msg.Ctx = ctx.Context if msg.OriginAddr == "" && msg.IsBroadcastClient { msg.SendFn = createSendFn(in.Metadata.MessageID, in.Metadata.Method, finished, ctx) } + if in.Metadata.Method == Cancellation { + msg.IsCancellation = true + } } func createSendFn(msgID uint64, method string, finished chan<- *Message, ctx ServerCtx) func(resp protoreflect.ProtoMessage, err error) { @@ -148,6 +156,23 @@ func (srv *broadcastServer) forwardHandler(req RequestTypes, method string, broa srv.viewMutex.RUnlock() } +func (srv *broadcastServer) cancelHandler(broadcastID uint64, srvAddrs []string) { + srv.manager.Cancel(broadcastID, srvAddrs) +} + +func (srv *broadcastServer) canceler(broadcastID uint64, srvAddrs []string) { + cd := broadcastCallData{ + Message: nil, + Method: Cancellation, + BroadcastID: broadcastID, + ServerAddresses: srvAddrs, + } + srv.viewMutex.RLock() + // drop request if a view change has occured + srv.view.broadcastCall(context.Background(), cd) + srv.viewMutex.RUnlock() +} + func (srv *broadcastServer) serverBroadcastHandler(method string, req RequestTypes, opts ...broadcast.BroadcastOptions) { cd := broadcastCallData{ Message: req, diff --git a/ordering/ordering.pb.go b/ordering/ordering.pb.go index 8edddfe8..55a25bae 100644 --- a/ordering/ordering.pb.go +++ b/ordering/ordering.pb.go @@ -29,11 +29,9 @@ type Metadata struct { unknownFields protoimpl.UnknownFields MessageID uint64 `protobuf:"varint,1,opt,name=MessageID,proto3" json:"MessageID,omitempty"` - Method string `protobuf:"bytes,2,opt,name=Method,proto3" json:"Method,omitempty"` + Method string `protobuf:"bytes,2,opt,name=Method,proto3" json:"Method,omitempty"` // uint32: 4 bytes Status *status.Status `protobuf:"bytes,3,opt,name=Status,proto3" json:"Status,omitempty"` BroadcastMsg *BroadcastMsg `protobuf:"bytes,4,opt,name=BroadcastMsg,proto3" json:"BroadcastMsg,omitempty"` - Signature []byte `protobuf:"bytes,5,opt,name=Signature,proto3" json:"Signature,omitempty"` - HashMsg []byte `protobuf:"bytes,6,opt,name=HashMsg,proto3" json:"HashMsg,omitempty"` } func (x *Metadata) Reset() { @@ -96,31 +94,16 @@ func (x *Metadata) GetBroadcastMsg() *BroadcastMsg { return nil } -func (x *Metadata) GetSignature() []byte { - if x != nil { - return x.Signature - } - return nil -} - -func (x *Metadata) GetHashMsg() []byte { - if x != nil { - return x.HashMsg - } - return nil -} - type BroadcastMsg struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - IsBroadcastClient bool `protobuf:"varint,1,opt,name=SenderType,proto3" json:"SenderType,omitempty"` - BroadcastID uint64 `protobuf:"varint,2,opt,name=BroadcastID,proto3" json:"BroadcastID,omitempty"` - // uint32 SenderID = 3; - SenderAddr string `protobuf:"bytes,3,opt,name=SenderAddr,proto3" json:"SenderAddr,omitempty"` - OriginAddr string `protobuf:"bytes,4,opt,name=OriginAddr,proto3" json:"OriginAddr,omitempty"` - OriginMethod string `protobuf:"bytes,5,opt,name=OriginMethod,proto3" json:"OriginMethod,omitempty"` + IsBroadcastClient bool `protobuf:"varint,1,opt,name=IsBroadcastClient,proto3" json:"IsBroadcastClient,omitempty"` + BroadcastID uint64 `protobuf:"varint,2,opt,name=BroadcastID,proto3" json:"BroadcastID,omitempty"` + SenderAddr string `protobuf:"bytes,3,opt,name=SenderAddr,proto3" json:"SenderAddr,omitempty"` // bytes -> ipv4: 32 bit/ 4 bytes, ipv6: 128 bit / 16 bytes -> + Port: 16 bit / 2 bytes + OriginAddr string `protobuf:"bytes,4,opt,name=OriginAddr,proto3" json:"OriginAddr,omitempty"` // bytes -> ipv4: 32 bit/ 4 bytes, ipv6: 128 bit / 16 bytes -> + Port: 16 bit / 2 bytes + OriginMethod string `protobuf:"bytes,5,opt,name=OriginMethod,proto3" json:"OriginMethod,omitempty"` // uint32: 4 bytes } func (x *BroadcastMsg) Reset() { @@ -155,7 +138,7 @@ func (*BroadcastMsg) Descriptor() ([]byte, []int) { return file_ordering_ordering_proto_rawDescGZIP(), []int{1} } -func (x *BroadcastMsg) GetSenderType() bool { +func (x *BroadcastMsg) GetIsBroadcastClient() bool { if x != nil { return x.IsBroadcastClient } @@ -196,7 +179,7 @@ var file_ordering_ordering_proto_rawDesc = []byte{ 0x0a, 0x17, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2f, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x1a, 0x17, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x72, 0x70, 0x63, 0x2f, - 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xe0, 0x01, 0x0a, + 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa8, 0x01, 0x0a, 0x08, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x1c, 0x0a, 0x09, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x49, 0x44, 0x12, 0x16, 0x0a, 0x06, 0x4d, 0x65, 0x74, 0x68, 0x6f, @@ -207,29 +190,26 @@ var file_ordering_ordering_proto_rawDesc = []byte{ 0x72, 0x6f, 0x61, 0x64, 0x63, 0x61, 0x73, 0x74, 0x4d, 0x73, 0x67, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x42, 0x72, 0x6f, 0x61, 0x64, 0x63, 0x61, 0x73, 0x74, 0x4d, 0x73, 0x67, 0x52, 0x0c, 0x42, 0x72, 0x6f, 0x61, 0x64, - 0x63, 0x61, 0x73, 0x74, 0x4d, 0x73, 0x67, 0x12, 0x1c, 0x0a, 0x09, 0x53, 0x69, 0x67, 0x6e, 0x61, - 0x74, 0x75, 0x72, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x53, 0x69, 0x67, 0x6e, - 0x61, 0x74, 0x75, 0x72, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x48, 0x61, 0x73, 0x68, 0x4d, 0x73, 0x67, - 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x48, 0x61, 0x73, 0x68, 0x4d, 0x73, 0x67, 0x22, - 0xb4, 0x01, 0x0a, 0x0c, 0x42, 0x72, 0x6f, 0x61, 0x64, 0x63, 0x61, 0x73, 0x74, 0x4d, 0x73, 0x67, - 0x12, 0x1e, 0x0a, 0x0a, 0x53, 0x65, 0x6e, 0x64, 0x65, 0x72, 0x54, 0x79, 0x70, 0x65, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x53, 0x65, 0x6e, 0x64, 0x65, 0x72, 0x54, 0x79, 0x70, 0x65, - 0x12, 0x20, 0x0a, 0x0b, 0x42, 0x72, 0x6f, 0x61, 0x64, 0x63, 0x61, 0x73, 0x74, 0x49, 0x44, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, 0x42, 0x72, 0x6f, 0x61, 0x64, 0x63, 0x61, 0x73, 0x74, - 0x49, 0x44, 0x12, 0x1e, 0x0a, 0x0a, 0x53, 0x65, 0x6e, 0x64, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x53, 0x65, 0x6e, 0x64, 0x65, 0x72, 0x41, 0x64, - 0x64, 0x72, 0x12, 0x1e, 0x0a, 0x0a, 0x4f, 0x72, 0x69, 0x67, 0x69, 0x6e, 0x41, 0x64, 0x64, 0x72, - 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x4f, 0x72, 0x69, 0x67, 0x69, 0x6e, 0x41, 0x64, - 0x64, 0x72, 0x12, 0x22, 0x0a, 0x0c, 0x4f, 0x72, 0x69, 0x67, 0x69, 0x6e, 0x4d, 0x65, 0x74, 0x68, - 0x6f, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x4f, 0x72, 0x69, 0x67, 0x69, 0x6e, - 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x32, 0x42, 0x0a, 0x06, 0x47, 0x6f, 0x72, 0x75, 0x6d, 0x73, - 0x12, 0x38, 0x0a, 0x0a, 0x4e, 0x6f, 0x64, 0x65, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x12, - 0x2e, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, - 0x74, 0x61, 0x1a, 0x12, 0x2e, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x4d, 0x65, - 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x28, 0x01, 0x30, 0x01, 0x42, 0x22, 0x5a, 0x20, 0x67, 0x69, - 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x65, 0x6c, 0x61, 0x62, 0x2f, 0x67, - 0x6f, 0x72, 0x75, 0x6d, 0x73, 0x2f, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x63, 0x61, 0x73, 0x74, 0x4d, 0x73, 0x67, 0x22, 0xc2, 0x01, 0x0a, 0x0c, 0x42, 0x72, 0x6f, 0x61, + 0x64, 0x63, 0x61, 0x73, 0x74, 0x4d, 0x73, 0x67, 0x12, 0x2c, 0x0a, 0x11, 0x49, 0x73, 0x42, 0x72, + 0x6f, 0x61, 0x64, 0x63, 0x61, 0x73, 0x74, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x08, 0x52, 0x11, 0x49, 0x73, 0x42, 0x72, 0x6f, 0x61, 0x64, 0x63, 0x61, 0x73, 0x74, + 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x12, 0x20, 0x0a, 0x0b, 0x42, 0x72, 0x6f, 0x61, 0x64, 0x63, + 0x61, 0x73, 0x74, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, 0x42, 0x72, 0x6f, + 0x61, 0x64, 0x63, 0x61, 0x73, 0x74, 0x49, 0x44, 0x12, 0x1e, 0x0a, 0x0a, 0x53, 0x65, 0x6e, 0x64, + 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x53, 0x65, + 0x6e, 0x64, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x12, 0x1e, 0x0a, 0x0a, 0x4f, 0x72, 0x69, 0x67, + 0x69, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x4f, 0x72, + 0x69, 0x67, 0x69, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x12, 0x22, 0x0a, 0x0c, 0x4f, 0x72, 0x69, 0x67, + 0x69, 0x6e, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, + 0x4f, 0x72, 0x69, 0x67, 0x69, 0x6e, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x32, 0x42, 0x0a, 0x06, + 0x47, 0x6f, 0x72, 0x75, 0x6d, 0x73, 0x12, 0x38, 0x0a, 0x0a, 0x4e, 0x6f, 0x64, 0x65, 0x53, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0x12, 0x12, 0x2e, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, + 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x1a, 0x12, 0x2e, 0x6f, 0x72, 0x64, 0x65, 0x72, + 0x69, 0x6e, 0x67, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x28, 0x01, 0x30, 0x01, + 0x42, 0x22, 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, + 0x65, 0x6c, 0x61, 0x62, 0x2f, 0x67, 0x6f, 0x72, 0x75, 0x6d, 0x73, 0x2f, 0x6f, 0x72, 0x64, 0x65, + 0x72, 0x69, 0x6e, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/ordering/ordering.proto b/ordering/ordering.proto index 8762730b..ced6da38 100644 --- a/ordering/ordering.proto +++ b/ordering/ordering.proto @@ -16,7 +16,7 @@ service Gorums { // and contains information necessary for Gorums to handle the messages. message Metadata { uint64 MessageID = 1; - string Method = 2; + string Method = 2; // uint32: 4 bytes google.rpc.Status Status = 3; BroadcastMsg BroadcastMsg = 4; } @@ -24,7 +24,7 @@ message Metadata { message BroadcastMsg { bool IsBroadcastClient = 1; uint64 BroadcastID = 2; - string SenderAddr = 3; - string OriginAddr = 4; - string OriginMethod = 5; + string SenderAddr = 3; // bytes -> ipv4: 32 bit/ 4 bytes, ipv6: 128 bit / 16 bytes -> + Port: 16 bit / 2 bytes + string OriginAddr = 4; // bytes -> ipv4: 32 bit/ 4 bytes, ipv6: 128 bit / 16 bytes -> + Port: 16 bit / 2 bytes + string OriginMethod = 5; // uint32: 4 bytes } diff --git a/tests/broadcast/broadcast_gorums.pb.go b/tests/broadcast/broadcast_gorums.pb.go index 2beca6af..7752badd 100644 --- a/tests/broadcast/broadcast_gorums.pb.go +++ b/tests/broadcast/broadcast_gorums.pb.go @@ -261,6 +261,10 @@ func (b *Broadcast) SendToClient(resp protoreflect.ProtoMessage, err error) { b.orchestrator.SendToClientHandler(b.metadata.BroadcastID, resp, err) } +func (b *Broadcast) Cancel() { + b.orchestrator.CancelHandler(b.metadata.BroadcastID, b.srvAddrs) +} + func (srv *Server) SendToClient(resp protoreflect.ProtoMessage, err error, broadcastID uint64) { srv.SendToClientHandler(resp, err, broadcastID) } @@ -447,7 +451,7 @@ type QuorumSpec interface { QuorumCallQF(in *Request, replies map[uint32]*Response) (*Response, bool) // QuorumCallWithBroadcastQF is the quorum function for the QuorumCallWithBroadcast - // quorum call method. The in parameter is the request object + // broadcast call method. The in parameter is the request object // supplied to the QuorumCallWithBroadcast method at call time, and may or may not // be used by the quorum function. If the in parameter is not needed // you should implement your quorum function with '_ *Request'. diff --git a/tests/broadcast/broadcast_test.go b/tests/broadcast/broadcast_test.go index 1fb7b545..484460e6 100644 --- a/tests/broadcast/broadcast_test.go +++ b/tests/broadcast/broadcast_test.go @@ -842,15 +842,15 @@ func BenchmarkBroadcastCallManyClients(b *testing.B) { } } - stop, err := StartTrace("traceprofileBC") - if err != nil { - b.Error(err) - } - defer stop() - cpuProfile, _ := os.Create("cpuprofileBC") - memProfile, _ := os.Create("memprofileBC") - runtime.GC() - pprof.StartCPUProfile(cpuProfile) + //stop, err := StartTrace("traceprofileBC") + //if err != nil { + //b.Error(err) + //} + //defer stop() + //cpuProfile, _ := os.Create("cpuprofileBC") + //memProfile, _ := os.Create("memprofileBC") + //runtime.GC() + //pprof.StartCPUProfile(cpuProfile) b.Run(fmt.Sprintf("BC_OneClientOneReq_%d", 0), func(b *testing.B) { for i := 0; i < b.N; i++ { @@ -929,10 +929,10 @@ func BenchmarkBroadcastCallManyClients(b *testing.B) { } } }) - pprof.StopCPUProfile() - pprof.WriteHeapProfile(memProfile) - cpuProfile.Close() - memProfile.Close() + //pprof.StopCPUProfile() + //pprof.WriteHeapProfile(memProfile) + //cpuProfile.Close() + //memProfile.Close() } func BenchmarkBroadcastCallTenClientsCPU(b *testing.B) { @@ -1141,11 +1141,11 @@ func TestBroadcastCallTenClientsOnlyAsync(t *testing.T) { wg1.Wait() time.Sleep(1 * time.Second) - stop, err := StartTrace("traceprofileBC") - if err != nil { - t.Error(err) - } - defer stop() + //stop, err := StartTrace("traceprofileBC") + //if err != nil { + //t.Error(err) + //} + //defer stop() var wg sync.WaitGroup for r := 0; r < numReqs; r++ { for i, client := range clients { diff --git a/tests/broadcast/server.go b/tests/broadcast/server.go index fe171a82..60531231 100644 --- a/tests/broadcast/server.go +++ b/tests/broadcast/server.go @@ -45,6 +45,7 @@ func newtestServer(addr string, srvAddresses []string, _ int) *testServer { srv.peers = srvAddresses srv.addr = addr srv.mgr = NewManager( + gorums.WithSendBufferSize(uint(2*len(srvAddresses))), gorums.WithPublicKey("server"), gorums.WithGrpcDialOptions( grpc.WithTransportCredentials(insecure.NewCredentials()),