Skip to content

Commit

Permalink
Support for S3 and Zarr (#160)
Browse files Browse the repository at this point in the history
* Support for S3 and Zarr

* Fix for 3.8
  • Loading branch information
carlosgjs authored Jul 31, 2023
1 parent 12760e3 commit c241239
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 20 deletions.
25 changes: 21 additions & 4 deletions src/noisepy/seis/datatypes.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
from __future__ import annotations

import sys
import typing
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import List, Optional
from typing import Any, DefaultDict, Dict, List, Optional
from urllib.parse import urlparse

import numpy as np
import obspy
from pydantic import BaseModel, ConfigDict, Field
from pydantic.functional_validators import model_validator
from pydantic_yaml import parse_yaml_raw_as, to_yaml_str

from noisepy.seis.utils import get_filesystem

INVALID_COORD = -sys.float_info.max


Expand Down Expand Up @@ -175,6 +180,16 @@ class ConfigParameters(BaseModel):
correction_csv: Optional[str] = Field(default=None, description="Path to e.g. meso_angles.csv")
# 'RESP', or 'polozeros' to remove response

storage_options: DefaultDict[str, typing.MutableMapping] = Field(
default=defaultdict(dict),
description="Storage options to pass to fsspec, keyed by protocol (local files are ''))",
)

def get_storage_options(self, path: str) -> Dict[str, Any]:
    """Return the storage options configured for the protocol of ``path``.

    Args:
        path: A file path or URL. Its URL scheme, as reported by
            ``urlparse`` (the empty string when the path has no scheme),
            is the key looked up in ``self.storage_options``.

    Returns:
        The mapping stored under that scheme, or an empty dict when the
        scheme has no entry.
    """
    scheme = urlparse(path).scheme
    # storage_options is declared as a DefaultDict; .get() does not trigger
    # its default factory, so a lookup never inserts a new key.
    return self.storage_options.get(scheme, {})

@property
def dt(self) -> float:
    """Reciprocal of ``self.samp_freq``.

    NOTE(review): presumably the sampling interval for a sampling
    frequency; confirm units against the ``samp_freq`` field definition.
    """
    sampling_rate = self.samp_freq
    return 1.0 / sampling_rate
Expand All @@ -194,11 +209,13 @@ def __getitem__(self, key):

def save_yaml(self, filename: str):
yaml_str = to_yaml_str(self)
with open(filename, "w") as f:
fs = get_filesystem(filename, storage_options=self.storage_options)
with fs.open(filename, "w") as f:
f.write(yaml_str)

def load_yaml(filename: str) -> ConfigParameters:
with open(filename, "r") as f:
def load_yaml(filename: str, storage_options={}) -> ConfigParameters:
fs = get_filesystem(filename, storage_options=storage_options)
with fs.open(filename, "r") as f:
yaml_str = f.read()
config = parse_yaml_raw_as(ConfigParameters, yaml_str)
return config
Expand Down
18 changes: 11 additions & 7 deletions src/noisepy/seis/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,33 +158,37 @@ def run_download():
download(args.raw_data_path, params)
params.save_yaml(fs_join(args.raw_data_path, CONFIG_FILE))

def get_cc_store(args, mode="a"):
def get_cc_store(args, params: ConfigParameters, mode="a"):
return (
ZarrCCStore(args.ccf_path, mode=mode)
ZarrCCStore(
args.ccf_path,
mode=mode,
storage_options=params.get_storage_options(args.ccf_path),
)
if args.format == DataFormat.ZARR.value
else ASDFCCStore(args.ccf_path, mode=mode)
)

def get_stack_store(args):
def get_stack_store(args, params: ConfigParameters):
return (
ZarrStackStore(args.stack_path, mode="a")
ZarrStackStore(args.stack_path, mode="a", storage_options=params.get_storage_options(args.stack_path))
if args.format == DataFormat.ZARR.value
else ASDFStackStore(args.stack_path, "a")
)

def run_cross_correlation():
ccf_dir = args.ccf_path
cc_store = get_cc_store(args)
params = initialize_params(args, args.raw_data_path)
cc_store = get_cc_store(args, params)
raw_store = create_raw_store(args, params)
scheduler = MPIScheduler(0) if args.mpi else SingleNodeScheduler()
cross_correlate(raw_store, params, cc_store, scheduler)
params.save_yaml(fs_join(ccf_dir, CONFIG_FILE))

def run_stack():
cc_store = get_cc_store(args, mode="r")
stack_store = get_stack_store(args)
params = initialize_params(args, args.ccf_path)
cc_store = get_cc_store(args, params, mode="r")
stack_store = get_stack_store(args, params)
scheduler = MPIScheduler(0) if args.mpi else SingleNodeScheduler()
stack(cc_store, stack_store, params, scheduler)
params.save_yaml(fs_join(args.stack_path, CONFIG_FILE))
Expand Down
2 changes: 2 additions & 0 deletions src/noisepy/seis/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
def get_filesystem(path: str, storage_options: dict = {}) -> fsspec.AbstractFileSystem:
"""Construct an fsspec filesystem for the given path"""
url = urlparse(path)
# The storage_options coming from the ConfigParameters is keyed by protocol
storage_options = storage_options.get(url.scheme, storage_options)
# default to anonymous access for S3 if this is not already specified
if url.scheme == S3_SCHEME and ANON_ARG not in storage_options:
storage_options = {ANON_ARG: True}
Expand Down
18 changes: 9 additions & 9 deletions src/noisepy/seis/zarrstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ class ZarrStoreHelper:
Helper object for storing data and parameters into Zarr and tracking a "done" bit for subsets of the data
'done' is a dummy array to track completed sets in its attribute dictionary.
Args:
root_dir: Storage location
mode: "r" or "w" for read-only or writing mode
root_dir: Storage location, can be a local or S3 path
mode: "r" or "a" for read-only or writing mode
dims: number of dimensions of the data
storage_options: options to pass to fsspec
"""

def __init__(self, root_dir: str, mode: str, dims: int) -> None:
def __init__(self, root_dir: str, mode: str, dims: int, storage_options={}) -> None:
super().__init__()
self.dims = dims
self.root = zarr.open(root_dir, mode=mode)
self.root = zarr.open_group(root_dir, mode=mode, storage_options=storage_options)
logger.info(f"store created at {root_dir}")

def contains(self, path: str) -> bool:
Expand Down Expand Up @@ -88,9 +88,9 @@ class ZarrCCStore(CrossCorrelationDataStore):
channel_pair (array)
"""

def __init__(self, root_dir: str, mode: str = "a") -> None:
def __init__(self, root_dir: str, mode: str = "a", storage_options={}) -> None:
super().__init__()
self.helper = ZarrStoreHelper(root_dir, mode, dims=2)
self.helper = ZarrStoreHelper(root_dir, mode, dims=2, storage_options=storage_options)

def contains(self, timespan: DateTimeRange, src_chan: Channel, rec_chan: Channel) -> bool:
path = self._get_path(timespan, src_chan, rec_chan)
Expand Down Expand Up @@ -158,9 +158,9 @@ class ZarrStackStore:
component (array)
"""

def __init__(self, root_dir: str, mode: str = "a") -> None:
def __init__(self, root_dir: str, mode: str = "a", storage_options={}) -> None:
super().__init__()
self.helper = ZarrStoreHelper(root_dir, mode, dims=1)
self.helper = ZarrStoreHelper(root_dir, mode, dims=1, storage_options=storage_options)

def mark_done(self, src: Station, rec: Station):
path = self._get_station_path(src, rec)
Expand Down
16 changes: 16 additions & 0 deletions tests/test_datatypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,19 @@ def test_station_valid():
assert s.valid()
s = Station("CI", "BAK", -110.0, 120.1, 15.0)
assert s.valid()


def test_storage_options():
    """Exercise ConfigParameters.storage_options and get_storage_options()."""
    config = ConfigParameters()

    # A freshly built config carries no storage options.
    assert config.storage_options == {}

    # Indexing an unknown key yields an empty mapping instead of raising ...
    assert config.storage_options["some_key"] == {}
    # ... and values written into that mapping are retained.
    config.storage_options["some_key"]["some_other_key"] = 6
    assert config.storage_options["some_key"]["some_other_key"] == 6

    # Options are looked up by the URL scheme of the path.
    config.storage_options["s3"] = {"profile": "my_profile"}
    s3_options = config.get_storage_options("s3://my_bucket/my_file")
    assert s3_options == {"profile": "my_profile"}

    # A plain local path has the empty string as its scheme.
    config.storage_options[""]["foo"] = "bar"
    local_options = config.get_storage_options("/local/file")
    assert local_options == {"foo": "bar"}

0 comments on commit c241239

Please sign in to comment.