-
Notifications
You must be signed in to change notification settings - Fork 671
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix propeller crash when inferring literal type for an offloaded literal #5771
Changes from all commits
6091885
0a08dfc
ce8f2fe
e6e1e9d
134af61
8672c30
7a652b3
986b41d
3ef0e8f
9539ea9
195818e
9edd5f9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -191,6 +191,7 @@ | |
} | ||
|
||
size := -1 | ||
|
||
for key, variable := range literalMap.Literals { | ||
literalType := validators.LiteralTypeForLiteral(variable) | ||
err := validators.ValidateLiteralType(literalType) | ||
|
@@ -200,10 +201,19 @@ | |
handler.PhaseInfoFailure(idlcore.ExecutionError_USER, errors.IDLNotFoundErr, errMsg, nil), | ||
), nil | ||
} | ||
if variable.GetOffloadedMetadata() != nil { | ||
// variable will be overwritten with the contents of the offloaded data which contains the actual large literal. | ||
// We need this for the map task to be able to create the subNodeSpec | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just so I understand, this is also needed so the map task can index into the inputs as well too? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah subNodeSpec covers that as it contains the inputs for each subnode |
||
err := common.ReadLargeLiteral(ctx, nCtx.DataStore(), variable) | ||
if err != nil { | ||
return handler.DoTransition(handler.TransitionTypeEphemeral, | ||
handler.PhaseInfoFailure(idlcore.ExecutionError_SYSTEM, errors.RuntimeExecutionError, "couldn't read the offloaded literal", nil), | ||
), nil | ||
} | ||
} | ||
switch literalType.Type.(type) { | ||
case *idlcore.LiteralType_CollectionType: | ||
collectionLength := len(variable.GetCollection().Literals) | ||
|
||
if size == -1 { | ||
size = collectionLength | ||
} else if size != collectionLength { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -122,7 +122,7 @@ func (m *CatalogClient) Get(ctx context.Context, key catalog.Key) (catalog.Entry | |
logger.Debugf(ctx, "DataCatalog failed to get artifact by tag %+v, err: %+v", tag, err) | ||
return catalog.Entry{}, err | ||
} | ||
logger.Debugf(ctx, "Artifact found %v from tag %v", artifact, tag) | ||
logger.Debugf(ctx, "Artifact found %v from tag %v", artifact.GetId(), tag) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. curious why these changes are necessary? do we ever send too big inputs/outputs to cache service? shouldn't we be using the offloaded literal? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Even they are not too big breaching the 10 MB limit, but anything less than that threshold will get logged which is also huge and hence removed it. Let me know if you disagree There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sounds good, I could still see this being useful, maybe we only log a deterministic prefix? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It was logging an entire object and hence i think logging an identifier should good for long term too and we have api's to fetch the aritifact using the ID if we want to debug further. Logging a prefix of object converted to string format doesn't seem useful IMO |
||
|
||
var relevantTag *datacatalog.Tag | ||
if len(artifact.GetTags()) > 0 { | ||
|
@@ -230,7 +230,7 @@ func (m *CatalogClient) createArtifact(ctx context.Context, key catalog.Key, dat | |
createArtifactRequest := &datacatalog.CreateArtifactRequest{Artifact: cachedArtifact} | ||
_, err := m.client.CreateArtifact(ctx, createArtifactRequest) | ||
if err != nil { | ||
logger.Errorf(ctx, "Failed to create Artifact %+v, err: %v", cachedArtifact, err) | ||
logger.Errorf(ctx, "Failed to create Artifact %+v, err: %v", cachedArtifact.Id, err) | ||
return catalog.Status{}, err | ||
} | ||
logger.Debugf(ctx, "Created artifact: %v, with %v outputs from execution %+v", cachedArtifact.Id, len(artifactDataList), metadata) | ||
|
@@ -259,7 +259,7 @@ func (m *CatalogClient) createArtifact(ctx context.Context, key catalog.Key, dat | |
} | ||
} | ||
|
||
logger.Debugf(ctx, "Successfully created artifact %+v for key %+v, dataset %+v and execution %+v", cachedArtifact, key, datasetID, metadata) | ||
logger.Debugf(ctx, "Successfully created artifact %+v for key %+v, dataset %+v and execution %+v", cachedArtifact.Id, key, datasetID, metadata) | ||
return catalog.NewStatus(core.CatalogCacheStatus_CACHE_POPULATED, EventCatalogMetadata(datasetID, tag, nil)), nil | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice, thank you for updating