Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement prepbufr raobs obs builder 185 #395

Open
wants to merge 50 commits into
base: main
Choose a base branch
from

Conversation

randytpierce
Copy link
Contributor

This is the implementation of the prepbufr builder.

@randytpierce randytpierce added the VXingest issues related to the VXingest project label Jul 31, 2024
@randytpierce randytpierce self-assigned this Jul 31, 2024
@randytpierce
Copy link
Contributor Author

randytpierce commented Aug 2, 2024 via email

@randytpierce
Copy link
Contributor Author

randytpierce commented Aug 5, 2024 via email

@randytpierce
Copy link
Contributor Author

randytpierce commented Aug 6, 2024 via email

@ian-noaa
Copy link
Contributor

ian-noaa commented Aug 6, 2024

Certainly! Poetry references them in their Dependency Specification guide.

@ian-noaa
Copy link
Contributor

Now that I'm back from leave - what's the status on this PR? Last I recall, we had discovered we needed to install nceplibs-bufr.

It'd be good to get this finished and merged before we get pulled away onto other issues.

Copy link
Contributor

@ian-noaa ian-noaa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like I had a few pending PR comments around the Dockerfile & nceplibs-bufr version so I'm adding those here to make sure they don't get forgotten.

@randytpierce
Copy link
Contributor Author

randytpierce commented Aug 21, 2024 via email

Comment on lines +24 to +27
ncepbufr = [
{ platform = "linux", markers = "platform_machine == 'x86_64'", file = "./third_party/NCEPLIBS-bufr/wheel_dist/ncepbufr-12.1.0-py312-none-linux_x86_64.whl" },
{ platform = "darwin", markers = "platform_machine == 'arm64'", file = "./third_party/NCEPLIBS-bufr/wheel_dist/ncepbufr-12.1.0-py312-none-macosx_14_0_arm64.whl" }
]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should do ARM Linux as well - it's an important platform in AWS:

Suggested change
ncepbufr = [
{ platform = "linux", markers = "platform_machine == 'x86_64'", file = "./third_party/NCEPLIBS-bufr/wheel_dist/ncepbufr-12.1.0-py312-none-linux_x86_64.whl" },
{ platform = "darwin", markers = "platform_machine == 'arm64'", file = "./third_party/NCEPLIBS-bufr/wheel_dist/ncepbufr-12.1.0-py312-none-macosx_14_0_arm64.whl" }
]
ncepbufr = [
{ platform = "linux", markers = "platform_machine == 'x86_64'", file = "./third_party/NCEPLIBS-bufr/wheel_dist/ncepbufr-12.1.0-py312-none-linux_x86_64.whl" },
{ platform = "linux", markers = "platform_machine == 'aarch64'", file = "./third_party/NCEPLIBS-bufr/wheel_dist/ncepbufr-12.1.0-py312-none-linux_aarch64.whl" },
{ platform = "darwin", markers = "platform_machine == 'arm64'", file = "./third_party/NCEPLIBS-bufr/wheel_dist/ncepbufr-12.1.0-py312-none-macosx_14_0_arm64.whl" }
]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I belive we still need the new 12.1.0 libraries built & added to third_party.

Comment on lines +80 to +165
def test_one_thread_specify_file_pattern(tmp_path: Path):
"""Note: this test takes a long time to run (few minutes)"""
try:
log_queue = Queue()
vx_ingest = setup_connection()
# # stations = [
# # "70026",
# # "72393",
# # "74794",
# # "71119",
# # "76225",
# # "76256",
# # "76458",
# # "76526",
# # "76595",
# # "76612",
# # "76644",
# # "76654",
# # "76679",
# # "76692",
# # "76743",
# # "76903",
# # "78384",
# # "78397",
# # "78486",
# # "78526",
# # "78583",
# # "78954",
# # "78970",
# # "82022",
# # "82026",
# # "82099",
# # "82107",
# # "82193",
# # "82244",
# # "82332",
# # "82411",
# # "82532",
# # "82599",
# # "82705",
# # ]
# print("Testing stations: ", stations)
# print(f"output path is: {tmp_path}")
# vx_ingest.write_data_for_station_list = stations
# vx_ingest.write_data_for_levels = [200, 300, 500, 700, 900]
try:
vx_ingest.runit(
{
"job_id": "JOB-TEST:V01:RAOB:PREPBUFR:OBS",
"credentials_file": os.environ["CREDENTIALS"],
"file_name_mask": "%y%j%H%M", # only tests the first part of the file name i.e. 241011200.gdas.t12z.prepbufr.nr -> 241011200
"output_dir": f"{tmp_path}",
"threads": 1,
"file_pattern": "242130000*", # specifically /opt/data/prepbufr_to_cb/input_files/242130000.gdas.t00z.prepbufr.nr,
# "file_pattern": "242131200*", # specifically /opt/data/prepbufr_to_cb/input_files/242131200.gdas.t00z.prepbufr.nr,
# "file_pattern": "242121800*", # specifically /opt/data/prepbufr_to_cb/input_files/242121800.gdas.t00z.prepbufr.nr,
# "file_pattern": "241570000*", # specifically /opt/data/prepbufr_to_cb/input_files/241570000.gdas.t00z.prepbufr.nr,
},
log_queue,
stub_worker_log_configurer,
)
except Exception as e:
raise AssertionError(f"Exception: {e}") from e
# Test that we have one or more output files
output_file_list = list(
tmp_path.glob(
"[0123456789]????????.gdas.t[0123456789][0123456789]z.prepbufr.nr.json"
)
)

# Test that we have one "load job" ("LJ") document
lj_doc_regex = (
"LJ:RAOB:vxingest.prepbufr_to_cb.run_ingest_threads:VXIngest:*.json"
)
num_load_job_files = len(list(tmp_path.glob(lj_doc_regex)))
assert (
num_load_job_files >= 1
), f"Number of load job files is incorrect {num_load_job_files} is not >= 1"

# Test that we have one output file per input file
input_path = Path("/opt/data/prepbufr_to_cb/input_files")
num_input_files = len(list(input_path.glob("242130000*")))
# num_input_files = len(list(input_path.glob("242131200*")))
# num_input_files = len(list(input_path.glob("242121800*")))
# num_input_files = len(list(input_path.glob("241011200*")))
num_output_files = len(output_file_list)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest cleaning up the commented-out code here.

Comment on lines +145 to +148
def write_data_for_debug(self, builder, document_map):
"""
write the raw data and interpolated for a specific set of stations for debugging purposes
"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need the debug function? If not, we can eliminate the tabulate & mysql.connector dependencies which would be good from a security standpoint.

Otherwise, if we do want to keep this functionality around, it'd be good to move it to a test suite or utility script so that we can move tabulate & mysql.connector from production to dev dependencies in our pyproject.toml file.

Comment on lines +75 to +79
_val = (
round(float(line.split()[1]))
if line.split()[1] != "MISSING"
else None
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pattern is repeated a lot throughout this script - it should be extracted into a function so it's easier to update & maintain.

def process_line(line: str) -> float | None:
    """
    Could use a better name & docstring

    Assumes a line that looks like: <some example input>
    """

    parts = line.split()
    value = parts[1]
    return round(float(value)) if value != "MISSING" else None

Copy link

This PR is stale because it has been open 90 days with no activity.

@github-actions github-actions bot added the Stale label Jan 29, 2025
@github-actions github-actions bot removed the Stale label Feb 26, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
VXingest issues related to the VXingest project
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants