From 797e5eee563660a8d827eb636e8fbcfbbbc4520d Mon Sep 17 00:00:00 2001 From: aleksander-vedvik Date: Wed, 5 Jun 2024 13:34:25 +0200 Subject: [PATCH] fix(broadcast): checks whether a client req has arrived immediately --- broadcast/processor.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/broadcast/processor.go b/broadcast/processor.go index eafa6e13..583e330b 100644 --- a/broadcast/processor.go +++ b/broadcast/processor.go @@ -57,6 +57,30 @@ func (p *BroadcastProcessor) handle(msg *Content) { p.initOrder() // connect to client immediately to potentially save some time go p.router.Connect(metadata.OriginAddr) + if msg.IsBroadcastClient { + // important to set this option to prevent duplicate client reqs. + // this can be the result if a server forwards the req but the + // leader has already received the client req. + metadata.HasReceivedClientReq = true + go func() { + // new.Ctx will correspond to the streamCtx between the client and this server. + // We can thus listen to it and signal a cancellation if the client goes offline + // or cancels the request. We also have to listen to the p.ctx to prevent leaking + // the goroutine. + select { + case <-p.ctx.Done(): + case <-msg.Ctx.Done(): + } + // when the processor returns it sets p.cancellationCtxCancel = nil. + // Hence, it is important to check if it is nil. Also, the processor + // calls this function when it returns. + if p.cancellationCtxCancel != nil { + p.cancellationCtxCancel() + } + }() + p.log("msg: received client req", nil, logging.Method(msg.CurrentMethod), logging.From(msg.SenderAddr)) + } + // all messages should always have a ReceiveChan if msg.ReceiveChan != nil { if !p.isInOrder(msg.CurrentMethod) { // save the message and execute it later