WIP: History #282 (Draft)
wants to merge 17 commits into base: develop
204 changes: 204 additions & 0 deletions bulk/management/commands/export_history.py
@@ -0,0 +1,204 @@
import json
from functools import lru_cache
from collections import defaultdict
from django.core.management.base import BaseCommand
from opencivicdata.legislative.models import Bill, VoteEvent, LegislativeSession
from history.models import Change


def format_time(
event_time__year,
event_time__month,
event_time__day,
event_time__hour=None,
event_time__minute=None,
):
timestr = f"{event_time__year}-{event_time__month:02d}-{event_time__day:02d}"
if event_time__hour is not None:
timestr += f"T{event_time__hour:02d}"
if event_time__minute is not None:
timestr += f":{event_time__minute:02d}"
return timestr
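# e.g. format_time(2020, 2, 10, 20) -> "2020-02-10T20"; parameter names mirror the
# event_time__* lookup keys produced by the .values() query in the command below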


@lru_cache()
def get_item_properties(obj_id=None, session_id=None):
if session_id:
session = LegislativeSession.objects.get(pk=session_id)
elif obj_id.startswith("ocd-bill"):
session = Bill.objects.get(pk=obj_id).legislative_session
elif obj_id.startswith("ocd-vote"):
session = VoteEvent.objects.get(pk=obj_id).legislative_session
return {
"jurisdiction_id": session.jurisdiction_id,
"session_id": session.identifier,
}
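# example return value (identifiers here are illustrative, not real data):
#   {"jurisdiction_id": "ocd-jurisdiction/country:us/state:xx/government", "session_id": "2019"}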


def accumulate_changes(changes):
"""
roll up all changes in a list to be attached to the appropriate ID
"""
# for each type, we return a dict mapping ids to change objects
accum = {
"bill": defaultdict(list),
"vote": defaultdict(list),
"related_entity": defaultdict(list),
"version_link": defaultdict(list),
"document_link": defaultdict(list),
}

for c in changes:
if c.object_id.startswith("ocd-bill"):
ctype = "bill"
elif c.object_id.startswith("ocd-vote"):
ctype = "vote"
elif c.table_name == "opencivicdata_billactionrelatedentity":
ctype = "related_entity"
elif c.table_name == "opencivicdata_billversionlink":
ctype = "version_link"
elif c.table_name == "opencivicdata_billdocumentlink":
ctype = "document_link"
else:
raise ValueError("unexpected id: " + c.object_id)
accum[ctype][c.object_id].append(c)
return accum
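# resulting shape: accum["bill"]["ocd-bill/<uuid>"] is the list of Change rows touching
# that bill (likewise for votes and the related-entity / version-link / document-link tables)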


MAPPING = {
"opencivicdata_billaction": "actions",
"opencivicdata_billversion": "versions",
"opencivicdata_billdocument": "documents",
"opencivicdata_billsponsorship": "sponsors",
"opencivicdata_billsource": "sources",
"opencivicdata_billabstract": "abstracts",
"opencivicdata_personvote": "votes",
"opencivicdata_votecount": "counts",
"opencivicdata_votesource": "sources",
}
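# each audited table name maps to the list field its rows are nested under on the
# parent bill/vote object assembled in make_change_object() below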


def update_old(old, change):
""" like dict.update, but keeps first value it sees """
for k, v in change.items():
if k not in old:
old[k] = v


def clean_subobj(subobj):
""" we don't want to show these since we're just nesting the objects """
subobj.pop("id")
subobj.pop("bill_id", None)
subobj.pop("vote_id", None)
return subobj


def make_change_object(changes):
"""
changes is a list of changes that are for the same object

return a single object representing all changes as one
"""
    # start with the parent object id
item_id = changes[0].object_id
item_type = "bill" if item_id.startswith("ocd-bill") else "vote"
# unless we see a top-level create or delete, this is an update
change_type = "update"
old_obj = {}
new_obj = {}

for change in changes:
if (
change.table_name == "opencivicdata_bill"
or change.table_name == "opencivicdata_voteevent"
):
if change.change_type == "update":
update_old(old_obj, change.old)
new_obj.update(change.new)
elif change.change_type == "create":
new_obj.update(change.new)
change_type = "create"
elif change.change_type == "delete":
change_type = "delete"
old_obj = change.old
else:
raise ValueError(change.change_type)
else:
# standard subfield handling
field_name = MAPPING[change.table_name]

if field_name not in new_obj:
new_obj[field_name] = []
if field_name not in old_obj:
old_obj[field_name] = []

# subfields are either deleted or created, updates don't currently happen via pupa
if change.change_type == "delete":
old_obj[field_name].append(clean_subobj(change.old))
elif change.change_type == "create":
new_obj[field_name].append(clean_subobj(change.new))
else:
print(change.object_id, change.change_type, field_name, change.new)
raise ValueError("update unexpected")

if change_type != "delete":
item_properties = get_item_properties(item_id)
else:
item_properties = get_item_properties(
session_id=old_obj["legislative_session_id"]
)
return {
"item_type": item_type,
"item_id": item_id,
"item_properties": item_properties,
"action": change_type,
"old": old_obj if change_type != "create" else None,
"new": new_obj if change_type != "delete" else None,
}


def handle_epoch(**kwargs):
"""
    get all changes for a time period and yield one change object per bill/vote
"""
changes = list(Change.objects.filter(**kwargs))
formatted = format_time(**kwargs)
print(f"{len(changes)} changes for {formatted}")
changes = accumulate_changes(changes)
print(f"{len(changes['bill'])} bills, {len(changes['vote'])} votes")

# TODO: handle subobjects
for bill_id, obj_changes in changes["bill"].items():
change_obj = make_change_object(obj_changes)
yield change_obj
for vote_id, obj_changes in changes["vote"].items():
change_obj = make_change_object(obj_changes)
yield change_obj


def output_json(epoch, data):
output = {
"version": "0.1",
"source": "https://openstates.org",
"epoch": epoch,
"changes": [],
}
for item in data:
output["changes"].append(item)

with open(f"changelog_{epoch}.json", "w") as f:
json.dump(output, f, indent=1)
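# writes one file per epoch, e.g. changelog_2020-02-10T20.json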


class Command(BaseCommand):
help = "export history data"

def handle(self, *args, **options):
for time in Change.objects.values(
"event_time__year",
"event_time__month",
"event_time__day",
"event_time__hour",
).distinct():
formatted = format_time(**time)
output_json(formatted, handle_epoch(**time))
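# typical invocation (assuming the bulk app is in INSTALLED_APPS):
#   ./manage.py export_history
# which writes one changelog_<epoch>.json file per distinct hour of recorded changes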
2 changes: 1 addition & 1 deletion docker/init-db.sh
@@ -15,7 +15,7 @@ FILE=data.pgdump
if [ ! -f "$FILE" ]; then
wget https://data.openstates.org/postgres/monthly/$WHICH-public.pgdump -O $FILE
fi
-PGPASSWORD=openstates pg_restore --host db --user openstates -d openstatesorg $FILE;
+PGPASSWORD=openstates pg_restore --disable-triggers --host db --user openstates -d openstatesorg $FILE;
# rm $FILE;

poetry run ./manage.py update_materialized_views --initial
Empty file added history/__init__.py
Empty file.
Empty file added history/management/__init__.py
Empty file.
Empty file added history/management/commands/__init__.py
Empty file.
81 changes: 81 additions & 0 deletions history/management/commands/history_install.py
@@ -0,0 +1,81 @@
from django.conf import settings
from django.core.management.base import BaseCommand
from django.db import connection
from collections import namedtuple


GET_TRIGGERS_SQL = """
select event_object_schema as table_schema,
event_object_table as table_name,
trigger_schema,
trigger_name,
string_agg(event_manipulation, ',') as event,
action_timing as activation,
action_condition as condition,
action_statement as definition
from information_schema.triggers
group by 1,2,3,4,6,7,8
order by table_schema, table_name;
"""

CREATE_TRIGGER_TEMPLATE_SQL = """
CREATE TRIGGER history_insert AFTER INSERT ON {0}
FOR EACH ROW EXECUTE PROCEDURE history_insert();
CREATE TRIGGER history_delete AFTER DELETE ON {0}
FOR EACH ROW EXECUTE PROCEDURE history_delete();
CREATE TRIGGER history_update AFTER UPDATE ON {0}
FOR EACH ROW EXECUTE PROCEDURE history_update();
"""


class Command(BaseCommand):
help = "install required triggers for history tracking"

def add_arguments(self, parser):
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--uninstall", action="store_true")

def _exec_sql(self, statement):
with connection.cursor() as cursor:
if self.dry_run:
print(statement)
else:
cursor.execute(statement)

def uninstall(self):
drop_sql = ""
count = 0
tables = set()
with connection.cursor() as cursor:
cursor.execute(GET_TRIGGERS_SQL)
desc = cursor.description
nt_result = namedtuple("Result", [col[0] for col in desc])
for row in cursor.fetchall():
result = nt_result(*row)
if result.trigger_name in (
"history_insert",
"history_update",
"history_delete",
):
drop_sql += (
f"DROP TRIGGER {result.trigger_name} ON {result.table_name};\n"
)
count += 1
tables.add(result.table_name)

print(
f"Found {count} existing triggers installed on {len(tables)} tables, dropping them."
)
self._exec_sql(drop_sql)

def handle(self, *args, **options):
self.dry_run = options["dry_run"]
if self.dry_run:
print("Dry Run: SQL will be printed but database will not be modified.")
if options["uninstall"]:
self.uninstall()
for table in settings.HISTORY_TABLES:
print(f"Installing triggers on {table}.")
self._exec_sql(CREATE_TRIGGER_TEMPLATE_SQL.format(table, table, table))
print("Installing get_object_id function from settings.")
self._exec_sql(settings.HISTORY_GET_OBJECT_ID_SQL)
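# typical invocations:
#   ./manage.py history_install --dry-run    # print the SQL without modifying the database
#   ./manage.py history_install --uninstall  # drop existing triggers before reinstalling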
33 changes: 33 additions & 0 deletions history/migrations/0001_initial.py
@@ -0,0 +1,33 @@
# Generated by Django 2.2.3 on 2020-02-10 20:01

import django.contrib.postgres.fields.jsonb
from django.db import migrations, models


class Migration(migrations.Migration):

initial = True

dependencies = []

operations = [
migrations.CreateModel(
name="Change",
fields=[
(
"id",
models.AutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("event_time", models.DateTimeField(editable=False)),
("table_name", models.CharField(max_length=100)),
("object_id", models.CharField(max_length=45)),
("old", django.contrib.postgres.fields.jsonb.JSONField(null=True)),
("new", django.contrib.postgres.fields.jsonb.JSONField(null=True)),
],
)
]