diff --git a/commands/daemon/daemon.go b/commands/daemon/daemon.go index 66efc357d22..f9f6215973c 100644 --- a/commands/daemon/daemon.go +++ b/commands/daemon/daemon.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "io" + "sync/atomic" "github.com/arduino/arduino-cli/commands" "github.com/arduino/arduino-cli/commands/board" @@ -490,6 +491,10 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer _ = syncSend.Send(&rpc.MonitorResponse{Success: true}) cancelCtx, cancel := context.WithCancel(stream.Context()) + gracefulCloseInitiated := &atomic.Bool{} + gracefuleCloseCtx, gracefulCloseCancel := context.WithCancel(context.Background()) + + // gRPC stream receiver (gRPC data -> monitor, config, close) go func() { defer cancel() for { @@ -509,9 +514,11 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer } } if closeMsg := msg.GetClose(); closeMsg { + gracefulCloseInitiated.Store(true) if err := portProxy.Close(); err != nil { logrus.WithError(err).Debug("Error closing monitor port") } + gracefulCloseCancel() } tx := msg.GetTxData() for len(tx) > 0 { @@ -528,8 +535,9 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer } }() + // gRPC stream sender (monitor -> gRPC) go func() { - defer cancel() + defer cancel() // unlock the receiver buff := make([]byte, 4096) for { n, err := portProxy.Read(buff) @@ -547,6 +555,11 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer }() <-cancelCtx.Done() - portProxy.Close() + if gracefulCloseInitiated.Load() { + // Port closing has been initiated in the receiver + <-gracefuleCloseCtx.Done() + } else { + portProxy.Close() + } return nil } diff --git a/internal/integrationtest/monitor/monitor_grpc_test.go b/internal/integrationtest/monitor/monitor_grpc_test.go index a7a681ce28a..c96dc36d304 100644 --- a/internal/integrationtest/monitor/monitor_grpc_test.go +++ b/internal/integrationtest/monitor/monitor_grpc_test.go @@ -82,7 +82,7 @@ func TestMonitorGRPCClose(t *testing.T) { } // Now close the monitor using MonitorRequest_Close - for tries := 0; tries < 5; tries++ { // Try the test 5 times to avoid flukes + { // Keep a timeout to allow the test to exit in any case ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) mon, err := grpcInst.Monitor(ctx, ports[0].Port) diff --git a/internal/mock_serial_monitor/main.go b/internal/mock_serial_monitor/main.go index c7779ca0e00..f13745e1458 100644 --- a/internal/mock_serial_monitor/main.go +++ b/internal/mock_serial_monitor/main.go @@ -197,7 +197,7 @@ func (d *SerialMonitor) Close() error { d.mockedSerialPort.Close() d.openedPort = false if d.muxFile != nil { - time.Sleep(500 * time.Millisecond) // Emulate a small delay closing the port to check gRPC synchronization + time.Sleep(2000 * time.Millisecond) // Emulate a small delay closing the port to check gRPC synchronization d.muxFile.Remove() d.muxFile = nil }