diff --git a/r2/r2/lib/amqp.py b/r2/r2/lib/amqp.py index 472edf59d..d340863c9 100644 --- a/r2/r2/lib/amqp.py +++ b/r2/r2/lib/amqp.py @@ -153,7 +153,7 @@ DELIVERY_TRANSIENT = 1 DELIVERY_DURABLE = 2 def _add_item(routing_key, body, message_id = None, - delivery_mode = DELIVERY_DURABLE): + delivery_mode = DELIVERY_DURABLE, headers=None): """adds an item onto a queue. If the connection to amqp is lost it will try to reconnect and then call itself again.""" if not amqp_host: @@ -167,6 +167,9 @@ def _add_item(routing_key, body, message_id = None, if message_id: msg.properties['message_id'] = message_id + if headers: + msg.properties["application_headers"] = headers + event_name = 'amqp.%s' % routing_key try: chan.basic_publish(msg, @@ -182,12 +185,13 @@ def _add_item(routing_key, body, message_id = None, else: stats.event_count(event_name, 'enqueue') -def add_item(routing_key, body, message_id = None, delivery_mode = DELIVERY_DURABLE): +def add_item(routing_key, body, message_id=None, + delivery_mode=DELIVERY_DURABLE, headers=None): if amqp_host and amqp_logging: log.debug("amqp: adding item %r to %r" % (body, routing_key)) worker.do(_add_item, routing_key, body, message_id = message_id, - delivery_mode = delivery_mode) + delivery_mode = delivery_mode, headers=headers) def add_kw(routing_key, **kw): add_item(routing_key, pickle.dumps(kw))