Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FW-526: live cosmos data payload MVP #10

Merged
merged 4 commits into from
Jan 16, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ build/*
dist/*
*.egg*
.coverage
config.cfg
oracle.cfg
aws-auth
*.log
*.certs*
14 changes: 13 additions & 1 deletion README.MD
Original file line number Diff line number Diff line change
@@ -2,4 +2,16 @@
[![tests badge](https://github.com/NERC-CEH/iot-swarm/actions/workflows/test.yml/badge.svg)](https://github.com/NERC-CEH/iot-swarm/actions)
[![docs badge](https://github.com/NERC-CEH/iot-swarm/actions/workflows/doc-deployment.yml/badge.svg)](https://nerc-ceh.github.io/iot-swarm/)

This is a Python package intended to simulate a swarm of IoT device communications via MQTT, enabling stress testing of cloud infrastructure with loads close to production level. [Read the docs](https://nerc-ceh.github.io/iot-swarm/)
This is a Python package intended to simulate a swarm of IoT device communications via MQTT, enabling stress testing of cloud infrastructure with loads close to production level. [Read the docs](https://nerc-ceh.github.io/iot-swarm/)

# Live Cosmos Data

To use the live cosmos data tool you must create a config file in the format:

```
dsn='<dsn>'
user='<username>'
pass='<password>'
```
The module script can then be triggered by running the module with an argument:
`python -m iotswarm.livecosmos <config_src>`
24 changes: 24 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
services:
# local stack container for local testing of AWS services
# initialises with localstack-setup.sh
localstack:
container_name: "swarm_localstack"
image: localstack/localstack:3.4
ports:
- "127.0.0.1:4566:4566" # LocalStack Gateway
- "127.0.0.1:4510-4559:4510-4559" # external services port range
environment:
- SERVICES=s3,sqs
- DEBUG=${DEBUG:-0}
- PATH=$PATH:/var/lib/localstack/bin
volumes:
- "/var/run/docker.sock:/var/run/docker.sock"
- "./bin/localstack-setup.sh:/etc/localstack/init/ready.d/init-aws.sh"
- "./bin:/var/lib/localstack/bin"
# profiles:
# - localstack
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:4566"]
interval: 10s
timeout: 5s
retries: 2
28 changes: 28 additions & 0 deletions src/iotswarm/db.py
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@
from math import nan
import sqlite3
from typing import List
from datetime import datetime

logger = logging.getLogger(__name__)

@@ -200,6 +201,33 @@ async def query_latest_from_site(self, site_id: str, table: CosmosTable) -> dict

return dict(zip(columns, data))

async def query_datetime_gt_from_site(
    self, site_id: str, date: datetime, table: CosmosTable
) -> List[dict] | None:
    """Returns a list of rows from a table for a specific site where the
    datetime is greater than the value given.

    Args:
        site_id: ID of the site to retrieve records from.
        date: The date that results are filtered by (exclusive lower bound).
        table: A valid table from the database.

    Returns:
        List[dict] | None: A list of dicts containing the database columns
            as keys, and the values as values. Returns `None` if no data
            retrieved.
    """

    # Substitute the table name into the parameterised query template.
    query = self._fill_query(CosmosQuery.ORACLE_DATE_GREATER_THAN, table)

    with self.connection.cursor() as cursor:
        # site_id and date_time are bound parameters, not string-formatted.
        await cursor.execute(query, site_id=site_id, date_time=date)

        # cursor.description yields (name, type, ...) tuples; take the names.
        columns = [i[0] for i in cursor.description]
        data = await cursor.fetchall()

    # Falls through to an implicit None when the result set is empty.
    if data:
        return [dict(zip(columns, data_row)) for data_row in data]

async def query_site_ids(
self, table: CosmosTable, max_sites: int | None = None
) -> list:
Empty file.
116 changes: 116 additions & 0 deletions src/iotswarm/livecosmos/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""This is the main module invocation for sending live COSMOS data to AWS"""

from config import Config
from pathlib import Path
import sys
from iotswarm.db import Oracle
from iotswarm.queries import CosmosTable
from iotswarm.devices import CR1000XDevice
from iotswarm.messaging.core import MockMessageConnection
import asyncio
from typing import List
from datetime import datetime, timedelta
import logging
import logging.config

# Load the logger configuration shipped with the package (one directory up
# from this module, under __assets__).
logging.config.fileConfig(
    fname=Path(__file__).parents[1] / "__assets__" / "loggers.ini"
)

logger = logging.getLogger(__name__)

# Devices write to a mock connection for now — payloads are built and
# printed but not sent to a real broker while this MVP is validated.
MOCK_CONNECTION = MockMessageConnection()


async def get_latest_payloads_for_table(
    oracle: Oracle, table: CosmosTable, datetime_gt: datetime
) -> List[dict]:
    """Gets all payloads after the datetime for a given Oracle table.

    Looks up every site ID present in the table, queries each site
    concurrently, and merges the per-site results into one flat list.

    Args:
        oracle: The oracle database connection.
        table: The database table to search.
        datetime_gt: The datetime that values must be greater than.

    Returns:
        A list of dictionaries where each dictionary is a payload.
    """

    site_ids = await oracle.query_site_ids(table)

    logger.debug(f"Found {len(site_ids)} sites IDs for table: {table}")

    site_queries = [
        get_latest_payloads_for_site(oracle, table, datetime_gt, site_id)
        for site_id in site_ids
    ]
    per_site_payloads = await asyncio.gather(*site_queries)

    # Merge the per-site lists into a single flat list of payloads.
    flattened: List[dict] = []
    for site_payloads in per_site_payloads:
        flattened.extend(site_payloads)
    return flattened


async def get_latest_payloads_for_site(
    oracle: Oracle, table: CosmosTable, datetime_gt: datetime, site: str
) -> List[dict]:
    """Gets all payloads after the datetime for a given site from an
    Oracle table.

    Args:
        oracle: The oracle database connection.
        table: The database table to search.
        datetime_gt: The datetime that values must be greater than.
        site: The name of the site.

    Returns:
        A list of dictionaries where each dictionary is a payload.
    """
    latest = await oracle.query_datetime_gt_from_site(site, datetime_gt, table)

    # Guard clause: nothing new for this site, so no device is constructed.
    if not latest:
        logger.debug(f"Got 0 rows for site {site} in table: {table}")
        return []

    # The device wraps each raw row in the CR1000X payload envelope.
    device = CR1000XDevice(
        device_id=site,
        data_source=oracle,
        connection=MOCK_CONNECTION,
        table=table,
    )

    logger.debug(f"Got {len(latest)} rows for site {site} in table: {table}")

    # NOTE(review): relies on the device's private _format_payload method.
    return [device._format_payload(row) for row in latest]


async def main(config_file: Path | str) -> dict:
    """The main invocation method.

    Initialises the Oracle connection and defines which data to query.

    Args:
        config_file: Path to the *.cfg file that contains oracle credentials.

    Returns:
        A dict mapping each queried CosmosTable to its list of payloads.
    """
    oracle_creds = Config(str(config_file))

    oracle = await Oracle.create(
        oracle_creds["dsn"], oracle_creds["user"], oracle_creds["pass"]
    )

    # Restricted to these tables for now: unique-site-ID lookups on the
    # PRECIP tables are slow. May become a command line argument later.
    tables = [CosmosTable.LEVEL_1_SOILMET_30MIN, CosmosTable.LEVEL_1_NMDB_1HOUR]

    # Only rows newer than this cutoff are collected.
    date_gt = datetime.now() - timedelta(hours=3)
    result = await asyncio.gather(
        *[get_latest_payloads_for_table(oracle, table, date_gt) for table in tables]
    )

    table_data = dict(zip(tables, result))
    print(table_data)

    # Fix: the original annotated a return value but returned None.
    return table_data


if __name__ == "__main__":
    # Fall back to the repository-root oracle.cfg (three directories up from
    # this module) when no config path is supplied on the command line.
    if len(sys.argv) == 1:
        sys.argv.append(str(Path(__file__).parents[3] / "oracle.cfg"))
    # NOTE(review): all remaining CLI arguments are forwarded to main(),
    # which accepts exactly one — passing more than one raises TypeError.
    asyncio.run(main(*sys.argv[1:]))
23 changes: 23 additions & 0 deletions src/iotswarm/livecosmos/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Utility methods for the module"""

import boto3


def get_alphabetically_last_s3_object(s3_client, bucket_name, prefix=""):
    """Returns the alphabetically last object key in an S3 bucket.

    Scans every page of a ``list_objects_v2`` listing, tracking the running
    maximum of each page's last key.

    Args:
        s3_client: A boto3 S3 client used to list the bucket.
        bucket_name: Name of the bucket to scan.
        prefix: Optional key prefix restricting the listing.

    Returns:
        The lexicographically greatest key as a string, or ``None`` when no
        objects match.
    """
    paginator = s3_client.get_paginator("list_objects_v2")
    pages = paginator.paginate(Bucket=bucket_name, Prefix=prefix)

    last_key = None

    for page in pages:
        # Pages with no matching objects carry no "Contents" key.
        if "Contents" not in page:
            continue

        # The last key in the current page (sorted lexicographically
        # within the page).
        page_last_key = page["Contents"][-1]["Key"]

        # Update the global last key if this page's last key is greater.
        if last_key is None or page_last_key > last_key:
            last_key = page_last_key
        # Fix: removed leftover debug print(page_last_key).

    return last_key
14 changes: 14 additions & 0 deletions src/iotswarm/queries.py
Original file line number Diff line number Diff line change
@@ -63,3 +63,17 @@ class CosmosQuery(StrEnum):
SELECT UNQIUE(site_id) FROM <table>
"""

ORACLE_DATE_GREATER_THAN = """SELECT * FROM COSMOS.{table}
WHERE site_id = :site_id
AND date_time > :date_time"""

"""Query for retreiving data from a given table in oracle format
that is greater than a given datetime.
.. code-block:: sql
SELECT * FROM <table>
WHERE site_id = <site_id>
AND date_time > <date_time>
"""
1 change: 1 addition & 0 deletions tests/livecosmos/test_livecosmos_aws.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
import unittest