Skip to content
This repository has been archived by the owner on Jun 30, 2021. It is now read-only.

Commit

Permalink
Added ability to specify multiple backend server connections, and cha…
Browse files Browse the repository at this point in the history
…nged all caching to use redis instead of redis + database

* monitor_addr is now deprecated, instead use monitor_addrs as in example
* the Blob table is no longer used for anything. It will be removed in the future. All cached values that were stored there are now in the cache proper
* Tasks run periodically to cache all currently online workers for all servers, instead of doing so on a per-address basis
* Scheduled tasks have been split into cache related tasks and db-changing related tasks, allowing a passive version of celery to run for staging
* Workers now display their stratum connection address when connected
  • Loading branch information
icook committed Apr 4, 2014
1 parent ec57b84 commit 0561904
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 80 deletions.
6 changes: 5 additions & 1 deletion config.yml.example
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
# General
# =======================================================================
# Where's powerpool running?
monitor_addr: http://localhost:3855/
monitor_addrs:
- stratum: tcp+stratum://localhost:3333
mon_address: http://localhost:3855/
- stratum: tcp+stratum://localhost:3334
mon_address: http://localhost:3856/
# how many confirmations do we wait before marking blocks mature
# and allowing payout over RPC
block_mature_confirms: 250
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from setuptools import setup, find_packages

setup(name='simplecoin',
version='0.4.0',
version='0.5.0',
description='Dogecoin mining with no registration required.',
author='Eric Cook',
author_email='[email protected]',
Expand Down
6 changes: 3 additions & 3 deletions simplecoin/celery_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@


app = create_app(celery=True)
# import celerybeat settings
celery.config_from_object('celeryconfig')
celery.conf.update(app.config['celery'])

with app.app_context():
# import celerybeat settings
celery.config_from_object('celeryconfig')
celery.conf.update(current_app.config['celery'])
current_app.logger.info("Celery worker powering up... BBBVVVRRR!")
main(app=celery)
35 changes: 25 additions & 10 deletions simplecoin/celeryconfig.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from datetime import timedelta
from flask import current_app


CELERYBEAT_SCHEDULE = {
caching_tasks = {
'cache_user_donations': {
'task': 'simplecoin.tasks.cache_user_donation',
'schedule': timedelta(minutes=15)
Expand All @@ -10,9 +11,24 @@
'task': 'simplecoin.tasks.update_pplns_est',
'schedule': timedelta(minutes=15)
},
'update_online_workers': {
'task': 'simplecoin.tasks.update_online_workers',
'schedule': timedelta(minutes=2)
},
'update_diff_average': {
'task': 'simplecoin.tasks.difficulty_avg',
'schedule': timedelta(hours=1),
},
'server_status': {
'task': 'simplecoin.tasks.server_status',
'schedule': timedelta(minutes=2),
},
}

database_tasks = {
'compress_min_shares': {
'task': 'simplecoin.tasks.compress_minute',
'schedule': timedelta(minutes=5),
'schedule': timedelta(seconds=5),
},
'compress_five_min_shares': {
'task': 'simplecoin.tasks.compress_five_minute',
Expand All @@ -26,20 +42,19 @@
'task': 'simplecoin.tasks.update_block_state',
'schedule': timedelta(minutes=5),
},
'server_status': {
'task': 'simplecoin.tasks.server_status',
'schedule': timedelta(minutes=2),
},
'check_worker_down': {
'task': 'simplecoin.tasks.check_down',
'schedule': timedelta(minutes=1),
},
'update_diff_average': {
'task': 'simplecoin.tasks.difficulty_avg',
'schedule': timedelta(hours=1),
},
'update_coin_trans': {
'task': 'simplecoin.tasks.update_coin_transaction',
'schedule': timedelta(minutes=10),
},
}

CELERYBEAT_SCHEDULE = caching_tasks
# we want to let celery run in staging mode where it only handles updating
# caches while the prod celery runner is handling real work. Allows separate
# cache databases between stage and prod
if not current_app.config.get('stage', False):
CELERYBEAT_SCHEDULE.update(database_tasks)
84 changes: 57 additions & 27 deletions simplecoin/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,35 @@
celery = Celery('simplecoin')


@celery.task(bind=True)
def update_online_workers(self):
"""
Grabs a list of workers from the running powerpool instances and caches
them
"""
try:
users = {}
for i, pp_config in enumerate(current_app.config['monitor_addrs']):
mon_addr = pp_config['mon_address'] + '/clients'
try:
req = requests.get(mon_addr)
data = req.json()
except Exception:
current_app.logger.warn("Unable to connect to {} to gather worker summary."
.format(mon_addr))
else:
for address, workers in data['clients'].iteritems():
users.setdefault('addr_online_' + address, [])
for d in workers:
users['addr_online_' + address].append((d['worker'], i))

cache.set_many(users, timeout=480)

except Exception as exc:
logger.error("Unhandled exception in estimating pplns", exc_info=True)
raise self.retry(exc=exc)


@celery.task(bind=True)
def update_pplns_est(self):
"""
Expand All @@ -31,11 +60,16 @@ def update_pplns_est(self):
# grab configured N
mult = int(current_app.config['last_n'])
# generate average diff from last 500 blocks
diff = Blob.query.filter_by(key="diff").first().data['diff']
diff = cache.get('difficulty_avg')
if diff is None:
current_app.logger.warn(
"Difficulty average is blank, can't calculate pplns estimate")
return
# Calculate the total shares to that are 'counted'
total_shares = ((float(diff) * (2 ** 16)) * mult)

# Loop through all shares, descending order, until we'd distributed the shares
# Loop through all shares, descending order, until we'd distributed the
# shares
remain = total_shares
user_shares = {}
for share in Share.query.order_by(Share.id.desc()).yield_per(5000):
Expand Down Expand Up @@ -250,16 +284,13 @@ def count_share(typ, amount, user_=user):
def new_block(self, blockheight, bits=None, reward=None):
"""
Notification that a new block height has been reached in the network.
Sets some things into the cache for display on the website.
"""
logger.info("Recieved notice of new block height {}".format(blockheight))
if not isinstance(blockheight, int):
logger.error("Invalid block height submitted, must be integer")

blob = Blob(key='block', data={'height': str(blockheight),
'difficulty': str(bits_to_difficulty(bits)),
'reward': str(reward)})
db.session.merge(blob)
db.session.commit()
cache.set('blockheight', blockheight)
cache.set('difficulty', bits_to_difficulty(bits))
cache.set('reward', reward)

# keep the last 500 blocks in the cache for getting average difficulty
cache.cache._client.lpush('block_cache', bits)
Expand Down Expand Up @@ -661,23 +692,24 @@ def check_down(self):
@celery.task(bind=True)
def server_status(self):
"""
Periodic pull update of server stats
Periodicly poll the backend to get number of workers and throw it in the cache
"""
try:
mon_addr = current_app.config['monitor_addr']
try:
req = requests.get(mon_addr)
data = req.json()
except Exception:
logger.warn("Couldn't connect to internal monitor at {}".format(mon_addr),
exc_info=True)
output = {'stratum_clients': 0, 'agent_clients': 0}
else:
output = {'stratum_clients': data['stratum_clients'],
'agent_clients': data['agent_clients']}
blob = Blob(key='server', data={k: str(v) for k, v in output.iteritems()})
db.session.merge(blob)
db.session.commit()
total_workers = 0
for i, pp_config in enumerate(current_app.config['monitor_addrs']):
mon_addr = pp_config['mon_address']
try:
req = requests.get(mon_addr)
data = req.json()
except Exception:
logger.warn("Couldn't connect to internal monitor at {}"
.format(mon_addr))
continue
else:
cache.set('stratum_workers_' + str(i), data['stratum_clients'])
total_workers += data['stratum_clients']

cache.set('total_workers', total_workers)
except Exception:
logger.error("Unhandled exception in server_status", exc_info=True)
db.session.rollback()
Expand All @@ -691,9 +723,7 @@ def difficulty_avg(self):
try:
diff_list = cache.cache._client.lrange('block_cache', 0, 500)
total_diffs = sum([bits_to_difficulty(diff) for diff in diff_list])
blob = Blob(key='diff', data={'diff': str(total_diffs / len(diff_list))})
db.session.merge(blob)
db.session.commit()
cache.set('difficulty_avg', total_diffs / len(diff_list))
except Exception as exc:
logger.warn("Unknown failure in difficulty_avg", exc_info=True)
raise self.retry(exc=exc)
23 changes: 7 additions & 16 deletions simplecoin/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import datetime
import time
import itertools
import requests
import yaml

from flask import current_app
Expand Down Expand Up @@ -194,13 +193,13 @@ def collect_user_stats(address):
if pplns_cached_time != None:
pplns_cached_time.strftime("%Y-%m-%d %H:%M:%S")

pplns_total_shares = cache.get('pplns_total_shares')
pplns_total_shares = cache.get('pplns_total_shares') or 0

# store all the raw data of we're gonna grab
workers = {}
# blank worker template
def_worker = {'accepted': 0, 'rejected': 0, 'last_10_shares': 0,
'online': False, 'status': None}
'online': False, 'status': None, 'server': {}}
# for picking out the last 10 minutes worth shares...
now = datetime.datetime.utcnow().replace(second=0, microsecond=0)
twelve_ago = now - datetime.timedelta(minutes=12)
Expand Down Expand Up @@ -240,9 +239,13 @@ def collect_user_stats(address):
workers[st.worker]['status_version'] = "Unsupp"

# pull online status from cached pull direct from powerpool servers
for name in workers_online(address):
for name, host in cache.get('addr_online_' + address) or []:
workers.setdefault(name, def_worker.copy())
workers[name]['online'] = True
try:
workers[name]['server'] = current_app.config['monitor_addrs'][host]
except KeyError:
workers[name]['server'] = {}

# pre-calculate a few of the values here to abstract view logic
for name, w in workers.iteritems():
Expand Down Expand Up @@ -284,18 +287,6 @@ def collect_user_stats(address):
unconfirmed_balance=unconfirmed_balance)


@cache.memoize(timeout=120)
def workers_online(address):
""" Returns all workers online for an address """
client_mon = current_app.config['monitor_addr']+'client/'+address
try:
req = requests.get(client_mon)
data = req.json()
except Exception:
return []
return [w['worker'] for w in data[address]]


def get_pool_eff():
rej, acc = get_pool_acc_rej()
# avoid zero division error
Expand Down
26 changes: 8 additions & 18 deletions simplecoin/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
jsonify, g, session, Response)
from lever import get_joined

from .models import (OneMinuteShare, Block, Blob, FiveMinuteShare,
from .models import (OneMinuteShare, Block, FiveMinuteShare,
OneHourShare, Status, FiveMinuteReject, OneMinuteReject,
OneHourReject, DonationPercent, BonusPayout)
from . import db, root, cache
Expand Down Expand Up @@ -41,8 +41,9 @@ def blocks():

@main.route("/pool_stats")
def pool_stats():
current_block = db.session.query(Blob).filter_by(key="block").first()
current_block.data['reward'] = int(current_block.data['reward'])
current_block = {'reward': cache.get('reward') or 0,
'difficulty': cache.get('difficulty') or 0,
'height': cache.get('blockheight') or 0}
blocks = db.session.query(Block).order_by(Block.height.desc()).limit(10)

reject_total, accept_total = get_pool_acc_rej()
Expand All @@ -62,18 +63,9 @@ def add_pool_stats():
g.round_duration = (datetime.datetime.utcnow() - last_block_time()).total_seconds()
g.hashrate = get_pool_hashrate()

blobs = Blob.query.filter(Blob.key.in_(("server", "diff"))).all()
try:
server = [b for b in blobs if b.key == "server"][0]
g.worker_count = int(server.data['stratum_clients'])
except IndexError:
g.worker_count = 0
try:
diff = float([b for b in blobs if b.key == "diff"][0].data['diff'])
except IndexError:
diff = -1
g.average_difficulty = diff
g.shares_to_solve = diff * (2 ** 16)
g.worker_count = cache.get('total_workers') or 0
g.average_difficulty = cache.get('difficulty_avg') or 0
g.shares_to_solve = g.average_difficulty * (2 ** 16)
g.total_round_shares = g.shares_to_solve * current_app.config['last_n']
g.alerts = get_alerts()

Expand Down Expand Up @@ -130,11 +122,9 @@ def user_match(user):
user_list = [([shares, user, (65536 * last_10_shares(user[6:]) / 600), user_match(user[6:])]) for user, shares in user_shares.iteritems()]
user_list = sorted(user_list, key=lambda x: x[0], reverse=True)

current_block = db.session.query(Blob).filter_by(key="block").first()

return render_template('round_summary.html',
users=user_list,
current_block=current_block,
blockheight=cache.get('blockheight') or 0,
cached_time=cached_time)


Expand Down
6 changes: 3 additions & 3 deletions templates/block_table.html
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
<th><span class="odometer blockshares">0</span></th>
<th><span class="odometer blockluck">0</span>%</th>
<th><span class="hours">00</span>:<span class="minutes">00</span>:<span class="seconds">00</span></th>
<th>{{ '{:,}'.format((current_block.data['reward'] / 100000000) | round(2)) }}</th>
<th>{{ '{:,}'.format((current_block.data['difficulty'] | int)) }}</th>
<th>{{ '{:,}'.format((current_block['reward'] / 100000000) | round(2)) }}</th>
<th>{{ '{:,}'.format((current_block['difficulty'] | round(4))) }}</th>
<th>...</th>
<th>...</th>
<th>{{ '{:,}'.format((current_block.data['height']) | int) }}</th>
<th>{{ '{:,}'.format((current_block['height']) | int) }}</th>
<th>In progress</th>
</tr>
{% endif %}
Expand Down
2 changes: 1 addition & 1 deletion templates/round_summary.html
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ <h2 style="word-wrap:break-word;">Current Round Stats</h2>
title="Current height of the blockchain."></i>
<br>
<h4>
<p><span>{{ '{:2,}'.format((current_block.data['height']|int)) }}</span></p>
<p><span>{{ '{:2,}'.format((blockheight | int)) }}</span></p>
</h4>
</div>
</div>
Expand Down
1 change: 1 addition & 0 deletions templates/user_stats.html
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ <h3>
data-toggle="tooltip" data-placement="right"
title="{% if worker['online'] %}Online{% else %}Offline{% endif %}"></i>&nbsp;
{% if worker['name'] %}{{ worker['name'] }}{% else %}[unnamed]{% endif %}
&nbsp;&nbsp;<small style="font-size:8pt;">{{ worker['server'].get('stratum', '') }}</small>
<span class="pull-right">
<small class="text-right" style="padding-left:10px;font-weight:300;">Hashrate:
{{ worker['last_10_hashrate'] | round(3) }} MH/sec &nbsp;
Expand Down

0 comments on commit 0561904

Please sign in to comment.