From e6274eea7f80f6772d81063ab56faed070b18950 Mon Sep 17 00:00:00 2001 From: Logan Hanks Date: Tue, 8 Nov 2011 15:12:14 -0700 Subject: [PATCH] Add recording of timing stats for amqp processors. --- r2/r2/lib/db/queries.py | 4 ++++ r2/r2/lib/stats.py | 20 ++++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/r2/r2/lib/db/queries.py b/r2/r2/lib/db/queries.py index d4ad59336..4a38f36a8 100644 --- a/r2/r2/lib/db/queries.py +++ b/r2/r2/lib/db/queries.py @@ -918,6 +918,7 @@ def run_new_comments(limit=1000): # this is done as a queue because otherwise the contention for the # lock on the query would be very high + @g.stats.amqp_processor def _run_new_comments(msgs, chan): fnames = [msg.body for msg in msgs] @@ -935,6 +936,7 @@ def run_new_comments(limit=1000): def run_commentstree(limit=100): """Add new incoming comments to their respective comments trees""" + @g.stats.amqp_processor def _run_commentstree(msgs, chan): comments = Comment._by_fullname([msg.body for msg in msgs], data = True, return_dict = False) @@ -1054,6 +1056,7 @@ def handle_vote(user, thing, dir, ip, organic, cheater=False, foreground=False): def process_votes_single(qname, limit=0): # limit is taken but ignored for backwards compatibility + @g.stats.amqp_processor def _handle_vote(msg): #assert(len(msgs) == 1) r = pickle.loads(msg.body) @@ -1075,6 +1078,7 @@ def process_votes_single(qname, limit=0): def process_votes_multi(qname, limit=100): # limit is taken but ignored for backwards compatibility + @g.stats.amqp_processor def _handle_vote(msgs, chan): comments = [] diff --git a/r2/r2/lib/stats.py b/r2/r2/lib/stats.py index 350ea2540..c5894c6ea 100644 --- a/r2/r2/lib/stats.py +++ b/r2/r2/lib/stats.py @@ -1,3 +1,7 @@ +import time + +from r2.lib import utils + class Stats: def __init__(self, addr, sample_rate): if addr: @@ -24,3 +28,19 @@ class Stats: timer = self.get_timer('service_time') if timer: timer.send(action, service_time_sec) + + def amqp_processor(self, processor): + """Decorator for recording stats for amqp queue consumers/handlers.""" + def wrap_processor(msgs, *args): + # Work the same for amqp.consume_items and amqp.handle_items. + msg_tup = utils.tup(msgs) + + start = time.time() + try: + return processor(msgs, *args) + finally: + service_time = (time.time() - start) / len(msg_tup) + for msg in msg_tup: + self.transact('amqp.%s' % msg.delivery_info['routing_key'], + service_time) + return wrap_processor