improved shell handling in service monitor. Includes ability to check disk usage on db machines. Stale monitors are also flagged in the UI.

This commit is contained in:
KeyserSosa
2009-02-17 11:56:53 -08:00
parent e59261e959
commit c0a9b7b0e5
2 changed files with 114 additions and 38 deletions

View File

@@ -19,7 +19,7 @@
# All portions of the code written by CondeNet are Copyright (c) 2006-2008
# CondeNet, Inc. All Rights Reserved.
################################################################################
import os, re, sys, socket, time, random, time
import os, re, sys, socket, time, random, time, signal
from itertools import chain
from wrapped import Wrapped
@@ -28,9 +28,38 @@ from pylons import g
from r2.lib.utils import tup
from r2.lib.cache import Memcache
import subprocess, math
class ShellProcess(object):
def __init__(self, cmd, timeout = 5, sleepcycle = 0.5):
self.proc = subprocess.Popen(cmd, shell = True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
ntries = int(math.ceil(timeout / sleepcycle))
for n in xrange(ntries):
if self.proc.poll() is not None:
break
time.sleep(sleepcycle)
else:
print "Process timeout: '%s'" % cmd
os.kill(self.proc.pid, signal.SIGTERM)
self.output, self.error = self.proc.communicate()
self.rcode = self.proc.poll()
self.timeout = (self.rcode == -signal.SIGTERM)
def __iter__(self):
return iter(self.output.split('\n'))
def read(self):
return self.output
class AppServiceMonitor(Wrapped):
cache_key = "machine_datalogger_data_"
cache_key_small = "machine_datalogger_db_summary_"
cache_key = "service_datalogger_data_"
cache_key_small = "service_datalogger_db_summary_"
cache_lifetime = "memcached_lifetime"
"""
@@ -114,7 +143,7 @@ class AppServiceMonitor(Wrapped):
self.hostlogs = list(self)
return Wrapped.render(self, *a, **kw)
def monitor(self, srvname, loop = True, loop_time = 2, *a, **kw):
def monitor(self, srvname, loop = True, loop_time = 5, *a, **kw):
host = g.reddit_host
h = HostLogger(host, self)
@@ -199,10 +228,16 @@ class Database(object):
self.ip_conn = {}
self.db_conn = {}
self.query_count = DataLogger()
self.failures = set([])
self.disk_usage = 0
def last_update(self):
update = self.connections.most_recent()[1]
return datetime.now() - update if update else None
def track(self, conn = 0, ip_conn = {}, db_conn = {}, vacuums = {},
query_count = None, max_connections = None):
query_count = None, max_connections = None,
failures = [], disk_usage = 0):
#log the number of connections
self.connections.add(conn)
@@ -225,6 +260,11 @@ class Database(object):
# has a query count
if query_count is not None:
self.query_count.add(query_count)
# list of failed connections
self.failures = set(failures)
if disk_usage:
self.disk_usage = disk_usage
class HostLogger(object):
@@ -283,12 +323,10 @@ class HostLogger(object):
if self.database:
self.database.track(**check_database(self.db_names,
**db_params))
handle = os.popen('/usr/bin/uptime')
foo = handle.read()
foo = ShellProcess('/usr/bin/env uptime').read()
foo = foo.split("load average")[1].split(':')[1].strip(' ')
self.load.add(float(foo.split(' ')[1].strip(',')))
handle.close()
self.clean_dead()
@@ -303,22 +341,18 @@ class HostLogger(object):
re_text = re.compile('\S+')
def process_info(proc_ids = [], name = '', exe = "/bin/ps"):
def process_info(proc_ids = [], name = '', exe = "/usr/bin/env ps"):
pidi = 0
cpuid = 1
memid = 2
ageid = 5
if not os.path.exists(exe):
raise ValueError, "bad executable specified for top"
cmd = ([exe, "-a", '-O', 'pcpu,pmem'] +
["-p %d" % x for x in proc_ids if x > 0])
handle = os.popen(' '.join(cmd))
proc_ids = set(map(int, proc_ids))
res = {}
for line in handle:
for line in ShellProcess(' '.join(cmd)):
line = re_text.findall(line)
try:
pid = int(line[pidi])
@@ -331,14 +365,15 @@ def process_info(proc_ids = [], name = '', exe = "/bin/ps"):
name = n)
except (ValueError, IndexError):
pass
handle.close()
return res
def supervise_list(exe = "/usr/local/bin/svstat", path = '/service/'):
handle = os.popen("%s %s*" % (exe, path))
"""
Generates a list of processes that are currently running under supervise.
"""
defunct = 0
for line in handle:
for line in ShellProcess("%s %s*" % (exe, path)):
line = line.split(' ')
name = line[0]
try:
@@ -353,19 +388,29 @@ def supervise_list(exe = "/usr/local/bin/svstat", path = '/service/'):
defunct += 1
pid = -defunct
time = 0
yield (name, "down", pid, time)
handle.close()
status = 'down'
yield (name, status, pid, time)
def check_database(db_names, proc = "postgres", check_vacuum = True, user='ri'):
database_data_dir = None
def check_database(db_names, check_vacuum = True, user='ri'):
'''
Finds the number of connections per db (and allocated per remote
IP) on localhost. Also uses the postgres "show data_directory"
command to figure out the db partition disk usage. Optionally
checks to see if the db is vacuuming.
'''
conn_failure = set([])
def simple_query(query, _db = None):
if not _db: _db = list(db_names)[0]
cmd = (r"(echo '\\t'; echo '%(query)s' ) " +
cmd = (r"( echo '\\t'; echo '%(query)s' ) " +
"| psql -U %(user)s %(db)s")
cmd = cmd % dict(query = query, user = user, db = _db)
handle = os.popen(cmd)
res = list(handle)
handle.close()
return res
handle = ShellProcess(cmd)
if handle.rcode and not handle.timeout:
conn_failure.add(db)
return []
return iter(handle)
by_ip = {}
by_db = {}
@@ -383,8 +428,8 @@ def check_database(db_names, proc = "postgres", check_vacuum = True, user='ri'):
vacuums = {}
if check_vacuum:
for db in by_db:
for line in simple_query('select * from active;', db):
cmd = line.split('|')[-1].strip(' ').lower()
for line in simple_query('select current_query from active;', db):
cmd = line.strip(' ').lower()
if cmd.startswith('vacuum '):
vacuums[db] = True
break
@@ -402,13 +447,33 @@ def check_database(db_names, proc = "postgres", check_vacuum = True, user='ri'):
if 'query_queue' in by_db:
for line in simple_query('select count(*) from reddit_query_queue;',
'query_queue'):
'query_queue'):
try:
res['query_count'] = int(line.strip('\n '))
break
except ValueError:
continue
# best to only have to fetch the db dir once. It shouldn't be
# moving around
global database_data_dir
if database_data_dir is None:
for line in simple_query('show data_directory'):
line = line.strip(' \n')
if os.path.exists(line):
database_data_dir = line
break
if database_data_dir:
for line in ShellProcess("/usr/bin/env df %s" % database_data_dir):
line = filter(None, line.split(' '))
if len(line) > 4 and line[4].endswith('%'):
try:
res['disk_usage'] = float(line[4].strip('%'))/100
except TypeError:
pass
res['failures'] = conn_failure
return res
def monitor_cache_lifetime(minutes, retest = 10, ntest = -1,

View File

@@ -41,9 +41,8 @@
<table class="monitor-database">
<tr class="title-region">
<th>database</th>
<th>connections</th>
<th>
</th>
<th>disk</th>
<th>conn</th>
<th></th>
</tr>
%for host in thing.hostlogs:
@@ -52,26 +51,30 @@
host_id = host.host.replace('.', '-')
load = host.load()
load_level = min(max(int(load+0.4), 0),4)
if hasattr(host.database, "disk_usage"):
du = host.database.disk_usage
else:
du = 0;
du_wid = int(cpu_col*min(1, du))
if host.database.max_connections > 0:
max_conn = host.database.max_connections
conn = float(host.database.connections()) / max_conn
conn_wid = int(cpu_col*min(1, conn))
else:
max_conn = conn = 0
conn_wid = 0
%>
<tr class="load${load_level} title-region" id="${host_id}">
<th>
${host.host} load: ${load}
</th>
<th>
<th width="${cpu_col}px">
%if host.database.vacuuming:
<blink style="color:red">VACUUMING!</blink>
%else:
<div class="membar" style="width:${conn_wid}px;">
<div class="membar" style="width:${du_wid}px;">
<span>
${"%3.0f%%" % (100* conn) }
${"%3.0f%%" % (100 * du) }
</span>
</div>
%endif
@@ -80,6 +83,14 @@
<span style="color:green">${int(conn * max_conn)}</span>
&nbsp;/&nbsp;
<span>${max_conn}</span>
%if hasattr(host.database, "last_update"):
<% update = host.database.last_update() %>
%if not update or update > datetime.timedelta(0, 60):
<b style="color:orangered; font-size:smaller">
(${str(update).split('.')[0]} old)
</b>
%endif
%endif
</td>
</tr>
<%