Skip to content

Commit

Permalink
squash ci/test-downloader
Browse files Browse the repository at this point in the history
  • Loading branch information
ckingbailey committed Dec 11, 2024
1 parent 244eb0a commit a783a76
Show file tree
Hide file tree
Showing 17 changed files with 181 additions and 92 deletions.
32 changes: 32 additions & 0 deletions download/.github/workflows/pr_check.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
name: Python tests

on:
pull_request:
branches:
- feat/pull-v2-api
push:
branches:
- ci/test-downloader
workflow_dispatch:

env:
working_dir: download

jobs:
run_tests:
name: Run tests on Python download code
runs-on: ubuntu-22.04
defaults:
run:
working-directory: ${{ env.working_dir }}
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v4
with:
python-version-file: ${{ env.working_dir }}/.python-version
cache: pip
cache-dependency-path: ${{ env.working_dir }}/requirements.txt
- run: pip install -r requirements.txt
- name: Run tests
run: pytest tests/test_*.py

14 changes: 7 additions & 7 deletions download/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def main():
with open(f'{DATA_DIR_PATH}/filers.json', encoding='utf8') as f:
filers = json.loads(f.read())

committees = Committees(filers, elections)
committees = Committees(filers, elections.pl)

# A-Contribs:
# join filers + filings + elections + transactions
Expand All @@ -49,13 +49,13 @@ def main():
# committees.Ballot_Measure_Election -> elections.Ballot_Measure_Election
# where trans['transaction']['calTransactionType'] == 'F460A'
with open(f'{DATA_DIR_PATH}/filings.json', encoding='utf8') as f:
filings = Filings(json.loads(f.read()))
filings = Filings(json.loads(f.read())).pl

with open(f'{DATA_DIR_PATH}/transactions.json', encoding='utf8') as f:
records = json.loads(f.read())
transactions = Transactions(records)
transactions = Transactions(records).pl

a_contributions = A_Contributions(transactions, filings, committees)
a_contributions = A_Contributions(transactions, filings, committees.pl)
a_contribs_df = a_contributions.df
if not a_contribs_df.is_empty:
print(a_contribs_df.drop(columns=[
Expand All @@ -80,9 +80,9 @@ def main():
'XRef_Match',
]).sample(n=20))

elections.df.write_csv(f'{csv_data_dir_path}/elections.csv')
committees.df.write_csv(f'{csv_data_dir_path}/committees.csv')
a_contributions.df.write_csv(f'{csv_data_dir_path}/a_contributions.csv')
elections.pl.write_csv(f'{OUTPUT_DIR}/elections.csv')
committees.pl.write_csv(f'{OUTPUT_DIR}/committees.csv')
a_contributions.df.write_csv(f'{OUTPUT_DIR}/a_contributions.csv')

if __name__ == '__main__':
main()
10 changes: 4 additions & 6 deletions download/model/a_contributions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
Schedule A, Contributions
Hopefully this can be joined with other Schedule classes into a single Transaction class
"""
from .committee import Committees
from .filing import Filings
from .transaction import Transactions
import polars as pl
from .schedule import ScheduleBase

class A_Contributions(ScheduleBase):
Expand All @@ -13,9 +11,9 @@ class A_Contributions(ScheduleBase):
"""
def __init__(
self,
transactions:Transactions,
filings:Filings,
committees:Committees
transactions:pl.DataFrame,
filings:pl.DataFrame,
committees:pl.DataFrame
):
self._form_id = 'F460A'
super().__init__(
Expand Down
35 changes: 25 additions & 10 deletions download/model/base.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
""" This is the base model, upon all others shall be based """
import pandas as pd
import polars as pl

class BaseModel:
""" Base model other models inherit from """
def __init__(self, data):
self._data = data
self._df = None
self._lazy = None
self._pl = None
self._dtypes = []
self._pl_dtypes = []
self._sql_dtypes = []
Expand All @@ -19,18 +20,32 @@ def data(self):
return self._data

@property
def lazy(self):
''' Return a Polars Lazyframe '''
if self._lazy is None:
self._lazy = pl.LazyFrame(self._data, schema=self._dtypes)

return self._lazy
def pl(self):
''' Return a Polars dataframe '''
if self._pl is None or self._pl.is_empty():
self._pl = pl.DataFrame(self._data, schema=self._pl_dtypes)

return self._pl

@property
def df(self):
''' Return a Polars dataframe '''
if self._df is None:
self._df = self.lazy.collect()
""" Get a dataframe of the data """
if self._df is None or self._df.empty:
self._df = pd.DataFrame(self._data).astype(self._dtypes)

return self._df

def to_sql(self, connection, **kwargs):
""" Write to a postgresql table """
options = {
'index_label': 'id',
'if_exists': 'replace'
}
options.update(kwargs)

self.df[self._sql_cols].to_sql(
self._sql_table_name,
connection,
dtype=self._sql_dtypes,
**options
)
39 changes: 35 additions & 4 deletions download/model/committee.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@
from typing import List
import polars as pl
from sqlalchemy.types import String
# Next line ingored because Pylint reports cannot find election in model
from . import base, election # pylint: disable=no-name-in-module
from . import base

class Committees(base.BaseModel):
""" A collection of committees """
def __init__(self, filers:List[dict], elections:election.Elections):
def __init__(self, filers:List[dict], elections:pl.DataFrame):
empty_election_influence = {
'electionDate': None,
'measure': None,
Expand All @@ -20,6 +19,7 @@ def __init__(self, filers:List[dict], elections:election.Elections):
super().__init__([
{
'filer_nid': int(f['filerNid']),
# 'Ballot_Measure_Election': [ *elections[elections['date'] == infl['electionDate']]['name'].array, None ][0],
'Ballot_Measure_Election': self._get_possibly_empty_ballot_measure_election(
elections,
infl
Expand Down Expand Up @@ -49,6 +49,21 @@ def __init__(self, filers:List[dict], elections:election.Elections):
if f['registrations'].get('CA SOS')
])
self._dtypes = {
'filer_nid': int,
'Ballot_Measure_Election': 'string',
'Filer_ID': 'string',
'Filer_NamL': 'string',
'_Status': 'string',
'_Committee_Type': 'string',
'Ballot_Measure': 'string',
'Support_Or_Oppose': 'string',
'candidate_controlled_id': 'string',
'Start_Date': 'string',
'End_Date': 'string',
'data_warning': 'string',
'Make_Active': 'string'
}
self._pl_dtypes = {
'filer_nid': pl.UInt64,
'Ballot_Measure_Election': pl.Utf8,
'Filer_ID': pl.Utf8,
Expand All @@ -63,6 +78,22 @@ def __init__(self, filers:List[dict], elections:election.Elections):
'data_warning': pl.Utf8,
'Make_Active': pl.Utf8
}
self._sql_dtypes = {
'Ballot_Measure_Election': String,
'Filer_ID': String,
'Filer_NamL': String,
'_Status': String,
'_Committee_Type': String,
'Ballot_Measure': String,
'Support_Or_Oppose': String,
'candidate_controlled_id': String,
'Start_Date': String,
'End_Date': String,
'data_warning': String,
'Make_Active': String
}
self._sql_cols = self._sql_dtypes.keys()
self._sql_table_name = 'committees'

@staticmethod
def support_or_oppose(influence):
Expand All @@ -85,7 +116,7 @@ def _get_possibly_empty_ballot_measure_election(elections: pl.DataFrame, influen
list, which will contain either the matched election slug or None.
'''
return [
*elections.lazy.filter(
*elections.lazy().filter(
pl.col('date') == influence['electionDate']
).first().collect().get_column('name'),
None
Expand Down
10 changes: 4 additions & 6 deletions download/model/d_expenditures.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
'''
FPPC Form 460, Schedule D, Expenditures
'''
from .committee import Committees
from .filing import Filings
from .transaction import Transactions
import polars as pl
from .schedule import ScheduleBase

class DExpenditures(ScheduleBase):
Expand All @@ -12,9 +10,9 @@ class DExpenditures(ScheduleBase):
'''
def __init__(
self,
transactions:Transactions,
filings:Filings,
committees:Committees
transactions: pl.DataFrame,
filings: pl.DataFrame,
committees: pl.DataFrame
):
self._form_id = 'F460D'
super().__init__(
Expand Down
18 changes: 13 additions & 5 deletions download/model/election.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""
import os
from datetime import datetime
from polars import Utf8
from sqlalchemy.types import String
from .base import BaseModel

class Elections(BaseModel):
Expand Down Expand Up @@ -53,11 +53,19 @@ def __init__(self, election_records):

super().__init__(elections)
self._dtypes = {
'title': Utf8,
'name': Utf8,
'location': Utf8,
'date': Utf8
'title': 'string',
'name': 'string',
'location': 'string',
'date': 'string'
}
self._sql_dtypes = {
'title': String,
'name': String,
'location': String,
'date': String
}
self._sql_cols = self._sql_dtypes.keys()
self._sql_table_name = 'elections'

@staticmethod
def ordinal(n):
Expand Down
9 changes: 9 additions & 0 deletions download/model/filing.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ def __init__(self, filings):
])

self._dtypes = {
'filing_nid': 'string',
'filer_nid': int,
'Report_Num': 'Int64',
'Rpt_Date': 'string',
'From_Date': 'string',
'Thru_Date': 'string'
}

self._pl_dtypes = {
'filing_nid': Utf8,
'filer_nid': UInt64,
'Report_Num': UInt64,
Expand Down
24 changes: 8 additions & 16 deletions download/model/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
Abstracts much of the boilerplate common to FPPC Form 460 Schedule data
'''
import polars as pl
from .committee import Committees
from .filing import Filings
from .transaction import Transactions

DTYPES = {
'Filer_ID': 'string',
Expand Down Expand Up @@ -91,18 +88,18 @@ class ScheduleBase:
def __init__(
self,
form_id: str,
transactions:Transactions,
filings:Filings,
committees:Committees
transactions: pl.DataFrame,
filings: pl.DataFrame,
committees: pl.DataFrame
):
schedule = committees.df.lazy().group_by('Filer_ID').first().join(
filings.df.lazy(),
schedule = committees.lazy().group_by('Filer_ID').first().join(
filings.lazy(),
on='filer_nid',
how='inner'
).rename({
'_Committee_Type': 'Committee_Type'
}).join(
transactions.df.lazy().filter(pl.col('cal_tran_type') == form_id),
transactions.lazy().filter(pl.col('cal_tran_type') == form_id),
on='filing_nid',
how='inner'
).drop([
Expand All @@ -121,19 +118,14 @@ def __init__(
])

self._lazy = schedule
self._df = None

self._dtypes = DTYPES

@property
def lazy(self):
''' Get data as Polars LazyFrame '''
return self._lazy

@property
def df(self):
''' Get data as Polars DataFrame '''
if self._df is None:
self._df = self._lazy.collect()

return self._df
# QUESTION: Does this invalidate self._lazy?
return self._lazy.collect()
2 changes: 1 addition & 1 deletion download/model/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,4 @@ def __init__(self, transactions):
} for t in transactions
])

self._dtypes = DTYPES
self._pl_dtypes = DTYPES
6 changes: 6 additions & 0 deletions download/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pandas~=2.0.3
SQLAlchemy~=2.0.20
psycopg2~=2.9.7
gdrive-datastore==0.0.1.5
pyarrow==14.0.1
polars==0.19.12
Loading

0 comments on commit a783a76

Please sign in to comment.