Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add example for using a separate threadpool for CPU bound work #13424

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

alamb
Copy link
Contributor

@alamb alamb commented Nov 14, 2024

TODOs:

  • Add some example of trying to do IO on an object store (and should error with no io registered)
  • Contemplate adding the DedicatedExecutor code to make the example simple
  • Add wrapper over object store
  • Add wrapper over streams on the DedicatedExecutor
  • Complete example
  • Wrap stream results in object store wrapper
  • Split the DedicatedExecutor / etc into its own PR

Which issue does this PR close?

Rationale for this change

I added documentation that explains the problem here:

But now we need to show people how to fix it

I am reminded when trying to do this how non obvious it is

What changes are included in this PR?

Add a well commented example of how to use mutiple runtimes

The DedicatedExecutor code is orginally from

  1. InfluxDB 3.0 (todo link), largely written by @tustvold and @crepererum
  2. Largely based on Add DedicatedExecutor to FlightSQL Server datafusion-contrib/datafusion-dft#247 from @matthewmturner

The XXX object store code is also based on work from @matthewmturner in

Are these changes tested?

By CI

Are there any user-facing changes?

Comment on lines +126 to +130
// Calling `next()` to drive the plan on the different threadpool
while let Some(batch) = stream.next().await {
println!("{}", pretty_format_batches(&[batch?]).unwrap());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the tricky bit and shouldn't be underestimated. This means that for streaming responses you need to buffer data somewhere or accept higher latency. Also note that if you use ANY form of IO within DF (e.g. to talk to the object store) and, you need to isolate that as well.

So to me that mostly looks like a hack/workaround for DF not handling this properly by default.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree -- I hope that the different_runtime_advanced will show how do it "right" -- I haven't yet figued out how to do it.

@tustvold and @matthewmturner and I have been discussing the same issue here: datafusion-contrib/datafusion-dft#248 (comment)

@alamb alamb force-pushed the alamb/threadpool_example branch from 19d8916 to c24f4b1 Compare November 22, 2024 16:59
@github-actions github-actions bot added the physical-expr Physical Expressions label Nov 22, 2024
Comment on lines 193 to 197
// as mentioned above, calling `next()` (including indirectly by using
// FlightDataEncoder to convert the results to flight to send it over the
// network), will *still* result in the CPU work (and a bunch of spawned
// tasks) being done on the runtime calling next() (aka the current runtime)
// and not on the dedicated runtime.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this comment copied from somewhere? It's not clear to me what the reference to FlightDataEncoder is for - i dont think this example is using flight. Is the idea here to make sure that IO to object store that is done as part of executing the query is handled by the main runtime?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is from how we use it in InfluxDN (to send data back when encoding into FlightData for FligthtSQL is somewhat non trivial effort). Stay tuned

@alamb
Copy link
Contributor Author

alamb commented Nov 22, 2024

Ok, I am pretty happy with where this PR is now. It shows the entire process running end to end, with the DedicatedExecutor and running always on the dedicated executor

Things it needs to sort out:

  1. Wrapping requests like ObjectStore::list and ObjectStore::get so they run on the dedicated executor as well
  2. Cleaning up the code structure, start getting it ready to merge

@alamb alamb force-pushed the alamb/threadpool_example branch from 473ceff to 1bb1bf2 Compare November 22, 2024 18:37
@github-actions github-actions bot removed the common Related to common crate label Nov 22, 2024
@matthewmturner
Copy link
Contributor

Ok, I am pretty happy with where this PR is now. It shows the entire process running end to end, with the DedicatedExecutor and running always on the dedicated executor

Things it needs to sort out:

  1. Wrapping requests like ObjectStore::list and ObjectStore::get so they run on the dedicated executor as well

  2. Cleaning up the code structure, start getting it ready to merge

Can you expand on point 1? My naive expectation was that all network io went through the main runtime.

@alamb
Copy link
Contributor Author

alamb commented Nov 22, 2024

Can you expand on point 1? My naive expectation was that all network io went through the main runtime.

Yes, that is what should happen. The problem is here. As written, calling IoObjectStore::list will actually try and do IO on the wrong thread pool (as the stream executes on the wrong runtime)

impl ObjectStore for IoObjectStore {

    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
        // TODL run the inner list on the dedicated executor
        let inner_stream = self.inner.list(prefix);

        inner_stream

    }

I am trying to figure out how to make Rust run the stream on the right threadpool (we need to wrap it somehow and transfer results). This is what @tustvold was concerned about here: datafusion-contrib/datafusion-dft#248 (comment)

I thought I could use the CrossRTStream added in this PR, Ideally it would look like

impl ObjectStore for IoObjectStore {

    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
        let inner_stream = self.inner.list(prefix);
        // run the inner list on the dedicated executor
        self.dediated_executor.wrap_stream(inner_stream, convert_error)
    }

But but I am fighting Rust compiler lifetime shenanigans.

@matthewmturner
Copy link
Contributor

@alamb perfect makes sense. I just wasn't originally clear what you meant by running it on the dedicated executor.

@tustvold
Copy link
Contributor

At the risk of repeating myself from datafusion-contrib/datafusion-dft#248 (comment) I would strongly discourage overloading the ObjectStore trait as some sort of IO/CPU boundary.

Not only is this not what the trait is designed for, but it is overly pessimistic. Tokio is designed handle some CPU bound work, e.g. interleaved CSV processing or similar, it just can't handle tasks stalling for seconds at a time.

Forcing every individual IO operation to be spawned to a separate runtime feels like the wrong solution to be encouraging. Instead DF should make this judgement call at a meaningful semantic boundary.

@matthewmturner
Copy link
Contributor

I had the impression that this example was for illustration purposes for what it would look like to have fully separate io and cpu runtimes - although not the desired end state due to the reasons you stated. I do understand the concern of not wanting to overload the object store interface but if it works I do think it provides a useful example nonetheless.

@alamb
Copy link
Contributor Author

alamb commented Nov 22, 2024

At the risk of repeating myself from datafusion-contrib/datafusion-dft#248 (comment) I would strongly discourage overloading the ObjectStore trait as some sort of IO/CPU boundary.

I know you have said you have said you suggest doing something different, but I don't know how to translate your suggestions into actual code. I am pretty happy now that this PR illiustrates the core usecase of running DataFusion plans on a separate runtime/threadpool.

If you can give me some hits on how to update the example in this PR to do what you have in mind I would be glad to try

Forcing every individual IO operation to be spawned to a separate runtime feels like the wrong solution to be encouraging. Instead DF should make this judgement call at a meaningful semantic boundary.

In my mind the ObjectStore is both a meaningful and obvious semantic boundary (it is the IO abstraction used by DataFusion), so I don't fully understand this point. Also having all the IO on a separate threadpool I thought was best practice 🤔

@tustvold
Copy link
Contributor

tustvold commented Nov 23, 2024

but I don't know how to translate your suggestions into actual code

The basic idea is rather than shoehorning the runtime dispatch into the ObjectStore trait, instead make the components within DataFusion that perform IO themselves spawn the relevant work to a separate runtime. Or in other words, make DataFusion draw a distinction between IO bound and CPU bound operators. Not only does this avoid the issues above, but is also much easier to reason about. I at least am not confident I fully grasp the full implications of things like CrossRtStream w.r.t backpressure, task wakeups, etc... If I stream a 10GB CSV file, will it end up buffering the entire 10GB CSV in memory whilst waiting for the DF runtime to have capacity, I don't honestly know? 😅

To give a concrete example of this, rather than bridging ObjectStore::list across runtimes, and incurring that penalty for every wakeup of every stream, instead dispatch list_partitions or possibly one of the higher level methods in ListingTable to the IO runtime.

A similar approach could be taken for AsyncFileReader in parquet, or FileStream, or any of the other IO components.

I appreciate this is a more intrusive approach, but I don't really think DataFusion can continue to leave this sort of thing as an exercise for the reader, especially given the issues only start to become obvious as load increases. Having the DataFusion operators designed to accommodate and coordinate this IO separation will lead to the best outcomes, especially when it comes to resource constrained systems where it really matters when/how IO is interleaved with the corresponding CPU work.

That all being said what is being proposed in this PR is an improvement over the current state of play, and I don't want to detract from that, however, I had hoped that we might be able to take this opportunity to define a better story for this rather than simply "blessing" the somewhat arcane hackery we worked into InfluxDB to get around this.

I personally would be interested in @crepererum's take on this, as someone who wrote much of this code for InfluxDB. I am aware my judgement may be slightly clouded by my long-held desire for a more formal separation of IO and CPU bound tasks within DF (#2199), combined with a deep distaste for using async with CPU-bound work, but I don't think I am being unreasonable here

Edit: it's also worth highlighting ObjectStore is but one form of IO performed by DF, custom catalogs or APIs like flight will run into the same challenges

@djanderson
Copy link
Contributor

I appreciate this is a more intrusive approach, but I don't really think DataFusion can continue to leave this sort of thing as an exercise for the reader, especially given the issues only start to become obvious as load increases.

This speaks to me 😅. My first experience with DF was plugging in a non-toy dataset into the arrow-flight-sql example and being immediately thrown into the deep end with non-deterministic failures with challenging error messages.

I'm interesting in a near-term solution, but I also agree that if there's a way to have DataFusion "Do The Right Thing" without surfacing the complexities of tokio runtimes to high-level APIs like ObjectStore and SessionContext, that seems like a great goal as well.

@alamb
Copy link
Contributor Author

alamb commented Nov 23, 2024

I appreciate this is a more intrusive approach, but I don't really think DataFusion can continue to leave this sort of thing as an exercise for the reader, especially given the issues only start to become obvious as load increases.

This speaks to me 😅. My first experience with DF was plugging in a non-toy dataset into the arrow-flight-sql example and being immediately thrown into the deep end with non-deterministic failures with challenging error messages.

I'm interesting in a near-term solution, but I also agree that if there's a way to have DataFusion "Do The Right Thing" without surfacing the complexities of tokio runtimes to high-level APIs like ObjectStore and SessionContext, that seems like a great goal as well.

Thank you @tustvold and @djanderson .

I agree leaving the separation as an exercise to the reader is not a good idea (which is why I am working on this example)

I think the high level idea of being more explicit about the boundaries is a good idea. One important requirement I think is to make sure the pattern works for I/O that is not part of operators built in to DataFusion -- it is also potentially done in user defined operators (notably TableProviders)

Maybe we can do something like register the current DedicatedExector with the SessionContext and then explicitly call into that for all I/O bound work. I will ponder and see what I can come up with

@alamb
Copy link
Contributor Author

alamb commented Dec 6, 2024

Update here is I hacked up the alternate approproach (annotating all locations in DataFusion that use a different threadpool) on the plane. It didn't go great but I will make a PR tomorrow with some documentation of what i tried and how it worked and the issues i found

@alamb
Copy link
Contributor Author

alamb commented Dec 8, 2024

Update:

So TLDR unless someone can come up with anything else, the only practical approach I know of is what is in this PR and the similar in spirit @adriangb 's code in #13634

@tustvold
Copy link
Contributor

tustvold commented Dec 8, 2024

I'll try to get something to show how you can do this up

@alamb
Copy link
Contributor Author

alamb commented Dec 8, 2024

@djanderson -- do you have some sort of reproducable program that produces the

I'll try to get something to show how you can do this up

FWIW I understand in theory how to do it, but not in practice given the wide use of &self in the various DataFusion APIs

@tustvold
Copy link
Contributor

tustvold commented Dec 8, 2024

So I had a brief play and actually ended up wondering if spawning IO is even the right approach to this problem... I wrote up some thoughts on #13692

FWIW I understand in theory how to do it, but not in practice given the wide use of &self in the various DataFusion APIs

Yeah, I found SessionState in particular to be quite tricky to workaround, lots of stuff depends on it and it is not cheaply cloneable 😅

@alamb
Copy link
Contributor Author

alamb commented Jan 10, 2025

This PR appears to have stalled -- it seems we are not ready to commit to this kind of wrapping in the main DataFusion crate but we also don't have any plausible alternative.

Thus, what I plan to do is to revive it but move all the code into the example so there is at least a reference and then when we have some better alternate we can consider adding it directly into the DataFusion crate

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
physical-expr Physical Expressions
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Document DataFusion Threading / tokio runtimes (how to separate IO and CPU bound work)
6 participants