diff --git a/internal/api/grpc/bes/BUILD.bazel b/internal/api/grpc/bes/BUILD.bazel index fb444be..26b7c57 100644 --- a/internal/api/grpc/bes/BUILD.bazel +++ b/internal/api/grpc/bes/BUILD.bazel @@ -16,6 +16,8 @@ go_library( "//pkg/summary", "//third_party/bazel/gen/bes", "@org_golang_google_genproto//googleapis/devtools/build/v1:build", + "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//status", "@org_golang_google_protobuf//encoding/protojson", "@org_golang_google_protobuf//types/known/emptypb", ], diff --git a/internal/api/grpc/bes/channel.go b/internal/api/grpc/bes/channel.go index d6ce2fa..c03f853 100644 --- a/internal/api/grpc/bes/channel.go +++ b/internal/api/grpc/bes/channel.go @@ -17,15 +17,25 @@ import ( ) // BuildEventChannel handles a single BuildEvent stream -type BuildEventChannel struct { +type BuildEventChannel interface { + // HandleBuildEvent processes a single BuildEvent + // This method should be called for each received event. + HandleBuildEvent(event *build.BuildEvent) error + + // Finalize does post-processing of a stream of BuildEvents. + // This method should be called after receiving the EOF event. + Finalize() error +} + +type buildEventChannel struct { ctx context.Context streamID *build.StreamId summarizer *summary.Summarizer workflow *processing.Workflow } -// HandleBuildEvent processes a single BuildEvent -func (c *BuildEventChannel) HandleBuildEvent(event *build.BuildEvent) error { +// HandleBuildEvent implements BuildEventChannel.HandleBuildEvent. +func (c *buildEventChannel) HandleBuildEvent(event *build.BuildEvent) error { if event.GetBazelEvent() == nil { return nil } @@ -44,8 +54,8 @@ func (c *BuildEventChannel) HandleBuildEvent(event *build.BuildEvent) error { return nil } -// Finalize wraps up processing of a stream of BuildEvent -func (c *BuildEventChannel) Finalize() error { +// Finalize implements BuildEventChannel.Finalize. +func (c *buildEventChannel) Finalize() error { summaryReport, err := c.summarizer.FinishProcessing() if err != nil { slog.ErrorContext(c.ctx, "FinishProcessing failed", "err", err) @@ -70,3 +80,17 @@ func (c *BuildEventChannel) Finalize() error { slog.InfoContext(c.ctx, fmt.Sprintf("Saved invocation in %v", elapsedTime.String()), "id", invocation.InvocationID) return nil } + +// noOpBuildEventChannel is an implementation of BuildEventChannel which does no processing of events. +// It is used when receiving a stream of events that we wish to ack without processing. +type noOpBuildEventChannel struct{} + +// HandleBuildEvent implements BuildEventChannel.HandleBuildEvent. +func (c *noOpBuildEventChannel) HandleBuildEvent(event *build.BuildEvent) error { + return nil +} + +// Finalize implements BuildEventChannel.Finalize. +func (c *noOpBuildEventChannel) Finalize() error { + return nil +} diff --git a/internal/api/grpc/bes/handler.go b/internal/api/grpc/bes/handler.go index ee400ec..5749013 100644 --- a/internal/api/grpc/bes/handler.go +++ b/internal/api/grpc/bes/handler.go @@ -26,12 +26,18 @@ func NewBuildEventHandler(workflow *processing.Workflow) *BuildEventHandler { } // CreateEventChannel creates a new BuildEventChannel -func (h *BuildEventHandler) CreateEventChannel(ctx context.Context, streamID *build.StreamId) *BuildEventChannel { +func (h *BuildEventHandler) CreateEventChannel(ctx context.Context, initialEvent *build.OrderedBuildEvent) BuildEventChannel { summarizer := summary.NewSummarizer() - return &BuildEventChannel{ + // If the first event does not have sequence number 1, we have processed this + // invocation previously, and should skip all processing. + if initialEvent.SequenceNumber != 1 { + return &noOpBuildEventChannel{} + } + + return &buildEventChannel{ ctx: ctx, - streamID: streamID, + streamID: initialEvent.StreamId, summarizer: summarizer, workflow: h.workflow, } diff --git a/internal/api/grpc/bes/server.go b/internal/api/grpc/bes/server.go index 1481c7c..d2e9837 100644 --- a/internal/api/grpc/bes/server.go +++ b/internal/api/grpc/bes/server.go @@ -2,10 +2,14 @@ package bes import ( "context" + "fmt" "io" "log/slog" + "sort" build "google.golang.org/genproto/googleapis/devtools/build/v1" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/types/known/emptypb" @@ -39,10 +43,14 @@ func (s BuildEventServer) PublishLifecycleEvent(ctx context.Context, request *bu func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildEvent_PublishBuildToolEventStreamServer) error { slog.InfoContext(stream.Context(), "Stream started", "event", stream.Context()) - ack := func(req *build.PublishBuildToolEventStreamRequest) { + // List of SequenceIds we've received. + // We'll want to ack these once all events are received, as we don't support resumption. + seqNrs := make([]int64, 0) + + ack := func(streamID *build.StreamId, sequenceNumber int64) { if err := stream.Send(&build.PublishBuildToolEventStreamResponse{ - StreamId: req.OrderedBuildEvent.StreamId, - SequenceNumber: req.OrderedBuildEvent.SequenceNumber, + StreamId: streamID, + SequenceNumber: sequenceNumber, }); err != nil { slog.ErrorContext(stream.Context(), "Send failed", "err", err) } @@ -51,7 +59,7 @@ func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildE var streamID *build.StreamId reqCh := make(chan *build.PublishBuildToolEventStreamRequest) errCh := make(chan error) - var eventCh *BuildEventChannel + var eventCh BuildEventChannel go func() { for { @@ -73,25 +81,47 @@ func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildE return nil } - return eventCh.Finalize() + // Validate that all events were received + sort.Slice(seqNrs, func(i, j int) bool { return seqNrs[i] < seqNrs[j] }) + + // TODO: Find out if initial sequence number can be != 1 + expected := int64(1) + for _, seqNr := range seqNrs { + if seqNr != expected { + return status.Error(codes.Unknown, fmt.Sprintf("received unexpected sequence number %d, expected %d", seqNr, expected)) + } + expected++ + } + + err := eventCh.Finalize() + if err != nil { + return err + } + + // Ack all events + for _, seqNr := range seqNrs { + ack(streamID, seqNr) + } + + return nil } slog.ErrorContext(stream.Context(), "Recv failed", "err", err) return err case req := <-reqCh: - // First request + // First event if streamID == nil { streamID = req.OrderedBuildEvent.GetStreamId() - eventCh = s.handler.CreateEventChannel(stream.Context(), streamID) + eventCh = s.handler.CreateEventChannel(stream.Context(), req.OrderedBuildEvent) } + seqNrs = append(seqNrs, req.OrderedBuildEvent.GetSequenceNumber()) + if err := eventCh.HandleBuildEvent(req.OrderedBuildEvent.Event); err != nil { slog.ErrorContext(stream.Context(), "HandleBuildEvent failed", "err", err) return err } - - ack(req) } } }