From d2e3fdafd8fddce569f06b6c66abcbf7399a5348 Mon Sep 17 00:00:00 2001 From: Callum Rollo Date: Fri, 26 Jul 2024 11:09:40 +0200 Subject: [PATCH] REFACTOR: remove unused arguments, redundant params, bare except --- pyglider/seaexplorer.py | 53 ++++++++++++++++++--------------------- tests/test_seaexplorer.py | 7 +++--- 2 files changed, 28 insertions(+), 32 deletions(-) diff --git a/pyglider/seaexplorer.py b/pyglider/seaexplorer.py index c25e57f..ee56193 100644 --- a/pyglider/seaexplorer.py +++ b/pyglider/seaexplorer.py @@ -3,11 +3,11 @@ SeaExplorer-specific processing routines. """ import glob +from pathlib import Path import logging import numpy as np import os import xarray as xr -import yaml import pyglider.utils as utils import datetime import polars as pl @@ -28,10 +28,10 @@ def _outputname(f, outdir): return outdir + fnout + 'parquet', filenum -def _needsupdating(ftype, fin, fout): +def _needsupdating(fin, fout): if not os.path.isfile(fout): return True - return (os.path.getmtime(fin) >= os.path.getmtime(fout)) + return os.path.getmtime(fin) >= os.path.getmtime(fout) def _sort(ds): @@ -123,7 +123,7 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True, # output name: fnout, filenum = _outputname(f, outdir) _log.info(f'{f} to {fnout}') - if not incremental or _needsupdating(ftype, f, fnout): + if not incremental or _needsupdating(f, fnout): _log.info(f'Doing: {f} to {fnout}') # Try to read the file with polars. If the file is corrupted (rare), file read will fail and file # is appended to badfiles @@ -156,7 +156,7 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True, if rawsub == 'raw' and dropna_subset is not None: # This check is the polars equivalent of pandas dropna. See docstring note on dropna out = out.with_columns(out.select(pl.col(dropna_subset).is_null().cast(pl.Int64)) - .sum(axis=1).alias("null_count")).filter( + .sum(axis=1).alias("null_count")).filter( pl.col("null_count") <= dropna_thresh) \ .drop("null_count") @@ -207,7 +207,7 @@ def drop_pre_1971_samples(df): return df.filter(pl.col("time") > dt_1971) -def merge_parquet(indir, outdir, deploymentyaml, incremental=False, kind='raw'): +def merge_parquet(indir, outdir, deploymentyaml, kind='raw'): """ Merge all the raw netcdf files in indir. These are meant to be the raw flight and science files from the slocum. @@ -227,17 +227,16 @@ def merge_parquet(indir, outdir, deploymentyaml, incremental=False, kind='raw'): deploymentyaml : str YAML text file with deployment information for this glider. - - incremental : bool - Only add new files.... 
+ kind: str + 'sub' for nrt sub files 'raw' for delayed mode raw files """ deployment = utils._get_deployment(deploymentyaml) metadata = deployment['metadata'] - id = metadata['glider_name'] - outgli = outdir + '/' + id + '-rawgli.parquet' - outpld = outdir + '/' + id + '-' + kind + 'pld.parquet' + glider_id = metadata['glider_name'] + outgli = outdir + '/' + glider_id + '-rawgli.parquet' + outpld = outdir + '/' + glider_id + '-' + kind + 'pld.parquet' _log.info('Opening *.gli.sub.*.parquet multi-file dataset from %s', indir) files = sorted(glob.glob(indir + '/*.gli.sub.*.parquet')) @@ -263,7 +262,7 @@ def merge_parquet(indir, outdir, deploymentyaml, incremental=False, kind='raw'): return True -def _interp_gli_to_pld(gli, ds, val, indctd): +def _interp_gli_to_pld(gli, ds, val): gli_ind = ~np.isnan(val) # switch for if we are comparing two polars dataframes or a polars dataframe and a xarray dataset if type(ds) is pl.DataFrame: @@ -306,7 +305,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw', profile_filt_time=100, profile_min_time=300, maxgap=10, interpolate=False, fnamesuffix=''): """ - A little different than above, for the 4-file version of the data set. + Converts nav and pld files into a single netcdf timeseries file """ deployment = utils._get_deployment(deploymentyaml) @@ -314,11 +313,11 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw', metadata = deployment['metadata'] ncvar = deployment['netcdf_variables'] device_data = deployment['glider_devices'] - id = metadata['glider_name'] - _log.info(f'Opening combined nav file {indir}/{id}-rawgli.nc') - gli = pl.read_parquet(f'{indir}/{id}-rawgli.parquet') - _log.info(f'Opening combined payload file {indir}/{id}-{kind}pld.parquet') - sensor = pl.read_parquet(f'{indir}/{id}-{kind}pld.parquet') + glider_id = metadata['glider_name'] + _log.info(f'Opening combined nav file {indir}/{glider_id}-rawgli.nc') + gli = pl.read_parquet(f'{indir}/{glider_id}-rawgli.parquet') + _log.info(f'Opening combined payload file {indir}/{glider_id}-{kind}pld.parquet') + sensor = pl.read_parquet(f'{indir}/{glider_id}-{kind}pld.parquet') sensor = _remove_fill_values(sensor) # build a new data set based on info in `deploymentyaml.` @@ -338,7 +337,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw', raise ValueError(f"timebase source: {ncvar['timebase']['source']} not found in pld1 columns") vals = sensor.select([ncvar['timebase']['source']]).to_numpy()[:, 0] indctd = np.where(~np.isnan(vals))[0] - ds['time'] = (('time'), sensor.select('time').to_numpy()[indctd, 0].astype('datetime64[ns]'), attr) + ds['time'] = ('time', sensor.select('time').to_numpy()[indctd, 0].astype('datetime64[ns]'), attr) thenames = list(ncvar.keys()) # Check yaml to see if interpolate has been set to True if "interpolate" in thenames: @@ -375,7 +374,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw', pl.col('time').cast(pl.Datetime('ms')) )[:-1, :] val2 = sensor_sub_grouped.select(sensorname).to_numpy()[:, 0] - val = _interp_gli_to_pld(sensor_sub_grouped, sensor, val2, indctd) + val = _interp_gli_to_pld(sensor_sub_grouped, sensor, val2) if interpolate and not np.isnan(val).all(): time_original = sensor.select('time').to_numpy()[:, 0] time_var = time_original[np.where(~np.isnan(val))[0]] @@ -393,7 +392,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw', # interpolate only over those gaps that are smaller than 'maxgap' # apparently maxgap is to be in somethng like seconds, and this data is in ms. 
Certainly # the default of 0.3 s was not intended. Changing default to 10 s: - tg_ind = utils.find_gaps(time_var.astype(float), time_timebase.astype(float), maxgap*1000) + tg_ind = utils.find_gaps(time_var.astype(float), time_timebase.astype(float), maxgap * 1000) val[tg_ind] = np.nan else: val = val[indctd] @@ -404,13 +403,13 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw', val = convert(val) # Values from the glider netcdf must be interpolated to match # the sensor netcdf - val = _interp_gli_to_pld(gli, ds, val, indctd) + val = _interp_gli_to_pld(gli, ds, val) # make the attributes: ncvar[name].pop('coordinates', None) attrs = ncvar[name] attrs = utils.fill_required_attrs(attrs) - ds[name] = (('time'), val, attrs) + ds[name] = ('time', val, attrs) # fix lon and lat to be linearly interpolated between fixes good = np.where(np.abs(np.diff(ds.longitude)) + @@ -480,10 +479,8 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw', ds.attrs['deployment_start'] = str(start) ds.attrs['deployment_end'] = str(end) - try: - os.mkdir(outdir) - except: - pass + if not Path(outdir).exists(): + Path(outdir).mkdir(parents=True) id0 = ds.attrs['deployment_name'] outname = outdir + id0 + fnamesuffix + '.nc' _log.info('writing %s', outname) diff --git a/tests/test_seaexplorer.py b/tests/test_seaexplorer.py index 4eb3d94..5207464 100644 --- a/tests/test_seaexplorer.py +++ b/tests/test_seaexplorer.py @@ -45,11 +45,10 @@ def test_raw_to_rawnc(): def test__needsupdating(): - ftype = 'pld1' fin = 'tests/data/realtime_raw/sea035.12.pld1.sub.36' fout = 'tests/data/realtime_rawnc/sea035.0012.pld1.sub.0036.parquet' - result_badpath = seaexplorer._needsupdating(ftype, fin, 'baz') - result_goodpath = seaexplorer._needsupdating(ftype, fin, fout) + result_badpath = seaexplorer._needsupdating(fin, 'baz') + result_goodpath = seaexplorer._needsupdating(fin, fout) assert result_badpath is True assert result_goodpath is False @@ -82,7 +81,7 @@ def test__interp_gli_to_pld(): glider = pl.read_parquet('tests/data/realtime_rawnc/sea035.0012.gli.sub.0036.parquet') ds = pl.read_parquet('tests/data/realtime_rawnc/sea035.0012.pld1.sub.0036.parquet') val = glider.select("Pitch").to_numpy()[:, 0] - pitch_interp = seaexplorer._interp_gli_to_pld(glider, ds, val, None) + pitch_interp = seaexplorer._interp_gli_to_pld(glider, ds, val) assert len(pitch_interp) == ds.shape[0]
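For downstream callers, the only visible effect of this refactor is the trimmed signatures. Below is a minimal sketch of the processing chain after the patch; the paths and deployment YAML name are placeholders, and only functions and keyword arguments that appear in the diff above are assumed.

import pyglider.seaexplorer as seaexplorer

# placeholder locations; substitute a real deployment layout
rawdir = 'delayed_raw/'
rawncdir = 'delayed_rawnc/'
l0tsdir = 'L0-timeseries/'
deploymentyaml = 'deployment.yml'

# convert each raw SeaExplorer file to parquet; with incremental=True,
# _needsupdating() skips files whose output is already newer than the input
seaexplorer.raw_to_rawnc(rawdir, rawncdir, deploymentyaml, incremental=True)

# merge the per-file parquet files into one nav and one payload file; the
# unused `incremental` argument is gone, and `kind` sets the output name:
#   kind='raw' -> <glider_name>-rawpld.parquet (delayed mode)
#   kind='sub' -> <glider_name>-subpld.parquet (near real time)
seaexplorer.merge_parquet(rawncdir, rawncdir, deploymentyaml, kind='raw')

# build the single netcdf timeseries; maxgap is given in seconds and is
# multiplied by 1000 internally because the timestamps are in milliseconds
seaexplorer.raw_to_timeseries(rawncdir, l0tsdir, deploymentyaml,
                              kind='raw', maxgap=10)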
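On the directory-creation change: the bare except around os.mkdir is replaced by an explicit existence check. For reference only, pathlib can express the same behaviour in a single call; this is a stylistic alternative, not what the patch does.

from pathlib import Path

outdir = 'L0-timeseries/'  # placeholder, as in the sketch above

# create the output directory and any missing parents, doing nothing if the
# directory already exists (same effect as the explicit exists() check)
Path(outdir).mkdir(parents=True, exist_ok=True)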