From 1f2e28e23a529abd5c28581efa62c8f1418605ed Mon Sep 17 00:00:00 2001 From: Morten Mjelva Date: Fri, 18 Oct 2024 05:56:51 +0200 Subject: [PATCH] Ensure we process build events in order --- cmd/bb_portal/main.go | 2 +- internal/api/grpc/bes/BUILD.bazel | 6 +- internal/api/grpc/bes/bes.go | 125 ------------------------------ internal/api/grpc/bes/channel.go | 72 +++++++++++++++++ internal/api/grpc/bes/handler.go | 38 +++++++++ internal/api/grpc/bes/server.go | 97 +++++++++++++++++++++++ 6 files changed, 213 insertions(+), 127 deletions(-) delete mode 100644 internal/api/grpc/bes/bes.go create mode 100644 internal/api/grpc/bes/channel.go create mode 100644 internal/api/grpc/bes/handler.go create mode 100644 internal/api/grpc/bes/server.go diff --git a/cmd/bb_portal/main.go b/cmd/bb_portal/main.go index 9fbc696..f7f6688 100644 --- a/cmd/bb_portal/main.go +++ b/cmd/bb_portal/main.go @@ -98,7 +98,7 @@ func main() { if err := bb_grpc.NewServersFromConfigurationAndServe( configuration.GrpcServers, func(s go_grpc.ServiceRegistrar) { - build.RegisterPublishBuildEventServer(s.(*go_grpc.Server), bes.New(dbClient, blobArchiver)) + build.RegisterPublishBuildEventServer(s.(*go_grpc.Server), bes.NewBuildEventServer(dbClient, blobArchiver)) }, siblingsGroup, ); err != nil { diff --git a/internal/api/grpc/bes/BUILD.bazel b/internal/api/grpc/bes/BUILD.bazel index 06e2297..fb444be 100644 --- a/internal/api/grpc/bes/BUILD.bazel +++ b/internal/api/grpc/bes/BUILD.bazel @@ -2,7 +2,11 @@ load("@rules_go//go:def.bzl", "go_library") go_library( name = "bes", - srcs = ["bes.go"], + srcs = [ + "channel.go", + "handler.go", + "server.go", + ], importpath = "github.com/buildbarn/bb-portal/internal/api/grpc/bes", visibility = ["//:__subpackages__"], deps = [ diff --git a/internal/api/grpc/bes/bes.go b/internal/api/grpc/bes/bes.go deleted file mode 100644 index b428b72..0000000 --- a/internal/api/grpc/bes/bes.go +++ /dev/null @@ -1,125 +0,0 @@ -package bes - -import ( - "context" - "encoding/json" - "fmt" - "io" - "log/slog" - "time" - - build "google.golang.org/genproto/googleapis/devtools/build/v1" - "google.golang.org/protobuf/encoding/protojson" - "google.golang.org/protobuf/types/known/emptypb" - - "github.com/buildbarn/bb-portal/ent/gen/ent" - "github.com/buildbarn/bb-portal/pkg/events" - "github.com/buildbarn/bb-portal/pkg/processing" - "github.com/buildbarn/bb-portal/pkg/summary" - "github.com/buildbarn/bb-portal/third_party/bazel/gen/bes" -) - -// BES A type for the Build Event Service. -type BES struct { - db *ent.Client - blobArchiver processing.BlobMultiArchiver -} - -// New BES initializer function. -func New(db *ent.Client, blobArchiver processing.BlobMultiArchiver) build.PublishBuildEventServer { - return &BES{ - db: db, - blobArchiver: blobArchiver, - } -} - -// PublishLifecycleEvent Publush a life cycle event. -func (b BES) PublishLifecycleEvent(ctx context.Context, request *build.PublishLifecycleEventRequest) (*emptypb.Empty, error) { - slog.InfoContext(ctx, "Received event", "event", protojson.Format(request.BuildEvent.GetEvent())) - return &emptypb.Empty{}, nil -} - -// PublishBuildToolEventStream Public a build tool event stream. -func (b BES) PublishBuildToolEventStream(stream build.PublishBuildEvent_PublishBuildToolEventStreamServer) error { - slog.InfoContext(stream.Context(), "Stream started", "event", stream.Context()) - - summarizer := summary.NewSummarizer() - - ack := func(req *build.PublishBuildToolEventStreamRequest) { - if err := stream.Send(&build.PublishBuildToolEventStreamResponse{ - StreamId: req.OrderedBuildEvent.StreamId, - SequenceNumber: req.OrderedBuildEvent.SequenceNumber, - }); err != nil { - slog.ErrorContext(stream.Context(), "Send failed", "err", err) - } - } - - var streamID *build.StreamId - for { - req, err := stream.Recv() - if err == io.EOF { - slog.InfoContext(stream.Context(), "Stream finished", "event", stream.Context()) - break - } - if err != nil { - slog.ErrorContext(stream.Context(), "Recv failed", "err", err) - return err - } - - if streamID == nil { - streamID = req.GetOrderedBuildEvent().GetStreamId() - } - - err = processBazelEvent(stream.Context(), req.OrderedBuildEvent.Event, summarizer) - if err != nil { - return err - } - - ack(req) - } - - summaryReport, err := summarizer.FinishProcessing() - if err != nil { - slog.ErrorContext(stream.Context(), "FinishProcessing failed", "err", err) - return err - } - - // Hack for eventFile being required - summaryReport.EventFileURL = fmt.Sprintf( - "grpc://localhost:8082/google.devtools.build.v1/PublishLifecycleEvent?streamID=%s", - streamID.String(), - ) - - workflow := processing.New(b.db, b.blobArchiver) - slog.InfoContext(stream.Context(), "Saving invocation", "id", streamID.String()) - startTime := time.Now() - invocation, err := workflow.SaveSummary(stream.Context(), summaryReport) - if err != nil { - slog.ErrorContext(stream.Context(), "SaveSummary failed", "err", err) - return err - } - endTime := time.Now() - elapsedTime := endTime.Sub(startTime) - slog.InfoContext(stream.Context(), fmt.Sprintf("Saved invocation in %v", elapsedTime.String()), "id", invocation.InvocationID) - return nil -} - -// Process a bazel Event. -func processBazelEvent(ctx context.Context, event *build.BuildEvent, summarizer *summary.Summarizer) error { - if event.GetBazelEvent() == nil { - return nil - } - - var bazelEvent bes.BuildEvent - err := event.GetBazelEvent().UnmarshalTo(&bazelEvent) - if err != nil { - slog.ErrorContext(ctx, "UnmarshalTo failed", "err", err) - return err - } - buildEvent := events.NewBuildEvent(&bazelEvent, json.RawMessage(protojson.Format(&bazelEvent))) - if err = summarizer.ProcessEvent(&buildEvent); err != nil { - slog.ErrorContext(ctx, "ProcessEvent failed", "err", err) - return fmt.Errorf("could not process event (%s): , %w", buildEvent, err) - } - return nil -} diff --git a/internal/api/grpc/bes/channel.go b/internal/api/grpc/bes/channel.go new file mode 100644 index 0000000..d6ce2fa --- /dev/null +++ b/internal/api/grpc/bes/channel.go @@ -0,0 +1,72 @@ +package bes + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "time" + + "google.golang.org/protobuf/encoding/protojson" + + "github.com/buildbarn/bb-portal/pkg/events" + "github.com/buildbarn/bb-portal/pkg/processing" + "github.com/buildbarn/bb-portal/pkg/summary" + "github.com/buildbarn/bb-portal/third_party/bazel/gen/bes" + "google.golang.org/genproto/googleapis/devtools/build/v1" +) + +// BuildEventChannel handles a single BuildEvent stream +type BuildEventChannel struct { + ctx context.Context + streamID *build.StreamId + summarizer *summary.Summarizer + workflow *processing.Workflow +} + +// HandleBuildEvent processes a single BuildEvent +func (c *BuildEventChannel) HandleBuildEvent(event *build.BuildEvent) error { + if event.GetBazelEvent() == nil { + return nil + } + + var bazelEvent bes.BuildEvent + err := event.GetBazelEvent().UnmarshalTo(&bazelEvent) + if err != nil { + slog.ErrorContext(c.ctx, "UnmarshalTo failed", "err", err) + return err + } + buildEvent := events.NewBuildEvent(&bazelEvent, json.RawMessage(protojson.Format(&bazelEvent))) + if err = c.summarizer.ProcessEvent(&buildEvent); err != nil { + slog.ErrorContext(c.ctx, "ProcessEvent failed", "err", err) + return fmt.Errorf("could not process event (%s): , %w", buildEvent, err) + } + return nil +} + +// Finalize wraps up processing of a stream of BuildEvent +func (c *BuildEventChannel) Finalize() error { + summaryReport, err := c.summarizer.FinishProcessing() + if err != nil { + slog.ErrorContext(c.ctx, "FinishProcessing failed", "err", err) + return err + } + + // Hack for eventFile being required + summaryReport.EventFileURL = fmt.Sprintf( + "grpc://localhost:8082/google.devtools.build.v1/PublishLifecycleEvent?streamID=%s", + c.streamID.String(), + ) + + slog.InfoContext(c.ctx, "Saving invocation", "id", c.streamID.String()) + startTime := time.Now() + invocation, err := c.workflow.SaveSummary(c.ctx, summaryReport) + if err != nil { + slog.ErrorContext(c.ctx, "SaveSummary failed", "err", err) + return err + } + endTime := time.Now() + elapsedTime := endTime.Sub(startTime) + slog.InfoContext(c.ctx, fmt.Sprintf("Saved invocation in %v", elapsedTime.String()), "id", invocation.InvocationID) + return nil +} diff --git a/internal/api/grpc/bes/handler.go b/internal/api/grpc/bes/handler.go new file mode 100644 index 0000000..ee400ec --- /dev/null +++ b/internal/api/grpc/bes/handler.go @@ -0,0 +1,38 @@ +package bes + +import ( + "context" + + "github.com/buildbarn/bb-portal/pkg/processing" + "github.com/buildbarn/bb-portal/pkg/summary" + "google.golang.org/genproto/googleapis/devtools/build/v1" +) + +// BuildEventHandler orchestrates the handling of incoming Build Event streams. +// For each incoming stream, and BuildEventChannel is created, which handles that stream. +// BuildEventHandler is responsible for managing the things that are common to these event streams. +type BuildEventHandler struct { + workflow *processing.Workflow +} + +// NewBuildEventHandler constructs a new BuildEventHandler +// TODO: Ensure we allow processing to complete before shutdown +// TODO: Cancel previous processing for an invocation if the client retries +// TODO: Write metrics +func NewBuildEventHandler(workflow *processing.Workflow) *BuildEventHandler { + return &BuildEventHandler{ + workflow: workflow, + } +} + +// CreateEventChannel creates a new BuildEventChannel +func (h *BuildEventHandler) CreateEventChannel(ctx context.Context, streamID *build.StreamId) *BuildEventChannel { + summarizer := summary.NewSummarizer() + + return &BuildEventChannel{ + ctx: ctx, + streamID: streamID, + summarizer: summarizer, + workflow: h.workflow, + } +} diff --git a/internal/api/grpc/bes/server.go b/internal/api/grpc/bes/server.go new file mode 100644 index 0000000..1481c7c --- /dev/null +++ b/internal/api/grpc/bes/server.go @@ -0,0 +1,97 @@ +package bes + +import ( + "context" + "io" + "log/slog" + + build "google.golang.org/genproto/googleapis/devtools/build/v1" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/buildbarn/bb-portal/ent/gen/ent" + "github.com/buildbarn/bb-portal/pkg/processing" +) + +// BuildEventServer implements the Build Event Service. +// It receives events and forwards them to a BuildEventChannel. +// TODO: Should this support forwarding events? Users might want to create their own +// tooling that reacts to build events, and it would be useful if this service could +// forward events to those. +type BuildEventServer struct { + handler *BuildEventHandler +} + +// NewBuildEventServer creates a new BuildEventServer +func NewBuildEventServer(db *ent.Client, blobArchiver processing.BlobMultiArchiver) build.PublishBuildEventServer { + return &BuildEventServer{ + handler: NewBuildEventHandler(processing.New(db, blobArchiver)), + } +} + +// PublishLifecycleEvent handles life cycle events. +func (s BuildEventServer) PublishLifecycleEvent(ctx context.Context, request *build.PublishLifecycleEventRequest) (*emptypb.Empty, error) { + slog.InfoContext(ctx, "Received event", "event", protojson.Format(request.BuildEvent.GetEvent())) + return &emptypb.Empty{}, nil +} + +// PublishBuildToolEventStream handles a build tool event stream. +func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildEvent_PublishBuildToolEventStreamServer) error { + slog.InfoContext(stream.Context(), "Stream started", "event", stream.Context()) + + ack := func(req *build.PublishBuildToolEventStreamRequest) { + if err := stream.Send(&build.PublishBuildToolEventStreamResponse{ + StreamId: req.OrderedBuildEvent.StreamId, + SequenceNumber: req.OrderedBuildEvent.SequenceNumber, + }); err != nil { + slog.ErrorContext(stream.Context(), "Send failed", "err", err) + } + } + + var streamID *build.StreamId + reqCh := make(chan *build.PublishBuildToolEventStreamRequest) + errCh := make(chan error) + var eventCh *BuildEventChannel + + go func() { + for { + req, err := stream.Recv() + if err != nil { + errCh <- err + return + } + reqCh <- req + } + }() + + for { + select { + case err := <-errCh: + if err == io.EOF { + slog.InfoContext(stream.Context(), "Stream finished", "event", stream.Context()) + if eventCh == nil { + return nil + } + + return eventCh.Finalize() + } + + slog.ErrorContext(stream.Context(), "Recv failed", "err", err) + return err + + case req := <-reqCh: + // First request + if streamID == nil { + streamID = req.OrderedBuildEvent.GetStreamId() + eventCh = s.handler.CreateEventChannel(stream.Context(), streamID) + } + + if err := eventCh.HandleBuildEvent(req.OrderedBuildEvent.Event); err != nil { + slog.ErrorContext(stream.Context(), "HandleBuildEvent failed", "err", err) + return err + } + + ack(req) + } + } +}