From 230c8a9cc2c85addd47d8a643779999ff6d95356 Mon Sep 17 00:00:00 2001
From: KeyserSosa
Date: Thu, 12 Feb 2009 11:31:06 -0800
Subject: [PATCH] moved service monitor innards into r2.lib, updated to use ps
 instead of top for getting cpu and mem percentages, and added db load
 requests to AppServiceMonitor

---
 r2/r2/lib/services.py                  | 402 +++++++++++++++++++++++++
 r2/r2/templates/admin_rightbox.html    |   2 +-
 r2/r2/templates/appservicemonitor.html |   2 +-
 r2/r2/templates/framebuster.html       |   8 +-
 r2/supervise_watcher.py                | 357 +---------------------
 5 files changed, 414 insertions(+), 357 deletions(-)
 create mode 100644 r2/r2/lib/services.py

diff --git a/r2/r2/lib/services.py b/r2/r2/lib/services.py
new file mode 100644
index 000000000..a4a3f03e6
--- /dev/null
+++ b/r2/r2/lib/services.py
@@ -0,0 +1,402 @@
# The contents of this file are subject to the Common Public Attribution
# License Version 1.0. (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
# http://code.reddit.com/LICENSE. The License is based on the Mozilla Public
# License Version 1.1, but Sections 14 and 15 have been added to cover use of
# software over a computer network and provide for limited attribution for the
# Original Developer. In addition, Exhibit A has been modified to be consistent
# with Exhibit B.
#
# Software distributed under the License is distributed on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for
# the specific language governing rights and limitations under the License.
#
# The Original Code is Reddit.
#
# The Original Developer is the Initial Developer. The Initial Developer of the
# Original Code is CondeNet, Inc.
#
# All portions of the code written by CondeNet are Copyright (c) 2006-2008
# CondeNet, Inc. All Rights Reserved.
################################################################################
import os, re, sys, socket, time
from wrapped import Wrapped
from datetime import datetime, timedelta
from pylons import g
from r2.lib.utils import tup
from itertools import chain

class AppServiceMonitor(Wrapped):
    """
    Master controller class for service monitoring.
    This class has three purposes:

      * fetching HostLogger instances from the cache for generating
        reports (by calling render(), as this is a subclass of Wrapped);

      * keeping track of which machines are DB machines, allowing db
        load to be checked and improving load balancing;

      * monitoring the local host's load and storing it in the cache.
    """
    cache_key = "machine_datalogger_data_"
    cache_key_small = "machine_datalogger_db_summary_"

    def __init__(self, hosts = None):
        """
        hosts is a list of machine hostnames to be tracked.
        """
        self._hosts = hosts or g.monitored_servers

        db_info = {}
        for db in g.databases:
            dbase, ip = list(g.to_iter(getattr(g, db + "_db")))[:2]
            name = socket.gethostbyaddr(ip)[0]

            for host in g.monitored_servers:
                if (name == host or
                    ("." in host and name.endswith("." + host)) or
                    name.startswith(host + ".")):
                    db_info[db] = (dbase, ip, host)

        self._db_info = db_info
        self.hostlogs = []
        Wrapped.__init__(self)
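    # For illustration only (hypothetical config values): with
    # g.databases = ['main'], g.main_db = ('maindb', '10.0.0.12', ...),
    # a reverse-DNS name of 'db01.local' for that ip, and 'db01' in
    # g.monitored_servers, this loop would produce
    #     self._db_info == {'main': ('maindb', '10.0.0.12', 'db01')}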
    @classmethod
    def from_cache(cls, host):
        key = cls.cache_key + str(host)
        return g.rendercache.get(key)

    def set_cache(self, h):
        cache = g.rendercache
        # cache the whole object
        res = {}
        res[self.cache_key + str(h.host)] = h
        if h.database:
            data = (h.load(), h.load(60),
                    h.database.connections(), h.database.connections(60),
                    h.database.max_connections)
            # cache summary for easy db lookups
            for dbn in chain(h.ini_db_names, h.db_ips):
                res[self.cache_key_small + dbn] = data
        cache.set_multi(res)

    @classmethod
    def get_db_load(cls, *names):
        return g.rendercache.get_multi(names, prefix = cls.cache_key_small)

    def database_load(self, db_name):
        if self._db_info.has_key(db_name):
            return self.server_load(self._db_info[db_name][-1])

    def server_load(self, mach_name):
        h = self.from_cache(mach_name)
        if h:
            return h.load.most_recent()

    def __iter__(self):
        return iter(self.hostlogs)

    def render(self, *a, **kw):
        self.hostlogs = [self.from_cache(host)
                         for host in self._hosts]
        self.hostlogs = filter(None, self.hostlogs)
        return Wrapped.render(self, *a, **kw)

    def monitor(self, loop = True, loop_time = 2, *a, **kw):
        host = g.reddit_host
        h = HostLogger(host, self)
        while True:
            h.monitor(*a, **kw)

            self.set_cache(h)
            if loop:
                time.sleep(loop_time)
            else:
                break

    def is_db_machine(self, host):
        """
        Given a host name, returns a dict of db name -> (dbase, ip) for
        the databases hosted on that machine (empty if it is not one of
        the known DB machines).
        """
        return dict((k, (d2, ip))
                    for k, (d2, ip, name) in self._db_info.iteritems()
                    if host == name)
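# Rough usage sketch (the db name 'main' is hypothetical): a daemon on each
# monitored host runs AppServiceMonitor().monitor(), which set_cache()es its
# HostLogger every loop; app servers can then fetch just the small summary
# tuples rather than unpickling whole HostLogger objects:
#
#     AppServiceMonitor.get_db_load('main')
#     # => {'main': (load, load_60s_avg, conns, conns_60s_avg, max_conns)}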
class DataLogger(object):
    """
    Simple stat tracker class.  Elements are added to a list of length
    maxlen along with their timestamp.  __call__ generates the average
    over the interval provided, or returns the last element if no
    interval is provided.
    """

    def __init__(self, maxlen = 300):
        self._list = []
        self.maxlen = maxlen

    def add(self, value):
        self._list.append((value, datetime.now()))
        if len(self._list) > self.maxlen:
            self._list = self._list[-self.maxlen:]

    def __call__(self, average = None):
        time = datetime.now()
        if average > 0 and self._list:
            lst = filter(lambda x: time - x[1] <= timedelta(0, average),
                         self._list)
            return sum(x[0] for x in lst) / max(len(lst), 1)
        elif self._list:
            return self._list[-1][0]
        else:
            return -1

    def __len__(self):
        return len(self._list)

    def most_recent(self):
        if self._list:
            return self._list[-1]
        else:
            return [0, None]


class Service(object):
    def __init__(self, name, pid, age):
        self.name = name
        self.pid = pid
        self.age = age

        self.mem = DataLogger()
        self.cpu = DataLogger()

    def last_update(self):
        return max(x.most_recent()[1] for x in [self.mem, self.cpu])


class Database(object):

    def __init__(self):
        self.vacuuming = []
        self.connections = DataLogger()
        self.max_connections = -1
        self.ip_conn = {}
        self.db_conn = {}
        self.query_count = DataLogger()

    def track(self, conn = 0, ip_conn = {}, db_conn = {}, vacuums = {},
              query_count = None, max_connections = None):

        # log the number of connections
        self.connections.add(conn)
        if max_connections is not None:
            self.max_connections = max_connections

        # log usage by ip
        for ip, num in ip_conn.iteritems():
            self.ip_conn.setdefault(ip, DataLogger())
            self.ip_conn[ip].add(num)

        # log usage by db
        for db, num in db_conn.iteritems():
            self.db_conn.setdefault(db, DataLogger())
            self.db_conn[db].add(num)

        # log vacuuming
        self.vacuuming = [k for k, v in vacuums.iteritems() if v]

        # has a query count
        if query_count is not None:
            self.query_count.add(query_count)

class HostLogger(object):

    def __init__(self, host, master):
        self.host = host
        self.load = DataLogger()
        self.services = {}
        db_info = master.is_db_machine(host)

        self.ini_db_names = db_info.keys()
        self.db_names = set(name for name, ip in db_info.itervalues())
        self.db_ips = set(ip for name, ip in db_info.itervalues())

        self.database = Database() if self.db_names else None

    def service_pids(self):
        return self.services.keys()

    def track(self, pid, cpu = 0, mem = 0, **kw):
        pid = int(pid)
        if self.services.has_key(pid):
            s = self.services[pid]
            s.cpu.add(cpu)
            s.mem.add(mem)

    def add_service(self, name, pid, age):
        pid = int(pid)
        if not self.services.has_key(pid):
            self.services[pid] = Service(name, pid, int(age / 60))
        else:
            self.services[pid].age = int(age / 60)

    def clean_dead(self, age = 10):
        time = datetime.now()
        for pid, s in list(self.services.iteritems()):
            t = s.last_update()
            if not t or t < time - timedelta(0, age) or pid < 0:
                del self.services[pid]

    def monitor(self, srvname = None,
                srv_params = {}, top_params = {}, db_params = {}):
        # (re)populate the service listing
        for name, status, pid, t in supervise_list(**srv_params):
            if not srvname or any(s in name for s in srvname):
                self.add_service(name, pid, t)

        # check process usage
        proc_info = process_info(proc_ids = self.service_pids(),
                                 **top_params)
        for pid, info in proc_info.iteritems():
            self.track(pid, **info)

        # check db usage:
        if self.database:
            self.database.track(**check_database(self.db_names,
                                                 **db_params))
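        # Sample uptime output this parsing assumes (Linux procps; other
        # platforms format the line differently):
        #   11:31:06 up 14 days,  2:04,  3 users,  load average: 0.52, 0.58, 0.59
        # the split below keeps the second figure (the five-minute average).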
        handle = os.popen('/usr/bin/uptime')
        foo = handle.read()
        foo = foo.split("load average")[1].split(':')[1].strip(' ')
        self.load.add(float(foo.split(' ')[1].strip(',')))
        handle.close()

        self.clean_dead()

    def __iter__(self):
        s = self.services
        pids = s.keys()
        pids.sort(lambda x, y: 1 if s[x].name > s[y].name else -1)
        for pid in pids:
            yield s[pid]


re_text = re.compile('\S+')
def process_info(proc_ids = [], name = '', exe = "/bin/ps"):
    # column indices in `ps -O pcpu,pmem` output:
    #   PID %CPU %MEM STAT TTY TIME COMMAND
    pidi = 0
    cpuid = 1
    memid = 2
    ageid = 5

    if not os.path.exists(exe):
        raise ValueError, "bad executable specified for ps"

    cmd = ([exe, "-a", '-O', 'pcpu,pmem'] +
           ["-p %d" % x for x in proc_ids if x > 0])
    handle = os.popen(' '.join(cmd))

    proc_ids = set(map(int, proc_ids))
    res = {}
    for line in handle:
        line = re_text.findall(line)
        try:
            pid = int(line[pidi])
            n = ' '.join(line[ageid+1:])
            if (n.startswith(name) and
                (not proc_ids or pid in proc_ids)):
                res[pid] = dict(cpu = float(line[cpuid]),
                                mem = float(line[memid]),
                                age = float(line[ageid].split(':')[0]),
                                name = n)
        except (ValueError, IndexError):
            pass
    handle.close()
    return res


def supervise_list(exe = "/usr/local/bin/svstat", path = '/service/'):
    handle = os.popen("%s %s*" % (exe, path))
    defunct = 0
    for line in handle:
        line = line.split(' ')
        name = line[0][len(path):].strip(':')
        try:
            status, blah, pid, time = line[1:5]
            if status == 'up':
                pid = int(pid.strip(')'))
                time = int(time)
            else:
                raise ValueError, "down process"
        except ValueError:
            status = "down"
            defunct += 1
            pid = -defunct
            time = 0
        yield (name, status, pid, time)
    handle.close()

def check_database(db_names, proc = "postgres", check_vacuum = True, user='ri'):
    def simple_query(query, _db = None):
        if not _db: _db = list(db_names)[0]
        # note: not a raw string -- the shell must see echo '\t' so that
        # psql toggles tuples-only output
        cmd = ("(echo '\\t'; echo '%(query)s' ) " +
               "| psql -U %(user)s %(db)s")
        cmd = cmd % dict(query = query, user = user, db = _db)
        handle = os.popen(cmd)
        res = list(handle)
        handle.close()
        return res

    by_ip = {}
    by_db = {}
    total = 0
    for line in simple_query("select datname, client_addr "
                             "from pg_catalog.pg_stat_activity ;"):
        line = line.strip(' \n').split("|")
        if len(line) == 2:
            db, ip = map(lambda x: x.strip(' '), line)
            ip = ip or '[local]'
            by_ip[ip] = by_ip.get(ip, 0) + 1
            by_db[db] = by_db.get(db, 0) + 1
            total += 1

    vacuums = {}
    if check_vacuum:
        for db in by_db:
            for line in simple_query('select * from active;', db):
                cmd = line.split('|')[-1].strip(' ').lower()
                if cmd.startswith('vacuum '):
                    vacuums[db] = True
                    break

    res = dict(conn = total, ip_conn = by_ip, db_conn = by_db,
               vacuums = vacuums, max_connections = 0)

    for line in simple_query('show max_connections;'):
        try:
            res['max_connections'] = int(line.strip('\n '))
            break
        except ValueError:
            continue

    if 'query_queue' in by_db:
        for line in simple_query('select count(*) from reddit_query_queue;',
                                 'query_queue'):
            try:
                res['query_count'] = int(line.strip('\n '))
                break
            except ValueError:
                continue

    return res
diff --git a/r2/r2/templates/admin_rightbox.html b/r2/r2/templates/admin_rightbox.html
index d6dd63b5f..0597dad8e 100644
--- a/r2/r2/templates/admin_rightbox.html
+++ b/r2/r2/templates/admin_rightbox.html
@@ -21,7 +21,7 @@
 ################################################################################

 <%
-   from supervise_watcher import AppServiceMonitor
+   from r2.lib.services import AppServiceMonitor
 %>

 %if c.user_is_admin:
diff --git 
a/r2/r2/templates/appservicemonitor.html b/r2/r2/templates/appservicemonitor.html index a7c1a8fca..dbaa37469 100644 --- a/r2/r2/templates/appservicemonitor.html +++ b/r2/r2/templates/appservicemonitor.html @@ -65,7 +65,7 @@ qcount = host.database.query_count \ if hasattr(host.database, "query_count") else None %> - %if qcount: + %if qcount and qcount(): diff --git a/r2/r2/templates/framebuster.html b/r2/r2/templates/framebuster.html index ac203e7e5..11deb04dc 100644 --- a/r2/r2/templates/framebuster.html +++ b/r2/r2/templates/framebuster.html @@ -37,7 +37,13 @@ (function($) { $.framebuster = function() { $(function() { - $.refresh(); + if($.browser.mozilla) { + var url = window.location.href; + $.redirect(url + ((url.indexOf('?') == -1)?'?':'&') + + 'v=' + Math.random()); + } + else + $.refresh(); }); } })(jQuery); diff --git a/r2/supervise_watcher.py b/r2/supervise_watcher.py index 213bf1911..d238f338e 100644 --- a/r2/supervise_watcher.py +++ b/r2/supervise_watcher.py @@ -19,259 +19,8 @@ # All portions of the code written by CondeNet are Copyright (c) 2006-2009 # CondeNet, Inc. All Rights Reserved. ################################################################################ -#!/usr/bin/env python -import os, re, sys, socket, time, smtplib -import subprocess -from datetime import datetime, timedelta -from r2.lib.wrapped import Wrapped - - -class AppServiceMonitor(Wrapped): - """ - Master controller class for service monitoring. Can be - initialized at the same time as pylons.g provided g is passed in - as the global_config argument. This class has three purposes: - - * Fetches Hostlogger instances from the cache for generating - reports (by calling render() as it is a subclass of wrapped). - - * keeping track of which machines are DB machines, allowing db - load to be checked and improving load balancing. - - * monitoring the local host's load and storing it in the cache. - - """ - - def __init__(self, hosts = None, global_conf = None): - """ - hosts is a list of machine hostnames to be tracked (will - default to global_conf.monitored_servers if not provided). - Note the ability to pass in the global_conf (aka pylons.g) - to allow for initializing before the app has finished loading. - """ - if not global_conf: - from pylons import g - global_conf = g - self.global_conf = global_conf - self._hosts = hosts or global_conf.monitored_servers - - db_info = {} - for db in global_conf.databases: - dbase, ip = list(global_conf.to_iter( - getattr(global_conf, db + "_db")))[:2] - name = socket.gethostbyaddr(ip)[0] - - for host in global_conf.monitored_servers: - if (name == host or - ("." in host and name.endswith("." 
+ host)) or - name.startswith(host + ".")): - db_info[db] = (dbase, ip, host) - - self._db_info = db_info - self.hostlogs = [] - Wrapped.__init__(self) - - def database_load(self, db_name): - if self._db_info.has_key(db_name): - return self.server_load(self._db_info[db_name][-1]) - - @staticmethod - def server_load(mach_name): - h = HostLogger.from_cache(host, self.global_conf) - return h.load.most_recent() - - def __iter__(self): - return iter(self.hostlogs) - - def render(self, *a, **kw): - self.hostlogs = [HostLogger.from_cache(host, self.global_conf) - for host in self._hosts] - self.hostlogs = filter(None, self.hostlogs) - return Wrapped.render(self, *a, **kw) - - def monitor(self, *a, **kw): - host = self.global_conf.reddit_host - h = (HostLogger.from_cache(host, self.global_conf) or - HostLogger(host, self)) - return h.monitor(self, *a, **kw) - - def is_db_machine(self, host): - """ - Given a host name, checks the list of known DB machines to - determine if the host is one of them. - """ - return any(host == name for d2,ip,name in self._db_info.values()) - - -class DataLogger(object): - """ - simple stat tracker class. Elements are added to a list of length - maxlen along with their timestamp. __call__ generates the average - of the interval provided or returns the last element if no - interval is provided - """ - - def __init__(self, maxlen = 300): - self._list = [] - self.maxlen = maxlen - - def add(self, value): - self._list.append((value, datetime.now())) - if len(self._list) > self.maxlen: - self._list = self._list[-self.maxlen:] - - - def __call__(self, average = None): - time = datetime.now() - if average > 0 and self._list: - lst = filter(lambda x: time - x[1] <= timedelta(0, average), - self._list) - return sum(x[0] for x in lst)/max(len(lst), 1) - elif self._list: - return self._list[-1][0] - else: - return 0 - - def most_recent(self): - if self._list: - return self._list[-1] - else: - return [0, None] - - -class Service(object): - def __init__(self, name, pid, age): - self.name = name - self.pid = pid - self.age = age - - self.mem = DataLogger() - self.cpu = DataLogger() - - def last_update(self): - return max(x.most_recent()[1] for x in [self.mem, self.cpu]) - - -class Database(object): - - def __init__(self): - self.vacuuming = [] - self.connections = DataLogger() - self.ip_conn = {} - self.db_conn = {} - self.query_count = DataLogger() - - - def track(self, conn = 0, ip_conn = {}, db_conn = {}, vacuums = {}, - query_count = None): - - #log the number of connections - self.connections.add(conn) - - # log usage by ip - for ip, num in ip_conn.iteritems(): - self.ip_conn.setdefault(ip, DataLogger()) - self.ip_conn[ip].add(num) - - # log usage by db - for db, num in db_conn.iteritems(): - self.db_conn.setdefault(db, DataLogger()) - self.db_conn[db].add(num) - - # log vacuuming - self.vacuuming = [k for k, v in vacuums.iteritems() if v] - - # has a query count - if query_count is not None: - self.query_count.add(query_count) - -class HostLogger(object): - cache_key = "machine_datalog_data_" - - @classmethod - def cache(self, global_conf): - return global_conf.rendercache - - def __init__(self, host, master): - self.host = host - self.load = DataLogger() - self.services = {} - self.database = Database() if master.is_db_machine(host) else None - - def service_pids(self): - return self.services.keys() - - def track(self, pid, cpu = 0, mem = 0, **kw): - pid = int(pid) - if self.services.has_key(pid): - s = self.services[pid] - s.cpu.add(cpu) - s.mem.add(mem) - - def 
add_service(self, name, pid, age): - pid = int(pid) - if not self.services.has_key(pid): - self.services[pid] = Service(name, pid, int(age / 60)) - else: - self.services[pid].age = int(age / 60) - - def set_cache(self, global_conf): - key = self.cache_key + str(self.host) - self.cache(global_conf).set(key, self) - - @classmethod - def from_cache(cls, host, global_conf): - key = cls.cache_key + str(host) - return cls.cache(global_conf).get(key) - - def clean_dead(self, age = 10): - time = datetime.now() - for pid, s in list(self.services.iteritems()): - t = s.last_update() - if not t or t < time - timedelta(0, age) or pid < 0: - del self.services[pid] - - - def monitor(self, service_monitor, - srvname = None, loop = True, loop_time = 2, - srv_params = {}, top_params = {}, db_params = {}): - while True: - # (re)populate the service listing - for name, status, pid, t in supervise_list(**srv_params): - if not srvname or any(s in name for s in srvname): - self.add_service(name, pid, t) - - # check process usage - proc_info = run_top(proc_ids = self.service_pids(), **top_params) - for pid, info in proc_info.iteritems(): - self.track(pid, **info) - - #check db usage: - if self.database: - self.database.track(**check_database(**db_params)) - - handle = os.popen('/usr/bin/uptime') - foo = handle.read() - foo = foo.split("load average")[1].split(':')[1].strip(' ') - self.load.add(float(foo.split(' ')[1].strip(','))) - handle.close() - - self.clean_dead() - self.set_cache(service_monitor.global_conf) - - if loop: - time.sleep(loop_time) - else: - break - - def __iter__(self): - s = self.services - pids = s.keys() - pids.sort(lambda x, y: 1 if s[x].name > s[y].name else -1) - for pid in pids: - yield s[pid] - - +import os, re, sys, time, smtplib +from r2.lib.services import AppServiceMonitor def Alert(restart_list = ['MEM','CPU'], alert_recipients = ['nerds@reddit.com'], @@ -339,110 +88,10 @@ def Alert(restart_list = ['MEM','CPU'], session.quit() -re_text = re.compile('\S+') -def run_top(proc_ids = [], name = '', exe = "/usr/bin/top"): - pidi = 0 - cpuid = 8 - memid = 9 - ageid = 10 - procid = 11 - - if not os.path.exists(exe): - raise ValueError, "bad executable specified for top" - - cmd = [exe, '-b', '-n1'] + ["-p%d" % x for x in proc_ids if x > 0] - - handle = subprocess.Popen(cmd, stdout = subprocess.PIPE, - stderr = subprocess.PIPE) - proc_ids = set(map(int, proc_ids)) - res = {} - for line in handle.communicate()[0].split('\n'): - line = re_text.findall(line) - try: - pid = int(line[pidi]) - n = line[-1] - if (n.startswith(name) and - (not proc_ids or int(pid) in proc_ids)): - res[pid] = dict(cpu = float(line[cpuid]), - mem = float(line[memid]), - age = float(line[ageid].split(':')[0]), - name = n) - except (ValueError, IndexError): - pass - return res - - -def supervise_list(exe = "/usr/local/bin/svstat", path = '/service/'): - handle = os.popen("%s %s*" % (exe, path)) - defunct = 0 - for line in handle: - line = line.split(' ') - name = line[0] - try: - status, blah, pid, time = line[1:5] - name = name[len(path):].strip(':') - if status == 'up': - pid = int(pid.strip(')')) - time = int(time) - else: - raise ValueError, "down process" - except ValueError: - defunct += 1 - pid = -defunct - time = 0 - yield (name, "down", pid, time) - handle.close() - -def check_database(proc = "postgres", check_vacuum = True, user='ri'): - handle = os.popen("ps auxwww | grep ' %s'" %proc) - lines = [l.strip() for l in handle] - handle.close() - - by_ip = {} - by_db = {} - total = 0 - for line in lines: - line = 
re_text.findall(line)[10:] - if line[0].startswith(proc) and len(line) > 4: - db = line[2] - try: - ip, port = line[3].strip(')').split('(') - except ValueError: - ip = '127.0.0.1' - - by_ip[ip] = by_ip.get(ip, 0) + 1 - by_db[db] = by_db.get(db, 0) + 1 - total += 1 - - vacuums = {} - if check_vacuum: - vac = ("(echo '\\t'; echo 'select * from active;') " + - "| psql -U %(user)s %(db)s | grep -i '| vacuum'") - for db in by_db: - handle = os.popen(vac % dict(user=user, db=db)) - vacuums[db] = bool(handle.read()) - handle.close() - - res = dict(conn = total, ip_conn = by_ip, db_conn = by_db, - vacuums = vacuums) - - if 'query_queue' in by_db: - cmd = ("(echo '\t'; echo 'select count(*) from reddit_query_queue;') " - "| psql -U %(user)s query_queue ") - handle = os.popen(cmd % dict(user = user)) - for line in handle: - try: - res['query_count'] = int(line.strip('\n ')) - break - except ValueError: - continue - handle.close() - return res def Run(*a, **kw): - from pylons import g - AppServiceMonitor(global_conf = g).monitor(*a, **kw) + AppServiceMonitor().monitor(*a, **kw) def Test(num, load = 1., pid = 0): services = Services()
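
A minimal driver sketch for the relocated monitor (the service names and the
'main' db name below are hypothetical, and pylons.g must already be
configured by the running app):

    from r2.lib.services import AppServiceMonitor

    # on each monitored host: collect stats and write them to the
    # rendercache every couple of seconds
    AppServiceMonitor().monitor(loop = True, loop_time = 2,
                                srvname = ["haproxy", "app"])

    # on any app server: read back the cached db load summaries
    print AppServiceMonitor.get_db_load('main')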