Skip to content

Commit

Permalink
Inlining methods in ArduinoCoreServiceImpl (part 4: BoardListWatch)
Browse files Browse the repository at this point in the history
The BoardListWatch RPC call has been converted into a method of the gRPC
server implementation.

This commit boasts an helper method to convert a gRPC streaming response
into a channel.
  • Loading branch information
cmaglie committed Mar 14, 2024
1 parent e35c740 commit 6093927
Show file tree
Hide file tree
Showing 14 changed files with 131 additions and 67 deletions.
78 changes: 78 additions & 0 deletions commands/grpc_streaming_helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// This file is part of arduino-cli.
//
// Copyright 2024 ARDUINO SA (http://www.arduino.cc/)
//
// This software is released under the GNU General Public License version 3,
// which covers the main part of arduino-cli.
// The terms of this license can be found at:
// https://www.gnu.org/licenses/gpl-3.0.en.html
//
// You can be released from the requirements of the above licenses by purchasing
// a commercial license. Buying such a license is mandatory if you want to
// modify or otherwise use the software for commercial activities involving the
// Arduino software without disclosing the source code of your own applications.
// To purchase a commercial license, send an email to [email protected].

package commands

import (
"context"
"errors"
"sync"

"google.golang.org/grpc/metadata"
)

type streamingResponseProxyToChan[T any] struct {
ctx context.Context
respChan chan<- *T
respLock sync.Mutex
}

func streamResponseToChan[T any](ctx context.Context) (*streamingResponseProxyToChan[T], <-chan *T) {
respChan := make(chan *T, 1)
w := &streamingResponseProxyToChan[T]{
ctx: ctx,
respChan: respChan,
}
go func() {
<-ctx.Done()
w.respLock.Lock()
close(w.respChan)
w.respChan = nil
w.respLock.Unlock()
}()
return w, respChan
}

func (w *streamingResponseProxyToChan[T]) Send(resp *T) error {
w.respLock.Lock()
if w.respChan != nil {
w.respChan <- resp
}
w.respLock.Unlock()
return nil
}

func (w *streamingResponseProxyToChan[T]) Context() context.Context {
return w.ctx
}

func (w *streamingResponseProxyToChan[T]) RecvMsg(m any) error {
return errors.New("RecvMsg not implemented")
}

func (w *streamingResponseProxyToChan[T]) SendHeader(metadata.MD) error {
return errors.New("SendHeader not implemented")
}

func (w *streamingResponseProxyToChan[T]) SendMsg(m any) error {
return errors.New("SendMsg not implemented")
}

func (w *streamingResponseProxyToChan[T]) SetHeader(metadata.MD) error {
return errors.New("SetHeader not implemented")
}

func (w *streamingResponseProxyToChan[T]) SetTrailer(tr metadata.MD) {
}
32 changes: 0 additions & 32 deletions commands/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package commands
import (
"context"
"errors"
"fmt"
"io"
"sync/atomic"

Expand All @@ -42,37 +41,6 @@ type arduinoCoreServerImpl struct {
versionString string
}

// BoardSearch exposes to the gRPC interface the board search command
func (s *arduinoCoreServerImpl) BoardSearch(ctx context.Context, req *rpc.BoardSearchRequest) (*rpc.BoardSearchResponse, error) {
return BoardSearch(ctx, req)
}

// BoardListWatch FIXMEDOC
func (s *arduinoCoreServerImpl) BoardListWatch(req *rpc.BoardListWatchRequest, stream rpc.ArduinoCoreService_BoardListWatchServer) error {
syncSend := NewSynchronizedSend(stream.Send)
if req.GetInstance() == nil {
err := fmt.Errorf(tr("no instance specified"))
syncSend.Send(&rpc.BoardListWatchResponse{
EventType: "error",
Error: err.Error(),
})
return err
}

eventsChan, err := BoardListWatch(stream.Context(), req)
if err != nil {
return err
}

for event := range eventsChan {
if err := syncSend.Send(event); err != nil {
logrus.Infof("sending board watch message: %v", err)
}
}

return nil
}

// Destroy FIXMEDOC
func (s *arduinoCoreServerImpl) Destroy(ctx context.Context, req *rpc.DestroyRequest) (*rpc.DestroyResponse, error) {
return Destroy(ctx, req)
Expand Down
34 changes: 24 additions & 10 deletions commands/service_board_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,29 +262,43 @@ func hasMatchingBoard(b *rpc.DetectedPort, fqbnFilter *cores.FQBN) bool {
return false
}

// BoardListWatch returns a channel that receives boards connection and disconnection events.
func BoardListWatch(ctx context.Context, req *rpc.BoardListWatchRequest) (<-chan *rpc.BoardListWatchResponse, error) {
// BoardListWatchProxyToChan return a stream, to be used in BoardListWatch method,
// that proxies all the responses to a channel.
func BoardListWatchProxyToChan(ctx context.Context) (rpc.ArduinoCoreService_BoardListWatchServer, <-chan *rpc.BoardListWatchResponse) {
return streamResponseToChan[rpc.BoardListWatchResponse](ctx)
}

// BoardListWatch FIXMEDOC
func (s *arduinoCoreServerImpl) BoardListWatch(req *rpc.BoardListWatchRequest, stream rpc.ArduinoCoreService_BoardListWatchServer) error {
syncSend := NewSynchronizedSend(stream.Send)
if req.GetInstance() == nil {
err := fmt.Errorf(tr("no instance specified"))
syncSend.Send(&rpc.BoardListWatchResponse{
EventType: "error",
Error: err.Error(),
})
return err
}

pme, release, err := instances.GetPackageManagerExplorer(req.GetInstance())
if err != nil {
return nil, err
return err
}
defer release()
dm := pme.DiscoveryManager()

watcher, err := dm.Watch()
if err != nil {
return nil, err
return err
}

go func() {
<-ctx.Done()
<-stream.Context().Done()
logrus.Trace("closed watch")
watcher.Close()
}()

outChan := make(chan *rpc.BoardListWatchResponse)
go func() {
defer close(outChan)
for event := range watcher.Feed() {
port := &rpc.DetectedPort{
Port: rpc.DiscoveryPortToRPC(event.Port),
Expand All @@ -298,13 +312,13 @@ func BoardListWatch(ctx context.Context, req *rpc.BoardListWatchRequest) (<-chan
}
port.MatchingBoards = boards
}
outChan <- &rpc.BoardListWatchResponse{
stream.Send(&rpc.BoardListWatchResponse{
EventType: event.Type,
Port: port,
Error: boardsError,
}
})
}
}()

return outChan, nil
return nil
}
2 changes: 1 addition & 1 deletion commands/service_board_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
// Boards are searched in all platforms, including those in the index that are not yet
// installed. Note that platforms that are not installed don't include boards' FQBNs.
// If no search argument is used all boards are returned.
func BoardSearch(ctx context.Context, req *rpc.BoardSearchRequest) (*rpc.BoardSearchResponse, error) {
func (s *arduinoCoreServerImpl) BoardSearch(ctx context.Context, req *rpc.BoardSearchRequest) (*rpc.BoardSearchResponse, error) {
pme, release, err := instances.GetPackageManagerExplorer(req.GetInstance())
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion internal/cli/arguments/fqbn.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func CalculateFQBNAndPort(portArgs *Port, fqbnArg *Fqbn, instance *rpc.Instance,
return fqbn, port
}

port, err := portArgs.GetPort(instance, defaultAddress, defaultProtocol)
port, err := portArgs.GetPort(instance, srv, defaultAddress, defaultProtocol)
if err != nil {
feedback.Fatal(tr("Error getting port metadata: %v", err), feedback.ErrGeneric)
}
Expand Down
12 changes: 7 additions & 5 deletions internal/cli/arguments/port.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ func (p *Port) AddToCommand(cmd *cobra.Command, srv rpc.ArduinoCoreServiceServer
// This method allows will bypass the discoveries if:
// - a nil instance is passed: in this case the plain port and protocol arguments are returned (even if empty)
// - a protocol is specified: in this case the discoveries are not needed to autodetect the protocol.
func (p *Port) GetPortAddressAndProtocol(instance *rpc.Instance, defaultAddress, defaultProtocol string) (string, string, error) {
func (p *Port) GetPortAddressAndProtocol(instance *rpc.Instance, srv rpc.ArduinoCoreServiceServer, defaultAddress, defaultProtocol string) (string, string, error) {
if p.protocol != "" || instance == nil {
return p.address, p.protocol, nil
}

port, err := p.GetPort(instance, defaultAddress, defaultProtocol)
port, err := p.GetPort(instance, srv, defaultAddress, defaultProtocol)
if err != nil {
return "", "", err
}
Expand All @@ -70,8 +70,7 @@ func (p *Port) GetPortAddressAndProtocol(instance *rpc.Instance, defaultAddress,

// GetPort returns the Port obtained by parsing command line arguments.
// The extra metadata for the ports is obtained using the pluggable discoveries.
func (p *Port) GetPort(instance *rpc.Instance, defaultAddress, defaultProtocol string) (*rpc.Port, error) {

func (p *Port) GetPort(instance *rpc.Instance, srv rpc.ArduinoCoreServiceServer, defaultAddress, defaultProtocol string) (*rpc.Port, error) {
address := p.address
protocol := p.protocol
if address == "" && (defaultAddress != "" || defaultProtocol != "") {
Expand All @@ -91,7 +90,10 @@ func (p *Port) GetPort(instance *rpc.Instance, defaultAddress, defaultProtocol s

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
watcher, err := commands.BoardListWatch(ctx, &rpc.BoardListWatchRequest{Instance: instance})

stream, watcher := commands.BoardListWatchProxyToChan(ctx)
err := srv.BoardListWatch(&rpc.BoardListWatchRequest{Instance: instance}, stream)

if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions internal/cli/board/attach.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func initAttachCommand(srv rpc.ArduinoCoreServiceServer) *cobra.Command {
if len(args) > 0 {
sketchPath = args[0]
}
runAttachCommand(sketchPath, &port, fqbn.String(), &programmer)
runAttachCommand(srv, sketchPath, &port, fqbn.String(), &programmer)
},
}
fqbn.AddToCommand(attachCommand, srv)
Expand All @@ -55,10 +55,10 @@ func initAttachCommand(srv rpc.ArduinoCoreServiceServer) *cobra.Command {
return attachCommand
}

func runAttachCommand(path string, port *arguments.Port, fqbn string, programmer *arguments.Programmer) {
func runAttachCommand(srv rpc.ArduinoCoreServiceServer, path string, port *arguments.Port, fqbn string, programmer *arguments.Programmer) {
sketchPath := arguments.InitSketchPath(path)

portAddress, portProtocol, _ := port.GetPortAddressAndProtocol(nil, "", "")
portAddress, portProtocol, _ := port.GetPortAddressAndProtocol(nil, srv, "", "")
newDefaults, err := commands.SetSketchDefaults(context.Background(), &rpc.SetSketchDefaultsRequest{
SketchPath: sketchPath.String(),
DefaultFqbn: fqbn,
Expand Down
2 changes: 1 addition & 1 deletion internal/cli/board/board.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func NewCommand(srv rpc.ArduinoCoreServiceServer) *cobra.Command {
boardCommand.AddCommand(initDetailsCommand(srv))
boardCommand.AddCommand(initListCommand(srv))
boardCommand.AddCommand(initListAllCommand(srv))
boardCommand.AddCommand(initSearchCommand())
boardCommand.AddCommand(initSearchCommand(srv))

return boardCommand
}
7 changes: 4 additions & 3 deletions internal/cli/board/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func runListCommand(srv rpc.ArduinoCoreServiceServer, watch bool, timeout int64,
logrus.Info("Executing `arduino-cli board list`")

if watch {
watchList(inst)
watchList(inst, srv)
return
}

Expand All @@ -88,8 +88,9 @@ func runListCommand(srv rpc.ArduinoCoreServiceServer, watch bool, timeout int64,
feedback.PrintResult(listResult{result.NewDetectedPorts(ports)})
}

func watchList(inst *rpc.Instance) {
eventsChan, err := commands.BoardListWatch(context.Background(), &rpc.BoardListWatchRequest{Instance: inst})
func watchList(inst *rpc.Instance, srv rpc.ArduinoCoreServiceServer) {
stream, eventsChan := commands.BoardListWatchProxyToChan(context.Background())
err := srv.BoardListWatch(&rpc.BoardListWatchRequest{Instance: inst}, stream)
if err != nil {
feedback.Fatal(tr("Error detecting boards: %v", err), feedback.ErrNetwork)
}
Expand Down
11 changes: 6 additions & 5 deletions internal/cli/board/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"sort"
"strings"

"github.com/arduino/arduino-cli/commands"
"github.com/arduino/arduino-cli/internal/cli/feedback"
"github.com/arduino/arduino-cli/internal/cli/feedback/result"
"github.com/arduino/arduino-cli/internal/cli/feedback/table"
Expand All @@ -32,7 +31,7 @@ import (
"github.com/spf13/cobra"
)

func initSearchCommand() *cobra.Command {
func initSearchCommand(srv rpc.ArduinoCoreServiceServer) *cobra.Command {
var searchCommand = &cobra.Command{
Use: fmt.Sprintf("search [%s]", tr("boardname")),
Short: tr("Search for a board in the Boards Manager."),
Expand All @@ -41,18 +40,20 @@ func initSearchCommand() *cobra.Command {
" " + os.Args[0] + " board search\n" +
" " + os.Args[0] + " board search zero",
Args: cobra.ArbitraryArgs,
Run: runSearchCommand,
Run: func(cmd *cobra.Command, args []string) {
runSearchCommand(srv, args)
},
}
searchCommand.Flags().BoolVarP(&showHiddenBoard, "show-hidden", "a", false, tr("Show also boards marked as 'hidden' in the platform"))
return searchCommand
}

func runSearchCommand(cmd *cobra.Command, args []string) {
func runSearchCommand(srv rpc.ArduinoCoreServiceServer, args []string) {
inst := instance.CreateAndInit()

logrus.Info("Executing `arduino-cli board search`")

res, err := commands.BoardSearch(context.Background(), &rpc.BoardSearchRequest{
res, err := srv.BoardSearch(context.Background(), &rpc.BoardSearchRequest{
Instance: inst,
SearchArgs: strings.Join(args, " "),
IncludeHiddenBoards: showHiddenBoard,
Expand Down
2 changes: 1 addition & 1 deletion internal/cli/burnbootloader/burnbootloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func runBootloaderCommand(srv rpc.ArduinoCoreServiceServer) {
logrus.Info("Executing `arduino-cli burn-bootloader`")

// We don't need a Sketch to upload a board's bootloader
discoveryPort, err := port.GetPort(instance, "", "")
discoveryPort, err := port.GetPort(instance, srv, "", "")
if err != nil {
feedback.Fatal(tr("Error during Upload: %v", err), feedback.ErrGeneric)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/cli/debug/debug_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func runDebugCheckCommand(srv rpc.ArduinoCoreServiceServer, portArgs *arguments.
instance := instance.CreateAndInit()
logrus.Info("Executing `arduino-cli debug`")

port, err := portArgs.GetPort(instance, "", "")
port, err := portArgs.GetPort(instance, srv, "", "")
if err != nil {
feedback.FatalError(err, feedback.ErrBadArgument)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/cli/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func runMonitorCmd(
fqbn, _ = portArgs.DetectFQBN(inst, srv)
}

portAddress, portProtocol, err := portArgs.GetPortAddressAndProtocol(inst, defaultPort, defaultProtocol)
portAddress, portProtocol, err := portArgs.GetPortAddressAndProtocol(inst, srv, defaultPort, defaultProtocol)
if err != nil {
feedback.FatalError(err, feedback.ErrGeneric)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/integrationtest/daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ func TestArduinoCliDaemon(t *testing.T) {
require.NoError(t, err)
watcherCanceldCh := make(chan struct{})
go func() {
defer close(watcherCanceldCh)
for {
msg, err := watcher.Recv()
if errors.Is(err, io.EOF) {
fmt.Println("Watcher EOF")
fmt.Println("Got EOF from watcher")
return
}
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
fmt.Println("Watcher canceled")
watcherCanceldCh <- struct{}{}
fmt.Println("Got Canceled error from watcher")
return
}
require.NoError(t, err, "BoardListWatch grpc call returned an error")
Expand Down

0 comments on commit 6093927

Please sign in to comment.