From b95197f3fb3c8dea53f716b619e37047da97f90f Mon Sep 17 00:00:00 2001 From: Bugra Gedik Date: Tue, 1 Oct 2024 22:06:20 +0000 Subject: [PATCH] Add multi file error aggregation strategy --- .../ioutils/remote_file_output_reader.go | 42 +++++++++++++------ .../ioutils/remote_file_output_reader_test.go | 12 ++++++ 2 files changed, 41 insertions(+), 13 deletions(-) diff --git a/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go b/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go index 02439e2daf..efb9e449dc 100644 --- a/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go +++ b/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go @@ -20,19 +20,19 @@ type ErrorRetriever interface { GetError(ctx context.Context) (io.ExecutionError, error) } -type ErrorRetrieverBase struct { +type BaseErrorRetriever struct { store storage.ComposedProtobufStore maxPayloadSize int64 } type SingleFileErrorRetriever struct { - ErrorRetrieverBase + BaseErrorRetriever errorFilePath storage.DataReference } func NewSingleFileErrorRetriever(errorFilePath storage.DataReference, store storage.ComposedProtobufStore, maxPayloadSize int64) *SingleFileErrorRetriever { return &SingleFileErrorRetriever{ - ErrorRetrieverBase: ErrorRetrieverBase{ + BaseErrorRetriever: BaseErrorRetriever{ store: store, maxPayloadSize: maxPayloadSize, }, @@ -40,18 +40,25 @@ func NewSingleFileErrorRetriever(errorFilePath storage.DataReference, store stor } } +func (b *BaseErrorRetriever) validatePayloadSize(metadata storage.Metadata) error { + if metadata.Exists() { + if metadata.Size() > b.maxPayloadSize { + return errors.Errorf("file is too large [%d] bytes, max allowed [%d] bytes", metadata.Size(), b.maxPayloadSize) + } + } + return nil +} + func (s *SingleFileErrorRetriever) HasError(ctx context.Context) (bool, error) { metadata, err := s.store.Head(ctx, s.errorFilePath) if err != nil { return false, errors.Wrapf(err, "failed to read error file @[%s]", s.errorFilePath) } - if metadata.Exists() { - if metadata.Size() > s.maxPayloadSize { - return false, errors.Wrapf(err, "error file @[%s] is too large [%d] bytes, max allowed [%d] bytes", s.errorFilePath, metadata.Size(), s.maxPayloadSize) - } - return true, nil + err = s.validatePayloadSize(metadata) + if err != nil { + return false, errors.Wrapf(err, "error file @[%s] failed payload size validation", s.errorFilePath) } - return false, nil + return metadata.Exists(), nil } func errorDoc2ExecutionError(errorDoc *core.ErrorDocument, errorFilePath storage.DataReference) io.ExecutionError { @@ -105,7 +112,7 @@ func (s *SingleFileErrorRetriever) GetError(ctx context.Context) (io.ExecutionEr } type EarliestFileErrorRetriever struct { - ErrorRetrieverBase + BaseErrorRetriever errorDirPath storage.DataReference canonicalErrorFilename string } @@ -130,6 +137,7 @@ func (e *EarliestFileErrorRetriever) HasError(ctx context.Context) (bool, error) if err != nil { return false, errors.Wrapf(err, "failed to parse canonical error filename @[%s]", e.canonicalErrorFilename) } + hasError := false const maxItems = 1000 cursor := storage.NewCursorAtStart() for cursor != storage.NewCursorAtEnd() { @@ -141,11 +149,19 @@ func (e *EarliestFileErrorRetriever) HasError(ctx context.Context) (bool, error) } for _, errorFilePath := range errorFilePaths { if strings.HasSuffix(errorFilePath.String(), errorFileExtension) { - return true, nil + metadata, err := e.store.Head(ctx, errorFilePath) + if err != nil { + return false, errors.Wrapf(err, "failed to read error file @[%s]", errorFilePath) + } + err = e.validatePayloadSize(metadata) + if err != nil { + return false, errors.Wrapf(err, "error file @[%s] failed payload size validation", errorFilePath) + } + hasError = true } } } - return false, nil + return hasError, nil } func (e *EarliestFileErrorRetriever) GetError(ctx context.Context) (io.ExecutionError, error) { @@ -201,7 +217,7 @@ func (e *EarliestFileErrorRetriever) GetError(ctx context.Context) (io.Execution func NewEarliestFileErrorRetriever(errorDirPath storage.DataReference, canonicalErrorFilename string, store storage.ComposedProtobufStore, maxPayloadSize int64) *EarliestFileErrorRetriever { return &EarliestFileErrorRetriever{ - ErrorRetrieverBase: ErrorRetrieverBase{ + BaseErrorRetriever: BaseErrorRetriever{ store: store, maxPayloadSize: maxPayloadSize, }, diff --git a/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader_test.go b/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader_test.go index d699403601..a30960da93 100644 --- a/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader_test.go @@ -140,6 +140,18 @@ func TestReadOrigin(t *testing.T) { store.OnList(ctx, storage.DataReference("s3://errors/error"), 1000, storage.NewCursorAtStart()).Return( []storage.DataReference{"error-0.pb", "error-1.pb", "error-2.pb"}, storage.NewCursorAtEnd(), nil) + store.OnHead(ctx, storage.DataReference("error-0.pb")).Return(MemoryMetadata{ + exists: true, + }, nil) + + store.OnHead(ctx, storage.DataReference("error-1.pb")).Return(MemoryMetadata{ + exists: true, + }, nil) + + store.OnHead(ctx, storage.DataReference("error-2.pb")).Return(MemoryMetadata{ + exists: true, + }, nil) + maxPayloadSize := int64(0) r := NewRemoteFileOutputReaderWithErrorAggregationStrategy( ctx,