Skip to content

Commit

Permalink
bug: fix voters with last updated block to handle out of order processes
Browse files Browse the repository at this point in the history
  • Loading branch information
robcxyz committed Feb 14, 2022
1 parent 685de05 commit 3eecb41
Show file tree
Hide file tree
Showing 9 changed files with 162 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""add block delegations
Revision ID: 02abf7170d28
Revises: 99325d6a882a
Create Date: 2022-02-14 13:50:27.117887
"""
import sqlalchemy as sa
import sqlmodel
from alembic import op

# revision identifiers, used by Alembic.
revision = "02abf7170d28"
down_revision = "99325d6a882a"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column("delegations", sa.Column("last_updated_block", sa.Integer(), nullable=True))
op.create_index(
op.f("ix_delegations_last_updated_block"),
"delegations",
["last_updated_block"],
unique=False,
)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f("ix_delegations_last_updated_block"), table_name="delegations")
op.drop_column("delegations", "last_updated_block")
# ### end Alembic commands ###
1 change: 1 addition & 0 deletions icon_governance/models/delegations.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class Delegation(SQLModel, table=True):
address: Optional[str] = Field(..., primary_key=True)
prep_address: Optional[str] = Field(..., primary_key=True)
value: condecimal(max_digits=28, decimal_places=0) = Field(nullable=False, index=True)
last_updated_block: Optional[int] = Field(index=True)

@declared_attr
def __tablename__(cls) -> str: # noqa: N805
Expand Down
14 changes: 14 additions & 0 deletions icon_governance/utils/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,20 @@ def get_preps_cps():
return post_rpc(payload)


def get_bond(address: str):
payload = {
"jsonrpc": "2.0",
"id": 1234,
"method": "icx_call",
"params": {
"to": "cx0000000000000000000000000000000000000000",
"dataType": "call",
"data": {"method": "getBond", "params": {"address": address}},
},
}
return post_rpc(payload)


def get_admin_chain(ip_address: str):
"""Get the response from the admin API."""
url = f"http://{ip_address}:9000/admin/chain/0x1"
Expand Down
7 changes: 6 additions & 1 deletion icon_governance/workers/crons/rewards.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ def get_iscore_value(tx_hash):


def get_rewards(session):
"""Simple cron to get all the values and iscores for rewards txs."""
"""
Cron to get all the values and iscores for rewards txs. Works by getting all the
iscore distributions which are picked up by the transactions processor and insert
them into a DB. The values are then inserted with this cron job by querying for
rewards that have no value.
"""
count = (
session.execute(select([func.count(Reward.address)]).where(Reward.value == None))
.scalars()
Expand Down
82 changes: 50 additions & 32 deletions icon_governance/workers/delegations.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,46 +5,64 @@
from icon_governance.utils.rpc import convert_hex_int


def set_delegation(session, data, address):
def set_delegation(session, data, address, block_height, hash):
"""
Set the delegation for the voters tab on the p-rep details page. Takes in all
setDelegation payloads and updates the records with them. First checks the db to
see if the address has sent a Tx with a higher last_updated_block. If it is older,
ignore the Tx. If higher, then delete all the old delegations and insert a the new
delegations. This is for out of order processing.
"""

params = data["params"]

if "delegations" not in params:
logger.info(f"Skipping because no delegation field.")
return

for d in params["delegations"]:
prep_address = d["address"]
value = convert_hex_int(d["value"])

# Get the delegation from address and to prep_address
statement = (
select(Delegation)
.where(Delegation.address == address)
.where(Delegation.prep_address == prep_address)
)
# Select all the records
statement = select(Delegation).where(Delegation.address == address)

result = session.execute(statement)
delegation = result.scalars().all()
result = session.execute(statement)
address_delegation = result.scalars().all()

if len(delegation) == 0:
delegation = Delegation(
address=address,
prep_address=d["address"],
value=convert_hex_int(d["value"]),
if len(address_delegation) != 0:
last_updated_block_set = set([i.last_updated_block for i in address_delegation])
if len(last_updated_block_set) > 1:
logger.info(
f"Found multiple different last_updated_block " f"- {last_updated_block_set}"
)

last_updated_block = address_delegation[0].last_updated_block
if last_updated_block is None:
last_updated_block = 0

if last_updated_block > block_height:
# Already have latest data in DB
logger.info(
f"Skipping setDelegation {hash} - before last updated block "
f"- {last_updated_block_set}"
)
return
elif last_updated_block == block_height:
# Already have latest data in DB
logger.info(
f"Skipping setDelegation {hash} - already updated this Tx"
f"- {last_updated_block_set}"
)
return
else:
# There can be only one
delegation = delegation[0]

# if value == 0:
# # Delete the record
# session.delete(delegation)
# else:
# Update the record
delegation.value = value
session.add(delegation)
try:
for d in address_delegation:
session.delete(d)
session.commit()
except:
session.rollback()
raise

for d in params["delegations"]:
delegation = Delegation(
address=address,
prep_address=d["address"],
value=convert_hex_int(d["value"]),
last_updated_block=block_height,
)

session.add(delegation)
session.commit()
10 changes: 7 additions & 3 deletions icon_governance/workers/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ def __init__(self, **data: Any):
if self.name is None:
self.name = self.topic

self.consumer_deserializer = ProtobufDeserializer(message_type=TransactionRaw, conf={"use.deprecated.format": True})
self.consumer_deserializer = ProtobufDeserializer(
message_type=TransactionRaw, conf={"use.deprecated.format": True}
)

self.consumer = DeserializingConsumer(
{
Expand All @@ -105,8 +107,10 @@ def __init__(self, **data: Any):
self.protobuf_serializer = ProtobufSerializer(
GovernancePrepProcessed,
self.schema_registry_client,
conf={"auto.register.schemas": True,
"use.deprecated.format": True,},
conf={
"auto.register.schemas": True,
"use.deprecated.format": True,
},
)

self.protobuf_producer = SerializingProducer(
Expand Down
8 changes: 7 additions & 1 deletion icon_governance/workers/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,13 @@ def process(self, msg):

if method == "setDelegation":
logger.info(f"set delegation {value.hash}")
set_delegation(session=self.session, data=data, address=address)
set_delegation(
session=self.session,
data=data,
address=address,
block_height=value.block_number,
hash=value.hash,
)

if method == "claimIScore":
logger.info(f"set delegation {value.hash}")
Expand Down
11 changes: 9 additions & 2 deletions tests/integration/utils/test_rpc.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from icon_governance.utils.rpc import (
convert_hex_int,
get_bond,
get_preps_cps,
get_sponsors_record,
getDelegation,
Expand All @@ -10,7 +11,7 @@
)

SKIMPY_ADDRESS = "hxf5a52d659df00ef0517921647516daaf7502a728"
ADDRESS = "hxdf6bd350edae21f84e0a12392c17eac7e04817e7"
ADDRESS = "hx0cc3a3d55ed55df7c8eee926a4fafb5412d0cca4"


def test_get_preps():
Expand Down Expand Up @@ -57,4 +58,10 @@ def test_get_preps_cps():

def test_getProposals():
proposals = post_rpc_json(getProposals())
print()
assert proposals


def test_get_bond():
# bond = post_rpc_json(get_bond('hx0b047c751658f7ce1b2595da34d57a0e7dad357d'))
bond = post_rpc_json(get_bond("hx1e15b53379e5ee634a2da493a43fdc2d03d0c718"))
assert bond
34 changes: 33 additions & 1 deletion tests/integration/worker/test_delegations.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from sqlmodel import select

from icon_governance.db import session_factory
from icon_governance.models.delegations import Delegation
from icon_governance.workers.delegations import set_delegation


Expand Down Expand Up @@ -32,4 +35,33 @@ def test_set_delegations():
}

with session_factory() as session:
set_delegation(session, data, "hxf55eccb07a95a263a81d79561adb372bc39b3ca8")
set_delegation(
session, data, "hxf55eccb07a95a263a81d79561adb372bc39b3ca8", 1000, "foo-hash"
)
# Should have no effect
set_delegation(
session, data, "hxf55eccb07a95a263a81d79561adb372bc39b3ca8", 1000, "foo-hash"
)
statement = select(Delegation).where(
Delegation.address == "hxf55eccb07a95a263a81d79561adb372bc39b3ca8"
)
result = session.execute(statement)
address_delegation = result.scalars().all()

assert len(address_delegation) == 5

set_delegation(
session, data, "hxf55eccb07a95a263a81d79561adb372bc39b3ca8", 1001, "foo-hash"
)

set_delegation(session, data, "hxf55eccb07a95a263a81d79561adb372bc39b3ca8", 999, "foo-hash")

assert len(address_delegation) == 5

result = session.execute(statement)
address_delegation = result.scalars().all()
blocks = [i.last_updated_block for i in address_delegation]
last_updated_block_set = set(blocks)

assert len(last_updated_block_set) == 1
assert blocks[0] == 1001

0 comments on commit 3eecb41

Please sign in to comment.