Code formatting done.
dabhicusp committed Nov 21, 2024
1 parent e9fbfb5 commit 2c440ee
Showing 5 changed files with 160 additions and 97 deletions.
26 changes: 18 additions & 8 deletions src/arco_era5/update_config_files.py
@@ -112,21 +112,31 @@ def get_month_range(date: datetime.date) -> t.Tuple[datetime.date, datetime.date


def get_last_sixth_date() -> t.Dict[str, t.Any]:
"""
Calculate the date six days ago from the current date.
Returns:
Dict[str, Any]: A dictionary with the key 'last_sixth_date' and the corresponding date value.
"""
current_date = datetime.datetime.now().date()
sixth_last_date = current_date - datetime.timedelta(days=6)

return { 'last_sixth_date' : sixth_last_date}


def get_previous_month_dates(last_month: bool = False) -> MonthDates:
"""Return a dictionary containing the first and third previous month's dates from
"""Return a dictionary containing the first or third previous month's dates from
the current date.
Args:
last_month (bool, optional): If True, calculates relative to the last month.
Defaults to False (relative to the third last month).
Returns:
dict: A dictionary containing the following key-value pairs:
- 'first_day_third_prev': The first day of the third previous month
- 'first_day': The first day of the first/third previous month
(datetime.date).
- 'last_day_third_prev': The last day of the third previous month
- 'last_day': The last day of the first/third previous month
(datetime.date).
- 'sl_year': The year of the third previous month in 'YYYY' format (str).
- 'sl_month': The month of the third previous month in 'MM' format (str).
@@ -135,13 +145,13 @@ def get_previous_month_dates(last_month: bool = False) -> MonthDates:
today = datetime.date.today()
# Calculate the correct previous third month considering months from 1 to 12
third_prev_month = today - datetime.timedelta(days= (0 if last_month else 2)*366/12)
first_day_third_prev, last_day_third_prev = get_month_range(third_prev_month)
first_date_third_prev = first_day_third_prev
first_day, last_day = get_month_range(third_prev_month)
first_date_third_prev = first_day
sl_year, sl_month = str(first_date_third_prev)[:4], str(first_date_third_prev)[5:7]

return {
'first_day_third_prev': first_day_third_prev,
'last_day_third_prev': last_day_third_prev,
'first_day': first_day,
'last_day': last_day,
'sl_year': sl_year,
'sl_month': sl_month,
}
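
As a rough illustration of what these updated helpers return (a sketch only, assuming the current date is 2024-11-21, that get_month_range() yields the first and last day of the month containing the given date, and following the 2 * 366/12 ≈ 61-day arithmetic above):

# Illustrative sketch; the concrete dates below are hypothetical.
dates_data = get_previous_month_dates()        # default: ~61 days back from today
# dates_data == {
#     'first_day': datetime.date(2024, 9, 1),
#     'last_day': datetime.date(2024, 9, 30),
#     'sl_year': '2024',
#     'sl_month': '09',
# }
last = get_last_sixth_date()
# last == {'last_sixth_date': datetime.date(2024, 11, 15)}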
@@ -179,7 +189,7 @@ def add_licenses_in_config_files(directory: str, licenses: str) -> None:


def update_date_in_config_file(directory: str, dates_data: t.Dict[str, t.Any]) -> None:
"""Update the configuration files in the specified directory.
"""Update the date in the configuration files in the specified directory.
Parameters:
directory (str): The path to the directory containing the configuration files.
36 changes: 32 additions & 4 deletions src/arco_era5/utils.py
@@ -124,8 +124,23 @@ def parse_arguments_raw_to_zarr_to_bq(desc: str) -> t.Tuple[argparse.Namespace,
def raw_data_download_dataflow_job(python_path: str, project: str, region: str,
bucket: str, sdk_container_image: str,
manifest_location: str, directory: str,
type: str = None):
"""Launches a Dataflow job to process weather data."""
type: str = None) -> None:
"""
Launches a Dataflow job to download weather data.
Args:
python_path (str): Path to the Python executable.
project (str): Google Cloud project ID.
region (str): Google Cloud region.
bucket (str): Google Cloud Storage bucket for temporary data.
sdk_container_image (str): SDK container image for Dataflow.
manifest_location (str): Path to the manifest table.
directory (str): Directory containing input files.
type (Optional[str]): Type of job ('ERA5T_DAILY', 'ERA5T_MONTHLY', or None).
Raises:
subprocess.CalledProcessError: If the Dataflow job command fails.
"""

AR_FILES = [ f'{directory}/{file}' for file in AR_FILES ]
CO_MODEL_LEVEL_FILES = [ f'{directory}/{file}' for file in CO_MODEL_LEVEL_FILES ]
@@ -153,8 +168,21 @@ def raw_data_download_dataflow_job(python_path: str, project: str, region: str,


def data_splitting_dataflow_job(python_path: str, project: str, region: str,
bucket: str, sdk_container_image: str, date: str):
"""Launches a Dataflow job to splitting soil & pcp weather data."""
bucket: str, sdk_container_image: str, date: str) -> None:
"""
Launches a Dataflow job to split soil & pcp weather data for a given date.
Args:
python_path (str): Path to the Python executable.
project (str): Google Cloud project ID.
region (str): Google Cloud region.
bucket (str): Google Cloud Storage bucket for temporary data.
sdk_container_image (str): SDK container image for Dataflow.
date (str): The target date in 'YYYY/MM' format for the data splitting job.
Raises:
subprocess.CalledProcessError: If any of the Dataflow job commands fail.
"""
year = date[:4]
month = year + date[5:7]
typeOfLevel = '{' + 'typeOfLevel' + '}'
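For orientation, a hedged sketch of how these two launchers might be invoked from the driver script; every value below (project, bucket, image, paths) is a placeholder, not something taken from this commit:

# Hypothetical invocation; all argument values are placeholders.
raw_data_download_dataflow_job(
    python_path="/usr/bin/python3",
    project="my-gcp-project",
    region="us-central1",
    bucket="my-temp-bucket",
    sdk_container_image="gcr.io/my-gcp-project/weather-tools:latest",
    manifest_location="gs://my-temp-bucket/manifest",
    directory="/path/to/era5/config",
    type="ERA5T_DAILY",
)

data_splitting_dataflow_job(
    python_path="/usr/bin/python3",
    project="my-gcp-project",
    region="us-central1",
    bucket="my-temp-bucket",
    sdk_container_image="gcr.io/my-gcp-project/weather-tools:latest",
    date="2024/09",   # 'YYYY/MM', matching dates_data['first_day'].strftime("%Y/%m") in the driver
)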
155 changes: 99 additions & 56 deletions src/raw-to-zarr-to-bq.py
@@ -11,47 +11,45 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import json
import logging
import os
import re
import zarr

import numpy as np
import xarray as xr

from concurrent.futures import ThreadPoolExecutor
from arco_era5 import (
GCP_DIRECTORY,
HOURS_PER_DAY,
INIT_DATE,
MODEL_LEVEL_WIND_VARIABLE,
MODEL_LEVEL_MOISTURE_VARIABLE,
MULTILEVEL_VARIABLES,
SINGLE_LEVEL_FORECAST_VARIABLE,
SINGLE_LEVEL_REANALYSIS_VARIABLE,
SINGLE_LEVEL_SURFACE_VARIABLE,
SINGLE_LEVEL_VARIABLES,
add_licenses_in_config_files,
check_data_availability,
data_splitting_dataflow_job,
date_range,
ingest_data_in_zarr_dataflow_job,
generate_input_paths,
generate_input_paths_of_ar_data,
generate_offset,
get_previous_month_dates,
get_secret,
ingest_data_in_zarr_dataflow_job,
offset_along_time_axis,
opener,
parse_arguments_raw_to_zarr_to_bq,
raw_data_download_dataflow_job,
replace_non_alphanumeric_with_hyphen,
update_zarr_metadata,
subprocess_run,
update_date_in_config_file,
update_target_path_in_config_file,
GCP_DIRECTORY,
HOURS_PER_DAY,
INIT_DATE,
MODEL_LEVEL_WIND_VARIABLE,
MODEL_LEVEL_MOISTURE_VARIABLE,
MULTILEVEL_VARIABLES,
SINGLE_LEVEL_FORECAST_VARIABLE,
SINGLE_LEVEL_REANALYSIS_VARIABLE,
SINGLE_LEVEL_SURFACE_VARIABLE,
SINGLE_LEVEL_VARIABLES,
update_zarr_metadata,
)

# Logger Configuration
@@ -69,14 +67,22 @@
API_KEY_PATTERN = re.compile(r"^API_KEY_\d+$")
API_KEY_LIST = []

SPLITTING_DATASETS = ['soil', 'pcp']
BQ_TABLES_LIST = json.loads(os.environ.get("BQ_TABLES_LIST"))
REGION_LIST = json.loads(os.environ.get("REGION_LIST"))
TEMP_TARGET_PATH = "gs://gcp-public-data-arco-era5/raw-era5"

zarr_to_netcdf_file_mapping = {
'gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3': MULTILEVEL_VARIABLES + SINGLE_LEVEL_VARIABLES,
'gs://gcp-public-data-arco-era5/co/model-level-moisture.zarr-v2' : MODEL_LEVEL_MOISTURE_VARIABLE,
'gs://gcp-public-data-arco-era5/co/model-level-wind.zarr-v2' : MODEL_LEVEL_WIND_VARIABLE,
'gs://gcp-public-data-arco-era5/co/single-level-forecast.zarr-v2': SINGLE_LEVEL_FORECAST_VARIABLE,
'gs://gcp-public-data-arco-era5/co/single-level-reanalysis.zarr-v2': SINGLE_LEVEL_REANALYSIS_VARIABLE,
'gs://gcp-public-data-arco-era5/co/single-level-surface.zarr-v2': SINGLE_LEVEL_SURFACE_VARIABLE
}

dates_data = get_previous_month_dates()
data_date_range = date_range(
dates_data["first_day_third_prev"], dates_data["last_day_third_prev"]
dates_data["first_day"], dates_data["last_day"]
)
start_date = data_date_range[0].strftime("%Y/%m/%d")
end_date = data_date_range[-1].strftime("%Y/%m/%d")
@@ -133,53 +139,89 @@ def perform_data_operations(z_file: str, table: str, region: str, start_date: st
logger.error(
f"An error occurred in process_zarr_and_table for {z_file}: {str(e)}")

zarr_to_netcdf_file_mapping = {
'gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3': MULTILEVEL_VARIABLES + SINGLE_LEVEL_VARIABLES,
'gs://gcp-public-data-arco-era5/co/model-level-moisture.zarr-v2' : MODEL_LEVEL_MOISTURE_VARIABLE,
'gs://gcp-public-data-arco-era5/co/model-level-wind.zarr-v2' : MODEL_LEVEL_WIND_VARIABLE,
'gs://gcp-public-data-arco-era5/co/single-level-forecast.zarr-v2': SINGLE_LEVEL_FORECAST_VARIABLE,
'gs://gcp-public-data-arco-era5/co/single-level-reanalysis.zarr-v2': SINGLE_LEVEL_REANALYSIS_VARIABLE,
'gs://gcp-public-data-arco-era5/co/single-level-surface.zarr-v2': SINGLE_LEVEL_SURFACE_VARIABLE
}

def open_dataset_from_url(url: str, engine: str) -> xr.Dataset:
"""
Opens a dataset from a given URL using the specified engine (e.g. netcdf4, cfgrib).
def open_dataset(url: str, engine: str):
Args:
url (str): The URL of the dataset to open.
engine (str): The engine to use for reading the dataset.
Returns:
xr.Dataset: The opened xarray dataset.
"""
with opener(url) as file:
ds = xr.open_dataset(file, engine=engine)
return ds
return xr.open_dataset(file, engine=engine)


def compare_datasets(url1: str, url2: str, engine: str) -> bool:
"""
Compares two xarray datasets from given URLs to check if they are equal.
Args:
url1 (str): The URL of the first dataset.
url2 (str): The URL of the second dataset.
engine (str): The engine to use for reading the datasets.
Returns:
bool: True if the datasets are equal, False otherwise.
"""
dataset1 = open_dataset_from_url(url1, engine)
dataset2 = open_dataset_from_url(url2, engine)

def data_comparison(url1: str, url2: str, engine: str) -> bool:
data1 = open_dataset(url1, engine)
data2 = open_dataset(url2, engine)
return dataset1.equals(dataset2)

return data1.equals(data2)

def calculate_time_offset_for_ar_data(raw_file: str) -> slice:
"""
Calculates the time offset for AR data.
Args:
raw_file (str): Path to the raw data file.
def get_offset_region_for_ar_data(raw_file: str) -> slice:
Returns:
slice: The time offset region as a slice object.
"""
year, month, day = raw_file.split('/')[5:8]
offset = offset_along_time_axis(INIT_DATE, year, month, day)
region = slice(offset, offset + HOURS_PER_DAY)
return region
return slice(offset, offset + HOURS_PER_DAY)
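
A small worked example of the offset arithmetic above, assuming a raw AR path laid out as .../raw/<dataset>/YYYY/MM/DD/... (the exact layout is an assumption here) and that offset_along_time_axis returns the number of hours between INIT_DATE and the file's date:

# Hypothetical path; the split('/')[5:8] positions are what the function relies on.
raw_file = ("gs://gcp-public-data-arco-era5/raw/date-variable-single_level/"
            "2024/09/01/2m_temperature/surface.nc")
year, month, day = raw_file.split('/')[5:8]      # -> ('2024', '09', '01')
# offset = hours from INIT_DATE up to 2024-09-01; the returned region then spans
# one day of hourly steps: slice(offset, offset + HOURS_PER_DAY)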


def update_era5t_data_with_era5_data(z_file: str, new_raw_file: str, engine: str):
def synchronize_era5t_with_era5_data(z_file: str, new_raw_file: str, engine: str) -> None:
"""
Updates ERA5T data using corresponding ERA5 data.
Args:
z_file (str): Path to the Zarr file containing ERA5T data.
new_raw_file (str): Path to the raw ERA5 data file.
engine (str): The engine used for reading the raw datasets.
Returns:
None
"""
zf = zarr.open(z_file)
ds = open_dataset(new_raw_file, engine)
logger.info(f'data updation starts for {z_file}.')
if "model-level" or "single-level" in z_file:
single_level = True if "single-level" in z_file else False
ds = open_dataset_from_url(new_raw_file, engine)
logger.info(f"Data update process starts for {z_file}.")

# Determine the appropriate time region
if "model-level" in z_file or "single-level" in z_file:
single_level = "single-level" in z_file
region, _ = generate_offset(new_raw_file, single_level, INIT_DATE, HOURS_PER_DAY)
else:
region = get_offset_region_for_ar_data(new_raw_file)
region = calculate_time_offset_for_ar_data(new_raw_file)

# Update variables
for variable_name in ds.data_vars:
zv = zf[variable_name]
zv[region] = ds[variable_name].values

logger.info(f"Data update process completed for {z_file}.")

for vname in ds.data_vars:
zv = zf[vname]
zv[region] = ds[vname].values
logger.info(f'data updation completed for {z_file}.')
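
The per-variable write above is a plain region assignment into the Zarr array. A minimal, self-contained sketch of that mechanism on a toy store (not the ERA5 stores, and independent of this repository):

import numpy as np
import zarr

# Toy time-by-feature array: 48 hourly steps, 4 columns.
store = zarr.open("example.zarr", mode="w", shape=(48, 4), chunks=(24, 4), dtype="f4")
new_day = np.ones((24, 4), dtype="f4")   # one replacement day of hourly values
region = slice(24, 48)                   # hours 24..47 along the time axis
store[region] = new_day                  # overwrite just that region in place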

def update_era5t_data(z_file: str) -> None:
"""Synchronizes ERA5T data with ERA5 data."""

def update_era5t_data(z_file: str):
variables = zarr_to_netcdf_file_mapping[z_file]
era5t_raw_files = []
if "model-level" in z_file:
@@ -190,17 +232,18 @@ def update_era5t_data(z_file: str):
for date in data_date_range:
era5t_raw_files.extend(generate_input_paths_of_ar_data(date, variables))

era5_raw_files = (map(lambda url: url.replace("gs://gcp-public-data-arco-era5/raw",
TEMP_TARGET_PATH), era5t_raw_files))
era5_raw_files = [
url.replace("gs://gcp-public-data-arco-era5/raw", TEMP_TARGET_PATH)
for url in era5t_raw_files
]

for old_file, new_file in zip(era5t_raw_files, era5_raw_files):
logger.info(f"data comparison of {old_file} is started.")
engine = 'netcdf4' if "/ar/" in z_file else 'cfgrib'
equal_data = data_comparison(old_file, new_file, engine)
if not equal_data:
logger.info(f'data is not equal for the file {new_file}.')
update_era5t_data_with_era5_data(z_file, new_file, engine)
logger.info(f"data comparison of {old_file} is completed.")
logger.info(f"Comparing data between {old_file} and {new_file}.")
engine = "netcdf4" if "/ar/" in z_file else "cfgrib"
if not compare_datasets(old_file, new_file, engine):
logger.info(f"Data mismatch detected for {new_file}. Updating data.")
synchronize_era5t_with_era5_data(z_file, new_file, engine)
logger.info(f"Data comparison completed for {old_file}.")


if __name__ == "__main__":
@@ -234,7 +277,7 @@ def update_era5t_data(z_file: str):
logger.info("Raw data Splitting started.")
data_splitting_dataflow_job(PYTHON_PATH, PROJECT, REGION, BUCKET,
WEATHER_TOOLS_SDK_CONTAINER_IMAGE,
dates_data['first_day_third_prev'].strftime("%Y/%m"))
dates_data['first_day'].strftime("%Y/%m"))
logger.info("Raw data Splitting successfully.")

logger.info("Data availability check started.")
Expand All @@ -248,7 +291,7 @@ def update_era5t_data(z_file: str):
MANIFEST_LOCATION, DIRECTORY)
data_splitting_dataflow_job(PYTHON_PATH, PROJECT, REGION, BUCKET,
WEATHER_TOOLS_SDK_CONTAINER_IMAGE,
dates_data['first_day_third_prev'].strftime("%Y/%m"))
dates_data['first_day'].strftime("%Y/%m"))
logger.info("Data availability check completed successfully.")

# update raw ERA5T data with the ERA5. ## Pending
