Record postgres cursor execution times.

This commit is contained in:
Logan Hanks
2011-12-19 13:52:15 -08:00
parent 59421ba17f
commit a26d6a953e
2 changed files with 36 additions and 15 deletions

View File

@@ -26,6 +26,8 @@ import signal
from datetime import timedelta, datetime
from urlparse import urlparse
import json
from sqlalchemy import engine
from sqlalchemy import event
from r2.lib.cache import LocalCache, SelfEmptyingCache
from r2.lib.cache import CMemcache, StaleCacheChain
from r2.lib.cache import HardCache, MemcacheChain, MemcacheChain, HardcacheChain
@@ -217,6 +219,11 @@ class Globals(object):
self.stats = Stats(global_conf.get('statsd_addr'),
global_conf.get('statsd_sample_rate'))
event.listens_for(engine.Engine, 'before_cursor_execute')(
self.stats.pg_before_cursor_execute)
event.listens_for(engine.Engine, 'after_cursor_execute')(
self.stats.pg_after_cursor_execute)
if not self.cassandra_seeds:
raise ValueError("cassandra_seeds not set in the .ini")

View File

@@ -26,9 +26,14 @@ class TimingStatBuffer:
# so as to avoid inconsistency from a poorly timed context switch.
self.data[key] += service_time_sec + 1j
def __iter__(self):
def iteritems(self):
"""Yields timing and counter data for sending to statsd."""
for k, v in self.data.iteritems():
yield k, (v.real, v.imag)
total_time, count = v.real, v.imag
yield k, str(count) + '|c'
divisor = count or 1
mean = total_time / divisor
yield k, str(mean * 1000) + '|ms'
class Stats:
# Sample rate for recording cache hits/misses, relative to the global
@@ -107,6 +112,12 @@ class Stats:
return wrap_processor
return decorator
def flush_timing_stats(self):
events = self.timing_stats
self.timing_stats = TimingStatBuffer()
if self.connection:
self.connection.send(events)
def cassandra_event(self, operation, column_families, success,
service_time):
if not self.connection:
@@ -115,21 +126,24 @@ class Stats:
column_families = [column_families]
for cf in column_families:
key = '.'.join([
cf, operation, self.CASSANDRA_KEY_SUFFIXES[success]])
'cassandra', cf, operation,
self.CASSANDRA_KEY_SUFFIXES[success]])
self.timing_stats.record(key, service_time)
def flush_timing_stats(self):
events = self.timing_stats
self.timing_stats = TimingStatBuffer()
if self.connection:
data = {}
for key, (total_time, count) in events:
data['cassandra.' + key] = str(count) + '|c'
if key.endswith('.ok'):
divisor = count or 1
mean = total_time / divisor
data['cassandra.' + key[:-3]] = str(mean) + '|ms'
self.connection.send(data)
def pg_before_cursor_execute(self, conn, cursor, statement, parameters,
context, executemany):
context._query_start_time = time.time()
def pg_after_cursor_execute(self, conn, cursor, statement, parameters,
context, executemany):
self.pg_event(context.engine.url.host, context.engine.url.database,
time.time() - context._query_start_time)
def pg_event(self, db_server, db_name, service_time):
if not self.connection:
return
key = '.'.join(['pg', db_server.replace('.', '-'), db_name])
self.timing_stats.record(key, service_time)
class CacheStats:
def __init__(self, parent, cache_name):