Skip to content

Commit

Permalink
Ensure we process build events in order
Browse files Browse the repository at this point in the history
  • Loading branch information
mortenmj committed Oct 18, 2024
1 parent d0ff213 commit a5e23e1
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 127 deletions.
2 changes: 1 addition & 1 deletion cmd/bb_portal/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func main() {
if err := bb_grpc.NewServersFromConfigurationAndServe(
configuration.GrpcServers,
func(s go_grpc.ServiceRegistrar) {
build.RegisterPublishBuildEventServer(s.(*go_grpc.Server), bes.New(dbClient, blobArchiver))
build.RegisterPublishBuildEventServer(s.(*go_grpc.Server), bes.NewBuildEventServer(dbClient, blobArchiver))
},
siblingsGroup,
); err != nil {
Expand Down
6 changes: 5 additions & 1 deletion internal/api/grpc/bes/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ load("@rules_go//go:def.bzl", "go_library")

go_library(
name = "bes",
srcs = ["bes.go"],
srcs = [
"channel.go",
"handler.go",
"server.go",
],
importpath = "github.com/buildbarn/bb-portal/internal/api/grpc/bes",
visibility = ["//:__subpackages__"],
deps = [
Expand Down
125 changes: 0 additions & 125 deletions internal/api/grpc/bes/bes.go

This file was deleted.

70 changes: 70 additions & 0 deletions internal/api/grpc/bes/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package bes

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"time"

"google.golang.org/protobuf/encoding/protojson"

"github.com/buildbarn/bb-portal/pkg/events"
"github.com/buildbarn/bb-portal/pkg/processing"
"github.com/buildbarn/bb-portal/pkg/summary"
"github.com/buildbarn/bb-portal/third_party/bazel/gen/bes"
"google.golang.org/genproto/googleapis/devtools/build/v1"
)

// BuildEventChannel handles a single BuildEvent stream
type BuildEventChannel struct {
ctx context.Context
streamId *build.StreamId
summarizer *summary.Summarizer
workflow *processing.Workflow
}

func (c *BuildEventChannel) HandleBuildEvent(event *build.BuildEvent) error {
if event.GetBazelEvent() == nil {
return nil
}

var bazelEvent bes.BuildEvent
err := event.GetBazelEvent().UnmarshalTo(&bazelEvent)
if err != nil {
slog.ErrorContext(c.ctx, "UnmarshalTo failed", "err", err)
return err
}
buildEvent := events.NewBuildEvent(&bazelEvent, json.RawMessage(protojson.Format(&bazelEvent)))
if err = c.summarizer.ProcessEvent(&buildEvent); err != nil {
slog.ErrorContext(c.ctx, "ProcessEvent failed", "err", err)
return fmt.Errorf("could not process event (%s): , %w", buildEvent, err)
}
return nil
}

func (c *BuildEventChannel) Finalize() error {
summaryReport, err := c.summarizer.FinishProcessing()
if err != nil {
slog.ErrorContext(c.ctx, "FinishProcessing failed", "err", err)
return err
}

// Hack for eventFile being required
summaryReport.EventFileURL = fmt.Sprintf(
"grpc://localhost:8082/google.devtools.build.v1/PublishLifecycleEvent?streamID=%s",
c.streamId.String(),
)

slog.InfoContext(c.ctx, "Saving invocation", "id", c.streamId.String())
startTime := time.Now()
invocation, err := c.workflow.SaveSummary(c.ctx, summaryReport)
if err != nil {
slog.ErrorContext(c.ctx, "SaveSummary failed", "err", err)
return err
}
endTime := time.Now()
elapsedTime := endTime.Sub(startTime)
slog.InfoContext(c.ctx, fmt.Sprintf("Saved invocation in %v", elapsedTime.String()), "id", invocation.InvocationID)
return nil
}
36 changes: 36 additions & 0 deletions internal/api/grpc/bes/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package bes

import (
"context"

"github.com/buildbarn/bb-portal/pkg/processing"
"github.com/buildbarn/bb-portal/pkg/summary"
"google.golang.org/genproto/googleapis/devtools/build/v1"
)

// BuildEventHandler orchestrates the handling of incoming Build Event streams.
// For each incoming stream, and BuildEventChannel is created, which handles that stream.
// BuildEventHandler is responsible for managing the things that are common to these event streams.
type BuildEventHandler struct {
workflow *processing.Workflow
}

// TODO: Ensure we allow processing to complete before shutdown
// TODO: Cancel previous processing for an invocation if the client retries
// TODO: Write metrics
func NewBuildEventHandler(workflow *processing.Workflow) *BuildEventHandler {
return &BuildEventHandler{
workflow: workflow,
}
}

func (h *BuildEventHandler) GetEventChannel(ctx context.Context, streamId *build.StreamId) *BuildEventChannel {
summarizer := summary.NewSummarizer()

return &BuildEventChannel{
ctx: ctx,
streamId: streamId,
summarizer: summarizer,
workflow: h.workflow,
}
}
99 changes: 99 additions & 0 deletions internal/api/grpc/bes/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package bes

import (
"context"
"io"
"log/slog"

build "google.golang.org/genproto/googleapis/devtools/build/v1"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/buildbarn/bb-portal/ent/gen/ent"
"github.com/buildbarn/bb-portal/pkg/processing"
)

// BuildEventServer implements the Build Event Service.
// It receives events and forwards them to a BuildEventChannel.
// TODO: Should this support forwarding events? Users might want to create their own
// tooling that reacts to build events, and it would be useful if this service could
// forward events to those.
type BuildEventServer struct {
handler *BuildEventHandler
}

// New creates a new BuildEventServer
func NewBuildEventServer(db *ent.Client, blobArchiver processing.BlobMultiArchiver) build.PublishBuildEventServer {
return &BuildEventServer{
handler: NewBuildEventHandler(processing.New(db, blobArchiver)),
}
}

// PublishLifecycleEvent handles life cycle events.
func (s BuildEventServer) PublishLifecycleEvent(ctx context.Context, request *build.PublishLifecycleEventRequest) (*emptypb.Empty, error) {
slog.InfoContext(ctx, "Received event", "event", protojson.Format(request.BuildEvent.GetEvent()))
return &emptypb.Empty{}, nil
}

// PublishBuildToolEventStream handles a build tool event stream.
func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildEvent_PublishBuildToolEventStreamServer) error {
slog.InfoContext(stream.Context(), "Stream started", "event", stream.Context())

ack := func(req *build.PublishBuildToolEventStreamRequest) {
if err := stream.Send(&build.PublishBuildToolEventStreamResponse{
StreamId: req.OrderedBuildEvent.StreamId,
SequenceNumber: req.OrderedBuildEvent.SequenceNumber,
}); err != nil {
slog.ErrorContext(stream.Context(), "Send failed", "err", err)
}
}

var streamID *build.StreamId
reqCh := make(chan *build.PublishBuildToolEventStreamRequest)
errCh := make(chan error)
var eventCh *BuildEventChannel

go func() {
for {
req, err := stream.Recv()
if err != nil {
errCh <- err
return
}
reqCh <- req
}
}()

// TODO: This will never terminate unless we receive EOF (i.e. successful completion).
// Should we have a timeout, or support some other form of cancellation?
for {
select {
case err := <-errCh:
if err == io.EOF {
slog.InfoContext(stream.Context(), "Stream finished", "event", stream.Context())
if eventCh == nil {
return nil
}

return eventCh.Finalize()
}

slog.ErrorContext(stream.Context(), "Recv failed", "err", err)
return err

case req := <-reqCh:
// First request
if streamID == nil {
streamID = req.OrderedBuildEvent.GetStreamId()
eventCh = s.handler.GetEventChannel(stream.Context(), streamID)
}

if err := eventCh.HandleBuildEvent(req.OrderedBuildEvent.Event); err != nil {
slog.ErrorContext(stream.Context(), "HandleBuildEvent failed", "err", err)
return err
}

ack(req)
}
}
}

0 comments on commit a5e23e1

Please sign in to comment.