diff --git a/install-reddit.sh b/install-reddit.sh index 7283b698f..983a48822 100755 --- a/install-reddit.sh +++ b/install-reddit.sh @@ -160,6 +160,8 @@ cassandra haproxy nginx stunnel +gunicorn +sutro PACKAGES ############################################################################### @@ -364,7 +366,7 @@ DEFAULT # configure haproxy cat > /etc/haproxy/haproxy.cfg < /etc/sutro.ini < /etc/init/sutro.conf << UPSTART_SUTRO +description "sutro websocket server" + +stop on runlevel [!2345] +start on runlevel [2345] + +respawn +respawn limit 10 5 +kill timeout 15 + +limit nofile 65535 65535 + +exec gunicorn_paster /etc/sutro.ini +UPSTART_SUTRO +fi + +start sutro + ############################################################################### # Upstart Environment ############################################################################### diff --git a/r2/example.ini b/r2/example.ini index d28c0310f..3426c78ee 100644 --- a/r2/example.ini +++ b/r2/example.ini @@ -15,6 +15,8 @@ SECRET = YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXowMTIzNDU2Nzg5 FEEDSECRET = YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXowMTIzNDU2Nzg5 # used for authenticating admin API calls w/o cookie ADMINSECRET = YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXowMTIzNDU2Nzg5 +# used to securely authenticate websocket requests to sutro +websocket = YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXowMTIzNDU2Nzg5 # @@ -420,6 +422,9 @@ media_fs_base_url_https = media_domain = localhost +# the hostname to connect to for websockets +websocket_host = + # -- prices -- gold_month_price = 3.99 gold_year_price = 29.99 diff --git a/r2/r2/lib/amqp.py b/r2/r2/lib/amqp.py index d340863c9..25e39a33e 100644 --- a/r2/r2/lib/amqp.py +++ b/r2/r2/lib/amqp.py @@ -153,7 +153,8 @@ DELIVERY_TRANSIENT = 1 DELIVERY_DURABLE = 2 def _add_item(routing_key, body, message_id = None, - delivery_mode = DELIVERY_DURABLE, headers=None): + delivery_mode=DELIVERY_DURABLE, headers=None, + exchange=amqp_exchange): """adds an item onto a queue. If the connection to amqp is lost it will try to reconnect and then call itself again.""" if not amqp_host: @@ -173,7 +174,7 @@ def _add_item(routing_key, body, message_id = None, event_name = 'amqp.%s' % routing_key try: chan.basic_publish(msg, - exchange = amqp_exchange, + exchange=exchange, routing_key = routing_key) except Exception as e: stats.event_count(event_name, 'enqueue_failed') @@ -186,12 +187,13 @@ def _add_item(routing_key, body, message_id = None, stats.event_count(event_name, 'enqueue') def add_item(routing_key, body, message_id=None, - delivery_mode=DELIVERY_DURABLE, headers=None): + delivery_mode=DELIVERY_DURABLE, headers=None, + exchange=amqp_exchange): if amqp_host and amqp_logging: log.debug("amqp: adding item %r to %r" % (body, routing_key)) worker.do(_add_item, routing_key, body, message_id = message_id, - delivery_mode = delivery_mode, headers=headers) + delivery_mode=delivery_mode, headers=headers, exchange=exchange) def add_kw(routing_key, **kw): add_item(routing_key, pickle.dumps(kw)) diff --git a/r2/r2/lib/websockets.py b/r2/r2/lib/websockets.py new file mode 100644 index 000000000..25645bb21 --- /dev/null +++ b/r2/r2/lib/websockets.py @@ -0,0 +1,68 @@ +# The contents of this file are subject to the Common Public Attribution +# License Version 1.0. (the "License"); you may not use this file except in +# compliance with the License. You may obtain a copy of the License at +# http://code.reddit.com/LICENSE. The License is based on the Mozilla Public +# License Version 1.1, but Sections 14 and 15 have been added to cover use of +# software over a computer network and provide for limited attribution for the +# Original Developer. In addition, Exhibit A has been modified to be consistent +# with Exhibit B. +# +# Software distributed under the License is distributed on an "AS IS" basis, +# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for +# the specific language governing rights and limitations under the License. +# +# The Original Code is reddit. +# +# The Original Developer is the Initial Developer. The Initial Developer of +# the Original Code is reddit Inc. +# +# All portions of the code written by reddit are Copyright (c) 2006-2013 reddit +# Inc. All Rights Reserved. +############################################################################### +"""Utilities for interfacing with the WebSocket server Sutro.""" + +import hashlib +import hmac +import json +import time +import urllib +import urlparse + +from pylons import g + +from r2.lib import amqp + + +_WEBSOCKET_EXCHANGE = "sutro" + + +def send_broadcast(namespace, message): + """Broadcast an object to all WebSocket listeners in a namespace. + + The message will be encoded as a JSON object before being sent to the + client. + + """ + amqp.add_item(routing_key=namespace, body=json.dumps(message), + exchange=_WEBSOCKET_EXCHANGE) + + +def make_url(namespace, max_age): + """Return a signed URL for the client to use for websockets. + + The namespace determines which messages the client receives and max_age is + the number of seconds the URL is valid for. + + """ + + expires = str(int(time.time() + max_age)) + mac = hmac.new(g.secrets["websocket"], expires + namespace, + hashlib.sha1).hexdigest() + + query_string = urllib.urlencode({ + "h": mac, + "e": expires, + }) + + return urlparse.urlunparse(("wss", g.websocket_host, namespace, + None, query_string, None))