Skip to content

Commit

Permalink
seperated publishers to own dir, added a postgres publisher and alter…
Browse files Browse the repository at this point in the history
…ed publisher selection to a match case
  • Loading branch information
ccall48 committed Nov 6, 2023
1 parent 27b7abd commit fe198d1
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 8 deletions.
7 changes: 7 additions & 0 deletions .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,10 @@ PUBLISH_USAGE_EVENTS_SQS_URL=<SQS URL>
PUBLISH_USAGE_EVENTS_SQS_REGION=<AWS Region>
AWS_ACCESS_KEY_ID=<AWS Access Key>
AWS_SECRET_ACCESS_KEY=<AWS Secret Key>

# Usage Event Postgres Publisher (Optional)
PG_EVENTS_USER=<postgres username>
PG_EVENTS_PASS=<postgres password>
PG_EVENTS_PORT=<postgres port>
PG_EVENTS_HOST=<postgres host>
PG_EVENTS_DB=<postgres database>
2 changes: 1 addition & 1 deletion app/ChirpHeliumTenant.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def meta_up(self, data: dict):
""".format(total_dc, dev_eui)
self.db_transaction(query)

if os.getenv('PUBLISH_USAGE_EVENTS') is True:
if os.getenv('PUBLISH_USAGE_EVENTS'):
# First we get the tenant id for the device...
query = """
SELECT application.tenant_id, application.id FROM application
Expand Down
40 changes: 40 additions & 0 deletions app/Publishers/PgUsagePublisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import os
import psycopg2
import psycopg2.extras


pg_user = os.getenv('PG_EVENTS_USER')
pg_passwd = os.getenv('PG_EVENTS_PASS')
pg_port = int(os.getenv('PG_EVENTS_PORT', 5432))
pg_host = os.getenv('PG_EVENTS_HOST')
pg_dbname = os.getenv('PG_EVENTS_DB')


def publish_to_pg(event):
con_str = f'postgresql://{pg_user}:{pg_passwd}@{pg_host}:{pg_port}/{pg_dbname}'
query = """
INSERT INTO tenant_dc (datetime, dev_eui, tenant_id, application_id, dc_used)
VALUES ('{}','{}','{}','{}','{}');
""".format(
event['datetime'],
event['dev_eui'],
event['tenant_id'],
event['application_id'],
event['dc_used'],
)

with psycopg2.connect(con_str) as con:
with con.cursor() as cur:
cur.execute(query)


"""
CREATE TABLE IF NOT EXISTS tenant_dc (
id serial PRIMARY KEY,
datetime TIMESTAMPTZ,
dev_eui TEXT NOT NULL,
tenant_id UUID NOT NULL,
application_id UUID NOT NULL,
dc_used INT NOT NULL
);
"""
File renamed without changes.
21 changes: 14 additions & 7 deletions app/UsagePublisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,19 @@ def publish_usage_event(dev_eui, tenant_id, application_id, dc_used):
'dc_used': dc_used
}

if provider == 'AWS_SQS':
from SqsUsagePublisher import publish_to_sqs
info_log("Publishing usage event to SQS: %s" % usage_event)
publish_to_sqs(usage_event)

# Here maybe we could add more providers, e.g. GCP PubSub / Kafka or something.

else:
info_log("Provider %s not found" % provider)
match provider:
case 'AWS_SQS':
from Publishers.SqsUsagePublisher import publish_to_sqs
info_log("Publishing usage event to SQS: %s" % usage_event)
publish_to_sqs(usage_event)

case 'POSTGRES':
from Publishers.PgUsagePublisher import publish_to_pg
info_log("Publishing usage event to PG: %s" % usage_event)
publish_to_pg(usage_event)

case _:
info_log("Provider %s not found" % provider)
return
6 changes: 6 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ services:
- PUBLISH_USAGE_EVENTS_SQS_REGION=${PUBLISH_USAGE_EVENTS_SQS_REGION}
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
# Usage Event Postgres Publisher (Optional)
- PG_EVENTS_USER=${PG_EVENTS_USER}
- PG_EVENTS_PASS=${PG_EVENTS_PASS}
- PG_EVENTS_PORT=${PG_EVENTS_PORT}
- PG_EVENTS_HOST=${PG_EVENTS_HOST}
- PG_EVENTS_DB=${PG_EVENTS_DB}
command: bash -c 'cd /app && python app.py'

networks:
Expand Down

0 comments on commit fe198d1

Please sign in to comment.