Parallel writing of zarr array from single process #20

Closed

tcompa opened this issue Jul 28, 2022 · 8 comments

tcompa commented Jul 28, 2022

This issue branches off from https://github.com/fractal-analytics-platform/fractal/issues/115, to specifically address parallel writing within a "simple" example (unrelated to Fractal). I'll split our notes into a few comments to keep things readable.

Context

  • We create the array [0, 0, 0, 0] and store it to disk with different chunk sizes (either 1, 2, or 4).
  • We define some relevant sets of ROIs, where each ROI corresponds to a list of indices. An example of a single-pixel ROI is [0]. An example of a three-pixel ROI is [0, 1, 2]. The first set of ROIs is made of four single-pixel regions: [0], [1], [2], [3]. The second set of ROIs is made of three ROIs: [0, 1], [1, 2], [2, 3]. Notice that in the former case there are no overlapping pixels, while in the latter case the ROIs share some pixels. The chunk size combined with the ROI size also determines whether each chunk hosts a single ROI or several ROIs.
  • We design a task which iterates a function process_ROI() over all ROIs of an array. The function simply increments the array values by two, and it includes a time.sleep(1).
  • We run the task six times, corresponding to three different chunk sizes (1, 2, 4) and two different choices of ROIs. For each run, we verify the following statements:
  1. The output array is [2, 2, 2, 2];
  2. The number of calls to process_ROI() matches the number of ROIs;
  3. The total time is close to 1 second, meaning that all ROIs were processed in parallel.

Important caveat

Within the task, we carefully assign the results of the delayed process_ROI calls to an output array which is not the same as the input. First of all, this keeps us close to dask best practices. Moreover, re-assigning values to the same array would affect the three checks above:

  1. The output array could differ from [2, 2, 2, 2], in cases where two ROIs share a pixel (this is OK and expected);
  2. The number of calls to process_ROI may be larger than the actual number of ROIs, due to a suboptimal construction of the dask graph (we observed this in some examples);
  3. The total time could grow much beyond the expected value (related to the previous point).

For these reasons, we should always assign the result of a delayed/indexed execution to a new array.
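
A minimal, self-contained sketch of this pattern (array sizes and names are illustrative; the full test code is in the next comment):

import dask
import dask.array as da

# Input array and a *separate* output array with the same shape/chunks/dtype
x = da.zeros(4, chunks=1, dtype=int)
z = da.empty(x.shape, chunks=x.chunks, dtype=x.dtype)

@dask.delayed
def process_ROI(roi):
    return roi + 2

# Assign each delayed result into z, never back into x
for start, end in [(0, 1), (1, 2), (2, 3), (3, 4)]:
    res = process_ROI(x[start:end])
    z[start:end] = da.from_delayed(res, shape=(end - start,), dtype=x.dtype)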

After this long summary/introduction, code and results are in the following comments.

@tcompa tcompa self-assigned this Jul 28, 2022

tcompa commented Jul 28, 2022

First, here is the code used for testing. It iterates over two kinds of ROIs and over three options for the chunk size (1, 2, and 4).

import os
import shutil
import numpy as np
import dask
import dask.array as da
import time


# Global tmpfile
tmpfile = "tmp.dat"


# Mock of a task: read an array, process it per-ROI, write resulting array
def task(ROIs, N, chunksize, _case=0, do_graphs=False):

    # Initialize tmpfile (to count number of process_ROI calls)
    if os.path.isfile(tmpfile):
        os.remove(tmpfile)

    # Function to be applied to each ROI
    def process_ROI(ROI):
        time.sleep(1)
        with open(tmpfile, "a") as f:
            f.write("1\n")
        return ROI + 2
    delayed_process_ROI = dask.delayed(process_ROI)

    # Read array
    y = da.from_zarr("x.zarr")

    # Define output array
    z = da.empty(y.shape, chunks=y.chunks, dtype=y.dtype)

    # Per-ROI processing
    for indices in ROIs:
        sx, ex = indices[:]
        shape = (ex - sx,)
        res = delayed_process_ROI(y[sx:ex])
        z[sx:ex] = da.from_delayed(res, shape=shape, dtype=np.float32)

    if do_graphs:
        z.visualize(f"fig_graph_N{N}_c{chunksize}_case_{_case}_opt.png",
                    optimize_graph=True,
                    color="order",
                    cmap="autumn",
                    node_attr={"penwidth": "3"})

    # Write array to disk (after removing temporary folders, if needed)
    if os.path.isdir("/tmp/tmp.zarr"):
        shutil.rmtree("/tmp/tmp.zarr")
    z.to_zarr("/tmp/tmp.zarr", compute=True)


# Global parameters
N = 4
do_graphs = False

# Loop over chunk sizes
for chunksize in [1, 2, 4]:

    # Create a chunked size-N array of zeros and store it to disk
    if os.path.isdir("x.zarr"):
        shutil.rmtree("x.zarr")
    x = da.zeros(shape=N, chunks=chunksize, dtype=int)
    x.to_zarr("x.zarr", compute=True)

    # Define ROIs
    ROIs_case1 = [(i, i + 1) for i in range(N)]
    ROIs_case2 = [(i, i + 2) for i in range(N) if i + 2 <= N]

    print()
    print(f"N={N}, chunksize={chunksize}")
    print()

    # Loop over two choices of ROIs
    for iROIs, ROIs in enumerate([ROIs_case1, ROIs_case2]):

        # Print information about ROIs
        n_ROIs = len(ROIs)
        ROIs_long = [list(range(ROI[0], ROI[1])) for ROI in ROIs]
        len_ROI = len(ROIs_long[0])
        print(f"{n_ROIs} ROIs of {len_ROI} pixels each: {ROIs_long}")

        # Execute task
        t0 = time.perf_counter()
        task(ROIs, N, chunksize, _case=iROIs, do_graphs=do_graphs)
        t1 = time.perf_counter()

        # Diagnostics on output
        out = da.from_zarr("/tmp/tmp.zarr").compute()
        num_calls = np.loadtxt(tmpfile, dtype=int).sum()
        print(f"  Task completed (elapsed: {t1-t0:.3f} s)")
        print(f"  Output array: {out}")
        print(f"  Number or ROIs:              {len(ROIs)}")
        print(f"  Number or process_ROI calls: {num_calls}")

        assert np.allclose(out, np.ones(N) * 2)
        assert num_calls == len(ROIs)

        print()
    print("----------")

tcompa commented Jul 28, 2022

We first look at the first kind of ROIs, namely the single-pixel ones (with no shared pixels).
The corresponding part of the output is

N=4, chunksize=1

4 ROIs of 1 pixels each: [[0], [1], [2], [3]]
  Task completed (elapsed: 1.015 s)
  Output array: [2 2 2 2]
  Number of ROIs:              4
  Number of process_ROI calls: 4

----------

N=4, chunksize=2

4 ROIs of 1 pixels each: [[0], [1], [2], [3]]
  Task completed (elapsed: 1.011 s)
  Output array: [2 2 2 2]
  Number of ROIs:              4
  Number of process_ROI calls: 4

----------

N=4, chunksize=4

4 ROIs of 1 pixels each: [[0], [1], [2], [3]]
  Task completed (elapsed: 1.017 s)
  Output array: [2 2 2 2]
  Number of ROIs:              4
  Number of process_ROI calls: 4

----------

and we see that the three statements (correct result, correct number of calls, correct timing) are all satisfied, including in cases where all ROIs belong to the same chunk.

tcompa commented Jul 28, 2022

The second case is the one where the ROIs have length 2 and share some pixels.
Here is the related output

N=4, chunksize=1

3 ROIs of 2 pixels each: [[0, 1], [1, 2], [2, 3]]
  Task completed (elapsed: 1.016 s)
  Output array: [2 2 2 2]
  Number of ROIs:              3
  Number of process_ROI calls: 3

----------

N=4, chunksize=2

3 ROIs of 2 pixels each: [[0, 1], [1, 2], [2, 3]]
  Task completed (elapsed: 1.020 s)
  Output array: [2 2 2 2]
  Number of ROIs:              3
  Number of process_ROI calls: 3

----------

N=4, chunksize=4

3 ROIs of 2 pixels each: [[0, 1], [1, 2], [2, 3]]
  Task completed (elapsed: 1.015 s)
  Output array: [2 2 2 2]
  Number of ROIs:              3
  Number of process_ROI calls: 3

----------

Once again, all statements are satisfied, even in the presence of "dangerous" overlaps (ROIs that share both pixels and chunks).

tcompa commented Jul 28, 2022

There's no free lunch though: when you write multiple ROIs to the same chunk, you pay a price in dask graph complexity.
As an example, we look at the first case, where there is no pixel overlap between ROIs, and we show the dask graph for three cases:

  1. Chunk size equal to 1, i.e. one ROI per chunk;
  2. Chunk size equal to 2, i.e. two ROIs per chunk;
  3. Chunk size equal to 4, i.e. all ROIs in the same chunk.

It is clear, in the figures below, that the complexity grows when multiple parts of a chunk need to be processed (this is also very reassuring, as it means that dask will not just compute-and-write everything in parallel on the same chunk). When scaling up array sizes, we should therefore expect some overhead from different functions acting on the same chunk. The size of this overhead will need to be checked, in some way, at the relevant scale.

Here are the three graphs:
(figure: fig_graph_N4_c1_case_0_opt.png)
(figure: fig_graph_N4_c2_case_0_opt.png)
(figure: fig_graph_N4_c4_case_0_opt.png)
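
As a rough, figure-free proxy for this complexity, one could also count the tasks in the graph before and after dask's optimization pass (a sketch, not part of the tests above; the ROI layout is illustrative):

import dask
import dask.array as da

# Rebuild a small per-ROI graph as in task() above: two ROIs in one chunk
y = da.zeros(4, chunks=4, dtype=int)
z = da.empty(y.shape, chunks=y.chunks, dtype=y.dtype)
for start, end in [(0, 2), (2, 4)]:
    res = dask.delayed(lambda block: block + 2)(y[start:end])
    z[start:end] = da.from_delayed(res, shape=(end - start,), dtype=y.dtype)

# The task count grows when several ROIs are assigned into the same chunk
(z_opt,) = dask.optimize(z)
print(len(z.__dask_graph__()), len(z_opt.__dask_graph__()))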

tcompa commented Jul 28, 2022

That's it for now, and IMO it closes the issue of parallel writing, until further notice.

Points that we do not yet consider fully understood:

  1. How large is the overhead for per-ROI processing in a realistic case? Notice that one can always try to reduce it by reducing the chunk size, but that's just a compromise and not a catch-all recipe.
  2. When we need to overwrite an array, without ever creating a copy (not even a lazy one), things are not under control.
  3. We don't know whether a similarly safe behavior applies when writes come from different Python processes. Our first guess is that such a situation would rapidly lead to errors, unless we use other safety measures (e.g. a file lock; see the sketch after this list).
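
For point 3, one candidate safety measure is zarr's file-based synchronizer; a hedged sketch (untested here; paths and shapes are illustrative):

import numpy as np
import zarr

# ProcessSynchronizer provides file-based, chunk-level locking, which may help
# when several Python processes write to the same zarr array
synchronizer = zarr.ProcessSynchronizer("example.sync")
z = zarr.open_array("example.zarr", mode="a", shape=(4,), chunks=(1,),
                    dtype="i8", synchronizer=synchronizer)
z[0:2] = np.array([2, 2])  # locks on the touched chunks are acquired during the write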

tcompa commented Jul 28, 2022

I'm closing this now. Feel free to re-open for additional comments.

jluethi commented Jul 28, 2022

Wow, great test and really good documentation of the results. Thanks a lot @tcompa !

It is very impressive to see how dask handles such processing for more complex, overlapping setups! I'm particularly impressed & surprised that it handles the overlapping ROIs!

You are right, there is no free lunch. Though if the only cost of this lunch is a slight overhead while dask handles all the complexity: well, then I'm very willing to pay the overhead for that lunch! Even if processing in arbitrary chunks were 50% slower, as long as it still actually works without raising the complexity of the code we need to write, that is a super good trade-off!

Really interesting to eventually consider this for a fancier implementation of cellpose. See also here for distributed cellpose runs that still all come together into a single, already-combined output (without later relabeling): https://github.com/MouseLand/cellpose/blob/main/cellpose/contrib/distributed_segmentation.py
(not something we need to worry about for the time being, just a link I found this morning that may be interesting in the future)

  1. How large is the overhead for per-ROI processing in a realistic case? Notice that one can always try to reduce it by reducing the chunk size, but that's just a compromise and not a catch-all recipe.

This may actually become significantly easier to do if sharding gets implemented (see #54) and we have the option to play with chunk sizes without exploding file numbers.

  2. When we need to overwrite an array, without ever creating a copy (not even a lazy one), things are not under control.

Very good point. Means that there is extra temporary storage overhead that Fractal will need. But this overhead is passable.

  3. We don't know whether a similarly safe behavior applies when writes come from different Python processes. Our first guess is that such a situation would rapidly lead to errors, unless we use other safety measures (e.g. a file lock).

So another good argument to keep tasks running on a per-well basis then. Good to be aware of this!

tcompa commented Jul 29, 2022

2. When we need to overwrite an array, without ever creating a copy (not even a lazy one), things are not under control.

Very good point. Means that there is extra temporary storage overhead that Fractal will need. But this overhead is passable.

A clarification:
What I meant by "overwrite" is not the same as "we want to overwrite the zarr file" [as discussed in #53], but rather the (wrong) idea of using code like:

x = da.from_zarr(...)
for ROI in ROIs:
    start_x, end_x = ...
    out = delayed_process(x[start_x:end_x])
    x[start_x:end_x] = da.from_delayed(out, ...)  # re-assigning into the input array
x.to_zarr(...)

This code creates a dask graph with additional calls to the time-consuming function, and that's the main reason for avoiding it (apart from being a bad dask practice).
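
For contrast, here is the counterpart we use instead, sketched in the same schematic style as the snippet above (not runnable as-is; it mirrors the structure of the test code earlier in this thread): assign into a separate output array and write that one.

x = da.from_zarr(...)
z = da.empty(x.shape, chunks=x.chunks, dtype=x.dtype)
for ROI in ROIs:
    start_x, end_x = ...
    out = delayed_process(x[start_x:end_x])
    z[start_x:end_x] = da.from_delayed(out, ...)  # assign into the new array z
z.to_zarr(...)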

Concerning the other issue (overwriting a zarr file, as in #53), it would actually be very interesting to re-assess it now that we have moved away from map_blocks. It is even possible that the new implementation is free from the issue, since what it does is quite different. To be checked.
