From 2776fc7cd14ae71c4861a457b6297f41408aa0c7 Mon Sep 17 00:00:00 2001
From: Max Goodman
Date: Thu, 21 Feb 2013 16:12:15 -0800
Subject: [PATCH] stats: Add timing logging capability.

This also rejiggers stats timing code to record a start and end time
instead of a duration, so timings can be plotted on a timeline
visualization.
---
 r2/r2/lib/stats.py | 69 +++++++++++++++++++++++++++++++---------------
 1 file changed, 47 insertions(+), 22 deletions(-)

diff --git a/r2/r2/lib/stats.py b/r2/r2/lib/stats.py
index aaadb10e5..6b2c7c249 100644
--- a/r2/r2/lib/stats.py
+++ b/r2/r2/lib/stats.py
@@ -25,6 +25,7 @@ import os
 import random
 import socket
 import time
+import threading
 
 from pycassa import columnfamily
 from pycassa import pool
@@ -39,16 +40,23 @@ class TimingStatBuffer:
     instances of this class yields (key, (total_time, count)) tuples.
     """
 
+    Timing = collections.namedtuple('Timing', ['key', 'start', 'end'])
+
     def __init__(self):
         # Store data internally as a map of keys to complex values. The real
         # part of the complex value is the total time (in seconds), and the
         # imaginary part is the total count.
         self.data = collections.defaultdict(complex)
+        self.log = threading.local()
 
-    def record(self, key, service_time_sec):
+    def record(self, key, start, end):
         # Add to the total time and total count with a single complex value,
         # so as to avoid inconsistency from a poorly timed context switch.
-        self.data[key] += service_time_sec + 1j
+        self.data[key] += (end - start) + 1j
+
+        if getattr(self.log, 'timings', None) is not None:
+            self.log.timings.append(self.Timing(key, start, end))
 
     def flush(self):
         """Yields accumulated timing and counter data and resets the buffer."""
@@ -65,6 +73,14 @@ class TimingStatBuffer:
             mean = total_time / divisor
             yield k, str(mean * 1000) + '|ms'
 
+    def start_logging(self):
+        self.log.timings = []
+
+    def end_logging(self):
+        timings = getattr(self.log, 'timings', None)
+        self.log.timings = None
+        return timings
+
 
 class CountingStatBuffer:
     """Dictionary of keys to cumulative counts."""
@@ -202,9 +218,9 @@ class Timer:
             raise AssertionError("timer hasn't been stopped")
         return self._stop - self._start
 
-    def send(self, subname, delta):
+    def send(self, subname, start, end):
         name = _get_stat_name(self.name, subname)
-        self.client.timing_stats.record(name, delta)
+        self.client.timing_stats.record(name, start, end)
 
     def start(self):
         self._last = self._start = self._time()
@@ -215,7 +231,7 @@ class Timer:
         if self._stop is not None:
             raise AssertionError("timer is stopped")
         last, self._last = self._last, self._time()
-        self._timings.append((subname, self._last - last))
+        self._timings.append((subname, last, self._last))
 
     def stop(self, subname='total'):
         if self._start is None:
@@ -224,7 +240,7 @@ class Timer:
             raise AssertionError('timer is already stopped')
         self._stop = self._time()
         self.flush()
-        self.send(subname, self._stop - self._start)
+        self.send(subname, self._start, self._stop)
 
 
 class Stats:
@@ -240,9 +256,9 @@ class Stats:
     def get_timer(self, name):
         return Timer(self.client, name)
 
-    def transact(self, action, service_time_sec):
+    def transact(self, action, start, end):
         timer = self.get_timer('service_time')
-        timer.send(action, service_time_sec)
+        timer.send(action, start, end)
 
     def get_counter(self, name):
         return Counter(self.client, name)
@@ -300,8 +316,11 @@ class Stats:
                     return processor(msgs, *args)
                 finally:
                     service_time = (time.time() - start) / len(msg_tup)
-                    for msg in msg_tup:
-                        self.transact('amqp.%s' % queue_name, service_time)
+                    for n, msg in enumerate(msg_tup):
+                        fake_start = start + n * service_time
+                        fake_end = fake_start + service_time
+                        self.transact('amqp.%s' % queue_name,
+                                      fake_start, fake_end)
                     self.flush()
             return wrap_processor
         return decorator
@@ -309,8 +328,14 @@ class Stats:
     def flush(self):
         self.client.flush()
 
+    def start_logging_timings(self):
+        self.client.timing_stats.start_logging()
+
+    def end_logging_timings(self):
+        return self.client.timing_stats.end_logging()
+
     def cassandra_event(self, operation, column_families, success,
-                        service_time):
+                        start, end):
         if not self.client:
             return
         if not isinstance(column_families, list):
@@ -319,7 +344,7 @@ class Stats:
             key = '.'.join([
                 'cassandra', cf, operation,
                 self.CASSANDRA_KEY_SUFFIXES[success]])
-            self.client.timing_stats.record(key, service_time)
+            self.client.timing_stats.record(key, start, end)
 
     def pg_before_cursor_execute(self, conn, cursor, statement, parameters,
                                  context, executemany):
@@ -329,14 +354,14 @@ class Stats:
                                 context, executemany):
         dsn = dict(part.split('=', 1)
                    for part in context.engine.url.query['dsn'].split())
-        self.pg_event(dsn['host'], dsn['dbname'],
-                      time.time() - context._query_start_time)
+        start = context._query_start_time
+        self.pg_event(dsn['host'], dsn['dbname'], start, time.time())
 
-    def pg_event(self, db_server, db_name, service_time):
+    def pg_event(self, db_server, db_name, start, end):
         if not self.client:
             return
         key = '.'.join(['pg', db_server.replace('.', '-'), db_name])
-        self.client.timing_stats.record(key, service_time)
+        self.client.timing_stats.record(key, start, end)
 
 
 class CacheStats:
@@ -416,15 +441,15 @@ class StatsCollectingConnectionPool(pool.ConnectionPool):
             truncate=lambda args, kwargs: args[0],
         )
 
-        def record_error(method_name, cf_name, service_time):
+        def record_error(method_name, cf_name, start, end):
             if cf_name and self.stats:
                 self.stats.cassandra_event(method_name, cf_name, False,
-                                           service_time)
+                                           start, end)
 
-        def record_success(method_name, cf_name, service_time):
+        def record_success(method_name, cf_name, start, end):
             if cf_name and self.stats:
                 self.stats.cassandra_event(method_name, cf_name, True,
-                                           service_time)
+                                           start, end)
 
         def instrument(f, get_cf_name):
             def call_with_instrumentation(*args, **kwargs):
@@ -433,10 +458,10 @@ class StatsCollectingConnectionPool(pool.ConnectionPool):
                 try:
                     result = f(*args, **kwargs)
                 except:
-                    record_error(f.__name__, cf_name, time.time() - start)
+                    record_error(f.__name__, cf_name, start, time.time())
                     raise
                 else:
-                    record_success(f.__name__, cf_name, time.time() - start)
+                    record_success(f.__name__, cf_name, start, time.time())
                     return result
             return call_with_instrumentation
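
Note: below is a minimal sketch of how the new per-thread timing log could be
exercised, driving TimingStatBuffer directly rather than through a configured
Stats instance (in the app the same capture is exposed via
Stats.start_logging_timings() / end_logging_timings()). The import assumes the
reddit codebase is importable as r2; the stat key and the sleep are
placeholders standing in for real timed work.

    import time

    from r2.lib.stats import TimingStatBuffer

    buf = TimingStatBuffer()
    buf.start_logging()            # start collecting Timing tuples for this thread

    start = time.time()
    time.sleep(0.01)               # placeholder for the work being timed
    end = time.time()
    buf.record('service_time.example', start, end)

    # end_logging() returns the captured list and stops further collection.
    for t in buf.end_logging():
        # Each entry is a Timing(key, start, end) namedtuple; keeping absolute
        # start/end times (rather than a duration) is what allows the timings
        # to be placed on a timeline visualization.
        print('%s: %.1f ms' % (t.key, (t.end - t.start) * 1000))

Since the log lives in a threading.local, each worker thread only sees the
timings it recorded between its own start_logging/end_logging calls.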