diff --git a/r2/r2/lib/db/queries.py b/r2/r2/lib/db/queries.py index 4a38f36a8..349920474 100644 --- a/r2/r2/lib/db/queries.py +++ b/r2/r2/lib/db/queries.py @@ -918,7 +918,7 @@ def run_new_comments(limit=1000): # this is done as a queue because otherwise the contention for the # lock on the query would be very high - @g.stats.amqp_processor + @g.stats.amqp_processor('newcomments_q') def _run_new_comments(msgs, chan): fnames = [msg.body for msg in msgs] @@ -936,7 +936,7 @@ def run_new_comments(limit=1000): def run_commentstree(limit=100): """Add new incoming comments to their respective comments trees""" - @g.stats.amqp_processor + @g.stats.amqp_processor('commentstree_q') def _run_commentstree(msgs, chan): comments = Comment._by_fullname([msg.body for msg in msgs], data = True, return_dict = False) @@ -1056,7 +1056,7 @@ def handle_vote(user, thing, dir, ip, organic, cheater=False, foreground=False): def process_votes_single(qname, limit=0): # limit is taken but ignored for backwards compatibility - @g.stats.amqp_processor + @g.stats.amqp_processor(qname) def _handle_vote(msg): #assert(len(msgs) == 1) r = pickle.loads(msg.body) @@ -1078,7 +1078,7 @@ def process_votes_single(qname, limit=0): def process_votes_multi(qname, limit=100): # limit is taken but ignored for backwards compatibility - @g.stats.amqp_processor + @g.stats.amqp_processor(qname) def _handle_vote(msgs, chan): comments = [] diff --git a/r2/r2/lib/media.py b/r2/r2/lib/media.py index 63dc5529d..74ba7e9cf 100644 --- a/r2/r2/lib/media.py +++ b/r2/r2/lib/media.py @@ -174,7 +174,7 @@ def force_thumbnail(link, image_data, never_expire=True, file_type=".jpg"): update_link(link, thumbnail=thumb_url, media_object=None, thumbnail_size=image.size) def run(): - @g.stats.amqp_processor + @g.stats.amqp_processor('scraper_q') def process_link(msg): def _process_link(fname): link = Link._by_fullname(fname, data=True) diff --git a/r2/r2/lib/solrsearch.py b/r2/r2/lib/solrsearch.py index f0ffd5739..30b149126 100644 --- a/r2/r2/lib/solrsearch.py +++ b/r2/r2/lib/solrsearch.py @@ -667,7 +667,7 @@ def run_changed(drain=False): last run. Note: unlike many queue-using functions, this one is run from cron and totally drains the queue before terminating """ - @g.stats.amqp_processor + @g.stats.amqp_processor('solrsearch_changes') def _run_changed(msgs, chan): print "changed: Processing %d items" % len(msgs) msgs = [strordict_fullname(msg.body) diff --git a/r2/r2/lib/stats.py b/r2/r2/lib/stats.py index 4b2d65606..478292f64 100644 --- a/r2/r2/lib/stats.py +++ b/r2/r2/lib/stats.py @@ -46,21 +46,22 @@ class Stats: if counter and random.random() < self.CACHE_SAMPLE_RATE: counter.increment(name, delta=delta) - def amqp_processor(self, processor): + def amqp_processor(self, queue_name): """Decorator for recording stats for amqp queue consumers/handlers.""" - def wrap_processor(msgs, *args): - # Work the same for amqp.consume_items and amqp.handle_items. - msg_tup = utils.tup(msgs) + def decorator(processor): + def wrap_processor(msgs, *args): + # Work the same for amqp.consume_items and amqp.handle_items. + msg_tup = utils.tup(msgs) - start = time.time() - try: - return processor(msgs, *args) - finally: - service_time = (time.time() - start) / len(msg_tup) - for msg in msg_tup: - self.transact('amqp.%s' % msg.delivery_info['routing_key'], - service_time) - return wrap_processor + start = time.time() + try: + return processor(msgs, *args) + finally: + service_time = (time.time() - start) / len(msg_tup) + for msg in msg_tup: + self.transact('amqp.%s' % queue_name, service_time) + return wrap_processor + return decorator class CacheStats: def __init__(self, parent, cache_name):