Skip to content

Commit

Permalink
Review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
bgedik committed Oct 18, 2024
1 parent 0ea8096 commit 0a12e95
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 34 deletions.
11 changes: 8 additions & 3 deletions flyteplugins/go/tasks/pluginmachinery/io/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,18 @@ type InputReader interface {
Get(ctx context.Context) (*core.LiteralMap, error)
}

// OutputReader provides an abstracted OutputReader interface. The plugins are responsible to provide
// the implementations for the interface. Some helper implementations can be found in ioutils
type OutputReader interface {
// ErrorReader provides an abstracted error reading interface, which is part of OutputReader below.
type ErrorReader interface {
// IsError returns true if an error was detected when reading the output and false if no error was detected
IsError(ctx context.Context) (bool, error)
// ReadError returns the error as type ExecutionError
ReadError(ctx context.Context) (ExecutionError, error)
}

// OutputReader provides an abstracted OutputReader interface. The plugins are responsible to provide
// the implementations for the interface. Some helper implementations can be found in ioutils
type OutputReader interface {
ErrorReader
// IsFile returns true if the outputs are using the OutputFilePaths specified files. If so it allows the system to
// optimize the reads of the files
IsFile(ctx context.Context) bool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,31 @@ import (
"github.com/flyteorg/flyte/flytestdlib/storage"
)

type errorRetriever interface {
HasError(ctx context.Context) (bool, error)
GetError(ctx context.Context) (io.ExecutionError, error)
}

type baseErrorRetriever struct {
type baseErrorReader struct {
store storage.ComposedProtobufStore
maxPayloadSize int64
}

type singleFileErrorRetriever struct {
baseErrorRetriever
type singleFileErrorReader struct {
baseErrorReader
errorFilePath storage.DataReference
}

const errorFileNotFoundErrorCode = "ErrorFileNotFound"

var ErrRemoteFileExceedsMaxSize = errors.New("remote file exceeds max size")

func newSingleFileErrorRetriever(errorFilePath storage.DataReference, store storage.ComposedProtobufStore, maxPayloadSize int64) *singleFileErrorRetriever {
return &singleFileErrorRetriever{
baseErrorRetriever: baseErrorRetriever{
func newSingleFileErrorReader(errorFilePath storage.DataReference, store storage.ComposedProtobufStore, maxPayloadSize int64) *singleFileErrorReader {
return &singleFileErrorReader{
baseErrorReader: baseErrorReader{
store: store,
maxPayloadSize: maxPayloadSize,
},
errorFilePath: errorFilePath,
}
}

func (b *baseErrorRetriever) validatePayloadSize(filePath storage.DataReference, metadata storage.Metadata) error {
func (b *baseErrorReader) validatePayloadSize(filePath storage.DataReference, metadata storage.Metadata) error {
if metadata.Exists() {
if metadata.Size() > b.maxPayloadSize {
return errors.Wrapf(ErrRemoteFileExceedsMaxSize,
Expand All @@ -55,7 +50,7 @@ func (b *baseErrorRetriever) validatePayloadSize(filePath storage.DataReference,
return nil
}

func (s *singleFileErrorRetriever) HasError(ctx context.Context) (bool, error) {
func (s *singleFileErrorReader) IsError(ctx context.Context) (bool, error) {
metadata, err := s.store.Head(ctx, s.errorFilePath)
if err != nil {
return false, errors.Wrapf(err, "failed to read error file @[%s]", s.errorFilePath)
Expand Down Expand Up @@ -95,7 +90,7 @@ func errorDoc2ExecutionError(errorDoc *core.ErrorDocument, errorFilePath storage
return executionError
}

func (s *singleFileErrorRetriever) GetError(ctx context.Context) (io.ExecutionError, error) {
func (s *singleFileErrorReader) ReadError(ctx context.Context) (io.ExecutionError, error) {
errorDoc := &core.ErrorDocument{}
err := s.store.ReadProtobuf(ctx, s.errorFilePath, errorDoc)
if err != nil {
Expand All @@ -115,14 +110,14 @@ func (s *singleFileErrorRetriever) GetError(ctx context.Context) (io.ExecutionEr
return errorDoc2ExecutionError(errorDoc, s.errorFilePath), nil
}

type earliestFileErrorRetriever struct {
baseErrorRetriever
type earliestFileErrorReader struct {
baseErrorReader
errorDirPath storage.DataReference
errorFilePathPrefix storage.DataReference
errorFileExtension string
}

func (e *earliestFileErrorRetriever) HasError(ctx context.Context) (bool, error) {
func (e *earliestFileErrorReader) IsError(ctx context.Context) (bool, error) {
hasError := false
const maxItems = 1000
cursor := storage.NewCursorAtStart()
Expand Down Expand Up @@ -150,7 +145,7 @@ func (e *earliestFileErrorRetriever) HasError(ctx context.Context) (bool, error)
return hasError, nil
}

func (e *earliestFileErrorRetriever) GetError(ctx context.Context) (io.ExecutionError, error) {
func (e *earliestFileErrorReader) ReadError(ctx context.Context) (io.ExecutionError, error) {
var earliestTimestamp time.Time = time.Now()
earliestExecutionError := io.ExecutionError{}
const maxItems = 1000
Expand Down Expand Up @@ -181,7 +176,7 @@ func (e *earliestFileErrorRetriever) GetError(ctx context.Context) (io.Execution
return earliestExecutionError, nil
}

func newEarliestFileErrorRetriever(errorDirPath storage.DataReference, canonicalErrorFilename string, store storage.ComposedProtobufStore, maxPayloadSize int64) (*earliestFileErrorRetriever, error) {
func newEarliestFileErrorReader(errorDirPath storage.DataReference, canonicalErrorFilename string, store storage.ComposedProtobufStore, maxPayloadSize int64) (*earliestFileErrorReader, error) {
// If the canonical error file name is error.pb, we expect multiple error files
// to have name error<suffix>.pb
pieces := strings.Split(canonicalErrorFilename, ".")
Expand All @@ -193,8 +188,8 @@ func newEarliestFileErrorRetriever(errorDirPath storage.DataReference, canonical
errorFilePathPrefix := storage.NewDataReference(scheme, container, filepath.Join(key, errorFilePrefix))
errorFileExtension := fmt.Sprintf(".%s", pieces[1])

return &earliestFileErrorRetriever{
baseErrorRetriever: baseErrorRetriever{
return &earliestFileErrorReader{
baseErrorReader: baseErrorReader{
store: store,
maxPayloadSize: maxPayloadSize,
},
Expand All @@ -204,17 +199,17 @@ func newEarliestFileErrorRetriever(errorDirPath storage.DataReference, canonical
}, nil
}

func newErrorRetriever(errorAggregationStrategy k8s.ErrorAggregationStrategy, errorDirPath storage.DataReference, errorFilename string, store storage.ComposedProtobufStore, maxPayloadSize int64) (errorRetriever, error) {
func newErrorReader(errorAggregationStrategy k8s.ErrorAggregationStrategy, errorDirPath storage.DataReference, errorFilename string, store storage.ComposedProtobufStore, maxPayloadSize int64) (io.ErrorReader, error) {
if errorAggregationStrategy == k8s.DefaultErrorAggregationStrategy {
scheme, container, key, err := errorDirPath.Split()
if err != nil {
return nil, errors.Wrapf(err, "invalid error dir path %s", errorDirPath)
}
errorFilePath := storage.NewDataReference(scheme, container, filepath.Join(key, errorFilename))
return newSingleFileErrorRetriever(errorFilePath, store, maxPayloadSize), nil
return newSingleFileErrorReader(errorFilePath, store, maxPayloadSize), nil

Check warning on line 209 in flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go#L204-L209

Added lines #L204 - L209 were not covered by tests
}
if errorAggregationStrategy == k8s.EarliestErrorAggregationStrategy {
return newEarliestFileErrorRetriever(errorDirPath, errorFilename, store, maxPayloadSize)
return newEarliestFileErrorReader(errorDirPath, errorFilename, store, maxPayloadSize)
}
return nil, errors.Errorf("unknown error aggregation strategy: %v", errorAggregationStrategy)

Check warning on line 214 in flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go#L214

Added line #L214 was not covered by tests
}
Expand All @@ -223,15 +218,15 @@ type RemoteFileOutputReader struct {
outPath io.OutputFilePaths
store storage.ComposedProtobufStore
maxPayloadSize int64
errorRetriever errorRetriever
errorReader io.ErrorReader
}

func (r RemoteFileOutputReader) IsError(ctx context.Context) (bool, error) {
return r.errorRetriever.HasError(ctx)
return r.errorReader.IsError(ctx)
}

func (r RemoteFileOutputReader) ReadError(ctx context.Context) (io.ExecutionError, error) {
return r.errorRetriever.GetError(ctx)
return r.errorReader.ReadError(ctx)
}

func (r RemoteFileOutputReader) Exists(ctx context.Context) (bool, error) {
Expand Down Expand Up @@ -294,12 +289,12 @@ func getMaxPayloadSize(maxDatasetSize int64) int64 {

func NewRemoteFileOutputReader(context context.Context, store storage.ComposedProtobufStore, outPaths io.OutputFilePaths, maxDatasetSize int64) RemoteFileOutputReader {
maxPayloadSize := getMaxPayloadSize(maxDatasetSize)
errorRetriever := newSingleFileErrorRetriever(outPaths.GetErrorPath(), store, maxPayloadSize)
errorReader := newSingleFileErrorReader(outPaths.GetErrorPath(), store, maxPayloadSize)
return RemoteFileOutputReader{
outPath: outPaths,
store: store,
maxPayloadSize: maxPayloadSize,
errorRetriever: errorRetriever,
errorReader: errorReader,
}
}

Expand All @@ -311,14 +306,14 @@ func NewRemoteFileOutputReaderWithErrorAggregationStrategy(_ context.Context, st
}
errorFilename := filepath.Base(key)
errorDirPath := storage.NewDataReference(scheme, container, filepath.Dir(key))
errorRetriever, err := newErrorRetriever(errorAggregationStrategy, errorDirPath, errorFilename, store, maxPayloadSize)
errorReader, err := newErrorReader(errorAggregationStrategy, errorDirPath, errorFilename, store, maxPayloadSize)
if err != nil {
return nil, errors.Wrapf(err, "failed to create remote output reader with error aggregation strategy %v", errorAggregationStrategy)
}

Check warning on line 312 in flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go#L311-L312

Added lines #L311 - L312 were not covered by tests
return &RemoteFileOutputReader{
outPath: outPaths,
store: store,
maxPayloadSize: maxPayloadSize,
errorRetriever: errorRetriever,
errorReader: errorReader,
}, nil
}

0 comments on commit 0a12e95

Please sign in to comment.