Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task graph building becomes uncomfortably slow for large physics analyses #446

Open
lgray opened this issue Jan 11, 2024 · 27 comments
Open

Comments

@lgray
Copy link
Collaborator

lgray commented Jan 11, 2024

When the unoptimized task graph size starts to approach a few thousand layers (which is typical in end-stage physics analysis), generating the task graph can take a few seconds. Typical analyses have a O(100) datasets, so this means that a user can easily incur 10 minutes of overhead before their jobs even start (and if the compute time of the full analysis itself is ~20 minutes this is a large slowdown).

I suppose one thing that could be done if we can't squeeze the building time down any further is to parallelize the building with dask.Delayed and collect the graphs back at the client before submitting them, but we should see if we can make the building process more efficient first.

Profile of the task-graph building of two datasets attached using awkward@main, coffea@master. (macOS/arm64)
apply_to_fileset.prof.zip

@agoose77 @douglasdavis

@lgray
Copy link
Collaborator Author

lgray commented Jan 11, 2024

FYI The code that produced the profile above is here: https://github.com/cmstas/ewkcoffea/blob/coffea2023/analysis/wwz/wwz4l.py#L33

and you can run it yourself by doing:
install:

pip install git+https://github.com/scikit-hep/awkward.git@main
pip install git+https://github.com/CoffeaTeam/coffea.git@master
pip install xgboost
pip install mt2
git clone https://github.com/TopEFT/topcoffea.git -b coffea2023
pushd topcoffea && pip install -e . && popd
git clone https://github.com/cmstas/ewkcoffea.git -b coffea2023
pushd ewkcoffea && pip install -e . && popd

running:

cd ewkcoffea/analysis/wwz
source run_wrapper.sh

@martindurant
Copy link
Collaborator

I slack conversations, a few different factors were mentioned, so I would like to be clear here which of them we are talking about.

  • creation of a number of dask-awkward specific layers using our API, for any number of dak arrays
  • conversion of dak arrays to dask.array , which forces optimisation during initial graph creation
  • time to optimise the graph, particularly for the purposes of column selection
  • bundling/serialising the graph to send to the scheduler

I would say that creating multiple objects or converting to dask.array in parallel is possible. Running the graph for the purposes of optimisation might parallelise, but amending the graph probably does not.

@lgray
Copy link
Collaborator Author

lgray commented Jan 11, 2024

conversion of dak array to dask.array has been removed - it was an artifact of simpler times :-)

@lgray
Copy link
Collaborator Author

lgray commented Jan 11, 2024

and in particular the profile above tends to only your first bullet point - optimizing the graph for columns is quite fast (dak.necessary_columns is 10x slower for big graphs but that's another, less important issue).

@lgray
Copy link
Collaborator Author

lgray commented Jan 11, 2024

Though if we can get building of the task graph itself faster then the next item on the list will be the speed of optimization.

There's also some related issues of task graphs with ML algorithms in them or large corrections being multiple MB in size and that seems to make dask unhappy.

We could go to using scatter and futures but that's a big leap in programming paradigm for some folks. For sure it is something we can roll out, but it's not what we taught people to do for five years so far :D.

@martindurant
Copy link
Collaborator

large corrections being multiple MB in size and that seems to make dask unhappy

Some computes are just hard, it's ok is dask complains so long as it - eventually - works. We need to set reasonable expectations.

@lgray
Copy link
Collaborator Author

lgray commented Jan 11, 2024

Yeah that I can be happy with for now, there are other bigger issues to tackle in terms of UX.

@lgray
Copy link
Collaborator Author

lgray commented Jan 11, 2024

In particular the multiple 10s of megabytes is just metadata so we could scatter it to the cluster.
It's more a matter of showing people how to do it in their analyses.

@martindurant
Copy link
Collaborator

You don't need scatter() for that, you just need to have that multi-MB value be a dask key in the graph as opposed to a literal value. So wrapping it in ak.from_awkward (1 partition) would be enough, I think .

@lgray
Copy link
Collaborator Author

lgray commented Jan 11, 2024

huh, interesting - I'll try that!

@lgray
Copy link
Collaborator Author

lgray commented Jan 11, 2024

Hmm - the first encounter with that doesn't work out so well since the metadata is an object (a functor), and awkward array does not like that.

@martindurant
Copy link
Collaborator

Maybe as any delayed(), then (this works on objects, not only functions).

@lgray
Copy link
Collaborator Author

lgray commented Jan 11, 2024

At present that yields: dask_awkward.utils.DaskAwkwardNotImplemented: Use of Array with other Dask collections is currently unsupported.

@martindurant
Copy link
Collaborator

@douglasdavis , any ideas? ;)

@lgray
Copy link
Collaborator Author

lgray commented Jan 11, 2024

I have found a dirty dirty hack. No it doesn't quite work all the way through. I was almost able to hide it in a dask_awkward.Scalar.

@lgray
Copy link
Collaborator Author

lgray commented Jan 11, 2024

Hope springs anew - if I can hide it in an attrs of an awkward array I think I can get it done. However, this exposed a bug!

@douglasdavis
Copy link
Collaborator

douglasdavis commented Jan 11, 2024

We can definitely make delayed objects work nicely with binary/map_parititions dak.Array operations. that exception is pretty strict and there's room for relaxing it a bit

@lgray
Copy link
Collaborator Author

lgray commented Jan 11, 2024

Being able to pass in a single delayed object as an argument would be perfect in my case.

@lgray
Copy link
Collaborator Author

lgray commented Jan 11, 2024

Anyway this particular direction is vastly beside the major source of slow down, which appears to be awkward operations on zero length arrays and typetracers. I understand those take time but seems to add up quickly!

@martindurant
Copy link
Collaborator

awkward operations on zero length arrays and typetracers

Whether or not they are slow is debatable, but we have MANY of them, so it adds up

@lgray
Copy link
Collaborator Author

lgray commented Jan 11, 2024

If there's performance to squeeze out, we should find it! Telling everyone to make their analyses simpler won't go over very well. :S

@douglasdavis
Copy link
Collaborator

douglasdavis commented Jan 11, 2024

When I wrote that exception (some time ago), the problem with mixing dask-awkward collection objects with "anything else" was metadata determination. We always want to be able to keep around an awkward typetracer. If we are mixing a dask-awkward collection object with, for example, a "black box" Delayed object:

@delayed
def a_delayed_array():
    return ak.Array([1, 2])

array = ak.Array([[1, 2, 3], [4], [5, 6, 7], [8]])
dak_array = dak.from_awkward(array, npartitions=2)
result = dak_array * a_delayed_array()

We're going to get an exception trying to figure out what result._meta should be, because a_delayed_array() does not have a metadata that can be used with dak_array._meta

In this case we know result._meta should be the same as dak_array._meta, so we can be explicit with:

result = map_partitions(operator.mul, dak_array, a_delayed_array(), meta=dak_array._meta, output_divisions=1)

But this is of course a bit heavy handed and we don't like suggesting people use map_partitions in cases that are not at least quite nontrivial.

Right now that last code snippet won't work because of the exception mention in @lgray's #446 (comment) - we can remove the exception to allow that flavor of map_partitions call at least, but the simple actually use * case will require more thought.

@lgray
Copy link
Collaborator Author

lgray commented Jan 11, 2024

Yeah I just need it internal to coffea for a map_partitions call where I can pre-calculate the meta by hand quite easily.

I think you can even leave in that exception if someone doesn't supply a meta by hand.

@douglasdavis
Copy link
Collaborator

Alright that seems like a good starting point for a PR

@lgray
Copy link
Collaborator Author

lgray commented Jan 12, 2024

I was almost able to get this to work by hiding the correction I need to apply in the attrs of a throw-away awkward array, but the partitioning bites me in the ass and it won't compute for large datasets.

So a PR is definitely need so we can have a single delayed object get sent around and applied to multiple calls.

@lgray
Copy link
Collaborator Author

lgray commented Jan 12, 2024

So close!

@douglasdavis
Copy link
Collaborator

douglasdavis commented Jan 12, 2024

Alright #449 allows map_partitions calls (where meta= is explicitly passed by the caller) to have Delayed objects in the arguments.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants