Core, Spark: Refactor FileRewriter interface to separate planning and execution #11513
base: main
Conversation
return new RewriteFileGroup(info, Lists.newArrayList(tasks));
}

public static class RewritePlanResult {
Removed the Serializable as it was not required by the tests.
@RussellSpitzer, @szehon-ho: Could you please check if this is ok?
I would just hesitate to remove something that was there before. I think originally we wanted it because we were considering shipping this class around but I don't see any evidence that it is currently serialized.
Now that these are public, we need Javadocs describing their purpose.
I will add the javadoc.
Since we make this public, I would remove the Serializable. We can always add it back if needed, but I would restrict a public API only to what is strictly necessary.
Added the javadoc.
I think shipping around a stream is a non-trivial task, so I would keep it not Serializable for now.
@@ -191,7 +191,7 @@ protected long inputSize(List<T> group) {
   * of output files. The final split size is adjusted to be at least as big as the target file size
   * but less than the max write file size.
   */
- protected long splitSize(long inputSize) {
+ public long splitSize(long inputSize) {
This is a somewhat unrelated change: Flink will need to extract the split size, since in the Flink compaction implementation the planner (which needs its own rewriter) and the executor (where the rewriter is not used) run on different instances.
We could put it into the FileGroupInfo, but as a first run I opted for the minimal change, especially on public APIs.
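To illustrate why the visibility change matters, here is a hedged sketch of the Flink scenario described above; PlannedGroup and planGroup are hypothetical names, and only splitSize(long) relates to this PR:

```java
import java.io.Serializable;
import java.util.List;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.actions.SizeBasedDataRewriter;

// Hypothetical group descriptor shipped from the planner instance to the executor instance.
class PlannedGroup implements Serializable {
  final List<FileScanTask> tasks;
  final long splitSize; // computed on the planner, consumed by the executor

  PlannedGroup(List<FileScanTask> tasks, long splitSize) {
    this.tasks = tasks;
    this.splitSize = splitSize;
  }
}

class PlannerSide {
  PlannedGroup planGroup(SizeBasedDataRewriter rewriter, List<FileScanTask> tasks) {
    long inputSize = tasks.stream().mapToLong(FileScanTask::length).sum();
    // splitSize is now public, so the planner can extract it without running the rewrite
    return new PlannedGroup(tasks, rewriter.splitSize(inputSize));
  }
}
```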
For easier review it would be great if you could highlight the changes made. I see the note on RewritePlanResult but I'm not sure where this class came from. It is unclear what has been extracted and renamed and what is new code.
import org.slf4j.LoggerFactory;

/**
 * Checks the files in the table, and using the {@link FileRewriter} plans the groups for
I'm a little confused about the inheritance structure here. If we want to make this common, maybe we should just have this belong to the FileRewriter as a base class?
If I can think through this correctly in this draft we have
RewriteFileGroupPlanner is responsible for:
- Scanning the table
- Grouping tasks by partition
- Further grouping and filtering in FileRewriter

FileRewriter is responsible for:
- Filtering tasks based on rewriter config
- Grouping tasks within partitions
Feels like all of that could just be in the FileRewriter without having a separate GroupPlanner? Do we have an argument against that? I think we could even just have some new default methods on the FileRewriter interface for planning?
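For concreteness, a rough sketch of what "default methods for planning" could look like; plan(...) below is a hypothetical addition, while planFileGroups already exists on FileRewriter:

```java
import java.util.List;
import java.util.Map;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ContentScanTask;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

// Hypothetical extension; not a proposal from the PR itself.
interface PlanningFileRewriter<T extends ContentScanTask<F>, F extends ContentFile<F>> {
  Iterable<List<T>> planFileGroups(Iterable<T> tasks);

  // Default planning method: take tasks already keyed by partition and delegate
  // the per-partition filtering/grouping to the existing rewriter logic.
  default Map<StructLike, Iterable<List<T>>> plan(Map<StructLike, List<T>> tasksByPartition) {
    Map<StructLike, Iterable<List<T>>> groups = Maps.newHashMap();
    tasksByPartition.forEach((partition, tasks) -> groups.put(partition, planFileGroups(tasks)));
    return groups;
  }
}
```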
Let me check that. I was hesitant to change existing classes, but if we have a consensus, I'm happy to do that.
The FileRewriter is a generic class with <T extends ContentScanTask<F>, F extends ContentFile<F>>. The RewriteFileGroup which is returned by the planning is specific to FileScanTask.

I'm not sure that we use the FileRewriter with any other parametrization ATM, but to put everything into a single class we would need to do one of the things below:
- Change to FileRewriter<T extends FileScanTask> instead of the current FileRewriter<T extends ContentScanTask<F>, F extends ContentFile<F>>, or
- Change public List<FileScanTask> RewriteFileGroup.fileScans() to return public <T extends ContentScanTask<F>, F extends ContentFile<F>> List<T> fileScans()

Both of these would be breaking changes in the public API, so we should be careful about them.
We still can "duplicate" and create a new FileRewriter, and deprecate the old one - but I would only do this if we think that the original API needs a serious refactor anyway.
WDYT?
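A small illustration of why the first option would be breaking, using NarrowedRewriter as a hypothetical name for the narrowed interface:

```java
import org.apache.iceberg.FileScanTask;

// Option 1 in isolation: the narrowed bound compiles fine for data files...
interface NarrowedRewriter<T extends FileScanTask> {}

class BoundDemo {
  // OK: FileScanTask satisfies the bound
  NarrowedRewriter<FileScanTask> dataRewriter;

  // ...but would not compile for position deletes, because PositionDeletesScanTask
  // extends ContentScanTask<DeleteFile> and is not a FileScanTask:
  // NarrowedRewriter<PositionDeletesScanTask> positionDeleteRewriter;
}
```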
I've been thinking a lot about the refactor, trying to figure out what we should do.

We could possibly do:

Rewriter[T]
  has a Planner[T] // responsible for generating tasks (move actual filtering into rewriter)

Maybe we should also have:

Planner[T]
  DeletePlanner[DeleteScanTask]
  FilePlanner[FileScanTask]

So then you would have:

SizeBasedDataRewriter<FileScanTask, DataFile>
  private planner = new FilePlanner<FileScanTask>

Would that end up being more complicated? I feel like we are getting some interdependencies here I'm not comfortable with extending. @aokolnychyi probably has some smart things to say here as well.
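Translating that pseudocode into a hedged Java sketch (every name here comes from the discussion above; none of it is an existing Iceberg API):

```java
import java.util.List;
import org.apache.iceberg.ContentScanTask;

interface Planner<T extends ContentScanTask<?>> {
  Iterable<List<T>> plan(); // responsible for generating task groups
}

class FilePlanner<T extends ContentScanTask<?>> implements Planner<T> {
  @Override
  public Iterable<List<T>> plan() {
    throw new UnsupportedOperationException("sketch only");
  }
}

// The rewriter owns a planner, keeping the actual filtering and rewriting itself.
abstract class Rewriter<T extends ContentScanTask<?>> {
  private final Planner<T> planner = new FilePlanner<>();

  Iterable<List<T>> planGroups() {
    return planner.plan();
  }

  abstract void rewrite(List<T> group);
}
```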
// which belong to multiple partitions in the current spec. Treating all such files as
// un-partitioned and grouping them together helps to minimize new files made.
StructLike taskPartition =
    task.file().specId() == table.spec().specId() ? task.file().partition() : emptyStruct;
Minor fix; we can drop this now and use EmptyStructLike.get()
Do we want to make EmptyStructLike and EmptyStructLike.get public? Currently it is package private.
Made EmptyStructLike public for now, but if you think this change isn't worth making it public, I can revert this part.
Got this exception:
TestRewriteDataFilesAction > testBinPackRewriterWithSpecificOutputSpec() > formatVersion = 3 FAILED
java.lang.UnsupportedOperationException: Can't retrieve values from an empty struct
at org.apache.iceberg.EmptyStructLike.get(EmptyStructLike.java:40)
at org.apache.iceberg.types.JavaHashes$StructLikeHash.hash(JavaHashes.java:96)
at org.apache.iceberg.types.JavaHashes$StructLikeHash.hash(JavaHashes.java:75)
at org.apache.iceberg.util.StructLikeWrapper.hashCode(StructLikeWrapper.java:96)
at java.base/java.util.HashMap.hash(HashMap.java:338)
So reverted the EmptyStructLike changes.
Blargh
Let me amend this now, and thanks for taking a look anyway! So there was no significant new code in the refactor; the methods and the class got moved.
@RussellSpitzer: This is ready for another round if you have time.
Looks reasonable to me. Some minor nits.
@pvary and I were talking this over a bit. I think we really want to get a stronger division between "Planning" and "Execution", since the two are very intertwined right now. Ideally we end up in a situation where core has base classes that are only responsible for planning out exactly which files should be rewritten and in what logical groups they should be rewritten, while another class is responsible for the physical implementation of how that actually occurs. Currently I think our structure is: Action contains Rewriter which extends a Planning Class with an Implementation. Because of this, the Action and rewriter talk back and forth, causing planning to occur both in the action and in parts of the code in the rewriter.
This is just a WIP for the change, which uses git file renames. I hope it is easier to review this way. After the initial reviews, I still need to do these steps.
}

@Override
public void initPlan(FileRewritePlan<I, T, F, G> plan) {
Right now this method is just used to set config from the plan in the rewrite executor. I'm wondering if we should just be setting those parameters directly on the rewriter rather than inheriting them from the plan?
If we are fairly confident that only writeMaxFileSize and outputSpecId come from the plan, then we can add setWriteMaxFileSize and setOutputSpecId to the FileRewriteExecutor API instead of initPlan(FileRewritePlan).
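A minimal sketch of that setter-based alternative, assuming only these two values flow from the plan; the setter names are the ones proposed above, not existing API:

```java
// Hypothetical narrowing of the executor API: explicit setters instead of initPlan.
interface FileRewriteExecutorSetters {
  void setWriteMaxFileSize(long writeMaxFileSize);

  void setOutputSpecId(int outputSpecId);
}

// The call site would then become, e.g.:
//   executor.setWriteMaxFileSize(plan.writeMaxFileSize());
//   executor.setOutputSpecId(plan.outputSpecId());
```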
import org.apache.iceberg.StructLike;

/** Result of the positional delete file rewrite planning. */
public class RewritePositionDeletePlan
We could use FileRewritePlan<RewritePositionDeleteFiles.FileGroupInfo, PositionDeletesScanTask, DeleteFile, RewritePositionDeletesGroup> everywhere instead of creating a new class, but after I had to create the RewriteFilePlan to handle outputSpecId, this seems more elegant.
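The readability trade-off being described, as a hedged sketch; variable names are hypothetical and members are omitted:

```java
// (a) the rejected alternative: the full generic type at every use site
FileRewritePlan<
        RewritePositionDeleteFiles.FileGroupInfo,
        PositionDeletesScanTask,
        DeleteFile,
        RewritePositionDeletesGroup>
    verbosePlan;

// (b) the dedicated subclass pins the four type parameters once,
// mirroring how RewriteFilePlan was introduced to carry outputSpecId
RewritePositionDeletePlan namedPlan;
```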
public Set<String> validOptions() {
  return ImmutableSet.<String>builder()
      .addAll(super.validOptions())
      .add(RewritePositionDeleteFiles.REWRITE_JOB_ORDER)
It is strange to me that currently we have both RewritePositionDeleteFiles.REWRITE_JOB_ORDER and RewriteDataFiles.REWRITE_JOB_ORDER. If we merge these, then we could move the REWRITE_JOB_ORDER handling to the SizeBasedFileRewritePlanner class, which might be a bit nicer.
 * Extends the {@link RewriteFileGroupPlanner} with the possibility to set the expected compression
 * factor.
 */
public class SparkShufflingDataRewritePlanner extends RewriteFileGroupPlanner {
The only reason for this class is that compression-factor generates different target groups. If we agree that this configuration should be available in RewriteFileGroupPlanner, or in every Spark planner, then we can get rid of this extra class and some related nastiness in RewriteDataFilesSparkAction (shufflingPlanner).
@aokolnychyi: @RussellSpitzer is out until Thursday. Could you please review it in the meantime?
@jackye1995: Would you happen to be interested in a standardized interface for compaction planning?
import org.apache.iceberg.ContentScanTask;

/**
 * A class for rewriting content file groups ({@link FileRewriteGroup}).
This is really just a rephrasing of the class name; we need to be much clearer about what this is for.
Refactored the javadoc
 *
 * @param options options to initialize this rewriter
 */
void init(Map<String, String> options);
It is not clear from the definition here why initPlan and init are both required. It is also not clear when they are called, or whether they are both called, etc.
Will add javadoc.
- init is called when the executor is initialized
- initPlan is called when the plan has been generated and the parameters needed for the executor are calculated (writeMaxFileSize, outputSpecId)
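In call-order terms, the lifecycle described above would look roughly like this (the planner and executor variables are assumed to exist; this is a sketch, not the final API):

```java
// 1. init: configure the executor from user options, before any planning
executor.init(options);

// 2. planning produces the plan, which carries the derived parameters
FileRewritePlan<I, T, F, G> plan = planner.plan();

// 3. initPlan: hand the plan-derived parameters (writeMaxFileSize, outputSpecId)
//    to the executor before the groups are rewritten
executor.initPlan(plan);
```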
Refactored the javadoc
/**
 * Container class representing a set of files to be rewritten by a {@link FileRewriteExecutor}.
 *
 * @param <I> the Java type of the plan info
This is very ambiguous to me, and it's not clear what the type parameter I should be.
These are the RewriteDataFiles.FileGroupInfo and RewritePositionDeleteFiles.FileGroupInfo.
this.expectedOutputFiles = expectedOutputFiles;
}

/** Identifiers and partition information about the group. */
Why isn't this just a StructLike? What other class types might we be passing through here?
This is RewriteDataFiles.FileGroupInfo and RewritePositionDeleteFiles.FileGroupInfo. These are inherited from the old code.
import org.apache.iceberg.RewriteJobOrder;

/**
 * Container class representing a set of files to be rewritten by a {@link FileRewriteExecutor}.
This doesn't seem to be the case, since it also has:

/** Expected split size for the output files. */
public long splitSize() {
  return splitSize;
}

/** Expected number of the output files. */
public int expectedOutputFiles() {
  return expectedOutputFiles;
}

The first one, splitSize, feels like it's global to a lot of FileRewriteGroups and not a particular property of one RewriteGroup, so maybe that belongs in the Plan. "estimatedOutputFiles" could live here, but maybe it just makes sense as estimatedOutputFiles(splitSize), where the split size is passed in from elsewhere?
The first one "splitSize" feels like it's global to a lot of FileRewriteGroups and not a particular property of 1 RewriteGroup, so may that belongs in Plan
By my understanding currently this could be different for different groups in the same plan. The splitSize(long inputSize)
and the numOutputFiles(long inputSize)
is dependent on the input size, and big part of that is dependent on the configuration of the planner (see: #9069)
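A simplified sketch of the kind of input-size-dependent computation being discussed, consistent with the javadoc quoted earlier ("at least as big as the target file size but less than the max write file size"); the constants and rounding here are illustrative, not the exact Iceberg implementation:

```java
long targetFileSize = 512L * 1024 * 1024; // e.g. 512 MB, from planner options
long maxFileSize = 640L * 1024 * 1024;    // writeMaxFileSize

long inputSize = 2_000_000_000L; // sum of the content sizes of the tasks in this group

// Expected number of output files: at least one, roughly inputSize / target.
int expectedOutputFiles = (int) Math.max(1, Math.round((double) inputSize / targetFileSize));

// Split size: spread the input evenly over the expected files, clamped between
// the target file size and the max write file size.
long splitSize =
    Math.min(Math.max(inputSize / expectedOutputFiles, targetFileSize), maxFileSize);
```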
The current code makes sure that the exact same compaction result is generated as with the previous algorithm.

In the old code we had:
- Plan level
  - writeMaxFileSize - governs the target file size when rewriting (SparkWriteOptions.TARGET_FILE_SIZE_BYTES)
- Group level
  - splitSize - governs the input split size when rewriting (SparkReadOptions.SPLIT_SIZE, splitSize)
  - expectedOutputFiles - governs the shuffle partition numbers for the shuffling rewriters

In the new code I have mirrored this. I think the new code would be much cleaner if we put all of this information into the group level (we could get rid of the initPlan step).
I think basically anything having to do with "output" should be part of "Plan" or "Execute", depending. In this case we are basically just using these properties because they make it easy to control Spark's behavior.
Let me apply this to the current situation, so we can check if I understand correctly:
- We should keep splitSize in the group, as this drives the reading
- We should move expectedOutputFiles to the plan, as it is driving the output

Is this the change that you are suggesting?

I see 3 possible separations of concerns:
- Plan is the 'result': everything below it is organized based only on the multiplicity of the data. If some value applies to every group, then that value belongs to the 'global' plan variables; if a value is different for every group, then that value belongs to the group (current code).
- Plan is the write config, group is the read config. If I understand correctly, this is what you are suggesting. IMHO this is a bit awkward, as currently the groups are part of the plan. Maybe we could have a readConfig and a writeConfig map in the plan instead of adding extra attributes to the plan and to the groups (a small sketch follows below). This comes with the cost of extra parsing for the configs, but allows us more flexibility (fewer classes).
- The group should contain every piece of information required for a single job, so the job (executor) only receives a single group and every other bit of information is global. The drawback is that some information is duplicated, but it is cleaner on the executor side.

Your thoughts?
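For the second option, a minimal sketch of what the readConfig/writeConfig variant could look like; all names are illustrative, and the parsing cost mentioned above would land in the executor:

```java
import java.util.Map;

// Sketch: the plan carries opaque read/write config maps instead of typed
// attributes on the plan and on the groups.
class SketchRewritePlan {
  private final Map<String, String> readConfig;  // e.g. input split size
  private final Map<String, String> writeConfig; // e.g. target/max output file size

  SketchRewritePlan(Map<String, String> readConfig, Map<String, String> writeConfig) {
    this.readConfig = readConfig;
    this.writeConfig = writeConfig;
  }

  Map<String, String> readConfig() {
    return readConfig;
  }

  Map<String, String> writeConfig() {
    return writeConfig;
  }
}
```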
import org.apache.iceberg.StructLike;

/**
 * Result of the file rewrite planning.
Needs more details.
Refactored the javadoc
return totalGroupCount;
}

/** Calculated maximum file size for the target files */
Not sure what "target" means here. Maximum output file size?
Updated the javadoc.
Also see: #11513 (comment)
/**
 * A class for planning content file rewrites.
 *
 * <p>The entire rewrite operation is broken down into pieces based on partitioning, and size-based
"partitioning and size-based" are implementation details right? Not necessarily always true?
Copied from FileRewriter, but I agree with your assessment, so I rewrote the javadoc. The partitioning is currently necessary (see FileRewritePlan#groupsInPartition). Removed the size-based part.
 * A class for planning content file rewrites.
 *
 * <p>The entire rewrite operation is broken down into pieces based on partitioning, and size-based
 * groups within a partition. These subunits of the rewrite are referred to as file groups. A file
Some of these details belong in the details of FileGroup. This doc needs to talk about the actual planning
Refactored the javadoc
As requested in #11497, extracted the Spark and Core related changes.

Based on the discussions it was decided that we should separate out the planning (shared between the engines) and the rewriting, which is engine specific. This will allow Flink to reuse the planning related code.

Created the following new classes/interfaces:
- FileRewritePlan for storing the planning results
- FileRewritePlanner for grouping the tasks based on partitions and the planning algorithm (currently size based)
- FileRewriteExecutor for executing the actual rewrite on specific groups

Deprecated the following classes/interfaces:
- FileRewriter - split into FileRewritePlanner and FileRewriteExecutor
- SizeBasedFileRewriter - split into SizeBasedFileRewritePlanner and FileRewriteExecutor
- SizeBasedPositionDeletesRewriter - split into RewritePositionDeletesGroupPlanner and FileRewriteExecutor

Refactored the Spark code and the testing as well.
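Putting the pieces together, a hedged end-to-end sketch of the new split using the class names from this PR; method names and signatures are still under review, so treat this as illustrative rather than the final API:

```java
// Planning: engine-agnostic, lives in core.
RewriteFileGroupPlanner planner = new RewriteFileGroupPlanner(table);
planner.init(options); // user options, e.g. target file size
FileRewritePlan<RewriteDataFiles.FileGroupInfo, FileScanTask, DataFile, RewriteFileGroup> plan =
    planner.plan();

// Execution: engine-specific (e.g. Spark), consumes the plan.
executor.init(options);  // configure the executor independently of the plan
executor.initPlan(plan); // plan-derived params: writeMaxFileSize, outputSpecId
plan.groups().forEach(executor::rewrite); // physical rewrite per planned group
```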