Skip to content

Commit

Permalink
Inlining methods in ArduinoCoreServiceImpl (part 8: Monitor)
Browse files Browse the repository at this point in the history
This change is quite challenging because it implements a bidirectional
streaming service. The gRPC implementation is slightly simpler, BTW the
command-line requires a bit of streams fiddling to get the same
behaviour as before because now:

 * the Monitor call do not return anymore a clean io.ReadWriteCloser.
 * the call to srv.Monitor is blocking until the port is closed or the
   context is canceled.
  • Loading branch information
cmaglie committed Mar 15, 2024
1 parent 5712969 commit c048f80
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 168 deletions.
101 changes: 0 additions & 101 deletions commands/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,8 @@ package commands

import (
"context"
"errors"
"io"
"sync/atomic"

"github.com/arduino/arduino-cli/commands/cmderrors"
rpc "github.com/arduino/arduino-cli/rpc/cc/arduino/cli/commands/v1"
"github.com/sirupsen/logrus"
)

// NewArduinoCoreServer returns an implementation of the ArduinoCoreService gRPC service
Expand Down Expand Up @@ -343,99 +338,3 @@ func (s *arduinoCoreServerImpl) GitLibraryInstall(req *rpc.GitLibraryInstallRequ
func (s *arduinoCoreServerImpl) EnumerateMonitorPortSettings(ctx context.Context, req *rpc.EnumerateMonitorPortSettingsRequest) (*rpc.EnumerateMonitorPortSettingsResponse, error) {
return EnumerateMonitorPortSettings(ctx, req)
}

// Monitor FIXMEDOC
func (s *arduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorServer) error {
syncSend := NewSynchronizedSend(stream.Send)

// The configuration must be sent on the first message
req, err := stream.Recv()
if err != nil {
return err
}

openReq := req.GetOpenRequest()
if openReq == nil {
return &cmderrors.InvalidInstanceError{}
}
portProxy, _, err := Monitor(stream.Context(), openReq)
if err != nil {
return err
}

// Send a message with Success set to true to notify the caller of the port being now active
_ = syncSend.Send(&rpc.MonitorResponse{Success: true})

cancelCtx, cancel := context.WithCancel(stream.Context())
gracefulCloseInitiated := &atomic.Bool{}
gracefuleCloseCtx, gracefulCloseCancel := context.WithCancel(context.Background())

// gRPC stream receiver (gRPC data -> monitor, config, close)
go func() {
defer cancel()
for {
msg, err := stream.Recv()
if errors.Is(err, io.EOF) {
return
}
if err != nil {
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
return
}
if conf := msg.GetUpdatedConfiguration(); conf != nil {
for _, c := range conf.GetSettings() {
if err := portProxy.Config(c.GetSettingId(), c.GetValue()); err != nil {
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
}
}
}
if closeMsg := msg.GetClose(); closeMsg {
gracefulCloseInitiated.Store(true)
if err := portProxy.Close(); err != nil {
logrus.WithError(err).Debug("Error closing monitor port")
}
gracefulCloseCancel()
}
tx := msg.GetTxData()
for len(tx) > 0 {
n, err := portProxy.Write(tx)
if errors.Is(err, io.EOF) {
return
}
if err != nil {
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
return
}
tx = tx[n:]
}
}
}()

// gRPC stream sender (monitor -> gRPC)
go func() {
defer cancel() // unlock the receiver
buff := make([]byte, 4096)
for {
n, err := portProxy.Read(buff)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
break
}
if err := syncSend.Send(&rpc.MonitorResponse{RxData: buff[:n]}); err != nil {
break
}
}
}()

<-cancelCtx.Done()
if gracefulCloseInitiated.Load() {
// Port closing has been initiated in the receiver
<-gracefuleCloseCtx.Done()
} else {
portProxy.Close()
}
return nil
}
230 changes: 178 additions & 52 deletions commands/service_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ package commands

import (
"context"
"errors"
"fmt"
"io"
"sync/atomic"

"github.com/arduino/arduino-cli/commands/cmderrors"
"github.com/arduino/arduino-cli/commands/internal/instances"
Expand All @@ -27,87 +29,211 @@ import (
pluggableMonitor "github.com/arduino/arduino-cli/internal/arduino/monitor"
rpc "github.com/arduino/arduino-cli/rpc/cc/arduino/cli/commands/v1"
"github.com/arduino/go-properties-orderedmap"
"github.com/djherbis/buffer"
"github.com/djherbis/nio/v3"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/metadata"
)

// portProxy is an io.ReadWriteCloser that maps into the monitor port of the board
type portProxy struct {
rw io.ReadWriter
changeSettingsCB func(setting, value string) error
closeCB func() error
type monitorPipeServer struct {
ctx context.Context
req atomic.Pointer[rpc.MonitorPortOpenRequest]
in *nio.PipeReader
out *nio.PipeWriter
}

func (p *portProxy) Read(buff []byte) (int, error) {
return p.rw.Read(buff)
func (s *monitorPipeServer) Send(resp *rpc.MonitorResponse) error {
if len(resp.GetRxData()) > 0 {
if _, err := s.out.Write(resp.GetRxData()); err != nil {
return err
}
}
return nil
}

func (s *monitorPipeServer) Recv() (r *rpc.MonitorRequest, e error) {
if conf := s.req.Swap(nil); conf != nil {
return &rpc.MonitorRequest{Message: &rpc.MonitorRequest_OpenRequest{OpenRequest: conf}}, nil
}
buff := make([]byte, 4096)
n, err := s.in.Read(buff)
if err != nil {
return nil, err
}
return &rpc.MonitorRequest{Message: &rpc.MonitorRequest_TxData{TxData: buff[:n]}}, nil
}

func (p *portProxy) Write(buff []byte) (int, error) {
return p.rw.Write(buff)
func (s *monitorPipeServer) Context() context.Context {
return s.ctx
}

// Config sets the port configuration setting to the specified value
func (p *portProxy) Config(setting, value string) error {
return p.changeSettingsCB(setting, value)
func (s *monitorPipeServer) RecvMsg(m any) error { return nil }
func (s *monitorPipeServer) SendHeader(metadata.MD) error { return nil }
func (s *monitorPipeServer) SendMsg(m any) error { return nil }
func (s *monitorPipeServer) SetHeader(metadata.MD) error { return nil }
func (s *monitorPipeServer) SetTrailer(metadata.MD) {}

type monitorPipeClient struct {
in *nio.PipeReader
out *nio.PipeWriter
close func()
}

// Close the port
func (p *portProxy) Close() error {
return p.closeCB()
func (s *monitorPipeClient) Read(buff []byte) (n int, err error) {
return s.in.Read(buff)
}

// Monitor opens a communication port. It returns a PortProxy to communicate with the port and a PortDescriptor
// that describes the available configuration settings.
func Monitor(ctx context.Context, req *rpc.MonitorPortOpenRequest) (*portProxy, *pluggableMonitor.PortDescriptor, error) {
pme, release, err := instances.GetPackageManagerExplorer(req.GetInstance())
if err != nil {
return nil, nil, err
}
defer release()
func (s *monitorPipeClient) Write(buff []byte) (n int, err error) {
return s.out.Write(buff)
}

m, boardSettings, err := findMonitorAndSettingsForProtocolAndBoard(pme, req.GetPort().GetProtocol(), req.GetFqbn())
func (s *monitorPipeClient) Close() error {
s.in.Close()
s.out.Close()
s.close()
return nil
}

// MonitorServerToReadWriteCloser creates a monitor server that proxies the data to a ReadWriteCloser.
// The server is returned along with the ReadWriteCloser that can be used to send and receive data
// to the server. The MonitorPortOpenRequest is used to configure the monitor.
func MonitorServerToReadWriteCloser(ctx context.Context, req *rpc.MonitorPortOpenRequest) (rpc.ArduinoCoreService_MonitorServer, io.ReadWriteCloser) {
server := &monitorPipeServer{}
client := &monitorPipeClient{}
server.req.Store(req)
server.ctx, client.close = context.WithCancel(ctx)
client.in, server.out = nio.Pipe(buffer.New(32 * 1024))
server.in, client.out = nio.Pipe(buffer.New(32 * 1024))
return server, client
}

// Monitor opens a port monitor and streams data back and forth until the request is kept alive.
func (s *arduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorServer) error {
// The configuration must be sent on the first message
req, err := stream.Recv()
if err != nil {
return nil, nil, err
return err
}

if err := m.Run(); err != nil {
return nil, nil, &cmderrors.FailedMonitorError{Cause: err}
openReq := req.GetOpenRequest()
if openReq == nil {
return &cmderrors.InvalidInstanceError{}
}

descriptor, err := m.Describe()
pme, release, err := instances.GetPackageManagerExplorer(openReq.GetInstance())
if err != nil {
m.Quit()
return nil, nil, &cmderrors.FailedMonitorError{Cause: err}
return err
}

// Apply user-requested settings
if portConfig := req.GetPortConfiguration(); portConfig != nil {
defer release()
monitor, boardSettings, err := findMonitorAndSettingsForProtocolAndBoard(pme, openReq.GetPort().GetProtocol(), openReq.GetFqbn())
if err != nil {
return err
}
if err := monitor.Run(); err != nil {
return &cmderrors.FailedMonitorError{Cause: err}
}
if _, err := monitor.Describe(); err != nil {
monitor.Quit()
return &cmderrors.FailedMonitorError{Cause: err}
}
if portConfig := openReq.GetPortConfiguration(); portConfig != nil {
for _, setting := range portConfig.GetSettings() {
boardSettings.Remove(setting.GetSettingId()) // Remove board settings overridden by the user
if err := m.Configure(setting.GetSettingId(), setting.GetValue()); err != nil {
boardSettings.Remove(setting.GetSettingId())
if err := monitor.Configure(setting.GetSettingId(), setting.GetValue()); err != nil {
logrus.Errorf("Could not set configuration %s=%s: %s", setting.GetSettingId(), setting.GetValue(), err)
}
}
}
// Apply specific board settings
for setting, value := range boardSettings.AsMap() {
m.Configure(setting, value)
monitor.Configure(setting, value)
}

monIO, err := m.Open(req.GetPort().GetAddress(), req.GetPort().GetProtocol())
monitorIO, err := monitor.Open(openReq.GetPort().GetAddress(), openReq.GetPort().GetProtocol())
if err != nil {
m.Quit()
return nil, nil, &cmderrors.FailedMonitorError{Cause: err}
}

logrus.Infof("Port %s successfully opened", req.GetPort().GetAddress())
return &portProxy{
rw: monIO,
changeSettingsCB: m.Configure,
closeCB: func() error {
m.Close()
return m.Quit()
},
}, descriptor, nil
monitor.Quit()
return &cmderrors.FailedMonitorError{Cause: err}
}
logrus.Infof("Port %s successfully opened", openReq.GetPort().GetAddress())
monitorClose := func() error {
monitor.Close()
return monitor.Quit()
}

// Send a message with Success set to true to notify the caller of the port being now active
syncSend := NewSynchronizedSend(stream.Send)
_ = syncSend.Send(&rpc.MonitorResponse{Success: true})

ctx, cancel := context.WithCancel(stream.Context())
gracefulCloseInitiated := &atomic.Bool{}
gracefuleCloseCtx, gracefulCloseCancel := context.WithCancel(context.Background())

// gRPC stream receiver (gRPC data -> monitor, config, close)
go func() {
defer cancel()
for {
msg, err := stream.Recv()
if errors.Is(err, io.EOF) {
return
}
if err != nil {
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
return
}
if conf := msg.GetUpdatedConfiguration(); conf != nil {
for _, c := range conf.GetSettings() {
if err := monitor.Configure(c.GetSettingId(), c.GetValue()); err != nil {
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
}
}
}
if closeMsg := msg.GetClose(); closeMsg {
gracefulCloseInitiated.Store(true)
if err := monitorClose(); err != nil {
logrus.WithError(err).Debug("Error closing monitor port")
}
gracefulCloseCancel()
}
tx := msg.GetTxData()
for len(tx) > 0 {
n, err := monitorIO.Write(tx)
if errors.Is(err, io.EOF) {
return
}
if err != nil {
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
return
}
tx = tx[n:]
}
}
}()

// gRPC stream sender (monitor -> gRPC)
go func() {
defer cancel() // unlock the receiver
buff := make([]byte, 4096)
for {
n, err := monitorIO.Read(buff)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
break
}
if err := syncSend.Send(&rpc.MonitorResponse{RxData: buff[:n]}); err != nil {
break
}
}
}()

<-ctx.Done()
if gracefulCloseInitiated.Load() {
// Port closing has been initiated in the receiver
<-gracefuleCloseCtx.Done()
} else {
monitorClose()
}
return nil
}

func findMonitorAndSettingsForProtocolAndBoard(pme *packagemanager.Explorer, protocol, fqbn string) (*pluggableMonitor.PluggableMonitor, *properties.Map, error) {
Expand Down
Loading

0 comments on commit c048f80

Please sign in to comment.