amqp: Make it possible to add application headers to messages.

Useful for carrying extra information out-of-band from the content.
This commit is contained in:
Neil Williams
2013-06-19 14:13:04 -07:00
parent 713a8ee112
commit 26f2dd6de3

View File

@@ -153,7 +153,7 @@ DELIVERY_TRANSIENT = 1
DELIVERY_DURABLE = 2
def _add_item(routing_key, body, message_id = None,
delivery_mode = DELIVERY_DURABLE):
delivery_mode = DELIVERY_DURABLE, headers=None):
"""adds an item onto a queue. If the connection to amqp is lost it
will try to reconnect and then call itself again."""
if not amqp_host:
@@ -167,6 +167,9 @@ def _add_item(routing_key, body, message_id = None,
if message_id:
msg.properties['message_id'] = message_id
if headers:
msg.properties["application_headers"] = headers
event_name = 'amqp.%s' % routing_key
try:
chan.basic_publish(msg,
@@ -182,12 +185,13 @@ def _add_item(routing_key, body, message_id = None,
else:
stats.event_count(event_name, 'enqueue')
def add_item(routing_key, body, message_id = None, delivery_mode = DELIVERY_DURABLE):
def add_item(routing_key, body, message_id=None,
delivery_mode=DELIVERY_DURABLE, headers=None):
if amqp_host and amqp_logging:
log.debug("amqp: adding item %r to %r" % (body, routing_key))
worker.do(_add_item, routing_key, body, message_id = message_id,
delivery_mode = delivery_mode)
delivery_mode = delivery_mode, headers=headers)
def add_kw(routing_key, **kw):
add_item(routing_key, pickle.dumps(kw))