Performance issue with repartition #509

Open
yimuchen opened this issue May 31, 2024 · 27 comments

Comments

@yimuchen

I was testing a file-skimming workflow. To account for the possibility that the rate of events of interest is very low in the skimming scheme, I attempted to use array.repartition to reduce the number of files that would be generated, since all the file-writing methods that I know of create one file per partition.

I've provided code to generate a set of dummy data that roughly matches the data schema (jagged arrays with very mismatched collection sizes) and to perform a simple skim operation. What I observe is that whenever a repartition is specified, the memory is pinned at ~5-7GB regardless of the partitioning scheme defined through uproot. A suggestion to use dask.array.persist makes the array.repartition step take a very long time and use just as much memory.

This is how I am attempting to skim the files in question:

import uproot
import dask
from dask.distributed import Client

client = Client(processes=False, n_workers=1, threads_per_worker=1)


def make_skimmed_events(events):
    # Place your selection logic here
    skimmed = events[events.nJets > 10]
    skimmed["myCustom"] = 137 * 9.8
    return skimmed


events = uproot.dask({f"dummy_{idx}.root": "Events" for idx in range(0, 2)}, step_size=2_000) # This only helps save memory if no repartition is used.

print("Calculate skimm")
skimmed = make_skimmed_events(events)

# Trying persist
print("Calculating persisted")
persisted = skimmed.persist()
print("Calculating repartition")
parted = persisted.repartition(rows_per_partition=10_000)

# Or trying direct repartition doesn't work
# parted = skimmed.repartition(rows_per_partition=5_000)
print("Calculating running uproot write")
writer = uproot.dask_write(
    parted,
    destination="skimtest/",
    prefix="mytest/skimmed",
    compute=False,
)
print("Calculating the graphs")
dask.visualize(writer, filename="Skim_test_puredask.pdf")
dask.visualize(writer, filename="Skim_test_opt_puredask.pdf", optimize_graph=True)
print("Executing the final task")
dask.compute(writer, optimize_graph=False)

The data in question can be generated using this script (each file will be about 2.5GB in size):

import awkward as ak
import numpy as np
import uproot


for name_idx in range(0, 10):
    n_events = 100_000

    n_jets = np.random.poisson(lam=5, size=n_events)
    n_part = np.random.poisson(lam=300, size=n_events)
    n_obj = np.random.poisson(lam=75, size=n_events)

    jets_arr = ak.zip(
        {
            field: ak.unflatten(np.random.random(size=ak.sum(n_jets)), n_jets)
            for field in ["a", "b", "c", "d", "e", "f", "g", "j", "i"]
        }
    )
    part_arr = ak.zip(
        {
            field: ak.unflatten(np.random.random(size=ak.sum(n_part)), n_part)
            for field in ["a", "b", "c", "d", "e", "f", "g", "j", "i"]
        }
    )
    obj_arr = ak.zip(
        {
            field: ak.unflatten(np.random.random(size=ak.sum(n_obj)), n_obj)
            for field in ["a", "b", "c", "d", "e", "f", "g", "j", "i"]
        }
    )

    with uproot.recreate(f"dummy_{name_idx}.root") as f:
        f["Events"] = {
            "Jets": jets_arr,
            "Particles": part_arr,
            "Object": obj_arr,
        }
@lgray
Collaborator

lgray commented Jun 6, 2024

@martindurant when you have time - some thought here would be nice.

If we do this in two steps, i.e. write out files with no repartitioning and then repartition those files the memory issues vanish.

This is odd since the repartition shouldn't care about data that we've cut out, but somehow it's acting like it needs all the data before cuts in the tasks doing the partition aggregation.

This really sounds like some data lifecycle problem.

@martindurant
Collaborator

memory is pinned at ~5-7GB

It seems that the in-memory size of each (unfiltered) partition really is at least a few GB. This must be completely loaded; in fact, each output partition will need inputs from multiple files. Only then do you do the filtering. As far as I know, it's not possible to filter during the process of streaming data from a file. A read-and-filter operation would be great! But this basic process is the reason that parquet partitions (for an example I am more familiar with) are usually <<100MB.

In addition, this whole workflow is entirely disk bound, since filtering is very fast. That means that any parallel tasks are trying to read various parts of various files and write too, all over the same bus. I don't really expect dask to be able to do anything for this case. (I realise that this is not the real workflow you want to run)

Nevertheless, this reminds me, that I think we considered at some point a type of repartition that is exactly every N inputs -> 1 output; that at least simplifies each task.

@martindurant
Collaborator

this whole workflow is entirely disk bound

I might be wrong - the dask profile shows decompression is taking 93% of time

@martindurant
Collaborator

btw: for me, the output only has 2805 rows, so it's a single partition.

@yimuchen
Author

yimuchen commented Jun 7, 2024

I wasn't able to replicate this example [1] with just dask/uproot, but it looks like exposing the dask_awkward structure before performing a repartition does help with the memory issue. I also tried uproot.dask(step_size="100MB") [2] to force each unfiltered partition to be small, but it still takes upwards of 6GB of memory in single-threaded execution, meaning that the program is still loading multiple unfiltered partitions into memory. I was assuming that repartition would only be called after determining the size of the filtered partition, so that at most one unfiltered partition is in memory at any time. Or is this too naive a picture of what repartition is trying to do?

[1] CoffeaTeam/coffea#1100
[2] https://uproot.readthedocs.io/en/latest/uproot._dask.dask.html

@lgray
Collaborator

lgray commented Jun 7, 2024

I guess I'm not entirely clear as to why the repartition on the filtered data needs to have any knowledge at all about the filtered data... and why the filtered data is hanging around that long in dask worker. At the point it isn't needed any more it should just be dropped, and there's nothing in this workflow that needs to know about the original partitions for the repartition step, iiuc.

@martindurant
Collaborator

When you say filtered.repartition(num_rows=), dask-awkward needs to know the number of rows per input partition, in order to know which of them belong in a given output partition. This means loading the whole of the filter predicate, partition by partition. It then loads the (whole of) each input partition and filters them. In the given example, all of the input partitions end up in the same output partition.

uproot.dask(step_size="100MB")

I don't know what this does.
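
The row-count dependency described above can be sketched in plain Python. This is only an illustration of the bookkeeping, not dask-awkward's implementation: plan_row_repartition and the row counts are hypothetical, and the point is that the plan cannot be built until the filtered length of every input partition is known.

```python
from itertools import accumulate


def plan_row_repartition(rows_per_input, rows_per_output):
    """Map each output partition to the (input_index, start, stop) slices it needs."""
    if rows_per_output <= 0:
        raise ValueError("rows_per_output must be positive")
    # Cumulative row offsets; building them requires the filtered length of
    # every input partition up front.
    divisions = [0, *accumulate(rows_per_input)]
    total = divisions[-1]
    plan = []
    for out_start in range(0, total, rows_per_output):
        out_stop = min(out_start + rows_per_output, total)
        pieces = []
        for i, (lo, hi) in enumerate(zip(divisions, divisions[1:])):
            # Overlap between input partition i and this output partition.
            start, stop = max(lo, out_start), min(hi, out_stop)
            if start < stop:
                pieces.append((i, start - lo, stop - lo))
        plan.append(pieces)
    return plan


# Hypothetical filtered row counts for four input partitions.
plan = plan_row_repartition([300, 250, 280, 310], rows_per_output=500)
# Two small inputs and a large target: everything lands in one output partition.
single = plan_row_repartition([1400, 1405], rows_per_output=10_000)
```

With a target larger than the total number of surviving rows, every input partition feeds the same output partition, which matches the single-partition result reported earlier in the thread.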

@yimuchen
Author

yimuchen commented Jun 7, 2024

I see... after running the same process with a parquet file, I'm seeing this behavior as well, so this is a limit of dask, not some strange interaction with uproot.

So if I'm understanding this restriction correctly, this is mainly because repartitions must be evaluated during the generation of the task graph, so it needs to have all inputs available. Would it be possible to have a different method of obtaining the repartition scheme, where we aggregate filtered results as they arrive? Or is this fundamentally at odds with the paradigm of dask?


The step_size="100MB" attempts to limit each partition extracted by uproot to be no larger than 100MB.
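
For reference, the "aggregate filtered results as they arrive" idea can be sketched outside of dask as a plain generator. Everything here (stream_repartition, fake_partitions, the row values) is hypothetical and only illustrates the memory pattern being asked for: one unfiltered partition at a time plus a buffer of rows that already passed the filter.

```python
def stream_repartition(partitions, keep, rows_per_output):
    """Yield lists of kept rows, each at most rows_per_output long."""
    if rows_per_output <= 0:
        raise ValueError("rows_per_output must be positive")
    buffer = []
    for part in partitions:
        # Filter immediately, so only the surviving rows are retained.
        buffer.extend(row for row in part if keep(row))
        while len(buffer) >= rows_per_output:
            yield buffer[:rows_per_output]
            buffer = buffer[rows_per_output:]
    if buffer:
        yield buffer  # final, possibly short, output partition


def fake_partitions(n_parts, rows_per_part):
    """Hypothetical lazy source standing in for files read one chunk at a time."""
    for p in range(n_parts):
        yield [p * rows_per_part + i for i in range(rows_per_part)]


# Keep 10% of 10 x 1000 rows and regroup them into outputs of 300 rows.
outputs = list(
    stream_repartition(fake_partitions(10, 1000), lambda r: r % 10 == 0, 300)
)
sizes = [len(o) for o in outputs]
```

Note that the number of outputs is only known once the stream is exhausted. As far as I understand, a dask collection needs its partition count when the task graph is built, which is where this pattern and the graph-based approach pull in different directions.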

@lgray
Collaborator

lgray commented Jun 7, 2024

Right but once it is done loading the input partitions and filtering them it doesn't need to keep the original input data in memory.

What's really weird is that if I put a .persist() as a firebreak before making the repartition it still keeps all the memory for reading in the unfiltered data and that unfiltered data should be completely meaningless to the repartitioning data at that point.

@martindurant
Collaborator

Given the new options in #250 , can we try again?

@yimuchen
Author

Hi @martindurant, just tried out on the latest master branch.

The current implementation does not play nicely with uproot or dak.to_parquet, in particular this line in uproot, as npartitions is now set to 0 after a repartition(n_to_one=1xx) operation (any value assigned to n_to_one seems to give this result).

I'm guessing this can be a "simple" fix in uproot by changing npartitions to max([npartitions, 1]), but I want to check what is meant by npartitions=0, and where this simple fix may break things down the line.

If I add this additional check to uproot, we do get much more reasonable memory usage (a few hundred MB rather than a few GB).

@yimuchen
Author

If the assignment of zfill is purely aesthetic [1,2], I think this fix should be sufficient? Though it would break the output file parity with and without repartitioning if we use a trivial repartition like n_to_one=1.

[1] https://github.com/scikit-hep/uproot5/blob/main/src/uproot/writing/_dask_write.py#L36
[2] https://github.com/dask-contrib/dask-awkward/blob/main/src/dask_awkward/lib/io/parquet.py#L502

@martindurant
Collaborator

The current implementation does not play nicely with uproot or dak.to_parquet

Since parquet does not have that line, what is the problem there?

@martindurant
Collaborator

(remembering that in your previous version, the number of output partitions was actually 1)

@yimuchen
Author

The current implementation does not play nicely with uproot or dak.to_parquet

Since parquet does not have that line, what is the problem there?

to_parquet actually also calls a similar line here [1]. I think this is purely an aesthetic choice of having the index in the file names be 00x, 0xy, xyz if there are on the order of 1000-9000 files being written, for example. At the time the function is called, arr.npartitions evaluates to 0.
[1] https://github.com/dask-contrib/dask-awkward/blob/main/src/dask_awkward/lib/io/parquet.py#L491
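
To make the padding discussion concrete, here is a minimal sketch of index zero-padding. It assumes the width is derived from a base-10 logarithm of npartitions; the linked lines are not reproduced here, so pad_width, part_name and the file-name pattern are all hypothetical. The max(..., 1) guard is the workaround floated above.

```python
import math


def pad_width(npartitions):
    """Digits used to zero-pad partition indices 0..npartitions-1.

    Without the max(..., 1) guard, math.log10(0) raises ValueError when an
    array reports 0 partitions.
    """
    return math.ceil(math.log10(max(npartitions, 1)))


def part_name(prefix, index, npartitions):
    """Hypothetical output file name with a zero-padded partition index."""
    return f"{prefix}-part{str(index).zfill(pad_width(npartitions))}.root"


name_52 = part_name("skimmed", 7, 52)      # 52 partitions: two-digit index
name_zero = part_name("skimmed", 0, 0)     # guarded: no padding, no exception
```

The guard only changes file naming, so it would hide a zero-partition array rather than explain it; the question of what npartitions=0 means still needs an answer from the repartition side.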

@martindurant
Collaborator

OK, so the actual problem is, that the output of the repartition shows as having 0 partitions?

@yimuchen
Author

Yes, arrays after a repartition(n_to_one=...) have npartitions=0. I have tried adding an eager_compute_divisions() call, but that still gives a partition count of 0. I also just noticed that no output is being generated (I was accidentally looking at the old files), though some computation is being processed.

@yimuchen
Author

I just had time to test a bit more. In my test, the original array is separated into 52 partitions.

  • If I set n_to_one=1, the output looks identical to no repartition.
  • If I set n_to_one to some value larger than 52, then we get the 0-partition output and nothing is written.
  • If I set n_to_one to some smaller value, say 10, I get a strange AttributeError: 'tuple' object has no attribute 'fields', so I think something is malformed here.

@martindurant
Collaborator

#517 should fix those issues for you (tests included). In your case, you wanted n_to_one=52 which is the same as npartitions=1.
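
The expected grouping for the three cases tested above can be written down as a small pure-Python sketch. This is not the dask-awkward code from #517; n_to_one_groups is a hypothetical helper that only shows which input partitions should end up together.

```python
def n_to_one_groups(npartitions, n):
    """Group consecutive input partition indices, n at a time."""
    if n < 1:
        raise ValueError("n must be at least 1")
    return [
        list(range(start, min(start + n, npartitions)))
        for start in range(0, npartitions, n)
    ]


trivial = n_to_one_groups(52, 1)     # one group per input, same as no repartition
by_ten = n_to_one_groups(52, 10)     # five full groups plus a remainder of two
all_in_one = n_to_one_groups(52, 52)
oversized = n_to_one_groups(52, 100)  # n larger than the input count
```

In this sketch a value of n larger than the number of inputs still yields one group holding everything, never zero groups, which is the behaviour the n_to_one=52 suggestion relies on.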

@yimuchen
Author

This new implementation seems to have problems when loading from files:

import dask_awkward as dak

array = dak.from_lists([[[1, 2, 3], [], [4, 5]]] * 100)
array.to_parquet("test.parquet")

array2 = dak.from_parquet("test.parquet")
array2.repartition(n_to_one=10) # Fails on this line: 

The full error message is:

File "/srv/dask-awkward/src/dask_awkward/lib/core.py", line 1006, in repartition
    new_layer_raw, new_divisions = simple_repartition_layer(
                                   ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/srv/dask-awkward/src/dask_awkward/lib/structure.py", line 1424, in simple_repartition_layer
    (arr.name, part) for part in range(new_divisions[-2], new_divisions[-1])
                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: 'NoneType' object cannot be interpreted as an integer

@martindurant
Collaborator

Can you try with #518 ?

@yimuchen
Author

This now seems to be working! Without any modifications to uproot required.

The last piece that I wanted to confirm is that there seems to be a massive discrepancy in memory usage if repartition is called as the second-to-last step, as opposed to being the last step before running a write call. An extra "conversion" function is required for uproot writing to avoid doubly nested structures appearing in the awkward array (see the uproot_writable function here [1]). If this comes after the repartition method, we get back our rather massive memory footprint.

Is this a behavior we should expect? I wanted to have a record to help future users avoid some hidden gotchas, in case things change in the future (I've attached the calculated task graphs and recorded memory footprint with the conversion placed before and after the repartition call in this thread).

re2conv_opt_new.pdf
conv2re_opt_new.pdf
memusage_conv2re_new.csv
memusage_re2conv_new.csv

[1] CoffeaTeam/coffea#1100

@lgray
Collaborator

lgray commented Jun 19, 2024

@yimuchen uproot_writeable should be able to operate correctly using only the _meta (the typetracer) of the dask_awkward array, you don't need to map_partitions it or apply all those ak.to_packed to the actual data to normalize the forms.

You may be able to just drop it?

@yimuchen
Author

I tried without the ak.to_packed, and I still run into the same memory issue. I think this is not strictly an issue with repartition (since I also saw something similar before the repartition fix). Should I mark this issue as closed and open a new issue if this becomes a recurring problem?

@lgray
Collaborator

lgray commented Jun 20, 2024

I'm fine with tracking it down a bit more here so we understand. Memory problems are pretty crucial for us so we should make sure it isn't somehow related.

@yimuchen
Author

Pardon me, I was caught up with other items, but I managed to get a script that better illustrates the issue. This script can be run with just the master branch of dask_awkward and uproot, and the dummy input will be automatically generated.

import dask_awkward as dak
import numpy as np
import uproot
from dask.distributed import Client

import awkward as ak


def make_events(rng: np.random.Generator, n_events: int):
    # Event-level variables
    events = ak.zip({f"prop_{idx}": rng.random(size=n_events) for idx in range(10)})

    # Large collection with many entries per event
    n_entries = rng.poisson(lam=300, size=n_events)
    large_col = ak.zip(
        {f"prop_{idx}": rng.random(size=ak.sum(n_entries)) for idx in range(10)}
    )
    events["large_col"] = ak.unflatten(large_col, n_entries)

    # Small collections with a handful of entries per event
    n_entries = rng.poisson(lam=10, size=n_events)
    small_col = ak.zip(
        {f"prop_{idx}": rng.random(size=ak.sum(n_entries)) for idx in range(10)}
    )
    events["small_col"] = ak.unflatten(small_col, n_entries)
    return events


def make_skimmed_events(events):
    return events[events.prop_0 < 0.1]  # Random 10% file reduction


if __name__ == "__main__":
    # Creating single thread client to better monitor performance
    client = Client(processes=False, n_workers=1, threads_per_worker=1)

    rng = np.random.default_rng(seed=123456)
    for file_idx in range(10):
        print("Making file", file_idx, "...")
        events = make_events(rng, 10_000)  # Each file will take ~200MB
        ak.to_parquet(events, f"unskimmed_{file_idx}.parquet")
        with uproot.recreate(f"unskimmed_{file_idx}.root") as f:
            f["Event"] = {k: events[k] for k in events.fields}

    print("Skimming with parquet file inputs")
    events = dak.from_parquet("unskimmed_*.parquet")
    events = make_skimmed_events(events)
    events = events.repartition(
        n_to_one=20
    )  # Given our basic estimate, everything should fit in one file
    dak.to_parquet(events, "skimmed.parquet")

    print("Skimming with root file inputs")
    events = uproot.dask("unskimmed_*.root")
    events = events.repartition(n_to_one=20)
    events = make_skimmed_events(events)
    uproot.dask_write(events, "skimmed.root")

Using memory_profiler [1] to map the memory usage, we get the following:

[Figure "test": memory usage over time with repartition enabled]

Basically, we still get a very large spike in memory usage, despite attempting to merge the results. This is true both for parquet and uproot file writing. Peaks of this magnitude appear regardless of the filtering efficiency.

If I disable the repartition line, I get a much more reasonable memory usage, but with much more fragmented outputs.

[Figure "test_no_repart": memory usage over time with the repartition line disabled]

Let me know if any more testing could provide more insight.

[1] https://pypi.org/project/memory-profiler/
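
For anyone who wants to reproduce the shape of these curves without the full setup, here is a toy comparison using the standard-library tracemalloc module instead of memory_profiler. It is a sketch under stated assumptions: the chunk sizes and both functions are hypothetical stand-ins for "load every unfiltered partition, then filter" versus "filter each partition as it is read", and tracemalloc only sees allocations made through Python's allocators, not the process-level memory that memory_profiler samples.

```python
import tracemalloc

CHUNK = 1_000_000  # bytes per hypothetical unfiltered partition
N_CHUNKS = 20


def peak_bytes(func):
    """Return the peak traced allocation (in bytes) while func() runs."""
    tracemalloc.start()
    try:
        func()
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return peak


def load_all_then_filter():
    # Every unfiltered chunk is resident before any filtering happens.
    chunks = [bytearray(CHUNK) for _ in range(N_CHUNKS)]
    return [bytes(c[:1000]) for c in chunks]


def filter_as_you_go():
    kept = []
    for _ in range(N_CHUNKS):
        # The previous chunk is released when the name is rebound, so at most
        # two unfiltered chunks coexist briefly.
        chunk = bytearray(CHUNK)
        kept.append(bytes(chunk[:1000]))
    return kept


peak_all = peak_bytes(load_all_then_filter)
peak_stream = peak_bytes(filter_as_you_go)
print(f"load-all peak: {peak_all / 1e6:.1f} MB, streaming peak: {peak_stream / 1e6:.1f} MB")
```

The first pattern peaks at roughly the sum of all unfiltered chunks, which is the kind of spike reported above; the second stays near the size of one or two chunks.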

@yimuchen
Author

Another discovery that might help pin down what is causing this memory consumption is that if we attempt to strip the event before repartitioning:

events = events[["small_col"]] # New line to strip down save content
events = events.repartition(n_to_one=20)

This reduces the memory usage for dak.to_parquet [1], but not for uproot.dask_write [2], so the differences between these two implementations may be an indication of which part of the memory is not being released?

[1] https://github.com/dask-contrib/dask-awkward/blob/main/src/dask_awkward/lib/io/parquet.py#L511
[2] https://github.com/scikit-hep/uproot5/blob/main/src/uproot/writing/_dask_write.py#L45
