Hook up Avro Decoder #6820

Merged
merged 6 commits into from
Dec 1, 2024

tustvold (Contributor)

Which issue does this PR close?

Part of #4886

Rationale for this change

This hooks up the codec logic into a decoder; the rest of #4886 is then just a plumbing and testing exercise.

What changes are included in this PR?

Are there any user-facing changes?

No, none of these types are public yet

github-actions bot added the "arrow" label (Changes to the arrow crate) on Nov 30, 2024
&self.schema
}

/// Decode `count` records from `buf`
Contributor Author:

This separation of decode and flush allows reading records from multiple blocks into the same RecordBatch

Contributor:

I think it would help to add in the documentation that the user should call flush to generate record batches.
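To illustrate the decode/flush split discussed above, here is a minimal sketch using a hypothetical `ToyDecoder`. It is not the `RecordDecoder` from this PR: the one-byte "records", the `Vec<u8>` batch, and the `decode_two_blocks` helper are all assumptions made for illustration. The point is only that `decode` accumulates state, and nothing is emitted until the caller invokes `flush`.

```rust
/// Hypothetical toy decoder (NOT the arrow-avro `RecordDecoder`).
/// Each "record" is assumed to be a single byte to keep the sketch small.
struct ToyDecoder {
    /// Values decoded so far but not yet emitted.
    pending: Vec<u8>,
}

impl ToyDecoder {
    fn new() -> Self {
        Self { pending: Vec::new() }
    }

    /// Decode `count` records from `buf`, returning the number of bytes consumed.
    /// Nothing is emitted here; call `flush` to obtain a batch.
    fn decode(&mut self, buf: &[u8], count: usize) -> Result<usize, String> {
        if buf.len() < count {
            return Err(format!("expected {count} records, found {}", buf.len()));
        }
        self.pending.extend_from_slice(&buf[..count]);
        Ok(count)
    }

    /// Emit everything decoded so far as one batch and reset the decoder.
    fn flush(&mut self) -> Vec<u8> {
        std::mem::take(&mut self.pending)
    }
}

/// Usage: records from two separate blocks land in the same batch.
fn decode_two_blocks() -> Vec<u8> {
    let mut decoder = ToyDecoder::new();
    decoder.decode(&[1, 2, 3], 3).unwrap(); // first block
    decoder.decode(&[4, 5], 2).unwrap(); // second block
    decoder.flush()
}
```

Because `flush` is a separate call, the caller decides the batch boundaries, independent of how the input happens to be split into blocks.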

alamb (Contributor) left a comment:

Looks like a good step to me -- I think testing list/struct decoding is important, ideally could be done before merge.

Otherwise 🚀

arrow-avro/src/reader/cursor.rs
self.buf = &self.buf[1..];
in_progress |= ((byte & 0x7F) as u32) << shift;
shift += 7;
if byte & 0x80 == 0 {
Contributor:

I think it might help to ensure the loop runs a bounded number of times (as it is, bad input might make it run many times / overflow the shift)

If it running more than 4 times is ok, a comment about why would be nice
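One way to bound the loop, sketched under assumptions: the function name `read_varint_u32`, the `&mut &[u8]` signature, and the `Option` return type are hypothetical and not taken from the PR. A `u32` holds 32 bits and each byte carries 7, so at most 5 bytes are needed, and the fifth may contribute only 4 bits.

```rust
/// Read a base-128 varint into a `u32`, consuming at most 5 bytes from `buf`.
/// Returns `None` on truncated input or if the value does not fit in 32 bits.
/// Hypothetical sketch; bytes already read stay consumed on failure.
fn read_varint_u32(buf: &mut &[u8]) -> Option<u32> {
    let mut in_progress: u32 = 0;
    // 32 bits at 7 bits per byte: ceil(32 / 7) = 5 iterations at most.
    for i in 0..5u32 {
        let (&byte, rest) = buf.split_first()?; // `None` if input is truncated
        *buf = rest;
        // The fifth byte may only carry the top 4 bits and must not
        // have the continuation bit set.
        if i == 4 && byte > 0x0F {
            return None;
        }
        in_progress |= ((byte & 0x7F) as u32) << (i * 7);
        if byte & 0x80 == 0 {
            return Some(in_progress);
        }
    }
    None
}
```

With the bound in place, the shift never exceeds 28, so malformed input with many continuation bytes is rejected instead of overflowing.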

let mut in_progress = 0;
let mut shift = 0;

while let Some(byte) = self.buf.first().copied() {
Contributor:

same comment about the loop

];

let expected = RecordBatch::try_from_iter_with_nullable([
Contributor:

nice!

}

impl RecordDecoder {
pub fn try_new(data_type: &AvroDataType) -> Result<Self, ArrowError> {
Contributor:

it might help to document what types are supported and which are not (e.g. interval / fixed)

Contributor Author:

I'll make a note to include this in the documentation for the public reader once created

}
Codec::Fixed(_) => return nyi("decoding fixed"),
Codec::Interval => return nyi("decoding interval"),
Codec::List(item) => {
Contributor:

FWIW I didn't see any tests for list / struct decoding.

Contributor Author:

Yeah, I wanted to keep the diff smaller. Definitely more testing is needed before we make any of this public (currently the whole module is private)

alamb (Contributor) commented Dec 1, 2024:

Thank you @tustvold

/// Returns the current cursor position
#[inline]
pub(crate) fn position(&self) -> usize {
self.start_len - self.buf.len()
Contributor Author:

This is where start_len is used

Contributor:

Ah, I see -- thank you
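For readers following the thread, a small sketch of how a cursor can derive its position from `start_len`. The field names `buf` and `start_len` and the method `position` come from the snippet under review; the struct name `Cursor`, the constructor, and the `advance` helper are assumptions made for illustration.

```rust
/// Hypothetical cursor over a byte slice, modelled on the snippet above.
struct Cursor<'a> {
    /// Remaining, not yet consumed bytes.
    buf: &'a [u8],
    /// Length of the buffer when the cursor was created.
    start_len: usize,
}

impl<'a> Cursor<'a> {
    fn new(buf: &'a [u8]) -> Self {
        Self { buf, start_len: buf.len() }
    }

    /// Returns the current cursor position: bytes consumed so far.
    fn position(&self) -> usize {
        self.start_len - self.buf.len()
    }

    /// Consume `n` bytes (assumed helper), or return `None` if fewer remain.
    fn advance(&mut self, n: usize) -> Option<&'a [u8]> {
        if n > self.buf.len() {
            return None;
        }
        let (head, tail) = self.buf.split_at(n);
        self.buf = tail;
        Some(head)
    }
}
```

Storing only the initial length avoids keeping a second reference to the original slice; the position falls out of a subtraction.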

tustvold merged commit 3ed0f06 into apache:main on Dec 1, 2024
24 checks passed
Labels
arrow Changes to the arrow crate
2 participants