diff --git a/r2/r2/lib/amqp.py b/r2/r2/lib/amqp.py index 5f882376b..a82fa0b8f 100644 --- a/r2/r2/lib/amqp.py +++ b/r2/r2/lib/amqp.py @@ -216,18 +216,19 @@ def consume_items(queue, callback, verbose=True): if chan.is_open: chan.close() -def handle_items(queue, callback, ack = True, limit = 1, drain = False, - verbose=True, sleep_time = 1): +def handle_items(queue, callback, ack=True, limit=1, min_size=0, + drain=False, verbose=True, sleep_time=1): """Call callback() on every item in a particular queue. If the - connection to the queue is lost, it will die. Intended to be - used as a long-running process.""" + connection to the queue is lost, it will die. Intended to be + used as a long-running process.""" + if limit < min_size: + raise ValueError("min_size must be less than limit") from pylons import c chan = connection_manager.get_channel() countdown = None while True: - # NB: None != 0, so we don't need an "is not None" check here if countdown == 0: break @@ -245,15 +246,21 @@ def handle_items(queue, callback, ack = True, limit = 1, drain = False, g.reset_caches() c.use_write_db = {} - items = [] + items = [msg] - while msg and countdown != 0: - items.append(msg) + while countdown != 0: if countdown is not None: countdown -= 1 if len(items) >= limit: break # the innermost loop only msg = chan.basic_get(queue) + if msg is None: + if len(items) < min_size: + time.sleep(sleep_time) + else: + break + else: + items.append(msg) try: count_str = ''