Skip to content

Commit

Permalink
refactor: only insert each vote once
Browse files Browse the repository at this point in the history
  • Loading branch information
wgwz committed Sep 6, 2023
1 parent af2fcd9 commit 6f29a66
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 80 deletions.
162 changes: 92 additions & 70 deletions index_votes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
import textwrap
from psycopg2.errors import ForeignKeyViolation
import requests
from utils import is_archive_node, PollingProcess, events_to_process
from utils import (
is_archive_node,
PollingProcess,
events_to_process,
new_events_to_process,
)

logger = logging.getLogger(__name__)

Expand All @@ -26,73 +31,95 @@ def fetch_votes_by_proposal(height, proposal_id):
return resp.json()["votes"]


def gen_records(cur, query):
    """Execute *query* on the cursor *cur* and lazily yield each result row."""
    cur.execute(query)
    yield from cur


def _index_votes(pg_conn, _client, _chain_num):
with pg_conn.cursor() as cur:
for event in events_to_process(
cur,
"votes",
):
(
type,
block_height,
tx_idx,
msg_idx,
_,
_,
chain_num,
timestamp,
tx_hash,
) = event[0]
normalize = {}
normalize["type"] = type
normalize["block_height"] = block_height
normalize["tx_idx"] = tx_idx
normalize["msg_idx"] = msg_idx
normalize["chain_num"] = chain_num
normalize["timestamp"] = timestamp
normalize["tx_hash"] = tx_hash
for entry in event:
(_, _, _, _, key, value, _, _, _) = entry
value = value.strip('"')
normalize[key] = value
logger.debug(normalize)
votes = fetch_votes_by_proposal(
normalize["block_height"], normalize["proposal_id"]
all_chain_nums = [
record[0] for record in gen_records(cur, "select num from chain;")
]
max_block_heights = {
chain_num: max_block_height
for chain_num, max_block_height in gen_records(
cur,
"select chain_num, MAX(block_height) from votes group by chain_num;",
)
logger.debug(votes)
insert_text = textwrap.dedent(
"""
INSERT INTO votes (
type,
block_height,
tx_idx,
msg_idx,
chain_num,
timestamp,
tx_hash,
proposal_id,
voter,
option,
metadata,
submit_time
) VALUES (
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s
);"""
).strip("\n")
with pg_conn.cursor() as _cur:
for vote in votes:
try:
}
logger.debug(f"{all_chain_nums=}")
logger.debug(f"{max_block_heights=}")
for chain_num in all_chain_nums:
if chain_num not in max_block_heights.keys():
max_block_heights[chain_num] = 0
logger.debug(f"{max_block_heights=}")
for chain_num, max_block_height in max_block_heights.items():
logger.debug(f"{chain_num=} {max_block_height=}")
for event in new_events_to_process(
cur, "votes", chain_num, max_block_height
):
(
type,
block_height,
tx_idx,
msg_idx,
_,
_,
chain_num,
timestamp,
tx_hash,
) = event[0]
normalize = {}
normalize["type"] = type
normalize["block_height"] = block_height
normalize["tx_idx"] = tx_idx
normalize["msg_idx"] = msg_idx
normalize["chain_num"] = chain_num
normalize["timestamp"] = timestamp
normalize["tx_hash"] = tx_hash
for entry in event:
(_, _, _, _, key, value, _, _, _) = entry
value = value.strip('"')
normalize[key] = value
logger.debug(normalize)
votes = fetch_votes_by_proposal(
normalize["block_height"], normalize["proposal_id"]
)
logger.debug(f"{votes=}")
insert_text = textwrap.dedent(
"""
INSERT INTO votes (
type,
block_height,
tx_idx,
msg_idx,
chain_num,
timestamp,
tx_hash,
proposal_id,
voter,
option,
metadata,
submit_time
) VALUES (
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s
) ON CONFLICT DO NOTHING;"""
).strip("\n")
with pg_conn.cursor() as _cur:
for vote in votes:
row = (
normalize["type"],
normalize["block_height"],
Expand All @@ -114,11 +141,6 @@ def _index_votes(pg_conn, _client, _chain_num):
logger.debug(_cur.statusmessage)
pg_conn.commit()
logger.info("vote inserted..")
except ForeignKeyViolation as exc:
logger.debug(exc)
pg_conn.rollback()
# since we know all votes for this proposal will fail we exit the loop
break


def index_votes():
Expand Down
17 changes: 7 additions & 10 deletions sql/V1_12__votes.sql
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
ALTER TABLE IF EXISTS proposals
DROP CONSTRAINT IF EXISTS proposals_proposal_id_ux;

ALTER TABLE IF EXISTS proposals
ADD CONSTRAINT proposals_proposal_id_ux UNIQUE (chain_num, proposal_id);

CREATE TABLE IF NOT EXISTS
votes (
TYPE TEXT NOT NULL,
Expand All @@ -30,10 +24,13 @@ CREATE TABLE IF NOT EXISTS
tx_idx,
msg_idx,
TYPE
) REFERENCES msg_event,
FOREIGN KEY (chain_num, proposal_id) REFERENCES proposals (chain_num, proposal_id)
) REFERENCES msg_event
);

-- Recreate the lookup index used when querying votes by proposal and chain.
DROP INDEX IF EXISTS votes_proposal_id_chain_num_idx;

CREATE INDEX IF NOT EXISTS votes_proposal_id_chain_num_idx ON votes (proposal_id, chain_num);

-- Enforce at most one vote row per (chain, proposal, voter) so that
-- re-indexing with ON CONFLICT DO NOTHING cannot insert duplicates.
-- IF EXISTS makes the drop idempotent: without it this migration fails on a
-- fresh database where the constraint has never been created.
ALTER TABLE IF EXISTS votes
    DROP CONSTRAINT IF EXISTS votes_chain_num_proposal_id_voter_ux;

ALTER TABLE IF EXISTS votes
    ADD CONSTRAINT votes_chain_num_proposal_id_voter_ux UNIQUE (chain_num, proposal_id, voter);
38 changes: 38 additions & 0 deletions utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,44 @@ def events_to_process(cur, index_table_name):
yield list(g)


def new_events_to_process(cur, index_table_name, chain_num, max_block_height):
    """Yield unprocessed message events for one chain above a height watermark.

    Selects rows from ``msg_event_attr`` whose event type belongs to
    *index_table_name* (per ``TABLE_EVENT_NAMES_MAP``), that have no matching
    row yet in the index table (the NATURAL LEFT JOIN columns are NULL), and
    that are newer than *max_block_height* on chain *chain_num*.

    :param cur: open psycopg2 cursor used to run the query.
    :param index_table_name: name of the index table; must be a key of
        ``TABLE_EVENT_NAMES_MAP`` (it is interpolated into the SQL, so it must
        never come from untrusted input).
    :param chain_num: chain identifier to filter on.
    :param max_block_height: only events strictly above this height are returned.
    :yields: lists of attribute rows, one list per (block_height, tx_idx,
        msg_idx) message event, with ``key``/``value`` pairs per row.
    """
    event_names = TABLE_EVENT_NAMES_MAP[index_table_name]
    # Only the table name is interpolated (identifiers cannot be bound as
    # parameters); all values go through psycopg2 placeholders to avoid
    # SQL injection and quoting bugs.
    sql = textwrap.dedent(
        f"""
        SELECT mea.type,
        mea.block_height,
        mea.tx_idx,
        mea.msg_idx,
        mea.key,
        mea.value,
        mea.chain_num,
        TRIM(BOTH '"' FROM (tx.data -> 'tx_response' -> 'timestamp')::text) AS timestamp,
        encode(tx.hash, 'hex') as tx_hash
        FROM msg_event_attr AS mea
        NATURAL LEFT JOIN {index_table_name} AS e
        NATURAL LEFT JOIN tx
        WHERE mea.type IN %s
        AND (e.block_height IS NULL
        AND e.type IS NULL
        AND e.tx_idx IS NULL
        AND e.msg_idx IS NULL)
        AND mea.chain_num = %s
        AND mea.block_height > %s
        ORDER BY block_height ASC,
        KEY ASC;
        """
    )
    # psycopg2 adapts a Python tuple to a SQL value list for the IN clause.
    cur.execute(sql, (tuple(event_names), chain_num, max_block_height))

    # Rows arrive ordered by block_height then key; consecutive rows with the
    # same (block_height, tx_idx, msg_idx) belong to one message event, so
    # groupby collects each event's key/value attribute rows together.
    for _, grouped_rows in groupby(cur, lambda row: (row[1], row[2], row[3])):
        yield list(grouped_rows)


def is_archive_node():
# since the indexer is intended to run against archive nodes,
# assume that by default the node is an archive node.
Expand Down

0 comments on commit 6f29a66

Please sign in to comment.