Skip to content

Commit

Permalink
Refactor DB and context (#273)
Browse files Browse the repository at this point in the history
* bump

* register led default

* refactor rundb
  • Loading branch information
cfuselli authored Sep 20, 2023
1 parent 8842f68 commit e373239
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 75 deletions.
9 changes: 5 additions & 4 deletions amstrax/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@
export, __all__ = strax.exporter()
__all__ += ['amstrax_dir', 'to_pe']

# Current values
n_tpc_pmts = 8
n_xamsl_channel = 4
to_pe = 1

_is_jupyter = any('jupyter' in arg for arg in sys.argv)

amstrax_dir = os.path.dirname(os.path.abspath(
inspect.getfile(inspect.currentframe())))

# Current values
n_tpc_pmts = 8
n_xamsl_channel = 4
to_pe = 1


def open_test_data(file_name
Expand Down
48 changes: 31 additions & 17 deletions amstrax/contexts.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,25 @@
import sys
import amstrax as ax

common_opts_xams = dict(
# Configuration
CONFIG = {
'DEFAULT_DETECTOR': 'xams',
'DEFAULT_RUNCOLNAME': 'run',
'DEFAULT_COLLECTION': 'runs_gas'
}

PROCESSED_DATA_FOLDER = 'home/xams/data/processed'

COMMON_OPT_XAMS = dict(
register_all=[],
register=[ax.DAQReader,
ax.PulseProcessing,
ax.Peaks,
ax.PeakClassification,
ax.PeakBasics,
ax.Events,
ax.RecordsLED,
ax.LEDCalibration,
# ax.EventBasics,
# ax.EventPositions,
# ax.CorrectedAreas,
Expand All @@ -29,7 +40,7 @@
free_options=('live_data_dir',),
)

xams_common_config = dict(
XAMS_COMMON_CONFIG = dict(
n_tpc_pmts=5,
channel_map=immutabledict(
bottom=(0, 0),
Expand All @@ -38,19 +49,18 @@
))


def xams(output_folder='./strax_data', init_rundb=True, *args, **kwargs):

mongo_kwargs = dict(mongo_collname='runs',
mongo_dbname='run',
runid_field='number',
)
def xams(output_folder='./strax_data',
init_rundb=True,
mongo_kwargs=dict(mongo_collname=CONFIG['DEFAULT_COLLECTION'],
mongo_dbname=CONFIG['DEFAULT_RUNCOLNAME'],
runid_field='number'),
*args,
**kwargs):

st = strax.Context(**common_opts_xams, forbid_creation_of=ax.DAQReader.provides)
st = strax.Context(**COMMON_OPT_XAMS, forbid_creation_of=ax.DAQReader.provides)

st.set_config(xams_common_config)

processed_data_folder = 'home/xams/data/processed'

st.set_config(XAMS_COMMON_CONFIG)

st.storage = []
if init_rundb:
if mongo_kwargs is None:
Expand All @@ -61,18 +71,18 @@ def xams(output_folder='./strax_data', init_rundb=True, *args, **kwargs):
)]

st.storage += [
strax.DataDirectory(processed_data_folder,
strax.DataDirectory(PROCESSED_DATA_FOLDER,
provide_run_metadata=False,
deep_scan=False,
readonly=True),
strax.DataDirectory(output_folder),
]
print(st.storage)

return st

def context_for_daq_reader(st: strax.Context,
run_id: str,
detector: str,
detector: str = 'xams',
runs_col_kwargs: dict = None,
run_doc: dict = None,
check_exists=True,
Expand Down Expand Up @@ -108,8 +118,12 @@ def context_for_daq_reader(st: strax.Context,

live_dir = daq_config['strax_output_path']

if st.config['live_data_dir'] != live_dir:
# Check if live dir is set in the config, in case it is, set it
# to the live dir in the config
if 'live_data_dir' in st.config:
live_dir = st.config['live_data_dir']
# Print a UserWarning that the live_data_dir is overwritten
UserWarning(f'live_data_dir is overwritten to {live_dir}')

input_dir = os.path.join(live_dir, run_id)
if not os.path.exists(input_dir):
Expand Down
108 changes: 54 additions & 54 deletions amstrax/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import re
import socket
import typing
from typing import Union, Dict, Any

import pymongo
import strax
Expand All @@ -10,76 +11,79 @@

export, __all__ = strax.exporter()

default_mongo_dbname = 'run'
default_mongo_collname = 'runs'
# Configuration
CONFIG = {
'MONGO_PORT': 27017,
'DEFAULT_DBNAME': 'admin',
'DEFAULT_DETECTOR': 'xams',
'DEFAULT_RUNCOLNAME': 'run',
'DEFAULT_COLLECTION': 'runs_gas'
}

_SECRET_SERVING_PORT = {}

def _check_environment_var(key):
def check_environment_var(key: str) -> None:
"""Check if an environment variable is set."""
if key not in os.environ:
raise RuntimeError(
f"{key} not set. Please define in .bashrc file. (i.e. "
f"'export {key} = <secret {key.lower()}>')")
f"{key} not set. Please define in .bashrc file. (i.e. 'export {key}=<value>')")


def link_to_daq(
daq_host="",
daq_user=""
):
def get_env_var(key: str) -> str:
"""Retrieve the value of an environment variable after checking its existence."""
check_environment_var(key)
return os.environ[key]

"""Create an SSH tunnel to the daq machine to get access to the runsdb"""
_check_environment_var("DAQ_HOST")
daq_host = os.environ['DAQ_HOST']
print(f"Connected to daq_host {daq_host}")

_check_environment_var("DAQ_PASSWORD")
daq_password = os.environ['DAQ_PASSWORD']

def establish_ssh_tunnel(daq_host: str, daq_user: str, secret_serving_port: Dict[str, Any]) -> int:
"""Establish an SSH tunnel and return the local bind port."""
daq_password = get_env_var("DAQ_PASSWORD")
port_key = f'{daq_host}_{daq_user}'
if _SECRET_SERVING_PORT is not None and port_key in _SECRET_SERVING_PORT:
return _SECRET_SERVING_PORT[port_key]
_check_environment_var("DAQ_PASSWORD")
daq_password = os.environ['DAQ_PASSWORD']

if port_key in secret_serving_port:
return secret_serving_port[port_key]

server = SSHTunnelForwarder(
daq_host,
ssh_username=daq_user,
ssh_password=daq_password,
remote_bind_address=('127.0.0.1', 27017),
remote_bind_address=('127.0.0.1', CONFIG['MONGO_PORT']),
)
server.start()
_SECRET_SERVING_PORT[port_key] = server.local_bind_port
secret_serving_port[port_key] = server.local_bind_port
return server.local_bind_port


@export
def get_mongo_client(**link_kwargs):
"""Get a mongo client, any kwargs are passed on to link_to_daq"""
_check_environment_var('MONGO_USER')
_check_environment_var('MONGO_PASSWORD')
local_port = link_to_daq(**link_kwargs)
user = os.environ['MONGO_USER']
password = os.environ['MONGO_PASSWORD']
return pymongo.MongoClient(f'mongodb://{user}:{password}@127.0.0.1:{local_port}/admin')


# @export
# def get_mongo_collection(database_name='run',
# database_col='runs_new',
# **link_kwargs,
# ):
# """Get the runs collection"""
# return get_mongo_client(**link_kwargs)[database_name][database_col]
def get_mongo_client(daq_host: str = "",
daq_user: str = "",
secret_serving_port: Dict[str, Any] = {}
) -> pymongo.MongoClient:
"""Get a MongoDB client."""

daq_host = get_env_var('DAQ_HOST')
daq_user = get_env_var('DAQ_USER')

local_port = establish_ssh_tunnel(daq_host, daq_user, secret_serving_port)

user = get_env_var('MONGO_USER')
password = get_env_var('MONGO_PASSWORD')

return pymongo.MongoClient(f'mongodb://{user}:{password}@127.0.0.1:{local_port}/{CONFIG["DEFAULT_DBNAME"]}')

@export
def get_mongo_collection(detector,
**link_kwargs,
):
if detector == 'xams':
return get_mongo_client(**link_kwargs)['run']['runs_gas']
elif detector == 'xamsl':
return get_mongo_client(**link_kwargs)['run']['runs_new']
else:
def get_mongo_collection(detector: str = CONFIG['DEFAULT_DETECTOR'],
runcolname: str = CONFIG['DEFAULT_RUNCOLNAME'],
**link_kwargs
) -> pymongo.collection.Collection:
"""Get a specific MongoDB collection based on the detector."""
client = get_mongo_client(**link_kwargs)
collections = {
'xams': 'runs_gas',
'xamsl': 'runs_new'
}
if detector not in collections:
raise NameError(f'detector {detector} is not a valid detector name.')
return client[runcolname][collections[detector]]

@export
class RunDB(strax.StorageFrontend):
Expand All @@ -95,8 +99,8 @@ class RunDB(strax.StorageFrontend):
provide_run_metadata = True

def __init__(self,
mongo_dbname=None,
mongo_collname=None,
mongo_dbname=CONFIG['DEFAULT_DBNAME'],
mongo_collname=CONFIG['DEFAULT_COLLECTION'],
runid_field='name',
local_only=True,
new_data_path=None,
Expand Down Expand Up @@ -135,10 +139,6 @@ def __init__(self,

self.client = get_mongo_client()

if mongo_dbname is None:
mongo_dbname = default_mongo_dbname
if mongo_collname is None:
mongo_collname = default_mongo_collname
self.collection = self.client[mongo_dbname][mongo_collname]

self.backends = [
Expand Down

0 comments on commit e373239

Please sign in to comment.