
POC converting acquisition/ code to ORM #1133

Draft: wants to merge 2 commits into base `dev`.

193 changes: 91 additions & 102 deletions src/acquisition/flusurv/flusurv_update.py
@@ -68,122 +68,111 @@
+ initial version
"""

# standard library
import argparse

# third party
import mysql.connector

# first party
from delphi.epidata.acquisition.flusurv import flusurv
import delphi.operations.secrets as secrets
from delphi.utils.epidate import EpiDate
from delphi.utils.epiweek import delta_epiweeks
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from ...server._config import (
    SQLALCHEMY_DATABASE_URI,
    SQLALCHEMY_ENGINE_OPTIONS
)
from .models import FluSurv

engine = create_engine(SQLALCHEMY_DATABASE_URI, **SQLALCHEMY_ENGINE_OPTIONS)
Contributor

question: does making engine a global prevent this file from being used as an import?

I don't think we need it for this indicator, but if we're using it as an exemplar we should understand what constraints this approach implies.
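
For illustration, one way to avoid the import-time side effect might be to build the engine lazily and cache it, reusing the same `_config` imports as this diff (a sketch, not something the PR includes):

# sketch: lazily created, cached engine so importing the module has no side effects
from functools import lru_cache

from sqlalchemy import create_engine

from ...server._config import SQLALCHEMY_DATABASE_URI, SQLALCHEMY_ENGINE_OPTIONS


@lru_cache(maxsize=1)
def get_engine():
    return create_engine(SQLALCHEMY_DATABASE_URI, **SQLALCHEMY_ENGINE_OPTIONS)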

Contributor Author

ChatGPT recommends doing something like this:

from sqlalchemy import create_engine, text

def get_engine():
    engine = create_engine('postgresql://user:password@localhost/mydatabase')
    return engine

with get_engine().connect() as conn:
    result = conn.execute(text("SELECT * FROM mytable"))
    for row in result:
        print(row)

instead of global variables.

But my suggestion is to go even deeper:

from flask import Blueprint, Flask
from flask_marshmallow import Marshmallow
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy

# `meta` and `Config` come from the application's own modules
db = SQLAlchemy(metadata=meta)
ma = Marshmallow()
bp = Blueprint('bp', __name__, url_prefix='/api')
migrate = Migrate(db)


def create_app(config_object=Config):
    ''' Flask Application Factory

    Docs:
        https://flask.palletsprojects.com/en/2.0.x/patterns/appfactories/
    '''

    flask_app = Flask(__name__)
    flask_app.config.from_object(config_object())

    # init db and serializer
    db.init_app(flask_app)
    ma.init_app(flask_app)
    migrate.init_app(flask_app, db)

    # register app blueprints
    import app.event_impact.blueprints  # noqa

    # register blueprints
    flask_app.register_blueprint(bp)

    return flask_app

But it would affect a lot of the codebase.

Contributor

checking my understanding -- you're recommending we use Flask on the acquisition side as well? We currently only use it for the API server.


def get_rows(cur):
  """Return the number of rows in the `flusurv` table."""

  # count all rows
  cur.execute('SELECT count(1) `num` FROM `flusurv`')
  for (num,) in cur:
    return num
def get_rows(session):
    """Return the number of rows in the `flusurv` table."""
    return session.query(FluSurv.id).count()


def update(issue, location_name, test_mode=False):
  """Fetch and store the currently available weekly FluSurv dataset."""

  # fetch data
  location_code = flusurv.location_codes[location_name]
  print('fetching data for', location_name, location_code)
  data = flusurv.get_data(location_code)

  # metadata
  epiweeks = sorted(data.keys())
  location = location_name
  release_date = str(EpiDate.today())

  # connect to the database
  u, p = secrets.db.epi
  cnx = mysql.connector.connect(
    host=secrets.db.host, user=u, password=p, database='epidata')
  cur = cnx.cursor()
  rows1 = get_rows(cur)
  print('rows before: %d' % rows1)

  # SQL for insert/update
  sql = '''
  INSERT INTO `flusurv` (
    `release_date`, `issue`, `epiweek`, `location`, `lag`, `rate_age_0`,
    `rate_age_1`, `rate_age_2`, `rate_age_3`, `rate_age_4`, `rate_overall`,
    `rate_age_5`, `rate_age_6`, `rate_age_7`
  )
  VALUES (
    %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
  )
  ON DUPLICATE KEY UPDATE
    `release_date` = least(`release_date`, %s),
    `rate_age_0` = coalesce(%s, `rate_age_0`),
    `rate_age_1` = coalesce(%s, `rate_age_1`),
    `rate_age_2` = coalesce(%s, `rate_age_2`),
    `rate_age_3` = coalesce(%s, `rate_age_3`),
    `rate_age_4` = coalesce(%s, `rate_age_4`),
    `rate_overall` = coalesce(%s, `rate_overall`),
    `rate_age_5` = coalesce(%s, `rate_age_5`),
    `rate_age_6` = coalesce(%s, `rate_age_6`),
    `rate_age_7` = coalesce(%s, `rate_age_7`)
  '''

  # insert/update each row of data (one per epiweek)
  for epiweek in epiweeks:
    lag = delta_epiweeks(epiweek, issue)
    if lag > 52:
      # Ignore values older than one year, as (1) they are assumed not to
      # change, and (2) it would adversely affect database performance if all
      # values (including duplicates) were stored on each run.
      continue
    args_meta = [release_date, issue, epiweek, location, lag]
    args_insert = data[epiweek]
    args_update = [release_date] + data[epiweek]
    cur.execute(sql, tuple(args_meta + args_insert + args_update))

  # commit and disconnect
  rows2 = get_rows(cur)
  print('rows after: %d (+%d)' % (rows2, rows2 - rows1))
  cur.close()
  if test_mode:
    print('test mode: not committing database changes')
  else:
    cnx.commit()
  cnx.close()
"""Fetch and store the currently avialble weekly FluSurv dataset."""

# fetch data
location_code = flusurv.location_codes[location_name]
print('fetching data for', location_name, location_code)
data = flusurv.get_data(location_code)

# metadata
epiweeks = sorted(data.keys())
location = location_name
release_date = str(EpiDate.today())

with Session(engine) as session:
rows1 = get_rows(session)
print('rows before: %d' % rows1)
for epiweek in epiweeks:
lag = delta_epiweeks(epiweek, issue)
if lag > 52:
# Ignore values older than one year, as (1) they are assumed not to
# change, and (2) it would adversely affect database performance if all
# values (including duplicates) were stored on each run.
continue
args_meta = {
"issue": issue,
"epiweek": epiweek,
"location": location,
}
args_update = {
"release_date": release_date,
"lag": lag,
"rate_age_0": data[epiweek][0],
"rate_age_1": data[epiweek][1],
"rate_age_2": data[epiweek][2],
"rate_age_3": data[epiweek][3],
"rate_age_4": data[epiweek][4],
"rate_overall": data[epiweek][5],
"rate_age_5": data[epiweek][6],
"rate_age_6": data[epiweek][7],
"rate_age_7": data[epiweek][8],
}
existing_flusurv = session.query(FluSurv).filter_by(**args_meta)
if existing_flusurv.first() is not None:
existing_flusurv.update(args_update)
Contributor

question: will this change the behavior of the system?

The old query used least when updating release_date for existing rows. In the new version, if args_update contains a release_date greater than what's already in the DB, what will happen?

Contributor Author

The new version filters existing FluSurv rows; if a row with the same issue, epiweek, and location exists, it updates it, otherwise it creates a new one.

Contributor

we want this behavior:

| (issue, epiweek, location) exists? | column | row action | column value | condition |
| --- | --- | --- | --- | --- |
| no | * | insert | new value | * |
| yes | release date | update | old release date | old release date < new release date |
| yes | release date | update | new release date | new release date < old release date |
| yes | everything else | update | new value | * |
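
For reference, a rough sketch (not part of this diff) of how that LEAST/COALESCE behavior could be expressed through the ORM, reusing the `session`, `args_meta`, `args_update`, `lag`, and `release_date` names from the new code above:

from sqlalchemy import func

existing_flusurv = session.query(FluSurv).filter_by(**args_meta)
if existing_flusurv.first() is not None:
    values = {
        "lag": lag,
        # keep the earlier of the stored and incoming release dates (SQL LEAST)
        "release_date": func.least(FluSurv.release_date, release_date),
    }
    # only overwrite a rate when the incoming value is not NULL (SQL COALESCE)
    for name in args_update:
        if name.startswith("rate_"):
            values[name] = func.coalesce(args_update[name], getattr(FluSurv, name))
    # synchronize_session=False because the update values are SQL expressions
    existing_flusurv.update(values, synchronize_session=False)
else:
    session.add(FluSurv(**args_meta, **args_update))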

            else:
                args_create = dict(**args_meta, **args_update)
                session.add(FluSurv(**args_create))

        rows2 = get_rows(session)
        print('rows after: %d (+%d)' % (rows2, rows2 - rows1))
        if test_mode:
            print('test mode: not committing database changes')
        else:
            session.commit()
        session.close()


def main():
  # args and usage
  parser = argparse.ArgumentParser()
  parser.add_argument(
    'location',
    help='location for which data should be scraped (e.g. "CA" or "all")'
  )
  parser.add_argument(
    '--test', '-t',
    default=False, action='store_true', help='do not commit database changes'
  )
  args = parser.parse_args()

  # scrape current issue from the main page
  issue = flusurv.get_current_issue()
  print('current issue: %d' % issue)

  # fetch flusurv data
  if args.location == 'all':
    # all locations
    for location in flusurv.location_codes.keys():
      update(issue, location, args.test)
  else:
    # single location
    update(issue, args.location, args.test)
    # args and usage
    parser = argparse.ArgumentParser()
    parser.add_argument(
        'location',
        help='location for which data should be scraped (e.g. "CA" or "all")'
    )
    parser.add_argument(
        '--test', '-t',
        default=False, action='store_true', help='do not commit database changes'
    )
    args = parser.parse_args()

    # scrape current issue from the main page
    issue = flusurv.get_current_issue()
    print('current issue: %d' % issue)

    # fetch flusurv data
    if args.location == 'all':
        # all locations
        for location in flusurv.location_codes.keys():
            update(issue, location, args.test)
    else:
        # single location
        update(issue, args.location, args.test)


if __name__ == '__main__':
  main()
    main()
37 changes: 37 additions & 0 deletions src/acquisition/flusurv/models.py
@@ -0,0 +1,37 @@
from sqlalchemy import Column, Index, UniqueConstraint
from sqlalchemy.orm import declarative_base
from sqlalchemy.types import Date, Float, Integer, String

Base = declarative_base()


class FluSurv(Base):
Contributor

question: does this depend on .sql ddl files or can it replace them?

src/ddl/fluview.sql is used in two scenarios:

  1. (common) setting up the database for the local development environment in Docker
  2. (uncommon) setting up a fresh production/staging/QA environment if something has gone Horribly Wrong with one of the existing ones

in both scenarios we want the resulting (empty) database to match what's in (full) prod, with the exception of column order.

with this change, we now have parallel table definitions in fluview.sql and this file. can we drop the .sql definition? if so, what's the procedure for spinning up an empty table based on the sqlalchemy definition?

Contributor Author

I think it is fully independent

Contributor

ideally we only have one copy of each table definition. if we wanted to drop the .sql file, what would be the procedure for spinning up an empty table based on the sqlalchemy definition?
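
One possible procedure (a sketch, not part of this PR) would be to create the tables straight from the declarative metadata with a small one-off script, assuming the package paths match the imports used elsewhere in this diff:

from sqlalchemy import create_engine

from delphi.epidata.acquisition.flusurv.models import Base
from delphi.epidata.server._config import (
    SQLALCHEMY_DATABASE_URI,
    SQLALCHEMY_ENGINE_OPTIONS,
)

engine = create_engine(SQLALCHEMY_DATABASE_URI, **SQLALCHEMY_ENGINE_OPTIONS)
# Creates any tables defined on Base.metadata (currently just `flusurv`)
# that do not already exist in the target database.
Base.metadata.create_all(engine)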

"""
SQLAlchemy model representing flusurve data.
"""

    __tablename__ = 'flusurv'
    __table_args__ = (
        UniqueConstraint("issue", "epiweek", "location", name="issue"),
        Index("release_date", "release_date"),
        Index("issue_2", "issue"),
        Index("epiweek", "epiweek"),
        Index("region", "location"),
        Index("lag", "lag"),
    )

    id = Column(Integer, primary_key=True, autoincrement="auto", nullable=False)
    release_date = Column(Date, nullable=False)
    issue = Column(Integer, nullable=False)
    epiweek = Column(Integer, nullable=False)
    location = Column(String(length=32), nullable=False)
    lag = Column(Integer, nullable=False)
    rate_age_0 = Column(Float, default=None)
    rate_age_1 = Column(Float, default=None)
    rate_age_2 = Column(Float, default=None)
    rate_age_3 = Column(Float, default=None)
    rate_age_4 = Column(Float, default=None)
    rate_age_5 = Column(Float, default=None)
    rate_age_6 = Column(Float, default=None)
    rate_age_7 = Column(Float, default=None)
    rate_overall = Column(Float, default=None)