Skip to content

Commit

Permalink
fix(broadcast): more efficient code and terminating req at client
Browse files Browse the repository at this point in the history
  • Loading branch information
aleksander-vedvik committed May 9, 2024
1 parent c88e88a commit 8b7000f
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 23 deletions.
22 changes: 7 additions & 15 deletions broadcast/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,20 @@ type BroadcastRequest struct {

// func (req *BroadcastRequest) handle(router *BroadcastRouter, broadcastID uint64, msg Content, metrics *Metric) {
func (req *BroadcastRequest) handle(router Router, broadcastID uint64, msg Content) {
//start := time.Now()
sent := false
methods := make([]string, 0, 3)
var respErr error
var respMsg protoreflect.ProtoMessage
//if metrics != nil {
//metrics.AddGoroutine(broadcastID, "req")
//}
//go router.CreateConnection(msg.OriginAddr)
// connect to client immediately to potentially save some time
go router.Connect(msg.OriginAddr)
defer func() {
req.ended = time.Now()
//if metrics != nil {
//metrics.AddRoundTripLatency(start)
//metrics.RemoveGoroutine(broadcastID, "req")
//}
req.cancelFunc()
req.cancellationCtxCancel()
}()
for {
select {
case <-req.ctx.Done():
//if metrics != nil {
//metrics.AddFinishedFailed()
//}
return
case bMsg := <-req.broadcastChan:
if broadcastID != bMsg.BroadcastID {
Expand All @@ -55,15 +45,17 @@ func (req *BroadcastRequest) handle(router Router, broadcastID uint64, msg Conte
if alreadyBroadcasted(methods, bMsg.Method) {
continue
}
err := router.Send(broadcastID, msg.OriginAddr, msg.OriginMethod, bMsg.Msg)
go router.Send(broadcastID, msg.OriginAddr, msg.OriginMethod, bMsg.Msg)
/*err := router.Send(broadcastID, msg.OriginAddr, msg.OriginMethod, bMsg.Msg)
if err != nil {
continue
}
}*/
methods = append(methods, bMsg.Method)
} else {
// BroadcastCall if origin addr is non-empty.
if msg.isBroadcastCall() {
_ = router.Send(broadcastID, msg.OriginAddr, msg.OriginMethod, bMsg.Reply)
go router.Send(broadcastID, msg.OriginAddr, msg.OriginMethod, bMsg.Reply)
//_ = router.Send(broadcastID, msg.OriginAddr, msg.OriginMethod, bMsg.Reply)
//if err != nil && router.logger != nil {
//router.logger.Error("broadcast: could not send response to client", "err", err, "broadcastID", broadcastID)
//}
Expand Down
2 changes: 2 additions & 0 deletions broadcast/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ func (r *mockRouter) Send(broadcastID uint64, addr, method string, req any) erro
return nil
}

func (r *mockRouter) Connect(addr string) {}

func TestHandleBroadcastOption(t *testing.T) {
snowflake := NewSnowflake("127.0.0.1:8080")
broadcastID := snowflake.NewBroadcastID()
Expand Down
5 changes: 5 additions & 0 deletions broadcast/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Client struct {

type Router interface {
Send(broadcastID uint64, addr, method string, req any) error
Connect(addr string)
}

type BroadcastRouter struct {
Expand Down Expand Up @@ -74,6 +75,10 @@ func (r *BroadcastRouter) Send(broadcastID uint64, addr, method string, req any)
return errors.New("wrong req type")
}

func (r *BroadcastRouter) Connect(addr string) {
r.getClient(addr)
}

func (r *BroadcastRouter) routeBroadcast(broadcastID uint64, addr, method string, msg *broadcastMsg) error {
if handler, ok := r.serverHandlers[msg.method]; ok {
// it runs an interceptor prior to broadcastCall, hence a different signature.
Expand Down
3 changes: 0 additions & 3 deletions broadcast/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,6 @@ func (s *shard) run(reqTTL time.Duration, sendBuffer int) {
}
case msg := <-s.broadcastChan:
s.metrics.numBroadcastMsgs++
//if metrics != nil {
//metrics.AddShardDistribution(s.id)
//}
if req, ok := s.reqs[msg.BroadcastID]; ok {
if msg.Cancellation != nil {
if msg.Cancellation.end {
Expand Down
2 changes: 2 additions & 0 deletions broadcast/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ func (r *slowRouter) Send(broadcastID uint64, addr, method string, req any) erro
return nil
}

func (r *slowRouter) Connect(addr string) {}

func TestShard(t *testing.T) {
snowflake := NewSnowflake("127.0.0.1:8080")
broadcastID := snowflake.NewBroadcastID()
Expand Down
7 changes: 2 additions & 5 deletions clientserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,7 @@ func createReq(ctx, clientCtx context.Context, cancel context.CancelFunc, req pr
// the receiving end. this can happen if the client
// chooses not to timeout the request and the server
// goes down.
select {
case doneChan <- nil:
case <-clientCtx.Done():
return
}
close(doneChan)
return
case resp := <-respChan:
resps = append(resps, resp)
Expand All @@ -124,6 +120,7 @@ func createReq(ctx, clientCtx context.Context, cancel context.CancelFunc, req pr
case <-ctx.Done():
case <-clientCtx.Done():
}
close(doneChan)
return
}
}
Expand Down

0 comments on commit 8b7000f

Please sign in to comment.