voto space saving
callumrollo committed Jul 26, 2024
1 parent fc59e43 commit 34ed632
Showing 1 changed file with 68 additions and 29 deletions.
97 changes: 68 additions & 29 deletions pyglider/seaexplorer.py
@@ -97,6 +97,9 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True,
os.mkdir(outdir)
except FileExistsError:
pass
with open(deploymentyaml) as fin:
deployment = yaml.safe_load(fin)
ncvar = deployment['netcdf_variables']

for ftype in ['gli', 'pld1']:
goodfiles = []
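For context, the `yaml.safe_load` pattern added at the top of `raw_to_rawnc` can be exercised on its own. A minimal sketch; the file path is a hypothetical stand-in, only the `netcdf_variables` key is taken from the diff:

```python
# Minimal sketch of the deployment-YAML load added above.
import yaml

deploymentyaml = "deployment.yml"  # hypothetical path
with open(deploymentyaml) as fin:
    deployment = yaml.safe_load(fin)
ncvar = deployment["netcdf_variables"]  # consulted later for keep_variables
```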
@@ -128,7 +131,7 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True,
# Try to read the file with polars. If the file is corrupted (rare), file read will fail and file
# is appended to badfiles
try:
out = pl.read_csv(f, sep=';')
out = pl.read_csv(f, separator=';')
except Exception as e:
_log.warning(f'Exception reading {f}: {e}')
_log.warning(f'Could not read {f}')
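This hunk tracks polars renaming `read_csv`'s `sep` argument to `separator`. A minimal sketch of the fault-tolerant read, assuming hypothetical file names:

```python
# Sketch of the corrupt-file-tolerant read above; file names are made up.
import logging
import polars as pl

_log = logging.getLogger(__name__)
badfiles = []
for f in ["sea044.12.gli.sub.1", "sea044.12.pld1.sub.1"]:  # hypothetical
    try:
        out = pl.read_csv(f, separator=";")
    except Exception as e:
        _log.warning(f"Exception reading {f}: {e}")
        badfiles.append(f)
```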
@@ -137,11 +140,11 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True,
# Parse the datetime from nav files (called Timestamp) and pld1 files (called PLD_REALTIMECLOCK)
if "Timestamp" in out.columns:
out = out.with_columns(
pl.col("Timestamp").str.strptime(pl.Datetime, fmt="%d/%m/%Y %H:%M:%S"))
pl.col("Timestamp").str.strptime(pl.Datetime, format="%d/%m/%Y %H:%M:%S"))
out = out.rename({"Timestamp": "time"})
else:
out = out.with_columns(
pl.col("PLD_REALTIMECLOCK").str.strptime(pl.Datetime, fmt="%d/%m/%Y %H:%M:%S.%3f"))
pl.col("PLD_REALTIMECLOCK").str.strptime(pl.Datetime, format="%d/%m/%Y %H:%M:%S.%3f"))
out = out.rename({"PLD_REALTIMECLOCK": "time"})
for col_name in out.columns:
if "time" not in col_name.lower():
@@ -150,22 +153,57 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True,
if 'AD2CP_TIME' in out.columns:
# Set datestamps with date 00000 to None
out = out.with_columns(
pl.col('AD2CP_TIME').str.strptime(pl.Datetime, fmt="%m%d%y %H:%M:%S", strict=False))
pl.col('AD2CP_TIME').str.strptime(pl.Datetime, format="%m%d%y %H:%M:%S", strict=False))

if "" in out.columns:
out = out.drop("")
out = _remove_fill_values(out, fill_value=9999)
out = _remove_fill_values(out, fill_value=-9999)
out = drop_pre_1971_samples(out)

# subsetting for heavily oversampled raw data:
if rawsub == 'raw' and dropna_subset is not None:
# This check is the polars equivalent of pandas dropna. See docstring note on dropna
out = out.with_columns(out.select(pl.col(dropna_subset).is_null().cast(pl.Int64))
.sum(axis=1).alias("null_count")).filter(
.sum_horizontal().alias("null_count")).filter(
pl.col("null_count") <= dropna_thresh) \
.drop("null_count")

if ftype == 'gli':
out = out.with_columns([(pl.col("NavState") * 0 + int(filenum)).alias("fnum")])
out_special = out.select(
pl.col('time'),
pl.col("^(NavState)$").cast(pl.UInt8),
pl.col("^(fnum)$").cast(pl.UInt16),
pl.col("^(DeadReckoning|)$").cast(pl.Int8),
pl.col("^(DesiredH|)$").cast(pl.Int16),
pl.col("^(SecurityLevel)$").cast(pl.UInt32),
)
remaining_columns = set(out.columns).difference(set(out_special.columns))
out_remainder = out.select(remaining_columns).cast(pl.Float32)
out = pl.concat((out_special, out_remainder), how='horizontal')

out.write_parquet(fnout)
goodfiles.append(f)
else:
if out.select("time").shape[0] > min_samples_in_file:
# Drop rows that have no data from the key variables
if 'keep_variables' in ncvar.keys():
source_keepers = [ncvar[name]['source'] for name in ncvar['keep_variables']]
dropna_thresh = len(source_keepers)
out = out.with_columns(out.select(pl.col(source_keepers).is_null().cast(pl.Int64))
.sum_horizontal().alias("null_count")).filter(
pl.col("null_count") < dropna_thresh) \
.drop("null_count")
out_special = out.select(
pl.col("^*AD2CP_TIME$").cast(pl.Datetime),
pl.col("^*time$").cast(pl.Datetime),
pl.col("^(NAV_RESOURCE)$").cast(pl.UInt8),
pl.col("^*COUNT$").cast(pl.Int32),
)
remaining_columns = set(out.columns).difference(set(out_special.columns))
out_remainder = out.select(remaining_columns).cast(pl.Float32)
out = pl.concat((out_special, out_remainder), how='horizontal')

out.write_parquet(fnout)
goodfiles.append(f)
else:
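The casts in the hunk above carry the commit's space saving: a short allow-list of columns keeps compact integer or datetime types, and every remaining column is stored as Float32 instead of Float64, roughly halving its footprint. A minimal sketch of the pattern, with a made-up frame and columns:

```python
# Sketch of the dtype downcast above; the frame and columns are made up.
import polars as pl

out = pl.DataFrame({"NavState": [105], "Pitch": [12.5], "Roll": [-3.25]})
# known small-range columns keep compact types
out_special = out.select(pl.col("^(NavState)$").cast(pl.UInt8))
# everything else drops from Float64 to Float32
remaining_columns = set(out.columns).difference(set(out_special.columns))
out_remainder = out.select(remaining_columns).cast(pl.Float32)
out = pl.concat((out_special, out_remainder), how="horizontal")
```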
@@ -196,11 +234,7 @@ def drop_pre_1971_samples(df):
Returns:
df: polars.DataFrame
"""
dt_1971 = datetime.datetime(1971, 1, 1)
# If all dates before or after 1971-01-01, return the dataset
pre_1971 = df.filter(pl.col("time") < dt_1971)
if len(pre_1971) == len(df):
return pre_1971
dt_1971 = datetime.datetime(2020, 1, 1)
post_1971 = df.filter(pl.col("time") > dt_1971)
if len(post_1971) == len(df):
return post_1971
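The filter itself is a one-liner in polars. A minimal sketch on a made-up frame; the early-return logic for frames that are entirely on one side of the cutoff is elided:

```python
# Sketch of the timestamp cutoff in drop_pre_1971_samples; data is made up.
import datetime
import polars as pl

cutoff = datetime.datetime(2020, 1, 1)
df = pl.DataFrame({"time": [datetime.datetime(1970, 1, 1),
                            datetime.datetime(2024, 7, 26)]})
df = df.filter(pl.col("time") > cutoff)  # keeps only the 2024 row
```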
@@ -232,8 +266,8 @@ def merge_parquet(indir, outdir, deploymentyaml, incremental=False, kind='raw'):
Only add new files....
"""

deployment = utils._get_deployment(deploymentyaml)

with open(deploymentyaml) as fin:
deployment = yaml.safe_load(fin)
metadata = deployment['metadata']
id = metadata['glider_name']
outgli = outdir + '/' + id + '-rawgli.parquet'
@@ -245,17 +279,14 @@ def merge_parquet(indir, outdir, deploymentyaml, incremental=False, kind='raw'):
_log.warning(f'No *gli*.parquet files found in {indir}')
return False
gli = pl.read_parquet(indir + '/*.gli.sub.*.parquet')
gli = drop_pre_1971_samples(gli)
gli.write_parquet(outgli)
_log.info(f'Done writing {outgli}')

_log.info('Opening *.pld.sub.*.parquet multi-file dataset')
files = sorted(glob.glob(indir + '/*.pld1.' + kind + '.*.parquet'))
if not files:
_log.warning(f'No *{kind}*.parquet files found in {indir}')
return False
pld = pl.read_parquet(indir + '/*.pld1.' + kind + '.*.parquet')
pld = drop_pre_1971_samples(pld)
pld.write_parquet(outpld)

_log.info(f'Done writing {outpld}')
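As the hunks above rely on, `pl.read_parquet` expands a glob pattern and concatenates the matching files in one call. A minimal sketch with hypothetical paths and glider id:

```python
# Sketch of the glob-based parquet merge above; paths and id are made up.
import polars as pl

indir, outdir, glider_id = "rawnc", "rawnc", "sea044"  # hypothetical
gli = pl.read_parquet(indir + "/*.gli.sub.*.parquet")
gli.write_parquet(outdir + "/" + glider_id + "-rawgli.parquet")
```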
@@ -288,7 +319,7 @@ def _interp_pld_to_pld(pld, ds, val, indctd):
return val


def _remove_fill_values(df, fill_value=9999):
def _remove_fill_values(df, fill_value=-9999):
"""
For input dataframe df, this function converts all Float values equaling fill_values to null. Columns of other
datatypes are not affected.
@@ -297,19 +328,19 @@ def _remove_fill_values(df, fill_value=9999):
pl.when(pl.col(pl.Float64) == fill_value)
.then(None)
.otherwise(pl.col(pl.Float64))
.name.keep()
)
return df
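The `pl.col(pl.Float64)` selector targets every Float64 column at once, so the mask never touches integer or datetime columns. A minimal sketch of `_remove_fill_values` on a made-up frame:

```python
# Sketch of the fill-value masking above: Float64 cells equal to the
# fill value become null; the integer column passes through untouched.
import polars as pl

fill_value = -9999
df = pl.DataFrame({"salinity": [35.1, -9999.0], "fnum": [1, 2]})
df = df.with_columns(
    pl.when(pl.col(pl.Float64) == fill_value)
    .then(None)
    .otherwise(pl.col(pl.Float64))
    .name.keep()
)
```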


def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
profile_filt_time=100, profile_min_time=300,
maxgap=10, interpolate=False, fnamesuffix=''):
profile_filt_time=100, profile_min_time=300, maxgap=300, interpolate=False, fnamesuffix=''):
"""
A little different than above, for the 4-file version of the data set.
"""

deployment = utils._get_deployment(deploymentyaml)

with open(deploymentyaml) as fin:
deployment = yaml.safe_load(fin)
metadata = deployment['metadata']
ncvar = deployment['netcdf_variables']
device_data = deployment['glider_devices']
@@ -318,7 +349,6 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
gli = pl.read_parquet(f'{indir}/{id}-rawgli.parquet')
_log.info(f'Opening combined payload file {indir}/{id}-{kind}pld.parquet')
sensor = pl.read_parquet(f'{indir}/{id}-{kind}pld.parquet')
sensor = _remove_fill_values(sensor)

# build a new data set based on info in `deploymentyaml.`
# We will use ctd as the interpolant
@@ -390,9 +420,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
val = np.interp(time_timebase.astype(float), time_var.astype(float), var_non_nan)

# interpolate only over those gaps that are smaller than 'maxgap'
# apparently maxgap is to be in something like seconds, and this data is in ms. Certainly
# the default of 0.3 s was not intended. Changing default to 10 s:
tg_ind = utils.find_gaps(time_var.astype(float), time_timebase.astype(float), maxgap*1000)
tg_ind = utils.find_gaps(time_var.astype(float), time_timebase.astype(float), maxgap)
val[tg_ind] = np.nan
else:
val = val[indctd]
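The hunk above interpolates each sensor onto the CTD timebase, then blanks any stretch where the source had no sample for more than `maxgap` seconds (now passed through unscaled, with the default raised to 300 s). `utils.find_gaps` is pyglider-internal, so this sketch uses a plain numpy stand-in on made-up arrays:

```python
# Sketch of gap-aware interpolation with a numpy stand-in for find_gaps.
import numpy as np

maxgap = 300.0                                 # seconds, per the new default
time_var = np.array([0.0, 10.0, 700.0])        # source sample times (s)
val_var = np.array([1.0, 2.0, 3.0])
time_timebase = np.arange(0.0, 700.0, 60.0)    # CTD timebase (s)
val = np.interp(time_timebase, time_var, val_var)
# blank timebase points that fall inside a source gap wider than maxgap
idx = np.clip(np.searchsorted(time_var, time_timebase), 1, len(time_var) - 1)
val[(time_var[idx] - time_var[idx - 1]) > maxgap] = np.nan
```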
@@ -432,9 +460,6 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
# some derived variables:
ds = utils.get_glider_depth(ds)
ds = utils.get_distance_over_ground(ds)
# ds = utils.get_profiles(ds)
ds = utils.get_profiles_new(ds, filt_time=profile_filt_time,
profile_min_time=profile_min_time)
ds = utils.get_derived_eos_raw(ds)

# somehow this comes out unsorted:
@@ -483,7 +508,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
os.mkdir(outdir)
except FileExistsError:
pass
id0 = ds.attrs['deployment_name']
id0 = "mission_timeseries"
outname = outdir + id0 + fnamesuffix + '.nc'
_log.info('writing %s', outname)
if 'units' in ds.time.attrs.keys():
@@ -503,3 +528,17 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw',
merge_rawnc = merge_parquet

__all__ = ['raw_to_rawnc', 'merge_parquet', 'raw_to_timeseries']

if __name__ == '__main__':
logf = '/data/log/test.log'
logging.basicConfig(filename=logf,
filemode='w',
format='%(asctime)s %(levelname)-8s %(message)s',
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S')
rawdir = '/data/data_raw/complete_mission/SEA44/M34_sub2/'
rawncdir = '/data/data_l0_pyglider/complete_mission/SEA44/M34_sub4/rawnc/'
in_yaml = '/data/deployment_yaml/mission_yaml/SEA44_M34.yml'
raw_to_rawnc(rawdir, rawncdir, in_yaml)
merge_parquet(rawncdir, rawncdir, in_yaml)
raw_to_timeseries(rawncdir, rawncdir, in_yaml)
