Skip to content

Commit

Permalink
Merge branch 'develop' into make-events
Browse files Browse the repository at this point in the history
  • Loading branch information
calum-chamberlain authored Jun 26, 2024
2 parents 9e109eb + f17b328 commit 27762dd
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 20 deletions.
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
## Current
* core.match_filter.tribe
- Add option to set the minimum number of stations required to use a template in detect
(`min_stations` kwarg)

## 0.5.0
* core.match_filter.tribe
- Significant re-write of detect logic to take advantage of parallel steps (see #544)
- Significant re-structure of hidden functions.
Expand Down
22 changes: 19 additions & 3 deletions eqcorrscan/core/match_filter/helpers/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,14 +388,18 @@ def _pre_processor(
break
Logger.debug("Getting stream from queue")
st = _get_and_check(input_stream_queue, poison_queue)
Logger.warning(st)
if st is None:
Logger.info("Ran out of streams, stopping processing")
break
elif isinstance(st, Poison):
Logger.error("Killed")
break
if len(st) == 0:
break
# if len(st) == 0:
# Logger.error("Empty stream provided")
# poison_queue.put_nowait(Poison(IndexError
# ("No matching channels between stream and ")))
# break
Logger.info(f"Processing stream:\n{st}")

# Process stream
Expand Down Expand Up @@ -445,6 +449,7 @@ def _prepper(
output_queue: Queue,
poison_queue: Queue,
xcorr_func: str = None,
min_stations: int = 0,
):
"""
Prepare templates and stream for correlation.
Expand Down Expand Up @@ -472,6 +477,8 @@ def _prepper(
Queue to check for poison, or put poison into if something goes awry
:param xcorr_func:
Name of correlation function backend to be used.
:param min_stations:
Minimum number of stations to run a template.
"""
if isinstance(templates, dict):
# We have been passed a db of template files on disk
Expand Down Expand Up @@ -525,16 +532,25 @@ def _prepper(
f"Multiple channels in continuous data for "
f"{', '.join(_duplicate_sids)}")))
break
template_sids = {tr.id for template in templates for tr in template.st}
if len(template_sids.intersection(st_sids)) == 0:
poison_queue.put_nowait(Poison(
IndexError("No matching channels between templates and data")))
# Do the grouping for this stream
Logger.info(f"Grouping {len(templates)} templates into groups "
f"of {group_size} templates")
try:
template_groups = _group(sids=st_sids, templates=templates,
group_size=group_size, groups=groups)
group_size=group_size, groups=groups,
min_stations=min_stations)
except Exception as e:
Logger.error(e)
poison_queue.put_nowait(Poison(e))
break
if template_groups is None:
output_queue.put_nowait(None)
return

Logger.info(f"Grouped into {len(template_groups)} groups")
for i, template_group in enumerate(template_groups):
killed = _check_for_poison(poison_queue)
Expand Down
25 changes: 18 additions & 7 deletions eqcorrscan/core/match_filter/helpers/tribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import numpy as np

from collections import defaultdict
from typing import List, Set
from typing import List, Set, Union
from timeit import default_timer

from concurrent.futures import ThreadPoolExecutor
Expand Down Expand Up @@ -261,36 +261,47 @@ def _group(
sids: Set[str],
templates: List[Template],
group_size: int,
groups: List[List[str]] = None
) -> List[List[Template]]:
groups: List[List[str]] = None,
min_stations: int = 0,
) -> Union[List[List[Template]], None]:
"""
Group templates either by seed id, or using pre-computed groups
:param sids: Seed IDs available in stream
:param templates: Templates to group
:param group_size: Maximum group size
:param groups: [Optional] List of List of template names in groups
:return: Groups of templates.
:param min_stations: Minimum number of stations to run a template.
:return: Groups of templates. None if no templates meet criteria
"""
Logger.info(f"Grouping for {sids}")
if groups:
Logger.info("Using pre-computed groups")
t_dict = {t.name: t for t in templates}
stations = {sid.split('.')[1] for sid in sids}
template_groups = []
for grp in groups:
template_group = [
t_dict.get(t_name) for t_name in grp
if t_name in t_dict.keys()]
if t_name in t_dict.keys() and
len({tr.stats.station for tr in
t_dict.get(t_name).st}.intersection(stations)
) >= max(1, min_stations)]
Logger.info(f"Dropping {len(grp) - len(template_group)} templates "
f"due to fewer than {min_stations} matched channels")
if len(template_group):
template_groups.append(template_group)
return template_groups
template_groups = group_templates_by_seedid(
templates=templates,
st_seed_ids=sids,
group_size=group_size)
group_size=group_size,
min_stations=min_stations)
if len(template_groups) == 1 and len(template_groups[0]) == 0:
Logger.error("No matching ids between stream and templates")
raise IndexError("No matching ids between stream and templates")
return None
# raise IndexError("No matching ids between stream and templates")
return template_groups


Expand Down
8 changes: 6 additions & 2 deletions eqcorrscan/core/match_filter/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,7 @@ def group_templates_by_seedid(
templates: List[Template],
st_seed_ids: Set[str],
group_size: int,
min_stations: int = 0,
) -> List[List[Template]]:
"""
Group templates to reduce dissimilar traces
Expand All @@ -771,6 +772,9 @@ def group_templates_by_seedid(
Seed ids in the stream to be matched with
:param group_size:
Maximum group size - will not exceed this size
:param min_stations:
Minimum number of overlapping stations between template
and stream to use the template for detection.
:return:
List of lists of templates grouped.
Expand All @@ -788,9 +792,9 @@ def group_templates_by_seedid(
# Don't use templates that don't have any overlap with the stream
template_seed_ids = tuple(
(t_name, t_chans) for t_name, t_chans in template_seed_ids
if len(t_chans))
if len({sid.split('.')[1] for sid in t_chans}) >= max(1, min_stations))
Logger.info(f"Dropping {len(templates) - len(template_seed_ids)} "
f"templates due to no matched channels")
f"templates due to fewer than {min_stations} matched channels")
# We will need this dictionary at the end for getting the templates by id
template_dict = {t.name: t for t in templates}
# group_size can be None, in which case we don't actually need to group
Expand Down
27 changes: 20 additions & 7 deletions eqcorrscan/core/match_filter/tribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ def detect(self, stream, threshold, threshold_type, trig_int, plot=False,
ignore_bad_data=False, group_size=None, overlap="calculate",
full_peaks=False, save_progress=False, process_cores=None,
pre_processed=False, check_processing=True, make_events=True,
**kwargs):
min_stations=0, **kwargs):
"""
Detect using a Tribe of templates within a continuous stream.
Expand Down Expand Up @@ -749,6 +749,10 @@ def detect(self, stream, threshold, threshold_type, trig_int, plot=False,
:type check_processing: bool
:param check_processing:
Whether to check that all templates were processed the same.
:type min_stations: int
:param min_stations:
Minimum number of overlapping stations between template
and stream to use the template for detection.
:return:
:class:`eqcorrscan.core.match_filter.Party` of Families of
Expand Down Expand Up @@ -871,7 +875,7 @@ def detect(self, stream, threshold, threshold_type, trig_int, plot=False,
ignore_bad_data, group_size, groups, sampling_rate, threshold,
threshold_type, save_progress, xcorr_func, concurrency, cores,
export_cccsums, parallel, peak_cores, trig_int, full_peaks,
plot, plotdir, plot_format, make_events,)
plot, plotdir, plot_format, make_events, min_stations,)

if concurrent_processing:
party = self._detect_concurrent(*args, **inner_kwargs)
Expand Down Expand Up @@ -899,7 +903,7 @@ def _detect_serial(
group_size, groups, sampling_rate, threshold, threshold_type,
save_progress, xcorr_func, concurrency, cores, export_cccsums,
parallel, peak_cores, trig_int, full_peaks, plot, plotdir, plot_format,
make_events,**kwargs
make_events, min_stations, **kwargs
):
""" Internal serial detect workflow. """
from eqcorrscan.core.match_filter.helpers.tribe import (
Expand Down Expand Up @@ -929,7 +933,10 @@ def _detect_serial(
delta = st_chunk[0].stats.delta
template_groups = _group(
sids={tr.id for tr in st_chunk},
templates=self.templates, group_size=group_size, groups=groups)
templates=self.templates, group_size=group_size, groups=groups,
min_stations=min_stations)
if template_groups is None:
continue
for i, template_group in enumerate(template_groups):
templates = [_quick_copy_stream(t.st) for t in template_group]
template_names = [t.name for t in template_group]
Expand Down Expand Up @@ -987,7 +994,7 @@ def _detect_concurrent(
group_size, groups, sampling_rate, threshold, threshold_type,
save_progress, xcorr_func, concurrency, cores, export_cccsums,
parallel, peak_cores, trig_int, full_peaks, plot, plotdir, plot_format,
make_events, **kwargs
make_events, min_stations, **kwargs
):
""" Internal concurrent detect workflow. """
from eqcorrscan.core.match_filter.helpers.processes import (
Expand Down Expand Up @@ -1063,6 +1070,7 @@ def _detect_concurrent(
output_queue=prepped_queue,
poison_queue=poison_queue,
xcorr_func=xcorr_func,
min_stations=min_stations,
),
name="PrepProcess"
)
Expand Down Expand Up @@ -1226,7 +1234,8 @@ def client_detect(self, client, starttime, endtime, threshold,
ignore_bad_data=False, group_size=None,
return_stream=False, full_peaks=False,
save_progress=False, process_cores=None, retries=3,
check_processing=True, make_events=True, **kwargs):
check_processing=True, make_events=True,
[GitHub Actions / flake8-linter annotation — check failure on line 1237 in eqcorrscan/core/match_filter/tribe.py: W291 trailing whitespace]
min_stations=0, **kwargs):
"""
Detect using a Tribe of templates within a continuous stream.
Expand Down Expand Up @@ -1318,6 +1327,10 @@ def client_detect(self, client, starttime, endtime, threshold,
:param retries:
Number of attempts allowed for downloading - allows for transient
server issues.
:type min_stations: int
:param min_stations:
Minimum number of overlapping stations between template
and stream to use the template for detection.
:return:
:class:`eqcorrscan.core.match_filter.Party` of Families of
Expand Down Expand Up @@ -1428,7 +1441,7 @@ def client_detect(self, client, starttime, endtime, threshold,
return_stream=return_stream, check_processing=False,
poison_queue=poison_queue, shutdown=False,
concurrent_processing=concurrent_processing, groups=groups,
make_events=make_events)
make_events=make_events, min_stations=min_stations)

if not concurrent_processing:
Logger.warning("Using concurrent_processing=True can be faster if"
Expand Down
46 changes: 46 additions & 0 deletions eqcorrscan/tests/matched_filter/match_filter_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,52 @@ def test_tribe_detect(self):
party=party, party_in=self.party, float_tol=0.05,
check_event=True)

def test_min_stations(self):
"""
Check that minimum stations is respected and templates
without sufficient stations are not run.
"""
local_tribe = self.tribe.copy()
# Drop some channels
local_tribe[0].st = local_tribe[0].st[0:-2]
for conc_proc in [False, True]:
party = local_tribe.detect(
stream=self.unproc_st, threshold=8.0, threshold_type='MAD',
trig_int=6.0, daylong=False, plot=False, min_stations=5,
parallel_process=False, concurrent_processing=conc_proc)
self.assertEqual(len(party), 3)

@pytest.mark.network
def test_min_stations_network(self):
"""
Check that minimum stations is respected and templates
without sufficient stations are not run.
"""
local_tribe = self.tribe.copy()
# Drop some channels
local_tribe[0].st = local_tribe[0].st[0:-2]
for conc_proc in [False, True]:
party = local_tribe.client_detect(
client=Client('NCEDC'), starttime=UTCDateTime(2004, 9, 28, 17),
endtime=UTCDateTime(2004, 9, 28, 18),
threshold=8.0, threshold_type='MAD',
trig_int=6.0, daylong=False, plot=False, min_stations=5,
parallel_process=False, concurrent_processing=conc_proc)
self.assertEqual(len(party), 3)

def test_no_stations(self):
"""
Check that minimum stations is respected and templates
without sufficient stations are not run.
"""
local_tribe = self.tribe.copy()
for conc_proc in [False, True]:
party = local_tribe.detect(
stream=self.unproc_st, threshold=8.0, threshold_type='MAD',
trig_int=6.0, daylong=False, plot=False, min_stations=6,
parallel_process=False, concurrent_processing=conc_proc)
self.assertEqual(len(party), 0)

def test_tribe_detect_with_empty_streams(self):
"""
Compare the detect method for a tribe of one vs two templates and check
Expand Down
2 changes: 1 addition & 1 deletion eqcorrscan/utils/plotting.py
Original file line number Diff line number Diff line change
Expand Up @@ -1843,7 +1843,7 @@ def freq_mag(magnitudes, completeness, max_mag, binsize=0.2, **kwargs):
"""
import matplotlib.pyplot as plt
# Ensure magnitudes are sorted
magnitudes.sort()
magnitudes = sorted(magnitudes)
# Check that there are no nans or infs
if np.isnan(magnitudes).any():
Logger.warning('Found nan values, removing them')
Expand Down

0 comments on commit 27762dd

Please sign in to comment.