Commit e80d83a

Version/aeolus (#13)
* Improvements to allow nodes to be added from measurements in lcs
* Moved to using orjson in the fetch process
  The old method was not keeping ascii characters as-is, and rather than updating the old json.dumps call we are shifting over to orjson, which the lcs method already uses and which has different defaults.
* Updated check to use orjson, added line check before load
* Added sensors_latest update on ingestion
* Updated to use new split_ingest_id method to parse ingest_id
* Testing out a new way to handle the timestamps on ingest
  Passing datetime strings to the ingest method causes it to run about 500-600x slower than passing numeric timestamps, primarily because of the try/catch method we use to determine string vs numeric. I modified the staging tables to accept a string for the datetime and to convert only the numeric timestamps, which puts the string method on par with the numeric one. Since we control the shape of the data coming into this part, I would recommend we settle on a standard format and use try/catch correctly, not as an if/then. (A short sketch follows the change summary below.)
* Fixed timestamp 'bug', added diagnostic data to ingest
* Added providers and timezones check on insert
* More updates to help with ingesting
* Updated ingester
* Adding batch methods for check
* Ingest improvements
* Added start/end date lookup to realtime fetcher
* Cleaning up
* Temporary fix to the airgradient duplication issue
* Updated the lcs class to support the newer data format
  The new class is meant to be more flexible and to work for all file formats.
* Cleaned up to work in a production setting
* Switched source of loader in the handler
* Adding git action
* Fix action branch name
* Updated deployment
* Cleaning up the settings
* Updated from 3.9 and added poetry
* Fixed deployment code
* Changed position of the python install in deployment
* Updated pydantic settings
* Fixing computed field issue
* Adding export plugin
* Updated cdk version
* Removed cdk version from deploy
* Updated the python version to 3.12
* Redeploy with ingesting turned off
* Clean up
* Resetting PAUSE_INGESTING to be False
* Updates to support the CAC data (#14)
* Cleaned up the ingester to work better for CAC data
* Added support for ingesting logging intervals
* Support for ingesting instrument and status
* Added flag ingest method
* Fixed bug in the flag ingest process
  Flags with null notes were not being matched.
* Added the start of some testing files
  The test_flags script is not an automated test yet, but the data files and process should still be helpful to have committed.
* Flagging updates and hourly data rollup fixes
* Some cleanup and bug fixes
  Added some methods that help in the testing/dev environments.
* Fixed bug with inserting new hourly data
* Updated the insterted_hours method
* Removed query
* Redirected the realtime hourly queue update to the new table
* Updated to support uuid
* Changed fake fetchlogs id
* Added backup sd value of zero
* Switched from select_object to get_object
  Some files are too large for select, and we were always selecting the whole file anyway, so there was no reason to use select.
* Added debugging method to the handler
* Changed the location of the downloaded files
* Added git commit hash
* Adding tests for lcsV2
* Added the deployment account to the deployment_id
1 parent a9e60e1 · commit e80d83a

32 files changed: +1183 −173 lines
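
The orjson and timestamp notes in the commit message are easier to see in a short sketch. The snippet below is illustrative only (hypothetical values, not the project's ingest code): json.dumps escapes non-ASCII characters by default while orjson keeps UTF-8 as-is, and an explicit type check on the timestamp avoids paying for a raised exception on every string row.

    import json
    from datetime import datetime, timezone

    import orjson

    payload = {"city": "São Paulo"}
    json.dumps(payload)             # '{"city": "S\u00e3o Paulo"}' -- ensure_ascii=True by default
    orjson.dumps(payload).decode()  # '{"city":"São Paulo"}' -- UTF-8 kept as-is

    def normalize_datetime(value):
        """Convert numeric (epoch) timestamps; pass strings through to the staging table.

        The explicit type check replaces the old try/except-as-if/then pattern,
        which raised and caught an exception for every string timestamp.
        """
        if isinstance(value, (int, float)):
            return datetime.fromtimestamp(value, tz=timezone.utc).isoformat()
        return value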

.github/workflows/deploy.yml  (new file, +76)

@@ -0,0 +1,76 @@
name: Deploy ingestor

on:
  push:
    branches:
      - main

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repo
        uses: actions/checkout@v4

      - name: Configure aws credentials
        uses: aws-actions/configure-aws-credentials@master
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_PROD }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_KEY_PROD }}
          aws-region: ${{ secrets.AWS_REGION }}

      - name: Get envionmental values
        uses: aws-actions/aws-secretsmanager-get-secrets@v2
        with:
          secret-ids: |
            AEOLUS, openaq-env/aeolus
          name-transformation: uppercase
          parse-json-secrets: true

      - uses: actions/setup-node@v4
        with:
          node-version: "20"

      - name: Install CDK
        run: |
          npm install -g aws-cdk

      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'

      - name: Install Poetry
        uses: snok/install-poetry@v1

      - name: Deploy stack
        env:
          ENV: "aeolus"
          PROJECT: "openaq"

          ## deployment variables
          # CDK_ACCOUNT: ${{ secrets.CDK_ACCOUNT }}
          # CDK_REGION: ${{ secrets.CDK_REGION }}

          VPC_ID: ${{ env.AEOLUS_VPC_ID }}

          TOPIC_ARN: ${{ env.AEOLUS_FETCH_OBJECT_TOPIC_ARN }}

          ## application variables
          DATABASE_READ_USER: ${{ env.AEOLUS_DATABASE_READ_USER }}
          DATABASE_READ_PASSWORD: ${{ env.AEOLUS_DATABASE_READ_PASSWORD }}
          DATABASE_WRITE_USER: ${{ env.AEOLUS_DATABASE_WRITE_USER }}
          DATABASE_WRITE_PASSWORD: ${{ env.AEOLUS_DATABASE_WRITE_PASSWORD }}
          DATABASE_DB: ${{ env.AEOLUS_DATABASE_DB }}
          DATABASE_HOST: ${{ env.AEOLUS_DATABASE_HOST }}
          DATABASE_PORT: ${{ env.AEOLUS_DATABASE_PORT }}
          FETCH_BUCKET: ${{ env.AEOLUS_FETCH_BUCKET }}
          ETL_BUCKET: ${{ env.AEOLUS_FETCH_BUCKET }}
          PAUSE_INGESTING: False

        working-directory: ./cdk
        run: |
          poetry self add poetry-plugin-export
          poetry install
          cdk deploy openaq-ingest-aeolus --require-approval never

README.md  (+2)

@@ -2,3 +2,5 @@
 
 
 # Testing a realtime file
+
+# Testing files

cdk/app.py  (+3 −3)

@@ -32,9 +32,9 @@
     lambda_env=lambda_env,
     fetch_bucket=settings.FETCH_BUCKET,
     vpc_id=settings.VPC_ID,
-    ingest_lambda_timeout=settings.INGEST_LAMBDA_TIMEOUT,
-    ingest_lambda_memory_size=settings.INGEST_LAMBDA_MEMORY_SIZE,
-    ingest_rate_minutes=settings.INGEST_RATE_MINUTES,
+    lambda_timeout=settings.LAMBDA_TIMEOUT,
+    lambda_memory_size=settings.LAMBDA_MEMORY_SIZE,
+    rate_minutes=settings.RATE_MINUTES,
     topic_arn=settings.TOPIC_ARN,
     env=env,
 )

cdk/cdk.json  (+1 −1)

@@ -1,5 +1,5 @@
 {
-    "app": "python app.py",
+    "app": "poetry run python app.py",
     "context": {
         "aws-cdk:enableDiffNoFail": "true",
         "@aws-cdk/core:stackRelativeExports": "true",

cdk/config.py  (+11 −10)

@@ -1,5 +1,8 @@
 from typing import List
-from pydantic import BaseSettings
+from pydantic_settings import (
+    BaseSettings,
+    SettingsConfigDict,
+)
 from pathlib import Path
 from os import environ

@@ -8,19 +11,17 @@ class Settings(BaseSettings):
     FETCH_BUCKET: str
     ENV: str = "staging"
     PROJECT: str = "openaq"
-    INGEST_LAMBDA_TIMEOUT: int = 900
-    INGEST_LAMBDA_MEMORY_SIZE: int = 1536
-    INGEST_RATE_MINUTES: int = 15
+    LAMBDA_TIMEOUT: int = 900
+    LAMBDA_MEMORY_SIZE: int = 1536
+    RATE_MINUTES: int = 15
     LOG_LEVEL: str = 'INFO'
     TOPIC_ARN: str = None
     VPC_ID: str = None

-    class Config:
-        parent = Path(__file__).resolve().parent.parent
-        if 'DOTENV' in environ:
-            env_file = Path.joinpath(parent, environ['DOTENV'])
-        else:
-            env_file = Path.joinpath(parent, ".env")
+
+    model_config = SettingsConfigDict(
+        extra="ignore", env_file=f"../{environ.get('DOTENV', '.env')}", env_file_encoding="utf-8"
+    )


 settings = Settings()
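
One behavior worth noting with the new model_config: the env_file path is built from an f-string at class-definition time, so DOTENV has to be in the environment before config.py is imported. A minimal usage sketch, assuming the module is imported as config and a hypothetical .env.aeolus file sits one directory above cdk/:

    import os

    # Must be set before the import below; SettingsConfigDict's env_file
    # argument is evaluated when the Settings class body runs.
    os.environ["DOTENV"] = ".env.aeolus"   # hypothetical env file name

    from config import settings

    print(settings.LAMBDA_TIMEOUT)   # 900 unless overridden in ../.env.aeolus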

cdk/lambda_ingest_stack.py  (+8 −8)

@@ -30,9 +30,9 @@ def __init__(
         env_name: str,
         lambda_env: Dict,
         fetch_bucket: str,
-        ingest_lambda_timeout: int,
-        ingest_lambda_memory_size: int,
-        ingest_rate_minutes: int = 15,
+        lambda_timeout: int,
+        lambda_memory_size: int,
+        rate_minutes: int = 15,
         topic_arn: str = None,
         vpc_id: str = None,
         **kwargs,
@@ -66,11 +66,11 @@ def __init__(
             ),
             handler="ingest.handler.handler",
             vpc=vpc_id,
-            runtime=aws_lambda.Runtime.PYTHON_3_9,
+            runtime=aws_lambda.Runtime.PYTHON_3_12,
             allow_public_subnet=True,
-            memory_size=ingest_lambda_memory_size,
+            memory_size=lambda_memory_size,
             environment=stringify_settings(lambda_env),
-            timeout=Duration.seconds(ingest_lambda_timeout),
+            timeout=Duration.seconds(lambda_timeout),
             layers=[
                 create_dependencies_layer(
                     self,
@@ -89,12 +89,12 @@ def __init__(

         # Set how often the ingester will run
         # If 0 the ingester will not run automatically
-        if ingest_rate_minutes > 0:
+        if rate_minutes > 0:
             aws_events.Rule(
                 self,
                 f"{id}-ingest-event-rule",
                 schedule=aws_events.Schedule.cron(
-                    minute=f"0/{ingest_rate_minutes}"
+                    minute=f"0/{rate_minutes}"
                 ),
                 targets=[
                     aws_events_targets.LambdaFunction(ingest_function),

cdk/requirements.txt  (−4)

This file was deleted.

cdk/utils.py  (+7 −5)

@@ -19,14 +19,16 @@ def create_dependencies_layer(
         function_name: str,
         requirements_path: Path
 ) -> aws_lambda.LayerVersion:
-    requirements_file = str(requirements_path.resolve())
+    #requirements_file = str(requirements_path.resolve())
     output_dir = f'../.build/{function_name}'
     layer_id = f'openaq-{function_name}-{env_name}-dependencies'

-    if not environ.get('SKIP_PIP'):
-        print(f'Building {layer_id} from {requirements_file} into {output_dir}')
+    if not environ.get('SKIP_BUILD'):
+        print(f'Building {layer_id} into {output_dir}')
         subprocess.run(
-            f"""python -m pip install -qq -r {requirements_file} \
+            f"""
+            poetry export --without=cdk -o requirements.txt --without-hashes && \
+            poetry run python -m pip install -qq -r requirements.txt \
             -t {output_dir}/python && \
             cd {output_dir}/python && \
             find . -type f -name '*.pyc' | \
@@ -47,5 +49,5 @@ def create_dependencies_layer(
         self,
         layer_id,
         code=layer_code,
-        compatible_runtimes=[aws_lambda.Runtime.PYTHON_3_9]
+        compatible_runtimes=[aws_lambda.Runtime.PYTHON_3_12]
     )

check.py  (+7 −6)

@@ -5,8 +5,7 @@
 import orjson
 import psycopg2

-
-logger = logging.getLogger(__name__)
+logger = logging.getLogger('check.py')

 #os.chdir('/home/christian/git/caparker/openaq-ingestor/ingest')
 #print(os.getcwd())
@@ -77,14 +76,17 @@
 os.environ['USE_TEMP_TABLES'] = 'False'

 from botocore.exceptions import ClientError
-from ingest.handler import cronhandler, logger
+from ingest.handler import cronhandler
 from ingest.settings import settings

 from ingest.lcs import (
     load_metadata,
+    load_metadata_batch,
+)
+
+from ingest.lcsV2 import (
     load_measurements,
     load_measurements_batch,
-    load_metadata_batch,
 )

 from ingest.fetch import (
@@ -154,8 +156,6 @@ def check_realtime_key(key: str, fix: bool = False):
         mark_success(key=key, reset=True)


-logger.debug(settings)
-
 if args.file is not None:
     # check if the files exists
     # is it a realtime file or a lcs file?
@@ -172,6 +172,7 @@ def check_realtime_key(key: str, fix: bool = False):
     # get just the keys
     keys = [log[1] for log in logs]
     # loop through and check each
+    logger.info(f"Downloading {len(keys)} files")
     for idx, key in enumerate(keys):
         if args.download:
             # we may be using the new source pat

ingest/etl_process_measurements.sql  (+29 −8)

@@ -52,12 +52,16 @@ FROM staging_measurements;
 -- that duplicate sensors with the same ingest/source id are created
 -- this is a short term fix
 -- a long term fix would not allow duplicate source_id's
-WITH ranked_sensors AS (
+WITH staged_sensors AS (
+  -- this first part significantly speeds it up on slow machines
+  SELECT DISTINCT ingest_id
+  FROM staging_measurements
+), ranked_sensors AS (
   SELECT s.sensors_id
   , s.source_id
   , RANK() OVER (PARTITION BY s.source_id ORDER BY added_on ASC) as rnk
   FROM sensors s
-  JOIN staging_measurements m ON (s.source_id = m.ingest_id)
+  JOIN staged_sensors m ON (s.source_id = m.ingest_id)
 ), active_sensors AS (
   SELECT source_id
   , sensors_id
@@ -68,6 +72,7 @@ WITH ranked_sensors AS (
 FROM active_sensors s
 WHERE s.source_id=ingest_id;

+
 -- Now we have to fill in any missing information
 -- first add the nodes and systems that dont exist
 -- add just the bare minimum amount of data to the system
@@ -285,6 +290,7 @@ INSERT INTO sensors_rollup (
   , value_latest
   , value_count
   , value_avg
+  , value_sd
   , value_min
   , value_max
   , geom_latest
@@ -299,6 +305,7 @@ WITH numbered AS (
   , sum(1) OVER (PARTITION BY sensors_id) as value_count
   , min(datetime) OVER (PARTITION BY sensors_id) as datetime_min
   , avg(value) OVER (PARTITION BY sensors_id) as value_avg
+  , stddev(value) OVER (PARTITION BY sensors_id) as value_sd
   , row_number() OVER (PARTITION BY sensors_id ORDER BY datetime DESC) as rn
   FROM staging_inserted_measurements
 ), latest AS (
@@ -308,6 +315,7 @@ WITH numbered AS (
   , value
   , value_count
   , value_avg
+  , value_sd
   , datetime_min
   , lat
   , lon
@@ -320,6 +328,7 @@ SELECT l.sensors_id
   , l.value -- last value
   , l.value_count
   , l.value_avg
+  , COALESCE(l.value_sd, 0)
   , l.value -- min
   , l.value -- max
   , public.pt3857(lon, lat)
@@ -348,12 +357,24 @@ SET datetime_last = GREATEST(sensors_rollup.datetime_last, EXCLUDED.datetime_las


 -- Update the table that will help to track hourly rollups
-INSERT INTO hourly_stats (datetime)
-  SELECT date_trunc('hour', datetime)
-  FROM staging_inserted_measurements
-  GROUP BY 1
-ON CONFLICT (datetime) DO UPDATE
-SET modified_on = now();
+-- this is a replacement to the hourly stats table
+WITH inserted_hours AS (
+  -- first we group things, adding an hour to make it time-ending after truncating
+  SELECT datetime + '1h'::interval as datetime
+  , utc_offset(datetime + '1h'::interval, tz.tzid) as tz_offset
+  FROM staging_inserted_measurements m
+  JOIN sensors s ON (s.sensors_id = m.sensors_id)
+  JOIN sensor_systems sy ON (s.sensor_systems_id = sy.sensor_systems_id)
+  JOIN sensor_nodes sn ON (sy.sensor_nodes_id = sn.sensor_nodes_id)
+  JOIN timezones tz ON (sn.timezones_id = tz.timezones_id)
+  GROUP BY 1, 2
+)
+INSERT INTO hourly_data_queue (datetime, tz_offset)
+SELECT as_utc_hour(datetime, tz_offset), tz_offset
+FROM inserted_hours
+GROUP BY 1, 2
+ON CONFLICT (datetime, tz_offset) DO UPDATE
+SET modified_on = now();


 --Update the export queue/logs to export these records
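
The replacement queue keys each batch of inserted measurements on a time-ending hour plus the station's UTC offset. A rough Python sketch of that bucketing idea, assuming standard-library timezone handling rather than the project's utc_offset/as_utc_hour SQL helpers (which this only approximates):

    from datetime import datetime, timedelta, timezone
    from zoneinfo import ZoneInfo

    def hourly_queue_key(datetime_utc: datetime, tzid: str):
        """Approximate the (datetime, tz_offset) pair enqueued for one measurement.

        Time-ending convention: a reading at 10:15 local time belongs to the hour
        ending 11:00 local; the queue keys on that hour (expressed in UTC here)
        plus the station's offset so rollups can run per local hour.
        """
        local = datetime_utc.astimezone(ZoneInfo(tzid))
        tz_offset = local.utcoffset()
        hour_ending_local = (local + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
        return hour_ending_local.astimezone(timezone.utc), tz_offset

    # e.g. a reading at 2024-05-01 14:15 UTC from a sensor in America/Denver (UTC-6)
    # is 08:15 local, so it lands in the hour ending 09:00 local (15:00 UTC),
    # queued with a -6 hour offset.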

0 comments