Make queue declarations and bindings plugin-friendly.

This allows plugins to declare queues and bind routing keys to them
using a declarative syntax.

This also removes some r2admin cruft from public.
This commit is contained in:
Neil Williams
2012-09-07 16:02:26 -07:00
parent 3147fc9ee5
commit 17a2e1c2f9
6 changed files with 118 additions and 109 deletions

View File

@@ -53,6 +53,7 @@ def load_environment(global_conf={}, app_conf={}, setup_globals=True):
template_engine='mako', paths=paths)
g = config['pylons.g'] = Globals(global_conf, app_conf, paths)
g.plugins.declare_queues(g.queues)
if setup_globals:
g.setup()
r2.config.cache = g.cache

93
r2/r2/config/queues.py Normal file
View File

@@ -0,0 +1,93 @@
# The contents of this file are subject to the Common Public Attribution
# License Version 1.0. (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
# http://code.reddit.com/LICENSE. The License is based on the Mozilla Public
# License Version 1.1, but Sections 14 and 15 have been added to cover use of
# software over a computer network and provide for limited attribution for the
# Original Developer. In addition, Exhibit A has been modified to be consistent
# with Exhibit B.
#
# Software distributed under the License is distributed on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for
# the specific language governing rights and limitations under the License.
#
# The Original Code is reddit.
#
# The Original Developer is the Initial Developer. The Initial Developer of
# the Original Code is reddit Inc.
#
# All portions of the code written by reddit are Copyright (c) 2006-2012 reddit
# Inc. All Rights Reserved.
###############################################################################
from r2.lib.utils import tup
__all__ = ["MessageQueue", "declare_queues"]
class Queues(dict):
"""A container for queue declarations."""
def __init__(self, queues):
dict.__init__(self)
self.__dict__ = self
self.bindings = set()
self.declare(queues)
def __iter__(self):
for name, queue in self.iteritems():
if name != "bindings":
yield queue
def declare(self, queues):
for name, queue in queues.iteritems():
queue.name = name
queue.bindings = self.bindings
if queue.bind_to_self:
queue._bind(name)
self.update(queues)
class MessageQueue(object):
"""A representation of an AMQP message queue.
This class is solely intended for use with the Queues class above.
"""
def __init__(self, durable=True, exclusive=False,
auto_delete=False, bind_to_self=False):
self.durable = durable
self.exclusive = exclusive
self.auto_delete = auto_delete
self.bind_to_self = bind_to_self
def _bind(self, routing_key):
self.bindings.add((self.name, routing_key))
def __lshift__(self, routing_keys):
"""Register bindings from routing keys to this queue."""
routing_keys = tup(routing_keys)
for routing_key in routing_keys:
self._bind(routing_key)
def declare_queues():
queues = Queues({
"scraper_q": MessageQueue(),
"newcomments_q": MessageQueue(),
"commentstree_q": MessageQueue(),
"commentstree_fastlane_q": MessageQueue(),
"vote_link_q": MessageQueue(bind_to_self=True),
"vote_comment_q": MessageQueue(bind_to_self=True),
"vote_fastlane_q": MessageQueue(bind_to_self=True),
"log_q": MessageQueue(bind_to_self=True),
"usage_q": MessageQueue(bind_to_self=True, durable=False),
"cloudsearch_changes": MessageQueue(bind_to_self=True),
})
queues.cloudsearch_changes << "search_changes"
queues.scraper_q << "new_link"
queues.newcomments_q << "new_comment"
queues.commentstree_q << "new_comment"
queues.commentstree_fastlane_q << "new_fastlane_comment"
return queues

View File

@@ -43,6 +43,7 @@ log = g.log
amqp_virtual_host = g.amqp_virtual_host
amqp_logging = g.amqp_logging
stats = g.stats
queues = g.queues
#there are two ways of interacting with this module: add_item and
#handle_items/consume_items. _add_item (the internal function for
@@ -129,11 +130,22 @@ class ConnectionManager(local):
return self.channel
def init_queue(self):
from r2.lib.queues import RedditQueueMap
chan = self.get_channel()
chan.exchange_declare(exchange=amqp_exchange,
type="direct",
durable=True,
auto_delete=False)
RedditQueueMap(amqp_exchange, chan).init()
for queue in queues:
chan.queue_declare(queue=queue.name,
durable=queue.durable,
exclusive=queue.exclusive,
auto_delete=queue.auto_delete)
for queue, key in queues.bindings:
chan.queue_bind(routing_key=key,
queue=queue,
exchange=amqp_exchange)
connection_manager = ConnectionManager()

View File

@@ -43,6 +43,7 @@ from r2.lib.lock import make_lock_factory
from r2.lib.manager import db_manager
from r2.lib.stats import Stats, CacheStats, StatsCollectingConnectionPool
from r2.lib.plugin import PluginLoader
from r2.config import queues
LIVE_CONFIG_NODE = "/config/live"
@@ -222,6 +223,7 @@ class Globals(object):
self.config = ConfigValueParser(global_conf)
self.config.add_spec(self.spec)
self.plugins = PluginLoader(self.config.get("plugins", []))
self.queues = queues.declare_queues()
self.paths = paths

View File

@@ -69,6 +69,9 @@ class Plugin(object):
else:
module_registry[name].extend(module)
def declare_queues(self, queues):
pass
def add_routes(self, mc):
pass
@@ -113,6 +116,10 @@ class PluginLoader(object):
def available_plugins(name=None):
return pkg_resources.iter_entry_points('r2.plugin', name)
def declare_queues(self, queues):
for plugin in self:
plugin.declare_queues(queues)
def load_plugins(self):
g = config['pylons.g']
for plugin in self:

View File

@@ -1,106 +0,0 @@
# The contents of this file are subject to the Common Public Attribution
# License Version 1.0. (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
# http://code.reddit.com/LICENSE. The License is based on the Mozilla Public
# License Version 1.1, but Sections 14 and 15 have been added to cover use of
# software over a computer network and provide for limited attribution for the
# Original Developer. In addition, Exhibit A has been modified to be consistent
# with Exhibit B.
#
# Software distributed under the License is distributed on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for
# the specific language governing rights and limitations under the License.
#
# The Original Code is reddit.
#
# The Original Developer is the Initial Developer. The Initial Developer of
# the Original Code is reddit Inc.
#
# All portions of the code written by reddit are Copyright (c) 2006-2012 reddit
# Inc. All Rights Reserved.
###############################################################################
class QueueMap(object):
"""Represents a set of queues and bindings in a single exchange"""
def __init__(self, exchange, chan, exchange_type='direct',
durable=True, auto_delete=False):
self.exchange = exchange
self.chan = chan
self._exchange(exchange,exchange_type=exchange_type,
durable=durable, auto_delete=auto_delete)
def _exchange(self, name, exchange_type, durable, auto_delete):
self.chan.exchange_declare(exchange=name,
type=exchange_type,
durable=durable,
auto_delete=auto_delete)
def _q(self, name, durable=True, exclusive=False,
auto_delete=False, self_refer=False):
self.chan.queue_declare(queue=name,
durable=durable,
exclusive=exclusive,
auto_delete=auto_delete)
if self_refer:
# make a routing key with the same name as the queue to
# allow things to be placed directly in it
self._bind(name, name)
def _bind(self, rk, q):
self.chan.queue_bind(routing_key=rk,
queue=q,
exchange=self.exchange)
def init(self):
self.queues()
self.bindings()
def queues(self):
raise NotImplementedError
def bindings(self):
raise NotImplementedError
class RedditQueueMap(QueueMap):
def queues(self):
self._q('scraper_q')
self._q('newcomments_q')
self._q('commentstree_q')
self._q('commentstree_fastlane_q')
# this isn't in use until the spam_q plumbing is
#self._q('newpage_q')
self._q('register_vote_q', self_refer=True)
self._q('vote_link_q', self_refer=True)
self._q('vote_comment_q', self_refer=True)
self._q('vote_fastlane_q', self_refer=True)
self._q('log_q', self_refer=True)
self._q('usage_q', self_refer=True, durable=False)
self._q('cloudsearch_changes', self_refer=True)
self._bind('search_changes', 'cloudsearch_changes')
def bindings(self):
self.newlink_bindings()
self.newcomment_bindings()
self.newsubreddit_bindings()
def newlink_bindings(self):
self._bind('new_link', 'scraper_q')
# this isn't in use until the spam_q plumbing is
#self._bind('new_link', 'newpage_q')
def newcomment_bindings(self):
self._bind('new_comment', 'newcomments_q')
self._bind('new_comment', 'commentstree_q')
self._bind('new_fastlane_comment', 'commentstree_fastlane_q')
def newsubreddit_bindings(self):
pass
try:
from r2admin.lib.adminqueues import *
except ImportError:
pass