Skip to content

Commit

Permalink
Strip out SQL namespace stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
TomHodson committed Sep 27, 2024
1 parent cec2a6b commit f994a70
Show file tree
Hide file tree
Showing 4 changed files with 282 additions and 207 deletions.
87 changes: 49 additions & 38 deletions src/ionbeam/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@

# Note: I've put some imports after the argparse code to make the cmdline usage feel snappier
import argparse
from pathlib import Path
import sys

import logging
from rich.logging import RichHandler
import pdb
import sys
import traceback
from pathlib import Path

from rich.logging import RichHandler

if __name__ == "__main__":
# Stages:
Expand Down Expand Up @@ -110,12 +110,6 @@
help="Which environment to use, local, dev or test",
)

parser.add_argument(
"--namespace",
help="Which namespace to us for the postgres database, defaults to the environment name.",
)


args = parser.parse_args()

handlers = []
Expand All @@ -124,15 +118,16 @@
handlers.append(logging.StreamHandler())
prompt = input
else:
from rich.traceback import install
from rich.prompt import Prompt
install(show_locals=False)
from rich.prompt import Prompt
from rich.traceback import install

install(show_locals=False)

handlers.append(RichHandler(markup=True, rich_tracebacks=True))

handlers.append(RichHandler(markup=True, rich_tracebacks=True))
# override the input builtin when using fancy output
prompt = Prompt.ask

# override the input builtin when using fancy output
prompt = Prompt.ask

if args.logfile:
handlers.append(logging.FileHandler(args.logfile))

Expand All @@ -145,14 +140,16 @@
)
logger = logging.getLogger("CMDLINE")

from .core.bases import Source
from .core.config_parser.config_parser import parse_config
from .core.bases import Source, Aggregator

config, actions = parse_config(args.config_folder,
offline=args.offline,
overwrite=args.overwrite,
environment=args.environment,
namespace=args.namespace,)
config, actions = parse_config(
args.config_folder,
offline=args.offline,
overwrite=args.overwrite,
environment=args.environment,
)
print(args)

sources, downstream_actions = [], []
for action in actions:
Expand All @@ -161,17 +158,22 @@
else:
downstream_actions.append(action)

logger.info(f"Globals:")
logger.info("Globals:")
logger.info(f" Environment: {config.globals.environment}")
logger.info(f" Data Path: {config.globals.data_path}")
logger.info(f" Config Path: {config.globals.config_path}")
logger.info(f" Data Path: {config.globals.data_path}")
logger.info(f" Offline: {config.globals.offline}")
logger.info(f" Overwrite: {config.globals.overwrite}")
if config.globals.ingestion_time_constants is not None:
logger.info(f" Ingestion Time Constants:")
logger.info(" Ingestion Time Constants:")
logger.info(f" Query Timespan: {tuple(d.isoformat() for d in config.globals.ingestion_time_constants.query_timespan)}") # fmt: skip
logger.info(f" Emit After (Hours): {config.globals.ingestion_time_constants.emit_after_hours}")
logger.info(f" Granularity: {config.globals.ingestion_time_constants.granularity}")
logger.info(
f" Emit After (Hours): {config.globals.ingestion_time_constants.emit_after_hours}"
)
logger.info(
f" Granularity: {config.globals.ingestion_time_constants.granularity}"
)

logger.info("Sources")
for i, a in enumerate(sources):
Expand All @@ -181,32 +183,41 @@
sys.exit()

if args.init_db:
host = config.globals.secrets['postgres_database']['host']
host = config.globals.postgres_database["host"]
logger.warning(f"Wiping the postgres database as {host}!")
if host != "localhost" and prompt(f"Are you sure you want to wipe the postgres database at {host}? y/n: ") != "y": sys.exit()
if (
host != "localhost"
and prompt(
f"Are you sure you want to wipe the postgres database at {host}? y/n: "
)
!= "y"
):
sys.exit()
from ionbeam.metadata.db import init_db
from sqlalchemy import create_engine, URL
sql_engine = create_engine(URL.create(
"postgresql+psycopg2", **config.globals.secrets["postgres_database"],
), echo = False)
init_db(sql_engine, config.globals.canonical_variables)
logger.warning("SQL Database wiped and reinitialised.")

init_db(config.globals)
logger.warning("SQL Database wiped and reinitialised.")

if "finish_after" in args:
logger.warning(f"Telling all sources to finish after emitting {args.finish_after} messages")
logger.warning(
f"Telling all sources to finish after emitting {args.finish_after} messages"
)
for source in sources:
source.finish_after = args.finish_after

from .core.singleprocess_pipeline import singleprocess_pipeline

try:
singleprocess_pipeline(sources, downstream_actions, emit_partial=args.emit_partial, simple_output = args.simple_output or args.debug)
singleprocess_pipeline(
sources,
downstream_actions,
emit_partial=args.emit_partial,
simple_output=args.simple_output or args.debug,
)
except Exception as e:
if args.debug:
extype, value, tb = sys.exc_info()
traceback.print_exc()
pdb.post_mortem(tb)
else:
raise e

Loading

0 comments on commit f994a70

Please sign in to comment.