
Feature/nicke sfcshp prepbufr #9

Merged
merged 90 commits into from
Feb 11, 2025

Commits
90 commits
48b4a3f
added dump diur
rmclaren Oct 8, 2024
df49114
ADPSFC mapping and encoder file
Oct 31, 2024
94f233e
SFCSHP mapping and encoder file
Oct 31, 2024
ba62d79
remove path appends
Oct 31, 2024
f13cc3d
remove path appends
Oct 31, 2024
51928b5
CAT mnemonic
Oct 31, 2024
d46c503
CAT mnemonic
Oct 31, 2024
d3478fd
Add mapping file, python scripts, and shell scripts for satwind goes
Nov 13, 2024
7277cc5
remove test
Nov 18, 2024
a4efbb2
Update components (mapping, Python script, configuration) for satwind
Nov 18, 2024
5689332
remove old yaml
Nov 19, 2024
16db06f
Add configuration files and test shell script
Nov 19, 2024
927bae2
rename configuration yaml
emilyhcliu Nov 19, 2024
9dfd16d
Add temporary testing script
emilyhcliu Nov 19, 2024
54305c9
Add README file
emilyhcliu Nov 19, 2024
89f429a
rename README to README.md
emilyhcliu Nov 19, 2024
b115130
Update README.md
emilyhcliu Nov 19, 2024
f0ba20f
Update readme file
emilyhcliu Nov 19, 2024
8d324d0
Update README.md
emilyhcliu Nov 19, 2024
d70867f
Update README
emilyhcliu Nov 19, 2024
e5a95e7
rename process_bufr2ioda to bufr2ioda.sh
emilyhcliu Nov 19, 2024
7dbc826
Update README.md
emilyhcliu Nov 19, 2024
6346844
Update README.md
emilyhcliu Nov 19, 2024
f91d1d1
Update README.md
emilyhcliu Nov 19, 2024
8ba4d7b
Update README.md
emilyhcliu Nov 19, 2024
87096eb
Update readme
emilyhcliu Nov 19, 2024
3e7789c
Add comments
emilyhcliu Nov 19, 2024
a645370
update comments
emilyhcliu Nov 19, 2024
c0fdb95
update bufr2ioda.sh
emilyhcliu Nov 19, 2024
c262e3d
update usage
emilyhcliu Nov 19, 2024
dde4d27
Add comments
emilyhcliu Nov 19, 2024
d080c23
add cycle in input path
emilyhcliu Nov 20, 2024
09d0061
Modify global attribute in the mapping file
emilyhcliu Nov 20, 2024
8f23b5b
Merge branch 'develop' into feature/dump_satwind_goes
emilyhcliu Nov 21, 2024
cd74eee
remove . before bufr2ioda.sh
emilyhcliu Nov 21, 2024
6351d34
Update README.md
emilyhcliu Nov 21, 2024
b2ea3ae
Update README.md
emilyhcliu Nov 22, 2024
d678190
Update README.md
emilyhcliu Nov 22, 2024
67311d6
remove wxflow from the test script
emilyhcliu Nov 22, 2024
428e600
remove wxflow from input
emilyhcliu Nov 22, 2024
6564a18
Update README.md
emilyhcliu Nov 22, 2024
f757393
Add comment block for logger
emilyhcliu Nov 24, 2024
77f2e92
add bufrtype (this is bufr dump list)
emilyhcliu Nov 24, 2024
00cd504
Update documentation
emilyhcliu Nov 24, 2024
9d9e821
Add bufrtype and update README
emilyhcliu Nov 24, 2024
e9b8833
Add split_by_category input
emilyhcliu Nov 25, 2024
aec8e56
Update README
emilyhcliu Nov 25, 2024
5679a32
Update README.md
emilyhcliu Nov 25, 2024
70f90d9
Rename bufr2ioda to encodeBufr and update readme
emilyhcliu Nov 25, 2024
1365772
Update README.md
emilyhcliu Nov 26, 2024
9a8487a
Merge branch 'feature/dump_satwind_goes' into feature/NICKE_adpsfc_pr…
nicholasesposito Nov 26, 2024
0f5dce2
adpsfc changes. works
nicholasesposito Dec 5, 2024
e523fdb
update 1
nicholasesposito Dec 5, 2024
07a7239
Merge branch 'feature/NICKE_adpsfc_prepbufr' into feature/NICKE_sfcsh…
nicholasesposito Dec 5, 2024
c178022
initial
nicholasesposito Dec 5, 2024
eba41c2
name changes
nicholasesposito Dec 5, 2024
a9fec90
Merge branch 'feature/NICKE_adpsfc_prepbufr' into feature/NICKE_sfcsh…
nicholasesposito Dec 5, 2024
72f5622
changes
nicholasesposito Dec 10, 2024
414b489
encodeBufr_Nick.sh
nicholasesposito Dec 10, 2024
98d73fc
Merge branch 'feature/NICKE_adpsfc_prepbufr' into feature/NICKE_sfcsh…
nicholasesposito Dec 10, 2024
4f92d5c
all works now
nicholasesposito Dec 10, 2024
282e177
small changes for script
nicholasesposito Dec 10, 2024
de85533
Merge branch 'feature/NICKE_adpsfc_prepbufr' into feature/NICKE_sfcsh…
nicholasesposito Dec 10, 2024
61cdae5
all work
nicholasesposito Dec 10, 2024
61427a9
rm bufr2ioda* for bufr
nicholasesposito Dec 10, 2024
cb3dbe8
adpsfc -> sfcshp in text
nicholasesposito Dec 10, 2024
77ffd4a
script update. make_obs works
nicholasesposito Dec 11, 2024
da53125
rm adpsfc, fix virT
nicholasesposito Dec 12, 2024
f53d4b1
changes for dt
nicholasesposito Dec 12, 2024
a6a3559
cycle time added for script_backend
nicholasesposito Dec 12, 2024
35e3be4
encodeBufr_Nick.sh
nicholasesposito Dec 12, 2024
d9a37f3
remove longitude range for now, encodeBufr_Nick python 3.10/3.7
Dec 20, 2024
2aa5842
_backend -> 4backend, mpi, namechange updates
Dec 26, 2024
adb2c8a
move yamls to config
Dec 26, 2024
2162c4a
move py and mapping to mapping dir
Dec 26, 2024
7a1d0ba
re-add satwnd. not sure if supposed to be here
Dec 26, 2024
a407810
add reference time
Dec 27, 2024
f4e11ab
subsets, input filename, global attributes, dateTime min
Jan 2, 2025
0537f45
moving, sorting, updating, good
Jan 15, 2025
e4614cc
Merge branch 'develop' into feature/NICKE_sfcshp_prepbufr
Jan 15, 2025
e3fa5bd
encodeBufr.sh
Jan 15, 2025
716a0c0
encodeBufr.sh chmod
Jan 15, 2025
327701e
rm bufr2ioda satwnd from before
Jan 15, 2025
00fb7dc
remove stray
Jan 15, 2025
3bd73dc
Merge branch 'develop' into feature/NICKE_sfcshp_prepbufr
Jan 23, 2025
88df308
function name updates, description add
Jan 23, 2025
48e2069
some coding norms
Jan 23, 2025
9432096
datetime in definitino name
Jan 23, 2025
4d6a203
remove add)global for now
nicholasesposito Jan 28, 2025
bbd6bfe
lon update check
nicholasesposito Jan 30, 2025
311 changes: 311 additions & 0 deletions dump/mapping/bufr_sfcshp_prepbufr.py
@@ -0,0 +1,311 @@
#!/usr/bin/env python3
import os
import sys
import bufr
import argparse
import copy
import numpy as np
import numpy.ma as ma
import math
import calendar
import time
from datetime import datetime
from pyioda.ioda.Engines.Bufr import Encoder as iodaEncoder
from bufr.encoders.netcdf import Encoder as netcdfEncoder
from wxflow import Logger


# Initialize Logger
# Get log level from the environment variable, default to 'INFO' if not set
log_level = os.getenv('LOG_LEVEL', 'INFO')
logger = Logger('bufr_sfcshp_prepbufr.py', level=log_level, colored_log=False)

def logging(comm, level, message):
    """
    Logs a message to the console or log file, based on the specified logging level.

    This function ensures that logging is only performed by the root process (`rank 0`)
    in a distributed computing environment. The function maps the logging level to the
    appropriate logger method and defaults to the 'INFO' level if an invalid level is provided.

    Parameters:
        comm: object
            The communicator object, typically from a distributed computing framework
            (e.g., MPI). It must have a `rank()` method to determine the process rank.
        level: str
            The logging level as a string. Supported levels are:
                - 'DEBUG'
                - 'INFO'
                - 'WARNING'
                - 'ERROR'
                - 'CRITICAL'
            If an invalid level is provided, a warning will be logged, and the level
            will default to 'INFO'.
        message: str
            The message to be logged.

    Behavior:
        - Logs messages only on the root process (`comm.rank() == 0`).
        - Maps the provided logging level to a method of the logger object.
        - Defaults to 'INFO' and logs a warning if an invalid logging level is given.
        - Supports standard logging levels for granular control over log verbosity.

    Example:
        >>> logging(comm, 'DEBUG', 'This is a debug message.')
        >>> logging(comm, 'ERROR', 'An error occurred!')

    Notes:
        - Ensure that a global `logger` object is configured before using this function.
        - The `comm` object should conform to MPI-like conventions (e.g., `rank()` method).
    """

    if comm.rank() == 0:
        # Define a dictionary to map levels to logger methods
        log_methods = {
            'DEBUG': logger.debug,
            'INFO': logger.info,
            'WARNING': logger.warning,
            'ERROR': logger.error,
            'CRITICAL': logger.critical,
        }

        # Get the appropriate logging method, default to 'INFO'
        log_method = log_methods.get(level.upper(), logger.info)

        if log_method == logger.info and level.upper() not in log_methods:
            # Log a warning if the level is invalid
            logger.warning(f'log level = {level}: not a valid level --> set to INFO')

        # Call the logging method
        log_method(message)


def _make_description(mapping_path, cycle_time, update=False):
    description = bufr.encoders.Description(mapping_path)

    ReferenceTime = np.int64(calendar.timegm(time.strptime(str(int(cycle_time)), '%Y%m%d%H')))

    if update:
        # Define the variables to be added in a list of dictionaries
        variables = [
            {
                'name': 'MetaData/sequenceNumber',
                'source': 'variables/sequenceNumber',
                'units': '1',
                'longName': 'Sequence Number (Obs Subtype)',
            },
        ]

        # Loop through each variable and add it to the description
        for var in variables:
            description.add_variable(
                name=var['name'],
                source=var['source'],
                units=var['units'],
                longName=var['longName']
            )

    # description.add_global(name='Reference_time', value=str(ReferenceTime))

    return description


def _compute_sequence_number(typ, t29):
    """
    Compute sequenceNumber.

    Parameters:
        typ: observation type (obsType)
        t29: data dump report type

    Returns:
        Array of sequenceNumber values (0 or 1)
    """

    sequenceNumber = np.zeros(typ.shape, dtype=np.int32)
    for i in range(len(typ)):
        if typ[i] == 180 or typ[i] == 280:
            if 555 < t29[i] < 565:
                sequenceNumber[i] = 0
            else:
                sequenceNumber[i] = 1

    return sequenceNumber
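The per-element loop above can also be expressed with NumPy boolean masks. A sketch with hypothetical input values, under the same rule (types 180/280 with dump report types strictly between 555 and 565 keep subtype 0, other 180/280 reports get 1, everything else stays 0):

```python
import numpy as np

def compute_sequence_number_vectorized(typ, t29):
    # Surface obs types 180 (mass) and 280 (wind)
    is_sfc = (typ == 180) | (typ == 280)
    # Dump report types strictly between 555 and 565 keep subtype 0
    in_range = (t29 > 555) & (t29 < 565)
    seq = np.zeros(typ.shape, dtype=np.int32)
    seq[is_sfc & ~in_range] = 1
    return seq

typ = np.array([180, 280, 120, 180], dtype=np.int32)  # hypothetical obs types
t29 = np.array([560, 522, 560, 511], dtype=np.int32)  # hypothetical dump report types
print(compute_sequence_number_vectorized(typ, t29))   # [0 1 0 1]
```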


def _compute_datetime(cycleTimeSinceEpoch, dhr):
    """
    Compute dateTime from the cycle time (as seconds since epoch) and the
    observation-time-minus-cycle-time offset.

    Parameters:
        cycleTimeSinceEpoch: time of cycle in epoch seconds
        dhr: observation time minus cycle time (hours)

    Returns:
        Masked array of dateTime values
    """

    int64_fill_value = np.int64(0)

    dateTime = np.zeros(dhr.shape, dtype=np.int64)
    for i in range(len(dateTime)):
        if ma.is_masked(dhr[i]):
            continue
        dateTime[i] = np.int64(dhr[i] * 3600) + cycleTimeSinceEpoch

    dateTime = ma.array(dateTime)
    dateTime = ma.masked_values(dateTime, int64_fill_value)

    return dateTime
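As a standalone illustration of the arithmetic above (cycle and offsets are hypothetical): the YYYYMMDDHH string is parsed to epoch seconds with `calendar.timegm`, and each `dhr` offset in hours is scaled to seconds and added:

```python
import calendar
import time

import numpy as np
import numpy.ma as ma

cycle_time = '2021080100'  # hypothetical cycle: 2021-08-01 00z
cycleTimeSinceEpoch = np.int64(calendar.timegm(time.strptime(cycle_time, '%Y%m%d%H')))
print(int(cycleTimeSinceEpoch))  # 1627776000

# Observations 3 hours before and half an hour after the cycle
dhr = ma.array([-3.0, 0.5])
dateTime = cycleTimeSinceEpoch + (dhr * 3600).astype(np.int64)
print(int(dateTime[0]), int(dateTime[1]))  # 1627765200 1627777800
```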



def _make_obs(comm, input_path, mapping_path, cycle_time):
    """
    Create the ioda sfcshp prepbufr observations:
    - reads values
    - adds sequenceNumber

    Parameters
    ----------
    comm: object
        The communicator object (e.g., MPI)
    input_path: str
        The input bufr file
    mapping_path: str
        The input bufr2ioda mapping file
    cycle_time: str
        The cycle in YYYYMMDDHH format
    """

    # Get container from mapping file first
    logging(comm, 'INFO', 'Get container from bufr')
    container = bufr.Parser(input_path, mapping_path).parse(comm)

    logging(comm, 'DEBUG', f'container list (original): {container.list()}')
    logging(comm, 'DEBUG', 'Change longitude range from [0,360] to [-180,180]')
    lon = container.get('variables/longitude')
    lon_paths = container.get_paths('variables/longitude')
    lon[lon > 180] -= 360
    lon = ma.round(lon, decimals=2)
    logging(comm, 'DEBUG', f'longitude new max/min: {lon.max()}, {lon.min()}')

    logging(comm, 'DEBUG', 'Do dateTime calculation')
    otmct = container.get('variables/obsTimeMinusCycleTime')
    otmct_paths = container.get_paths('variables/obsTimeMinusCycleTime')
    otmct2 = np.array(otmct)
    cycleTimeSinceEpoch = np.int64(calendar.timegm(time.strptime(str(int(cycle_time)), '%Y%m%d%H')))
    dateTime = _compute_datetime(cycleTimeSinceEpoch, otmct2)
    min_dateTime_ge_zero = min(x for x in dateTime if x >= 0)
    logging(comm, 'DEBUG', f'dateTime min/max = {min_dateTime_ge_zero} {dateTime.max()}')

    logging(comm, 'DEBUG', 'Do sequenceNumber (Obs Subtype) calculation')
    typ = container.get('variables/observationType')
    typ_paths = container.get_paths('variables/observationType')
    t29 = container.get('variables/obssubtype')
    t29_paths = container.get_paths('variables/obssubtype')
    seqNum = _compute_sequence_number(typ, t29)
    logging(comm, 'DEBUG', f'sequenceNumber min/max = {seqNum.min()} {seqNum.max()}')

    logging(comm, 'DEBUG', 'Do tsen and tv calculation')
    tpc = container.get('variables/temperatureEventCode')
    tob = container.get('variables/airTemperatureObsValue')
    tob_paths = container.get_paths('variables/airTemperatureObsValue')
    tsen = np.full(tob.shape[0], tob.fill_value)
    tsen = np.where((tpc >= 1) & (tpc < 8), tob, tsen)
    tvo = np.full(tob.shape[0], tob.fill_value)
    tvo = np.where(tpc == 8, tob, tvo)

    logging(comm, 'DEBUG', 'Do tsen and tv QM calculations')
    tobqm = container.get('variables/airTemperatureQualityMarker')
    tsenqm = np.full(tobqm.shape[0], tobqm.fill_value)
    tsenqm = np.where((tpc >= 1) & (tpc < 8), tobqm, tsenqm)
    tvoqm = np.full(tobqm.shape[0], tobqm.fill_value)
    tvoqm = np.where(tpc == 8, tobqm, tvoqm)

    logging(comm, 'DEBUG', 'Do tsen and tv ObsError calculations')
    toboe = container.get('variables/airTemperatureObsError')
    tsenoe = np.full(toboe.shape[0], toboe.fill_value)
    tsenoe = np.where((tpc >= 1) & (tpc < 8), toboe, tsenoe)
    tvooe = np.full(toboe.shape[0], toboe.fill_value)
    tvooe = np.where(tpc == 8, toboe, tvooe)

    logging(comm, 'DEBUG', 'Update variables in container')
    container.replace('variables/longitude', lon)
    container.replace('variables/timestamp', dateTime)
    container.replace('variables/airTemperatureObsValue', tsen)
    container.replace('variables/airTemperatureQualityMarker', tsenqm)
    container.replace('variables/airTemperatureObsError', tsenoe)
    container.replace('variables/virtualTemperatureObsValue', tvo)
    container.replace('variables/virtualTemperatureQualityMarker', tvoqm)
    container.replace('variables/virtualTemperatureObsError', tvooe)

    logging(comm, 'DEBUG', 'Add variables to container')
    container.add('variables/sequenceNumber', seqNum, typ_paths)

    # Check
    logging(comm, 'DEBUG', f'container list (updated): {container.list()}')

    return container
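The temperature handling above splits the reported value by prepbufr temperature event code: codes 1 through 7 are treated as sensible temperature, code 8 as virtual temperature, with the fill value everywhere else. The same `np.where` pattern in isolation, with hypothetical values and a hypothetical fill standing in for `tob.fill_value`:

```python
import numpy as np

fill = -9999.0  # hypothetical fill value standing in for tob.fill_value
tpc = np.array([1, 8, 3, 8])                  # temperature event codes
tob = np.array([288.0, 290.1, 275.3, 301.2])  # reported temperatures (K)

# Sensible temperature where 1 <= tpc < 8, otherwise fill
tsen = np.where((tpc >= 1) & (tpc < 8), tob, fill)
# Virtual temperature where tpc == 8, otherwise fill
tvo = np.where(tpc == 8, tob, fill)
```

Each input element lands in exactly one of `tsen` or `tvo`; the other array holds the fill value at that position.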


def create_obs_group(input_path, mapping_path, cycle_time, env):

    comm = bufr.mpi.Comm(env["comm_name"])

    logging(comm, 'INFO', 'Make description and make obs')
    description = _make_description(mapping_path, cycle_time, update=True)
    container = _make_obs(comm, input_path, mapping_path, cycle_time)

    # Gather data from all tasks into all tasks. Each task will have the complete record
    logging(comm, 'INFO', 'Gather data from all tasks into all tasks')
    container.all_gather(comm)

    logging(comm, 'INFO', 'Encode the data')
    data = next(iter(iodaEncoder(description).encode(container).values()))

    logging(comm, 'INFO', 'Return the encoded data')
    return data


def create_obs_file(input_path, mapping_path, output_path, cycle_time):

    comm = bufr.mpi.Comm("world")
    container = _make_obs(comm, input_path, mapping_path, cycle_time)
    container.gather(comm)

    description = _make_description(mapping_path, cycle_time, update=True)

    # Encode the data
    if comm.rank() == 0:
        netcdfEncoder(description).encode(container, output_path)

    logging(comm, 'INFO', 'Return the encoded data')


if __name__ == '__main__':
    start_time = time.time()

    bufr.mpi.App(sys.argv)
    comm = bufr.mpi.Comm("world")

    # Required input arguments as positional arguments
    parser = argparse.ArgumentParser(description="Convert BUFR to NetCDF using a mapping file.")
    parser.add_argument('input', type=str, help='Input BUFR file')
    parser.add_argument('mapping', type=str, help='BUFR2IODA mapping file')
    parser.add_argument('output', type=str, help='Output NetCDF file')
    parser.add_argument('cycle_time', type=str, help='Cycle time in YYYYMMDDHH format')

    args = parser.parse_args()
    infile = args.input
    mapping = args.mapping
    output = args.output
    cycle_time = args.cycle_time

    create_obs_file(infile, mapping, output, cycle_time)

    end_time = time.time()
    running_time = end_time - start_time
    logging(comm, 'INFO', f'Total running time: {running_time}')