Skip to content

Commit

Permalink
REFACTOR: remove unused arguments, redundant params, bare except
Browse files Browse the repository at this point in the history
  • Loading branch information
callumrollo committed Jul 26, 2024
1 parent 9f92866 commit d2e3fda
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 32 deletions.
53 changes: 25 additions & 28 deletions pyglider/seaexplorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
SeaExplorer-specific processing routines.
"""
import glob
from pathlib import Path
import logging
import numpy as np
import os
import xarray as xr
import yaml
import pyglider.utils as utils
import datetime
import polars as pl
Expand All @@ -28,10 +28,10 @@ def _outputname(f, outdir):
return outdir + fnout + 'parquet', filenum


def _needsupdating(ftype, fin, fout):
def _needsupdating(fin, fout):
if not os.path.isfile(fout):
return True
return (os.path.getmtime(fin) >= os.path.getmtime(fout))
return os.path.getmtime(fin) >= os.path.getmtime(fout)


def _sort(ds):
Expand Down Expand Up @@ -123,7 +123,7 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True,
# output name:
fnout, filenum = _outputname(f, outdir)
_log.info(f'{f} to {fnout}')
if not incremental or _needsupdating(ftype, f, fnout):
if not incremental or _needsupdating(f, fnout):
_log.info(f'Doing: {f} to {fnout}')
# Try to read the file with polars. If the file is corrupted (rare), file read will fail and file
# is appended to badfiles
Expand Down Expand Up @@ -156,7 +156,7 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True,
if rawsub == 'raw' and dropna_subset is not None:
# This check is the polars equivalent of pandas dropna. See docstring note on dropna
out = out.with_columns(out.select(pl.col(dropna_subset).is_null().cast(pl.Int64))
.sum(axis=1).alias("null_count")).filter(
.sum(axis=1).alias("null_count")).filter(
pl.col("null_count") <= dropna_thresh) \
.drop("null_count")

Expand Down Expand Up @@ -207,7 +207,7 @@ def drop_pre_1971_samples(df):
return df.filter(pl.col("time") > dt_1971)


def merge_parquet(indir, outdir, deploymentyaml, incremental=False, kind='raw'):
def merge_parquet(indir, outdir, deploymentyaml, kind='raw'):
"""
Merge all the raw netcdf files in indir. These are meant to be
the raw flight and science files from the slocum.
Expand All @@ -227,17 +227,16 @@ def merge_parquet(indir, outdir, deploymentyaml, incremental=False, kind='raw'):
deploymentyaml : str
YAML text file with deployment information for this glider.
incremental : bool
Only add new files....
kind: str
'sub' for nrt sub files 'raw' for delayed mode raw files
"""

deployment = utils._get_deployment(deploymentyaml)

metadata = deployment['metadata']
id = metadata['glider_name']
outgli = outdir + '/' + id + '-rawgli.parquet'
outpld = outdir + '/' + id + '-' + kind + 'pld.parquet'
glider_id = metadata['glider_name']
outgli = outdir + '/' + glider_id + '-rawgli.parquet'
outpld = outdir + '/' + glider_id + '-' + kind + 'pld.parquet'

_log.info('Opening *.gli.sub.*.parquet multi-file dataset from %s', indir)
files = sorted(glob.glob(indir + '/*.gli.sub.*.parquet'))
Expand All @@ -263,7 +262,7 @@ def merge_parquet(indir, outdir, deploymentyaml, incremental=False, kind='raw'):
return True


def _interp_gli_to_pld(gli, ds, val, indctd):
def _interp_gli_to_pld(gli, ds, val):
gli_ind = ~np.isnan(val)
# switch for if we are comparing two polars dataframes or a polars dataframe and a xarray dataset
if type(ds) is pl.DataFrame:
Expand Down Expand Up @@ -306,19 +305,19 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
profile_filt_time=100, profile_min_time=300,
maxgap=10, interpolate=False, fnamesuffix=''):
"""
A little different than above, for the 4-file version of the data set.
Converts nav and pld files into a single netcdf timeseries file
"""

deployment = utils._get_deployment(deploymentyaml)

metadata = deployment['metadata']
ncvar = deployment['netcdf_variables']
device_data = deployment['glider_devices']
id = metadata['glider_name']
_log.info(f'Opening combined nav file {indir}/{id}-rawgli.nc')
gli = pl.read_parquet(f'{indir}/{id}-rawgli.parquet')
_log.info(f'Opening combined payload file {indir}/{id}-{kind}pld.parquet')
sensor = pl.read_parquet(f'{indir}/{id}-{kind}pld.parquet')
glider_id = metadata['glider_name']
_log.info(f'Opening combined nav file {indir}/{glider_id}-rawgli.nc')
gli = pl.read_parquet(f'{indir}/{glider_id}-rawgli.parquet')
_log.info(f'Opening combined payload file {indir}/{glider_id}-{kind}pld.parquet')
sensor = pl.read_parquet(f'{indir}/{glider_id}-{kind}pld.parquet')
sensor = _remove_fill_values(sensor)

# build a new data set based on info in `deploymentyaml.`
Expand All @@ -338,7 +337,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
raise ValueError(f"timebase source: {ncvar['timebase']['source']} not found in pld1 columns")
vals = sensor.select([ncvar['timebase']['source']]).to_numpy()[:, 0]
indctd = np.where(~np.isnan(vals))[0]
ds['time'] = (('time'), sensor.select('time').to_numpy()[indctd, 0].astype('datetime64[ns]'), attr)
ds['time'] = ('time', sensor.select('time').to_numpy()[indctd, 0].astype('datetime64[ns]'), attr)
thenames = list(ncvar.keys())
# Check yaml to see if interpolate has been set to True
if "interpolate" in thenames:
Expand Down Expand Up @@ -375,7 +374,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
pl.col('time').cast(pl.Datetime('ms'))
)[:-1, :]
val2 = sensor_sub_grouped.select(sensorname).to_numpy()[:, 0]
val = _interp_gli_to_pld(sensor_sub_grouped, sensor, val2, indctd)
val = _interp_gli_to_pld(sensor_sub_grouped, sensor, val2)
if interpolate and not np.isnan(val).all():
time_original = sensor.select('time').to_numpy()[:, 0]
time_var = time_original[np.where(~np.isnan(val))[0]]
Expand All @@ -393,7 +392,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
# interpolate only over those gaps that are smaller than 'maxgap'
# apparently maxgap is to be in somethng like seconds, and this data is in ms. Certainly
# the default of 0.3 s was not intended. Changing default to 10 s:
tg_ind = utils.find_gaps(time_var.astype(float), time_timebase.astype(float), maxgap*1000)
tg_ind = utils.find_gaps(time_var.astype(float), time_timebase.astype(float), maxgap * 1000)
val[tg_ind] = np.nan
else:
val = val[indctd]
Expand All @@ -404,13 +403,13 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
val = convert(val)
# Values from the glider netcdf must be interpolated to match
# the sensor netcdf
val = _interp_gli_to_pld(gli, ds, val, indctd)
val = _interp_gli_to_pld(gli, ds, val)

# make the attributes:
ncvar[name].pop('coordinates', None)
attrs = ncvar[name]
attrs = utils.fill_required_attrs(attrs)
ds[name] = (('time'), val, attrs)
ds[name] = ('time', val, attrs)

# fix lon and lat to be linearly interpolated between fixes
good = np.where(np.abs(np.diff(ds.longitude)) +
Expand Down Expand Up @@ -480,10 +479,8 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
ds.attrs['deployment_start'] = str(start)
ds.attrs['deployment_end'] = str(end)

try:
os.mkdir(outdir)
except:
pass
if not Path(outdir).exists():
Path(outdir).mkdir(parents=True)
id0 = ds.attrs['deployment_name']
outname = outdir + id0 + fnamesuffix + '.nc'
_log.info('writing %s', outname)
Expand Down
7 changes: 3 additions & 4 deletions tests/test_seaexplorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,10 @@ def test_raw_to_rawnc():


def test__needsupdating():
    """_needsupdating: True for a missing output path, False when the
    existing output is newer than the input."""
    fin = 'tests/data/realtime_raw/sea035.12.pld1.sub.36'
    fout = 'tests/data/realtime_rawnc/sea035.0012.pld1.sub.0036.parquet'
    result_badpath = seaexplorer._needsupdating(fin, 'baz')
    result_goodpath = seaexplorer._needsupdating(fin, fout)
    assert result_badpath is True
    assert result_goodpath is False

Expand Down Expand Up @@ -82,7 +81,7 @@ def test__interp_gli_to_pld():
glider = pl.read_parquet('tests/data/realtime_rawnc/sea035.0012.gli.sub.0036.parquet')
ds = pl.read_parquet('tests/data/realtime_rawnc/sea035.0012.pld1.sub.0036.parquet')
val = glider.select("Pitch").to_numpy()[:, 0]
pitch_interp = seaexplorer._interp_gli_to_pld(glider, ds, val, None)
pitch_interp = seaexplorer._interp_gli_to_pld(glider, ds, val)
assert len(pitch_interp) == ds.shape[0]


Expand Down

0 comments on commit d2e3fda

Please sign in to comment.