mirror of
https://github.com/reddit-archive/reddit.git
synced 2026-01-25 14:58:27 -05:00
Add min_size parameter to amqp.handle_items
This commit is contained in:
committed by
Neil Williams
parent
e7e153577a
commit
46dbdba1ac
@@ -216,18 +216,19 @@ def consume_items(queue, callback, verbose=True):
|
||||
if chan.is_open:
|
||||
chan.close()
|
||||
|
||||
def handle_items(queue, callback, ack = True, limit = 1, drain = False,
|
||||
verbose=True, sleep_time = 1):
|
||||
def handle_items(queue, callback, ack=True, limit=1, min_size=0,
|
||||
drain=False, verbose=True, sleep_time=1):
|
||||
"""Call callback() on every item in a particular queue. If the
|
||||
connection to the queue is lost, it will die. Intended to be
|
||||
used as a long-running process."""
|
||||
connection to the queue is lost, it will die. Intended to be
|
||||
used as a long-running process."""
|
||||
if limit < min_size:
|
||||
raise ValueError("min_size must be less than limit")
|
||||
from pylons import c
|
||||
|
||||
chan = connection_manager.get_channel()
|
||||
countdown = None
|
||||
|
||||
while True:
|
||||
|
||||
# NB: None != 0, so we don't need an "is not None" check here
|
||||
if countdown == 0:
|
||||
break
|
||||
@@ -245,15 +246,21 @@ def handle_items(queue, callback, ack = True, limit = 1, drain = False,
|
||||
g.reset_caches()
|
||||
c.use_write_db = {}
|
||||
|
||||
items = []
|
||||
items = [msg]
|
||||
|
||||
while msg and countdown != 0:
|
||||
items.append(msg)
|
||||
while countdown != 0:
|
||||
if countdown is not None:
|
||||
countdown -= 1
|
||||
if len(items) >= limit:
|
||||
break # the innermost loop only
|
||||
msg = chan.basic_get(queue)
|
||||
if msg is None:
|
||||
if len(items) < min_size:
|
||||
time.sleep(sleep_time)
|
||||
else:
|
||||
break
|
||||
else:
|
||||
items.append(msg)
|
||||
|
||||
try:
|
||||
count_str = ''
|
||||
|
||||
Reference in New Issue
Block a user