Skip to content

Commit

Permalink
refactor mars_client stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
TomHodson committed Dec 18, 2023
1 parent 3c662a5 commit fba7b4b
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 41 deletions.
9 changes: 0 additions & 9 deletions src/__init__.py

This file was deleted.

54 changes: 37 additions & 17 deletions src/ionbeam/core/html_formatters.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import string, random
import pyodc as odc
from pathlib import Path
from typing import BinaryIO


def random_id(n):
Expand Down Expand Up @@ -101,26 +102,45 @@ def make_section(title, contents, open=False):
</details>"""


def odb_summary(filepath):
df = odc.read_odb(filepath, single=True)

def odb_info(filepath: str | BinaryIO) -> dict:
    """Read a single-frame ODB file and collect summary information about it.

    Args:
        filepath: Path (as a string) or binary file object handed to
            ``odc.Reader``.

    Returns:
        A dict with the keys:

        * ``as_dataframe``: the decoded frame restricted to the columns that
          hold more than one distinct value.
        * ``properties``: the ``properties`` attribute of the frame.
        * ``codecs``: the frame's column codecs.
        * ``summary``: a DataFrame indexed by column name with the columns
          ``dtype``, ``First Entry``, ``Unique Entries`` and ``ODB codec``.

    Raises:
        AssertionError: if the file does not contain exactly one frame.
    """
    r = odc.Reader(filepath)
    # Check the frame count before touching frames[0] so that an unexpected
    # file layout fails with a clear message.
    assert len(r.frames) == 1, "odb_info only supports ODB files with exactly one frame"
    frame = r.frames[0]
    # NOTE(review): _column_codecs is a private pyodc attribute - confirm it is
    # stable across the pyodc versions this project supports.
    codecs = frame._column_codecs

    # Possible optimisation: load only the varying columns into memory. That
    # would require reading the constant value out of each constant codec
    # (see pyodc/codec.py).
    df = frame.dataframe()

    # Count distinct values once per column; the counts are used both to pick
    # the varying columns and in the summary table.
    unique_counts = [df[c].nunique() for c in df]
    varying_columns = [c for c, n in zip(df.columns, unique_counts) if n > 1]

    # A frame with zero rows has no first entry; use placeholders rather than
    # letting df.iloc[0] raise IndexError.
    first_entries = df.iloc[0] if len(df) > 0 else [None] * len(df.columns)

    return dict(
        as_dataframe=df[varying_columns],
        properties=frame.properties,
        codecs=codecs,
        summary=pd.DataFrame(
            zip(
                df.dtypes,
                first_entries,
                unique_counts,
                [c.name for c in codecs],
            ),
            columns=["dtype", "First Entry", "Unique Entries", "ODB codec"],
            index=df.columns,
        ),
    )

properties = pd.DataFrame(r.frames[0].properties, index=[0]).transpose().to_html(header=False, notebook=True)

summary = pd.DataFrame(
zip(
df.dtypes,
df.iloc[0],
[df[c].nunique() for c in df],
[c.name for c in codecs],
),
columns=["dtype", "First Entry", "Unique Entries", "ODB codec"],
index=df.columns,
).to_html(notebook=True)
def odb_to_html(filepath: str | Path | BinaryIO):
if isinstance(filepath, Path):
info = odb_info(str(filepath))
else:
info = odb_info(filepath)

full_file = df.to_html(notebook=True, max_rows=20)
properties = pd.DataFrame(info["properties"], index=[0]).transpose().to_html(header=False, notebook=True)
summary = info["summary"].to_html(notebook=True)
full_file = info["as_dataframe"].to_html(notebook=True, max_rows=20)

return f"""
Expand All @@ -146,7 +166,7 @@ def summarise_file(filepath: Path):
return None
size = human_readable_bytes(filepath.stat().st_size)
if filepath.suffix == ".odb":
return make_section(f"ODB File Data ({size})", odb_summary(filepath))
return make_section(f"ODB File Data ({size})", odb_to_html(filepath))
else:
with open(filepath, "rb") as f:
data = f.read(500).decode(errors="replace")
Expand Down
52 changes: 52 additions & 0 deletions src/ionbeam/writers/aviso_notifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# #
# # (C) Copyright 2023 ECMWF.
# #
# # This software is licensed under the terms of the Apache Licence Version 2.0
# # which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
# # In applying this licence, ECMWF does not waive the privileges and immunities
# # granted to it by virtue of its status as an intergovernmental organisation nor
# # does it submit to any jurisdiction.
# #

from typing import Iterable, List, Literal

import pandas as pd

import dataclasses

from ..core.bases import Writer, Message, FileMessage, FinishMessage
from ..core.aviso import send_aviso_notification

import logging

logger = logging.getLogger(__name__)


@dataclasses.dataclass
class AVISONotifier(Writer):
    """Writer stage that sends an AVISO notification for each file message.

    For every incoming ``FileMessage`` a request dict is built from the
    message's non-skipped MARS keys plus fixed ``database``/``class`` values
    and the file path, passed to ``send_aviso_notification``, and a new
    ``FileMessage`` is yielded. ``FinishMessage`` inputs produce no output.
    """

    def __str__(self):
        return f"{self.__class__.__name__}()"

    def init(self, globals):
        """Run the base-class initialisation and set this stage's metadata state."""
        super().init(globals)
        self.metadata = dataclasses.replace(self.metadata, state="aviso_notified")

    def process(self, message: FileMessage | FinishMessage) -> Iterable[Message]:
        """Notify AVISO about ``message`` and yield the tagged output message.

        Args:
            message: The incoming message. ``FinishMessage`` instances are
                ignored (nothing is yielded).

        Yields:
            A single ``FileMessage`` carrying metadata generated from the
            input message, including its ``mars_keys``.
        """
        if isinstance(message, FinishMessage):
            return

        # The formatter table lives in the sibling module; it is imported here
        # because this file's module-level imports do not bring it into scope
        # (using it unimported raised NameError).
        from .construct_mars_request import mars_value_formatters

        # Fixed values take precedence over keys derived from the message
        # (the right-hand operand of | wins on duplicate keys).
        request = {"database": "fdbdev", "class": "rd", "source": message.metadata.filepath}
        odb_keys = {k.key: k.value for k in message.metadata.mars_keys if k.reason != "Skipped"}
        request = odb_keys | request
        request = {k: mars_value_formatters.get(k, str)(v) for k, v in request.items()}

        # Send a notification to AVISO that we put this data into the DB
        response = send_aviso_notification(request)
        # Lazy %-formatting so the response is actually interpolated
        # (the previous message was a plain string missing its f-prefix).
        logger.debug("Aviso response %s", response)

        # TODO: the explicit mars_keys should not be necessary here.
        metadata = self.generate_metadata(message, mars_keys=message.metadata.mars_keys)
        output_msg = FileMessage(metadata=metadata)

        assert output_msg.metadata.mars_keys is not None
        yield self.tag_message(output_msg, message)
21 changes: 21 additions & 0 deletions src/ionbeam/writers/construct_mars_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# #
# # (C) Copyright 2023 ECMWF.
# #
# # This software is licensed under the terms of the Apache Licence Version 2.0
# # which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
# # In applying this licence, ECMWF does not waive the privileges and immunities
# # granted to it by virtue of its status as an intergovernmental organisation nor
# # does it submit to any jurisdiction.
# #

# Per-key formatters applied to request values; keys without an entry are
# converted with str().
mars_value_formatters = {
    # Zero-pad integer times to four digits, e.g. 600 -> "0600".
    "time": lambda t: f"{t:04d}",
}


def construct_mars_request(message, override=None):
    """Build a MARS request dict of strings from a message's metadata.

    The request is assembled from three layers, later layers winning on
    duplicate keys:

    1. the message's MARS keys whose ``reason`` is not ``"Skipped"``,
    2. ``{"source": message.metadata.filepath}``,
    3. ``override``.

    Every value is then formatted with the matching entry of
    ``mars_value_formatters``, falling back to ``str``.

    Args:
        message: Object exposing ``metadata.filepath`` and
            ``metadata.mars_keys`` (an iterable of objects with ``key``,
            ``value`` and ``reason`` attributes).
        override: Extra key/value pairs that take precedence over everything
            else. Defaults to ``{"database": "fdbdev"}``.

    Returns:
        A new dict mapping request keys to formatted string values.
    """
    # None sentinel instead of a dict default: default values are created once
    # at definition time and shared between calls.
    if override is None:
        override = {"database": "fdbdev"}

    source = {"source": message.metadata.filepath}
    odb_keys = {k.key: k.value for k in message.metadata.mars_keys if k.reason != "Skipped"}
    request = odb_keys | source | override  # rightmost dict takes precedence here
    return {k: mars_value_formatters.get(k, str)(v) for k, v in request.items()}
36 changes: 36 additions & 0 deletions src/ionbeam/writers/fdb_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# #

from typing import Iterable, List, Literal
from pathlib import Path

import pandas as pd

Expand All @@ -17,12 +18,34 @@
from ..core.bases import Writer, Message, FileMessage, FinishMessage

import logging
import pyfdb
import findlibs
import os
import yaml

logger = logging.getLogger(__name__)


@dataclasses.dataclass
class FB5Config:
type: str = "remote"
host: str = "localhost"
port: int = 7654
dataPortStart: int = 40000
dataPortCount: int = 100
schema: Path | None = None

def asdict(self):
d = dataclasses.asdict(self)
d["schema"] = str(d["schema"])
return d


@dataclasses.dataclass
class FDBWriter(Writer):
FDB5_client_config: FB5Config
debug: list[str] = dataclasses.field(default_factory=list)

def __str__(self):
return f"{self.__class__.__name__}()"

Expand All @@ -36,6 +59,19 @@ def process(self, input_message: FileMessage | FinishMessage) -> Iterable[Messag

assert input_message.metadata.filepath is not None

fdb5_path = findlibs.find("fdb5")
logger.debug(f"FDBWriter using fdb5 shared library from {fdb5_path}")

os.environ["FDB5_CONFIG"] = yaml.dump(self.FDB5_client_config.asdict())

for lib in self.debug:
os.environ[f"{lib.upper()}_DEBUG"] = "1"

fdb = pyfdb.FDB()

with open(input_message.metadata.filepath, "rb") as f:
fdb.archive(f.read())

metadata = self.generate_metadata(input_message)
output_msg = FileMessage(metadata=metadata)
yield self.tag_message(output_msg, input_message)
18 changes: 3 additions & 15 deletions src/ionbeam/writers/mars_client_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

from ..core.bases import Writer, Message, FileMessage, FinishMessage
from ..core.aviso import send_aviso_notification
from .construct_mars_request import construct_mars_request


import logging

Expand Down Expand Up @@ -61,15 +63,6 @@ def run_temp_mars_request(file):
# print("Output: \n{}\n".format(output))


def time_format(i):
return f"{i:04d}"


mars_value_formatters = {
"time": time_format,
}


@dataclasses.dataclass
class MARSWriter(Writer):
def __str__(self):
Expand All @@ -86,12 +79,7 @@ def process(self, message: FileMessage | FinishMessage) -> Iterable[Message]:
assert message.metadata.mars_keys is not None
assert message.metadata.filepath is not None

request = {"database": "fdbdev", "class": "rd", "source": message.metadata.filepath}

odb_keys = {k.key: k.value for k in message.metadata.mars_keys if not k.reason == "Skipped"}
request = odb_keys | request

request = {k: mars_value_formatters.get(k, str)(v) for k, v in request.items()}
request = construct_mars_request(message)

with tempfile.NamedTemporaryFile() as fp:
mars_request = write_temp_mars_request(request, file=fp.name)
Expand Down

0 comments on commit fba7b4b

Please sign in to comment.