diff --git a/r2/r2/config/environment.py b/r2/r2/config/environment.py index 744ffd34c..07a5468a5 100644 --- a/r2/r2/config/environment.py +++ b/r2/r2/config/environment.py @@ -53,6 +53,7 @@ def load_environment(global_conf={}, app_conf={}, setup_globals=True): template_engine='mako', paths=paths) g = config['pylons.g'] = Globals(global_conf, app_conf, paths) + g.plugins.declare_queues(g.queues) if setup_globals: g.setup() r2.config.cache = g.cache diff --git a/r2/r2/config/queues.py b/r2/r2/config/queues.py new file mode 100644 index 000000000..c7289b6db --- /dev/null +++ b/r2/r2/config/queues.py @@ -0,0 +1,93 @@ +# The contents of this file are subject to the Common Public Attribution +# License Version 1.0. (the "License"); you may not use this file except in +# compliance with the License. You may obtain a copy of the License at +# http://code.reddit.com/LICENSE. The License is based on the Mozilla Public +# License Version 1.1, but Sections 14 and 15 have been added to cover use of +# software over a computer network and provide for limited attribution for the +# Original Developer. In addition, Exhibit A has been modified to be consistent +# with Exhibit B. +# +# Software distributed under the License is distributed on an "AS IS" basis, +# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for +# the specific language governing rights and limitations under the License. +# +# The Original Code is reddit. +# +# The Original Developer is the Initial Developer. The Initial Developer of +# the Original Code is reddit Inc. +# +# All portions of the code written by reddit are Copyright (c) 2006-2012 reddit +# Inc. All Rights Reserved. +############################################################################### + +from r2.lib.utils import tup + + +__all__ = ["MessageQueue", "declare_queues"] + + +class Queues(dict): + """A container for queue declarations.""" + def __init__(self, queues): + dict.__init__(self) + self.__dict__ = self + self.bindings = set() + self.declare(queues) + + def __iter__(self): + for name, queue in self.iteritems(): + if name != "bindings": + yield queue + + def declare(self, queues): + for name, queue in queues.iteritems(): + queue.name = name + queue.bindings = self.bindings + if queue.bind_to_self: + queue._bind(name) + self.update(queues) + + +class MessageQueue(object): + """A representation of an AMQP message queue. + + This class is solely intended for use with the Queues class above. + + """ + def __init__(self, durable=True, exclusive=False, + auto_delete=False, bind_to_self=False): + self.durable = durable + self.exclusive = exclusive + self.auto_delete = auto_delete + self.bind_to_self = bind_to_self + + def _bind(self, routing_key): + self.bindings.add((self.name, routing_key)) + + def __lshift__(self, routing_keys): + """Register bindings from routing keys to this queue.""" + routing_keys = tup(routing_keys) + for routing_key in routing_keys: + self._bind(routing_key) + + +def declare_queues(): + queues = Queues({ + "scraper_q": MessageQueue(), + "newcomments_q": MessageQueue(), + "commentstree_q": MessageQueue(), + "commentstree_fastlane_q": MessageQueue(), + "vote_link_q": MessageQueue(bind_to_self=True), + "vote_comment_q": MessageQueue(bind_to_self=True), + "vote_fastlane_q": MessageQueue(bind_to_self=True), + "log_q": MessageQueue(bind_to_self=True), + "usage_q": MessageQueue(bind_to_self=True, durable=False), + "cloudsearch_changes": MessageQueue(bind_to_self=True), + }) + + queues.cloudsearch_changes << "search_changes" + queues.scraper_q << "new_link" + queues.newcomments_q << "new_comment" + queues.commentstree_q << "new_comment" + queues.commentstree_fastlane_q << "new_fastlane_comment" + return queues diff --git a/r2/r2/lib/amqp.py b/r2/r2/lib/amqp.py index 10a794a88..d1b900716 100644 --- a/r2/r2/lib/amqp.py +++ b/r2/r2/lib/amqp.py @@ -43,6 +43,7 @@ log = g.log amqp_virtual_host = g.amqp_virtual_host amqp_logging = g.amqp_logging stats = g.stats +queues = g.queues #there are two ways of interacting with this module: add_item and #handle_items/consume_items. _add_item (the internal function for @@ -129,11 +130,22 @@ class ConnectionManager(local): return self.channel def init_queue(self): - from r2.lib.queues import RedditQueueMap - chan = self.get_channel() + chan.exchange_declare(exchange=amqp_exchange, + type="direct", + durable=True, + auto_delete=False) - RedditQueueMap(amqp_exchange, chan).init() + for queue in queues: + chan.queue_declare(queue=queue.name, + durable=queue.durable, + exclusive=queue.exclusive, + auto_delete=queue.auto_delete) + + for queue, key in queues.bindings: + chan.queue_bind(routing_key=key, + queue=queue, + exchange=amqp_exchange) connection_manager = ConnectionManager() diff --git a/r2/r2/lib/app_globals.py b/r2/r2/lib/app_globals.py index b2490d0bf..d21b2ea39 100755 --- a/r2/r2/lib/app_globals.py +++ b/r2/r2/lib/app_globals.py @@ -43,6 +43,7 @@ from r2.lib.lock import make_lock_factory from r2.lib.manager import db_manager from r2.lib.stats import Stats, CacheStats, StatsCollectingConnectionPool from r2.lib.plugin import PluginLoader +from r2.config import queues LIVE_CONFIG_NODE = "/config/live" @@ -222,6 +223,7 @@ class Globals(object): self.config = ConfigValueParser(global_conf) self.config.add_spec(self.spec) self.plugins = PluginLoader(self.config.get("plugins", [])) + self.queues = queues.declare_queues() self.paths = paths diff --git a/r2/r2/lib/plugin.py b/r2/r2/lib/plugin.py index dbf4fddb3..7d00690fa 100644 --- a/r2/r2/lib/plugin.py +++ b/r2/r2/lib/plugin.py @@ -69,6 +69,9 @@ class Plugin(object): else: module_registry[name].extend(module) + def declare_queues(self, queues): + pass + def add_routes(self, mc): pass @@ -113,6 +116,10 @@ class PluginLoader(object): def available_plugins(name=None): return pkg_resources.iter_entry_points('r2.plugin', name) + def declare_queues(self, queues): + for plugin in self: + plugin.declare_queues(queues) + def load_plugins(self): g = config['pylons.g'] for plugin in self: diff --git a/r2/r2/lib/queues.py b/r2/r2/lib/queues.py deleted file mode 100644 index 037e8c412..000000000 --- a/r2/r2/lib/queues.py +++ /dev/null @@ -1,106 +0,0 @@ -# The contents of this file are subject to the Common Public Attribution -# License Version 1.0. (the "License"); you may not use this file except in -# compliance with the License. You may obtain a copy of the License at -# http://code.reddit.com/LICENSE. The License is based on the Mozilla Public -# License Version 1.1, but Sections 14 and 15 have been added to cover use of -# software over a computer network and provide for limited attribution for the -# Original Developer. In addition, Exhibit A has been modified to be consistent -# with Exhibit B. -# -# Software distributed under the License is distributed on an "AS IS" basis, -# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for -# the specific language governing rights and limitations under the License. -# -# The Original Code is reddit. -# -# The Original Developer is the Initial Developer. The Initial Developer of -# the Original Code is reddit Inc. -# -# All portions of the code written by reddit are Copyright (c) 2006-2012 reddit -# Inc. All Rights Reserved. -############################################################################### - -class QueueMap(object): - """Represents a set of queues and bindings in a single exchange""" - def __init__(self, exchange, chan, exchange_type='direct', - durable=True, auto_delete=False): - self.exchange = exchange - self.chan = chan - - self._exchange(exchange,exchange_type=exchange_type, - durable=durable, auto_delete=auto_delete) - - def _exchange(self, name, exchange_type, durable, auto_delete): - self.chan.exchange_declare(exchange=name, - type=exchange_type, - durable=durable, - auto_delete=auto_delete) - - def _q(self, name, durable=True, exclusive=False, - auto_delete=False, self_refer=False): - self.chan.queue_declare(queue=name, - durable=durable, - exclusive=exclusive, - auto_delete=auto_delete) - if self_refer: - # make a routing key with the same name as the queue to - # allow things to be placed directly in it - self._bind(name, name) - - def _bind(self, rk, q): - self.chan.queue_bind(routing_key=rk, - queue=q, - exchange=self.exchange) - - def init(self): - self.queues() - self.bindings() - - def queues(self): - raise NotImplementedError - - def bindings(self): - raise NotImplementedError - - -class RedditQueueMap(QueueMap): - def queues(self): - self._q('scraper_q') - self._q('newcomments_q') - self._q('commentstree_q') - self._q('commentstree_fastlane_q') - # this isn't in use until the spam_q plumbing is - #self._q('newpage_q') - self._q('register_vote_q', self_refer=True) - self._q('vote_link_q', self_refer=True) - self._q('vote_comment_q', self_refer=True) - self._q('vote_fastlane_q', self_refer=True) - self._q('log_q', self_refer=True) - self._q('usage_q', self_refer=True, durable=False) - - self._q('cloudsearch_changes', self_refer=True) - self._bind('search_changes', 'cloudsearch_changes') - - def bindings(self): - self.newlink_bindings() - self.newcomment_bindings() - self.newsubreddit_bindings() - - def newlink_bindings(self): - self._bind('new_link', 'scraper_q') - - # this isn't in use until the spam_q plumbing is - #self._bind('new_link', 'newpage_q') - - def newcomment_bindings(self): - self._bind('new_comment', 'newcomments_q') - self._bind('new_comment', 'commentstree_q') - self._bind('new_fastlane_comment', 'commentstree_fastlane_q') - - def newsubreddit_bindings(self): - pass - -try: - from r2admin.lib.adminqueues import * -except ImportError: - pass