diff --git a/commands/service.go b/commands/service.go index 21baf0b12fc..bbf4c0e4582 100644 --- a/commands/service.go +++ b/commands/service.go @@ -40,89 +40,11 @@ func (s *arduinoCoreServerImpl) Version(ctx context.Context, req *rpc.VersionReq return &rpc.VersionResponse{Version: s.versionString}, nil } -// Upload FIXMEDOC -func (s *arduinoCoreServerImpl) Upload(req *rpc.UploadRequest, stream rpc.ArduinoCoreService_UploadServer) error { - syncSend := NewSynchronizedSend(stream.Send) - outStream := feedStreamTo(func(data []byte) { - syncSend.Send(&rpc.UploadResponse{ - Message: &rpc.UploadResponse_OutStream{OutStream: data}, - }) - }) - errStream := feedStreamTo(func(data []byte) { - syncSend.Send(&rpc.UploadResponse{ - Message: &rpc.UploadResponse_ErrStream{ErrStream: data}, - }) - }) - res, err := Upload(stream.Context(), req, outStream, errStream) - outStream.Close() - errStream.Close() - if res != nil { - syncSend.Send(&rpc.UploadResponse{ - Message: &rpc.UploadResponse_Result{ - Result: res, - }, - }) - } - return err -} - -// UploadUsingProgrammer FIXMEDOC -func (s *arduinoCoreServerImpl) UploadUsingProgrammer(req *rpc.UploadUsingProgrammerRequest, stream rpc.ArduinoCoreService_UploadUsingProgrammerServer) error { - syncSend := NewSynchronizedSend(stream.Send) - outStream := feedStreamTo(func(data []byte) { - syncSend.Send(&rpc.UploadUsingProgrammerResponse{ - Message: &rpc.UploadUsingProgrammerResponse_OutStream{ - OutStream: data, - }, - }) - }) - errStream := feedStreamTo(func(data []byte) { - syncSend.Send(&rpc.UploadUsingProgrammerResponse{ - Message: &rpc.UploadUsingProgrammerResponse_ErrStream{ - ErrStream: data, - }, - }) - }) - err := UploadUsingProgrammer(stream.Context(), req, outStream, errStream) - outStream.Close() - errStream.Close() - if err != nil { - return err - } - return nil -} - // SupportedUserFields FIXMEDOC func (s *arduinoCoreServerImpl) SupportedUserFields(ctx context.Context, req *rpc.SupportedUserFieldsRequest) (*rpc.SupportedUserFieldsResponse, error) { return SupportedUserFields(ctx, req) } -// BurnBootloader FIXMEDOC -func (s *arduinoCoreServerImpl) BurnBootloader(req *rpc.BurnBootloaderRequest, stream rpc.ArduinoCoreService_BurnBootloaderServer) error { - syncSend := NewSynchronizedSend(stream.Send) - outStream := feedStreamTo(func(data []byte) { - syncSend.Send(&rpc.BurnBootloaderResponse{ - Message: &rpc.BurnBootloaderResponse_OutStream{ - OutStream: data, - }, - }) - }) - errStream := feedStreamTo(func(data []byte) { - syncSend.Send(&rpc.BurnBootloaderResponse{ - Message: &rpc.BurnBootloaderResponse_ErrStream{ - ErrStream: data, - }, - }) - }) - resp, err := BurnBootloader(stream.Context(), req, outStream, errStream) - outStream.Close() - errStream.Close() - if err != nil { - return err - } - return syncSend.Send(resp) -} - // ListProgrammersAvailableForUpload FIXMEDOC func (s *arduinoCoreServerImpl) ListProgrammersAvailableForUpload(ctx context.Context, req *rpc.ListProgrammersAvailableForUploadRequest) (*rpc.ListProgrammersAvailableForUploadResponse, error) { return ListProgrammersAvailableForUpload(ctx, req) diff --git a/commands/service_upload.go b/commands/service_upload.go index a364f3f88f2..7990b8aaf79 100644 --- a/commands/service_upload.go +++ b/commands/service_upload.go @@ -120,8 +120,33 @@ func getUserFields(toolID string, platformRelease *cores.PlatformRelease) []*rpc return userFields } -// Upload FIXMEDOC -func Upload(ctx context.Context, req *rpc.UploadRequest, outStream io.Writer, errStream io.Writer) (*rpc.UploadResult, error) { +// UploadToServerStreams return a server stream that forwards the output and error streams to the provided writers. +// It also returns a function that can be used to retrieve the result of the upload. +func UploadToServerStreams(ctx context.Context, outStream io.Writer, errStream io.Writer) (rpc.ArduinoCoreService_UploadServer, func() *rpc.UploadResult) { + var result *rpc.UploadResult + stream := streamResponseToCallback(ctx, func(resp *rpc.UploadResponse) error { + if errData := resp.GetErrStream(); len(errData) > 0 { + _, err := errStream.Write(errData) + return err + } + if outData := resp.GetOutStream(); len(outData) > 0 { + _, err := outStream.Write(outData) + return err + } + if res := resp.GetResult(); res != nil { + result = res + } + return nil + }) + return stream, func() *rpc.UploadResult { + return result + } +} + +// Upload performs the upload of a sketch to a board. +func (s *arduinoCoreServerImpl) Upload(req *rpc.UploadRequest, stream rpc.ArduinoCoreService_UploadServer) error { + syncSend := NewSynchronizedSend(stream.Send) + logrus.Tracef("Upload %s on %s started", req.GetSketchPath(), req.GetFqbn()) // TODO: make a generic function to extract sketch from request @@ -129,12 +154,12 @@ func Upload(ctx context.Context, req *rpc.UploadRequest, outStream io.Writer, er sketchPath := paths.New(req.GetSketchPath()) sk, err := sketch.New(sketchPath) if err != nil && req.GetImportDir() == "" && req.GetImportFile() == "" { - return nil, &cmderrors.CantOpenSketchError{Cause: err} + return &cmderrors.CantOpenSketchError{Cause: err} } pme, pmeRelease, err := instances.GetPackageManagerExplorer(req.GetInstance()) if err != nil { - return nil, err + return err } defer pmeRelease() @@ -151,6 +176,20 @@ func Upload(ctx context.Context, req *rpc.UploadRequest, outStream io.Writer, er programmer = sk.GetDefaultProgrammer() } + outStream := feedStreamTo(func(data []byte) { + syncSend.Send(&rpc.UploadResponse{ + Message: &rpc.UploadResponse_OutStream{OutStream: data}, + }) + }) + defer outStream.Close() + errStream := feedStreamTo(func(data []byte) { + syncSend.Send(&rpc.UploadResponse{ + Message: &rpc.UploadResponse_ErrStream{ErrStream: data}, + }) + }) + defer errStream.Close() + // TODO: inject context + // ctx := stream.Context() updatedPort, err := runProgramAction( pme, sk, @@ -168,22 +207,45 @@ func Upload(ctx context.Context, req *rpc.UploadRequest, outStream io.Writer, er req.GetUserFields(), ) if err != nil { - return nil, err + return err } - - return &rpc.UploadResult{ - UpdatedUploadPort: updatedPort, - }, nil + return syncSend.Send(&rpc.UploadResponse{ + Message: &rpc.UploadResponse_Result{ + Result: &rpc.UploadResult{ + UpdatedUploadPort: updatedPort, + }, + }, + }) } // UploadUsingProgrammer FIXMEDOC -func UploadUsingProgrammer(ctx context.Context, req *rpc.UploadUsingProgrammerRequest, outStream io.Writer, errStream io.Writer) error { +func (s *arduinoCoreServerImpl) UploadUsingProgrammer(req *rpc.UploadUsingProgrammerRequest, stream rpc.ArduinoCoreService_UploadUsingProgrammerServer) error { + syncSend := NewSynchronizedSend(stream.Send) + streamAdapter := streamResponseToCallback(stream.Context(), func(resp *rpc.UploadResponse) error { + if errData := resp.GetErrStream(); len(errData) > 0 { + syncSend.Send(&rpc.UploadUsingProgrammerResponse{ + Message: &rpc.UploadUsingProgrammerResponse_ErrStream{ + ErrStream: errData, + }, + }) + } + if outData := resp.GetOutStream(); len(outData) > 0 { + syncSend.Send(&rpc.UploadUsingProgrammerResponse{ + Message: &rpc.UploadUsingProgrammerResponse_OutStream{ + OutStream: outData, + }, + }) + } + // resp.GetResult() is ignored + return nil + }) + logrus.Tracef("Upload using programmer %s on %s started", req.GetSketchPath(), req.GetFqbn()) if req.GetProgrammer() == "" { return &cmderrors.MissingProgrammerError{} } - _, err := Upload(ctx, &rpc.UploadRequest{ + return s.Upload(&rpc.UploadRequest{ Instance: req.GetInstance(), SketchPath: req.GetSketchPath(), ImportFile: req.GetImportFile(), @@ -194,8 +256,7 @@ func UploadUsingProgrammer(ctx context.Context, req *rpc.UploadUsingProgrammerRe Verbose: req.GetVerbose(), Verify: req.GetVerify(), UserFields: req.GetUserFields(), - }, outStream, errStream) - return err + }, streamAdapter) } func runProgramAction(pme *packagemanager.Explorer, diff --git a/commands/service_upload_burnbootloader.go b/commands/service_upload_burnbootloader.go index 7d9c34cb7cc..96352e57fd5 100644 --- a/commands/service_upload_burnbootloader.go +++ b/commands/service_upload_burnbootloader.go @@ -24,8 +24,42 @@ import ( "github.com/sirupsen/logrus" ) -// BurnBootloader FIXMEDOC -func BurnBootloader(ctx context.Context, req *rpc.BurnBootloaderRequest, outStream io.Writer, errStream io.Writer) (*rpc.BurnBootloaderResponse, error) { +// BurnBootloaderToServerStreams return a server stream that forwards the output and error streams to the provided io.Writers +func BurnBootloaderToServerStreams(ctx context.Context, outStrem, errStream io.Writer) rpc.ArduinoCoreService_BurnBootloaderServer { + stream := streamResponseToCallback(ctx, func(resp *rpc.BurnBootloaderResponse) error { + if outData := resp.GetOutStream(); len(outData) > 0 { + _, err := outStrem.Write(outData) + return err + } + if errData := resp.GetErrStream(); len(errData) > 0 { + _, err := errStream.Write(errData) + return err + } + return nil + }) + return stream +} + +// BurnBootloader performs the burn bootloader action +func (s *arduinoCoreServerImpl) BurnBootloader(req *rpc.BurnBootloaderRequest, stream rpc.ArduinoCoreService_BurnBootloaderServer) error { + syncSend := NewSynchronizedSend(stream.Send) + outStream := feedStreamTo(func(data []byte) { + syncSend.Send(&rpc.BurnBootloaderResponse{ + Message: &rpc.BurnBootloaderResponse_OutStream{ + OutStream: data, + }, + }) + }) + defer outStream.Close() + errStream := feedStreamTo(func(data []byte) { + syncSend.Send(&rpc.BurnBootloaderResponse{ + Message: &rpc.BurnBootloaderResponse_ErrStream{ + ErrStream: data, + }, + }) + }) + defer errStream.Close() + logrus. WithField("fqbn", req.GetFqbn()). WithField("port", req.GetPort()). @@ -34,7 +68,7 @@ func BurnBootloader(ctx context.Context, req *rpc.BurnBootloaderRequest, outStre pme, release, err := instances.GetPackageManagerExplorer(req.GetInstance()) if err != nil { - return nil, err + return err } defer release() @@ -54,7 +88,7 @@ func BurnBootloader(ctx context.Context, req *rpc.BurnBootloaderRequest, outStre req.GetDryRun(), map[string]string{}, // User fields ); err != nil { - return nil, err + return err } - return &rpc.BurnBootloaderResponse{}, nil + return syncSend.Send(&rpc.BurnBootloaderResponse{}) } diff --git a/internal/cli/burnbootloader/burnbootloader.go b/internal/cli/burnbootloader/burnbootloader.go index 21ea22dbf3f..c17a822b3f4 100644 --- a/internal/cli/burnbootloader/burnbootloader.go +++ b/internal/cli/burnbootloader/burnbootloader.go @@ -78,7 +78,8 @@ func runBootloaderCommand(srv rpc.ArduinoCoreServiceServer) { } stdOut, stdErr, res := feedback.OutputStreams() - if _, err := commands.BurnBootloader(context.Background(), &rpc.BurnBootloaderRequest{ + stream := commands.BurnBootloaderToServerStreams(ctx, stdOut, stdErr) + if err := srv.BurnBootloader(&rpc.BurnBootloaderRequest{ Instance: instance, Fqbn: fqbn.String(), Port: discoveryPort, @@ -86,7 +87,7 @@ func runBootloaderCommand(srv rpc.ArduinoCoreServiceServer) { Verify: verify, Programmer: programmer.String(instance, srv, fqbn.String()), DryRun: dryRun, - }, stdOut, stdErr); err != nil { + }, stream); err != nil { errcode := feedback.ErrGeneric if errors.Is(err, &cmderrors.ProgrammerRequiredForUploadError{}) { errcode = feedback.ErrMissingProgrammer diff --git a/internal/cli/compile/compile.go b/internal/cli/compile/compile.go index 6a2ea645bf0..8deba88efad 100644 --- a/internal/cli/compile/compile.go +++ b/internal/cli/compile/compile.go @@ -294,7 +294,8 @@ func runCompileCommand(cmd *cobra.Command, args []string, srv rpc.ArduinoCoreSer UserFields: fields, } - if res, err := commands.Upload(context.Background(), uploadRequest, stdOut, stdErr); err != nil { + stream, streamRes := commands.UploadToServerStreams(ctx, stdOut, stdErr) + if err := srv.Upload(uploadRequest, stream); err != nil { errcode := feedback.ErrGeneric if errors.Is(err, &cmderrors.ProgrammerRequiredForUploadError{}) { errcode = feedback.ErrMissingProgrammer @@ -304,7 +305,7 @@ func runCompileCommand(cmd *cobra.Command, args []string, srv rpc.ArduinoCoreSer } feedback.Fatal(tr("Error during Upload: %v", err), errcode) } else { - uploadRes = res + uploadRes = streamRes() } } diff --git a/internal/cli/upload/upload.go b/internal/cli/upload/upload.go index 751083aebd5..c7500adfe98 100644 --- a/internal/cli/upload/upload.go +++ b/internal/cli/upload/upload.go @@ -199,7 +199,8 @@ func runUploadCommand(srv rpc.ArduinoCoreServiceServer, args []string, uploadFie DryRun: dryRun, UserFields: fields, } - if res, err := commands.Upload(context.Background(), req, stdOut, stdErr); err != nil { + stream, streamResp := commands.UploadToServerStreams(ctx, stdOut, stdErr) + if err := srv.Upload(req, stream); err != nil { errcode := feedback.ErrGeneric if errors.Is(err, &cmderrors.ProgrammerRequiredForUploadError{}) { errcode = feedback.ErrMissingProgrammer @@ -213,7 +214,7 @@ func runUploadCommand(srv rpc.ArduinoCoreServiceServer, args []string, uploadFie feedback.PrintResult(&uploadResult{ Stdout: io.Stdout, Stderr: io.Stderr, - UpdatedUploadPort: result.NewPort(res.GetUpdatedUploadPort()), + UpdatedUploadPort: result.NewPort(streamResp().GetUpdatedUploadPort()), }) } }