Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
TomHodson committed Feb 4, 2025
1 parent a0d9301 commit e1675df
Show file tree
Hide file tree
Showing 23 changed files with 175 additions and 529 deletions.
5 changes: 2 additions & 3 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,9 @@ saved_err = load_most_recent_error(config.globals)
[x] Fix rest api to work with new format
[x] add an endpoint to directly get station data saving the filter step?

[ ] fix IonBeam/src/ionbeam/sources/meteotracker/source.py:149: UserWarning: Could not infer
[x] fix IonBeam/src/ionbeam/sources/meteotracker/source.py:149: UserWarning: Could not infer
format, so each element will be parsed individually, falling back to `dateutil`. To ensure parsing
is consistent and as-expected, please specify a format.

data["datetime"] = pd.to_datetime(data["datetime"])

[ ] Figure out under what conditions timespans can become null/None in the sql database
Expand All @@ -73,7 +72,7 @@ is consistent and as-expected, please specify a format.
[x] Fix rest api to work with new format
[ ] Make it even harder to nuke the ingestion data
[ ] Add way to wipe just station metadata for one source
[ ] Allow parsing ingestion times as a cmd line argumennt
[x] Allow parsing ingestion times as a cmd line argumennt
[ ] Add a way to keep track of average time spent on each action.


Expand Down
5 changes: 1 addition & 4 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,13 @@ globals:
# Global constants that override things in the sources and timeaggregator
ingestion_time_constants:
query_timespan:
start: datetime.now(tz=timezone.utc) - timedelta(days = 1000)
start: datetime.now(tz=timezone.utc) - timedelta(days = 10)
end: datetime.now(tz=timezone.utc) - timedelta(minutes = 10)

# How big a data granule should be
granularity:
hours: 1

# In what direction the time aggregator expects data to arrive.
time_direction: forwards

sources:
# - sensor.community
- acronet
Expand Down
8 changes: 4 additions & 4 deletions cron_acronet.sh → cron/acronet.sh
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#!/usr/bin/env bash
cd /home/math/IonBeam
cd /home/math/IonBeam/
export ODC_ENABLE_WRITING_LONG_STRING_CODEC=1
echo >> cron_logs_acronet.txt
echo Acronet `date` >> cron_logs_acronet.txt
echo >> cron/logs_acronet.txt
echo Acronet `date` >> cron/logs_acronet.txt
/home/math/.venv/bin/python -m ionbeam \
./config -vvvv \
--env ewc \
--sources acronet \
--download \
--no-ingest-to-pipeline \
--logfile cron_logs_acronet.txt
--logfile cron/logs_acronet.txt
6 changes: 3 additions & 3 deletions cron_ingest.sh → cron/ingest.sh
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
#!/usr/bin/env bash
cd /home/math/IonBeam
export ODC_ENABLE_WRITING_LONG_STRING_CODEC=1
echo >> cron_logs_ingest.txt
echo Ingestion `date` >> cron_logs_ingest.txt
echo >> cron/logs_ingest.txt
echo Ingestion `date` >> cron/logs_ingest.txt
/home/math/.venv/bin/python -m ionbeam \
./config -vvv \
--env ewc \
--sources meteotracker smart_citizen_kit acronet \
--no-download \
--ingest-to-pipeline \
--overwrite-fdb \
--logfile cron_logs_ingest.txt
--logfile cron/logs_ingest.txt
6 changes: 3 additions & 3 deletions cron_meteotracker.sh → cron/meteotracker.sh
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#!/usr/bin/env bash
cd /home/math/IonBeam
export ODC_ENABLE_WRITING_LONG_STRING_CODEC=1
echo >> cron_logs_meteotracker.txt
echo Meteotracker `date` >> cron_logs_meteotracker.txt
echo >> cron/logs_meteotracker.txt
echo Meteotracker `date` >> cron/logs_meteotracker.txt
/home/math/.venv/bin/python -m ionbeam \
./config -vvvv \
--env ewc \
--sources meteotracker \
--download \
--no-ingest-to-pipeline \
--logfile cron_logs_meteotracker.txt
--logfile cron/logs_meteotracker.txt
6 changes: 3 additions & 3 deletions cron_sck.sh → cron/sck.sh
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
#!/usr/bin/env bash
cd /home/math/IonBeam
export ODC_ENABLE_WRITING_LONG_STRING_CODEC=1
echo >> cron_logs_smart_citizen_kit.txt
echo smart_citizen_kit `date` >> cron_logs_smart_citizen_kit.txt
echo >> cron/logs_smart_citizen_kit.txt
echo smart_citizen_kit `date` >> cron/logs_smart_citizen_kit.txt
/home/math/.venv/bin/python -m ionbeam \
./config -vvvv \
--env ewc \
--sources smart_citizen_kit \
--download \
--no-ingest-to-pipeline \
--overwrite-fdb \
--logfile cron_logs_smart_citizen_kit.txt
--logfile cron/logs_smart_citizen_kit.txt

14 changes: 0 additions & 14 deletions cron_reingest.sh

This file was deleted.

7 changes: 4 additions & 3 deletions reingest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ echo Ingestion `date` >> cron_logs_ingest.txt
/home/math/.venv/bin/python -m ionbeam \
./config -vvv \
--env ewc \
--sources acronet meteotracker smart_citizen_kit\
--no-download \
--sources meteotracker \
--download \
--ingest-to-pipeline \
--overwrite-fdb \
--reingest-from=2025-01-21 \
--time-span 2025-01-01 2025-01-25 \
--no-reingest \
--logfile cron_logs_ingest.txt
7 changes: 4 additions & 3 deletions run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ export ODC_ENABLE_WRITING_LONG_STRING_CODEC=1

python -m ionbeam ./config -vvvvvv \
--env ewc \
--download \
--ingest-to-pipeline \
--sources smart_citizen_kit meteotracker \
--no-download \
--no-ingest-to-pipeline \
--sources smart_citizen_kit meteotracker acronet \
--overwrite-fdb \
--init-db \
# --die-on-error
# --init-db

Expand Down
27 changes: 27 additions & 0 deletions scripts/delete_all_station_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from ionbeam.metadata.db import Station, Property, Author
from ionbeam.sources.API_sources_base import DataChunk, DBDataChunk, DataStream
from ionbeam.core.config_parser import parse_single_action, parse_config
from pathlib import Path
from sqlalchemy.orm import Session
from sqlalchemy import delete, text

config_file = Path("./config").expanduser()

action_yaml = """
class: SmartCitizenKitSource
mappings: []
copy_metadata_to_columns:
external_station_id: device.id
"""

config, source = parse_single_action(config_file, action_yaml,
environment = "local",
die_on_error = False,
)

with Session(config.globals.sql_engine) as db_session:
with db_session.begin():
db_session.execute(text("TRUNCATE TABLE property_station_association_table CASCADE"))
db_session.execute(text("TRUNCATE TABLE station CASCADE"))
db_session.execute(text("TRUNCATE TABLE property CASCADE"))
db_session.execute(text("TRUNCATE TABLE author CASCADE"))
25 changes: 25 additions & 0 deletions scripts/print_properties.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from ionbeam.metadata.db import Station, Property, Author
from ionbeam.sources.API_sources_base import DataChunk, DBDataChunk, DataStream
from ionbeam.core.config_parser import parse_single_action, parse_config
from pathlib import Path
from sqlalchemy.orm import Session
from sqlalchemy import delete, text

config_file = Path("./config").expanduser()

action_yaml = """
class: SmartCitizenKitSource
mappings: []
copy_metadata_to_columns:
external_station_id: device.id
"""

config, source = parse_single_action(config_file, action_yaml,
environment = "local",
die_on_error = False,
)

with Session(config.globals.sql_engine) as db_session:
with db_session.begin():
for p in db_session.query(Property).all():
print(p)
25 changes: 13 additions & 12 deletions src/ionbeam/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import traceback
from datetime import datetime
from pathlib import Path
from datetime import datetime, UTC

from .core.singleprocess_pipeline import singleprocess_pipeline

Expand Down Expand Up @@ -123,15 +124,18 @@
)

parser.add_argument(
"--reingest-from",
help="Date to reingest from",
type=datetime.fromisoformat,
"--time-span",
help="Override the time span for downloading and ingestion.",
nargs = 2,
default = None,
type=lambda x: datetime.fromisoformat(x).replace(tzinfo=UTC),
)

parser.add_argument(
"--download-from",
help="Date to download from",
type=datetime.fromisoformat,
"--reingest",
help="Reingest all data regardless of whether it has actually changed or not, useful for debugging.",
action=argparse.BooleanOptionalAction,
default=False,
)

parser.add_argument(
Expand Down Expand Up @@ -199,21 +203,18 @@

config, actions = parse_config(
args.config_folder,
# Flags to control the two main phases, download and ingest
download = args.download,
ingest_to_pipeline = args.ingest_to_pipeline,
overwrite_fdb = args.overwrite_fdb,
environment = args.environment,
sources = args.sources,
die_on_error = args.die_on_error,
reingest_from = args.reingest_from,
time_span = args.time_span,
reingest = args.reingest,
finish_after = args.finish_after,
)

if args.download_from:
config.globals.ingestion_time_constants.query_timespan = dataclasses.replace(
config.globals.ingestion_time_constants.query_timespan,
start = args.download_from)

sources, downstream_actions = [], []
for action in actions:
if isinstance(action, Source):
Expand Down
13 changes: 0 additions & 13 deletions src/ionbeam/aggregators/__init__.py

This file was deleted.

Loading

0 comments on commit e1675df

Please sign in to comment.