-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Implement LazyFrame.sink_ndjson
#10786
Conversation
|
||
// if we don't allow threads and we have udfs trying to acquire the gil from different | ||
// threads we deadlock. | ||
py.allow_threads(|| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also this here is duplicated every sink.
Can we not do something like 1 generic sink method where you need to pass in an instance of a BatchedWriter?
That way we can much more easily add more types to sink?
WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also agree on this one, will try to refactor this code as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After looking at the code, I don't think we can do much on this part 😞 there are only 2-3 lines of duplicated code because of py.allow_threads
call, it would possible to remove the duplicated code if pyo3 implements an allow_threads
attribute so the execution of the whole function releases the GIL.
It would be nice to be able to pass a BatchedWriter instance but then we will be mixing API and task execution code. I think is a better design to keep a separation of concerns that enables the API and the task's code to evolve in their own way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll think about it a bit more tomorrow & come back if I would think of a certain way to handle it.
link for my own benefit: Parallelism
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for looking into it. Nothing came to mind when I started to ponder about deadlocks and gil and stuff :-)
assert_frame_equal(df, expected) | ||
|
||
|
||
def test_sink_json_should_support_with_options(io_files_path: Path, tmp_path: Path) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nothing is being tested in this method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, we are still finishing the testing 😄
08279c8
to
2f40aed
Compare
Hi everyone, this pull request is ready, we got two failed checks nonetheless they are not in the files that we modified for this feature. Any feedback or comments are welcome. |
2f40aed
to
878ef5d
Compare
Some tests started to fail after doing the merge with main, we are going to fix them. |
crates/polars-io/src/json/mod.rs
Outdated
/// # Panics | ||
/// The caller must ensure the chunks in the given [`DataFrame`] are aligned. | ||
fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { | ||
let fields = df.iter().map(|s| s.field().to_arrow()).collect::<Vec<_>>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally I would make a class in between that handles all the generic stuff between JsonWriter
& JsonLinesBatchedWriter
. In the final classes, I would just implement an on_block_write
and on_finish
.
My 2ct :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, we were thinking of something similar, that's why we implemented the BatchedWriter trait in the json module but we were thinking that it would be better to expose the SinkWriter trait, that is part of the pipeline execution or maybe a struct with some common code as you mentioned, to each of the io modules, so each module implements this interface and we can remove all the implementations currently in file_sink.rs. Of course this will create an extra dependency as the IO modules will now depend of a trait of the execution core, but I think this will make sense since we will only be exposing an interface or a collections of interfaces that need to be implemented by each module. Please let me know your thoughts on this 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe you can chime in on #11056? Because that is very much in line with what you are saying here, except on a broader scale.
Just asking ritchie for his opinion there, but with your knowledge of the framework, it might be you have more concrete ideas on how to handle it?
|
||
// if we don't allow threads and we have udfs trying to acquire the gil from different | ||
// threads we deadlock. | ||
py.allow_threads(|| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for looking into it. Nothing came to mind when I started to ponder about deadlocks and gil and stuff :-)
78e6a56
to
812b453
Compare
I'll review this more in-depth later, but I already noticed a doc entry on the Python side is missing. So writing it down now before I forget :) |
Hi @stinodego I hope you are doing well, Regarding this PR, is there something else missing in order to merge it? |
f4746cd
to
d97ba76
Compare
7fdc081
to
26328c4
Compare
26328c4
to
2c815a8
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we have the JsonFormat
enum, I think it may be easier to use if the batched writers were combined & accepted a JsonFormat
as an input.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed it, could you check again?
crates/polars-io/src/json/mod.rs
Outdated
@@ -155,6 +161,101 @@ where | |||
} | |||
} | |||
|
|||
pub trait BatchedWriter<W: Write> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we reuse the SinkWriter
trait here instead of adding a new trait?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we changed the implementation to accept a JsonFormat as an input, BatchedWriter trait is not necessary anymore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just as a comment, It would be great if we can expose SinkWriter to be used by other crates. This way we can move all SinkWriter implementations to the corresponding modules. 😄
crates/polars-io/src/json/mod.rs
Outdated
let fields = df.iter().map(|s| s.field().to_arrow()).collect::<Vec<_>>(); | ||
let batches = df | ||
.iter_chunks() | ||
.map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
couldn't we use into_struct
here instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We tried but the implementation does not seem very transparent, do you have an example?
a863625
to
2e4e179
Compare
fee49ba
to
47745b4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your patience @fernandocast. It is going in now. :)
My pleasure @ritchie46, my friend and I are convinced of the potential that Polars has. |
Closes #10762
Implementing new sink_json method to support json format in streaming mode.