From dc7754c21394e2bbfc1b0053b26b379212748acb Mon Sep 17 00:00:00 2001 From: Morten Mjelva Date: Sat, 19 Oct 2024 08:58:28 +0200 Subject: [PATCH 1/3] Ack all events after receiving the entire set, to force complete retransmit on failure Deferring the acknowledgement of incoming events until all events in a stream are received and processed greatly simplifies handling of retries. While it would, in some ways, be more efficient if we were able to resume transmission in a way that only required missed events to be retransmitted, this would complicate matters in others. For one, we would need to persist data as we received the events. We would also need to handle the scenario where the retry is sent to a different instance of the server, in a setting where the server is scaled out horizontally. While there are certainly solutions we might investigate that allow us to support resuming on retry, it is for now far simpler to require a complete retransmission of events. --- internal/api/grpc/bes/BUILD.bazel | 2 ++ internal/api/grpc/bes/server.go | 42 ++++++++++++++++++++++++++----- 2 files changed, 38 insertions(+), 6 deletions(-) diff --git a/internal/api/grpc/bes/BUILD.bazel b/internal/api/grpc/bes/BUILD.bazel index fb444be..26b7c57 100644 --- a/internal/api/grpc/bes/BUILD.bazel +++ b/internal/api/grpc/bes/BUILD.bazel @@ -16,6 +16,8 @@ go_library( "//pkg/summary", "//third_party/bazel/gen/bes", "@org_golang_google_genproto//googleapis/devtools/build/v1:build", + "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//status", "@org_golang_google_protobuf//encoding/protojson", "@org_golang_google_protobuf//types/known/emptypb", ], diff --git a/internal/api/grpc/bes/server.go b/internal/api/grpc/bes/server.go index 1481c7c..7df4efd 100644 --- a/internal/api/grpc/bes/server.go +++ b/internal/api/grpc/bes/server.go @@ -2,10 +2,14 @@ package bes import ( "context" + "fmt" "io" "log/slog" + "sort" build "google.golang.org/genproto/googleapis/devtools/build/v1" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/types/known/emptypb" @@ -39,10 +43,14 @@ func (s BuildEventServer) PublishLifecycleEvent(ctx context.Context, request *bu func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildEvent_PublishBuildToolEventStreamServer) error { slog.InfoContext(stream.Context(), "Stream started", "event", stream.Context()) - ack := func(req *build.PublishBuildToolEventStreamRequest) { + // List of SequenceIds we've received. + // We'll want to ack these once all events are received, as we don't support resumption. + seqNrs := make([]int64, 0) + + ack := func(streamID *build.StreamId, sequenceNumber int64) { if err := stream.Send(&build.PublishBuildToolEventStreamResponse{ - StreamId: req.OrderedBuildEvent.StreamId, - SequenceNumber: req.OrderedBuildEvent.SequenceNumber, + StreamId: streamID, + SequenceNumber: sequenceNumber, }); err != nil { slog.ErrorContext(stream.Context(), "Send failed", "err", err) } @@ -73,7 +81,29 @@ func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildE return nil } - return eventCh.Finalize() + // Validate that all events were received + sort.Slice(seqNrs, func(i, j int) bool { return seqNrs[i] < seqNrs[j] }) + + // TODO: Find out if initial sequence number can be != 1 + expected := int64(1) + for _, seqNr := range seqNrs { + if seqNr != expected { + return status.Error(codes.Unknown, fmt.Sprintf("received unexpected sequence number %d, expected %d", seqNr, expected)) + } + expected++ + } + + err := eventCh.Finalize() + if err != nil { + return err + } + + // Ack all events + for _, seqNr := range seqNrs { + ack(streamID, seqNr) + } + + return nil } slog.ErrorContext(stream.Context(), "Recv failed", "err", err) @@ -86,12 +116,12 @@ func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildE eventCh = s.handler.CreateEventChannel(stream.Context(), streamID) } + seqNrs = append(seqNrs, req.OrderedBuildEvent.GetSequenceNumber()) + if err := eventCh.HandleBuildEvent(req.OrderedBuildEvent.Event); err != nil { slog.ErrorContext(stream.Context(), "HandleBuildEvent failed", "err", err) return err } - - ack(req) } } } From 0533d545fe510847329ca6bbfd1ba2bea7871324 Mon Sep 17 00:00:00 2001 From: Morten Mjelva Date: Sat, 19 Oct 2024 22:57:07 +0200 Subject: [PATCH 2/3] Avoid repeated processing if the client retries We now ack all events after the entire stream of events is received and processed. This means that if the client retries, it can be expected to retry from the start of the stream such that any new request should always start with sequence number 1. If, however, our response to the client is dropped, it is possible for the client to begin its retry from some other position. In this situation we would already have persisted the event stream. If this occurs we should simply skip any processing and ack the events. --- internal/api/grpc/bes/channel.go | 21 ++++++++++++++++++--- internal/api/grpc/bes/handler.go | 12 +++++++++--- internal/api/grpc/bes/server.go | 6 +++--- 3 files changed, 30 insertions(+), 9 deletions(-) diff --git a/internal/api/grpc/bes/channel.go b/internal/api/grpc/bes/channel.go index d6ce2fa..36d06a0 100644 --- a/internal/api/grpc/bes/channel.go +++ b/internal/api/grpc/bes/channel.go @@ -16,8 +16,13 @@ import ( "google.golang.org/genproto/googleapis/devtools/build/v1" ) +type BuildEventChannel interface { + HandleBuildEvent(event *build.BuildEvent) error + Finalize() error +} + // BuildEventChannel handles a single BuildEvent stream -type BuildEventChannel struct { +type buildEventChannel struct { ctx context.Context streamID *build.StreamId summarizer *summary.Summarizer @@ -25,7 +30,7 @@ type BuildEventChannel struct { } // HandleBuildEvent processes a single BuildEvent -func (c *BuildEventChannel) HandleBuildEvent(event *build.BuildEvent) error { +func (c *buildEventChannel) HandleBuildEvent(event *build.BuildEvent) error { if event.GetBazelEvent() == nil { return nil } @@ -45,7 +50,7 @@ func (c *BuildEventChannel) HandleBuildEvent(event *build.BuildEvent) error { } // Finalize wraps up processing of a stream of BuildEvent -func (c *BuildEventChannel) Finalize() error { +func (c *buildEventChannel) Finalize() error { summaryReport, err := c.summarizer.FinishProcessing() if err != nil { slog.ErrorContext(c.ctx, "FinishProcessing failed", "err", err) @@ -70,3 +75,13 @@ func (c *BuildEventChannel) Finalize() error { slog.InfoContext(c.ctx, fmt.Sprintf("Saved invocation in %v", elapsedTime.String()), "id", invocation.InvocationID) return nil } + +type noOpBuildEventChannel struct{} + +func (c *noOpBuildEventChannel) HandleBuildEvent(event *build.BuildEvent) error { + return nil +} + +func (c *noOpBuildEventChannel) Finalize() error { + return nil +} diff --git a/internal/api/grpc/bes/handler.go b/internal/api/grpc/bes/handler.go index ee400ec..5749013 100644 --- a/internal/api/grpc/bes/handler.go +++ b/internal/api/grpc/bes/handler.go @@ -26,12 +26,18 @@ func NewBuildEventHandler(workflow *processing.Workflow) *BuildEventHandler { } // CreateEventChannel creates a new BuildEventChannel -func (h *BuildEventHandler) CreateEventChannel(ctx context.Context, streamID *build.StreamId) *BuildEventChannel { +func (h *BuildEventHandler) CreateEventChannel(ctx context.Context, initialEvent *build.OrderedBuildEvent) BuildEventChannel { summarizer := summary.NewSummarizer() - return &BuildEventChannel{ + // If the first event does not have sequence number 1, we have processed this + // invocation previously, and should skip all processing. + if initialEvent.SequenceNumber != 1 { + return &noOpBuildEventChannel{} + } + + return &buildEventChannel{ ctx: ctx, - streamID: streamID, + streamID: initialEvent.StreamId, summarizer: summarizer, workflow: h.workflow, } diff --git a/internal/api/grpc/bes/server.go b/internal/api/grpc/bes/server.go index 7df4efd..d2e9837 100644 --- a/internal/api/grpc/bes/server.go +++ b/internal/api/grpc/bes/server.go @@ -59,7 +59,7 @@ func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildE var streamID *build.StreamId reqCh := make(chan *build.PublishBuildToolEventStreamRequest) errCh := make(chan error) - var eventCh *BuildEventChannel + var eventCh BuildEventChannel go func() { for { @@ -110,10 +110,10 @@ func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildE return err case req := <-reqCh: - // First request + // First event if streamID == nil { streamID = req.OrderedBuildEvent.GetStreamId() - eventCh = s.handler.CreateEventChannel(stream.Context(), streamID) + eventCh = s.handler.CreateEventChannel(stream.Context(), req.OrderedBuildEvent) } seqNrs = append(seqNrs, req.OrderedBuildEvent.GetSequenceNumber()) From db5f97ef51395e3ab27dc07a75e0d537c1e09f3f Mon Sep 17 00:00:00 2001 From: Morten Mjelva Date: Sun, 20 Oct 2024 08:39:53 +0200 Subject: [PATCH 3/3] Code comments --- internal/api/grpc/bes/channel.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/internal/api/grpc/bes/channel.go b/internal/api/grpc/bes/channel.go index 36d06a0..c03f853 100644 --- a/internal/api/grpc/bes/channel.go +++ b/internal/api/grpc/bes/channel.go @@ -16,12 +16,17 @@ import ( "google.golang.org/genproto/googleapis/devtools/build/v1" ) +// BuildEventChannel handles a single BuildEvent stream type BuildEventChannel interface { + // HandleBuildEvent processes a single BuildEvent + // This method should be called for each received event. HandleBuildEvent(event *build.BuildEvent) error + + // Finalize does post-processing of a stream of BuildEvents. + // This method should be called after receiving the EOF event. Finalize() error } -// BuildEventChannel handles a single BuildEvent stream type buildEventChannel struct { ctx context.Context streamID *build.StreamId @@ -29,7 +34,7 @@ type buildEventChannel struct { workflow *processing.Workflow } -// HandleBuildEvent processes a single BuildEvent +// HandleBuildEvent implements BuildEventChannel.HandleBuildEvent. func (c *buildEventChannel) HandleBuildEvent(event *build.BuildEvent) error { if event.GetBazelEvent() == nil { return nil @@ -49,7 +54,7 @@ func (c *buildEventChannel) HandleBuildEvent(event *build.BuildEvent) error { return nil } -// Finalize wraps up processing of a stream of BuildEvent +// Finalize implements BuildEventChannel.Finalize. func (c *buildEventChannel) Finalize() error { summaryReport, err := c.summarizer.FinishProcessing() if err != nil { @@ -76,12 +81,16 @@ func (c *buildEventChannel) Finalize() error { return nil } +// noOpBuildEventChannel is an implementation of BuildEventChannel which does no processing of events. +// It is used when receiving a stream of events that we wish to ack without processing. type noOpBuildEventChannel struct{} +// HandleBuildEvent implements BuildEventChannel.HandleBuildEvent. func (c *noOpBuildEventChannel) HandleBuildEvent(event *build.BuildEvent) error { return nil } +// Finalize implements BuildEventChannel.Finalize. func (c *noOpBuildEventChannel) Finalize() error { return nil }