From 9b31cdcae2d5e8e58022c6a3543d76eb4a5daa2c Mon Sep 17 00:00:00 2001
From: spez
Date: Fri, 13 Feb 2009 12:18:06 -0800
Subject: [PATCH] added db load balancing based on loads reported by the service monitor

---
Review notes (text between the "---" marker and the first "diff --git"
line is commentary, not part of the commit message):

* Line breaks and indentation were rebuilt from a flattened copy of this
  patch. Indentation (4-space levels) and blank context lines are
  inferred, and the wrap of the "def monitor(...)" context line is
  inferred from the hunk's line counts. If a hunk is rejected, compare
  the context with the tree or retry with "git apply --ignore-whitespace".
* get_read_table: all_ips was empty when every host had load info and
  every one was at or above 90% of max_conns, so random.choice(all_ips)
  raised IndexError. It now falls back to random.choice(tables). Hunk and
  diffstat counts are updated for the six added lines; the post-image
  blob id on the tdb_sql.py index line still names the original version.
* server_load (context only, not changed here) takes "mach_name" but
  reads "host"; unless "host" is defined elsewhere in services.py this
  is a NameError and deserves a follow-up change.

 r2/r2/lib/db/tdb_sql.py | 37 ++++++++++++++++++++++++++++++++++++-
 r2/r2/lib/services.py   | 13 +++++--------
 2 files changed, 41 insertions(+), 9 deletions(-)

diff --git a/r2/r2/lib/db/tdb_sql.py b/r2/r2/lib/db/tdb_sql.py
index 9192a2919..dc521d182 100644
--- a/r2/r2/lib/db/tdb_sql.py
+++ b/r2/r2/lib/db/tdb_sql.py
@@ -28,6 +28,7 @@ import sqlalchemy as sa
 from sqlalchemy.databases import postgres
 
 from r2.lib.utils import storage, storify, iters, Results, tup, TransSet
+from r2.lib.services import AppServiceMonitor
 import operators
 from pylons import g
 dbm = g.dbm
@@ -275,7 +276,41 @@ def get_write_table(tables):
     return tables[0]
 
 def get_read_table(tables):
-    return random.choice(tables)
+    #shortcut with 1 entry
+    if len(tables) == 1:
+        return tables[0]
+
+    #'t' is a list of engines itself. since we assume those engines
+    #are on the same machine, just take the first one. len(ips) may be
+    #< len(tables) if some tables are on the same host.
+    ips = dict((t[0].engine.url.host, t) for t in tables)
+    ip_loads = AppServiceMonitor.get_db_load(ips.keys())
+
+    candidates = []
+    all_ips = []
+    for ip in ips:
+        if ip not in ip_loads:
+            print 'no load info for %s' % ip
+            all_ips.append(ip)
+        else:
+            load, avg_load, conns, avg_conns, max_conns = ip_loads[ip]
+
+            #prune high-connection machines
+            if conns < .9 * max_conns:
+                candidates.append((ip, max(load, avg_load)))
+
+    #add the least loaded machine to all_ips
+    if candidates:
+        all_ips.append(min(candidates, key = lambda x: x[1])[0])
+
+    #every host reported load and every one was pruned for being near
+    #its connection limit. reading from a busy db beats raising
+    #IndexError from random.choice([]), so fall back to a random table
+    if not all_ips:
+        return random.choice(tables)
+
+    best_ip = random.choice(all_ips)
+    return ips[best_ip]
 
 def get_thing_write_table(type_id):
     return get_write_table(types_id[type_id].tables)
diff --git a/r2/r2/lib/services.py b/r2/r2/lib/services.py
index 642d0c669..c6ab6a071 100644
--- a/r2/r2/lib/services.py
+++ b/r2/r2/lib/services.py
@@ -97,13 +97,9 @@ class AppServiceMonitor(Wrapped):
         cache.set_multi(res)
 
     @classmethod
-    def get_db_load(cls, *names):
+    def get_db_load(cls, names):
         return g.rendercache.get_multi(names, prefix = cls.cache_key_small)
 
-    def database_load(self, db_name):
-        if self._db_info.has_key(db_name):
-            return self.server_load(self._db_info[db_name][-1])
-
     def server_load(self, mach_name):
         h = self.from_cache(host)
         return h.load.most_recent()
@@ -272,9 +268,10 @@ class HostLogger(object):
     def monitor(self, srvname,
                 srv_params = {}, top_params = {}, db_params = {}):
         # (re)populate the service listing
-        for name, status, pid, t in supervise_list(**srv_params):
-            if not srvname or any(s in name for s in srvname):
-                self.add_service(name, pid, t)
+        if srvname:
+            for name, status, pid, t in supervise_list(**srv_params):
+                if any(s in name for s in srvname):
+                    self.add_service(name, pid, t)
 
         # check process usage
         proc_info = process_info(proc_ids = self.service_pids(),