Skip to content

Commit

Permalink
grpc: fixed BoardListWatch streaming call (#2664)
Browse files Browse the repository at this point in the history
* grpc: fixed BoardListWatch streaming call

* Added integration test

* Fixed also existing calls to BoardListWatch
  • Loading branch information
cmaglie authored Jul 12, 2024
1 parent 625aaac commit b4f8849
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 12 deletions.
1 change: 1 addition & 0 deletions commands/service_board_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,5 +318,6 @@ func (s *arduinoCoreServerImpl) BoardListWatch(req *rpc.BoardListWatchRequest, s
}
}()

<-stream.Context().Done()
return nil
}
13 changes: 5 additions & 8 deletions internal/cli/arguments/port.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,14 @@ func (p *Port) GetPort(ctx context.Context, instance *rpc.Instance, srv rpc.Ardu
}
logrus.WithField("port", address).Tracef("Upload port")

ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithTimeout(ctx, p.timeout.Get())
defer cancel()

stream, watcher := commands.BoardListWatchProxyToChan(ctx)
err := srv.BoardListWatch(&rpc.BoardListWatchRequest{Instance: instance}, stream)
go func() {
_ = srv.BoardListWatch(&rpc.BoardListWatchRequest{Instance: instance}, stream)
}()

if err != nil {
return nil, err
}

deadline := time.After(p.timeout.Get())
for {
select {
case portEvent := <-watcher:
Expand All @@ -111,7 +108,7 @@ func (p *Port) GetPort(ctx context.Context, instance *rpc.Instance, srv rpc.Ardu
return port, nil
}

case <-deadline:
case <-ctx.Done():
// No matching port found
if protocol == "" {
return &rpc.Port{
Expand Down
10 changes: 6 additions & 4 deletions internal/cli/board/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,12 @@ func runListCommand(ctx context.Context, srv rpc.ArduinoCoreServiceServer, watch

func watchList(ctx context.Context, inst *rpc.Instance, srv rpc.ArduinoCoreServiceServer) {
stream, eventsChan := commands.BoardListWatchProxyToChan(ctx)
err := srv.BoardListWatch(&rpc.BoardListWatchRequest{Instance: inst}, stream)
if err != nil {
feedback.Fatal(i18n.Tr("Error detecting boards: %v", err), feedback.ErrNetwork)
}
go func() {
err := srv.BoardListWatch(&rpc.BoardListWatchRequest{Instance: inst}, stream)
if err != nil {
feedback.Fatal(i18n.Tr("Error detecting boards: %v", err), feedback.ErrNetwork)
}
}()

// This is done to avoid printing the header each time a new event is received
if feedback.GetFormat() == feedback.Text {
Expand Down
18 changes: 18 additions & 0 deletions internal/integrationtest/daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,24 @@ func TestArduinoCliDaemon(t *testing.T) {

testWatcher()
testWatcher()

{
// Test that the watcher stays open until the grpc call is canceled

ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()

start := time.Now()
watcher, err := grpcInst.BoardListWatch(ctx)
require.NoError(t, err)
for {
_, err := watcher.Recv()
if err != nil {
break
}
}
require.Greater(t, time.Since(start), 2*time.Second)
}
}

func TestDaemonAutoUpdateIndexOnFirstInit(t *testing.T) {
Expand Down

0 comments on commit b4f8849

Please sign in to comment.