diff --git a/flyteplugins/go/tasks/pluginmachinery/io/iface.go b/flyteplugins/go/tasks/pluginmachinery/io/iface.go
index f876defe5a..1f32717812 100644
--- a/flyteplugins/go/tasks/pluginmachinery/io/iface.go
+++ b/flyteplugins/go/tasks/pluginmachinery/io/iface.go
@@ -27,13 +27,18 @@ type InputReader interface {
 	Get(ctx context.Context) (*core.LiteralMap, error)
 }
 
-// OutputReader provides an abstracted OutputReader interface. The plugins are responsible to provide
-// the implementations for the interface. Some helper implementations can be found in ioutils
-type OutputReader interface {
+// ErrorReader provides an abstracted error reading interface, which is part of OutputReader below.
+type ErrorReader interface {
 	// IsError returns true if an error was detected when reading the output and false if no error was detected
 	IsError(ctx context.Context) (bool, error)
 	// ReadError returns the error as type ExecutionError
 	ReadError(ctx context.Context) (ExecutionError, error)
+}
+
+// OutputReader provides an abstracted OutputReader interface. The plugins are responsible to provide
+// the implementations for the interface. Some helper implementations can be found in ioutils
+type OutputReader interface {
+	ErrorReader
 	// IsFile returns true if the outputs are using the OutputFilePaths specified files. If so it allows the system to
 	// optimize the reads of the files
 	IsFile(ctx context.Context) bool
diff --git a/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go b/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go
index 2e81b8ecec..077b51c5c7 100644
--- a/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go
+++ b/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go
@@ -15,18 +15,13 @@ import (
 	"github.com/flyteorg/flyte/flytestdlib/storage"
 )
 
-type errorRetriever interface {
-	HasError(ctx context.Context) (bool, error)
-	GetError(ctx context.Context) (io.ExecutionError, error)
-}
-
-type baseErrorRetriever struct {
+type baseErrorReader struct {
 	store          storage.ComposedProtobufStore
 	maxPayloadSize int64
 }
 
-type singleFileErrorRetriever struct {
-	baseErrorRetriever
+type singleFileErrorReader struct {
+	baseErrorReader
 	errorFilePath storage.DataReference
 }
 
@@ -34,9 +29,9 @@ const errorFileNotFoundErrorCode = "ErrorFileNotFound"
 
 var ErrRemoteFileExceedsMaxSize = errors.New("remote file exceeds max size")
 
-func newSingleFileErrorRetriever(errorFilePath storage.DataReference, store storage.ComposedProtobufStore, maxPayloadSize int64) *singleFileErrorRetriever {
-	return &singleFileErrorRetriever{
-		baseErrorRetriever: baseErrorRetriever{
+func newSingleFileErrorReader(errorFilePath storage.DataReference, store storage.ComposedProtobufStore, maxPayloadSize int64) *singleFileErrorReader {
+	return &singleFileErrorReader{
+		baseErrorReader: baseErrorReader{
 			store:          store,
 			maxPayloadSize: maxPayloadSize,
 		},
@@ -44,7 +39,7 @@ func newSingleFileErrorRetriever(errorFilePath storage.DataReference, store stor
 	}
 }
 
-func (b *baseErrorRetriever) validatePayloadSize(filePath storage.DataReference, metadata storage.Metadata) error {
+func (b *baseErrorReader) validatePayloadSize(filePath storage.DataReference, metadata storage.Metadata) error {
 	if metadata.Exists() {
 		if metadata.Size() > b.maxPayloadSize {
 			return errors.Wrapf(ErrRemoteFileExceedsMaxSize,
@@ -55,7 +50,7 @@ func (b *baseErrorRetriever) validatePayloadSize(filePath storage.DataReference,
 	return nil
 }
 
-func (s *singleFileErrorRetriever) HasError(ctx context.Context) (bool, error) {
+func (s *singleFileErrorReader) IsError(ctx context.Context) (bool, error) {
 	metadata, err := s.store.Head(ctx, s.errorFilePath)
 	if err != nil {
 		return false, errors.Wrapf(err, "failed to read error file @[%s]", s.errorFilePath)
 	}
@@ -95,7 +90,7 @@ func errorDoc2ExecutionError(errorDoc *core.ErrorDocument, errorFilePath storage
 	return executionError
 }
 
-func (s *singleFileErrorRetriever) GetError(ctx context.Context) (io.ExecutionError, error) {
+func (s *singleFileErrorReader) ReadError(ctx context.Context) (io.ExecutionError, error) {
 	errorDoc := &core.ErrorDocument{}
 	err := s.store.ReadProtobuf(ctx, s.errorFilePath, errorDoc)
 	if err != nil {
@@ -115,14 +110,14 @@ func (s *singleFileErrorRetriever) GetError(ctx context.Context) (io.ExecutionEr
 	return errorDoc2ExecutionError(errorDoc, s.errorFilePath), nil
 }
 
-type earliestFileErrorRetriever struct {
-	baseErrorRetriever
+type earliestFileErrorReader struct {
+	baseErrorReader
 	errorDirPath        storage.DataReference
 	errorFilePathPrefix storage.DataReference
 	errorFileExtension  string
 }
 
-func (e *earliestFileErrorRetriever) HasError(ctx context.Context) (bool, error) {
+func (e *earliestFileErrorReader) IsError(ctx context.Context) (bool, error) {
 	hasError := false
 	const maxItems = 1000
 	cursor := storage.NewCursorAtStart()
@@ -150,7 +145,7 @@ func (e *earliestFileErrorRetriever) HasError(ctx context.Context) (bool, error)
 	return hasError, nil
 }
 
-func (e *earliestFileErrorRetriever) GetError(ctx context.Context) (io.ExecutionError, error) {
+func (e *earliestFileErrorReader) ReadError(ctx context.Context) (io.ExecutionError, error) {
 	var earliestTimestamp time.Time = time.Now()
 	earliestExecutionError := io.ExecutionError{}
 	const maxItems = 1000
@@ -181,7 +176,7 @@ func (e *earliestFileErrorRetriever) GetError(ctx context.Context) (io.Execution
 	return earliestExecutionError, nil
 }
 
-func newEarliestFileErrorRetriever(errorDirPath storage.DataReference, canonicalErrorFilename string, store storage.ComposedProtobufStore, maxPayloadSize int64) (*earliestFileErrorRetriever, error) {
+func newEarliestFileErrorReader(errorDirPath storage.DataReference, canonicalErrorFilename string, store storage.ComposedProtobufStore, maxPayloadSize int64) (*earliestFileErrorReader, error) {
 	// If the canonical error file name is error.pb, we expect multiple error files
 	// to have name error.pb
 	pieces := strings.Split(canonicalErrorFilename, ".")
@@ -193,8 +188,8 @@ func newEarliestFileErrorRetriever(errorDirPath storage.DataReference, canonical
 	errorFilePathPrefix := storage.NewDataReference(scheme, container, filepath.Join(key, errorFilePrefix))
 	errorFileExtension := fmt.Sprintf(".%s", pieces[1])
 
-	return &earliestFileErrorRetriever{
-		baseErrorRetriever: baseErrorRetriever{
+	return &earliestFileErrorReader{
+		baseErrorReader: baseErrorReader{
 			store:          store,
 			maxPayloadSize: maxPayloadSize,
 		},
@@ -204,17 +199,17 @@ func newEarliestFileErrorRetriever(errorDirPath storage.DataReference, canonical
 	}, nil
 }
 
-func newErrorRetriever(errorAggregationStrategy k8s.ErrorAggregationStrategy, errorDirPath storage.DataReference, errorFilename string, store storage.ComposedProtobufStore, maxPayloadSize int64) (errorRetriever, error) {
+func newErrorReader(errorAggregationStrategy k8s.ErrorAggregationStrategy, errorDirPath storage.DataReference, errorFilename string, store storage.ComposedProtobufStore, maxPayloadSize int64) (io.ErrorReader, error) {
 	if errorAggregationStrategy == k8s.DefaultErrorAggregationStrategy {
 		scheme, container, key, err := errorDirPath.Split()
 		if err != nil {
 			return nil, errors.Wrapf(err, "invalid error dir path %s", errorDirPath)
 		}
 		errorFilePath := storage.NewDataReference(scheme, container, filepath.Join(key, errorFilename))
-		return newSingleFileErrorRetriever(errorFilePath, store, maxPayloadSize), nil
+		return newSingleFileErrorReader(errorFilePath, store, maxPayloadSize), nil
 	}
 	if errorAggregationStrategy == k8s.EarliestErrorAggregationStrategy {
-		return newEarliestFileErrorRetriever(errorDirPath, errorFilename, store, maxPayloadSize)
+		return newEarliestFileErrorReader(errorDirPath, errorFilename, store, maxPayloadSize)
 	}
 	return nil, errors.Errorf("unknown error aggregation strategy: %v", errorAggregationStrategy)
 }
@@ -223,15 +218,15 @@ type RemoteFileOutputReader struct {
 	outPath        io.OutputFilePaths
 	store          storage.ComposedProtobufStore
 	maxPayloadSize int64
-	errorRetriever errorRetriever
+	errorReader    io.ErrorReader
 }
 
 func (r RemoteFileOutputReader) IsError(ctx context.Context) (bool, error) {
-	return r.errorRetriever.HasError(ctx)
+	return r.errorReader.IsError(ctx)
 }
 
 func (r RemoteFileOutputReader) ReadError(ctx context.Context) (io.ExecutionError, error) {
-	return r.errorRetriever.GetError(ctx)
+	return r.errorReader.ReadError(ctx)
 }
 
 func (r RemoteFileOutputReader) Exists(ctx context.Context) (bool, error) {
@@ -294,12 +289,12 @@ func getMaxPayloadSize(maxDatasetSize int64) int64 {
 
 func NewRemoteFileOutputReader(context context.Context, store storage.ComposedProtobufStore, outPaths io.OutputFilePaths, maxDatasetSize int64) RemoteFileOutputReader {
 	maxPayloadSize := getMaxPayloadSize(maxDatasetSize)
-	errorRetriever := newSingleFileErrorRetriever(outPaths.GetErrorPath(), store, maxPayloadSize)
+	errorReader := newSingleFileErrorReader(outPaths.GetErrorPath(), store, maxPayloadSize)
 	return RemoteFileOutputReader{
 		outPath:        outPaths,
 		store:          store,
 		maxPayloadSize: maxPayloadSize,
-		errorRetriever: errorRetriever,
+		errorReader:    errorReader,
 	}
 }
 
@@ -311,7 +306,7 @@ func NewRemoteFileOutputReaderWithErrorAggregationStrategy(_ context.Context, st
 	}
 	errorFilename := filepath.Base(key)
 	errorDirPath := storage.NewDataReference(scheme, container, filepath.Dir(key))
-	errorRetriever, err := newErrorRetriever(errorAggregationStrategy, errorDirPath, errorFilename, store, maxPayloadSize)
+	errorReader, err := newErrorReader(errorAggregationStrategy, errorDirPath, errorFilename, store, maxPayloadSize)
 	if err != nil {
 		return nil, errors.Wrapf(err, "failed to create remote output reader with error aggregation strategy %v", errorAggregationStrategy)
 	}
@@ -319,6 +314,6 @@ func NewRemoteFileOutputReaderWithErrorAggregationStrategy(_ context.Context, st
 		outPath:        outPaths,
 		store:          store,
 		maxPayloadSize: maxPayloadSize,
-		errorRetriever: errorRetriever,
+		errorReader:    errorReader,
 	}, nil
 }
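
For context, the diff factors `IsError`/`ReadError` out of `OutputReader` into the new exported `io.ErrorReader` interface and has `RemoteFileOutputReader` delegate to whichever `io.ErrorReader` the aggregation strategy selects. The sketch below is not part of the change; it is a minimal, hypothetical in-memory implementation (the `staticErrorReader` name and its fields are invented for illustration) showing what any plugin or test double would need to provide to satisfy the new interface.

```go
package example

import (
	"context"

	"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/io"
)

// staticErrorReader is a hypothetical ErrorReader that reports a pre-computed
// error state instead of reading it from blob storage.
type staticErrorReader struct {
	hasError bool
	execErr  io.ExecutionError
}

// IsError reports whether an error was recorded.
func (s *staticErrorReader) IsError(_ context.Context) (bool, error) {
	return s.hasError, nil
}

// ReadError returns the recorded ExecutionError.
func (s *staticErrorReader) ReadError(_ context.Context) (io.ExecutionError, error) {
	return s.execErr, nil
}

// Compile-time assertion that staticErrorReader satisfies io.ErrorReader.
var _ io.ErrorReader = (*staticErrorReader)(nil)
```

Because `ErrorReader` is embedded in `OutputReader`, existing `OutputReader` implementations remain source-compatible, while `RemoteFileOutputReader` can swap error-reading strategies (single file vs. earliest file) behind the same interface.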