diff --git a/r2/r2/lib/app_globals.py b/r2/r2/lib/app_globals.py
index 8d12fcaf1..3ccd6b7b5 100755
--- a/r2/r2/lib/app_globals.py
+++ b/r2/r2/lib/app_globals.py
@@ -26,6 +26,8 @@ import signal
 from datetime import timedelta, datetime
 from urlparse import urlparse
 import json
+from sqlalchemy import engine
+from sqlalchemy import event
 from r2.lib.cache import LocalCache, SelfEmptyingCache
 from r2.lib.cache import CMemcache, StaleCacheChain
 from r2.lib.cache import HardCache, MemcacheChain, MemcacheChain, HardcacheChain
@@ -217,6 +219,12 @@ class Globals(object):
         self.stats = Stats(global_conf.get('statsd_addr'),
                            global_conf.get('statsd_sample_rate'))
 
+        # Listen on the Engine class itself so every engine's queries are timed.
+        event.listen(engine.Engine, 'before_cursor_execute',
+                     self.stats.pg_before_cursor_execute)
+        event.listen(engine.Engine, 'after_cursor_execute',
+                     self.stats.pg_after_cursor_execute)
+
         if not self.cassandra_seeds:
             raise ValueError("cassandra_seeds not set in the .ini")
 
diff --git a/r2/r2/lib/stats.py b/r2/r2/lib/stats.py
index f533953d2..45e675fbf 100644
--- a/r2/r2/lib/stats.py
+++ b/r2/r2/lib/stats.py
@@ -26,9 +26,14 @@ class TimingStatBuffer:
         # so as to avoid inconsistency from a poorly timed context switch.
         self.data[key] += service_time_sec + 1j
 
-    def __iter__(self):
+    def iteritems(self):
+        """Yields timing and counter data for sending to statsd."""
         for k, v in self.data.iteritems():
-            yield k, (v.real, v.imag)
+            total_time, count = v.real, v.imag
+            yield k, str(count) + '|c'
+            divisor = count or 1
+            mean = total_time / divisor
+            yield k, str(mean * 1000) + '|ms'
 
 class Stats:
     # Sample rate for recording cache hits/misses, relative to the global
@@ -107,6 +112,13 @@ class Stats:
             return wrap_processor
         return decorator
 
+    def flush_timing_stats(self):
+        """Swap in a fresh timing buffer and send the old one, if connected."""
+        events = self.timing_stats
+        self.timing_stats = TimingStatBuffer()
+        if self.connection:
+            self.connection.send(events)
+
     def cassandra_event(self, operation, column_families, success,
                         service_time):
         if not self.connection:
@@ -115,21 +127,39 @@ class Stats:
             column_families = [column_families]
         for cf in column_families:
             key = '.'.join([
-                cf, operation, self.CASSANDRA_KEY_SUFFIXES[success]])
+                'cassandra', cf, operation,
+                self.CASSANDRA_KEY_SUFFIXES[success]])
             self.timing_stats.record(key, service_time)
 
-    def flush_timing_stats(self):
-        events = self.timing_stats
-        self.timing_stats = TimingStatBuffer()
-        if self.connection:
-            data = {}
-            for key, (total_time, count) in events:
-                data['cassandra.' + key] = str(count) + '|c'
-                if key.endswith('.ok'):
-                    divisor = count or 1
-                    mean = total_time / divisor
-                    data['cassandra.' + key[:-3]] = str(mean) + '|ms'
-            self.connection.send(data)
+    def pg_before_cursor_execute(self, conn, cursor, statement, parameters,
+                                 context, executemany):
+        """SQLAlchemy hook: stamp the query's start time on its context."""
+        context._query_start_time = time.time()
+
+    def pg_after_cursor_execute(self, conn, cursor, statement, parameters,
+                                context, executemany):
+        """SQLAlchemy hook: record how long the just-finished query took."""
+        start_time = getattr(context, '_query_start_time', None)
+        if start_time is None:
+            # No matching "before" stamp; skip rather than fail the query.
+            return
+        url = context.engine.url
+        self.pg_event(url.host, url.database, time.time() - start_time)
+
+    def pg_event(self, db_server, db_name, service_time):
+        """Record one database query timing under pg.<server>.<database>.
+
+        Dots in the server name are replaced with dashes so the host stays
+        a single component of the stat key.  An engine URL may leave the
+        host or database unset (None), e.g. for local socket connections,
+        so missing values are reported as 'unknown' instead of raising
+        inside the query hook.
+        """
+        if not self.connection:
+            return
+        server = (db_server or 'unknown').replace('.', '-')
+        key = '.'.join(['pg', server, db_name or 'unknown'])
+        self.timing_stats.record(key, service_time)
 
 class CacheStats:
     def __init__(self, parent, cache_name):