PNWStore implementation (#142)
* Add pnwstore RawDataStore support

* RFC: PNWStore - Query DB during read_data() (#139)

* RFC: PNWStore - Query DB during read_data()

* Clean up / fix code

* Fix DB queries

* Add sqlite package

* Cleanup

* Add missed file

* Add missed dependency

* Fix response bug

* Relax inventory constraint

---------

Co-authored-by: Yiyu Ni <[email protected]>
Co-authored-by: Kuan-Fu Feng <[email protected]>
3 people authored Jun 14, 2023
1 parent bcc66ac commit c3b9674
Showing 7 changed files with 546 additions and 5 deletions.
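
For orientation, here is a minimal usage sketch of the store added below in src/noisepy/seis/pnwstore.py. It is not part of this commit: the paths are placeholders, and the stub catalog assumes that ChannelCatalog's abstract surface is just the two methods pnwstore.py calls, get_full_channel and get_inventory.

from datetime import datetime, timezone

import obspy
from datetimerange import DateTimeRange

from noisepy.seis.channelcatalog import ChannelCatalog
from noisepy.seis.pnwstore import PNWDataStore


class StubCatalog(ChannelCatalog):
    # Stand-in catalog; a real run would resolve full station metadata.
    def get_full_channel(self, timespan, channel):
        return channel

    def get_inventory(self, timespan, station):
        return obspy.Inventory(networks=[], source="stub")


date_range = DateTimeRange(
    datetime(2020, 5, 4, tzinfo=timezone.utc),  # day 125 of 2020
    datetime(2020, 5, 6, tzinfo=timezone.utc),
)
store = PNWDataStore(
    "/data/pnw/UW",  # placeholder: miniSEED tree laid out as .../NET/YYYY/DOY/
    StubCatalog(),
    "/data/pnw/timeseries.sqlite",  # placeholder: path to the tsindex SQLite DB
    date_range=date_range,
)
for ts in store.get_timespans():
    for ch in store.get_channels(ts):
        ch_data = store.read_data(ts, ch)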
1 change: 1 addition & 0 deletions .gitignore
@@ -172,6 +172,7 @@ src/noisepy/seis/_version.py
 tutorials/get_started_data/
 tutorials/scedc_data/
 tutorials/cli/tmpdata
+tutorials/pnw_data

 # Jupyterbook
 _build/
4 changes: 3 additions & 1 deletion pyproject.toml
@@ -75,7 +75,9 @@ dev = [
"pytest>=7.2.2",
"memory-profiler>=0.61",
"pre-commit>=3.2",
"ray[default]>=2.4.0",
]
sql = [
"SQLite3-0611",
]

[project.scripts]
9 changes: 8 additions & 1 deletion src/noisepy/seis/S1_fft_cc_MPI.py
@@ -236,9 +236,16 @@ def cross_corr(
 def preprocess(
     raw_store: RawDataStore, ch: Channel, ch_data: ChannelData, fft_params: ConfigParameters, ts: DateTimeRange
 ) -> obspy.Stream:
+    inv = raw_store.get_inventory(ts, ch.station)
+    ch_inv = inv.select(channel=ch.type.name, time=ts.start_datetime)
+    # if we don't find an inventory when filtering by time, back off and try
+    # without this constraint
+    if len(ch_inv) < 1:
+        ch_inv = inv.select(channel=ch.type.name)
+
     return noise_module.preprocess_raw(
         ch_data.stream.copy(),  # If we don't copy it's not writeable
-        raw_store.get_inventory(ts, ch.station),
+        ch_inv,
         fft_params,
         obspy.UTCDateTime(ts.start_datetime),
         obspy.UTCDateTime(ts.end_datetime),
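
The relaxed-constraint logic above leans on obspy's Inventory.select. A self-contained illustration of the back-off, using obspy's bundled example inventory rather than project data:

import obspy

inv = obspy.read_inventory()  # obspy's bundled example inventory
t = obspy.UTCDateTime(1970, 1, 1)  # predates every channel epoch in it
ch_inv = inv.select(channel="EHZ", time=t)  # no epoch covers t, so this is empty
if len(ch_inv) < 1:
    # back off and select on the channel code alone
    ch_inv = inv.select(channel="EHZ")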
2 changes: 1 addition & 1 deletion src/noisepy/seis/noise_module.py
@@ -244,7 +244,7 @@ def preprocess_raw(
     if not inv[0][0][0].response:
         raise ValueError("no response found in the inventory! abort!")
     elif inv[0][0][0].response == obspy.core.inventory.response.Response():
-        raise ValueError("The response found in the inventory is empty (no stages)! abort!")
+        raise ValueError("The response found in the inventory is empty (no stages)! abort! %s" % st[0])
     else:
         try:
             logger.info("removing response for %s using inv" % st[0])
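
For reference on the check above: a default-constructed Response carries no stages, so comparing against obspy.core.inventory.response.Response() flags inventories whose response metadata is present but empty. A quick sketch:

import obspy

empty = obspy.core.inventory.response.Response()
print(len(empty.response_stages))  # 0
print(empty == obspy.core.inventory.response.Response())  # True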
3 changes: 1 addition & 2 deletions src/noisepy/seis/plotting_modules.py
@@ -145,6 +145,7 @@ def plot_substack_cc(sfile, freqmin, freqmax, disp_lag=None, savefig=True, sdir=
     ds = pyasdf.ASDFDataSet(sfile, mode="r")
     # extract common variables
     spairs = ds.auxiliary_data.list()
+    spairs = list(filter(lambda x: x != PROGRESS_DATATYPE, spairs))
     path_lists = ds.auxiliary_data[spairs[0]].list()
     flag = ds.auxiliary_data[spairs[0]][path_lists[0]].parameters["substack"]
     dt = ds.auxiliary_data[spairs[0]][path_lists[0]].parameters["dt"]
@@ -169,8 +170,6 @@ def plot_substack_cc(sfile, freqmin, freqmax, disp_lag=None, savefig=True, sdir=
         indx2 = indx1 + 2 * int(disp_lag / dt) + 1

     for spair in spairs:
-        if spair == PROGRESS_DATATYPE:
-            continue
         ttr = spair.split("_")
         net1, sta1 = ttr[0].split(".")
         net2, sta2 = ttr[1].split(".")
155 changes: 155 additions & 0 deletions src/noisepy/seis/pnwstore.py
@@ -0,0 +1,155 @@
import io
import logging
import os
import sqlite3
from datetime import datetime, timedelta, timezone
from typing import Callable, List, Tuple

import obspy
from datetimerange import DateTimeRange

from noisepy.seis.channelcatalog import ChannelCatalog
from noisepy.seis.stores import RawDataStore

from .datatypes import Channel, ChannelData, ChannelType, Station
from .utils import fs_join, get_filesystem

logger = logging.getLogger(__name__)


class PNWDataStore(RawDataStore):
    """
    A data store implementation to read from a SQLite DB of metadata and a directory of data files
    """

    def __init__(
        self,
        path: str,
        chan_catalog: ChannelCatalog,
        db_file: str,
        ch_filter: Callable[[Channel], bool] = None,
        date_range: DateTimeRange = None,
    ):
        """
        Parameters:
            path: path to look for miniSEED files. Can be a local file directory or an s3://... url path
            chan_catalog: ChannelCatalog to retrieve inventory information for the channels
            db_file: path to the sqlite DB file
            ch_filter: Optional function to decide whether a channel should be used or not,
                if None, all channels are used
            date_range: Optional date range to filter the data
        """
        super().__init__()
        self.fs = get_filesystem(path)
        self.channel_catalog = chan_catalog
        self.path = path
        self.db_file = db_file
        self.paths = {}
        # to store a dict of {timerange: list of channels}
        self.channels = {}
        if ch_filter is None:
            ch_filter = lambda s: True  # noqa: E731

        if date_range is None:
            self._load_channels(self.path, ch_filter)
        else:
            dt = date_range.end_datetime - date_range.start_datetime
            for d in range(0, dt.days):
                date = date_range.start_datetime + timedelta(days=d)
                date_path = str(date.year) + "/" + str(date.timetuple().tm_yday).zfill(3) + "/"
                full_path = fs_join(self.path, date_path)
                self._load_channels(full_path, ch_filter)

    def _load_channels(self, full_path: str, ch_filter: Callable[[Channel], bool]):
        # The path should look like: .../UW/2020/125/
        parts = full_path.split(os.path.sep)
        assert len(parts) >= 4
        net, year, doy = parts[-4:-1]

        rst = self._dbquery(
            f"SELECT network, station, channel, location, filename "
            f"FROM tsindex WHERE filename LIKE '%%/{net}/{year}/{doy}/%%'"
        )
        timespans = []
        for i in rst:
            timespan = PNWDataStore._parse_timespan(os.path.basename(i[4]))
            self.paths[timespan.start_datetime] = full_path
            channel = PNWDataStore._parse_channel(i)
            if not ch_filter(channel):
                continue
            key = str(timespan)
            if key not in self.channels:
                timespans.append(timespan)
                self.channels[key] = [channel]
            else:
                self.channels[key].append(channel)

    def get_channels(self, timespan: DateTimeRange) -> List[Channel]:
        tmp_channels = self.channels.get(str(timespan), [])
        return list(map(lambda c: self.channel_catalog.get_full_channel(timespan, c), tmp_channels))

    def get_timespans(self) -> List[DateTimeRange]:
        return list([DateTimeRange.from_range_text(d) for d in sorted(self.channels.keys())])

    def read_data(self, timespan: DateTimeRange, chan: Channel) -> ChannelData:
        assert timespan.start_datetime.year == timespan.end_datetime.year, "Did not expect timespans to cross years"
        year = timespan.start_datetime.year
        doy = str(timespan.start_datetime.timetuple().tm_yday).zfill(3)

        rst = self._dbquery(
            f"SELECT byteoffset, bytes "
            f"FROM tsindex WHERE network='{chan.station.network}' AND station='{chan.station.name}' "
            f"AND channel='{chan.type.name}' and location='{chan.station.location}' "
            f"AND filename LIKE '%%/{chan.station.network}/{year}/{doy}/%%'"
        )

        if len(rst) == 0:
            logger.warning(f"Could not find file {timespan}/{chan} in the database")
            return ChannelData.empty()
        byteoffset, bytes = rst[0]

        # reconstruct the file name from the channel parameters
        chan_str = f"{chan.station.name}.{chan.station.network}.{timespan.start_datetime.strftime('%Y.%j')}"
        filename = fs_join(self.paths[timespan.start_datetime], f"{chan_str}")
        if not self.fs.exists(filename):
            logger.warning(f"Could not find file {filename}")
            return ChannelData.empty()

        with self.fs.open(filename, "rb") as f:
            f.seek(byteoffset)
            buff = io.BytesIO(f.read(bytes))
        stream = obspy.read(buff)
        return ChannelData(stream)

    def get_inventory(self, timespan: DateTimeRange, station: Station) -> obspy.Inventory:
        return self.channel_catalog.get_inventory(timespan, station)

    def _parse_timespan(filename: str) -> DateTimeRange:
        # The PNWStore repository stores files in the form: STA.NET.YYYY.DOY
        # YA2.UW.2020.366
        year = int(filename.split(".")[2])
        day = int(filename.split(".")[3])
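        # e.g. day 366 of 2020 maps to the one-day UTC window [2020-12-31, 2021-01-01)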
        jan1 = datetime(year, 1, 1, tzinfo=timezone.utc)
        return DateTimeRange(jan1 + timedelta(days=day - 1), jan1 + timedelta(days=day))

    def _parse_channel(record: tuple) -> Channel:
        # e.g.
        # YA2.UW.2020.366
        network = record[0]
        station = record[1]
        channel = record[2]
        location = record[3]
        c = Channel(
            ChannelType(channel, location),
            # lat/lon/elev will be populated later
            Station(network, station, location=location),
        )
        return c

    def _dbquery(self, query: str) -> List[Tuple]:
        db = sqlite3.connect(self.db_file)
        cursor = db.cursor()
        rst = cursor.execute(query)
        all = rst.fetchall()
        db.close()
        return all
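
The two SELECTs above imply a minimal shape for the tsindex table. A toy reconstruction, with the schema inferred from the queries in this file (the real PNWStore index likely carries more columns):

import sqlite3

db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE tsindex ("
    "network TEXT, station TEXT, channel TEXT, location TEXT, "
    "filename TEXT, byteoffset INTEGER, bytes INTEGER)"
)
db.execute(
    "INSERT INTO tsindex VALUES "
    "('UW', 'YA2', 'EHZ', '', '/data/UW/2020/366/YA2.UW.2020.366', 0, 4096)"
)
rows = db.execute(
    "SELECT byteoffset, bytes FROM tsindex "
    "WHERE network='UW' AND station='YA2' AND channel='EHZ' "
    "AND filename LIKE '%/UW/2020/366/%'"
).fetchall()
print(rows)  # [(0, 4096)]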