-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnc2zarr.py
75 lines (59 loc) · 2.36 KB
/
nc2zarr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import itertools
import os
import shutil
import numpy as np
import xarray as xr
from optimize_chunks import optimize_chunk_size
# Upper bound, in bytes, used as the chunk size target when guessing chunk
# shapes: the default 256 KiB IPFS block size minus a 14-byte header.
MAX_IPFS_CHUNK_SIZE = 256 * 1024 - 14 # default 256 KiB blocksize - 14 bytes header
def guess_chunks(variable, compression_factor=1.):
    """Ask ``optimize_chunk_size`` for a chunking of *variable*.

    The element size and shape are read from ``variable.dtype.itemsize`` and
    ``variable.shape`` and forwarded together with ``MAX_IPFS_CHUNK_SIZE`` and
    *compression_factor*.  The result of ``optimize_chunk_size`` is returned
    unchanged.
    """
    bytes_per_item = variable.dtype.itemsize
    full_shape = variable.shape
    return optimize_chunk_size(
        bytes_per_item,
        full_shape,
        MAX_IPFS_CHUNK_SIZE,
        compression_factor,
    )
def find_compression_factors(ds, encoding, zarrpath):
    """Measure how well each chunked variable compressed on disk.

    For every variable in *encoding* that has a ``"chunks"`` entry, the
    uncompressed size of one chunk (item size times the number of elements per
    chunk) is divided by the mean on-disk size of the variable's *full* chunks
    (chunks that are not truncated at the array edge).

    Args:
        ds: Mapping from variable name to an object exposing ``dtype.itemsize``
            and ``shape`` (e.g. the dataset that was written).
        encoding: Mapping from variable name to an encoding dict; only entries
            containing ``"chunks"`` are considered.
        zarrpath: Directory of the written store.  Chunk files are looked up
            as ``<zarrpath>/<varname>/<i>.<j>...``, i.e. dot-joined chunk
            indices.

    Returns:
        dict mapping variable name to ``gross_size / mean(net_size)``.
        Variables without any full chunk, or whose full chunks are all absent
        on disk, are left out.
    """
    factors = {}
    for varname, enc in encoding.items():
        if "chunks" not in enc:
            continue

        var = ds[varname]
        chunks = enc["chunks"]
        gross_size = var.dtype.itemsize * np.prod(chunks)
        folder = os.path.join(zarrpath, varname)

        # Only chunks that lie completely inside the array are representative;
        # edge chunks are excluded by using floor division here.
        n_full_chunks = tuple(s // c for s, c in zip(var.shape, chunks))

        sizes = []
        for cid in itertools.product(*map(range, n_full_chunks)):
            # A zero-dimensional array yields a single empty index tuple; its
            # one chunk is stored under the key "0", not under an empty name
            # (which would resolve to the variable's directory itself).
            chunk_name = ".".join(map(str, cid)) if cid else "0"
            try:
                sizes.append(os.path.getsize(os.path.join(folder, chunk_name)))
            except FileNotFoundError:
                # The store may omit chunks that were never written (e.g.
                # chunks consisting only of the fill value); ignore those.
                continue

        if not sizes:
            continue

        net_size = np.mean(sizes)
        factors[varname] = gross_size / net_size
    return factors
def _build_encoding(ds, compression_factors=None):
    """Return a ``{name: {"chunks": ...}}`` encoding for all data variables and coordinates."""
    factors = compression_factors or {}
    return {
        name: {"chunks": guess_chunks(var, factors.get(name, 1.0))}
        for name, var in itertools.chain(ds.items(), ds.coords.items())
    }


def _write_zarr(ds, outfile, encoding):
    """Write *ds* to *outfile* as a consolidated zarr store, removing any existing output first."""
    if os.path.exists(outfile):
        shutil.rmtree(outfile)
    ds.to_zarr(outfile,
               mode="w",
               consolidated=True,
               encoding=encoding)


def main():
    """Command-line entry point: convert a netCDF file into a zarr folder.

    The ``-O/--optimization`` level controls chunking:

    * ``0``: write without any explicit chunk encoding.
    * ``>= 1``: chunk every data variable and coordinate as suggested by
      ``guess_chunks``.
    * ``>= 2``: after the first write, measure the compression factors of the
      written chunks, re-guess the chunking with them and write the store
      again.

    An existing output path is removed before writing.
    """
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("infile", help="input netcdf file")
    parser.add_argument("outfile", help="output zarr folder")
    parser.add_argument("-O", "--optimization", default=1, type=int, help="optimization level")
    args = parser.parse_args()

    # Open as a context manager so the input file is closed even if writing
    # fails part-way through.
    with xr.open_dataset(args.infile, decode_cf=False) as ds:
        encoding = _build_encoding(ds) if args.optimization >= 1 else {}
        _write_zarr(ds, args.outfile, encoding)

        if args.optimization >= 2:
            # Second pass: the first write exists only to measure how well
            # each variable compresses with the initial chunking.
            cf = find_compression_factors(ds, encoding, args.outfile)
            _write_zarr(ds, args.outfile, _build_encoding(ds, cf))
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()