-
Notifications
You must be signed in to change notification settings - Fork 5
/
scavenger.py
117 lines (99 loc) · 4 KB
/
scavenger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import testing
import asyncio
import logging
from logging.handlers import RotatingFileHandler
import wallet as w
import queries as q
import grcconf as g
# Height of the last block the scavenger has looked at. scavenge() replaces
# this with the value stored in g.LST_BLK before the main loop starts.
last_block = 0

# Logging: everything at INFO and above goes to a rotating 'deposits.log'
# inside g.log_dir (at most 10**7 bytes per file, three backups kept).
handler = [
    RotatingFileHandler(
        g.log_dir+'deposits.log',
        maxBytes=10**7,
        backupCount=3,
    ),
]
logging.basicConfig(
    handlers=handler,
    level=logging.INFO,
    format='[%(asctime)s] %(levelname)s: %(message)s',
    datefmt='%d/%m %T',
)
async def split_stake(amount):
    """Distribute a stake reward across accounts in proportion to balance.

    Every row returned by ``q.get_all_bals()`` is read as ``row[0]`` (the
    key used in the resulting mapping) and ``row[1]`` (the balance) --
    presumably (user id, balance); confirm against ``queries``.

    A row whose balance is at least ``g.min_deposit`` is credited
    ``balance / total * amount``, where ``total`` is the sum over *all*
    rows, including those below the threshold. Rows below the threshold
    are credited 0. The mapping is then handed to
    ``q.apply_balance_changes`` with ``is_stake=True``.

    Args:
        amount: size of the stake reward being shared out.
    """
    rows = await q.get_all_bals()
    total = sum(row[1] for row in rows)
    shares = {
        row[0]: ((row[1]/total)*amount if row[1] >= g.min_deposit else 0)
        for row in rows
    }
    await q.apply_balance_changes(shares, is_stake=True)
async def stake_searcher():
    """Register newly seen stakes and share each one out between accounts.

    ``w.get_latest_stakes()`` is expected to return a mapping of
    ``txid -> amount staked``. Each entry is passed to
    ``q.register_stake``; when that call returns a truthy value the stake
    is logged and ``split_stake`` distributes the amount. Entries for
    which ``register_stake`` returns a falsy value are left alone.
    """
    stakes = await w.get_latest_stakes()
    for txid, amount_staked in stakes.items():
        if await q.register_stake(txid, amount_staked):
            # Argument order follows the message: amount first, then txid.
            logging.info('Processed stake of %s coins with txid: %s',
                         amount_staked, txid)
            await split_stake(amount_staked)
async def check_tx(txid):
    """Return the incoming payments of a wallet transaction.

    The transaction is fetched with the wallet's ``gettransaction`` call.
    An ``int`` reply is treated as a status signal instead of transaction
    data: signal ``1`` is ignored without logging, any other int is logged
    as a bad signal.

    Args:
        txid: id of the transaction to inspect.

    Returns:
        A list of single-entry dicts ``{address: amount}``, one for every
        ``'receive'`` detail whose address is not also listed under a
        ``'send'`` detail of the same transaction. The list is empty when
        nothing qualifies or the reply was a signal.
    """
    reply = await w.query('gettransaction', [txid])
    incoming = []
    spent_from = []
    try:
        if isinstance(reply, int):
            if reply != 1:
                logging.error('Bad signal in GRC client: %s', reply)
        else:
            for entry in reply['details']:
                category = entry['category']
                if category == 'receive':
                    incoming.append({entry['address']: entry['amount']})
                elif category == 'send':
                    spent_from.append(entry['address'])
    except RuntimeError as E:
        logging.error('check_tx ran into an unhandled error: %s', E)
    except KeyError:
        # A reply with missing keys is tolerated; whatever was collected
        # before the failing lookup is still filtered and returned.
        pass
    # Drop payments whose receiving address also appears as a sender.
    return [
        payment for payment in incoming
        if next(iter(payment)) not in spent_from
    ]
async def blk_searcher():
    """Scan the blocks above ``last_block`` and register user deposits.

    The wallet is asked for the current block count; every height from
    ``last_block + 1`` up to that count is fetched and each transaction in
    it is inspected with ``check_tx``. A received amount is registered
    through ``q.register_deposit`` when its address belongs to a known
    user (per ``q.get_addr_uid_dict()``) and the amount is at least
    ``g.min_deposit``.

    Received amounts on addresses that are not in the user map are
    skipped. Looking them up unconditionally used to raise ``KeyError``,
    which aborted the scan and dropped the remaining transactions of that
    block.

    Side effects:
        Advances the module-level ``last_block``. Any exception raised
        while scanning is logged with its traceback and ends the current
        scan; it is not re-raised.
    """
    global last_block
    newblock = await w.query('getblockcount', [])
    if newblock <= last_block:
        return
    try:
        users = await q.get_addr_uid_dict()
        for blockheight in range(last_block+1, newblock+1):
            # NOTE(review): the height is recorded before the block is
            # fetched, so a block that fails below is not retried on the
            # next cycle -- confirm this is intended.
            last_block = blockheight
            blockhash = await w.query('getblockhash', [blockheight])
            await asyncio.sleep(0.05)  # Protections to guard against reusing the bind address
            blockdata = await w.query('getblock', [blockhash])
            if isinstance(blockdata, dict):
                for txid in blockdata['tx']:
                    for received in await check_tx(txid):
                        addr = list(received.keys())[0]
                        if addr not in users:
                            # Not a user deposit address; nothing to credit.
                            logging.debug('Ignoring receive on unknown address %s in TXID: %s',
                                          addr, txid)
                            continue
                        uid = users[addr]
                        if received[addr] < g.min_deposit:
                            continue
                        if await q.register_deposit(txid, received[addr], uid):
                            logging.info('Processed deposit with TXID: %s for %s', txid, uid)
            elif blockdata == 3:
                pass  # Don't render the reuse address error as an exception
            else:
                logging.error('Bad signal in GRC client: %s', blockdata)
    except Exception as E:
        logging.exception('Block searcher ran into an error: %s', E)
async def scavenge():
    """Run the deposit and stake scanner until interrupted.

    The starting height is read from the file at ``g.LST_BLK`` (newlines
    are removed before the text is parsed as an int) and stored in the
    module-level ``last_block``. Each cycle then runs ``blk_searcher`` and
    ``stake_searcher``, writes the current ``last_block`` back to the same
    file and sleeps for ``g.SCAV_SLP`` seconds.

    A ``KeyboardInterrupt`` raised inside the loop ends the coroutine
    quietly; any other exception propagates to the caller.
    """
    global last_block
    with open(g.LST_BLK, 'r') as height_in:
        stored_height = height_in.read()
    last_block = int(stored_height.replace('\n', ''))
    logging.info(f'Starting blockchain scavenger at height: {last_block}.')
    try:
        while True:
            await blk_searcher()
            await stake_searcher()
            # Persist progress so a restart resumes from this height.
            with open(g.LST_BLK, 'w') as height_out:
                height_out.write(str(last_block))
            await asyncio.sleep(g.SCAV_SLP)
    except KeyboardInterrupt:
        return
# Script entry: drive scavenge() on the event loop until it finishes.
# This runs at import time, as before.
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(scavenge())
try:
    loop.run_until_complete(task)
finally:
    # Close the loop even when run_until_complete raises (Ctrl-C, or an
    # uncaught error from scavenge()); previously close() was skipped.
    loop.close()