Add recording of timing stats for amqp processors.

This commit is contained in:
Logan Hanks
2011-11-08 15:12:14 -07:00
parent a1c371a572
commit e6274eea7f
2 changed files with 24 additions and 0 deletions

View File

@@ -918,6 +918,7 @@ def run_new_comments(limit=1000):
# this is done as a queue because otherwise the contention for the
# lock on the query would be very high
@g.stats.amqp_processor
def _run_new_comments(msgs, chan):
fnames = [msg.body for msg in msgs]
@@ -935,6 +936,7 @@ def run_new_comments(limit=1000):
def run_commentstree(limit=100):
"""Add new incoming comments to their respective comments trees"""
@g.stats.amqp_processor
def _run_commentstree(msgs, chan):
comments = Comment._by_fullname([msg.body for msg in msgs],
data = True, return_dict = False)
@@ -1054,6 +1056,7 @@ def handle_vote(user, thing, dir, ip, organic, cheater=False, foreground=False):
def process_votes_single(qname, limit=0):
# limit is taken but ignored for backwards compatibility
@g.stats.amqp_processor
def _handle_vote(msg):
#assert(len(msgs) == 1)
r = pickle.loads(msg.body)
@@ -1075,6 +1078,7 @@ def process_votes_single(qname, limit=0):
def process_votes_multi(qname, limit=100):
# limit is taken but ignored for backwards compatibility
@g.stats.amqp_processor
def _handle_vote(msgs, chan):
comments = []

View File

@@ -1,3 +1,7 @@
import time
from r2.lib import utils
class Stats:
def __init__(self, addr, sample_rate):
if addr:
@@ -24,3 +28,19 @@ class Stats:
timer = self.get_timer('service_time')
if timer:
timer.send(action, service_time_sec)
def amqp_processor(self, processor):
"""Decorator for recording stats for amqp queue consumers/handlers."""
def wrap_processor(msgs, *args):
# Work the same for amqp.consume_items and amqp.handle_items.
msg_tup = utils.tup(msgs)
start = time.time()
try:
return processor(msgs, *args)
finally:
service_time = (time.time() - start) / len(msg_tup)
for msg in msg_tup:
self.transact('amqp.%s' % msg.delivery_info['routing_key'],
service_time)
return wrap_processor