Skip to content

Commit

Permalink
Make SortedBucketIO.Read extendable for multi-format
Browse files Browse the repository at this point in the history
  • Loading branch information
clairemcginty committed Dec 14, 2023
1 parent a45419c commit 2825981
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;

/** API for reading and writing Avro sorted-bucket files. */
public class AvroSortedBucketIO {
Expand Down Expand Up @@ -191,6 +192,11 @@ public static <K1, K2, T extends SpecificRecord> TransformOutput<K1, K2, T> tran
/** Reads from Avro sorted-bucket files, to be used with {@link SortedBucketIO.CoGbk}. */
@AutoValue
public abstract static class Read<T extends IndexedRecord> extends SortedBucketIO.Read<T> {
@Nullable
abstract ImmutableList<String> getInputDirectories();

abstract String getFilenameSuffix();

@Nullable
abstract Schema getSchema();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;

/** API for reading and writing BigQuery {@link TableRow} JSON sorted-bucket files. */
public class JsonSortedBucketIO {
Expand Down Expand Up @@ -107,6 +108,10 @@ public static <K1, K2> TransformOutput<K1, K2> transformOutput(
*/
@AutoValue
public abstract static class Read extends SortedBucketIO.Read<TableRow> {
@Nullable
abstract ImmutableList<String> getInputDirectories();

abstract String getFilenameSuffix();

abstract Compression getCompression();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
Expand Down Expand Up @@ -199,6 +200,11 @@ public static <K1, K2, T extends SpecificRecord> TransformOutput<K1, K2, T> tran
/** Reads Avro records from Parquet sorted-bucket files, to be used with {@link SortedBucketIO.CoGbk}. */
@AutoValue
public abstract static class Read<T extends IndexedRecord> extends SortedBucketIO.Read<T> {
@Nullable
abstract ImmutableList<String> getInputDirectories();

abstract String getFilenameSuffix();

@Nullable
abstract Schema getSchema();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,12 +493,6 @@ public abstract static class TransformOutput<K1, K2, V> implements Serializable

/** Represents a single sorted-bucket source written using {@link SortedBucketSink}. */
public abstract static class Read<V> implements Serializable {

@Nullable
abstract ImmutableList<String> getInputDirectories();

abstract String getFilenameSuffix();

public abstract TupleTag<V> getTupleTag();

protected abstract BucketedInput<V> toBucketedInput(SortedBucketSource.Keying keying);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,16 @@ public static <V> BucketedInput<V> of(
tupleTag, inputDirectories, filenameSuffix, fileOperations, predicate);
}

/**
 * Creates a {@link BucketedInput} over the given source directories, choosing the
 * concrete implementation based on the requested {@link Keying}.
 *
 * @param keying whether downstream reads group by the primary key alone
 *     ({@code Keying.PRIMARY}) or by the primary and secondary keys together
 * @param tupleTag identifies this input within the co-grouped result
 * @param directories maps each input directory to a KV of its filename suffix and the
 *     {@link FileOperations} used to read files in it
 * @param predicate element filter applied at read time; semantics of null/absent
 *     are defined by the BucketedInput constructors — presumably "no filtering"
 */
public static <V> BucketedInput<V> of(
    Keying keying,
    TupleTag<V> tupleTag,
    Map<String, KV<String, FileOperations<V>>> directories,
    Predicate<V> predicate) {
  if (keying == Keying.PRIMARY) {
    return new PrimaryKeyedBucketedInput<>(tupleTag, directories, predicate);
  }
  // NOTE(review): "Buckted" looks like a typo for "Bucketed" in the declaring class's
  // name — keep this call site in sync if that class is ever renamed.
  return new PrimaryAndSecondaryKeyedBucktedInput<>(tupleTag, directories, predicate);
}

public BucketedInput(
Keying keying,
TupleTag<V> tupleTag,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.tensorflow.proto.example.Example;

/**
Expand Down Expand Up @@ -124,6 +125,10 @@ public static <K1, K2> TransformOutput<K1, K2> transformOutput(
*/
@AutoValue
public abstract static class Read extends SortedBucketIO.Read<Example> {
@Nullable
abstract ImmutableList<String> getInputDirectories();

abstract String getFilenameSuffix();

abstract Compression getCompression();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,10 @@ object ParquetTypeSortedBucketIO {
// Returns a copy of this Read with the given Hadoop Configuration applied —
// presumably consumed by the Parquet reader; confirm against toBucketedInput.
def withConfiguration(configuration: Configuration): Read[T] =
this.copy(configuration = configuration)

override def getInputDirectories: ImmutableList[String] =
def getInputDirectories: ImmutableList[String] =
ImmutableList.copyOf(inputDirectories.asJava: java.lang.Iterable[String])
override def getFilenameSuffix: String = filenameSuffix
def getFilenameSuffix: String = filenameSuffix

override def getTupleTag: TupleTag[T] = tupleTag

override protected def toBucketedInput(
Expand Down

0 comments on commit 2825981

Please sign in to comment.