refactor: Decouple data loading from plot_all_moveout (#244)
* refactor: Decouple data loading from plot_all_moveout

* fix zarrstore and code coverage
carlosgjs authored Oct 17, 2023
1 parent b55b0f2 commit 1b5fbd7
Showing 10 changed files with 131 additions and 39 deletions.
11 changes: 8 additions & 3 deletions src/noisepy/seis/datatypes.py
@@ -393,10 +393,15 @@ def to_json_types(params: Dict[str, Any]) -> Dict[str, Any]:

def _to_json_type(value: Any) -> Any:
# special case since numpy arrays are not json serializable
if type(value) == np.ndarray:
if isinstance(value, np.ndarray):
return list(map(_to_json_type, value))
elif type(value) == np.float64 or type(value) == np.float32:
elif isinstance(value, np.float64) or isinstance(value, np.float32):
return float(value)
elif type(value) == np.int64 or type(value) == np.int32 or type(value) == np.int16 or type(value) == np.int8:
elif (
isinstance(value, np.int64)
or isinstance(value, np.int32)
or isinstance(value, np.int16)
or isinstance(value, np.int8)
):
return int(value)
return value
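Note on the change above: `isinstance(x, T)` is the idiomatic type check (flake8 flags `type(x) == T` as E721) and also covers subclasses of the listed numpy types. Since `isinstance` accepts a tuple of types, the chained comparisons can be collapsed. Below is a minimal standalone sketch of the same conversion logic — not the library function itself — assuming only numpy and the standard library:

```python
import json

import numpy as np


# Sketch of the conversion: json.dumps rejects numpy scalars and arrays,
# so they must first become plain Python types.
def to_json_value(value):
    if isinstance(value, np.ndarray):
        return [to_json_value(v) for v in value]
    if isinstance(value, (np.float64, np.float32)):
        return float(value)
    if isinstance(value, (np.int64, np.int32, np.int16, np.int8)):
        return int(value)
    return value


params = {"dt": np.float32(0.05), "ngood": np.int16(42), "lags": np.arange(3)}
print(json.dumps({k: to_json_value(v) for k, v in params.items()}))  # valid JSON
```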
6 changes: 3 additions & 3 deletions src/noisepy/seis/numpystore.py
@@ -10,7 +10,7 @@

from .datatypes import CrossCorrelation, Stack, to_json_types
from .hierarchicalstores import ArrayStore, HierarchicalStoreBase
from .stores import parse_timespan
from .stores import CrossCorrelationDataStore, StackStore, parse_timespan
from .utils import fs_join

logger = logging.getLogger(__name__)
@@ -89,12 +89,12 @@ def read(self, path: str) -> Optional[Tuple[np.ndarray, Dict[str, Any]]]:
return None


class NumpyStackStore(HierarchicalStoreBase[Stack]):
class NumpyStackStore(HierarchicalStoreBase[Stack], StackStore):
def __init__(self, root_dir: str, mode: str = "a", storage_options={}):
super().__init__(NumpyArrayStore(root_dir, mode, storage_options=storage_options), Stack.load_instances)


class NumpyCCStore(HierarchicalStoreBase[CrossCorrelation]):
class NumpyCCStore(HierarchicalStoreBase[CrossCorrelation], CrossCorrelationDataStore):
def __init__(self, root_dir: str, mode: str = "a", storage_options={}):
super().__init__(
NumpyArrayStore(root_dir, mode, storage_options=storage_options), CrossCorrelation.load_instances
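Note: declaring `StackStore` and `CrossCorrelationDataStore` as explicit bases makes the numpy-backed stores satisfy the abstract store interfaces, so isinstance checks and type annotations written against those interfaces now hold. A hedged sketch of what this enables (paths are hypothetical; `storage_options` is only needed for remote backends such as S3):

```python
from noisepy.seis.numpystore import NumpyCCStore, NumpyStackStore
from noisepy.seis.stores import CrossCorrelationDataStore, StackStore

# Hypothetical local directories.
cc_store = NumpyCCStore("/tmp/noisepy_cc", mode="r")
stack_store = NumpyStackStore("/tmp/noisepy_stacks", mode="r")

# Both stores now pass checks against the abstract interfaces, so they can be
# handed to any code annotated with those types.
assert isinstance(cc_store, CrossCorrelationDataStore)
assert isinstance(stack_store, StackStore)
```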
33 changes: 15 additions & 18 deletions src/noisepy/seis/plotting_modules.py
@@ -1,6 +1,6 @@
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple

import matplotlib.pyplot as plt
import numpy as np
@@ -11,8 +11,8 @@
from obspy.signal.filter import bandpass
from scipy.fftpack import next_fast_len

from noisepy.seis.stores import CrossCorrelationDataStore, StackStore
from noisepy.seis.utils import TimeLogger, get_results
from noisepy.seis.datatypes import Stack, Station
from noisepy.seis.stores import CrossCorrelationDataStore

logging.getLogger("matplotlib.font_manager").disabled = True
logger = logging.getLogger(__name__)
@@ -630,7 +630,7 @@ def plot_substack_all_spect(


def plot_all_moveout(
store: StackStore,
sta_stacks: List[Tuple[Station, Station, Stack]],
stack_name,
freqmin,
freqmax,
@@ -664,15 +664,16 @@ def plot_all_moveout(
if sdir is None:
raise ValueError("sdir argument must be provided if savefig=True")

sta_pairs = store.get_station_pairs()
ts = store.get_timespans(sta_pairs[0][0], sta_pairs[0][1])
ts = ts[0] if len(ts) else None
if len(sta_pairs) == 0:
if len(sta_stacks) == 0:
logger.error("No data available for plotting")
return

# Read some common arguments from the first available data set:
stacks = store.read(ts, sta_pairs[0][0], sta_pairs[0][1])
stacks = sta_stacks[0][1]
stacks = list(filter(lambda x: x.name == stack_name and x.component == ccomp, stacks))
if len(stacks) == 0:
logger.error(f"No data available for plotting {stack_name}/{ccomp}")
return
dtmp = stacks[0].data
params = stacks[0].parameters
if len(params) == 0 or dtmp.size == 0:
@@ -681,7 +682,7 @@

dt, maxlag = (params[p] for p in ["dt", "maxlag"])
stack_method = stack_name.split("0")[-1]
print(stack_name, stack_method)
logger.info(f"Plottting: {stack_method}, {len(sta_stacks)} station pairs")

# lags for display
if not disp_lag:
@@ -693,15 +694,14 @@
indx2 = indx1 + 2 * int(disp_lag / dt) + 1

# cc matrix
nwin = len(sta_pairs)
nwin = len(sta_stacks)
data = np.zeros(shape=(nwin, indx2 - indx1), dtype=np.float32)
dist = np.zeros(nwin, dtype=np.float32)
ngood = np.zeros(nwin, dtype=np.int16)

# load cc and parameter matrix
def load(ii):
(src, rec) = sta_pairs[ii]
stacks = store.read(ts, src, rec)
(src, rec), stacks = sta_stacks[ii]
stacks = list(filter(lambda x: x.name == stack_name and x.component == ccomp, stacks))
if len(stacks) == 0:
logger.warning(f"No data available for {src}_{rec}/{stack_name}/{ccomp}")
@@ -712,11 +712,8 @@ def load(ii):
crap = bandpass(all_data, freqmin, freqmax, int(1 / dt), corners=4, zerophase=True)
data[ii] = crap[indx1:indx2]

tlog = TimeLogger(level=logging.INFO)
with ThreadPoolExecutor() as exec:
futures = [exec.submit(load, ii) for ii in range(nwin)]
_ = get_results(futures)
tlog.log(f"loading {nwin} stacks")
for i in range(nwin):
load(i)

# average cc
ntrace = int(np.round(np.max(dist) + 0.51) / dist_inc)
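Note: `plot_all_moveout` no longer reads from a store; the caller loads the stacks (typically with the new `read_bulk`, added in `stores.py` below) and passes them in, as the updated tutorials do. A sketch of the new call site, using a hypothetical local path:

```python
from noisepy.seis.numpystore import NumpyStackStore
from noisepy.seis.plotting_modules import plot_all_moveout

stack_store = NumpyStackStore("/tmp/noisepy_stacks", mode="r")  # hypothetical path
pairs = stack_store.get_station_pairs()
ts = stack_store.get_timespans(*pairs[0])[0]  # first available timespan

# Before this commit: plot_all_moveout(stack_store, "Allstack_linear", 0.1, 0.2, "ZZ", 1)
# Now the caller loads the data first, then plots:
sta_stacks = stack_store.read_bulk(ts, pairs)  # I/O (parallel reads) happens here
plot_all_moveout(sta_stacks, "Allstack_linear", 0.1, 0.2, "ZZ", 1)  # pure plotting
```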
15 changes: 15 additions & 0 deletions src/noisepy/seis/stores.py
@@ -1,13 +1,16 @@
import datetime
import logging
import os
import re
from abc import ABC, abstractmethod
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Generic, List, Optional, Tuple, TypeVar

import obspy
from datetimerange import DateTimeRange

from noisepy.seis.constants import DATE_FORMAT
from noisepy.seis.utils import TimeLogger, get_results

from .datatypes import (
AnnotatedData,
@@ -82,6 +85,18 @@ def get_station_pairs(self) -> List[Tuple[Station, Station]]:
def read(self, timespan: DateTimeRange, src_sta: Station, rec_sta: Station) -> List[T]:
pass

def read_bulk(
self, timespan: DateTimeRange, pairs: List[Tuple[Station, Station]], executor: Executor = ThreadPoolExecutor()
) -> List[Tuple[Tuple[Station, Station], List[T]]]:
"""
Reads the data for all the given station pairs (and timespan) in parallel.
"""
tlog = TimeLogger(level=logging.INFO)
futures = [executor.submit(self.read, timespan, p[0], p[1]) for p in pairs]
results = get_results(futures)
tlog.log(f"loading {len(pairs)} stacks")
return list(zip(pairs, results))


class CrossCorrelationDataStore(ComputedDataStore[CrossCorrelation]):
pass
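Note: `read_bulk` submits one `read` per station pair to the given executor and zips the pairs with the results, so each returned entry is `((src, rec), stacks)`. The default is a `ThreadPoolExecutor`, but a caller can supply its own to cap concurrency; a hedged usage sketch (path hypothetical):

```python
from concurrent.futures import ThreadPoolExecutor

from noisepy.seis.numpystore import NumpyStackStore

store = NumpyStackStore("/tmp/noisepy_stacks", mode="r")  # hypothetical path
pairs = store.get_station_pairs()
ts = store.get_timespans(*pairs[0])[0]

# Cap concurrent reads explicitly, e.g. when the backing object store
# throttles requests; omitting `executor` uses the default thread pool.
with ThreadPoolExecutor(max_workers=4) as pool:
    sta_stacks = store.read_bulk(ts, pairs, executor=pool)

# Unpacking mirrors the load() helper in plotting_modules.py.
for (src, rec), stacks in sta_stacks:
    print(src, rec, len(stacks))
```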
6 changes: 3 additions & 3 deletions src/noisepy/seis/zarrstore.py
@@ -8,7 +8,7 @@

from .datatypes import CrossCorrelation, Stack
from .hierarchicalstores import ArrayStore, HierarchicalStoreBase
from .stores import parse_timespan
from .stores import CrossCorrelationDataStore, StackStore, parse_timespan

logger = logging.getLogger(__name__)

@@ -62,13 +62,13 @@ def parse_path(self, path: str) -> Optional[Tuple[str, DateTimeRange]]:
return (parts[-3], ts)


class ZarrCCStore(HierarchicalStoreBase):
class ZarrCCStore(HierarchicalStoreBase, CrossCorrelationDataStore):
def __init__(self, root_dir: str, mode: str = "a", storage_options={}) -> None:
helper = ZarrStoreHelper(root_dir, mode, storage_options=storage_options)
super().__init__(helper, CrossCorrelation.load_instances)


class ZarrStackStore(HierarchicalStoreBase):
class ZarrStackStore(HierarchicalStoreBase, StackStore):
def __init__(self, root_dir: str, mode: str = "a", storage_options={}) -> None:
helper = ZarrStoreHelper(root_dir, mode, storage_options=storage_options)
super().__init__(helper, Stack.load_instances)
5 changes: 5 additions & 0 deletions tests/test_stackstores.py
@@ -52,6 +52,11 @@ def _stackstore_test_helper(store: StackStore):
bad_read = store.read(ts, Station("nw", "sta3"), rec)
assert len(bad_read) == 0

sta_stacks = store.read_bulk(ts, [(src, rec)])
assert len(sta_stacks) == 1
assert sta_stacks[0][0] == (src, rec)
assert len(sta_stacks[0][1]) == len(stacks)


def test_asdfstore(asdfstore: ASDFStackStore):
_stackstore_test_helper(asdfstore)
24 changes: 19 additions & 5 deletions tutorials/get_started.ipynb
@@ -174,8 +174,8 @@
"source": [
"config.stations = [\"A*\"]\n",
"config.channels = [\"BHE\",\"BHN\",\"BHZ\"]\n",
"config.start_date = isoparse(\"2019-02-01\")\n",
"config.end_date = isoparse(\"2019-02-02\")\n",
"config.start_date = isoparse(\"2019-02-01T00:00:00Z\")\n",
"config.end_date = isoparse(\"2019-02-02T00:00:00Z\")\n",
"\n",
"# Download data locally. Enters raw data path, channel types, stations, config, and fdsn server.\n",
"download(raw_data_path, config)"
@@ -288,7 +288,8 @@
},
"outputs": [],
"source": [
"timespans = cc_store.get_timespans()\n",
"pairs = cc_store.get_station_pairs()\n",
"timespans = cc_store.get_timespans(*pairs[0])\n",
"plotting_modules.plot_substack_cc(cc_store, timespans[0], 0.1, 1, 200, False)"
]
},
@@ -314,10 +315,23 @@
"source": [
"# open a new cc store in read-only mode since we will be doing parallel access for stacking\n",
"cc_store = ASDFCCStore(cc_data_path, mode=\"r\")\n",
"print(cc_store.get_station_pairs())\n",
"stack_store = ASDFStackStore(stack_data_path)\n",
"config.stations = [\"*\"] # stacking doesn't support prefixes yet, so allow all stations\n",
"stack_cross_correlations(cc_store, stack_store, config)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pairs = stack_store.get_station_pairs()\n",
"print(f\"Found {len(pairs)} station pairs\")\n",
"sta_stacks = stack_store.read_bulk(None, pairs) # no timestamp used in ASDFStackStore"
]
},
{
"cell_type": "markdown",
"metadata": {
@@ -347,7 +361,7 @@
},
"outputs": [],
"source": [
"plotting_modules.plot_all_moveout(stack_store, 'Allstack_linear', 0.1, 0.2, 'ZZ', 1)"
"plotting_modules.plot_all_moveout(sta_stacks, 'Allstack_linear', 0.1, 0.2, 'ZZ', 1)"
]
}
],
@@ -374,7 +388,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
"version": "3.10.13"
}
},
"nbformat": 4,
15 changes: 13 additions & 2 deletions tutorials/noisepy_pnwstore_tutorial.ipynb
@@ -336,6 +336,17 @@
"print(os.listdir(stack_data_path))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pairs = stack_store.get_station_pairs()\n",
"print(f\"Found {len(pairs)} station pairs\")\n",
"sta_stacks = stack_store.read_bulk(None, pairs) # no timestamp used in ASDFStackStore"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -344,7 +355,7 @@
},
"outputs": [],
"source": [
"plotting_modules.plot_all_moveout(stack_store, 'Allstack_linear', 0.1, 0.2, 'ZZ', 1)"
"plotting_modules.plot_all_moveout(sta_stacks, 'Allstack_linear', 0.1, 0.2, 'ZZ', 1)"
]
}
],
@@ -372,7 +383,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
"version": "3.10.13"
}
},
"nbformat": 4,
15 changes: 13 additions & 2 deletions tutorials/noisepy_scedc_tutorial.ipynb
@@ -436,6 +436,17 @@
"cc_store.get_station_pairs()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pairs = stack_store.get_station_pairs()\n",
"print(f\"Found {len(pairs)} station pairs\")\n",
"sta_stacks = stack_store.read_bulk(None, pairs) # no timestamp used in ASDFStackStore"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -444,7 +455,7 @@
},
"outputs": [],
"source": [
"plot_all_moveout(stack_store, 'Allstack_linear', 0.1, 0.2, 'ZZ', 1)"
"plot_all_moveout(sta_stacks, 'Allstack_linear', 0.1, 0.2, 'ZZ', 1)"
]
},
{
@@ -479,7 +490,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
"version": "3.10.13"
}
},
"nbformat": 4,
40 changes: 37 additions & 3 deletions tutorials/plot_stacks.ipynb
@@ -23,11 +23,12 @@
"from noisepy.seis import __version__ # noisepy core functions\n",
"from noisepy.seis.plotting_modules import plot_all_moveout\n",
"from noisepy.seis.numpystore import NumpyStackStore\n",
"from noisepy.seis.datatypes import Stack, Station\n",
"import random\n",
"print(f\"Using NoisePy version {__version__}\")\n",
"\n",
"\n",
"stack_data_path = \"s3://scoped-noise/scedc_CI_stack_v2/\"\n",
"stack_data_path = \"s3://scoped-noise/scedc_CI_2022_stack/\"\n",
"\n",
"S3_STORAGE_OPTIONS = {\"s3\": {\"anon\": False}}\n",
"stack_store = NumpyStackStore(stack_data_path, storage_options=S3_STORAGE_OPTIONS)"
]
@@ -38,7 +39,40 @@
"metadata": {},
"outputs": [],
"source": [
"plot_all_moveout(stack_store, 'Allstack_linear', 0.1, 0.2, 'ZZ', 1)"
"pairs = stack_store.get_station_pairs()\n",
"print(f\"Found {len(pairs)} station pairs\")\n",
"# Get the first timespan available for the first pair\n",
"ts = stack_store.get_timespans(*pairs[0])[0]\n",
"print(f\"Timespan: {ts}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# load 10% of the data to plot\n",
"sample = random.sample(pairs, int(len(pairs)*.1))\n",
"print(len(sample))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"sta_stacks = stack_store.read_bulk(ts, sample)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"plot_all_moveout(sta_stacks, 'Allstack_linear', 0.1, 0.2, 'ZZ', 1)"
]
}
],