stats: Add timing logging capability.

This also rejiggers the stats timing code to record a start and an end
time instead of a single duration, so timings can be plotted on a
timeline visualization.
Author: Max Goodman
Date:   2013-02-21 16:12:15 -08:00
Parent: 67af84488d
Commit: 2776fc7cd1

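As a rough usage sketch (not part of this commit: `stats`, the stat keys,
and handle_request() below are hypothetical), the new logging hooks are
meant to bracket a unit of work and hand back the raw timings:

    # `stats` is assumed to be a configured Stats instance from this module.
    stats.start_logging_timings()
    try:
        handle_request()  # instrumented work that records timings internally
    finally:
        timings = stats.end_logging_timings()

    # Each entry is a TimingStatBuffer.Timing namedtuple of (key, start, end)
    # absolute timestamps, which is what a timeline visualization needs.
    for t in timings or []:
        print('%s: %.1f ms at %.3f' % (t.key, (t.end - t.start) * 1000, t.start))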

@@ -25,6 +25,7 @@ import os
 import random
 import socket
 import time
+import threading
 
 from pycassa import columnfamily
 from pycassa import pool
@@ -39,16 +40,23 @@ class TimingStatBuffer:
     instances of this class yields (key, (total_time, count)) tuples.
     """
 
+    Timing = collections.namedtuple('Timing', ['key', 'start', 'end'])
+
     def __init__(self):
         # Store data internally as a map of keys to complex values. The real
         # part of the complex value is the total time (in seconds), and the
         # imaginary part is the total count.
         self.data = collections.defaultdict(complex)
+        self.log = threading.local()
 
-    def record(self, key, service_time_sec):
+    def record(self, key, start, end):
         # Add to the total time and total count with a single complex value,
         # so as to avoid inconsistency from a poorly timed context switch.
-        self.data[key] += service_time_sec + 1j
+        self.data[key] += (end - start) + 1j
+        if getattr(self.log, 'timings', None) is not None:
+            self.log.timings.append(self.Timing(key, start, end))
 
     def flush(self):
         """Yields accumulated timing and counter data and resets the buffer."""
@@ -65,6 +73,14 @@ class TimingStatBuffer:
             mean = total_time / divisor
             yield k, str(mean * 1000) + '|ms'
 
+    def start_logging(self):
+        self.log.timings = []
+
+    def end_logging(self):
+        timings = getattr(self.log, 'timings', None)
+        self.log.timings = None
+        return timings
+
 
 class CountingStatBuffer:
     """Dictionary of keys to cumulative counts."""
@@ -202,9 +218,9 @@ class Timer:
             raise AssertionError("timer hasn't been stopped")
         return self._stop - self._start
 
-    def send(self, subname, delta):
+    def send(self, subname, start, end):
         name = _get_stat_name(self.name, subname)
-        self.client.timing_stats.record(name, delta)
+        self.client.timing_stats.record(name, start, end)
 
     def start(self):
         self._last = self._start = self._time()
@@ -215,7 +231,7 @@ class Timer:
         if self._stop is not None:
             raise AssertionError("timer is stopped")
         last, self._last = self._last, self._time()
-        self._timings.append((subname, self._last - last))
+        self._timings.append((subname, last, self._last))
 
     def stop(self, subname='total'):
         if self._start is None:
@@ -224,7 +240,7 @@ class Timer:
             raise AssertionError('timer is already stopped')
         self._stop = self._time()
         self.flush()
-        self.send(subname, self._stop - self._start)
+        self.send(subname, self._start, self._stop)
 
 
 class Stats:
@@ -240,9 +256,9 @@ class Stats:
     def get_timer(self, name):
         return Timer(self.client, name)
 
-    def transact(self, action, service_time_sec):
+    def transact(self, action, start, end):
        timer = self.get_timer('service_time')
-        timer.send(action, service_time_sec)
+        timer.send(action, start, end)
 
     def get_counter(self, name):
         return Counter(self.client, name)
@@ -300,8 +316,11 @@ class Stats:
                     return processor(msgs, *args)
                 finally:
                     service_time = (time.time() - start) / len(msg_tup)
-                    for msg in msg_tup:
-                        self.transact('amqp.%s' % queue_name, service_time)
+                    for n, msg in enumerate(msg_tup):
+                        fake_start = start + n * service_time
+                        fake_end = fake_start + service_time
+                        self.transact('amqp.%s' % queue_name,
+                                      fake_start, fake_end)
                     self.flush()
             return wrap_processor
         return decorator
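Individual message timings aren't measured here, so the batch's elapsed time
is split into equal, back-to-back synthetic intervals. A worked example with
hypothetical numbers:

    # A 3-message batch whose per-message share came out to 0.1s, starting
    # at t=100.0 (values are made up for illustration).
    start, service_time, n_msgs = 100.0, 0.1, 3
    for n in range(n_msgs):
        fake_start = start + n * service_time
        fake_end = fake_start + service_time
        print(fake_start, fake_end)
    # Prints contiguous ~0.1s intervals (100.0-100.1, 100.1-100.2,
    # 100.2-100.3) that tile the batch on a timeline, rather than three
    # identical bare durations stacked at the same point.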
@@ -309,8 +328,14 @@ class Stats:
     def flush(self):
         self.client.flush()
 
+    def start_logging_timings(self):
+        self.client.timing_stats.start_logging()
+
+    def end_logging_timings(self):
+        return self.client.timing_stats.end_logging()
+
     def cassandra_event(self, operation, column_families, success,
-                        service_time):
+                        start, end):
         if not self.client:
             return
         if not isinstance(column_families, list):
@@ -319,7 +344,7 @@ class Stats:
             key = '.'.join([
                 'cassandra', cf, operation,
                 self.CASSANDRA_KEY_SUFFIXES[success]])
-            self.client.timing_stats.record(key, service_time)
+            self.client.timing_stats.record(key, start, end)
 
     def pg_before_cursor_execute(self, conn, cursor, statement, parameters,
                                  context, executemany):
@@ -329,14 +354,14 @@ class Stats:
                                 context, executemany):
         dsn = dict(part.split('=', 1)
                    for part in context.engine.url.query['dsn'].split())
-        self.pg_event(dsn['host'], dsn['dbname'],
-                      time.time() - context._query_start_time)
+        start = context._query_start_time
+        self.pg_event(dsn['host'], dsn['dbname'], start, time.time())
 
-    def pg_event(self, db_server, db_name, service_time):
+    def pg_event(self, db_server, db_name, start, end):
         if not self.client:
             return
         key = '.'.join(['pg', db_server.replace('.', '-'), db_name])
-        self.client.timing_stats.record(key, service_time)
+        self.client.timing_stats.record(key, start, end)
 
 
 class CacheStats:
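For reference, pg_event flattens dots in the hostname so the server name
occupies a single level of the stat key hierarchy; with hypothetical
connection values:

    db_server, db_name = 'db1.example.com', 'mydb'
    key = '.'.join(['pg', db_server.replace('.', '-'), db_name])
    print(key)  # -> pg.db1-example-com.mydb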
@@ -416,15 +441,15 @@ class StatsCollectingConnectionPool(pool.ConnectionPool):
             truncate=lambda args, kwargs: args[0],
         )
 
-        def record_error(method_name, cf_name, service_time):
+        def record_error(method_name, cf_name, start, end):
             if cf_name and self.stats:
                 self.stats.cassandra_event(method_name, cf_name, False,
-                                           service_time)
+                                           start, end)
 
-        def record_success(method_name, cf_name, service_time):
+        def record_success(method_name, cf_name, start, end):
             if cf_name and self.stats:
                 self.stats.cassandra_event(method_name, cf_name, True,
-                                           service_time)
+                                           start, end)
 
         def instrument(f, get_cf_name):
             def call_with_instrumentation(*args, **kwargs):
@@ -433,10 +458,10 @@ class StatsCollectingConnectionPool(pool.ConnectionPool):
                 try:
                     result = f(*args, **kwargs)
                 except:
-                    record_error(f.__name__, cf_name, time.time() - start)
+                    record_error(f.__name__, cf_name, start, time.time())
                     raise
                 else:
-                    record_success(f.__name__, cf_name, time.time() - start)
+                    record_success(f.__name__, cf_name, start, time.time())
                     return result
             return call_with_instrumentation