Skip to content

Commit

Permalink
chore: Improve idle timeout logic for filetransfer downloads
Browse files Browse the repository at this point in the history
Signed-off-by: Alf-Rune Siqveland <[email protected]>
  • Loading branch information
alfrunes committed Nov 30, 2023
1 parent f991fa0 commit 176380f
Showing 1 changed file with 48 additions and 12 deletions.
60 changes: 48 additions & 12 deletions api/http/management_filetransfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package http

import (
"context"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -280,8 +281,41 @@ func (h ManagementController) downloadFileResponseError(c *gin.Context,
}
}

func chanTimeout(
src <-chan *natsio.Msg,
timeout time.Duration,
) <-chan *natsio.Msg {
timer := time.NewTimer(timeout)
dst := make(chan *natsio.Msg)
go func() {
for {
select {
case <-timer.C:
close(dst)
return
case msg, ok := <-src:
if !ok {
close(dst)
return
}
if !timer.Stop() {
// Timer must be stopped and drained before calling Reset.
select {
case <-timer.C:
default:
}
}
timer.Reset(timeout)
dst <- msg
}
}
}()
return dst
}

func (h ManagementController) downloadFileResponse(c *gin.Context, params *fileTransferParams,
request *model.DownloadFileRequest) {
ctx := c.Request.Context()
// send a JSON-encoded error message in case of failure
var responseError error
var responseHeaderSent bool
Expand All @@ -290,15 +324,18 @@ func (h ManagementController) downloadFileResponse(c *gin.Context, params *fileT
// subscribe to messages from the device
deviceTopic := model.GetDeviceSubject(params.TenantID, params.Device.ID)
sessionTopic := model.GetSessionSubject(params.TenantID, params.SessionID)
msgChan := make(chan *natsio.Msg, channelSize)
sub, err := h.nats.ChanSubscribe(sessionTopic, msgChan)
subChan := make(chan *natsio.Msg, channelSize)
defer close(subChan)
sub, err := h.nats.ChanSubscribe(sessionTopic, subChan)
if err != nil {
responseError = errors.Wrap(err, errFileTransferSubscribing.Error())
return
}
//nolint:errcheck
defer sub.Unsubscribe()

msgChan := chanTimeout(subChan, fileTransferTimeout)

if err = h.filetransferHandshake(msgChan, params.SessionID, deviceTopic); err != nil {
responseError = err
return
Expand All @@ -321,15 +358,16 @@ func (h ManagementController) downloadFileResponse(c *gin.Context, params *fileT
defer ticker.Stop()

// handle messages from the device
timeout := time.NewTimer(fileTransferTimeout)
latestOffset := int64(0)
numberOfChunks := 0
var fileInfo wsft.FileInfo
for {
select {
case wsMessage := <-msgChan:
// reset the timeout ticket
timeout.Reset(fileTransferTimeout)
case wsMessage, ok := <-msgChan:
if !ok {
responseError = errFileTransferTimeout
return
}
// process the message
err := h.downloadFileResponseProcessMessage(c, params, request,
wsMessage, deviceTopic, &latestOffset, &numberOfChunks,
Expand All @@ -348,11 +386,6 @@ func (h ManagementController) downloadFileResponse(c *gin.Context, params *fileT
if responseError != nil {
return
}

// no message after timeout expired, stop here
case <-timeout.C:
responseError = errFileTransferTimeout
return
}
}
}
Expand Down Expand Up @@ -555,7 +588,10 @@ func (h ManagementController) filetransferHandshake(
return errFileTransferPublishing
}
select {
case natsMsg := <-sessChan:
case natsMsg, ok := <-sessChan:
if !ok {
return errFileTransferTimeout
}
var msg ws.ProtoMsg
err := msgpack.Unmarshal(natsMsg.Data, &msg)
if err != nil {
Expand Down

0 comments on commit 176380f

Please sign in to comment.