
Improve avro support #691

Merged: 13 commits into master, Feb 16, 2024
Conversation

@RustedBones (Contributor) commented Nov 28, 2023

  • Do not rely on beam serialization to generate avro records
  • Add proper support of logical-types according to avro spec
  • Refactor AvroBigDiffy to leverage type information

Compiles with Avro 1.8 and runs tests with the latest Avro (Avro 1.8 generates broken code).
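For context, a logical type in the Avro spec annotates a base type with a higher-level meaning. A minimal sketch of the timestamp-millis logical type, assuming Avro 1.9+ with its java.time conversions (illustrative, not code from this PR):

import org.apache.avro.{LogicalTypes, SchemaBuilder}
import org.apache.avro.data.TimeConversions.TimestampMillisConversion

// timestamp-millis is a logical type layered over the long base type
val tsSchema = LogicalTypes.timestampMillis().addToSchema(SchemaBuilder.builder().longType())

// convert the raw long into its logical java.time.Instant representation
val instant = new TimestampMillisConversion()
  .fromLong(1700000000000L, tsSchema, LogicalTypes.timestampMillis())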

@RustedBones changed the title from "Cut beam dep" to "Improve avro support" on Nov 28, 2023
codecov bot commented Nov 28, 2023

Codecov Report

Attention: 36 lines in your changes are missing coverage. Please review.

Comparison is base (51ec798) 71.34% compared to head (dcd413d) 71.09%.

Files Patch % Lines
...om/spotify/ratatool/scalacheck/AvroGenerator.scala 80.23% 17 Missing ⚠️
...n/scala/com/spotify/ratatool/diffy/AvroDiffy.scala 89.02% 9 Missing ⚠️
...in/scala/com/spotify/ratatool/diffy/BigDiffy.scala 50.00% 7 Missing ⚠️
...m/spotify/ratatool/scalacheck/HashMapBuilder.scala 75.00% 1 Missing ⚠️
...m/spotify/ratatool/scalacheck/HashMapBuilder.scala 75.00% 1 Missing ⚠️
...spotify/ratatool/scalacheck/HashMapBuildable.scala 66.66% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master     #691      +/-   ##
==========================================
- Coverage   71.34%   71.09%   -0.26%     
==========================================
  Files          41       44       +3     
  Lines        1752     1816      +64     
  Branches      246      291      +45     
==========================================
+ Hits         1250     1291      +41     
- Misses        502      525      +23     
Flag Coverage Δ
ratatoolCli 2.92% <0.00%> (-0.07%) ⬇️
ratatoolCommon 0.00% <ø> (ø)
ratatoolDiffy 32.86% <80.31%> (+1.23%) ⬆️
ratatoolExamples 17.40% <50.50%> (+1.39%) ⬆️
ratatoolSampling 62.36% <79.38%> (+<0.01%) ⬆️
ratatoolScalacheck 78.14% <77.31%> (-3.34%) ⬇️
ratatoolShapeless 4.20% <0.00%> (-0.10%) ⬇️

Flags with carried forward coverage won't be shown.


Comment on lines -204 to -214
Delta("repeated_record.nested_repeated_field", Option(jl(10, 20, 30)), None, UnknownDelta),
Delta("repeated_record.string_field", Option("b"), None, UnknownDelta)
@RustedBones (Contributor, Author) commented Nov 28, 2023

This result does not make sense to me. As we are keying by field, we should get the same output as a map comparison.
I propose an output that puts the key in the field path.
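For illustration only (the key k1 and bracket notation are hypothetical, not from the PR), the proposed output could look like:

// hypothetical: the key used to match records appears in the field path
Delta("repeated_record[k1].string_field", Option("b"), None, UnknownDelta)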

Member:

I see your point. Can you explain/comment on the source of this change so it is more apparent?

@idreeskhan (Contributor) commented Feb 12, 2024

I'm roughly OK with this change, but I'd want to verify my understanding of what this output looks like with multiple nestings. I think if we are putting it on the field path, we probably also want to recursively pass the field keys, which I don't think is happening here: https://github.com/spotify/ratatool/pull/691/files#diff-c02df8e7364f3ad968c28ac24f66eb4de3e9f15ef79dd7f05f5c05b1ecb98225R112

Though perhaps I'm misreading something here.

import org.scalacheck.util.Buildable

private[scalacheck] object HashMapBuildable {
RustedBones (Contributor, Author):

Trying to port that upstream in typelevel/scalacheck#1023
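For reference, a minimal sketch of what a scalacheck Buildable instance for immutable HashMap can look like (Scala 2.13, illustrative names, not necessarily the PR code):

import scala.collection.mutable
import scala.collection.immutable.HashMap
import org.scalacheck.util.Buildable

object HashMapBuildableSketch {
  // Buildable tells scalacheck how to assemble generated (K, V) pairs into a HashMap
  implicit def buildableHashMap[K, V]: Buildable[(K, V), HashMap[K, V]] =
    new Buildable[(K, V), HashMap[K, V]] {
      def builder: mutable.Builder[(K, V), HashMap[K, V]] = HashMap.newBuilder[K, V]
    }
}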

@benkonz (Contributor) commented Dec 1, 2023

Hi! I'm taking a look! Thanks for the contribution!

@RustedBones RustedBones changed the base branch from master to scio-0.14.0 January 26, 2024 13:38
@RustedBones RustedBones marked this pull request as draft January 26, 2024 13:38
@RustedBones (Contributor, Author) commented:

Updated to the latest Scio. The code now also works with Avro 1.8.2.

Base automatically changed from scio-0.14.0 to master February 5, 2024 08:08
@RustedBones RustedBones marked this pull request as ready for review February 5, 2024 08:15
@monzalo14 (Member) left a comment

Thanks for the contribution! I'd like to get a couple of things clarified before approving. For some of the changes here, it's a bit hard to tell whether they could break any user code without full context on the recent Avro/coder updates. Clarifying that to the extent possible would be ideal 👍

On another topic, did you use a different formatter version than what we currently have in prod? If possible, I'd like to keep formatting changes to a minimum to make this somewhat big PR easier to track.

Comment on lines -35 to -37
new SchemaValidatorBuilder().canReadStrategy
.validateLatest()
.validate(y.getSchema, List(x.getSchema).asJava)
Member:

Were there any breaking changes in this implementation, or is there another reason why we're moving away from Avro schema validation? I'm not very familiar with how strict the Avro definition of "compatible schemas" is, but at first glance it seems like we're losing some flexibility and/or some level of detail with the new validations. This is not my area of expertise, though, so your recommendations are more than welcome!

RustedBones (Contributor, Author):

In terms of diff, Avro schemas must be strictly equal. This is checked at line 62 in the new version.

Compatible schemas are used at read time to adapt stored data to the desired read schema. Once in memory, we should not compare data constructed with different schemas, even if they are compatible.

Contributor:

Can you expand on what you mean? I'm a bit confused reading this thread.

RustedBones (Contributor, Author):

Avro records with different models must not be compared.

Schema compatibility is relevant when reading, to make sure the writer schema and the reader schema are compatible. Once read, the records strictly follow the reader schema, where field index matters. Strict schema equality must be ensured before comparing content.
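For concreteness, a sketch of where that resolution happens, using plain Avro file reading (hypothetical helper, not PR code): schema resolution between writer and reader schema occurs during decoding, and every returned record then follows the reader schema.

import java.io.File
import org.apache.avro.Schema
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

// the writer schema is taken from the file header; readerSchema is what
// records are adapted to, and what they strictly follow once in memory
def readWith(file: File, readerSchema: Schema): DataFileReader[GenericRecord] = {
  val datumReader = new GenericDatumReader[GenericRecord](readerSchema)
  new DataFileReader[GenericRecord](file, datumReader)
}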

Contributor:

OK, trying to confirm my understanding here, since I'm still a bit confused.
Dataset A is updated to add a new nullable field x and becomes Dataset A'.
We go to diff these two datasets.
Are you saying that this field comparison will end up in one of the above null cases prior to the schema comparison?

@idreeskhan (Contributor) commented Feb 13, 2024

I would go one step further and say it should necessarily be the A' schema, even if the field is nullable/has a default, and cases where a field is missing should fail. Semantically, it's still a difference between the two datasets. IIRC this is the current functionality; it's unclear to me if/where this behaviour is retained.

RustedBones (Contributor, Author):

The BigDiffy API of this lib is not file-aware. It only works in terms of in-memory records and can't make any assumptions about the writer schema.

It is up to the users creating the SCollection to read using the correct schema.

RustedBones (Contributor, Author):

Even for the diffAvro API, the reader schema used is the one from the generated class.

It is entirely possible that the underlying files use a different schema; ratatool-diffy will miss those differences.

@clairemcginty (Contributor) commented Feb 14, 2024

> The BigDiffy API of this lib is not file aware.

cc @RustedBones, AFAIK, BigDiffy is file-aware when run through the CLI, which invokes BigDiffy#run:

val schema = new AvroSampler(rhs, conf = Some(sc.options))
  .sample(1, head = true)
  .head
  .getSchema
implicit val grCoder: Coder[GenericRecord] = avroGenericRecordCoder(schema)
val diffy = new AvroDiffy[GenericRecord](ignore, unordered, unorderedKeys)
val lhsSCollection = sc.avroFile(lhs, schema)
val rhsSCollection = sc.avroFile(rhs, schema)
BigDiffy
  .diff[GenericRecord](lhsSCollection, rhsSCollection, diffy, avroKeyFn(keys), ignoreNan)

but even then, it looks like it selects the schema associated with the RHS and uses that for both resulting SCollections. So maybe we could add schema validation there (ensure that RHS schema is equal to, or a superset of, the LHS schema)?

RustedBones (Contributor, Author):

Added an extra check that prefers a backward-compatible reader schema.
When the schemas are different but both forward and backward compatible, it will print a warning.
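A sketch of that kind of check, using Avro's SchemaCompatibility API (illustrative, not necessarily the exact PR code):

import org.apache.avro.{Schema, SchemaCompatibility}
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType

// true when `reader` can read data written with `writer`
def canRead(reader: Schema, writer: Schema): Boolean =
  SchemaCompatibility
    .checkReaderWriterCompatibility(reader, writer)
    .getType == SchemaCompatibilityType.COMPATIBLE

// prefer the rhs schema when it can read lhs data (backward compatible),
// otherwise fall back to the lhs schema
def pickReaderSchema(lhsSchema: Schema, rhsSchema: Schema): Schema =
  if (canRead(rhsSchema, lhsSchema)) rhsSchema else lhsSchema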

Comment on lines 59 to 50
case (null, null) => Seq.empty
case (_, null) => Seq(Delta("", Some(x), None, UnknownDelta))
case (null, _) => Seq(Delta("", None, Some(y), UnknownDelta))
case _ if x.getSchema != y.getSchema => Seq(Delta("", Some(x), Some(y), UnknownDelta))
case _ => diff(x, y, x.getSchema, "")
}
Member:

Like this refactor! 👍

Comment on lines +100 to +90
val a = x.asInstanceOf[IndexedRecord]
val b = y.asInstanceOf[IndexedRecord]
Member:

I like this! Seems more resource-efficient. Is there any case in which this cast could fail, though, so that we should add a catch statement?

RustedBones (Contributor, Author):

If the schema type is a record, IndexedRecord is the least powerful abstraction we need to check equality. We were previously using GenericRecord, which extends IndexedRecord, but equality can be checked by field position.
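A minimal sketch of the idea (illustrative, not the PR code): with IndexedRecord, field access is positional, so once the schemas are equal the records can be compared field by field.

import org.apache.avro.generic.IndexedRecord
import scala.jdk.CollectionConverters._

// compare two records field by field, using the positional get(i) accessor
def sameContent(a: IndexedRecord, b: IndexedRecord): Boolean =
  a.getSchema == b.getSchema &&
    a.getSchema.getFields.asScala.forall(f => a.get(f.pos()) == b.get(f.pos()))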

Comment on lines +31 to +32
import java.nio.ByteBuffer
import scala.jdk.CollectionConverters._
Member:

Have you run sbt scalafmt? I'm surprised there are so many formatting changes. Are you running a different formatter version?

RustedBones (Contributor, Author):

It is probably my IntelliJ import organizer. I don't think we have a scalafmt rule for import order.

)
}

it should "support schema evolution if ignored" in {
Member:

Could you clarify how the rest of the tests are covering this test case?

RustedBones (Contributor, Author):

It is not covered. Records with different schemas are not equal, as explained above.

private def getRawType(schema: Schema): Schema = {
  schema.getType match {

private def numericValue(value: AnyRef): Double = value match {
  case i: java.lang.Integer => i.toDouble
Contributor:

Why .toDouble here?

RustedBones (Contributor, Author):

Because numericDelta only supports Double.
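A sketch of how that widening looks (the case list here is illustrative):

// widen every supported numeric type to Double, since the numeric
// delta computation is defined on Double only
def numericValue(value: AnyRef): Double = value match {
  case i: java.lang.Integer => i.toDouble
  case l: java.lang.Long    => l.toDouble
  case f: java.lang.Float   => f.toDouble
  case d: java.lang.Double  => d
  case other => throw new IllegalArgumentException(s"Unsupported numeric type: $other")
}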

logger.warn("Avro schemas are compatible, but not equal. Using schema from {}", rhs)
}
rhsSchema
case (COMPATIBLE, INCOMPATIBLE) =>
Contributor:

This is a change in underlying functionality. IMO we should also warn in these cases rather than proceed transparently.

Contributor:

Honestly, I still think we should enforce RHS backwards compatibility unless otherwise shown to be necessary, but if we are changing functionality/flexibility then we need to do so in a way that is transparent to users.

I'll leave the actual decision here to the current members of the owning team.

RustedBones (Contributor, Author):

Reverted to the previous behavior using the SchemaValidatorBuilder, which throws a SchemaValidationException with a detailed error in case of incompatibility.
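That is the validation pattern quoted earlier in the thread; roughly:

import org.apache.avro.{Schema, SchemaValidatorBuilder}
import scala.jdk.CollectionConverters._

// throws SchemaValidationException with a detailed message when the
// reader schema cannot read data written with the writer schema
def validateCanRead(readerSchema: Schema, writerSchema: Schema): Unit =
  new SchemaValidatorBuilder().canReadStrategy
    .validateLatest()
    .validate(readerSchema, List(writerSchema).asJava)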

@benkonz merged commit 3eeebfd into master on Feb 16, 2024
1 check passed