Conversation

@gaborkaszab
Collaborator

No description provided.

@gaborkaszab
Collaborator Author

Some background: The current way to query partition stats is through PartitionStatsHandler.readPartitionStatsFile(). For that, the user has to put together the schema and get the input file to read. A more convenient API for scanning partition stats would improve usability (one comment on my stats proposal doc also mentions this). Such an API could also offer filter and projection capabilities.

The content of this PR:

  1. Introduce the PartitionStatisticsScan API and its implementation, BasePartitionStatisticsScan, in core. For simplicity, it only covers the functionality that exists today: no filtering by partition and no projection.
  2. Replace the usage of PartitionStatsHandler.readPartitionStatsFile() with the new API.
  3. Introduce the PartitionStatistics interface in the API module and make PartitionStats in core implement it. This is needed so that the Scan API can use it as its return type, since the existing PartitionStats class lives in the core module.
  4. Replace the usage of PartitionStats with the new interface wherever possible.
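
The split described in items 1 and 3 can be sketched with simplified stand-in types. Everything below is a hypothetical, self-contained model rather than the Iceberg classes: the accessor names, the single counter field, and the list-backed scan are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Stand-in for the read-only interface that would live in the API module.
interface PartitionStatistics {
  String partition();

  long dataRecordCount();
}

// Stand-in for the scan API: no filter or projection yet, only scan().
interface PartitionStatisticsScan {
  Iterable<PartitionStatistics> scan();
}

// Stand-in for the existing core class, now implementing the API interface.
class PartitionStats implements PartitionStatistics {
  private final String partition;
  private long dataRecordCount;

  PartitionStats(String partition, long dataRecordCount) {
    this.partition = partition;
    this.dataRecordCount = dataRecordCount;
  }

  @Override
  public String partition() {
    return partition;
  }

  @Override
  public long dataRecordCount() {
    return dataRecordCount;
  }

  // Mutating method kept on the implementation only (assumed shape).
  void appendStats(PartitionStatistics other) {
    this.dataRecordCount += other.dataRecordCount();
  }
}

// Stand-in for the core scan implementation, backed by a list instead of a stats file.
class BasePartitionStatisticsScan implements PartitionStatisticsScan {
  private final List<PartitionStats> rows;

  BasePartitionStatisticsScan(List<PartitionStats> rows) {
    this.rows = new ArrayList<>(rows);
  }

  @Override
  public Iterable<PartitionStatistics> scan() {
    // Callers only ever see the interface type, never the core class.
    return Collections.<PartitionStatistics>unmodifiableList(rows);
  }
}
```

Returning the interface type from scan() is what lets the scan API sit in the API module while the concrete class stays in core.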

These could possibly be some follow-up steps:

  1. Implementation of filter() and project() on the new Scan API.
  2. The naming of the affected classes is a bit weird: the interface api/PartitionStatistics is implemented by core/PartitionStats. Ideally, the implementation would be named BasePartitionStatistics. As a next step, we can introduce a class with the same content under the new name, deprecate the existing one, and remove its usages. Changes within PartitionStats are easier to review if the "renaming" happens in a follow-up PR.
  3. Older Spark versions should be covered.

  value,
  (existingEntry, newEntry) -> {
-   existingEntry.appendStats(newEntry);
+   ((PartitionStats) existingEntry).appendStats(newEntry);
Collaborator Author

If the PartitionStatistics interface had the appendStats function, this cast (and one other occurrence) wouldn't be needed. It seemed a bit weird to have it there, but I'm open to making this change to clean up the casts.

Contributor

I would prefer to keep the interface clean.
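
The trade-off in this thread can be sketched as follows. The types are hypothetical stand-ins, not the Iceberg classes: a read-only interface without appendStats, a mutable implementation, and a Map.merge call whose remapping function has to cast to reach the mutating method.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical read-only view; deliberately has no appendStats().
interface StatsView {
  long recordCount();
}

// Hypothetical mutable implementation that owns the merge logic.
class MutableStats implements StatsView {
  private long recordCount;

  MutableStats(long recordCount) {
    this.recordCount = recordCount;
  }

  @Override
  public long recordCount() {
    return recordCount;
  }

  void appendStats(StatsView other) {
    recordCount += other.recordCount();
  }
}

class StatsMerger {
  // Merges one entry into a map that is typed on the interface.
  static void merge(Map<String, StatsView> byPartition, String key, StatsView value) {
    byPartition.merge(
        key,
        value,
        (existingEntry, newEntry) -> {
          // The cast is the cost of keeping the interface read-only.
          ((MutableStats) existingEntry).appendStats(newEntry);
          return existingEntry;
        });
  }
}
```

The cast stays confined to the merge code, while consumers of the map only see the read-only view.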

PartitionStatisticsScan filter(Expression filter);

/**
* Create a new scan from this with the schema as its projection.
Contributor

Maybe describe what will happen with the PartitionStatistics attributes which are not part of the schema.

/**
* Create a new scan from this with the schema as its projection.
*
* @param schema a projection schema
Contributor

How does the user create the Schema?

I would prefer something like the DataFile where the possible columns are available as constants, and the type is available as well. Maybe copy/move/deprecate the schema from the old place.
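
One way to read this suggestion, as a hedged sketch: expose the selectable columns as constants so that a projection can be assembled without building a schema by hand. The descriptor class, the constant names, and the policy of reporting unprojected columns as null are all assumptions for illustration; none of them is taken from the PR.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical column descriptor: a name and a Java type.
final class StatsField {
  final String name;
  final Class<?> type;

  StatsField(String name, Class<?> type) {
    this.name = name;
    this.type = type;
  }
}

// Hypothetical constants, in the spirit of the column constants on DataFile.
final class StatsFields {
  static final StatsField SPEC_ID = new StatsField("spec_id", Integer.class);
  static final StatsField DATA_RECORD_COUNT = new StatsField("data_record_count", Long.class);
  static final StatsField DATA_FILE_COUNT = new StatsField("data_file_count", Integer.class);

  static final List<StatsField> ALL =
      Arrays.asList(SPEC_ID, DATA_RECORD_COUNT, DATA_FILE_COUNT);

  private StatsFields() {}
}

final class Projection {
  // Keeps projected columns; unprojected columns are reported as null (an assumed policy).
  static Map<String, Object> project(Map<String, Object> row, List<StatsField> projected) {
    Map<String, Object> result = new LinkedHashMap<>();
    for (StatsField field : StatsFields.ALL) {
      result.put(field.name, projected.contains(field) ? row.get(field.name) : null);
    }
    return result;
  }

  private Projection() {}
}
```

With constants like these, a caller could write something like Projection.project(row, Arrays.asList(StatsFields.DATA_RECORD_COUNT)) and would know from the documented policy that the other attributes come back as null.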

Types.StructType partitionType = Partitioning.partitionType(table);
Schema schema = PartitionStatsHandler.schema(partitionType, TableUtil.formatVersion(table));

FileFormat fileFormat = FileFormat.fromFileName(statsFile.get().path());
Contributor

I still think that getting the file format from the file name is brittle. Maybe not in this PR, but I would love to have this fixed.
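
To illustrate why name-based detection can be brittle, here is a small stand-in that guesses a format from the file suffix. The enum and method are hypothetical and simplified, not Iceberg's FileFormat; the point is only that a file whose name lacks the expected extension cannot be classified.

```java
import java.util.Optional;

// Hypothetical, simplified format enum used only for this sketch.
enum SketchFormat {
  PARQUET(".parquet"),
  AVRO(".avro"),
  ORC(".orc");

  private final String ext;

  SketchFormat(String ext) {
    this.ext = ext;
  }

  // Guesses the format purely from the file name suffix.
  static Optional<SketchFormat> fromFileName(String fileName) {
    for (SketchFormat format : values()) {
      if (fileName.endsWith(format.ext)) {
        return Optional.of(format);
      }
    }
    // No known suffix: the name alone gives no answer.
    return Optional.empty();
  }
}
```

A name such as "partition-stats-1" or "stats.parquet.tmp" yields an empty result here even if the content is valid Parquet, which is the kind of failure the comment is worried about.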


  try {
-   stats = computeAndMergeStatsIncremental(table, snapshot, partitionType, statisticsFile);
+   stats = computeAndMergeStatsIncremental(table, snapshot, statisticsFile.snapshotId());
Contributor

Why do we remove the partitionType parameter? We recalculate it later again for every file. Isn't that unnecessary?

Comment on lines +294 to +296
rec -> {
return (PartitionStats) recordToPartitionStats(rec);
});
Contributor

Do we need the braces?
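
For reference, a block lambda whose body is a single return statement behaves the same as an expression lambda. The sketch below uses hypothetical stand-in types instead of the PR's record and stats classes.

```java
import java.util.function.Function;

class LambdaForms {
  // Hypothetical stand-in for recordToPartitionStats: returns a supertype that needs a cast.
  static Object convert(String rec) {
    return rec.trim();
  }

  // Block form, as in the snippet under review.
  static final Function<String, String> WITH_BRACES =
      rec -> {
        return (String) convert(rec);
      };

  // Expression form: same behavior, no braces or return keyword.
  static final Function<String, String> WITHOUT_BRACES = rec -> (String) convert(rec);
}
```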

}

@Override
public CloseableIterable<PartitionStatistics> scan() {
Contributor

Do we have tests for this?
