[CORE-8530] Handle out-of-sync producers by writing records to past-schema parquet #24955
Conversation
Force-pushed from 20ddb11 to 8e9d51a
Force-pushed from b4ffa8f to 0d4f346
This comment was marked as outdated.
Force-pushed from 770eb1d to 4940eea
CI test results
- test results on build #61406
- test results on build #61444
- test results on build #61469
Force-pushed from 4940eea to ac6735d
The C++ changes could use some testing, though this functionally looks pretty good to me
namespace iceberg {
bool schemas_equivalent(const struct_type& source, const struct_type& dest) {
    chunked_vector<const nested_field*> source_stk;
Could use some simple tests?
Maybe same with table metadata and/or catalog_schema_manager
yeah fair point. slipped my mind.
auto source_copy = schema->schema_struct.copy();
auto compat_res = check_schema_compat(dest_type, schema->schema_struct);
Not necessarily related to this PR, but this seems like a really easy footgun to hit. If the solution in general is to make a copy of the struct beforehand, should we make check_schema_compat take the source schema as non-const?
yeah good point. perhaps it would be most clear to pass by value.
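As a sketch of what that could look like (the signatures below are assumptions for illustration, not the real declarations; the return type is a placeholder too):

```cpp
struct struct_type { /* fields elided; assume this is copyable */ };

// Assumed current shape: annotates `source` in place during the check,
// so callers holding cached metadata must remember to copy first.
// bool check_schema_compat(const struct_type& dest, struct_type& source);

// Pass-by-value variant: the copy is baked into the signature, so
// compat annotations can never leak back into cached table metadata,
// even if a caller forgets the defensive copy.
bool check_schema_compat(const struct_type& dest, struct_type source);
```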
Checks whether two structs are precisely equivalent[1] using a simultaneous depth-first traversal. The use case is for performing schema lookups on cached table metadata by type rather than by ID. [1] - Exclusive of IDs but inclusive of order. Signed-off-by: Oren Leiman <[email protected]>
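For illustration, a self-contained sketch of that simultaneous depth-first traversal over simplified stand-in types (the real implementation uses iceberg::nested_field and chunked_vector; everything below, including the field layout, is an assumption):

```cpp
#include <string>
#include <vector>

// Simplified stand-in for iceberg::nested_field; the real type is richer.
struct field {
    int id = 0;                  // deliberately NOT compared below
    std::string name;
    std::string type;            // leaf type name, or "struct"
    bool required = false;
    std::vector<field> children; // non-empty only for struct fields
};

// Iterative simultaneous DFS: equivalent iff both trees have the same
// fields with the same names, types, and requiredness, in the same
// order, at every nesting level. Field IDs are ignored.
bool schemas_equivalent(
  const std::vector<field>& source, const std::vector<field>& dest) {
    std::vector<const field*> src_stk;
    std::vector<const field*> dst_stk;
    auto push_all =
      [](std::vector<const field*>& stk, const std::vector<field>& fs) {
          for (const auto& f : fs) {
              stk.push_back(&f);
          }
      };
    push_all(src_stk, source);
    push_all(dst_stk, dest);
    while (!src_stk.empty() && !dst_stk.empty()) {
        const field* s = src_stk.back();
        const field* d = dst_stk.back();
        src_stk.pop_back();
        dst_stk.pop_back();
        if (
          s->name != d->name || s->type != d->type
          || s->required != d->required
          || s->children.size() != d->children.size()) {
            return false;
        }
        push_all(src_stk, s->children);
        push_all(dst_stk, d->children);
    }
    // Equivalent only if both stacks drained in lockstep.
    return src_stk.empty() && dst_stk.empty();
}
```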
Search for a schema that matches the provided type. Signed-off-by: Oren Leiman <[email protected]>
For catalog_schema_manager, we can use this to perform a type-wise schema lookup on cached metadata, resulting in table_info bound to an arbitrary schema, possibly other than the current table schema. Also update catalog_schema_manager::get_ids_from_table_meta to try a type-wise lookup before performing the usual compat check. This way we can short-circuit a schema update if the desired schema is already present in the table. Also pass the source struct to check_schema_compat by value to avoid polluting cached table metadata with compat annotations. Signed-off-by: Oren Leiman <[email protected]>
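A hedged illustration of that lookup order with toy types (find_equivalent_schema_id and resolve_schema_id are invented names; the real logic lives in catalog_schema_manager::get_ids_from_table_meta):

```cpp
#include <optional>
#include <string>
#include <vector>

struct schema {
    int schema_id = 0;
    std::string repr; // stand-in for the structural type
};

struct table_metadata {
    int current_schema_id = 0;
    std::vector<schema> schemas; // current and historical schemas
};

// Type-wise lookup: scan every known schema (not just the current one)
// for a structural match, ignoring IDs.
std::optional<int> find_equivalent_schema_id(
  const table_metadata& table, const schema& desired) {
    for (const auto& s : table.schemas) {
        if (s.repr == desired.repr) { // stand-in for schemas_equivalent
            return s.schema_id;
        }
    }
    return std::nullopt;
}

// Try the type-wise lookup first; a hit short-circuits the usual
// compat check and schema update entirely.
int resolve_schema_id(table_metadata& table, schema desired) {
    if (auto id = find_equivalent_schema_id(table, desired)) {
        return *id;
    }
    // Miss: run the compat check / schema evolution path (elided) and
    // register the new schema.
    desired.schema_id = table.current_schema_id + 1;
    table.schemas.push_back(desired);
    table.current_schema_id = desired.schema_id;
    return desired.schema_id;
}
```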
Rather than the current schema ID. By this point we should have ensured that the record schema exists in the table (either historically or currently). This change lets us look past the current schema to build a writer for historical data. Signed-off-by: Oren Leiman <[email protected]>
Signed-off-by: Oren Leiman <[email protected]>
Force-pushed from ac6735d to f5a8ad8
LGTM!
@@ -230,7 +235,10 @@ catalog_schema_manager::get_table_info(
     }
     const auto& table = load_res.value();

-    auto cur_schema = table.get_schema(table.current_schema_id);
+    const auto* cur_schema = desired_type.has_value()
+                               ? table.get_equivalent_schema(
I wonder if there are perf implications here? Might be good to check what the impact is and whether there's some other mechanism we want to have here (or does this get cached per schema ID in the translation path? I will look).
ya I've been thinking about this. I got the impression from @andrwng that there's some caching in play, but I still need to chase down details.
in retrospect, not sure whether we can expect table_metadata::schemas to be in sorted (oldest first) order. maybe slightly better to look up by current_schema_id up front and compare if necessary, since that's the common case.
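In sketch form, the current-schema-first variant of that lookup (toy types invented for illustration, matching the earlier sketch):

```cpp
#include <optional>
#include <string>
#include <vector>

struct schema {
    int schema_id = 0;
    std::string repr; // stand-in for the structural type
};

// Check the current schema first (the common case for in-sync
// producers), then fall back to scanning historical schemas. Works
// even if `schemas` has no particular ordering guarantee.
std::optional<int> find_schema_id_current_first(
  const std::vector<schema>& schemas,
  int current_schema_id,
  const std::string& desired_repr) {
    for (const auto& s : schemas) {
        if (s.schema_id == current_schema_id && s.repr == desired_repr) {
            return s.schema_id;
        }
    }
    for (const auto& s : schemas) {
        if (s.repr == desired_repr) {
            return s.schema_id;
        }
    }
    return std::nullopt;
}
```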
Yeah there are other options like using a fingerprint to quickly sort out mismatches. Hopefully it's all behind a cache and this won't matter
Just to clarify, the caching I was referring to is pretty indirect -- it's caching at the level of the record multiplexer, such that each translation of an offset range will have a shared map of parquet writers that's more or less keyed by schema ID. So we will need to do this lookup once per record schema per translation.
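To make the shape of that indirection concrete, a speculative sketch (all names here are invented, not the real multiplexer API):

```cpp
#include <map>
#include <memory>

struct parquet_writer { /* elided */ };

// Per-translation cache of writers, roughly keyed by schema ID. The
// (potentially linear) equivalent-schema lookup in get_table_info only
// runs on a miss here, i.e. once per record schema per translation.
struct record_multiplexer {
    std::map<int, std::unique_ptr<parquet_writer>> writers;

    parquet_writer& writer_for(int schema_id) {
        auto it = writers.find(schema_id);
        if (it == writers.end()) {
            // Miss: resolve table info for this schema, then build and
            // cache a writer bound to it.
            it = writers
                   .emplace(schema_id, std::make_unique<parquet_writer>())
                   .first;
        }
        return *it->second;
    }
};
```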
Right right. So assuming that:
- the steady state of most translators is a stream of records conforming to the corresponding table's current schema
- the first thing we look up in get_table_info is the current table schema
Then incurring a single additional equivalence check in the common case seems basically fine?
In principle we should only end up in the out-of-sync producer state when the schema changes, but I don't have a good sense of how long we might spend there.
Maybe an on-shard cache keyed by some fingerprint is good enough, vs. the additional work of wiring something into the coordinator?
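For reference, one shape the fingerprint idea could take (entirely speculative; the hash and verification policy are placeholders):

```cpp
#include <cstdint>
#include <functional>
#include <optional>
#include <string>
#include <unordered_map>

// Speculative: a cheap fingerprint over a schema's structural
// representation turns most lookups into a hash-map probe; the full
// equivalence walk only runs to confirm a probe hit.
using fingerprint_t = uint64_t;

fingerprint_t fingerprint_of(const std::string& schema_repr) {
    return std::hash<std::string>{}(schema_repr);
}

struct shard_schema_cache {
    // fingerprint -> schema ID resolved by a previous full lookup
    std::unordered_map<fingerprint_t, int> ids;

    std::optional<int> lookup(const std::string& schema_repr) const {
        auto it = ids.find(fingerprint_of(schema_repr));
        if (it == ids.end()) {
            return std::nullopt;
        }
        // A real implementation must re-verify equivalence here, since
        // distinct schemas can collide on a fingerprint.
        return it->second;
    }

    void remember(const std::string& schema_repr, int schema_id) {
        ids.emplace(fingerprint_of(schema_repr), schema_id);
    }
};
```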
@andrwng - completely misread your comment 🤦.
> once per record schema per translation

should be totally fine then as written, yeah?
Oops I missed the earlier comment. Yeah I wasn't implying that what we have is insufficient, just thought I'd clarify where there is schema reuse. I think it's fine as is (or at least, let's wait and see as the translation scheduling policy crystallizes, though I'd be surprised if it becomes a bottleneck)
I will say, maybe this is worth trying out with some massive schemas
no worries. yeah for sure - what I read on mobile was "check per batch", so I went off and wrote a pointless benchmark... entirely my bad.

> maybe this is worth trying out with some massive schemas

yeah I can whip something up at some point.

> wait and see... surprised if it becomes a bottleneck

agree
Builds on #24862. Interesting commits start at e215bfe.
Backports Required
Release Notes