
Commit

Add multi file error aggregation strategy
bgedik committed Oct 1, 2024
1 parent d74d7a2 commit b95197f
Showing 2 changed files with 41 additions and 13 deletions.
@@ -20,38 +20,45 @@ type ErrorRetriever interface {
     GetError(ctx context.Context) (io.ExecutionError, error)
 }

-type ErrorRetrieverBase struct {
+type BaseErrorRetriever struct {
     store storage.ComposedProtobufStore
     maxPayloadSize int64
 }

 type SingleFileErrorRetriever struct {
-    ErrorRetrieverBase
+    BaseErrorRetriever
     errorFilePath storage.DataReference
 }

 func NewSingleFileErrorRetriever(errorFilePath storage.DataReference, store storage.ComposedProtobufStore, maxPayloadSize int64) *SingleFileErrorRetriever {
     return &SingleFileErrorRetriever{
-        ErrorRetrieverBase: ErrorRetrieverBase{
+        BaseErrorRetriever: BaseErrorRetriever{
             store: store,
             maxPayloadSize: maxPayloadSize,
         },
         errorFilePath: errorFilePath,
     }
 }

+func (b *BaseErrorRetriever) validatePayloadSize(metadata storage.Metadata) error {
+    if metadata.Exists() {
+        if metadata.Size() > b.maxPayloadSize {
+            return errors.Errorf("file is too large [%d] bytes, max allowed [%d] bytes", metadata.Size(), b.maxPayloadSize)
+        }
+    }
+    return nil
+}
+
 func (s *SingleFileErrorRetriever) HasError(ctx context.Context) (bool, error) {
     metadata, err := s.store.Head(ctx, s.errorFilePath)
     if err != nil {
         return false, errors.Wrapf(err, "failed to read error file @[%s]", s.errorFilePath)
     }
-    if metadata.Exists() {
-        if metadata.Size() > s.maxPayloadSize {
-            return false, errors.Wrapf(err, "error file @[%s] is too large [%d] bytes, max allowed [%d] bytes", s.errorFilePath, metadata.Size(), s.maxPayloadSize)
-        }
-        return true, nil
+    err = s.validatePayloadSize(metadata)
+    if err != nil {
+        return false, errors.Wrapf(err, "error file @[%s] failed payload size validation", s.errorFilePath)
     }
-    return false, nil
+    return metadata.Exists(), nil
 }

 func errorDoc2ExecutionError(errorDoc *core.ErrorDocument, errorFilePath storage.DataReference) io.ExecutionError {
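
For orientation, and not part of the diff: a minimal usage sketch of the single-file path above. The checkForError wrapper, the bucket path, and the 10 MiB limit are illustrative assumptions; only the constructor, HasError, and GetError calls come from the code shown here, and the surrounding package's imports are assumed to be available.

// Illustrative sketch only: look for a single, well-known error file and fetch
// its error document, with HasError rejecting oversized files via the new
// validatePayloadSize helper.
func checkForError(ctx context.Context, store storage.ComposedProtobufStore) (io.ExecutionError, bool, error) {
    const maxPayloadSize = 10 * 1024 * 1024 // assumed limit, not from this commit
    retriever := NewSingleFileErrorRetriever(storage.DataReference("s3://bucket/errors/error.pb"), store, maxPayloadSize)
    var none io.ExecutionError // zero value, whatever the concrete definition is
    hasError, err := retriever.HasError(ctx)
    if err != nil || !hasError {
        return none, false, err
    }
    executionError, err := retriever.GetError(ctx)
    if err != nil {
        return none, false, err
    }
    return executionError, true, nil
}
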
@@ -105,7 +112,7 @@ func (s *SingleFileErrorRetriever) GetError(ctx context.Context) (io.ExecutionError, error) {
 }

 type EarliestFileErrorRetriever struct {
-    ErrorRetrieverBase
+    BaseErrorRetriever
     errorDirPath storage.DataReference
     canonicalErrorFilename string
 }
@@ -130,6 +137,7 @@ func (e *EarliestFileErrorRetriever) HasError(ctx context.Context) (bool, error) {
     if err != nil {
         return false, errors.Wrapf(err, "failed to parse canonical error filename @[%s]", e.canonicalErrorFilename)
     }
+    hasError := false
     const maxItems = 1000
     cursor := storage.NewCursorAtStart()
     for cursor != storage.NewCursorAtEnd() {
@@ -141,11 +149,19 @@ func (e *EarliestFileErrorRetriever) HasError(ctx context.Context) (bool, error) {
         }
         for _, errorFilePath := range errorFilePaths {
             if strings.HasSuffix(errorFilePath.String(), errorFileExtension) {
-                return true, nil
+                metadata, err := e.store.Head(ctx, errorFilePath)
+                if err != nil {
+                    return false, errors.Wrapf(err, "failed to read error file @[%s]", errorFilePath)
+                }
+                err = e.validatePayloadSize(metadata)
+                if err != nil {
+                    return false, errors.Wrapf(err, "error file @[%s] failed payload size validation", errorFilePath)
+                }
+                hasError = true
             }
         }
     }
-    return false, nil
+    return hasError, nil
 }

 func (e *EarliestFileErrorRetriever) GetError(ctx context.Context) (io.ExecutionError, error) {
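
A note on the change above, outside the diff: HasError no longer returns on the first path carrying the error-file extension; it now Heads every matching file, runs it through the shared validatePayloadSize check, and only then reports whether any error file exists. The cursor-driven listing it loops over can be restated as a standalone helper; this is a hypothetical sketch, not code from the commit, and the List signature is inferred from the mocked OnList call in the test file below.

// Hypothetical helper mirroring the loop in HasError above: page through the
// error directory with the cursor API and collect every path that ends in the
// error-file extension (e.g. ".pb").
func listErrorFiles(ctx context.Context, store storage.ComposedProtobufStore, errorDirPath storage.DataReference, errorFileExtension string) ([]storage.DataReference, error) {
    const maxItems = 1000
    var matches []storage.DataReference
    cursor := storage.NewCursorAtStart()
    for cursor != storage.NewCursorAtEnd() {
        var paths []storage.DataReference
        var err error
        paths, cursor, err = store.List(ctx, errorDirPath, maxItems, cursor)
        if err != nil {
            return nil, errors.Wrapf(err, "failed to list error files @[%s]", errorDirPath)
        }
        for _, path := range paths {
            if strings.HasSuffix(path.String(), errorFileExtension) {
                matches = append(matches, path)
            }
        }
    }
    return matches, nil
}
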
@@ -201,7 +217,7 @@ func (e *EarliestFileErrorRetriever) GetError(ctx context.Context) (io.ExecutionError, error) {

 func NewEarliestFileErrorRetriever(errorDirPath storage.DataReference, canonicalErrorFilename string, store storage.ComposedProtobufStore, maxPayloadSize int64) *EarliestFileErrorRetriever {
     return &EarliestFileErrorRetriever{
-        ErrorRetrieverBase: ErrorRetrieverBase{
+        BaseErrorRetriever: BaseErrorRetriever{
             store: store,
             maxPayloadSize: maxPayloadSize,
         },
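
Parallel to the single-file sketch earlier, and again not part of the diff: a hedged sketch of wiring up the multi-file strategy. The directory, canonical filename, and limit are made-up values; only the constructor's parameter order and the HasError/GetError calls come from the code above.

// Illustrative sketch only: the earliest-file strategy watches a directory of
// error files (error-0.pb, error-1.pb, ...) derived from a canonical name,
// rather than one fixed path.
func earliestError(ctx context.Context, store storage.ComposedProtobufStore) (io.ExecutionError, bool, error) {
    retriever := NewEarliestFileErrorRetriever(
        storage.DataReference("s3://bucket/errors"), // assumed error directory
        "error.pb",                                  // assumed canonical error filename
        store,
        10*1024*1024, // assumed max payload size in bytes
    )
    var none io.ExecutionError
    hasError, err := retriever.HasError(ctx) // size-validates every candidate file
    if err != nil || !hasError {
        return none, false, err
    }
    executionError, err := retriever.GetError(ctx)
    if err != nil {
        return none, false, err
    }
    return executionError, true, nil
}
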
@@ -140,6 +140,18 @@ func TestReadOrigin(t *testing.T) {
     store.OnList(ctx, storage.DataReference("s3://errors/error"), 1000, storage.NewCursorAtStart()).Return(
         []storage.DataReference{"error-0.pb", "error-1.pb", "error-2.pb"}, storage.NewCursorAtEnd(), nil)

+    store.OnHead(ctx, storage.DataReference("error-0.pb")).Return(MemoryMetadata{
+        exists: true,
+    }, nil)
+
+    store.OnHead(ctx, storage.DataReference("error-1.pb")).Return(MemoryMetadata{
+        exists: true,
+    }, nil)
+
+    store.OnHead(ctx, storage.DataReference("error-2.pb")).Return(MemoryMetadata{
+        exists: true,
+    }, nil)
+
     maxPayloadSize := int64(0)
     r := NewRemoteFileOutputReaderWithErrorAggregationStrategy(
         ctx,
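
The MemoryMetadata value returned by the mocked Head calls is a test helper defined outside this diff. For readers without the repository at hand, a minimal stand-in covering only the two accessors that validatePayloadSize touches might look like the following; this is an assumption, not the repository's actual type.

// Hypothetical stand-in for the MemoryMetadata test helper used above; the
// real type presumably satisfies the full storage.Metadata interface, while
// this sketch covers only the accessors exercised by validatePayloadSize.
type memoryMetadataSketch struct {
    exists bool
    size   int64
}

func (m memoryMetadataSketch) Exists() bool { return m.exists }
func (m memoryMetadataSketch) Size() int64  { return m.size }

With maxPayloadSize set to int64(0) in the test and no size populated on the mocked metadata, each file reports size 0, which the strictly greater-than check in validatePayloadSize still accepts.
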
