added db load balancing based on loads reported by the service monitor

This commit is contained in:
spez
2009-02-13 12:18:06 -08:00
parent e7da5adc6c
commit 9b31cdcae2
2 changed files with 35 additions and 9 deletions

View File

@@ -28,6 +28,7 @@ import sqlalchemy as sa
from sqlalchemy.databases import postgres
from r2.lib.utils import storage, storify, iters, Results, tup, TransSet
from r2.lib.services import AppServiceMonitor
import operators
from pylons import g
dbm = g.dbm
@@ -275,7 +276,35 @@ def get_write_table(tables):
return tables[0]
def get_read_table(tables):
return random.choice(tables)
#shortcut with 1 entry
if len(tables) == 1:
return tables[0]
#'t' is a list of engines itself. since we assume those engines
#are on the same machine, just take the first one. len(ips) may be
#< len(tables) if some tables are on the same host.
ips = dict((t[0].engine.url.host, t) for t in tables)
ip_loads = AppServiceMonitor.get_db_load(ips.keys())
candidates = []
all_ips = []
for ip in ips:
if ip not in ip_loads:
print 'no load info for %s' % ip
all_ips.append(ip)
else:
load, avg_load, conns, avg_conns, max_conns = ip_loads[ip]
#prune high-connection machines
if conns < .9 * max_conns:
candidates.append((ip, max(load, avg_load)))
#add the least loaded machine to all_ips
if candidates:
all_ips.append(min(candidates, key = lambda x: x[1])[0])
best_ip = random.choice(all_ips)
return ips[best_ip]
def get_thing_write_table(type_id):
return get_write_table(types_id[type_id].tables)

View File

@@ -97,13 +97,9 @@ class AppServiceMonitor(Wrapped):
cache.set_multi(res)
@classmethod
def get_db_load(cls, *names):
def get_db_load(cls, names):
return g.rendercache.get_multi(names, prefix = cls.cache_key_small)
def database_load(self, db_name):
if self._db_info.has_key(db_name):
return self.server_load(self._db_info[db_name][-1])
def server_load(self, mach_name):
h = self.from_cache(host)
return h.load.most_recent()
@@ -272,9 +268,10 @@ class HostLogger(object):
def monitor(self, srvname,
srv_params = {}, top_params = {}, db_params = {}):
# (re)populate the service listing
for name, status, pid, t in supervise_list(**srv_params):
if not srvname or any(s in name for s in srvname):
self.add_service(name, pid, t)
if srvname:
for name, status, pid, t in supervise_list(**srv_params):
if any(s in name for s in srvname):
self.add_service(name, pid, t)
# check process usage
proc_info = process_info(proc_ids = self.service_pids(),