Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add components for converting satwnd amv goes data from bufr dump to ioda. #13

Open
wants to merge 43 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
d3478fd
Add mapping file, python scripts, and shell scripts for satwind goes
Nov 13, 2024
7277cc5
remove test
Nov 18, 2024
a4efbb2
Update components (mapping, Python script, configuration) for satwind
Nov 18, 2024
5689332
remove old yaml
Nov 19, 2024
16db06f
Add configuration files and test shell script
Nov 19, 2024
927bae2
rename configuration yaml
emilyhcliu Nov 19, 2024
9dfd16d
Add temporary testing script
emilyhcliu Nov 19, 2024
54305c9
Add README file
emilyhcliu Nov 19, 2024
89f429a
rename README to README.md
emilyhcliu Nov 19, 2024
b115130
Update README.md
emilyhcliu Nov 19, 2024
f0ba20f
Update readme file
emilyhcliu Nov 19, 2024
8d324d0
Update README.md
emilyhcliu Nov 19, 2024
d70867f
Update README
emilyhcliu Nov 19, 2024
e5a95e7
rename process_bufr2ioda to bufr2ioda.sh
emilyhcliu Nov 19, 2024
7dbc826
Update README.md
emilyhcliu Nov 19, 2024
6346844
Update README.md
emilyhcliu Nov 19, 2024
f91d1d1
Update README.md
emilyhcliu Nov 19, 2024
8ba4d7b
Update README.md
emilyhcliu Nov 19, 2024
87096eb
Update readme
emilyhcliu Nov 19, 2024
3e7789c
Add comments
emilyhcliu Nov 19, 2024
a645370
update comments
emilyhcliu Nov 19, 2024
c0fdb95
update bufr2ioda.sh
emilyhcliu Nov 19, 2024
c262e3d
update usage
emilyhcliu Nov 19, 2024
dde4d27
Add comments
emilyhcliu Nov 19, 2024
d080c23
add cycle in input path
emilyhcliu Nov 20, 2024
09d0061
Modify global attribute in the mapping file
emilyhcliu Nov 20, 2024
8f23b5b
Merge branch 'develop' into feature/dump_satwind_goes
emilyhcliu Nov 21, 2024
cd74eee
remove . before bufr2ioda.sh
emilyhcliu Nov 21, 2024
6351d34
Update README.md
emilyhcliu Nov 21, 2024
b2ea3ae
Update README.md
emilyhcliu Nov 22, 2024
d678190
Update README.md
emilyhcliu Nov 22, 2024
67311d6
remove wxflow from the test script
emilyhcliu Nov 22, 2024
428e600
remove wxflow from input
emilyhcliu Nov 22, 2024
6564a18
Update README.md
emilyhcliu Nov 22, 2024
f757393
Add comment block for logger
emilyhcliu Nov 24, 2024
77f2e92
add bufrtype (this is bufr dump list)
emilyhcliu Nov 24, 2024
00cd504
Update documentation
emilyhcliu Nov 24, 2024
9d9e821
Add bufrtype and update README
emilyhcliu Nov 24, 2024
e9b8833
Add split_by_category input
emilyhcliu Nov 25, 2024
aec8e56
Update README
emilyhcliu Nov 25, 2024
5679a32
Update README.md
emilyhcliu Nov 25, 2024
70f90d9
Rename bufr2ioda to encodeBufr and update readme
emilyhcliu Nov 25, 2024
1365772
Update README.md
emilyhcliu Nov 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions dump/config/bufr2ioda_bufr_backend_satwnd_amv_goes.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
time window:
begin: "2018-04-14T21:00:00Z"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just noting that we will want to template these if they are not in a fixed 'test' directory

end: "2023-12-15T03:00:00Z"

observations:
- obs space:
name: "satwind_goes-16"
simulated variables: [windDirection, windSpeed]
obsdatain:
engine:
type: bufr
obsfile: "./testinput/2021080100/gdas.t00z.satwnd.tm00.bufr_d"
mapping file: "./bufr2ioda_satwnd_amv_goes_mapping.yaml"
category: ["goes-16"]
cache categories: # optional
- ["goes-16"]
- ["goes-17"]
- ["goes-18"]
obsdataout:
engine:
type: H5File
obsfile: "./testoutput/2021080100/bufr_backend/gdas.t00z.satwnd.abi_goes-16.tm00.nc"

- obs space:
name: "satwind_goes-17"
simulated variables: [windDirection, windSpeed]
obsdatain:
engine:
type: bufr
obsfile: "./testinput/2021080100/gdas.t00z.satwnd.tm00.bufr_d"
mapping file: "./bufr2ioda_satwnd_amv_goes_mapping.yaml"
category: ["goes-17"]
cache categories: # optional
- ["goes-16"]
- ["goes-17"]
- ["goes-18"]
obsdataout:
engine:
type: H5File
obsfile: "./testoutput/2021080100/bufr_backend/gdas.t00z.satwnd.abi_goes-17.tm00.nc"

- obs space:
name: "satwind_goes-18"
simulated variables: [windDirection, windSpeed]
obsdatain:
engine:
type: bufr
obsfile: "./testinput/2021080100/gdas.t00z.satwnd.tm00.bufr_d"
mapping file: "./bufr2ioda_satwnd_amv_goes_mapping.yaml"
category: ["goes-18"]
cache categories: # optional
- ["goes-16"]
- ["goes-17"]
- ["goes-18"]
obsdataout:
engine:
type: H5File
obsfile: "./testoutput/2021080100/bufr_backend/gdas.t00z.satwnd.abi_goes-18.tm00.nc"
58 changes: 58 additions & 0 deletions dump/config/bufr2ioda_script_backend_satwnd_amv_goes.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
time window:
begin: "2018-04-14T21:00:00Z"
end: "2023-12-15T03:00:00Z"

observations:
- obs space:
name: "satwind_goes-16"
observed variables: [windSpeed, windDirection]
derived variables: [windEastward, windNorthward]
simulated variables: [windEastward, windNorthward]
obsdatain:
engine:
type: script
script file: "bufr2ioda_satwnd_amv_goes.py"
args:
input_path: "./testinput/2021080100/gdas.t00z.satwnd.tm00.bufr_d"
mapping_path: "./bufr2ioda_satwnd_amv_goes_mapping.yaml"
category: "goes-16"
obsdataout:
engine:
type: H5File
obsfile: "./testoutput/2021080100/script_backend/gdas.t00z.satwnd.abi_goes-16.tm00.nc"

- obs space:
name: "satwind_goes-17"
observed variables: [windSpeed, windDirection]
derived variables: [windEastward, windNorthward]
simulated variables: [windEastward, windNorthward]
obsdatain:
engine:
type: script
script file: "bufr2ioda_satwnd_amv_goes.py"
args:
input_path: "./testinput/2021080100/gdas.t00z.satwnd.tm00.bufr_d"
mapping_path: "./bufr2ioda_satwnd_amv_goes_mapping.yaml"
category: "goes-17"
obsdataout:
engine:
type: H5File
obsfile: "./testoutput/2021080100/script_backend/gdas.t00z.satwnd.abi_goes-17.tm00.nc"

- obs space:
name: "satwind_goes-18"
observed variables: [windSpeed, windDirection]
derived variables: [windEastward, windNorthward]
simulated variables: [windEastward, windNorthward]
obsdatain:
engine:
type: script
script file: "bufr2ioda_satwnd_amv_goes.py"
args:
input_path: "./testinput/2021080100/gdas.t00z.satwnd.tm00.bufr_d"
mapping_path: "./bufr2ioda_satwnd_amv_goes_mapping.yaml"
category: "goes-18"
obsdataout:
engine:
type: H5File
obsfile: "./testoutput/2021080100/script_backend/gdas.t00z.satwnd.abi_goes-18.tm00.nc"
271 changes: 271 additions & 0 deletions dump/mapping/bufr2ioda_satwnd_amv_goes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
#!/usr/bin/env python3
import sys
import os
import argparse
import time
import numpy as np
import bufr
from pyioda.ioda.Engines.Bufr import Encoder as iodaEncoder
from bufr.encoders.netcdf import Encoder as netcdfEncoder
from wxflow import Logger

# Initialize Logger
# Get log level from the environment variable, default to 'INFO it not set
log_level = os.getenv('LOG_LEVEL', 'INFO')
logger = Logger('BUFR2IODA_satwnd_amv_goes.py', level=log_level, colored_log=False)

def logging(comm, level, message):

if comm.rank() == 0:
# Define a dictionary to map levels to logger methods
log_methods = {
'DEBUG': logger.debug,
'INFO': logger.info,
'WARNING': logger.warning,
'ERROR': logger.error,
'CRITICAL': logger.critical,
}

# Get the appropriate logging method, default to 'INFO'
log_method = log_methods.get(level.upper(), logger.info)

if log_method == logger.info and level.upper() not in log_methods:
# Log a warning if the level is invalid
logger.warning(f'log level = {level}: not a valid level --> set to INFO')

# Call the logging method
log_method(message)

def _make_description(mapping_path, update=False):
description = bufr.encoders.Description(mapping_path)

if update:
# Define the variables to be added in a list of dictionaries
variables = [
{
'name': 'ObsType/windEastward',
'source': 'variables/obstype_uwind',
'units': '1',
'longName': 'Observation Type based on Satellite-derived Wind Computation Method and Spectral Band',
},
{
'name': 'ObsType/windNorthward',
'source': 'variables/obstype_vwind',
'units': '1',
'longName': 'Observation Type based on Satellite-derived Wind Computation Method and Spectral Band',
},
{
'name': 'ObsValue/windEastward',
'source': 'variables/windEastward',
'units': 'm/s',
'longName': 'Eastward Wind Component',
},
{
'name': 'ObsValue/windNorthward',
'source': 'variables/windNorthward',
'units': 'm/s',
'longName': 'Northward Wind Component',
},
]

# Loop through each variable and add it to the description
for var in variables:
description.add_variable(
name=var['name'],
source=var['source'],
units=var['units'],
longName=var['longName']
)

return description

def compute_wind_components(wdir, wspd):
"""
Compute the U and V wind components from wind direction and wind speed.

Parameters:
wdir (array-like): Wind direction in degrees (meteorological convention: 0° = North, 90° = East).
wspd (array-like): Wind speed.

Returns:
tuple: U and V wind components as numpy arrays with dtype float32.
"""
wdir_rad = np.radians(wdir) # Convert degrees to radians
u = -wspd * np.sin(wdir_rad)
v = -wspd * np.cos(wdir_rad)

return u.astype(np.float32), v.astype(np.float32)

def _get_obs_type(swcm, chanfreq):
"""
Determine the observation type based on `swcm` and `chanfreq`.

Parameters:
swcm (array-like): Switch mode values.
chanfreq (array-like): Channel frequency values (Hz).

Returns:
numpy.ndarray: Observation type array.

Raises:
ValueError: If any `obstype` is unassigned.
"""

obstype = swcm.copy()

# Use numpy vectorized operations
obstype = np.where(swcm == 5, 247, obstype) # WVCA/DL
obstype = np.where(swcm == 3, 246, obstype) # WVCT
obstype = np.where(swcm == 2, 251, obstype) # VIS
obstype = np.where(swcm == 1, 245, obstype) # IRLW

condition = np.logical_and(swcm == 1, chanfreq >= 5e13) # IRSW
obstype = np.where(condition, 240, obstype)

if not np.any(np.isin(obstype, [247, 246, 251, 245, 240])):
raise ValueError("Error: Unassigned ObsType found ... ")

return obstype

def _make_obs(comm, input_path, mapping_path):

# Get container from mapping file first
logging(comm, 'INFO', 'Get container from bufr')
container = bufr.Parser(input_path, mapping_path).parse(comm)

logging(comm, 'DEBUG', f'container list (original): {container.list()}')
logging(comm, 'DEBUG', f'all_sub_categories = {container.all_sub_categories()}')
logging(comm, 'DEBUG', f'category map = {container.get_category_map()}')

# Add new/derived data into container
for cat in container.all_sub_categories():

logging(comm, 'DEBUG', f'category = {cat}')

satid = container.get('variables/satelliteId', cat)
if satid.size == 0:
logging(comm, 'WARNING', f'category {cat[0]} does not exist in input file')
paths = container.get_paths('variables/windComputationMethod', cat)
obstype = container.get('variables/windComputationMethod', cat)
container.add('variables/obstype_uwind', obstype, paths, cat)
container.add('variables/obstype_vwind', obstype, paths, cat)

paths = container.get_paths('variables/windSpeed', cat)
wob = container.get('variables/windSpeed', cat)
container.add('variables/windEastward', wob, paths, cat)
container.add('variables/windNorthward', wob, paths, cat)

else:
# Add new variables: ObsType/windEastward & ObsType/windNorthward
swcm = container.get('variables/windComputationMethod', cat)
chanfreq = container.get('variables/sensorCentralFrequency', cat)

logging(comm, 'DEBUG', f'swcm min/max = {swcm.min()} {swcm.max()}')
logging(comm, 'DEBUG', f'chanfreq min/max = {chanfreq.min()} {chanfreq.max()}')

obstype = _get_obs_type(swcm, chanfreq)

logging(comm, 'DEBUG', f'obstype = {obstype}')
logging(comm, 'DEBUG', f'obstype min/max = {obstype.min()} {obstype.max()}')

paths = container.get_paths('variables/windComputationMethod', cat)
container.add('variables/obstype_uwind', obstype, paths, cat)
container.add('variables/obstype_vwind', obstype, paths, cat)

# Add new variables: ObsValue/windEastward & ObsValue/windNorthward
wdir = container.get('variables/windDirection', cat)
wspd = container.get('variables/windSpeed', cat)

logging(comm, 'DEBUG', f'wdir min/max = {wdir.min()} {wdir.max()}')
logging(comm, 'DEBUG', f'wspd min/max = {wspd.min()} {wspd.max()}')

uob, vob = compute_wind_components(wdir, wspd)

logging(comm, 'DEBUG', f'uob min/max = {uob.min()} {uob.max()}')
logging(comm, 'DEBUG', f'vob min/max = {vob.min()} {vob.max()}')

paths = container.get_paths('variables/windSpeed', cat)
container.add('variables/windEastward', uob, paths, cat)
container.add('variables/windNorthward', vob, paths, cat)

# Check
logging(comm, 'DEBUG', f'container list (updated): {container.list()}')
logging(comm, 'DEBUG', f'all_sub_categories {container.all_sub_categories()}')

return container

def create_obs_group(input_path, mapping_path, category, env):

comm = bufr.mpi.Comm(env["comm_name"])

description = _make_description(mapping_path, update=True)

# Check the cache for the data and return it if it exists
logging(comm, 'DEBUG', f'Check if bufr.DataCache exists? {bufr.DataCache.has(input_path, mapping_path)}')
if bufr.DataCache.has(input_path, mapping_path):
container = bufr.DataCache.get(input_path, mapping_path)
logging(comm, 'INFO', f'Encode {category} from cache')
data = iodaEncoder(description).encode(container)[(category,)]
logging(comm, 'INFO', f'Mark {category} as finished in the cache')
bufr.DataCache.mark_finished(input_path, mapping_path, [category])
logging(comm, 'INFO', f'Return the encoded data for {category}')
return data

container = _make_obs(comm, input_path, mapping_path)

# Gather data from all tasks into all tasks. Each task will have the complete record
logging(comm, 'INFO', f'Gather data from all tasks into all tasks')
container.all_gather(comm)

logging(comm, 'INFO', f'Add container to cache')
# Add the container to the cache
bufr.DataCache.add(input_path, mapping_path, container.all_sub_categories(), container)

# Encode the data
logging(comm, 'INFO', f'Encode {category}')
data = iodaEncoder(description).encode(container)[(category,)]

logging(comm, 'INFO', f'Mark {category} as finished in the cache')
# Mark the data as finished in the cache
bufr.DataCache.mark_finished(input_path, mapping_path, [category])

logging(comm, 'INFO', f'Return the encoded data for {category}')
return data

def create_obs_file(input_path, mapping_path, output_path):

comm = bufr.mpi.Comm("world")
container = _make_obs(comm, input_path, mapping_path)
container.gather(comm)

description = _make_description(mapping_path, update=True)

# Encode the data
if comm.rank() == 0:
netcdfEncoder(description).encode(container, output_path)

logging(comm, 'INFO', f'Return the encoded data')

if __name__ == '__main__':

start_time = time.time()

bufr.mpi.App(sys.argv)
comm = bufr.mpi.Comm("world")

# Required input arguments
parser = argparse.ArgumentParser()
parser.add_argument('-i', '--input', type=str, help='Input BUFR', required=True)
parser.add_argument('-m', '--mapping', type=str, help='BUFR2IODA Mapping File', required=True)
parser.add_argument('-o', '--output', type=str, help='Output NetCDF', required=True)

args = parser.parse_args()
mapping = args.mapping
infile = args.input
output = args.output

create_obs_file(infile, mapping, output)

end_time = time.time()
running_time = end_time - start_time
logging(comm, 'INFO', f'Total running time: {running_time}')

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider adding runtimes to the all subroutines in case we're curious if there are hang ups?

Loading