Skip to content

Commit

Permalink
Merge branch 'develop' into fdbdev
Browse files: browse the repository at this point in the history
  • Loading branch information
TomHodson committed Feb 15, 2024
2 parents 3da57cd + e4840bd commit 8c2610f
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 25 deletions.
5 changes: 4 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@
lxml
pika
rich
pe
pe
findlibs
pyfdb @ git+https://github.com/ecmwf/pyfdb@develop
aviso @ git+https://github.com/ecmwf/aviso@develop
6 changes: 4 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ python_requires = >=3.10
install_requires =
pandas
numexpr
pyodc @ git+https://github.com/TomHodson/pyodc@test/string_hack
pyaviso @ git+https://github.com/ecmwf/aviso@develop
pyodc @ git+https://github.com/TomHodson/pyodc.git@test/string_hack
pyaviso @ git+https://github.com/ecmwf/aviso.git@develop
pyfdb @ git+https://github.com/ecmwf/pyfdb.git@develop
requests
oauthlib
requests-oauthlib
Expand All @@ -38,6 +39,7 @@ install_requires =
pika
rich
pe
findlibs


[options.extras_require]
Expand Down
4 changes: 1 addition & 3 deletions src/ionbeam/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@
handlers = [RichHandler(markup=True, rich_tracebacks=True)]
if args.logfile:
handlers.append(logging.FileHandler(args.logfile))


# Set the log level, default is warnings, -v gives info, -vv for debug
logging.basicConfig(
Expand Down Expand Up @@ -122,11 +121,10 @@
logger.info(f" Overwrite: {globals.globals.overwrite}")
if globals.globals.ingestion_time_constants is not None:
logger.info(f" Ingestion Time Constants:")
logger.info(f" Query Timespan: {tuple(d.isoformat() for d in globals.globals.ingestion_time_constants.query_timespan)}")
logger.info(f" Query Timespan: {tuple(d.isoformat() for d in globals.globals.ingestion_time_constants.query_timespan)}") # fmt: skip
logger.info(f" Emit After (Hours): {globals.globals.ingestion_time_constants.emit_after_hours}")
logger.info(f" Granularity: {globals.globals.ingestion_time_constants.granularity}")


logger.info("Sources")
for i, a in enumerate(sources):
logger.info(f" {i} {str(a)}")
Expand Down
8 changes: 5 additions & 3 deletions src/ionbeam/writers/construct_mars_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
}


def construct_mars_request(message):
request = {"database": "fdbdev", "class": "rd", "source": str(message.metadata.filepath)}
request |= message.metadata.mars_request.as_strings()
def construct_mars_request(message, override=dict(database="fdbdev")):
source = {"source": message.metadata.filepath}
odb_keys = {k.key: k.value for k in message.metadata.mars_keys if not k.reason == "Skipped"}
request = odb_keys | source | override # rightmost dict takes precedence here
request = {k: mars_value_formatters.get(k, str)(v) for k, v in request.items()}
return request
6 changes: 3 additions & 3 deletions src/ionbeam/writers/fdb_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
class FB5Config:
type: str = "remote"
host: str = "localhost"
port: int = 7654
dataPortStart: int = 40000
dataPortCount: int = 100
port: int = 8000
schema: Path | None = None
engine: str = "remote"
store: str = "remote"

def asdict(self):
d = dataclasses.asdict(self)
Expand Down
38 changes: 25 additions & 13 deletions src/ionbeam/writers/mars_client_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from ..core.bases import Writer, Message, FileMessage, FinishMessage
from ..core.aviso import send_aviso_notification
from .construct_mars_request import construct_mars_request


import logging
Expand Down Expand Up @@ -48,31 +49,38 @@ def write_temp_mars_request(request, file, verb="archive", quote_keys={"source",
return rendered


class MarsClientError(ValueError):
pass


def make_mars_request(verb, request):
with tempfile.NamedTemporaryFile() as cmd, tempfile.NamedTemporaryFile() as output:
if verb in {"list", "retrieve"}: request = request | dict(target = output.name)

if verb in {"list", "retrieve"}:
request = request | dict(target=output.name)

mars_request_string = write_temp_mars_request(request, file=cmd.name, verb=verb)
# logger.debug(f"mars_request_string: {mars_request_string}")

try:
stderr = sb.check_output(["mars", cmd.name], stderr=sb.STDOUT, encoding='utf-8')
stderr = sb.check_output(["mars", cmd.name], stderr=sb.STDOUT, encoding="utf-8")
except sb.CalledProcessError as exc:
logger.debug(f"Status : FAIL, retcode: {exc.returncode}, output: {exc.output}", extra={"markup": None})
raise ValueError(f"MARS {verb} returned error: {exc.output}")
logger.debug(f"Status : FAIL, retcode: {exc.returncode}, output: {exc.output}")
raise MarsClientError(f"MARS {verb} returned error: {exc.output}")

return stderr, output.read().decode("utf-8")


def mars_archive(request, source):
stderr, output = make_mars_request('archive', request | {"source" : str(source)})
stderr, output = make_mars_request("archive", request | {"source": str(source)})
return stderr



def mars_list(request):
request = request | dict(output = "cost")
stderr, output = make_mars_request('list', request)
request = request | dict(output="cost")
stderr, output = make_mars_request("list", request)
entries = next(l for l in output.split("\n") if "Entries" in l)
n = int(entries.split(":")[1].strip())
return dict(entries = n)
return dict(entries=n)


@dataclasses.dataclass
Expand All @@ -99,12 +107,16 @@ def process(self, message: FileMessage | FinishMessage) -> Iterable[Message]:

if entries == 0 or self.globals.overwrite:
logger.debug(f"Archiving via mars client")
mars_archive(mars_request, source = message.metadata.filepath)
mars_archive(mars_request, source=message.metadata.filepath)
else:
# Cut this message off so that it doesn't overwrite data
logger.debug(f"Dropping data because it's already in the database.")
return

# Send a notification to AVISO that we put this data into the DB
response = send_aviso_notification(mars_request)
# logger.debug(f"Aviso response {response}")

# TODO: the explicit mars_keys should not be necessary here.
metadata = self.generate_metadata(message, mars_request=message.metadata.mars_request)
output_msg = FileMessage(metadata=metadata)
Expand Down

0 comments on commit 8c2610f

Please sign in to comment.