diff --git a/src/noisepy/seis/asdfstore.py b/src/noisepy/seis/asdfstore.py index 9874838a..13021bcf 100644 --- a/src/noisepy/seis/asdfstore.py +++ b/src/noisepy/seis/asdfstore.py @@ -2,7 +2,7 @@ import logging import os from pathlib import Path -from typing import Callable, Dict, Generic, List, Optional, Tuple, TypeVar +from typing import Callable, Dict, Generic, List, Optional, Set, Tuple, TypeVar import numpy as np import obspy @@ -54,16 +54,20 @@ def __init__( self.parse_filename = parse_filename def __getitem__(self, key: T) -> pyasdf.ASDFDataSet: + return self._get_dataset(key, self.mode) + + def _get_dataset(self, key: T, mode: str) -> pyasdf.ASDFDataSet: file_name = self.get_filename(key) file_path = os.path.join(self.directory, file_name) - return _get_dataset(file_path, self.mode) + return _get_dataset(file_path, mode) def get_keys(self) -> List[T]: h5files = sorted(glob.glob(os.path.join(self.directory, "**/*.h5"), recursive=True)) return list(map(self.parse_filename, h5files)) def contains(self, key: T, data_type: str, path: str = None): - ccf_ds = self[key] + # contains is always a read + ccf_ds = self._get_dataset(key, "r") if not ccf_ds: return False @@ -128,7 +132,7 @@ def __init__(self, directory: str, mode: str = "a") -> None: self.datasets = ASDFDirectory(directory, mode, _filename_from_timespan, parse_timespan) # CrossCorrelationDataStore implementation - def contains(self, timespan: DateTimeRange, src: Station, rec: Station) -> bool: + def contains(self, src: Station, rec: Station, timespan: DateTimeRange) -> bool: station_pair = self._get_station_pair(src, rec) contains = self.datasets.contains(timespan, station_pair) if contains: @@ -149,19 +153,23 @@ def append( channels = self._get_channel_pair(cc.src, cc.rec) self.datasets.add_aux_data(timespan, cc.parameters, station_pair, channels, cc.data) - def get_timespans(self) -> List[DateTimeRange]: - return self.datasets.get_keys() + def get_timespans(self, src: Station, rec: Station) -> List[DateTimeRange]: + timespans = {} + pair_key = self._get_station_pair(src, rec) + + def visit(pairs, ts): + if pair_key in pairs: + timespans[str(ts)] = ts + + self._visit_pairs(visit) + return sorted(timespans.values(), key=lambda t: str(t)) def get_station_pairs(self) -> List[Tuple[Station, Station]]: - timespans = self.get_timespans() pairs_all = set() - for timespan in timespans: - with self.datasets[timespan] as ccf_ds: - data = ccf_ds.auxiliary_data.list() - pairs_all.update(parse_station_pair(p) for p in data if p != PROGRESS_DATATYPE) + self._visit_pairs(lambda pairs, _: pairs_all.update((parse_station_pair(p) for p in pairs))) return list(pairs_all) - def read_correlations(self, timespan: DateTimeRange, src_sta: Station, rec_sta: Station) -> List[CrossCorrelation]: + def read(self, timespan: DateTimeRange, src_sta: Station, rec_sta: Station) -> List[CrossCorrelation]: with self.datasets[timespan] as ccf_ds: dtype = self._get_station_pair(src_sta, rec_sta) if dtype not in ccf_ds.auxiliary_data: @@ -175,20 +183,39 @@ def read_correlations(self, timespan: DateTimeRange, src_sta: Station, rec_sta: ccs.append(CrossCorrelation(src_ch, rec_ch, stream.parameters, stream.data[:])) return ccs + def _visit_pairs(self, visitor: Callable[[Set[Tuple[str, str]], DateTimeRange], None]): + all_timespans = self.datasets.get_keys() + for timespan in all_timespans: + with self.datasets[timespan] as ccf_ds: + data = ccf_ds.auxiliary_data.list() + pairs = {p for p in data if p != PROGRESS_DATATYPE} + visitor(pairs, timespan) + + def _get_channel_pair(self, src_chan: ChannelType, rec_chan: ChannelType) -> str: + return f"{src_chan}_{rec_chan}" + + def _get_station_pair(self, src_sta: Station, rec_sta: Station) -> str: + return f"{src_sta}_{rec_sta}" + class ASDFStackStore(StackStore): def __init__(self, directory: str, mode: str = "a"): super().__init__() self.datasets = ASDFDirectory(directory, mode, _filename_from_stations, _parse_station_pair_h5file) - def append(self, src: Station, rec: Station, stacks: List[Stack]): + # TODO: Do we want to support storing stacks from different timespans in the same store? + def append(self, timespan: DateTimeRange, src: Station, rec: Station, stacks: List[Stack]): for stack in stacks: self.datasets.add_aux_data((src, rec), stack.parameters, stack.name, stack.component, stack.data) def get_station_pairs(self) -> List[Tuple[Station, Station]]: return self.datasets.get_keys() - def read_stacks(self, src: Station, rec: Station) -> List[Stack]: + def get_timespans(self, src: Station, rec: Station) -> List[DateTimeRange]: + # TODO: Do we want to support storing stacks from different timespans in the same store? + return [] + + def read(self, timespan: DateTimeRange, src: Station, rec: Station) -> List[Stack]: stacks = [] with self.datasets[(src, rec)] as ds: for name in ds.auxiliary_data.list(): diff --git a/src/noisepy/seis/correlate.py b/src/noisepy/seis/correlate.py index 8aa71751..b62802c4 100644 --- a/src/noisepy/seis/correlate.py +++ b/src/noisepy/seis/correlate.py @@ -112,7 +112,7 @@ def cc_timespan( pair_filter: Callable[[Channel, Channel], bool] = lambda src, rec: True, ) -> bool: errors = False - executor = ThreadPoolExecutor(max_workers=12) + executor = ThreadPoolExecutor() tlog = TimeLogger(logger, logging.INFO, prefix="CC Main") """ LOADING NOISE DATA AND DO FFT @@ -134,7 +134,7 @@ def cc_timespan( station_pairs = list(create_pairs(pair_filter, all_channels, fft_params.acorr_only).keys()) # Check for stations that are already done, do this in parallel logger.info(f"Checking for stations already done: {len(station_pairs)} pairs") - station_pair_dones = list(executor.map(lambda p: cc_store.contains(ts, p[0], p[1]), station_pairs)) + station_pair_dones = list(executor.map(lambda p: cc_store.contains(p[0], p[1], ts), station_pairs)) missing_pairs = [pair for pair, done in zip(station_pairs, station_pair_dones) if not done] # get a set of unique stations from the list of pairs @@ -260,10 +260,10 @@ def create_pairs( if src_chan.station != rec_chan.station: continue if ffts and iiS not in ffts: - logger.warning(f"No FFT data available for channel '{src_chan}', skipped") + logger.warning(f"No FFT data available for src channel '{src_chan}', skipped") continue if ffts and iiR not in ffts: - logger.warning(f"No FFT data available for channel '{rec_chan}', skipped") + logger.warning(f"No FFT data available for rec channel '{rec_chan}', skipped") continue station_pairs[(src_chan.station, rec_chan.station)].append((iiS, iiR)) @@ -285,7 +285,7 @@ def stations_cross_correlation( tlog = TimeLogger(logger, logging.DEBUG) datas = [] try: - if cc_store.contains(ts, src, rec): + if cc_store.contains(src, rec, ts): logger.info(f"Skipping {src}_{rec} for {ts} since it's already done") return True, None diff --git a/src/noisepy/seis/datatypes.py b/src/noisepy/seis/datatypes.py index 7337d0a5..ab8265fc 100644 --- a/src/noisepy/seis/datatypes.py +++ b/src/noisepy/seis/datatypes.py @@ -16,7 +16,7 @@ from pydantic.functional_validators import model_validator from pydantic_yaml import parse_yaml_raw_as, to_yaml_str -from noisepy.seis.utils import get_filesystem +from noisepy.seis.utils import get_filesystem, remove_nan_rows, remove_nans INVALID_COORD = -sys.float_info.max @@ -82,6 +82,13 @@ def __init__( self.elevation = elevation self.location = location + def parse(sta: str) -> Optional[Station]: + # Parse from: CI.ARV_CI.BAK + parts = sta.split(".") + if len(parts) != 2: + return None + return Station(parts[0], parts[1]) + def valid(self) -> bool: return min(self.lat, self.lon, self.elevation) > INVALID_COORD @@ -297,7 +304,7 @@ def __init__(self, params: Dict[str, Any], data: np.ndarray): def get_metadata(self) -> Tuple: pass - def pack(datas: List[AnnotatedData]) -> AnnotatedData: + def pack(datas: List[AnnotatedData]) -> Tuple[np.ndarray, List[Dict[str, Any]]]: if len(datas) == 0: raise ValueError("Cannot pack empty list of data") # Some arrays may have different lengths, so pad them with NaNs for stacking @@ -343,6 +350,12 @@ def __init__(self, src: ChannelType, rec: ChannelType, params: Dict[str, Any], d def get_metadata(self) -> Tuple: return (self.src.name, self.src.location, self.rec.name, self.rec.location, self.parameters) + def load_instances(tuples: List[Tuple[np.ndarray, Dict[str, Any]]]) -> List[CrossCorrelation]: + return [ + CrossCorrelation(ChannelType(src, src_loc), ChannelType(rec, rec_loc), params, remove_nan_rows(data)) + for data, (src, src_loc, rec, rec_loc, params) in tuples + ] + class Stack(AnnotatedData): component: str # e.g. "EE", "EN", ... @@ -356,6 +369,9 @@ def __init__(self, component: str, name: str, params: Dict[str, Any], data: np.n def get_metadata(self) -> Tuple: return (self.component, self.name, self.parameters) + def load_instances(tuples: List[Tuple[np.ndarray, Dict[str, Any]]]) -> List[Stack]: + return [Stack(comp, name, params, remove_nans(data)) for data, (comp, name, params) in tuples] + def to_json_types(params: Dict[str, Any]) -> Dict[str, Any]: return {k: _to_json_type(v) for k, v in params.items()} diff --git a/src/noisepy/seis/hierarchicalstores.py b/src/noisepy/seis/hierarchicalstores.py index ae5d4fd6..df3ab9f4 100644 --- a/src/noisepy/seis/hierarchicalstores.py +++ b/src/noisepy/seis/hierarchicalstores.py @@ -1,17 +1,22 @@ import logging import threading from abc import ABC, abstractmethod -from typing import Any, Dict, List, Optional, Tuple - +from bisect import bisect +from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Callable, Dict, Generic, List, Optional, Tuple, TypeVar + +import fsspec import numpy as np from datetimerange import DateTimeRange -from .datatypes import AnnotatedData, ChannelType, CrossCorrelation, Stack, Station -from .stores import CrossCorrelationDataStore, timespan_str -from .utils import TimeLogger, remove_nan_rows, remove_nans, unstack +from .datatypes import AnnotatedData, Station +from .stores import timespan_str +from .utils import TimeLogger, fs_join, get_filesystem, unstack -CHANNELS_ATTR = "channels" -STACKS_ATTR = "stacks" +META_ATTR = "metadata" VERSION_ATTR = "version" logger = logging.getLogger(__name__) @@ -22,142 +27,240 @@ class ArrayStore(ABC): An interface definition for reading and writing arrays with metadata """ + def __init__(self, root_dir: str, storage_options={}) -> None: + super().__init__() + self.root_dir = root_dir + self.fs = get_filesystem(root_dir, storage_options=storage_options) + @abstractmethod def append(self, path: str, params: Dict[str, Any], data: np.ndarray): pass @abstractmethod - def load_paths(self) -> List[str]: + def read(self, path: str) -> Optional[Tuple[np.ndarray, Dict[str, Any]]]: pass @abstractmethod - def read(self, path: str) -> Optional[Tuple[np.ndarray, Dict[str, Any]]]: - pass + def parse_path(path: str) -> Optional[Tuple[str, DateTimeRange]]: + """ + Parse a full file path into a receiving station and timespan tuple + """ + def get_fs(self) -> fsspec.AbstractFileSystem: + return self.fs + + def get_root_dir(self) -> str: + return self.root_dir -class HierarchicalCCStoreBase(CrossCorrelationDataStore): - """ - CrossCorrelationDataStore that uses hierarchical files for storage. The directory organization is as follows: - / (root) - station_pair - timestamp - The 'timestamp' files contain the cross-correlation data for all channels. The channel information is stored - as part of the array metadata. +class PairDirectoryCache: + """ + Data structure to store the timespans for each station pair. The data is stored in a nested dictionary: + First, stations are mapped to an integer index to save memory. The first dictionary is indexed by the source station + and each entry holds a dictiory keyed by receiver station. The value of the second dictionary is a list of tuples. + Each tuple is a pair of the time delta in the timespans and a numpy array of the start times of the timespans. + E.g. + .. code-block:: python + stations_idx: {"CI.ACP": 0} + idx_stations: ["CI.ACP"] + items: + {0: {0: [(86400, np.array([1625097600, 1625184000]))]} + + We keep the timespans in a sorted list so that we can use binary search to check if a timespan is contained. A + python dictionary would be O(1) to check but use a lot more memory. Also the timestamp (np.uint32) is a lot + more compact to store than the string representation of the timespan + (e.g. "2023_07_01_00_00_00T2023_07_02_00_00_00"). """ - def __init__(self, helper: ArrayStore) -> None: - super().__init__() - self.helper = helper - self._lock = threading.Lock() - tlog = TimeLogger(logger, logging.INFO) - self.path_cache = set(helper.load_paths()) - tlog.log(f"keys_cache initialized with {len(self.path_cache)} keys") + def __init__(self) -> None: + self.items: Dict[int, Dict[int, List[Tuple[int, np.ndarray]]]] = defaultdict(lambda: defaultdict(list)) + self.stations_idx: Dict[str, int] = {} + self.idx_stations: List[str] = [] + self._lock: threading.Lock = threading.Lock() + # We need to be able to pickle this across processes when using multiprocessing def __getstate__(self) -> object: state = self.__dict__.copy() del state["_lock"] + del state["items"] return state def __setstate__(self, state: object) -> None: self.__dict__.update(state) self._lock = threading.Lock() + self.items = defaultdict(lambda: defaultdict(list)) + + def _sta_index(self, sta: str) -> int: + idx = self.stations_idx.get(sta, -1) + if idx == -1: + with self._lock: + # check again in case another thread added it before we got the lock + idx = self.stations_idx.get(sta, -1) + if idx == -1: + idx = len(self.stations_idx) + self.stations_idx[sta] = idx + self.idx_stations.append(sta) + return idx + + def get_pairs(self) -> List[Tuple[str, str]]: + return [ + (self.idx_stations[src], self.idx_stations[rec]) + for src in sorted(self.items.keys()) + for rec in sorted(self.items.get(src, {}).keys()) + ] - def contains(self, timespan: DateTimeRange, src: Station, rec: Station) -> bool: - path = self._get_path(timespan, src, rec) - with self._lock: - return path in self.path_cache + def add(self, src: str, rec: str, timespans: List[DateTimeRange]): + src_idx = self._sta_index(src) + rec_idx = self._sta_index(rec) + if len(timespans) == 0: + if not self.items.get(src_idx, {}).get(rec_idx, None): + self.items[src_idx][rec_idx] = [] + return - def append( - self, - timespan: DateTimeRange, - src: Station, - rec: Station, - ccs: List[CrossCorrelation], - ): - path = self._get_path(timespan, src, rec) - all_ccs, metadata = AnnotatedData.pack(ccs) - tlog = TimeLogger(logger, logging.DEBUG) - self.helper.append(path, {CHANNELS_ATTR: metadata, VERSION_ATTR: 1.0}, all_ccs) - tlog.log(f"writing {len(ccs)} CCs to {path}") + delta = int(timespans[0].timedelta.total_seconds()) + if not all(int(t.timedelta.total_seconds()) == delta for t in timespans): + all_deltas = set(int(t.timedelta.total_seconds()) for t in timespans) + raise ValueError(f"All timespans must have the same time delta ({all_deltas})") + starts = np.array(sorted([int(t.start_datetime.timestamp()) for t in timespans]), dtype=np.uint32) with self._lock: - self.path_cache.add(path) - - @abstractmethod - def get_timespans(self) -> List[DateTimeRange]: - pass - - @abstractmethod - def get_station_pairs(self) -> List[Tuple[Station, Station]]: - pass - - def read_correlations(self, timespan: DateTimeRange, src_sta: Station, rec_sta: Station) -> List[CrossCorrelation]: - path = self._get_path(timespan, src_sta, rec_sta) - - tuple = self.helper.read(path) - if not tuple: + time_tuples = self.items[src_idx][rec_idx] + for t in time_tuples: + if t[0] == delta: + new_t = (delta, np.array(sorted(list(t[1]) + list(starts)), dtype=np.uint32)) + # not generally safe to modify a list while iterating over it, but we return after this + time_tuples.remove(t) + time_tuples.append(new_t) + self.items[src_idx][rec_idx] = time_tuples + return + # delta not found, so add it + self.items[src_idx][rec_idx].append((delta, starts)) + + def is_src_loaded(self, src: str) -> bool: + return self._sta_index(src) in self.items + + def contains(self, src: str, rec: str, timespan: DateTimeRange) -> bool: + time_tuples = self._get_tuples(src, rec) + if time_tuples is None: + return False + + delta = int(timespan.timedelta.total_seconds()) + start = int(timespan.start_datetime.timestamp()) + for t in time_tuples: + if t[0] == delta: + result = bisect(t[1], start) + return result != 0 and t[1][result - 1] == start + + return False + + def get_timespans(self, src: str, rec: str) -> List[DateTimeRange]: + time_tuples = self._get_tuples(src, rec) + if time_tuples is None: return [] - array, metadata = tuple - channel_params = self._read_cc_metadata(metadata) - cc_stack = array[:] - ccs = unstack(cc_stack) + timespans = [] + for delta, timestamps in time_tuples: + for ts in timestamps: + timespans.append( + DateTimeRange( + datetime.fromtimestamp(ts, timezone.utc), + datetime.fromtimestamp(ts + delta, timezone.utc), + ) + ) + return timespans + + def _get_tuples(self, src: str, rec: str) -> List[Tuple[int, np.ndarray]]: + src_idx = self._sta_index(src) + rec_idx = self._sta_index(rec) - return [ - CrossCorrelation(src, rec, params, remove_nan_rows(data)) - for (src, rec, params), data in zip(channel_params, ccs) - ] - - def _read_cc_metadata(self, metadata: Dict[str, Any]): - channels_dict = metadata[CHANNELS_ATTR] - channel_params = [ - (ChannelType(src, src_loc), ChannelType(rec, rec_loc), params) - for src, src_loc, rec, rec_loc, params in channels_dict - ] + with self._lock: + time_tuples = self.items.get(src_idx, {}).get(rec_idx, None) + return time_tuples - return channel_params - def _get_path(self, timespan: DateTimeRange, src_sta: Station, rec_sta: Station) -> str: - stations = self._get_station_pair(src_sta, rec_sta) - times = timespan_str(timespan) - return f"{stations}/{times}" +T = TypeVar("T", bound=AnnotatedData) -class HierarchicalStackStoreBase: +class HierarchicalStoreBase(Generic[T]): """ - A class for reading and writing stack data files. Hierarchy is: + A CC and Stack store bases class that uses hierarchical files for storage. The directory organization is as follows: / - station_pair (group) - stack name (group) - component (array) + src_sta/ + rec_sta/ + timespan + + The specific file format stored at the timespan level is delegated to the ArrayStore helper class. """ - def __init__(self, store: ArrayStore) -> None: + def __init__( + self, + helper: ArrayStore, + loader_func: Callable[[List[Tuple[np.ndarray, Dict[str, Any]]]], List[T]], + ) -> None: super().__init__() - self.helper = store - - def append(self, src: Station, rec: Station, stacks: List[Stack]): - path = self._get_station_path(src, rec) - all_stacks, metadata = AnnotatedData.pack(stacks) + self.helper = helper + self.dir_cache = PairDirectoryCache() + self.loader_func = loader_func + + def contains(self, src_sta: Station, rec_sta: Station, timespan: DateTimeRange) -> bool: + src = str(src_sta) + rec = str(rec_sta) + if self.dir_cache.contains(src, rec, timespan): + return True + self._load_src(src) + return self.dir_cache.contains(src, rec, timespan) + + def _load_src(self, src: str): + if self.dir_cache.is_src_loaded(src): + return + paths = [self.helper.parse_path(p) for p in self.helper.get_fs().find(fs_join(self.helper.get_root_dir(), src))] + grouped_paths = defaultdict(list) + for rec_sta, timespan in [p for p in paths if p]: + grouped_paths[rec_sta].append(timespan) + for rec_sta, timespans in grouped_paths.items(): + self.dir_cache.add(src, rec_sta, sorted(timespans, key=lambda t: t.start_datetime.timestamp())) + + def append(self, timespan: DateTimeRange, src: Station, rec: Station, data: List[T]): + path = self._get_path(src, rec, timespan) + packed_data, metadata = AnnotatedData.pack(data) tlog = TimeLogger(logger, logging.DEBUG) - self.helper.append(path, {STACKS_ATTR: metadata, VERSION_ATTR: 1.0}, all_stacks) - tlog.log(f"writing {len(stacks)} stacks to {path}") + self.helper.append(path, {META_ATTR: metadata, VERSION_ATTR: 1.0}, packed_data) + tlog.log(f"writing {len(data)} arrays to {path}") + self.dir_cache.add(str(src), str(rec), [timespan]) - def get_station_pairs(self) -> List[Tuple[Station, Station]]: - return self.helper.get_station_pairs() + def get_timespans(self, src: Station, rec: Station) -> List[DateTimeRange]: + self._load_src(str(src)) + return self.dir_cache.get_timespans(str(src), str(rec)) - def read_stacks(self, src: Station, rec: Station) -> List[Stack]: - path = self._get_station_path(src, rec) + def get_station_pairs(self) -> List[Tuple[Station, Station]]: + if not self.helper.get_fs().exists(self.helper.get_root_dir()): + return [] # fsspec.ls throws if the directory doesn't exist + + def is_sta(path): + return self.helper.get_fs().isdir(path) and Station.parse(Path(path).parts[-1]) + + # ls gets us the first level + sta = self.helper.get_fs().ls(self.helper.get_root_dir()) + # make sure we have stations + sta = [s for s in sta if is_sta(s)] + # ls the second level in parallel + with ThreadPoolExecutor() as exec: + sub_ls = list(exec.map(self.helper.get_fs().ls, sta)) + + sub_ls = [item for sublist in sub_ls for item in sublist if is_sta(item)] + pairs = [(Station.parse(Path(p).parts[-2]), Station.parse(Path(p).parts[-1])) for p in sub_ls] + return [p for p in pairs if p[0] and p[1]] + + def read(self, timespan: DateTimeRange, src: Station, rec: Station) -> List[T]: + path = self._get_path(src, rec, timespan) tuple = self.helper.read(path) if not tuple: return [] - arrays, metadata = tuple - stacks = unstack(arrays) - stacks_params = metadata[STACKS_ATTR] - return [ - Stack(comp, name, params, remove_nans(data)) for (comp, name, params), data in zip(stacks_params, stacks) - ] + array, metadata = tuple + arrays = unstack(array) + meta = metadata[META_ATTR] + tuples = list(zip(arrays, meta)) + return self.loader_func(tuples) - def _get_station_path(self, src: Station, rec: Station) -> str: - return f"{src}_{rec}" + def _get_path(self, src: Station, rec: Station, timespan: DateTimeRange) -> str: + return f"{src}/{rec}/{timespan_str(timespan)}" diff --git a/src/noisepy/seis/numpystore.py b/src/noisepy/seis/numpystore.py index 201e20a3..3ba407ca 100644 --- a/src/noisepy/seis/numpystore.py +++ b/src/noisepy/seis/numpystore.py @@ -1,23 +1,17 @@ import io import json import logging -import re import tarfile from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, Optional, Tuple import numpy as np from datetimerange import DateTimeRange -from noisepy.seis.datatypes import Station, to_json_types -from noisepy.seis.hierarchicalstores import ( - ArrayStore, - HierarchicalCCStoreBase, - HierarchicalStackStoreBase, -) - -from .stores import parse_station_pair, parse_timespan -from .utils import fs_join, get_filesystem, get_fs_sep +from .datatypes import CrossCorrelation, Stack, to_json_types +from .hierarchicalstores import ArrayStore, HierarchicalStoreBase +from .stores import parse_timespan +from .utils import fs_join logger = logging.getLogger(__name__) @@ -28,25 +22,13 @@ class NumpyArrayStore(ArrayStore): def __init__(self, root_dir: str, mode: str, storage_options={}) -> None: - super().__init__() + super().__init__(root_dir, storage_options) logger.info(f"store creating at {root_dir}, mode={mode}, storage_options={storage_options}") + # TODO: This needs to come in as part of the storage_options storage_options["client_kwargs"] = {"region_name": "us-west-2"} self.root_path = root_dir self.storage_options = storage_options - self.fs = get_filesystem(root_dir, storage_options=storage_options) - path = Path(root_dir) - prefix = get_fs_sep(root_dir).join(path.parts[1:]) - self.prefix_regex = re.compile(f".*{re.escape(prefix)}/") logger.info(f"Numpy store created at {root_dir}") - self.raw_paths = None - - def load_paths(self) -> List[str]: - paths = self.fs.find(self.root_path) - return [self._clean(p) for p in paths if p.endswith(TAR_GZ_EXTENSION)] - - def _clean(self, path: str) -> str: - # go from full file paths to just the STA_STA/TIMESTAMP - return self.prefix_regex.sub("", path).removesuffix(TAR_GZ_EXTENSION) def append(self, path: str, params: Dict[str, Any], data: np.ndarray): logger.debug(f"Appending to {path}: {data.shape}") @@ -61,8 +43,8 @@ def add_file_bytes(tar, name, f): tar.addfile(ti, fileobj=f) dir = fs_join(self.root_path, str(Path(path).parent)) - self.fs.makedirs(dir, exist_ok=True) - with self.fs.open(fs_join(self.root_path, path + TAR_GZ_EXTENSION), "wb") as f: + self.get_fs().makedirs(dir, exist_ok=True) + with self.get_fs().open(fs_join(self.root_path, path + TAR_GZ_EXTENSION), "wb") as f: with tarfile.open(fileobj=f, mode="w:gz") as tar: with io.BytesIO() as npyf: np.save(npyf, data, allow_pickle=False) @@ -72,52 +54,48 @@ def add_file_bytes(tar, name, f): add_file_bytes(tar, FILE_ARRAY_NPY, npyf) add_file_bytes(tar, FILE_PARAMS_JSON, jsf) + def parse_path(self, path: str) -> Optional[Tuple[str, DateTimeRange]]: + if not path.endswith(TAR_GZ_EXTENSION): + return None + path = path.removesuffix(TAR_GZ_EXTENSION) + parts = Path(path).parts + if len(parts) < 2: + return None + ts = parse_timespan(parts[-1]) + if ts is None: + return None + return (parts[-2], ts) + def read(self, path: str) -> Optional[Tuple[np.ndarray, Dict[str, Any]]]: file = fs_join(self.root_path, path + TAR_GZ_EXTENSION) - if not self.fs.exists(file): + if not self.get_fs().exists(file): + return None + + try: + with self.get_fs().open(file, "rb") as f: + with tarfile.open(fileobj=f, mode="r:gz") as tar: + npy_mem = tar.getmember(FILE_ARRAY_NPY) + with tar.extractfile(npy_mem) as f: + array_file = io.BytesIO() + array_file.write(f.read()) + array_file.seek(0) + array = np.load(array_file, allow_pickle=False) + params_mem = tar.getmember(FILE_PARAMS_JSON) + with tar.extractfile(params_mem) as f: + params = json.load(f) + return (array, params) + except Exception as e: + logger.error(f"Error reading {file}: {e}") return None - with self.fs.open(file, "rb") as f: - with tarfile.open(fileobj=f, mode="r:gz") as tar: - npy_mem = tar.getmember(FILE_ARRAY_NPY) - with tar.extractfile(npy_mem) as f: - array_file = io.BytesIO() - array_file.write(f.read()) - array_file.seek(0) - array = np.load(array_file, allow_pickle=False) - params_mem = tar.getmember(FILE_PARAMS_JSON) - with tar.extractfile(params_mem) as f: - params = json.load(f) - return (array, params) - - -class NumpyCCStore(HierarchicalCCStoreBase): - def __init__(self, root_dir: str, mode: str = "a", storage_options={}) -> None: - super().__init__(NumpyArrayStore(root_dir, mode, storage_options=storage_options)) - - def get_timespans(self) -> List[DateTimeRange]: - paths = self.helper.load_paths() - timespans = set(Path(p).name for p in paths) - timespans.discard(None) - return list(map(parse_timespan, sorted(timespans))) - - def get_station_pairs(self) -> List[Tuple[Station, Station]]: - paths = self.helper.load_paths() - # the pairs are the parent directories - paths = set(Path(p).parts[-2] for p in paths) - - pairs = [parse_station_pair(k) for k in paths] - return [p for p in pairs if p] - - -class NumpyStackStore(HierarchicalStackStoreBase): - def __init__(self, root_dir: str, mode: str = "a", storage_options={}) -> None: - super().__init__(NumpyArrayStore(root_dir, mode, storage_options=storage_options)) - - def get_station_pairs(self) -> List[Tuple[Station, Station]]: - paths = self.helper.load_paths() - # the pairs are the files - paths = set(Path(p).name.removesuffix(TAR_GZ_EXTENSION) for p in paths) - - pairs = [parse_station_pair(k) for k in paths] - return [p for p in pairs if p] + +class NumpyStackStore(HierarchicalStoreBase[Stack]): + def __init__(self, root_dir: str, mode: str = "a", storage_options={}): + super().__init__(NumpyArrayStore(root_dir, mode, storage_options=storage_options), Stack.load_instances) + + +class NumpyCCStore(HierarchicalStoreBase[CrossCorrelation]): + def __init__(self, root_dir: str, mode: str = "a", storage_options={}): + super().__init__( + NumpyArrayStore(root_dir, mode, storage_options=storage_options), CrossCorrelation.load_instances + ) diff --git a/src/noisepy/seis/plotting_modules.py b/src/noisepy/seis/plotting_modules.py index 8430a072..377c1f73 100644 --- a/src/noisepy/seis/plotting_modules.py +++ b/src/noisepy/seis/plotting_modules.py @@ -155,7 +155,7 @@ def plot_substack_cc( logger.error("No data available for plotting") return - ccs = cc_store.read_correlations(ts, sta_pairs[0][0], sta_pairs[0][1]) + ccs = cc_store.read(ts, sta_pairs[0][0], sta_pairs[0][1]) if len(ccs) == 0: logger.error(f"No data available for plotting in {ts}/{sta_pairs[0]}") return @@ -663,12 +663,14 @@ def plot_all_moveout( raise ValueError("sdir argument must be provided if savefig=True") sta_pairs = store.get_station_pairs() + ts = store.get_timespans(sta_pairs[0][0], sta_pairs[0][1]) + ts = ts[0] if len(ts) else None if len(sta_pairs) == 0: logger.error("No data available for plotting") return # Read some common arguments from the first available data set: - stacks = store.read_stacks(sta_pairs[0][0], sta_pairs[0][1]) + stacks = store.read(ts, sta_pairs[0][0], sta_pairs[0][1]) dtmp = stacks[0].data params = stacks[0].parameters if len(params) == 0 or dtmp.size == 0: @@ -696,7 +698,9 @@ def plot_all_moveout( # load cc and parameter matrix for ii, (src, rec) in enumerate(sta_pairs): - stacks = store.read_stacks(src, rec) + stacks = store.read(ts, src, rec) + ts = store.get_timespans(src, rec) + ts = ts[0] if len(ts) else None stacks = list(filter(lambda x: x.name == stack_name and x.component == ccomp, stacks)) if len(stacks) == 0: logger.warning(f"No data available for {src}_{rec}/{stack_name}/{ccomp}") diff --git a/src/noisepy/seis/stack.py b/src/noisepy/seis/stack.py index 05cf1ebd..d2a25a58 100644 --- a/src/noisepy/seis/stack.py +++ b/src/noisepy/seis/stack.py @@ -56,19 +56,16 @@ def stack( t_tot = tlog.reset() def initializer(): - timespans = sorted(cc_store.get_timespans(), key=lambda x: str(x)) # Important to have them sorted when we distribute work across nodes pairs_all = sorted(cc_store.get_station_pairs(), key=lambda x: str(x)) - if len(timespans) == 0 or len(pairs_all) == 0: + if len(pairs_all) == 0: raise IOError(NO_CCF_DATA_MSG) - logger.info( - f"Station pairs: {len(pairs_all)}, timespans:{len(timespans)}. From: {timespans[0]} to {timespans[-1]}" - ) + logger.info(f"Station pairs: {len(pairs_all)}") - return timespans, pairs_all + return [pairs_all] - timespans, pairs_all = scheduler.initialize(initializer, 2) + [pairs_all] = scheduler.initialize(initializer, 1) # Get the pairs that need to be processed by this node pairs_node = [pairs_all[i] for i in scheduler.get_indices(pairs_all)] @@ -80,10 +77,7 @@ def initializer(): logger.info(f"Stack already exists for {p[0]}-{p[1]}") continue missing_pairs.append(p) - tasks = [ - executor.submit(stack_store_pair, p[0], p[1], timespans, cc_store, stack_store, fft_params) - for p in missing_pairs - ] + tasks = [executor.submit(stack_store_pair, p[0], p[1], cc_store, stack_store, fft_params) for p in missing_pairs] results = get_results(tasks, "Stacking Pairs") executor.shutdown() scheduler.synchronize() @@ -100,19 +94,20 @@ def initializer(): def stack_store_pair( src_sta: Station, rec_sta: Station, - timespans: List[DateTimeRange], cc_store: CrossCorrelationDataStore, stack_store: StackStore, fft_params: ConfigParameters, ) -> bool: logger.info(f"Stacking {src_sta}_{rec_sta}") try: + timespans = cc_store.get_timespans(src_sta, rec_sta) stacks = stack_pair(src_sta, rec_sta, timespans, cc_store, fft_params) if len(stacks) == 0: logger.warning(f"No stacks for {src_sta}_{rec_sta}") return False tlog = TimeLogger(logger=logger, level=logging.INFO) - stack_store.append(src_sta, rec_sta, stacks) + ts = DateTimeRange(timespans[0].start_datetime, timespans[-1].end_datetime) + stack_store.append(ts, src_sta, rec_sta, stacks) tlog.log(f"writing stack pair {(src_sta, rec_sta)}") return True except Exception as e: @@ -164,7 +159,7 @@ def stack_pair( iseg = 0 for ts in timespans: # load the data from daily compilation - cross_correlations = cc_store.read_correlations(ts, src_sta, rec_sta) + cross_correlations = cc_store.read(ts, src_sta, rec_sta) logger.debug(f"path_list for {src_sta}-{rec_sta}: {cross_correlations}") # seperate auto and cross-correlation diff --git a/src/noisepy/seis/stores.py b/src/noisepy/seis/stores.py index abe3dc86..84f90390 100644 --- a/src/noisepy/seis/stores.py +++ b/src/noisepy/seis/stores.py @@ -2,18 +2,17 @@ import os import re from abc import ABC, abstractmethod -from typing import Any, Dict, List, Optional, Tuple +from typing import Generic, List, Optional, Tuple, TypeVar -import numpy as np import obspy from datetimerange import DateTimeRange from noisepy.seis.constants import DATE_FORMAT from .datatypes import ( + AnnotatedData, Channel, ChannelData, - ChannelType, CrossCorrelation, Stack, Station, @@ -49,9 +48,16 @@ def get_inventory(self, timespan: DateTimeRange, station: Station) -> obspy.Inve pass -class CrossCorrelationDataStore: +T = TypeVar("T", bound=AnnotatedData) + + +class ComputedDataStore(Generic[T]): + """ + A class for reading and writing cross-correlation data + """ + @abstractmethod - def contains(self, timespan: DateTimeRange, src: Station, rec: Station) -> bool: + def contains(self, src: Station, rec: Station, timespan: DateTimeRange) -> bool: pass @abstractmethod @@ -60,12 +66,12 @@ def append( timespan: DateTimeRange, src: Station, rec: Station, - ccs: List[CrossCorrelation], + ccs: List[T], ): pass @abstractmethod - def get_timespans(self) -> List[DateTimeRange]: + def get_timespans(self, src_sta: Station, rec_sta: Station) -> List[DateTimeRange]: pass @abstractmethod @@ -73,35 +79,20 @@ def get_station_pairs(self) -> List[Tuple[Station, Station]]: pass @abstractmethod - def read_correlations(self, timespan: DateTimeRange, src_sta: Station, rec_sta: Station) -> List[CrossCorrelation]: + def read(self, timespan: DateTimeRange, src_sta: Station, rec_sta: Station) -> List[T]: pass - # private helpers - def _get_station_pair(self, src_sta: Station, rec_sta: Station) -> str: - return f"{src_sta}_{rec_sta}" - def _get_channel_pair(self, src_chan: ChannelType, rec_chan: ChannelType) -> str: - return f"{src_chan}_{rec_chan}" +class CrossCorrelationDataStore(ComputedDataStore[CrossCorrelation]): + pass -class StackStore: +class StackStore(ComputedDataStore[Stack]): """ A class for reading and writing stack data """ - @abstractmethod - def append( - self, src: Station, rec: Station, components: str, name: str, stack_params: Dict[str, Any], data: np.ndarray - ): - pass - - @abstractmethod - def get_station_pairs(self) -> List[Tuple[Station, Station]]: - pass - - @abstractmethod - def read_stacks(self, src: Station, rec: Station) -> List[Stack]: - pass + pass def timespan_str(timespan: DateTimeRange) -> str: @@ -109,17 +100,10 @@ def timespan_str(timespan: DateTimeRange) -> str: def parse_station_pair(pair: str) -> Optional[Tuple[Station, Station]]: - # Parse from: CI.ARV_CI.BAK - def station(sta: str) -> Optional[Station]: - parts = sta.split(".") - if len(parts) != 2: - return None - return Station(parts[0], parts[1]) - if re.match(r"([A-Z0-9]+)\.([A-Z0-9]+)_([A-Z0-9]+)\.([A-Z0-9]+)", pair, re.IGNORECASE) is None: return None - tup = tuple(map(station, pair.split("_"))) + tup = tuple(map(Station.parse, pair.split("_"))) if None in tup: return None return tup diff --git a/src/noisepy/seis/zarrstore.py b/src/noisepy/seis/zarrstore.py index a4e3f879..613fa913 100644 --- a/src/noisepy/seis/zarrstore.py +++ b/src/noisepy/seis/zarrstore.py @@ -1,20 +1,14 @@ import logging -import re -from typing import Any, Dict, List, Optional, Tuple +from pathlib import Path +from typing import Any, Dict, Optional, Tuple import numpy as np import zarr from datetimerange import DateTimeRange -from noisepy.seis.constants import DONE_PATH -from noisepy.seis.datatypes import Station -from noisepy.seis.hierarchicalstores import ( - ArrayStore, - HierarchicalCCStoreBase, - HierarchicalStackStoreBase, -) - -from .stores import parse_station_pair, parse_timespan +from .datatypes import CrossCorrelation, Stack +from .hierarchicalstores import ArrayStore, HierarchicalStoreBase +from .stores import parse_timespan logger = logging.getLogger(__name__) @@ -30,24 +24,14 @@ class ZarrStoreHelper(ArrayStore): """ def __init__(self, root_dir: str, mode: str, storage_options={}) -> None: - super().__init__() - # We don't want to cache the data, but we do want to use the keys() cache - CACHE_SIZE = 1 - logger.info( - f"store creating at {root_dir}, mode={mode}, storage_options={storage_options}, cache_size={CACHE_SIZE}" - ) + super().__init__(root_dir, storage_options) + logger.info(f"store creating at {root_dir}, mode={mode}, storage_options={storage_options}") # TODO: storage_options["client_kwargs"] = {"region_name": "us-west-2"} store = zarr.storage.FSStore(root_dir, **storage_options) - self.cache = zarr.LRUStoreCache(store, max_size=CACHE_SIZE) - self.root = zarr.open_group(self.cache, mode=mode) - self.clean_regex = re.compile("/.z.*") + self.root = zarr.open_group(store, mode=mode) logger.info(f"store created at {root_dir}: {type(store)}. Zarr version {zarr.__version__}") - def load_paths(self) -> List[str]: - # the keys() returned contains the full file paths, so we remove suffixes like /.zgroup, /.zarray - return [self.clean_regex.sub("", key) for key in self.cache.keys()] - def append(self, path: str, params: Dict[str, Any], data: np.ndarray): logger.debug(f"Appending to {path}: {data.shape}") array = self.root.create_dataset( @@ -66,27 +50,25 @@ def read(self, path: str) -> Optional[Tuple[np.ndarray, Dict[str, Any]]]: metadata.update(array.attrs) return (array[:], metadata) + def parse_path(self, path: str) -> Optional[Tuple[str, DateTimeRange]]: + if not path.endswith("0.0"): + return None + parts = Path(path).parts + if len(parts) < 3: + return None + ts = parse_timespan(parts[-2]) + if ts is None: + return None + return (parts[-3], ts) + -class ZarrCCStore(HierarchicalCCStoreBase): +class ZarrCCStore(HierarchicalStoreBase): def __init__(self, root_dir: str, mode: str = "a", storage_options={}) -> None: helper = ZarrStoreHelper(root_dir, mode, storage_options=storage_options) - super().__init__(helper) - - def get_timespans(self) -> List[DateTimeRange]: - pairs = [k for k in self.helper.root.group_keys() if k != DONE_PATH] - timespans = [] - for p in pairs: - timespans.extend(k for k in self.helper.root[p].array_keys()) - return list(parse_timespan(t) for t in sorted(set(timespans))) + super().__init__(helper, CrossCorrelation.load_instances) - def get_station_pairs(self) -> List[Tuple[Station, Station]]: - return [parse_station_pair(k) for k in self.helper.root.group_keys() if k != DONE_PATH] - -class ZarrStackStore(HierarchicalStackStoreBase): +class ZarrStackStore(HierarchicalStoreBase): def __init__(self, root_dir: str, mode: str = "a", storage_options={}) -> None: helper = ZarrStoreHelper(root_dir, mode, storage_options=storage_options) - super().__init__(helper) - - def get_station_pairs(self) -> List[Tuple[Station, Station]]: - return [parse_station_pair(k) for k in self.helper.root.array_keys() if k != DONE_PATH] + super().__init__(helper, Stack.load_instances) diff --git a/tests/test_ccstores.py b/tests/test_ccstores.py index 904c1c53..2e2bb89c 100644 --- a/tests/test_ccstores.py +++ b/tests/test_ccstores.py @@ -1,8 +1,6 @@ from datetime import datetime, timedelta, timezone -from pathlib import Path import numpy as np -import pytest from datetimerange import DateTimeRange from noisepy.seis.asdfstore import ASDFCCStore @@ -12,72 +10,70 @@ from noisepy.seis.zarrstore import ZarrCCStore -# Use the built in tmp_path fixture: https://docs.pytest.org/en/7.1.x/how-to/tmp_path.html -# to create CC stores -@pytest.fixture -def asdfstore(tmp_path: Path) -> ASDFCCStore: - return ASDFCCStore(str(tmp_path)) - +def make_1dts(dt: datetime): + dt = dt.replace(tzinfo=timezone.utc, microsecond=0) + return DateTimeRange(dt, dt + timedelta(days=1)) -@pytest.fixture -def zarrstore(tmp_path: Path) -> ZarrCCStore: - return ZarrCCStore(str(tmp_path)) - -@pytest.fixture -def numpystore(tmp_path: Path) -> NumpyCCStore: - return NumpyCCStore(str(tmp_path)) +ts1 = make_1dts(datetime.now()) +ts2 = make_1dts(ts1.end_datetime) +src = Channel(ChannelType("foo"), Station("nw", "sta1")) +rec = Channel(ChannelType("bar"), Station("nw", "sta2")) def _ccstore_test_helper(ccstore: CrossCorrelationDataStore): - def make_1dts(dt: datetime): - dt = dt.replace(tzinfo=timezone.utc, microsecond=0) - return DateTimeRange(dt, dt + timedelta(days=1)) - data = np.random.random((10, 10)) - ts1 = make_1dts(datetime.now()) - ts2 = make_1dts(ts1.end_datetime) - src = Channel(ChannelType("foo"), Station("nw", "sta1")) - rec = Channel(ChannelType("bar"), Station("nw", "sta2")) params = {"key": "Value"} # assert empty state - assert not ccstore.contains(ts1, src.station, rec.station) - assert not ccstore.contains(ts2, src.station, rec.station) + assert not ccstore.contains(src.station, rec.station, ts1) + assert not ccstore.contains(src.station, rec.station, ts2) # add CC (src->rec) for ts1 ccstore.append(ts1, src.station, rec.station, [CrossCorrelation(src.type, rec.type, params, data)]) # assert ts1 is there, but not ts2 - assert ccstore.contains(ts1, src.station, rec.station) - assert not ccstore.contains(ts2, src.station, rec.station) + assert ccstore.contains(src.station, rec.station, ts1) + assert not ccstore.contains(src.station, rec.station, ts2) # also rec->src should not be there for ts1 - assert not ccstore.contains(ts1, rec.station, src.station) + assert not ccstore.contains(rec.station, src.station, ts1) # now add CC for ts2 ccstore.append(ts2, src.station, rec.station, [CrossCorrelation(src.type, rec.type, {}, data)]) - assert ccstore.contains(ts2, src.station, rec.station) - - timespans = ccstore.get_timespans() - assert timespans == [ts1, ts2] - sta_pairs = ccstore.get_station_pairs() - assert sta_pairs == [(src.station, rec.station)] - ccs = ccstore.read_correlations(ts1, sta_pairs[0][0], sta_pairs[0][1]) + sta_pairs = check_populated_store(ccstore) + ccs = ccstore.read(ts1, sta_pairs[0][0], sta_pairs[0][1]) cha_pairs = [(c.src, c.rec) for c in ccs] assert cha_pairs == [(src.type, rec.type)] assert params == ccs[0].parameters assert np.all(data == ccs[0].data) - wrong_ccs = ccstore.read_correlations(ts1, src.station, Station("nw", "wrong")) + wrong_ccs = ccstore.read(ts1, src.station, Station("nw", "wrong")) assert len(wrong_ccs) == 0 -def test_asdfccstore(asdfstore: ASDFCCStore): - _ccstore_test_helper(asdfstore) +def check_populated_store(ccstore): + assert ccstore.contains(src.station, rec.station, ts2) + + timespans = ccstore.get_timespans(src.station, rec.station) + assert timespans == [ts1, ts2] + sta_pairs = ccstore.get_station_pairs() + assert sta_pairs == [(src.station, rec.station)] + return sta_pairs + + +# Use the built in tmp_path fixture: https://docs.pytest.org/en/7.1.x/how-to/tmp_path.html +def test_asdfccstore(tmp_path): + path = str(tmp_path) + _ccstore_test_helper(ASDFCCStore(path)) + check_populated_store(ASDFCCStore(tmp_path)) -def test_zarrccstore(zarrstore: ZarrCCStore): - _ccstore_test_helper(zarrstore) +def test_zarrccstore(tmp_path): + path = str(tmp_path) + _ccstore_test_helper(ZarrCCStore(path)) + check_populated_store(ZarrCCStore(path)) -def test_numpyccstore(numpystore: NumpyCCStore): - _ccstore_test_helper(numpystore) +def test_numpyccstore(tmp_path): + path = str(tmp_path) + _ccstore_test_helper(NumpyCCStore(path)) + check_populated_store(NumpyCCStore(path)) diff --git a/tests/test_cli.sh b/tests/test_cli.sh index a8281a0c..b462905c 100755 --- a/tests/test_cli.sh +++ b/tests/test_cli.sh @@ -10,9 +10,10 @@ fi echo "FORMAT is _${FORMAT}_" RUNNER_TEMP=~/test_temp_${FORMAT} +# RUNNER_TEMP=s3://carlosgjs-noisepy/test_new +# aws s3 rm --recursive $RUNNER_TEMP +rm -rf $RUNNER_TEMP CCF=$RUNNER_TEMP/CCF -# CCF=s3://carlosgjs-noisepy/test_batch/CCF_scratch -# aws s3 rm --recursive $CCF STACK=$RUNNER_TEMP/STACK RAW=~/s3tmp/scedc/ XML=~/s3tmp/FDSNstationXML @@ -52,3 +53,6 @@ noisepy stack --ccf_path $CCF \ --format=${FORMAT} \ --logfile=$LOGFILE \ --loglevel=${LOG_LEVEL} + +du -ch $RUNNER_TEMP +tree $RUNNER_TEMP diff --git a/tests/test_hierarchicalstores.py b/tests/test_hierarchicalstores.py new file mode 100644 index 00000000..ba73f3ab --- /dev/null +++ b/tests/test_hierarchicalstores.py @@ -0,0 +1,87 @@ +from typing import Tuple + +import pytest +from datetimerange import DateTimeRange +from utils import range + +from noisepy.seis.hierarchicalstores import PairDirectoryCache +from noisepy.seis.numpystore import NumpyArrayStore +from noisepy.seis.zarrstore import ZarrStoreHelper + + +def test_dircache(): + cache = PairDirectoryCache() + + ts1 = range(4, 1, 2) + ts2 = range(4, 2, 3) + ts3 = range(2, 1, 2) + + assert not cache.contains("src", "rec", ts1) + assert cache.get_pairs() == [] + assert not cache.is_src_loaded("src") + cache.add("src", "rec", []) + assert cache.is_src_loaded("src") + cache.add("src", "rec", [ts1, ts2]) + assert cache.contains("src", "rec", ts1) + assert cache.contains("src", "rec", ts2) + + cache.add("src", "rec", [ts3]) + + def check_1day(): + assert cache.contains("src", "rec", ts1) + assert cache.contains("src", "rec", ts2) + assert cache.contains("src", "rec", ts3) + + assert not cache.contains("src", "rec2", ts1) + assert not cache.contains("src2", "rec", ts1) + + check_1day() + assert cache.get_timespans("src", "rec") == [ts3, ts1, ts2] + assert cache.get_timespans("src2", "rec2") == [] + + tsh1 = range(4, 1, 1, 0, 1) + assert not cache.contains("src", "rec", tsh1) + cache.add("src", "rec", [tsh1]) + assert cache.contains("src", "rec", tsh1) + check_1day() + assert cache.get_timespans("src", "rec") == [ts3, ts1, ts2, tsh1] + + with pytest.raises(ValueError): + cache.add("src", "rec", [ts1, tsh1]) + + +numpy_paths = [ + ( + "some/path/CI.BAK/CI.ARV/2021_07_01_00_00_00T2021_07_02_00_00_00.tar.gz", + ("CI.ARV", range(7, 1, 2)), + ), + ("some/path/CI.BAK/CI.BAK_CI.ARV/2021_07_01_00_00_00.tar.gz", None), + ("some/path/CI.BAK/CI.BAK_CI.ARV/2021_07_01_00_00_00.tar.gz", None), + ("path/2021_07_01_00_00_00.tar.gz", None), + ("2021_07_01_00_00_00.tar.gz", None), + ("some/path/CI.BAK/CI.BAK_CI.ARV/2021_07_01_00_00_00T2021_07_02_00_00_00.TXT", None), +] + + +@pytest.mark.parametrize("path,expected", numpy_paths) +def test_numpy_parse_path(path: str, expected: Tuple[str, DateTimeRange]): + store = NumpyArrayStore("some/path", "r") + assert store.parse_path(path) == expected + + +zarr_paths = [ + ( + "some/path/CI.BAK/CI.ARV/2021_07_01_00_00_00T2021_07_02_00_00_00/0.0.0", + ("CI.ARV", range(7, 1, 2)), + ), + ("some/path/CI.BAK/CI.BAK_CI.ARV/2021_07_01_00_00_00/0.0.0", None), + ("some/path/CI.BAK/CI.BAK/2021_07_01_00_00_00/.zgroup", None), + ("path/non_ts/0.0.0", None), + ("too_short/0.0.0", None), +] + + +@pytest.mark.parametrize("path,expected", zarr_paths) +def test_zarr_parse_path(tmp_path, path: str, expected: Tuple[str, DateTimeRange]): + store = ZarrStoreHelper(str(tmp_path), "a") + assert store.parse_path(path) == expected diff --git a/tests/test_stack.py b/tests/test_stack.py index 6c04b4f9..37712bac 100644 --- a/tests/test_stack.py +++ b/tests/test_stack.py @@ -62,6 +62,6 @@ def test_stack_pair(): ccs = [CrossCorrelation(p[0], p[1], params, data) for p in pairs] - cc_store.read_correlations.return_value = ccs + cc_store.read.return_value = ccs stacks = stack_pair(sta, sta, [ts, ts], cc_store, config) assert len(stacks) == 6 diff --git a/tests/test_stackstores.py b/tests/test_stackstores.py index 4b4d717e..9903d0d8 100644 --- a/tests/test_stackstores.py +++ b/tests/test_stackstores.py @@ -2,6 +2,7 @@ import numpy as np import pytest +from utils import range from noisepy.seis.asdfstore import ASDFStackStore from noisepy.seis.datatypes import Stack, Station @@ -30,15 +31,16 @@ def numpystore(tmp_path: Path) -> NumpyStackStore: def _stackstore_test_helper(store: StackStore): src = Station("nw", "sta1") rec = Station("nw", "sta2") + ts = range(4, 1, 2) stack1 = Stack("EE", "Allstack_linear", {"key1": "value1"}, np.random.random(10)) stack2 = Stack("NZ", "Allstack_robust", {"key2": "value2"}, np.random.random(7)) stacks = [stack1, stack2] - store.append(src, rec, stacks) + store.append(ts, src, rec, stacks) sta_pairs = store.get_station_pairs() assert sta_pairs == [(src, rec)] - read_stacks = store.read_stacks(src, rec) + read_stacks = store.read(ts, src, rec) assert len(read_stacks) == len(stacks) for s1, s2 in zip(read_stacks, stacks): assert s1.name == s2.name @@ -47,7 +49,7 @@ def _stackstore_test_helper(store: StackStore): assert s1.data.shape == s2.data.shape assert np.all(s1.data == s2.data) - bad_read = store.read_stacks(Station("nw", "sta3"), rec) + bad_read = store.read(ts, Station("nw", "sta3"), rec) assert len(bad_read) == 0 diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 00000000..f29a7fe5 --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,10 @@ +from datetime import datetime, timezone + +from datetimerange import DateTimeRange + + +def range(month: int, start_day: int, end_day: int, start_hr: int = 0, end_hr: int = 0): + return DateTimeRange( + datetime(2021, month, start_day, start_hr).replace(tzinfo=timezone.utc), + datetime(2021, month, end_day, end_hr).replace(tzinfo=timezone.utc), + ) diff --git a/tutorials/noisepy_scedc_tutorial.ipynb b/tutorials/noisepy_scedc_tutorial.ipynb index e1a93080..cfa9b9aa 100644 --- a/tutorials/noisepy_scedc_tutorial.ipynb +++ b/tutorials/noisepy_scedc_tutorial.ipynb @@ -64,7 +64,9 @@ }, "outputs": [], "source": [ - "from noisepy.seis import cross_correlate, stack, plotting_modules, __version__ # noisepy core functions\n", + "%load_ext autoreload\n", + "%autoreload 2\n", + "from noisepy.seis import cross_correlate, stack, __version__ # noisepy core functions\n", "from noisepy.seis.asdfstore import ASDFCCStore, ASDFStackStore # Object to store ASDF data within noisepy\n", "from noisepy.seis.scedc_s3store import SCEDCS3DataStore, channel_filter # Object to query SCEDC data from on S3\n", "from noisepy.seis.datatypes import ConfigParameters, StackMethod # Main configuration object\n",