Skip to content

Commit

Permalink
Fix for Parquet-Avro
Browse files Browse the repository at this point in the history
  • Loading branch information
clairemcginty committed Mar 6, 2024
1 parent a9482a8 commit e2d7ae8
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,24 @@ static <K> K extractKey(Method[] keyGetters, Object value) {
return key;
}

@Override
<OtherKeyType> boolean keyClassMatches(Class<OtherKeyType> requestedReadType) {
  // Sources written before 0.14 used CharSequence keys; a String read request is
  // compatible with those, so accept it before deferring to the default check.
  return (requestedReadType == String.class && getKeyClass() == CharSequence.class)
      || super.keyClassMatches(requestedReadType);
}

@Override
<OtherKeyType> boolean keyClassSecondaryMatches(Class<OtherKeyType> requestedReadType) {
  // Mirror of keyClassMatches for the secondary key: pre-0.14 CharSequence-keyed
  // sources remain readable when a String secondary key is requested.
  return (requestedReadType == String.class && getKeyClassSecondary() == CharSequence.class)
      || super.keyClassSecondaryMatches(requestedReadType);
}

////////////////////////////////////////////////////////////////////////////////
// Logic for dealing with Avro records vs Scala case classes
////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@ package com.spotify.scio.smb
import com.spotify.scio.avro.{Account, AccountStatus}
import com.spotify.scio.ScioContext
import com.spotify.scio.testing.PipelineSpec
import org.apache.beam.sdk.extensions.smb.AvroSortedBucketIO
import org.apache.beam.sdk.extensions.smb.{
AvroSortedBucketIO,
ParquetAvroSortedBucketIO,
SortedBucketIO
}
import org.apache.beam.sdk.values.TupleTag

import java.nio.file.Files

class SmbVersionParityTest extends PipelineSpec {
"SortedBucketSource" should "be able to read CharSequence-keyed sources written before 0.14" in {
val output = Files.createTempDirectory("smb-version-test").toFile
output.deleteOnExit()

private def testRoundtrip(
write: SortedBucketIO.Write[CharSequence, _, Account],
read: SortedBucketIO.Read[Account]
): Unit = {
val accounts = (1 to 10).map { i =>
Account
.newBuilder()
Expand All @@ -27,30 +31,51 @@ class SmbVersionParityTest extends PipelineSpec {
{
val sc = ScioContext()
sc.parallelize(accounts)
.saveAsSortedBucket(
AvroSortedBucketIO
.write(classOf[CharSequence], "name", classOf[Account])
.to(output.getAbsolutePath)
.withNumBuckets(1)
.withNumShards(1)
)
.saveAsSortedBucket(write)
sc.run()
}

// Read data
val sc = ScioContext()
val tap = sc
.sortMergeGroupByKey(
classOf[String],
AvroSortedBucketIO
.read(new TupleTag[Account], classOf[Account])
.from(output.getAbsolutePath)
)
.sortMergeGroupByKey(classOf[String], read)
.materialize
tap
.get(sc.run().waitUntilDone())
.flatMap(_._2)
.value
.toSeq should contain theSameElementsAs accounts
}

"SortedBucketSource" should "be able to read CharSequence-keyed Avro sources written before 0.14" in {
  // Write Avro records keyed by CharSequence (the pre-0.14 key class), then verify
  // they can be read back via a String-keyed sortMergeGroupByKey.
  val tmpDir = Files.createTempDirectory("smb-version-test-avro").toFile
  tmpDir.deleteOnExit()

  val writeSpec = AvroSortedBucketIO
    .write(classOf[CharSequence], "name", classOf[Account])
    .to(tmpDir.getAbsolutePath)
    .withNumBuckets(1)
    .withNumShards(1)

  val readSpec = AvroSortedBucketIO
    .read(new TupleTag[Account], classOf[Account])
    .from(tmpDir.getAbsolutePath)

  testRoundtrip(writeSpec, readSpec)
}

it should "be able to read CharSequence-keyed Parquet sources written before 0.14" in {
  // Same scenario as the Avro case, but exercising the Parquet-Avro IO: write with
  // a CharSequence key and read back with a String key.
  val tmpDir = Files.createTempDirectory("smb-version-test-parquet").toFile
  tmpDir.deleteOnExit()

  val writeSpec = ParquetAvroSortedBucketIO
    .write(classOf[CharSequence], "name", classOf[Account])
    .to(tmpDir.getAbsolutePath)
    .withNumBuckets(1)
    .withNumShards(1)

  val readSpec = ParquetAvroSortedBucketIO
    .read(new TupleTag[Account], classOf[Account])
    .from(tmpDir.getAbsolutePath)

  testRoundtrip(writeSpec, readSpec)
}
}

0 comments on commit e2d7ae8

Please sign in to comment.