-
Notifications
You must be signed in to change notification settings - Fork 63
refactor: decouple PartitionSpec from Schema #299
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
335a0da to
bb499ad
Compare
| partition_fields.push_back(std::move(*partition_field)); | ||
| } | ||
| return std::make_unique<PartitionSpec>(schema, spec_id, std::move(partition_fields)); | ||
| return std::make_unique<PartitionSpec>(spec_id, std::move(partition_fields)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed offline, we cannot use the unbound form here. Instead, we need to use a new PartitionSpec::Make to find the source field of each partition field from schema and then verify it is acceptable by the transform object.
wgtmac
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My main feedback is that we need to keep the schema parameter because we eventually need it to create bound partition spec.
|
|
||
| Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson( | ||
| const std::shared_ptr<Schema>& schema, const nlohmann::json& json) { | ||
| Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(const nlohmann::json& json) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's keep the function signature to still accept const std::shared_ptr<Schema>& schema and add a TODO comment below for PartitionSpec to validate the partition fields with this schema. Otherwise you need to add this parameter back in the next PR.
| /// \param[out] default_spec_id The default partition spec ID. | ||
| /// \param[out] partition_specs The list of partition specs. | ||
| Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version, | ||
| const std::shared_ptr<Schema>& current_schema, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto, let's keep it.
| for (const auto& spec_json : spec_array) { | ||
| ICEBERG_ASSIGN_OR_RAISE(auto spec, | ||
| PartitionSpecFromJson(current_schema, spec_json)); | ||
| ICEBERG_ASSIGN_OR_RAISE(auto spec, PartitionSpecFromJson(spec_json)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
|
|
||
| auto spec = std::make_unique<PartitionSpec>( | ||
| current_schema, PartitionSpec::kInitialSpecId, std::move(fields)); | ||
| auto spec = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a TODO comment here as well.
| ICEBERG_RETURN_UNEXPECTED(ParsePartitionSpecs( | ||
| json, table_metadata->format_version, current_schema, | ||
| table_metadata->default_spec_id, table_metadata->partition_specs)); | ||
| ICEBERG_RETURN_UNEXPECTED(ParsePartitionSpecs(json, table_metadata->format_version, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
| ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec, | ||
| std::shared_ptr<Schema> table_schema) | ||
| : partition_spec_(std::move(partition_spec)), | ||
| table_schema_(std::move(table_schema)) {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| table_schema_(std::move(table_schema)) {} | |
| current_schema_(std::move(current_schema)) {} |
Let's avoid calling it table schema because a table can have schemas of different versions.
| auto adapter = | ||
| std::make_unique<ManifestEntryAdapterV1>(snapshot_id, std::move(partition_spec)); | ||
| std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec, | ||
| std::shared_ptr<Schema> table_schema) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto for the name of table_schema
|
|
||
| Result<std::shared_ptr<StructType>> PartitionSpec::PartitionType() { | ||
| Result<std::shared_ptr<StructType>> PartitionSpec::PartitionType( | ||
| std::shared_ptr<Schema> schema) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| std::shared_ptr<Schema> schema) { | |
| const std::shared_ptr<Schema>& schema) { |
Both const std::shared_ptr<Schema>& or const Schema& is OK. Perhaps the latter is better since it accepts wider range of inputs.
|
|
||
| /// \brief Get the partition type. | ||
| Result<std::shared_ptr<StructType>> PartitionType(); | ||
| Result<std::shared_ptr<StructType>> PartitionType(std::shared_ptr<Schema> schema); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| Result<std::shared_ptr<StructType>> PartitionType(std::shared_ptr<Schema> schema); | |
| Result<std::shared_ptr<StructType>> PartitionType(const Schema& schema); |
| // Get the table schema and partition type | ||
| ICEBERG_ASSIGN_OR_RAISE(auto table_schema, context_.table_metadata->Schema()); | ||
| ICEBERG_ASSIGN_OR_RAISE(auto partition_type, | ||
| partition_spec->PartitionType(table_schema)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // Get the table schema and partition type | |
| ICEBERG_ASSIGN_OR_RAISE(auto table_schema, context_.table_metadata->Schema()); | |
| ICEBERG_ASSIGN_OR_RAISE(auto partition_type, | |
| partition_spec->PartitionType(table_schema)); | |
| // Get the current schema and partition type | |
| ICEBERG_ASSIGN_OR_RAISE(auto current_schema, context_.table_metadata->Schema()); | |
| ICEBERG_ASSIGN_OR_RAISE(auto partition_type, | |
| partition_spec->PartitionType(current_schema)); |
| std::shared_ptr<Schema> schema) { | ||
| if (fields_.empty()) { | ||
| return nullptr; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current implementation allows using partition_type_ cache based on tableschema_, but the given schema parameter may belong to a different schema version, which could lead to correctness issues.
Suggest adding schema_id to the cache key to ensure consistency.
such as:
std::unordered_map<int32_t, std::shared_ptr> partition_type_cache_;
schema_member fromPartitionSpec, MakingPartitionType()method require a Schema parameter instead of using the stored schema.table_schema_inManifestEntryAdapter.table_schemainmetadata_["schema"].