Pass queue names to @g.stats.amqp_processor.

This commit is contained in:
Logan Hanks
2011-11-17 14:21:19 -08:00
parent 709dd35261
commit 6945598117
4 changed files with 20 additions and 19 deletions

View File

@@ -918,7 +918,7 @@ def run_new_comments(limit=1000):
# this is done as a queue because otherwise the contention for the
# lock on the query would be very high
@g.stats.amqp_processor
@g.stats.amqp_processor('newcomments_q')
def _run_new_comments(msgs, chan):
fnames = [msg.body for msg in msgs]
@@ -936,7 +936,7 @@ def run_new_comments(limit=1000):
def run_commentstree(limit=100):
"""Add new incoming comments to their respective comments trees"""
@g.stats.amqp_processor
@g.stats.amqp_processor('commentstree_q')
def _run_commentstree(msgs, chan):
comments = Comment._by_fullname([msg.body for msg in msgs],
data = True, return_dict = False)
@@ -1056,7 +1056,7 @@ def handle_vote(user, thing, dir, ip, organic, cheater=False, foreground=False):
def process_votes_single(qname, limit=0):
# limit is taken but ignored for backwards compatibility
@g.stats.amqp_processor
@g.stats.amqp_processor(qname)
def _handle_vote(msg):
#assert(len(msgs) == 1)
r = pickle.loads(msg.body)
@@ -1078,7 +1078,7 @@ def process_votes_single(qname, limit=0):
def process_votes_multi(qname, limit=100):
# limit is taken but ignored for backwards compatibility
@g.stats.amqp_processor
@g.stats.amqp_processor(qname)
def _handle_vote(msgs, chan):
comments = []

View File

@@ -174,7 +174,7 @@ def force_thumbnail(link, image_data, never_expire=True, file_type=".jpg"):
update_link(link, thumbnail=thumb_url, media_object=None, thumbnail_size=image.size)
def run():
@g.stats.amqp_processor
@g.stats.amqp_processor('scraper_q')
def process_link(msg):
def _process_link(fname):
link = Link._by_fullname(fname, data=True)

View File

@@ -667,7 +667,7 @@ def run_changed(drain=False):
last run. Note: unlike many queue-using functions, this one is
run from cron and totally drains the queue before terminating
"""
@g.stats.amqp_processor
@g.stats.amqp_processor('solrsearch_changes')
def _run_changed(msgs, chan):
print "changed: Processing %d items" % len(msgs)
msgs = [strordict_fullname(msg.body)

View File

@@ -46,21 +46,22 @@ class Stats:
if counter and random.random() < self.CACHE_SAMPLE_RATE:
counter.increment(name, delta=delta)
def amqp_processor(self, processor):
def amqp_processor(self, queue_name):
"""Decorator for recording stats for amqp queue consumers/handlers."""
def wrap_processor(msgs, *args):
# Work the same for amqp.consume_items and amqp.handle_items.
msg_tup = utils.tup(msgs)
def decorator(processor):
def wrap_processor(msgs, *args):
# Work the same for amqp.consume_items and amqp.handle_items.
msg_tup = utils.tup(msgs)
start = time.time()
try:
return processor(msgs, *args)
finally:
service_time = (time.time() - start) / len(msg_tup)
for msg in msg_tup:
self.transact('amqp.%s' % msg.delivery_info['routing_key'],
service_time)
return wrap_processor
start = time.time()
try:
return processor(msgs, *args)
finally:
service_time = (time.time() - start) / len(msg_tup)
for msg in msg_tup:
self.transact('amqp.%s' % queue_name, service_time)
return wrap_processor
return decorator
class CacheStats:
def __init__(self, parent, cache_name):