import threading
from time import sleep
from collections import Counter

from pyrocore import config as config_ini
from pyrocore.util import pymagic
from pyrobase.parts import Bunch

from prometheus_client import start_http_server, Gauge

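# Helper thread that serves the default prometheus_client registry over HTTP;
# the Gauges created below register with that registry implicitly, so they are
# exposed automatically once the server is up.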
class ClientServer(threading.Thread):
    def __init__(self, port):
        super(ClientServer, self).__init__()
        self.port = int(port)

    def run(self):
        start_http_server(self.port)

class RtorrentExporter(object):
    """ Expose rTorrent and host statistics for scraping by a Prometheus instance.
    """
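
    # Recognized config keys (all optional):
    #   jobs        -- comma-separated subset of 'system', 'tracker', 'item' (default: 'system')
    #   port        -- HTTP port the metrics endpoint listens on (default: '8000')
    #   prefix      -- prefix for all exported metric names (default: 'rtorrent_')
    #   log_level   -- log level for this job's logger
    #   item_stats  -- comma-separated d.* accessors (without the 'd.' prefix) exported
    #                  per item by the 'item' job (default: down.total, up.total)
    #   item_labels -- comma-separated d.* accessors used as per-item labels (default: hash, name)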

    def __init__(self, config=None):
        """ Set up RtorrentExporter.
        """
        self.config = config or Bunch()
        self.LOG = pymagic.get_class_logger(self)
        if 'log_level' in self.config:
            self.LOG.setLevel(self.config.log_level)
        self.LOG.debug("RtorrentExporter created with config %r" % self.config)
        self.prefix = self.config.get('prefix', 'rtorrent_')
        self.proxy = config_ini.engine.open()
        self.jobs = []
        jobs_init = {
            'tracker': self._init_tracker_stats,
            'system': self._init_system_stats,
            'item': self._init_item_stats
        }
        for j in self.config.get('jobs', 'system').split(','):
            if j in jobs_init:
                self.jobs.append(j)
                jobs_init[j]()
            else:
                self.LOG.error("Unknown job '{}' requested, not initializing it".format(j))
        if not self.jobs:
            raise ValueError("No valid jobs requested, nothing to export")
        # Start the server right off the bat
        self.prom_thread = ClientServer(self.config.get('port', '8000'))
        self.prom_thread.start()


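    # Called repeatedly by the surrounding scheduler or loop (see module_test below)
    # to refresh gauge values; the HTTP endpoint itself is served by the ClientServer
    # thread started in __init__.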
    def run(self):
        """Update any defined metrics.
        """
        # Update requested stats
        jobs = {
            'tracker': self._fetch_tracker_stats,
            'system': self._fetch_system_stats,
            'item': self._fetch_item_stats
        }
        for j in self.jobs:
            jobs[j]()

    def _init_item_stats(self):
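        """Create one gauge per requested per-item accessor ('item_stats'),
        labelled with the 'item_labels' accessors; configured names are
        filtered against the methods the connected client actually exposes.
        """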
        available_methods = set(self.proxy.system.listMethods())
        if 'item_stats' in self.config:
            item_stat_methods = ["d."+s for s in self.config['item_stats'].split(',')]
            item_stat_methods = set(item_stat_methods) & available_methods
        else:
            item_stat_methods = ("d.down.total", "d.up.total")
        if 'item_labels' in self.config:
            item_labels = ["d."+s for s in self.config['item_labels'].split(',')]
            self.item_labels = list(set(item_labels) & available_methods)
        else:
            self.item_labels = ["d.hash", "d.name"]
        self.item_stats = {}
        for m in item_stat_methods:
            self.item_stats[m] = Gauge(self.prefix + "item_" + m.replace('.', '_'), m, self.item_labels)

    def _fetch_item_stats(self):
        """Use d.multicall2 to fetch the per-item values and update their gauges.
        """
        # The stat keys and label names already carry their "d." prefix, so only the
        # trailing "=" is appended to form the multicall arguments.
        calls = [m + "=" for m in list(self.item_stats.keys()) + self.item_labels]
        result = self.proxy.d.multicall2('', "main", *calls)
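        # Each row in the result holds one item's values, in the same order as
        # `calls`, so zip them back onto the method names before updating the gauges.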
        for i in result:
            info = dict(zip(list(self.item_stats.keys()) + self.item_labels, i))
            for stat, gauge in self.item_stats.items():
                gauge.labels(*[info[l] for l in self.item_labels]).set(info[stat])

    def _init_tracker_stats(self):
        """Initialize the tracker gauges.
        """
        self.tracker_gauge = Gauge(self.prefix + 'tracker_amount', 'Number of torrents belonging to a specific tracker', ['alias'])
        self.tracker_error_gauge = Gauge(self.prefix + 'tracker_errors',
                                         'Number of torrents with tracker errors belonging to a specific tracker', ['alias'])


    def _fetch_tracker_stats(self):
        """Scrape tracker metrics from item information.
        """
        item_fields = ["d.tracker_domain=", "d.message="]

        result = self.proxy.d.multicall("main", *item_fields)

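        # Count items per tracker alias; an item whose d.message is non-empty is
        # counted as a tracker error for that alias.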
        trackers = Counter([config_ini.map_announce2alias(d[0]) for d in result])
        tracker_errors = Counter([config_ini.map_announce2alias(d[0]) for d in result if d[1]])

        for k, v in trackers.items():
            self.tracker_gauge.labels(k).set(v)
        for k in trackers.keys():  # Use the tracker keys to make sure all active trackers get a value
            self.tracker_error_gauge.labels(k).set(tracker_errors[k])

    def _init_system_stats(self):
        """Initialize the system gauges.
        """
        stat_methods = [
            "throttle.global_up.rate", "throttle.global_up.max_rate", "throttle.global_up.total",
            "throttle.global_down.rate", "throttle.global_down.max_rate", "throttle.global_down.total",
            "pieces.stats_not_preloaded", "pieces.stats_preloaded",
            "system.files.opened_counter", "system.files.failed_counter", "system.files.closed_counter",
            "pieces.memory.block_count", "pieces.memory.current", "pieces.memory.max",
            "network.open_sockets", "pieces.sync.queue_size",
            "pieces.stats.total_size", "pieces.preload.type",
            "pieces.preload.min_size", "pieces.preload.min_rate",
            "pieces.memory.sync_queue", "network.max_open_files",
            "network.max_open_sockets", "network.http.max_open",
            "throttle.max_downloads.global", "throttle.max_uploads.global",
            "startup_time", "network.http.current_open"
        ]

        info_methods = ['system.client_version', 'system.library_version']

        self.system_stats = {}
        for m in set(stat_methods) & set(self.proxy.system.listMethods()):  # Strip out any methods that aren't available on the system
            self.system_stats[m] = Gauge(self.prefix + m.replace('.', '_'), m)
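        # "Info"-style metric: a gauge that is always set to 1, with the client and
        # library version strings attached as labels (a common Prometheus idiom).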
        self.system_info = Gauge(self.prefix + "info", "rTorrent platform information", [m.replace('.', '_') for m in info_methods])
        self.system_view_size = Gauge(self.prefix + "view_size", "Size of rtorrent views", ["view"])

    def _fetch_system_stats(self):
        """Scrape system and view statistics.
        """
        info_methods = ['system.client_version', 'system.library_version']
        views = self.proxy.view.list()

        # Get data via multicall
        # Sort the system stats because we can't trust py2 keys() to be deterministic
        calls = [dict(methodName=method, params=[]) for method in sorted(self.system_stats.keys())] \
                + [dict(methodName=method, params=[]) for method in info_methods] \
                + [dict(methodName="view.size", params=['', view]) for view in views]

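        # Replies arrive in the same order as `calls` (numeric stats, then the two
        # info strings, then one size per view) and are consumed positionally below.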
        result = self.proxy.system.multicall(calls, flatten=True)

        # Get numeric metrics
        for m in sorted(self.system_stats.keys()):
            self.system_stats[m].set(result[0])
            del result[0]

        # Get text-like information
        self.system_info.labels(*result[0:len(result)-len(views)]).set(1)
        result = result[-len(views):]

        # Get view information
        for v in views:
            self.system_view_size.labels(v).set(result[0])
            del result[0]


def module_test():
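    """Manual smoke test: connect to the configured rTorrent and refresh all metrics every 5 seconds."""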
    from pyrocore import connect
    engine = connect()

    i = RtorrentExporter(Bunch(jobs="system,tracker,item"))
    i.proxy = engine.open()
    while True:
        i.run()
        sleep(5)


if __name__ == '__main__':
    module_test()
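
# Illustrative sketch only: if this exporter is wired up as a pyrotorque job, a
# torque.ini entry might look roughly like the block below. The module path and
# job name are placeholders (assumptions), not taken from this file.
#
#   job.exporter.handler  = some.module.path:RtorrentExporter
#   job.exporter.schedule = minute=*
#   job.exporter.active   = True
#   job.exporter.port     = 8000
#   job.exporter.jobs     = system,tracker,item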