Skip to content

Commit

Permalink
Merge pull request #43 from buildbarn/ack
Browse files Browse the repository at this point in the history
Force complete retransmit on retry
  • Loading branch information
mortenmj authored Oct 21, 2024
2 parents be28d3c + db5f97e commit 5d770a0
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 17 deletions.
2 changes: 2 additions & 0 deletions internal/api/grpc/bes/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ go_library(
"//pkg/summary",
"//third_party/bazel/gen/bes",
"@org_golang_google_genproto//googleapis/devtools/build/v1:build",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//encoding/protojson",
"@org_golang_google_protobuf//types/known/emptypb",
],
Expand Down
34 changes: 29 additions & 5 deletions internal/api/grpc/bes/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,25 @@ import (
)

// BuildEventChannel handles a single BuildEvent stream
type BuildEventChannel struct {
type BuildEventChannel interface {
// HandleBuildEvent processes a single BuildEvent
// This method should be called for each received event.
HandleBuildEvent(event *build.BuildEvent) error

// Finalize does post-processing of a stream of BuildEvents.
// This method should be called after receiving the EOF event.
Finalize() error
}

type buildEventChannel struct {
ctx context.Context
streamID *build.StreamId
summarizer *summary.Summarizer
workflow *processing.Workflow
}

// HandleBuildEvent processes a single BuildEvent
func (c *BuildEventChannel) HandleBuildEvent(event *build.BuildEvent) error {
// HandleBuildEvent implements BuildEventChannel.HandleBuildEvent.
func (c *buildEventChannel) HandleBuildEvent(event *build.BuildEvent) error {
if event.GetBazelEvent() == nil {
return nil
}
Expand All @@ -44,8 +54,8 @@ func (c *BuildEventChannel) HandleBuildEvent(event *build.BuildEvent) error {
return nil
}

// Finalize wraps up processing of a stream of BuildEvent
func (c *BuildEventChannel) Finalize() error {
// Finalize implements BuildEventChannel.Finalize.
func (c *buildEventChannel) Finalize() error {
summaryReport, err := c.summarizer.FinishProcessing()
if err != nil {
slog.ErrorContext(c.ctx, "FinishProcessing failed", "err", err)
Expand All @@ -70,3 +80,17 @@ func (c *BuildEventChannel) Finalize() error {
slog.InfoContext(c.ctx, fmt.Sprintf("Saved invocation in %v", elapsedTime.String()), "id", invocation.InvocationID)
return nil
}

// noOpBuildEventChannel is an implementation of BuildEventChannel which does no processing of events.
// It is used when receiving a stream of events that we wish to ack without processing.
type noOpBuildEventChannel struct{}

// HandleBuildEvent implements BuildEventChannel.HandleBuildEvent.
func (c *noOpBuildEventChannel) HandleBuildEvent(event *build.BuildEvent) error {
return nil
}

// Finalize implements BuildEventChannel.Finalize.
func (c *noOpBuildEventChannel) Finalize() error {
return nil
}
12 changes: 9 additions & 3 deletions internal/api/grpc/bes/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,18 @@ func NewBuildEventHandler(workflow *processing.Workflow) *BuildEventHandler {
}

// CreateEventChannel creates a new BuildEventChannel
func (h *BuildEventHandler) CreateEventChannel(ctx context.Context, streamID *build.StreamId) *BuildEventChannel {
func (h *BuildEventHandler) CreateEventChannel(ctx context.Context, initialEvent *build.OrderedBuildEvent) BuildEventChannel {
summarizer := summary.NewSummarizer()

return &BuildEventChannel{
// If the first event does not have sequence number 1, we have processed this
// invocation previously, and should skip all processing.
if initialEvent.SequenceNumber != 1 {
return &noOpBuildEventChannel{}
}

return &buildEventChannel{
ctx: ctx,
streamID: streamID,
streamID: initialEvent.StreamId,
summarizer: summarizer,
workflow: h.workflow,
}
Expand Down
48 changes: 39 additions & 9 deletions internal/api/grpc/bes/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package bes

import (
"context"
"fmt"
"io"
"log/slog"
"sort"

build "google.golang.org/genproto/googleapis/devtools/build/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/emptypb"

Expand Down Expand Up @@ -39,10 +43,14 @@ func (s BuildEventServer) PublishLifecycleEvent(ctx context.Context, request *bu
func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildEvent_PublishBuildToolEventStreamServer) error {
slog.InfoContext(stream.Context(), "Stream started", "event", stream.Context())

ack := func(req *build.PublishBuildToolEventStreamRequest) {
// List of SequenceIds we've received.
// We'll want to ack these once all events are received, as we don't support resumption.
seqNrs := make([]int64, 0)

ack := func(streamID *build.StreamId, sequenceNumber int64) {
if err := stream.Send(&build.PublishBuildToolEventStreamResponse{
StreamId: req.OrderedBuildEvent.StreamId,
SequenceNumber: req.OrderedBuildEvent.SequenceNumber,
StreamId: streamID,
SequenceNumber: sequenceNumber,
}); err != nil {
slog.ErrorContext(stream.Context(), "Send failed", "err", err)
}
Expand All @@ -51,7 +59,7 @@ func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildE
var streamID *build.StreamId
reqCh := make(chan *build.PublishBuildToolEventStreamRequest)
errCh := make(chan error)
var eventCh *BuildEventChannel
var eventCh BuildEventChannel

go func() {
for {
Expand All @@ -73,25 +81,47 @@ func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildE
return nil
}

return eventCh.Finalize()
// Validate that all events were received
sort.Slice(seqNrs, func(i, j int) bool { return seqNrs[i] < seqNrs[j] })

// TODO: Find out if initial sequence number can be != 1
expected := int64(1)
for _, seqNr := range seqNrs {
if seqNr != expected {
return status.Error(codes.Unknown, fmt.Sprintf("received unexpected sequence number %d, expected %d", seqNr, expected))
}
expected++
}

err := eventCh.Finalize()
if err != nil {
return err
}

// Ack all events
for _, seqNr := range seqNrs {
ack(streamID, seqNr)
}

return nil
}

slog.ErrorContext(stream.Context(), "Recv failed", "err", err)
return err

case req := <-reqCh:
// First request
// First event
if streamID == nil {
streamID = req.OrderedBuildEvent.GetStreamId()
eventCh = s.handler.CreateEventChannel(stream.Context(), streamID)
eventCh = s.handler.CreateEventChannel(stream.Context(), req.OrderedBuildEvent)
}

seqNrs = append(seqNrs, req.OrderedBuildEvent.GetSequenceNumber())

if err := eventCh.HandleBuildEvent(req.OrderedBuildEvent.Event); err != nil {
slog.ErrorContext(stream.Context(), "HandleBuildEvent failed", "err", err)
return err
}

ack(req)
}
}
}

0 comments on commit 5d770a0

Please sign in to comment.