Skip to content

Commit

Permalink
working with maptask
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw committed Dec 18, 2023
1 parent 2840d1f commit 6ac3e91
Showing 1 changed file with 7 additions and 6 deletions.
13 changes: 7 additions & 6 deletions flyteplugins/go/tasks/plugins/array/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func WriteToDiscovery(ctx context.Context, tCtx core.TaskExecutionContext, state
// Create catalog put items, but only put the ones that were not originally cached (as read from the catalog results bitset)
catalogWriterItems, err := ConstructCatalogUploadRequests(*tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID().TaskId,
tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID(), taskTemplate.Metadata.DiscoveryVersion,
iface, &tasksToCache, inputReaders, outputReaders)
taskTemplate.Metadata.CacheIgnoreInputVars, iface, &tasksToCache, inputReaders, outputReaders)

Check warning on line 272 in flyteplugins/go/tasks/plugins/array/catalog.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/array/catalog.go#L272

Added line #L272 was not covered by tests

if err != nil {
return nil, externalResources, err
Expand Down Expand Up @@ -338,7 +338,7 @@ func WriteToCatalog(ctx context.Context, ownerSignal core.SignalAsync, catalogCl
}

func ConstructCatalogUploadRequests(keyID idlCore.Identifier, taskExecID idlCore.TaskExecutionIdentifier,
cacheVersion string, taskInterface idlCore.TypedInterface, whichTasksToCache *bitarray.BitSet,
cacheVersion string, cacheIgnoreInputVars []string, taskInterface idlCore.TypedInterface, whichTasksToCache *bitarray.BitSet,
inputReaders []io.InputReader, outputReaders []io.OutputReader) ([]catalog.UploadRequest, error) {

writerWorkItems := make([]catalog.UploadRequest, 0, len(inputReaders))
Expand All @@ -355,10 +355,11 @@ func ConstructCatalogUploadRequests(keyID idlCore.Identifier, taskExecID idlCore

wi := catalog.UploadRequest{
Key: catalog.Key{
Identifier: keyID,
InputReader: input,
CacheVersion: cacheVersion,
TypedInterface: taskInterface,
Identifier: keyID,
InputReader: input,
CacheVersion: cacheVersion,
CacheIgnoreInputVars: cacheIgnoreInputVars,
TypedInterface: taskInterface,

Check warning on line 362 in flyteplugins/go/tasks/plugins/array/catalog.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/array/catalog.go#L358-L362

Added lines #L358 - L362 were not covered by tests
},
ArtifactData: outputReaders[idx],
ArtifactMetadata: catalog.Metadata{
Expand Down

0 comments on commit 6ac3e91

Please sign in to comment.