From b4f8849cae83199d05e14aa15a2fc050fd5aed1b Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Fri, 12 Jul 2024 11:00:18 +0200 Subject: [PATCH] grpc: fixed BoardListWatch streaming call (#2664) * grpc: fixed BoardListWatch streaming call * Added integration test * Fixed also existing calls to BoardListWatch --- commands/service_board_list.go | 1 + internal/cli/arguments/port.go | 13 +++++-------- internal/cli/board/list.go | 10 ++++++---- internal/integrationtest/daemon/daemon_test.go | 18 ++++++++++++++++++ 4 files changed, 30 insertions(+), 12 deletions(-) diff --git a/commands/service_board_list.go b/commands/service_board_list.go index a5376cb9f1f..44e89fdc8dd 100644 --- a/commands/service_board_list.go +++ b/commands/service_board_list.go @@ -318,5 +318,6 @@ func (s *arduinoCoreServerImpl) BoardListWatch(req *rpc.BoardListWatchRequest, s } }() + <-stream.Context().Done() return nil } diff --git a/internal/cli/arguments/port.go b/internal/cli/arguments/port.go index 885423243f4..dfd120f162f 100644 --- a/internal/cli/arguments/port.go +++ b/internal/cli/arguments/port.go @@ -89,17 +89,14 @@ func (p *Port) GetPort(ctx context.Context, instance *rpc.Instance, srv rpc.Ardu } logrus.WithField("port", address).Tracef("Upload port") - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithTimeout(ctx, p.timeout.Get()) defer cancel() stream, watcher := commands.BoardListWatchProxyToChan(ctx) - err := srv.BoardListWatch(&rpc.BoardListWatchRequest{Instance: instance}, stream) + go func() { + _ = srv.BoardListWatch(&rpc.BoardListWatchRequest{Instance: instance}, stream) + }() - if err != nil { - return nil, err - } - - deadline := time.After(p.timeout.Get()) for { select { case portEvent := <-watcher: @@ -111,7 +108,7 @@ func (p *Port) GetPort(ctx context.Context, instance *rpc.Instance, srv rpc.Ardu return port, nil } - case <-deadline: + case <-ctx.Done(): // No matching port found if protocol == "" { return &rpc.Port{ diff --git a/internal/cli/board/list.go b/internal/cli/board/list.go index ffc49431d21..7d0c0c69082 100644 --- a/internal/cli/board/list.go +++ b/internal/cli/board/list.go @@ -91,10 +91,12 @@ func runListCommand(ctx context.Context, srv rpc.ArduinoCoreServiceServer, watch func watchList(ctx context.Context, inst *rpc.Instance, srv rpc.ArduinoCoreServiceServer) { stream, eventsChan := commands.BoardListWatchProxyToChan(ctx) - err := srv.BoardListWatch(&rpc.BoardListWatchRequest{Instance: inst}, stream) - if err != nil { - feedback.Fatal(i18n.Tr("Error detecting boards: %v", err), feedback.ErrNetwork) - } + go func() { + err := srv.BoardListWatch(&rpc.BoardListWatchRequest{Instance: inst}, stream) + if err != nil { + feedback.Fatal(i18n.Tr("Error detecting boards: %v", err), feedback.ErrNetwork) + } + }() // This is done to avoid printing the header each time a new event is received if feedback.GetFormat() == feedback.Text { diff --git a/internal/integrationtest/daemon/daemon_test.go b/internal/integrationtest/daemon/daemon_test.go index 366fec20497..742e7774316 100644 --- a/internal/integrationtest/daemon/daemon_test.go +++ b/internal/integrationtest/daemon/daemon_test.go @@ -91,6 +91,24 @@ func TestArduinoCliDaemon(t *testing.T) { testWatcher() testWatcher() + + { + // Test that the watcher stays open until the grpc call is canceled + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + defer cancel() + + start := time.Now() + watcher, err := grpcInst.BoardListWatch(ctx) + require.NoError(t, err) + for { + _, err := watcher.Recv() + if err != nil { + break + } + } + require.Greater(t, time.Since(start), 2*time.Second) + } } func TestDaemonAutoUpdateIndexOnFirstInit(t *testing.T) {