From 898c44114b353754bf93f665981851abf3d785bc Mon Sep 17 00:00:00 2001 From: Darshan Prajapati Date: Thu, 14 Sep 2023 11:23:17 +0000 Subject: [PATCH 1/3] Data Seeding Scripts For Analysis Ready Dataset --- src/arco_era5/__init__.py | 5 +- src/arco_era5/source_data.py | 78 +++++++++++- src/arco_era5/update.py | 36 ++++++ src/netcdf_to_zarr.py | 241 ++++++++++++++++------------------- src/update-data.py | 33 +++++ 5 files changed, 254 insertions(+), 139 deletions(-) create mode 100644 src/arco_era5/update.py create mode 100644 src/update-data.py diff --git a/src/arco_era5/__init__.py b/src/arco_era5/__init__.py index 95a3810..c58b04d 100644 --- a/src/arco_era5/__init__.py +++ b/src/arco_era5/__init__.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from .source_data import GCP_DIRECTORY,SINGLE_LEVEL_VARIABLES,MULTILEVEL_VARIABLES,PRESSURE_LEVELS_GROUPS, TIME_RESOLUTION_HOURS -from .source_data import get_var_attrs_dict, read_multilevel_vars, read_single_level_vars, daily_date_iterator, align_coordinates, parse_arguments +from .source_data import GCP_DIRECTORY,STATIC_VARIABLES,SINGLE_LEVEL_VARIABLES,MULTILEVEL_VARIABLES,PRESSURE_LEVELS_GROUPS, TIME_RESOLUTION_HOURS, HOURS_PER_DAY +from .source_data import get_var_attrs_dict, read_multilevel_vars, read_static_vars, read_single_level_vars, daily_date_iterator, align_coordinates, parse_arguments, get_pressure_levels_arg, LoadTemporalDataForDateDoFn from .pangeo import run, parse_args +from .update import UpdateSlice diff --git a/src/arco_era5/source_data.py b/src/arco_era5/source_data.py index e1fd9dd..132248a 100644 --- a/src/arco_era5/source_data.py +++ b/src/arco_era5/source_data.py @@ -2,17 +2,20 @@ __author__ = 'Matthew Willson, Alvaro Sanchez, Peter Battaglia, Stephan Hoyer, Stephan Rasp' +import apache_beam as beam import argparse +import datetime import fsspec import immutabledict +import logging import pathlib -import xarray import numpy as np import pandas as pd import typing as t import xarray as xr +import xarray_beam as xb TIME_RESOLUTION_HOURS = 1 @@ -334,6 +337,7 @@ "geopotential_at_surface": "geopotential" } +HOURS_PER_DAY = 24 def _read_nc_dataset(gpath_file): """Read a .nc NetCDF dataset from a cloud storage path and disk. @@ -352,7 +356,7 @@ def _read_nc_dataset(gpath_file): """ path = str(gpath_file).replace('gs:/', 'gs://') with fsspec.open(path, mode="rb") as fid: - dataset = xarray.open_dataset(fid, engine="scipy", cache=False) + dataset = xr.open_dataset(fid, engine="scipy", cache=False) # All dataset have a single data array in them, so we just return the array. 
assert len(dataset) == 1 dataarray = next(iter(dataset.values())) @@ -412,7 +416,7 @@ def read_single_level_vars(year, month, day, variables=SINGLE_LEVEL_VARIABLES, relative_path = SINGLE_LEVEL_SUBDIR_TEMPLATE.format( year=year, month=month, day=day, variable=era5_variable) output[variable] = _read_nc_dataset(root_path / relative_path) - return xarray.Dataset(output) + return xr.Dataset(output) def read_multilevel_vars(year, @@ -451,8 +455,8 @@ def read_multilevel_vars(year, single_level_data_array.coords["level"] = pressure_level pressure_data.append( single_level_data_array.expand_dims(dim="level", axis=1)) - output[variable] = xarray.concat(pressure_data, dim="level") - return xarray.Dataset(output) + output[variable] = xr.concat(pressure_data, dim="level") + return xr.Dataset(output) def get_var_attrs_dict(root_path=GCP_DIRECTORY): @@ -558,6 +562,62 @@ def align_coordinates(dataset: xr.Dataset) -> xr.Dataset: return dataset +def get_pressure_levels_arg(pressure_levels_group: str): + return PRESSURE_LEVELS_GROUPS[pressure_levels_group] + +class LoadTemporalDataForDateDoFn(beam.DoFn): + def __init__(self, data_path, start_date, pressure_levels_group): + self.data_path = data_path + self.start_date = start_date + self.pressure_levels_group = pressure_levels_group + + def process(self, args): + + """Loads temporal data for a day, with an xarray_beam key for it..""" + year, month, day = args + logging.info("Loading NetCDF files for %d-%d-%d", year, month, day) + + try: + single_level_vars = read_single_level_vars( + year, + month, + day, + variables=SINGLE_LEVEL_VARIABLES, + root_path=self.data_path) + multilevel_vars = read_multilevel_vars( + year, + month, + day, + variables=MULTILEVEL_VARIABLES, + pressure_levels=get_pressure_levels_arg(self.pressure_levels_group), + root_path=self.data_path) + except BaseException as e: + # Make sure we print the date as part of the error for easier debugging + # if something goes wrong. Note "from e" will also raise the details of the + # original exception. + raise Exception(f"Error loading {year}-{month}-{day}") from e + + # It is crucial to actually "load" as otherwise we get a pickle error. + single_level_vars = single_level_vars.load() + multilevel_vars = multilevel_vars.load() + + dataset = xr.merge([single_level_vars, multilevel_vars]) + dataset = align_coordinates(dataset) + offsets = {"latitude": 0, "longitude": 0, "level": 0, + "time": offset_along_time_axis(self.start_date, year, month, day)} + key = xb.Key(offsets, vars=set(dataset.data_vars.keys())) + logging.info("Finished loading NetCDF files for %s-%s-%s", year, month, day) + yield key, dataset + dataset.close() + +def offset_along_time_axis(start_date: str, year: int, month: int, day: int) -> int: + """Offset in indices along the time axis, relative to start of the dataset.""" + # Note the length of years can vary due to leap years, so the chunk lengths + # will not always be the same, and we need to do a proper date calculation + # not just multiply by 365*24. + time_delta = pd.Timestamp( + year=year, month=month, day=day) - pd.Timestamp(start_date) + return time_delta.days * HOURS_PER_DAY // TIME_RESOLUTION_HOURS def parse_arguments(desc: str) -> t.Tuple[argparse.Namespace, t.List[str]]: """Parse command-line arguments for the data processing pipeline. 
@@ -580,8 +640,6 @@ def parse_arguments(desc: str) -> t.Tuple[argparse.Namespace, t.List[str]]:
                         help='Start date, iso format string.')
     parser.add_argument('-e', "--end_date", default='2020-01-02',
                         help='End date, iso format string.')
-    parser.add_argument("--temp_location", type=str, required=True,
-                        help="A temp location where this data is stored temporarily.")
     parser.add_argument('--find-missing', action='store_true', default=False,
                         help='Print all paths to missing input data.')  # implementation pending
     parser.add_argument("--pressure_levels_group", type=str, default="weatherbench_13",
@@ -589,5 +647,11 @@ def parse_arguments(desc: str) -> t.Tuple[argparse.Namespace, t.List[str]]:
     parser.add_argument("--time_chunk_size", type=int, required=True,
                         help="Number of 1-hourly timesteps to include in a \
                             single chunk. Must evenly divide 24.")
+    parser.add_argument("--init_date", type=str, default='1900-01-01',
+                        help="Date to initialize the zarr store.")
+    parser.add_argument("--from_init_date", action='store_true', default=False,
+                        help="Initialize the store from an earlier --init_date (e.g. 1900-01-01) instead of --start_date.")
+    parser.add_argument("--only_initialize_store", action='store_true', default=False,
+                        help="Initialize the zarr store without writing data.")

     return parser.parse_known_args()
diff --git a/src/arco_era5/update.py b/src/arco_era5/update.py
new file mode 100644
index 0000000..6b73d98
--- /dev/null
+++ b/src/arco_era5/update.py
@@ -0,0 +1,36 @@
+import apache_beam as beam
+import datetime
+import logging
+import xarray as xr
+import zarr as zr
+
+from arco_era5 import HOURS_PER_DAY
+from dataclasses import dataclass
+from typing import Tuple
+
+logger = logging.getLogger(__name__)
+
+def update(offset_ds: Tuple[int, xr.Dataset, str], target: str, init_date: str):
+    """Generate region slice and update zarr array directly"""
+    key, ds = offset_ds
+    offset = key.offsets['time']
+    date = datetime.datetime.strptime(init_date, '%Y-%m-%d') + datetime.timedelta(days=offset / HOURS_PER_DAY)
+    date_str = date.strftime('%Y-%m-%d')
+    zf = zr.open(target)
+    region = slice(offset, offset + HOURS_PER_DAY)
+    for vname in ds.data_vars:
+        logger.info(f"Started {vname} for {date_str}")
+        zv = zf[vname]
+        zv[region] = ds[vname].values
+        logger.info(f"Done {vname} for {date_str}")
+        del zv
+    del ds
+
+@dataclass
+class UpdateSlice(beam.PTransform):
+
+    target: str
+    init_date: str
+
+    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
+        return pcoll | beam.Map(update, target=self.target, init_date=self.init_date)
diff --git a/src/netcdf_to_zarr.py b/src/netcdf_to_zarr.py
index 1bc8956..163b7c7 100644
--- a/src/netcdf_to_zarr.py
+++ b/src/netcdf_to_zarr.py
@@ -14,12 +14,38 @@
 Example usage:
-    $ python src/netcdf_to_zarr.py \
+    Generate zarr store from start_date and fill it with data:
+
+    python src/netcdf_to_zarr.py \
+      --output_path="gs://gcp-public-data-arco-era5/ar/$USER-1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2" \
+      --pressure_levels_group="full_37" \
+      --time_chunk_size=1 \
+      --start_date '1959-01-01' \
+      --end_date '2021-12-31' \
+      --runner DataflowRunner \
+      --project $PROJECT \
+      --region $REGION \
+      --temp_location "gs://$BUCKET/tmp/" \
+      --setup_file ./setup.py \
+      --disk_size_gb 500 \
+      --machine_type m1-ultramem-40 \
+      --no_use_public_ips \
+      --network=$NETWORK \
+      --subnetwork=regions/$REGION/subnetworks/$SUBNET \
+      --job_name $USER-ar-zarr-full \
+      --number_of_worker_harness_threads 20
+
+    Generate zarr store from init_date and fill data from start_date.
Default init_date will be 1900-01-01 + + ``` + python src/netcdf_to_zarr.py \ --output_path="gs://gcp-public-data-arco-era5/ar/$USER-1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2" \ --pressure_levels_group="full_37" \ --time_chunk_size=1 \ --start_date '1959-01-01' \ --end_date '2021-12-31' \ + --init_date '1900-01-01' \ + --from_init_date --runner DataflowRunner \ --project $PROJECT \ --region $REGION \ @@ -32,6 +58,46 @@ --subnetwork=regions/$REGION/subnetworks/$SUBNET \ --job_name $USER-ar-zarr-full \ --number_of_worker_harness_threads 20 + ``` + + Generate zarr store from init_date without data. Default init_date will be 1900-01-01. Static variables will be loaded. + + ``` + python src/netcdf_to_zarr.py \ + --output_path="gs://gcp-public-data-arco-era5/ar/$USER-1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2" \ + --pressure_levels_group="full_37" \ + --time_chunk_size=1 \ + --start_date '1959-01-01' \ + --end_date '2021-12-31' \ + --init_date '1800-01-01' \ + --from_init_date \ + --only_initialize_store + ``` + + Seed data in the existing store. + + ``` + python src/update-data.py \ + --output_path="gs://gcp-public-data-arco-era5/ar/$USER-1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2" \ + --pressure_levels_group="full_37" \ + --time_chunk_size=1 \ + --start_date '1959-01-01' \ + --end_date '2021-12-31' \ + --init_date '1900-01-01' \ + --runner DataflowRunner \ + --project $PROJECT \ + --region $REGION \ + --temp_location "gs://$BUCKET/tmp/" \ + --setup_file ./setup.py \ + --disk_size_gb 500 \ + --machine_type m1-ultramem-40 \ + --no_use_public_ips \ + --network=$NETWORK \ + --subnetwork=regions/$REGION/subnetworks/$SUBNET \ + --job_name $USER-ar-zarr-full \ + --number_of_worker_harness_threads 20 + ``` + """ # TODO(alvarosg): Make this pipeline resumable in case of error in the middle @@ -44,6 +110,7 @@ import logging import apache_beam as beam +import datetime import numpy as np import pandas as pd import typing as t @@ -52,38 +119,24 @@ from arco_era5 import ( GCP_DIRECTORY, + HOURS_PER_DAY, + STATIC_VARIABLES, SINGLE_LEVEL_VARIABLES, MULTILEVEL_VARIABLES, - PRESSURE_LEVELS_GROUPS, TIME_RESOLUTION_HOURS, + get_pressure_levels_arg, get_var_attrs_dict, read_multilevel_vars, read_single_level_vars, daily_date_iterator, align_coordinates, - parse_arguments + parse_arguments, + LoadTemporalDataForDateDoFn ) INPUT_PATH = GCP_DIRECTORY -_HOURS_PER_DAY = 24 # TODO(alvarosg): Add pressure level chunk size. - -def _get_pressure_levels_arg(pressure_levels_group: str): - """Get pressure levels based on a pressure levels group. - - Args: - pressure_levels_group (str): The group label for the set of pressure levels. - - Returns: - list: A list of pressure levels. - - Example: - >>> pressure_levels = _get_pressure_levels_arg("weatherbench_13") - """ - return PRESSURE_LEVELS_GROUPS[pressure_levels_group] - - def make_template(data_path: str, start_date: str, end_date: str, time_chunk_size: int, pressure_levels_group: str) -> t.Tuple[xa.Dataset, t.Dict[str, int]]: """Create a lazy template with the same dimensions, coordinates, and variables as expected results. @@ -115,24 +168,23 @@ def make_template(data_path: str, start_date: str, end_date: str, time_chunk_siz # Get some sample multi-level data to get coordinates, only for one var, # so it downloads quickly. 
logging.info("Downloading one variable of sample data for template.") - first_year, first_month, first_day = next(iter( - daily_date_iterator(start_date, end_date))) + date = datetime.datetime.strptime(end_date, '%Y-%m-%d') - datetime.timedelta(days=1) sample_multilevel_vars = align_coordinates( read_multilevel_vars( # Date is irrelevant. - first_year, - first_month, - first_day, + date.year, + date.month, + date.day, root_path=data_path, variables=MULTILEVEL_VARIABLES[:1], - pressure_levels=_get_pressure_levels_arg(pressure_levels_group))) + pressure_levels=get_pressure_levels_arg(pressure_levels_group))) logging.info("Finished downloading.") lat_size = sample_multilevel_vars.sizes["latitude"] lon_size = sample_multilevel_vars.sizes["longitude"] level_size = sample_multilevel_vars.sizes["level"] assert level_size == len( - _get_pressure_levels_arg(pressure_levels_group) + get_pressure_levels_arg(pressure_levels_group) ), "Mismatched level sizes" # Take the coordinates from the richer, multi-level dataset. @@ -166,88 +218,6 @@ def make_template(data_path: str, start_date: str, end_date: str, time_chunk_siz chunk_sizes = {"time": time_chunk_size} return xa.Dataset(template_dataset, coords=coords), chunk_sizes - -class LoadTemporalDataForDateDoFn(beam.DoFn): - """A Beam DoFn for loading temporal data for a specific date. - - This class is responsible for loading temporal data for a given date, including both single-level and multi-level variables. - - Args: - data_path (str): The path to the data source. - start_date (str): The start date in ISO format (YYYY-MM-DD). - pressure_levels_group (str): The group label for the set of pressure levels. - - Methods: - process(args): Loads temporal data for a specific date and yields it with an xarray_beam key. - - Example: - >>> data_path = "gs://your-bucket/data/" - >>> start_date = "2023-09-01" - >>> pressure_levels_group = "weatherbench_13" - >>> loader = LoadTemporalDataForDateDoFn(data_path, start_date, pressure_levels_group) - >>> for result in loader.process((2023, 9, 11)): - ... key, dataset = result - ... print(f"Loaded data for key: {key}") - ... print(dataset) - """ - def __init__(self, data_path, start_date, pressure_levels_group): - """Initialize the LoadTemporalDataForDateDoFn. - - Args: - data_path (str): The path to the data source. - start_date (str): The start date in ISO format (YYYY-MM-DD). - pressure_levels_group (str): The group label for the set of pressure levels. - """ - self.data_path = data_path - self.start_date = start_date - self.pressure_levels_group = pressure_levels_group - - def process(self, args): - """Load temporal data for a day, with an xarray_beam key for it. - - Args: - args (tuple): A tuple containing the year, month, and day. - - Yields: - tuple: A tuple containing an xarray_beam key and the loaded dataset. - """ - year, month, day = args - logging.info("Loading NetCDF files for %d-%d-%d", year, month, day) - - try: - single_level_vars = read_single_level_vars( - year, - month, - day, - variables=SINGLE_LEVEL_VARIABLES, - root_path=self.data_path) - multilevel_vars = read_multilevel_vars( - year, - month, - day, - variables=MULTILEVEL_VARIABLES, - pressure_levels=_get_pressure_levels_arg(self.pressure_levels_group), - root_path=self.data_path) - except BaseException as e: - # Make sure we print the date as part of the error for easier debugging - # if something goes wrong. Note "from e" will also raise the details of the - # original exception. 
- raise Exception(f"Error loading {year}-{month}-{day}") from e - - # It is crucial to actually "load" as otherwise we get a pickle error. - single_level_vars = single_level_vars.load() - multilevel_vars = multilevel_vars.load() - - dataset = xa.merge([single_level_vars, multilevel_vars]) - dataset = align_coordinates(dataset) - offsets = {"latitude": 0, "longitude": 0, "level": 0, - "time": offset_along_time_axis(self.start_date, year, month, day)} - key = xb.Key(offsets, vars=set(dataset.data_vars.keys())) - logging.info("Finished loading NetCDF files for %s-%s-%s", year, month, day) - yield key, dataset - dataset.close() - - def offset_along_time_axis(start_date: str, year: int, month: int, day: int) -> int: """Calculate the offset in indices along the time axis relative to the start date of the dataset. @@ -276,7 +246,7 @@ def offset_along_time_axis(start_date: str, year: int, month: int, day: int) -> # not just multiply by 365*24. time_delta = pd.Timestamp( year=year, month=month, day=day) - pd.Timestamp(start_date) - return time_delta.days * _HOURS_PER_DAY // TIME_RESOLUTION_HOURS + return time_delta.days * HOURS_PER_DAY // TIME_RESOLUTION_HOURS def define_pipeline( @@ -286,7 +256,10 @@ def define_pipeline( time_chunk_size: int, start_date: str, end_date: str, - pressure_levels_group: str + pressure_levels_group: str, + init_date: str, + from_init_date: bool, + only_initialize_store: bool ) -> t.Tuple[beam.Pipeline, beam.Pipeline]: """Define a Beam pipeline to convert ERA5 NetCDF files to Zarr format. @@ -319,7 +292,7 @@ def define_pipeline( """ template, chunk_sizes = make_template( - input_path, start_date, end_date, time_chunk_size, pressure_levels_group) + input_path, init_date if from_init_date else start_date, end_date, time_chunk_size, pressure_levels_group) # We will create a single `chunks_to_zarr` object, but connect it at the end # of the two pipelines separately. This causes the full transformation to be @@ -339,22 +312,24 @@ def define_pipeline( template=xb.make_template(template), zarr_chunks=chunk_sizes) - load_temporal_data_for_date_do_fn = LoadTemporalDataForDateDoFn( - data_path=input_path, - start_date=start_date, - pressure_levels_group=pressure_levels_group - ) - logging.info("Setting up temporal variables.") - temporal_variables_chunks = ( - root - | "DayIterator" >> beam.Create(daily_date_iterator(start_date, end_date)) - | "TemporalDataForDay" >> beam.ParDo(load_temporal_data_for_date_do_fn) - | xb.SplitChunks(chunk_sizes) - # We can skip the consolidation if we are using a `time_chunk_size` that - # evenly divides a day worth of data. - # | xb.ConsolidateChunks(chunk_sizes) - | "ChunksToZarrTemporal" >> chunks_to_zarr - ) + temporal_variables_chunks = None + if not only_initialize_store: + load_temporal_data_for_date_do_fn = LoadTemporalDataForDateDoFn( + data_path=input_path, + start_date=init_date if from_init_date else start_date, + pressure_levels_group=pressure_levels_group + ) + logging.info("Setting up temporal variables.") + temporal_variables_chunks = ( + root + | "DayIterator" >> beam.Create(daily_date_iterator(start_date, end_date)) + | "TemporalDataForDay" >> beam.ParDo(load_temporal_data_for_date_do_fn) + | xb.SplitChunks(chunk_sizes) + # We can skip the consolidation if we are using a `time_chunk_size` that + # evenly divides a day worth of data. 
+ # | xb.ConsolidateChunks(chunk_sizes) + | "ChunksToZarrTemporal" >> chunks_to_zarr + ) logging.info("Finished defining pipeline.") return temporal_variables_chunks @@ -373,12 +348,15 @@ def main(): known_args, pipeline_args = parse_arguments( "Create a Zarr dataset from NetCDF files." ) + if known_args.init_date != "1900-01-01" and (not known_args.from_init_date): + raise RuntimeError("--init_date can only be used along with --from_init_date flag.") + pipeline_args.extend(['--save_main_session', 'True']) if fs.exists(known_args.output_path): raise ValueError(f"{known_args.output_path} already exists") - num_steps_per_day = _HOURS_PER_DAY // TIME_RESOLUTION_HOURS + num_steps_per_day = HOURS_PER_DAY // TIME_RESOLUTION_HOURS if num_steps_per_day % known_args.time_chunk_size != 0: raise ValueError( f"time_chunk_size {known_args.time_chunk_size} must evenly divide {num_steps_per_day}" @@ -392,7 +370,10 @@ def main(): start_date=known_args.start_date, end_date=known_args.end_date, time_chunk_size=known_args.time_chunk_size, - pressure_levels_group=known_args.pressure_levels_group + pressure_levels_group=known_args.pressure_levels_group, + init_date=known_args.init_date, + from_init_date=known_args.from_init_date, + only_initialize_store=known_args.only_initialize_store ) diff --git a/src/update-data.py b/src/update-data.py new file mode 100644 index 0000000..2dc8d9c --- /dev/null +++ b/src/update-data.py @@ -0,0 +1,33 @@ +import apache_beam as beam +from arco_era5 import daily_date_iterator, LoadTemporalDataForDateDoFn, GCP_DIRECTORY, UpdateSlice +import logging +import argparse +from typing import Tuple, List + +logging.getLogger().setLevel(logging.INFO) + +def parse_arguments(desc: str) -> Tuple[argparse.Namespace, List[str]]: + parser = argparse.ArgumentParser(description=desc) + + parser.add_argument("--output_path", type=str, required=True, + help="Path to the destination Zarr archive.") + parser.add_argument('-s', "--start_date", default='2020-01-01', + help='Start date, iso format string.') + parser.add_argument('-e', "--end_date", default='2020-01-02', + help='End date, iso format string.') + parser.add_argument("--pressure_levels_group", type=str, default="weatherbench_13", + help="Group label for the set of pressure levels to use.") + parser.add_argument("--init_date", type=str, default='1900-01-01', + help="Date to initialize the zarr store.") + + return parser.parse_known_args() + +known_args, pipeline_args = parse_arguments("Update Data Slice") + +with beam.Pipeline(argv=pipeline_args) as p: + path = ( + p + | "CreateDayIterator" >> beam.Create(daily_date_iterator(known_args.start_date, known_args.end_date)) + | "LoadDataForDay" >> beam.ParDo(LoadTemporalDataForDateDoFn(data_path=GCP_DIRECTORY, start_date=known_args.init_date, pressure_levels_group=known_args.pressure_levels_group)) + | "UpdateSlice" >> UpdateSlice(target=known_args.output_path, init_date=known_args.init_date) + ) From 3d6adc52075774372446ebf9eba9d8b0f695bc1e Mon Sep 17 00:00:00 2001 From: Darshan Prajapati Date: Fri, 15 Sep 2023 09:44:43 +0000 Subject: [PATCH 2/3] Some Minor Nits --- src/arco_era5/source_data.py | 9 +++++---- src/arco_era5/update.py | 36 ++++++++++++++++++------------------ 2 files changed, 23 insertions(+), 22 deletions(-) diff --git a/src/arco_era5/source_data.py b/src/arco_era5/source_data.py index 132248a..7fead45 100644 --- a/src/arco_era5/source_data.py +++ b/src/arco_era5/source_data.py @@ -376,12 +376,12 @@ def _read_nc_dataset(gpath_file): # and: 
https://confluence.ecmwf.int/display/CKB/ERA5%3A+data+documentation#ERA5:datadocumentation-Dataupdatefrequency # pylint: disable=line-too-long # for further details. - all_dims_except_time = tuple(set(dataarray.dims) - {"time"}) + all_dims_except_time = tuple(set(dataarray.dims) - {"time", "expver"}) # Should have only trailing nans. a = dataarray.sel(expver=1).isnull().any(dim=all_dims_except_time) # Should having only leading nans. b = dataarray.sel(expver=5).isnull().any(dim=all_dims_except_time) - disjoint_nans = bool(next(iter((a ^ b).all().data_vars.values()))) + disjoint_nans = bool((a ^ b).all().variable.values) assert disjoint_nans, "The nans are not disjoint in expver=1 vs 5" dataarray = dataarray.sel(expver=1).combine_first(dataarray.sel(expver=5)) return dataarray @@ -565,6 +565,7 @@ def align_coordinates(dataset: xr.Dataset) -> xr.Dataset: def get_pressure_levels_arg(pressure_levels_group: str): return PRESSURE_LEVELS_GROUPS[pressure_levels_group] + class LoadTemporalDataForDateDoFn(beam.DoFn): def __init__(self, data_path, start_date, pressure_levels_group): self.data_path = data_path @@ -572,8 +573,7 @@ def __init__(self, data_path, start_date, pressure_levels_group): self.pressure_levels_group = pressure_levels_group def process(self, args): - - """Loads temporal data for a day, with an xarray_beam key for it..""" + """Loads temporal data for a day, with an xarray_beam key for it.""" year, month, day = args logging.info("Loading NetCDF files for %d-%d-%d", year, month, day) @@ -610,6 +610,7 @@ def process(self, args): yield key, dataset dataset.close() + def offset_along_time_axis(start_date: str, year: int, month: int, day: int) -> int: """Offset in indices along the time axis, relative to start of the dataset.""" # Note the length of years can vary due to leap years, so the chunk lengths diff --git a/src/arco_era5/update.py b/src/arco_era5/update.py index 6b73d98..018fd2c 100644 --- a/src/arco_era5/update.py +++ b/src/arco_era5/update.py @@ -2,7 +2,7 @@ import datetime import logging import xarray as xr -import zarr as zr +import zarr from arco_era5 import HOURS_PER_DAY from dataclasses import dataclass @@ -10,27 +10,27 @@ logger = logging.getLogger(__name__) -def update(offset_ds: Tuple[int, xr.Dataset, str], target: str, init_date: str): - """Generate region slice and update zarr array directly""" - key, ds = offset_ds - offset = key.offsets['time'] - date = datetime.datetime.strptime(init_date, '%Y-%m-%d') + datetime.timedelta(days=offset / HOURS_PER_DAY) - date_str = date.strftime('%Y-%m-%d') - zf = zr.open(target) - region = slice(offset, offset + HOURS_PER_DAY) - for vname in ds.data_vars: - logger.info(f"Started {vname} for {date_str}") - zv = zf[vname] - zv[region] = ds[vname].values - logger.info(f"Done {vname} for {date_str}") - del zv - del ds - @dataclass class UpdateSlice(beam.PTransform): target: str init_date: str + def apply(self, offset_ds: Tuple[int, xr.Dataset, str]): + """Generate region slice and update zarr array directly""" + key, ds = offset_ds + offset = key.offsets['time'] + date = datetime.datetime.strptime(self.init_date, '%Y-%m-%d') + datetime.timedelta(days=offset / HOURS_PER_DAY) + date_str = date.strftime('%Y-%m-%d') + zf = zarr.open(self.target) + region = slice(offset, offset + HOURS_PER_DAY) + for vname in ds.data_vars: + logger.info(f"Started {vname} for {date_str}") + zv = zf[vname] + zv[region] = ds[vname].values + logger.info(f"Done {vname} for {date_str}") + del zv + del ds + def expand(self, pcoll: beam.PCollection) -> 
beam.PCollection:
-        return pcoll | beam.Map(update, target=self.target, init_date=self.init_date)
+        return pcoll | beam.Map(self.apply)

From 8e61de27384aa32182df593db1a3dd870145a293 Mon Sep 17 00:00:00 2001
From: Darshan Prajapati
Date: Tue, 19 Sep 2023 12:20:21 +0000
Subject: [PATCH 3/3] Add inclusive

---
 src/arco_era5/__init__.py | 4 ++--
 src/netcdf_to_zarr.py     | 3 +--
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/src/arco_era5/__init__.py b/src/arco_era5/__init__.py
index c58b04d..e6ae499 100644
--- a/src/arco_era5/__init__.py
+++ b/src/arco_era5/__init__.py
@@ -11,7 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from .source_data import GCP_DIRECTORY,STATIC_VARIABLES,SINGLE_LEVEL_VARIABLES,MULTILEVEL_VARIABLES,PRESSURE_LEVELS_GROUPS, TIME_RESOLUTION_HOURS, HOURS_PER_DAY
-from .source_data import get_var_attrs_dict, read_multilevel_vars, read_static_vars, read_single_level_vars, daily_date_iterator, align_coordinates, parse_arguments, get_pressure_levels_arg, LoadTemporalDataForDateDoFn
+from .source_data import GCP_DIRECTORY,SINGLE_LEVEL_VARIABLES,MULTILEVEL_VARIABLES,PRESSURE_LEVELS_GROUPS, TIME_RESOLUTION_HOURS, HOURS_PER_DAY
+from .source_data import get_var_attrs_dict, read_multilevel_vars, read_single_level_vars, daily_date_iterator, align_coordinates, parse_arguments, get_pressure_levels_arg, LoadTemporalDataForDateDoFn
 from .pangeo import run, parse_args
 from .update import UpdateSlice
diff --git a/src/netcdf_to_zarr.py b/src/netcdf_to_zarr.py
index 163b7c7..451a06f 100644
--- a/src/netcdf_to_zarr.py
+++ b/src/netcdf_to_zarr.py
@@ -120,14 +120,12 @@
 from arco_era5 import (
     GCP_DIRECTORY,
     HOURS_PER_DAY,
-    STATIC_VARIABLES,
     SINGLE_LEVEL_VARIABLES,
     MULTILEVEL_VARIABLES,
     TIME_RESOLUTION_HOURS,
     get_pressure_levels_arg,
     get_var_attrs_dict,
     read_multilevel_vars,
-    read_single_level_vars,
     daily_date_iterator,
     align_coordinates,
     parse_arguments,
@@ -193,6 +191,7 @@ def make_template(data_path: str, start_date: str, end_date: str, time_chunk_siz
         pd.Timestamp(start_date),
         pd.Timestamp(end_date),
         freq=pd.DateOffset(hours=TIME_RESOLUTION_HOURS),
+        inclusive="left"
     ).values
     time_size = len(coords["time"])
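
How the seeding path in these patches fits together: `offset_along_time_axis` turns a day into an hour offset from `--init_date`, and `UpdateSlice.apply` writes that day's region of an already-initialized Zarr store in place. The sketch below is a toy restatement of that pattern, not code from the patches: the store path `example.zarr`, the variable name, and the 2x3 grid are placeholders (the real ERA5 grid is 721x1440), and only the offset arithmetic and the slice-assignment write mirror the changes above. It also checks the reason patch 3 passes `inclusive="left"`: the template's hourly time coordinate must have exactly days * 24 entries for these offsets to line up.

```
import numpy as np
import pandas as pd
import zarr

HOURS_PER_DAY = 24
TIME_RESOLUTION_HOURS = 1


def offset_along_time_axis(init_date: str, year: int, month: int, day: int) -> int:
    # Same day-to-hours arithmetic as source_data.offset_along_time_axis.
    delta = pd.Timestamp(year=year, month=month, day=day) - pd.Timestamp(init_date)
    return delta.days * HOURS_PER_DAY // TIME_RESOLUTION_HOURS


# Where 1959-01-01 lands on a time axis that starts at 1900-01-01.
offset = offset_along_time_axis("1900-01-01", 1959, 1, 1)
region = slice(offset, offset + HOURS_PER_DAY)

# Toy stand-in for the initialized store; names and shapes are placeholders.
store = zarr.open_group("example.zarr", mode="a")
t2m = store.require_dataset("2m_temperature",
                            shape=(offset + HOURS_PER_DAY, 2, 3),
                            chunks=(HOURS_PER_DAY, 2, 3),
                            dtype="float32")

# The write path UpdateSlice.apply relies on: plain slice assignment into the
# day's time region of an existing array.
t2m[region] = np.zeros((HOURS_PER_DAY, 2, 3), dtype=np.float32)

# Why patch 3 adds inclusive="left": a left-inclusive hourly range has exactly
# days * 24 entries, so the template's time axis matches the offsets above;
# a fully closed range would carry one extra timestamp.
times = pd.date_range(pd.Timestamp("2020-01-01"), pd.Timestamp("2020-01-03"),
                      freq=pd.DateOffset(hours=TIME_RESOLUTION_HOURS),
                      inclusive="left")
assert len(times) == 2 * HOURS_PER_DAY
```

With the store laid out via `--only_initialize_store`, `src/update-data.py` can then run one such write per day, and days can proceed in parallel because each touches a disjoint time region.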