POC: remote data collection over existing postgres connection #80

Open · wants to merge 1 commit into master
28 changes: 19 additions & 9 deletions pg_view/__init__.py
@@ -12,11 +12,11 @@

from pg_view import consts
from pg_view import flags
from pg_view.collectors.host_collector import HostStatCollector
from pg_view.collectors.memory_collector import MemoryStatCollector
from pg_view.collectors.host_collector import HostStatCollector, RemoteHostDataSource
from pg_view.collectors.memory_collector import MemoryStatCollector, RemoteMemoryDataSource
from pg_view.collectors.partition_collector import PartitionStatCollector, DetachedDiskStatCollector
from pg_view.collectors.pg_collector import PgstatCollector
from pg_view.collectors.system_collector import SystemStatCollector
from pg_view.collectors.system_collector import SystemStatCollector, RemoteSystemDataSource
from pg_view.loggers import logger, enable_logging_to_stderr, disable_logging_to_stderr
from pg_view.models.consumers import DiskCollectorConsumer
from pg_view.models.db_client import build_connection, detect_db_connection_arguments, \
@@ -258,15 +258,25 @@ def main():
    collector.start()
    consumer = DiskCollectorConsumer(q)

    collectors.append(HostStatCollector())
    collectors.append(SystemStatCollector())
    collectors.append(MemoryStatCollector())
    use_local_data = not(options.host) or options.host.startswith('/')
Contributor: The host name can also be read from the configuration file.
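A minimal sketch of the reviewer's point, not part of this commit; `config` stands for a hypothetical dict parsed from pg_view's configuration file:

    # hypothetical: prefer --host/-h, fall back to the host key from the config file
    host = options.host or (config or {}).get('host', '')
    use_local_data = not host or host.startswith('/')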

    if use_local_data:
        collectors.append(HostStatCollector())
        collectors.append(SystemStatCollector())
        collectors.append(MemoryStatCollector())
    else:
        pgcon = clusters[0]['pgcon']
        collectors.append(HostStatCollector(RemoteHostDataSource(pgcon)))
        collectors.append(SystemStatCollector(RemoteSystemDataSource(pgcon)))
        collectors.append(MemoryStatCollector(RemoteMemoryDataSource(pgcon)))

    for cl in clusters:
        part = PartitionStatCollector(cl['name'], cl['ver'], cl['wd'], consumer)
        pg = PgstatCollector(cl['pgcon'], cl['reconnect'], cl['pid'], cl['name'], cl['ver'], options.pid)
        groupname = cl['wd']
        groups[groupname] = {'pg': pg, 'partitions': part}
        collectors.append(part)
        groups[groupname] = {'pg': pg}
        if use_local_data:
            part = PartitionStatCollector(cl['name'], cl['ver'], cl['wd'], consumer)
            groups[groupname]['partitions'] = part
            collectors.append(part)
        collectors.append(pg)

# we don't want to mix diagnostics messages with useful output, so we log the former into a file.
101 changes: 74 additions & 27 deletions pg_view/collectors/host_collector.py
@@ -3,20 +3,83 @@
from datetime import timedelta
from multiprocessing import cpu_count

import psycopg2
import psycopg2.extras

from pg_view.collectors.base_collector import StatCollector
from pg_view.loggers import logger
from pg_view.models.outputs import COLSTATUS, COLHEADER


UPTIME_FILENAME = '/proc/uptime'

class LocalHostDataSource(object):
    def __init__(self):
        pass

    def __call__(self):
        try:
            with open(UPTIME_FILENAME, 'rU') as f:
                uptime = f.read().split()
        except:
            uptime = 0
        try:
            ncpus = cpu_count()
        except:
            logger.error('multiprocessing does not support cpu_count')
            ncpus = 0
        return {
            'uptime': uptime,
            'loadavg': os.getloadavg(),
            'hostname': socket.gethostname(),
            'uname': os.uname(),
            'ncpus': ncpus
        }


class RemoteHostDataSource(object):
    def __init__(self, pgcon):
        self.pgcon = pgcon

    def __call__(self):
        """
        CREATE OR REPLACE FUNCTION pgview.get_host_info(OUT uptime double precision[], OUT loadavg double precision[], OUT hostname text, OUT uname text[], OUT ncpus integer)
         RETURNS record
         LANGUAGE plpythonu
        AS $function$
        import os
        import socket
        from multiprocessing import cpu_count
        try:
            with open('/proc/uptime', 'rU') as f:
                uptime = f.read().split()
        except:
            uptime = 0
        try:
            ncpus = cpu_count()
        except:
            ncpus = 0
        return (uptime, os.getloadavg(), socket.gethostname(), os.uname(), ncpus)
        $function$
        """
        cur = self.pgcon.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cur.execute("SELECT * FROM pgview.get_host_info()")
        res = cur.fetchone()
        cur.close()
        self.pgcon.commit()
        return res

Contributor: Just a nitpick, we should not use hard-coded names when generating sprocs text.

Member Author: This was just copy-paste from my psql prompt, so I don't lose the func definition ;-)
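For reference, a sketch of the mapping both host data sources are expected to return (the keys match the OUT parameters of pgview.get_host_info(); the values below are illustrative only):

    # illustrative shape shared by LocalHostDataSource() and RemoteHostDataSource(pgcon)()
    sample = {
        'uptime': ['215554.80', '818125.72'],      # /proc/uptime fields, as strings
        'loadavg': (0.12, 0.19, 0.18),             # os.getloadavg()-style triple
        'hostname': 'db1.example.com',
        'uname': ('Linux', 'db1', '3.13.0', '#42-Ubuntu', 'x86_64'),
        'ncpus': 4,
    }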


class HostStatCollector(StatCollector):

""" General system-wide statistics """

UPTIME_FILE = '/proc/uptime'
data = {}

def __init__(self):
def __init__(self, data_source=LocalHostDataSource()):
super(HostStatCollector, self).__init__(produce_diffs=False)

self.data_source = data_source

self.transform_list_data = [{'out': 'loadavg', 'infn': self._concat_load_avg}]
self.transform_uptime_data = [{'out': 'uptime', 'in': 0, 'fn': self._uptime_to_str}]
self.transform_uname_data = [{'out': 'sysname', 'infn': self._construct_sysname}]
@@ -65,6 +128,8 @@ def __init__(self):
        self.postinit()

    def refresh(self):
        self.data = self.data_source()

        raw_result = {}
        raw_result.update(self._read_uptime())
        raw_result.update(self._read_load_average())
@@ -74,7 +139,7 @@ def refresh(self):
        self._do_refresh([raw_result])

    def _read_load_average(self):
        return self._transform_list(os.getloadavg())
        return self._transform_list(self.data['loadavg'])

    def _load_avg_state(self, row, col):
        state = {}
@@ -111,44 +176,26 @@ def _load_avg_status(self, row, col, val, bound):
            return True
        return False

    @staticmethod
    def _read_cpus():
        cpus = 0
        try:
            cpus = cpu_count()
        except:
            logger.error('multiprocessing does not support cpu_count')
            pass
        return {'cores': cpus}
    def _read_cpus(self):
        return {'cores': self.data['ncpus']}

    def _construct_sysname(self, attname, row, optional):
        if len(row) < 3:
            return None
        return '{0} {1}'.format(row[0], row[2])

    def _read_uptime(self):
        fp = None
        raw_result = []
        try:
            fp = open(HostStatCollector.UPTIME_FILE, 'rU')
            raw_result = fp.read().split()
        except:
            logger.error('Unable to read uptime from {0}'.format(HostStatCollector.UPTIME_FILE))
        finally:
            fp and fp.close()
        return self._transform_input(raw_result, self.transform_uptime_data)
        return self._transform_input(self.data['uptime'], self.transform_uptime_data)

    @staticmethod
    def _uptime_to_str(uptime):
        return str(timedelta(seconds=int(float(uptime))))

    @staticmethod
    def _read_hostname():
        return {'hostname': socket.gethostname()}
    def _read_hostname(self):
        return {'hostname': self.data['hostname']}

    def _read_uname(self):
        uname_row = os.uname()
        return self._transform_input(uname_row, self.transform_uname_data)
        return self._transform_input(self.data['uname'], self.transform_uname_data)

    def output(self, method):
        return super(self.__class__, self).output(method, before_string='Host statistics', after_string='\n')
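The remote branch assumes the pgview helper functions already exist on the monitored server. A minimal sketch of checking for pgview.get_host_info() before wiring up the remote collectors (a hypothetical helper, not part of this commit):

    def pgview_helpers_installed(pgcon):
        # hypothetical check: is pgview.get_host_info() present on the server?
        cur = pgcon.cursor()
        cur.execute("SELECT 1 FROM pg_proc p JOIN pg_namespace n ON p.pronamespace = n.oid "
                    "WHERE n.nspname = 'pgview' AND p.proname = 'get_host_info'")
        found = cur.fetchone() is not None
        cur.close()
        pgcon.commit()
        return found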
87 changes: 62 additions & 25 deletions pg_view/collectors/memory_collector.py
@@ -1,14 +1,58 @@
import psycopg2

from pg_view.collectors.base_collector import StatCollector
from pg_view.loggers import logger


MEMORY_STAT_FILENAME = '/proc/meminfo'

class LocalMemoryDataSource(object):
    def __init__(self):
        pass

    def __call__(self):
        try:
            with open(MEMORY_STAT_FILENAME, 'rU') as f:
                for line in f:
                    yield line.strip()
        except IOError:
            logger.error('Unable to read {0} memory statistics. Check your permissions'.format(MEMORY_STAT_FILENAME))


class RemoteMemoryDataSource(object):
    def __init__(self, pgcon):
        self.pgcon = pgcon

    def __call__(self):
        """
        CREATE OR REPLACE FUNCTION pgview.get_memory_info(OUT results text)
         RETURNS SETOF text
         LANGUAGE plpythonu
        AS $function$
        try:
            with open('/proc/meminfo', 'rU') as f:
                for line in f:
                    yield line.strip()
        except:
            pass
        $function$
        """
        cur = self.pgcon.cursor()
        cur.execute("SELECT * FROM pgview.get_memory_info()")
        res = [row[0] for row in cur.fetchall()]
        cur.close()
        self.pgcon.commit()
        return res


class MemoryStatCollector(StatCollector):
    """ Collect memory-related statistics """

    MEMORY_STAT_FILE = '/proc/meminfo'

    def __init__(self):
    def __init__(self, data_source=LocalMemoryDataSource()):
        super(MemoryStatCollector, self).__init__(produce_diffs=False)

        self.data_source = data_source

        self.transform_dict_data = [
            {'in': 'MemTotal', 'out': 'total', 'fn': int},
            {'in': 'MemFree', 'out': 'free', 'fn': int},
@@ -122,29 +166,22 @@ def _read_memory_data(self):
        MemTotal, MemFree, Buffers, Cached, Dirty, CommitLimit, Committed_AS
        """
        result = {}
        try:
            fp = open(MemoryStatCollector.MEMORY_STAT_FILE, 'rU')
            for l in fp:
                vals = l.strip().split()
                if len(vals) >= 2:
                    name, val = vals[:2]
                    # if we have units of measurement different from kB - transform the result
                    if len(vals) == 3 and vals[2] in ('mB', 'gB'):
                        if vals[2] == 'mB':
                            val = val + '0' * 3
                        if vals[2] == 'gB':
                            val = val + '0' * 6
                    if len(str(name)) > 1:
                        result[str(name)[:-1]] = val
                    else:
                        logger.error('name is too short: {0}'.format(str(name)))
                else:
                    logger.error('/proc/meminfo string is not name value: {0}'.format(vals))
        except:
            logger.error('Unable to read /proc/meminfo memory statistics. Check your permissions')
            return result
        finally:
            fp.close()
        for line in self.data_source():
            vals = line.split()
            if len(vals) >= 2:
                name, val = vals[:2]
                # if we have units of measurement different from kB - transform the result
                if len(vals) == 3 and vals[2] in ('mB', 'gB'):
                    if vals[2] == 'mB':
                        val = val + '0' * 3
                    if vals[2] == 'gB':
                        val = val + '0' * 6
                if len(str(name)) > 1:
                    result[str(name)[:-1]] = val
                else:
                    logger.error('name is too short: {0}'.format(str(name)))
            else:
                logger.error('/proc/meminfo string is not name value: {0}'.format(vals))
        return result

    def calculate_kb_left_until_limit(self, colname, row, optional):
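Since a memory data source is just a callable returning an iterable of /proc/meminfo-style lines, the parsing above can be exercised without /proc or a remote server. A small illustrative sketch (the fixture values are made up):

    from pg_view.collectors.memory_collector import MemoryStatCollector

    def fake_meminfo_source():
        # illustrative fixture in /proc/meminfo format; both the local generator
        # and the remote list of rows satisfy this same contract
        return ['MemTotal:       16326356 kB',
                'MemFree:         8112004 kB',
                'Dirty:               616 kB']

    collector = MemoryStatCollector(data_source=fake_meminfo_source)
    # collector.refresh() would now parse the fixture instead of /proc/meminfo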
56 changes: 49 additions & 7 deletions pg_view/collectors/system_collector.py
@@ -1,16 +1,60 @@
import psycopg2

from pg_view.collectors.base_collector import StatCollector
from pg_view.loggers import logger


PROC_STAT_FILENAME = '/proc/stat'

class LocalSystemDataSource(object):
    def __init__(self):
        pass

    def __call__(self):
        try:
            # split /proc/stat into the name - value pairs
            with open(PROC_STAT_FILENAME, 'rU') as f:
                for line in f:
                    yield line.strip()
        except IOError:
            logger.error('Unable to read {0}, global data will be unavailable'.format(PROC_STAT_FILENAME))


class RemoteSystemDataSource(object):
    def __init__(self, pgcon):
        self.pgcon = pgcon

    def __call__(self):
        """
        CREATE OR REPLACE FUNCTION pgview.get_system_info(OUT results text)
         RETURNS SETOF text
         LANGUAGE plpythonu
        AS $function$
        try:
            with open('/proc/stat', 'rU') as f:
                for line in f:
                    yield line.strip()
        except:
            pass
        $function$
        """
        cur = self.pgcon.cursor()
        cur.execute("SELECT * FROM pgview.get_system_info()")
        res = [row[0] for row in cur.fetchall()]
        cur.close()
        self.pgcon.commit()
        return res


class SystemStatCollector(StatCollector):

    """ Collect global system statistics, i.e. CPU/IO usage, not including memory. """

    PROC_STAT_FILENAME = '/proc/stat'

    def __init__(self):
    def __init__(self, data_source=LocalSystemDataSource()):
        super(SystemStatCollector, self).__init__()

        self.data_source = data_source

        self.transform_list_data = [
            {'out': 'utime', 'in': 0, 'fn': float},
            {'out': 'stime', 'in': 2, 'fn': float},
@@ -173,10 +217,8 @@ def _read_proc_stat(self):
        raw_result = {}
        result = {}
        try:
            fp = open(SystemStatCollector.PROC_STAT_FILENAME, 'rU')
            # split /proc/stat into the name - value pairs
            for line in fp:
                elements = line.strip().split()
            for line in self.data_source():
                elements = line.split()
                if len(elements) > 2:
                    raw_result[elements[0]] = elements[1:]
                elif len(elements) > 1:
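The three remote data sources assume the pgview schema and its helper functions already exist on the monitored server; their CREATE statements are only kept in the docstrings. A minimal installation sketch (not part of this commit), assuming plpythonu is available, the connecting role may create the schema and functions, and the server is PostgreSQL 9.3 or later for CREATE SCHEMA IF NOT EXISTS:

    import psycopg2

    from pg_view.collectors.host_collector import RemoteHostDataSource
    from pg_view.collectors.memory_collector import RemoteMemoryDataSource
    from pg_view.collectors.system_collector import RemoteSystemDataSource

    def install_pgview_helpers(dsn):
        # each docstring holds the CREATE OR REPLACE FUNCTION statement verbatim
        conn = psycopg2.connect(dsn)
        cur = conn.cursor()
        cur.execute("CREATE SCHEMA IF NOT EXISTS pgview")
        for source in (RemoteHostDataSource, RemoteMemoryDataSource, RemoteSystemDataSource):
            cur.execute(source.__call__.__doc__)
        conn.commit()
        cur.close()
        conn.close()

    # e.g. install_pgview_helpers('host=db.example.com dbname=postgres user=postgres')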
7 changes: 4 additions & 3 deletions pg_view/utils.py
@@ -91,9 +91,10 @@ def process_single_collector(st):

def process_groups(groups):
    for name in groups:
        part = groups[name]['partitions']
        pg = groups[name]['pg']
        part.ncurses_set_prefix(pg.ncurses_produce_prefix())
        part = groups[name].get('partitions')
        if part:
            pg = groups[name]['pg']
            part.ncurses_set_prefix(pg.ncurses_produce_prefix())
Contributor: This will not show useful information about the cluster (i.e. version, max connections, active and idle ones). I think we need to do pg.ncurses_set_prefix(pg.ncurses_produce_prefix()), attaching it to the header of the pg collector.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a temp hack to avoid it crashing because partition collector doesn't have a remote data source yet.
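A minimal sketch of the reviewer's suggestion, assuming the pg collector exposes the same ncurses_set_prefix()/ncurses_produce_prefix() pair that the partition collector uses above:

    def process_groups(groups):
        for name in groups:
            pg = groups[name]['pg']
            part = groups[name].get('partitions')
            if part:
                part.ncurses_set_prefix(pg.ncurses_produce_prefix())
            else:
                # remote mode: keep the cluster header (version, connection counts)
                # by attaching the prefix to the pg collector itself
                pg.ncurses_set_prefix(pg.ncurses_produce_prefix())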



def dbversion_as_float(pgcon):