-
Notifications
You must be signed in to change notification settings - Fork 513
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
(scio-smb) Support mixed FileOperations per BucketedInput #5064
Conversation
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## main #5064 +/- ##
==========================================
+ Coverage 63.33% 63.35% +0.02%
==========================================
Files 291 291
Lines 10837 10843 +6
Branches 753 755 +2
==========================================
+ Hits 6864 6870 +6
Misses 3973 3973 ☔ View full report in Codecov by Sentry. |
scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java
Show resolved
Hide resolved
scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java
Outdated
Show resolved
Hide resolved
scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroBucketMetadata.java
Outdated
Show resolved
Hide resolved
9980a60
to
a437c55
Compare
long numDistinctFileOperations = | ||
directories.values().stream().map(kv -> kv.getValue().getClass()).distinct().count(); | ||
|
||
// If all partitions use the same file operations type, don't keep re-encoding it |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the past serialized transform size has been an issue (if hundreds+ of partitions are being read), but this solution is definitely a bit convoluted 😅
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could potentially add a header to write out the distinct file operations with indicies and have a reference to those indicies in each of the output objects. Slight increase in overhead if all file ops are different, but would decrease payload size even more if there are some (but not exactly 1) shared operations
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's a good idea - will give it a try
a11633b
to
a45419c
Compare
How might a user use this change? |
scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java
Outdated
Show resolved
Hide resolved
scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java
Outdated
Show resolved
Hide resolved
long numDistinctFileOperations = | ||
directories.values().stream().map(kv -> kv.getValue().getClass()).distinct().count(); | ||
|
||
// If all partitions use the same file operations type, don't keep re-encoding it |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could potentially add a header to write out the distinct file operations with indicies and have a reference to those indicies in each of the output objects. Slight increase in overhead if all file ops are different, but would decrease payload size even more if there are some (but not exactly 1) shared operations
scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadata.java
Outdated
Show resolved
Hide resolved
scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/JsonBucketMetadata.java
Show resolved
Hide resolved
scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketIO.java
Show resolved
Hide resolved
You can use it if you're instantiating a SortedBucketSource transform directly (i.e. creating your own |
return (AvroCoder<ValueT>) AvroCoder.of(getSchema()); | ||
return recordClass == null | ||
? (AvroCoder<ValueT>) AvroCoder.of(getSchema()) | ||
: AvroCoder.of(recordClass, true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How did this work before? Codec for GenericRecord was used for SpecificRecords?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made a bug in beam apache/beam#29518. In current version it will use SpecificData
. Prefer explicit AvroCoder.reflect
scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetBucketMetadata.java
Show resolved
Hide resolved
@@ -427,18 +442,45 @@ public static <V> BucketedInput<V> of( | |||
tupleTag, inputDirectories, filenameSuffix, fileOperations, predicate); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if we remove old constructor signature, as long as interaction is done via of
factory method?
public abstract TupleTag<V> getTupleTag(); | ||
|
||
protected abstract BucketedInput<V> toBucketedInput(SortedBucketSource.Keying keying); | ||
public abstract BucketedInput<V> toBucketedInput(SortedBucketSource.Keying keying); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use this public method to construct SMB taps from the Scala API bindings, which take in SortedBucketIO.Read
objects 👍
|
||
public abstract K1 extractKeyPrimary(V value); | ||
|
||
public abstract K2 extractKeySecondary(V value); | ||
|
||
abstract int hashPrimaryKeyMetadata(); | ||
|
||
abstract int hashSecondaryKeyMetadata(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the secondary key impl. (lexicographic) we have today is one way of implementing it, which in some cases like analytics is less efficient/common as opposed to sth like z-ordering. Might be nice to think a bit more to find out with a more future proof API e.g. support N keys?
scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/FileOperations.java
Outdated
Show resolved
Hide resolved
@@ -86,11 +85,11 @@ int leastNumBuckets() { | |||
} | |||
|
|||
private <V> SourceMetadata<V> getSourceMetadata( | |||
List<String> directories, | |||
String filenameSuffix, | |||
Map<ResourceId, KV<String, FileOperations<V>>> directories, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: This is now a bit more than direcories. Would probably be nice to change naming for smth more precise like directoryOperations
Basically:
[List<Directory>, FilenameSuffix, FileOperations]
, BucketInputs now containMap<Directory -> [FilenameSuffix, FileOperations]>
BucketMetadata
implementations will now implement a "hash" of their primary/secondary key information, used to assess intra-partition compatibility. Optionally, they can also override a new function,Set<Class<? extends BucketMetadata>> compatibleMetadataTypes()
, to specify which types of BucketMetadatas can be mixed in the same BucketedInput (i.e. Avro+Parquet).