Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply kerchunk to one-timestep-per-file (climate model) output #123

Open
aaronspring opened this issue Feb 3, 2022 · 10 comments
Open

Apply kerchunk to one-timestep-per-file (climate model) output #123

aaronspring opened this issue Feb 3, 2022 · 10 comments

Comments

@aaronspring
Copy link

aaronspring commented Feb 3, 2022

  • I want to use kerchunk to open climate model netcdf output stored 1 or 12 timestep/s per file in many files as one huge zarr.
  • I do not care about the cloud capabilities. I just need to work this out locally.
  • I do not want to convert nc files to zarr, because I dont have the storage to keep both and need to keep nc.
  • Therefore I thought kerchunk could help me.

I wrote a small example below but it fails with an zarr shape error which I do not understand. I get SingleHdf5ToZarr running, but MultiZarrToZarr fails. Does anyone see a fix for this? Is this example supposed to work?

import os
import zipfile
import kerchunk.hdf
import fsspec
import json
import xarray as xr
import numpy as np

# dummy data
ds = xr.DataArray(name='var', data=np.random.random((100,100)), dims=['lon','lat'], coords={'lon':range(100), 'lat':range(100)}).to_dataset()#.expand_dims('time')
urls =[f'test_{y}.nc' for y in range(2000,2011)]
for u in urls:
    ds.to_netcdf(u)

so = dict(
    anon=True, default_fill_cache=False, default_cache_type='first'
)
with zipfile.ZipFile("out.zip", mode="w") as zf:
    for u in urls:
        print(u)
        print(os.path.exists(u))
        with fsspec.open(u, **so) as inf:
            h5chunks = kerchunk.hdf.SingleHdf5ToZarr(inf, u, inline_threshold=100)#, mode='r')
            with zf.open(os.path.basename(u) + ".json", 'w') as outf:
                outf.write(json.dumps(h5chunks.translate()).encode())

from kerchunk.combine import MultiZarrToZarr
mzz = MultiZarrToZarr(
    "zip://*.json::out.zip",
    remote_protocol="file",
    xarray_open_kwargs={
       # "preprocess": None,#drop_coords,
        "decode_cf": False,
        "mask_and_scale": False,
        "decode_times": False,
        "decode_timedelta": False,
        "use_cftime": False,
        "decode_coords": False
    },
    xarray_concat_args={'dim':'time'}
)
#test_dict = mzz.translate()

# open single json as zarr
test_dict='test_2000.nc.json' # after unzip out.zip
m = fsspec.get_mapper('reference://', fo=test_dict, remote_protocoll='file')
print(xr.open_dataset(m, engine='zarr')) # works


mzz.translate("output.zarr") # see Traceback below

# This can also be written as a json
mzz.translate("output.json") # same Traceback

Output

(xr) aaron.spring@MacBook-Pro:~/Coding/kerchunk/my_kerchunk$ python run.py
test_2000.nc
True
test_2001.nc
True
test_2002.nc
True
test_2003.nc
True
test_2004.nc
True
test_2005.nc
True
test_2006.nc
True
test_2007.nc
True
test_2008.nc
True
test_2009.nc
True
test_2010.nc
True
/Users/aaron.spring/Coding/kerchunk/my_kerchunk/run.py:49: RuntimeWarning: Failed to open Zarr store with consolidated metadata, falling back to try reading non-consolidated metadata. This is typically much slower for opening a dataset. To silence this warning, consider:
1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting consolidated=True, to raise an error in this case instead of falling back to try reading non-consolidated metadata.
  print(xr.open_dataset(m, engine='zarr'))
<xarray.Dataset>
Dimensions:  (lat: 100, lon: 100)
Coordinates:
  * lat      (lat) float64 0.0 1.0 2.0 3.0 4.0 5.0 ... 95.0 96.0 97.0 98.0 99.0
  * lon      (lon) float64 0.0 1.0 2.0 3.0 4.0 5.0 ... 95.0 96.0 97.0 98.0 99.0
Data variables:
    var      (lon, lat) float64 ...
Traceback (most recent call last):
  File "/Users/aaron.spring/Coding/kerchunk/my_kerchunk/run.py", line 52, in <module>
    mzz.translate("output.zarr")
  File "/Users/aaron.spring/Coding/kerchunk/kerchunk/combine.py", line 79, in translate
    out = self._build_output(ds, ds0, fss)
  File "/Users/aaron.spring/Coding/kerchunk/kerchunk/combine.py", line 185, in _build_output
    acc_len = make_coord(fss, z, accum_dim)
  File "/Users/aaron.spring/Coding/kerchunk/kerchunk/combine.py", line 301, in make_coord
    zz = zarr.open_array(fs.get_mapper(accum_dim))
  File "/Users/aaron.spring/mambaforge/envs/xr/lib/python3.9/site-packages/zarr/creation.py", line 526, in open_array
    init_array(store, shape=shape, chunks=chunks, dtype=dtype,
  File "/Users/aaron.spring/mambaforge/envs/xr/lib/python3.9/site-packages/zarr/storage.py", line 353, in init_array
    _init_array_metadata(store, shape=shape, chunks=chunks, dtype=dtype,
  File "/Users/aaron.spring/mambaforge/envs/xr/lib/python3.9/site-packages/zarr/storage.py", line 390, in _init_array_metadata
    shape = normalize_shape(shape) + dtype.shape
  File "/Users/aaron.spring/mambaforge/envs/xr/lib/python3.9/site-packages/zarr/util.py", line 42, in normalize_shape
    raise TypeError('shape is None')
TypeError: shape is None

Versions:

  • zarr 2.10.3
  • kerchunk 0.0.5+47.gf42b0c2 (current main)
  • fsspec 2022.1.0
  • xarray 0.20.2
@martindurant
Copy link
Member

I do not see the variable/coordinate "time" in the input data

@aaronspring
Copy link
Author

I got the same error even with ds = xr.DataArray(name='var', data=np.random.random((100,100)), dims=['lon','lat'], coords={'lon':range(100), 'lat':range(100)}).to_dataset().expand_dims('time') also with multiple time indices in ds

@martindurant
Copy link
Member

martindurant commented Feb 4, 2022

.expand_dims('time')

This just adds "time" to the list of dims but doesn't assign any values, so there's no way to determine what the output "time" coordinate should be.
Interestingly, in #122 I am developing a new version of combine, where you can define how to extract the concat coord(s) from each dataset.
The following works, for example

In [20]: from kerchunk.combine import MultiZarrToZarr
    ...: mzz = MultiZarrToZarr(
    ...:     "zip://*.json::out.zip",
    ...:     remote_protocol="file",
    ...:     concat_dims=["time"],
    ...:     coo_map={"time": "INDEX"}
    ...: )
In [21]: test_dict = mzz.translate()
In [22]: m = fsspec.get_mapper('reference://', fo=test_dict, remote_protocoll='file')
In [23]: xr.open_dataset(m, engine='zarr', backend_kwargs={"consolidated": False})
Out[23]:
<xarray.Dataset>
Dimensions:  (lat: 100, lon: 100, time: 11)
Coordinates:
  * lat      (lat) float64 0.0 1.0 2.0 3.0 4.0 5.0 ... 95.0 96.0 97.0 98.0 99.0
  * lon      (lon) float64 0.0 1.0 2.0 3.0 4.0 5.0 ... 95.0 96.0 97.0 98.0 99.0
  * time     (time) float64 nan 1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0 10.0
Data variables:
    var      (time, lon, lat) float64 ...

where we use the coo_map argument we define the value of "time" to be equal to the sequential index of the inputs. The output dtype of "time" should be int64, not float64, but this is pretty good.

@aaronspring
Copy link
Author

great. #122 works for me. only the final step fails File "/Users/aaron.spring/Coding/kerchunk/run.py", line 44, in <module> mzz.translate("output.json") TypeError: translate() takes 1 positional argument but 2 were given. looking forward to when this PR is merged!

@martindurant
Copy link
Member

Yep, for now it returns the references for you to deal with - but it should write directly as the previous version did.

@aaronspring
Copy link
Author

I want to use kerchunk to open climate model netcdf output stored 1 or 12 timestep/s per file in many files as one huge zarr.

After discussing with @d70-t, I found out that my climate model output is usually stored with no spatial chunks and the unlimited time dimension. Given those current defaults in creating climate model output, kerchunk cannot help me speeding up. Also my internal test case is slower using kerchunk compared to just using nc. Nevertheless #122 works for me and will resolve the issue title.

@martindurant
Copy link
Member

my climate model output is usually stored with no spatial chunks and the unlimited time dimension

That's OK!

See this branch, where I've made a start on netCDF3:

  • we can grab arrays as chunks (which could pre sub-chunked Add the ability to split large chunks #124)
  • the records part (for the unlimited dimension(s)) can either be read as a single numpy record array and sub-selected, or one chunk per record if they are big enough - or even something inbetween. The choice would be in the hands of the person making the references.

test case is slower using kerchunk compared to just using nc

Can you share?

@aaronspring
Copy link
Author

aaronspring commented Feb 28, 2022

I create from MultiZarrToZarr and compare this performance in xr.open_dataset(json) again xr.open_mfdatasets(nc_file_list). To get some more random read accesses shuffle the data and multiply with itself.

I would have hoped the open.dataset(zarr) would outperform the others, but doesnt.

Script:

import os
import zipfile
import kerchunk.hdf
import fsspec
from timeit import default_timer as timer

import xarray as xr
import numpy as np
from kerchunk.combine import MultiZarrToZarr
import json

recreate=True
use_dummy=True # False
nvar=20
ntime=50
nlev=1
if use_dummy:
    ds = xr.DataArray(name='var', data=np.random.random((220,256,12,nlev,nvar)), dims=['x','y','time','depth','var'], coords={'x':range(220), 'y':range(256), 'depth':range(nlev), 'var':[f'var{i}' for i in range(nvar)]}).to_dataset('var')

    urls =[f'test_{y}.nc' for y in range(2000,2000+ntime)]
    if recreate:
        for i,u in enumerate(urls):
            #ds = ds.assign_coords(time=xr.cftime_range(start=str(2000+i),freq='MS', periods=ds.time.size))
            ds = ds.assign_coords(time=range(2000+i*12,2000+i*12+12))
            ds.astype('float32').to_netcdf(u)

else:
    # use MPIESM output
    urls = [i for i in os.listdir('.') if 'asp_esmControl' in i]


start=timer()
so = dict(
    anon=True, default_fill_cache=False, default_cache_type='first'
)

with zipfile.ZipFile("out.zip", mode="w") as zf:
    for u in urls:
        print(u)
        with fsspec.open(u, **so) as inf:
            h5chunks = kerchunk.hdf.SingleHdf5ToZarr(inf, u, inline_threshold=100)
            with zf.open(os.path.basename(u) + ".json", 'w') as outf:
                outf.write(json.dumps(h5chunks.translate()).encode())

remote_protocol="memory"
from kerchunk.combine import MultiZarrToZarr
mzz = MultiZarrToZarr(
    "zip://*.json::out.zip",
    remote_protocol=remote_protocol,
    concat_dims=["time"],
    #coo_map={"time": "INDEX"},
)
print(mzz)
test_dict = mzz.translate()
end = timer()
print('create json '+str(round(end-start,3))+'s')

#print(test_dict)
#mzz.translate("output.zarr")

# This can also be written as a json
#mzz.translate("output.json")

combine='nested'
concat_dim='time'

m = fsspec.get_mapper('reference://', fo=test_dict, remote_protocoll=remote_protocol)
ker = xr.open_dataset(m, engine='zarr', backend_kwargs={"consolidated": False})
x = xr.open_mfdataset(urls, concat_dim=concat_dim, combine=combine)
print(f'{ker.sizes} {ker.nbytes*1e-9} GB')
print(f'{x.sizes} {x.nbytes*1e-9} GB')

def shuffle(ds):
    idx = np.arange(ds.time.size)
    np.random.shuffle(idx)
    return (ds.isel(time=idx)*ds).mean('time')


# time xr.open_mfdataset(, engine='zarr')
print('start shuffle(xr.open_dataset(json, engine="zarr")).compute()')
start = timer()
#m = fsspec.get_mapper('reference://', fo=test_dict, remote_protocoll=remote_protocol)
shuffle(xr.open_dataset(m, engine='zarr', backend_kwargs={"consolidated": False, "storage_options": {"fo":test_dict, "remote_protocol":remote_protocol}})).compute()
end = timer()
print('took '+str(round(end-start,3))+'s')



combine='nested'
concat_dim='time'
# time open xr.open_mfdataset(urls)
print('start shuffle(xr.open_mfdataset("*").compute()')
start = timer()
shuffle(xr.open_mfdataset(urls, concat_dim=concat_dim, combine=combine)).compute()
end = timer()
print('took '+str(round(end-start,3))+'s')


print('start shuffle(xr.open_mfdataset("*", parallel=True, coords="minimal", data_vars="minimal", compat="override").mean().compute()')
start = timer()
shuffle(xr.open_mfdataset(urls, combine=combine, concat_dim=concat_dim, parallel=True, coords="minimal", data_vars="minimal", compat="override")).compute()
end = timer()
print('took '+str(round(end-start,3))+'s')



print('start shuffle(xr.open_mfdataset("*", parallel=False, coords="minimal", data_vars="minimal", compat="override").mean().compute()')
start = timer()
shuffle(xr.open_mfdataset(urls, concat_dim=concat_dim, combine=combine, parallel=False, coords="minimal", data_vars="minimal", compat="override")).compute()
end = timer()
print('took '+str(round(end-start,3))+'s')


print("")
print("end")

Output:

(xr) aaron.spring@MacBook-Pro:~/Coding/kerchunk$ python zarrify_nc.py
test_2000.nc
test_2001.nc
test_2002.nc
...
test_2048.nc
test_2049.nc
<kerchunk.combine.MultiZarrToZarr object at 0x100d53f70>
create json 1.029s

Frozen({'depth': 1, 'time': 600, 'x': 220, 'y': 256}) 2.703368616 GB
Frozen({'x': 220, 'y': 256, 'time': 600, 'depth': 1}) 2.703368616 GB

start shuffle(xr.open_dataset(json, engine="zarr")).compute()
took 22.922s


start shuffle(xr.open_mfdataset("*").compute()
/Users/aaron.spring/mambaforge/envs/xr/lib/python3.9/site-packages/xarray/core/indexing.py:1227: PerformanceWarning: Slicing with an out-of-order index is generating 12 times more chunks
  return self.array[key]
...
/Users/aaron.spring/mambaforge/envs/xr/lib/python3.9/site-packages/xarray/core/indexing.py:1227: PerformanceWarning: Slicing with an out-of-order index is generating 12 times more chunks
  return self.array[key]
took 5.966s


start shuffle(xr.open_mfdataset("*", parallel=True, coords="minimal", data_vars="minimal", compat="override").mean().compute()
/Users/aaron.spring/mambaforge/envs/xr/lib/python3.9/site-packages/xarray/core/indexing.py:1227: PerformanceWarning: Slicing with an out-of-order index is generating 12 times more chunks
  return self.array[key]
...
/Users/aaron.spring/mambaforge/envs/xr/lib/python3.9/site-packages/xarray/core/indexing.py:1227: PerformanceWarning: Slicing with an out-of-order index is generating 12 times more chunks
  return self.array[key]
took 6.157s


start shuffle(xr.open_mfdataset("*", parallel=False, coords="minimal", data_vars="minimal", compat="override").mean().compute()
/Users/aaron.spring/mambaforge/envs/xr/lib/python3.9/site-packages/xarray/core/indexing.py:1227: PerformanceWarning: Slicing with an out-of-order index is generating 12 times more chunks
  return self.array[key]
...
/Users/aaron.spring/mambaforge/envs/xr/lib/python3.9/site-packages/xarray/core/indexing.py:1227: PerformanceWarning: Slicing with an out-of-order index is generating 12 times more chunks
  return self.array[key]
took 5.958s

end

@martindurant
Copy link
Member

martindurant commented Feb 28, 2022

The biggest difference here, is that the kerchunk-zarr dataset is not being opened with dask. You need to add chunks={} for that. I get the following results. Note that I saved my output references to a file, "test_out.json" (62kB) so that I could run everything with a fresh python session (I also added skip_instance_cache for this reason)

import xarray as xr
import numpy as np

ntime = 50
combine='nested'
concat_dim='time'

urls =[f'test_{y}.nc' for y in range(2000,2000+ntime)]

%time ds = xr.open_mfdataset(urls, concat_dim=concat_dim, combine=combine, parallel=True, coords="minimal",
                             data_vars="minimal", compat="override")
# 1.56 s

%time ds2 = xr.open_dataset("reference://", engine='zarr', backend_kwargs={"consolidated": False, "storage_options": {
    "fo":"test_out.json", "remote_protocol":"file", "skip_instance_cache": True}}, chunks={})
# 57 ms

def shuffle(ds):
    idx = np.arange(ds.time.size)
    np.random.shuffle(idx)
    return (ds.isel(time=idx)*ds).mean('time')

%time shuffle(ds).compute()
# 10.3 s

%time shuffle(ds2).compute(0)
# 14.3 s

import dask.distributed
client = dask.distributed.Client()

%time shuffle(ds).compute()
# 29.5 s

%time shuffle(ds2).compute(0)
# 20.6 s

So with default threaded Dask, kerchunk is somewhat slower (I suspect xarray is doing some caching of its own here, cc @rabernat ?) but with distributed it is somewhat faster (I suspect distributed's cache is efficient here). Kerchunk is always much faster to open the dataset, which is a small part of the overall cost in this case.

Note that kerchunk allows you to choose chunk sizes bigger that the native ones in the data (e.g., chunks={"time": 36}), which makes a big difference to remote data, but no real difference here.

@aaronspring aaronspring changed the title MultiZarrToZarr.translate: TypeError: shape is None from zarr Apply kerchunk to one-timestep-per-file (climate model) output Feb 28, 2022
@martindurant
Copy link
Member

On second thoughts, since the data is uncompressed, it may well be that h5py can read it directly without copy and without the GIL, whereas kerchunk/referenceFS deals with bytes. We could implement opening as mmap and passing around memorybuffers.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants