-
Notifications
You must be signed in to change notification settings - Fork 513
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use avro builder API #5119
Use avro builder API #5119
Conversation
.sink(AvroBytesUtil.schema) | ||
.asInstanceOf[BAvroIO.Sink[T]] | ||
.withDatumWriterFactory(AvroBytesUtil.datumWriterFactory) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Small rewrite here to move away from deprecated sinkViaGenericRecords
API
private val byteField = schema.getField("bytes") | ||
|
||
def datumWriterFactory[T: Coder]: DatumWriterFactory[T] = { | ||
val bCoder = CoderMaterializer.beamWithDefault(Coder[T]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Logic change here since we do not use the coder options for byte serialization. I think this cased of an issue when consuming data from PubSub, and tunring one Nullable, records could not be read.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🙌
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## main #5119 +/- ##
==========================================
- Coverage 63.36% 63.34% -0.02%
==========================================
Files 291 291
Lines 10842 10837 -5
Branches 745 753 +8
==========================================
- Hits 6870 6865 -5
Misses 3972 3972 ☔ View full report in Codecov by Sentry. |
No description provided.