Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IPC StreamDecoder #5531

Merged
merged 3 commits into from
Mar 23, 2024
Merged

Add IPC StreamDecoder #5531

merged 3 commits into from
Mar 23, 2024

Conversation

tustvold
Copy link
Contributor

@tustvold tustvold commented Mar 19, 2024

Which issue does this PR close?

Closes #1207

Rationale for this change

Following on from #5249 this adds a StreamDecoder.

This is the final piece to close off the async support for IPC, in particular we now provide:

  • IpcDataGenerator for encoding data to Messages which can be sent over the wire
  • get_mut and into_inner on FileWriter and StreamWriter for flushing an in-memory buffer
  • FileDecoder for reading an IPC file from an async source supporting ranged reads
  • StreamDecoder for reading an IPC stream from an async stream

What changes are included in this PR?

Are there any user-facing changes?

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @tustvold -- I went through this PR carefully and it looks good to me. I had some stylistic suggestions but nothing that would prevent this PR from merging, in my opinion.

I do think a truncated stream error is important

Reading through the code I noticed it has non trivial overlap with the FileReader -- I wonder if it would make sense to use the StreamReader within the FileReader and reduce the implementation redundancy 🤔

assert!(arrow_json.equals_reader(&mut reader).unwrap());
// the next batch must be empty
assert!(reader.next().is_none());
// the stream must indicate that it's finished
assert!(reader.is_finished());
}

// Test stream decoder
let expected = arrow_json.get_record_batches().unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ this is a very nice test


/// Return the [`Message`]
#[inline]
pub fn as_ref(&self) -> Message<'_> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be more standard to call this function as_message and then provide an impl AsRef<Message> for MessageBuffer and that way the compiler can automatically do the deref.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately this isn't possible because of the lifetime on Message

/// Return the [`Message`]
#[inline]
pub fn as_ref(&self) -> Message<'_> {
// SAFETY: Run verifier on construction
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was worried that this is a not a safe API as since this is a public struct and it could be created directly

let buf = MessageBuffer(my_unsafe_buffer);
// cast to message without checks being called:
let my unsafe_message = buf.as_ref();

However, I convinced myself it was actually safe as long as MessageBuffer is used from some other module:

https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=efb81e5fe993e75995b09e8773d31643

/// Ok(())
/// }
/// ```
pub fn decode(&mut self, buffer: &mut Buffer) -> Result<Option<RecordBatch>, ArrowError> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I double checked decode is consistent with the names used by the csv reader

arrow-ipc/src/reader/stream.rs Outdated Show resolved Hide resolved
arrow-ipc/src/reader/stream.rs Outdated Show resolved Hide resolved
}
}
decoder.finish().unwrap();
assert_eq!(expected, actual);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also add tests for error conditions?

At the very least I think we should have a test for truncated stream which I think would be the most likely error in practice.

It would be nice to have a test for things like missing dictionaries, but I think that the value of those tests is lower

@tustvold
Copy link
Contributor Author

I wonder if it would make sense to use the StreamReader within the FileReader and reduce the implementation redundancy

For the most part the shared logic is already broken out into separate functions already, it is just they require quite a few arguments and are fairly verbose... I agree this could probably be improved

@tustvold tustvold merged commit a8c4232 into apache:master Mar 23, 2024
25 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
arrow Changes to the arrow crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Async IPC Reader/Writer
2 participants