Add integration with websocket server "sutro".

This commit is contained in:
Neil Williams
2014-01-07 12:19:53 -08:00
parent 7cee17a418
commit f78857266b
4 changed files with 172 additions and 8 deletions

View File

@@ -160,6 +160,8 @@ cassandra
haproxy
nginx
stunnel
gunicorn
sutro
PACKAGES
###############################################################################
@@ -364,7 +366,7 @@ DEFAULT
# configure haproxy
cat > /etc/haproxy/haproxy.cfg <<HAPROXY
global
maxconn 100
maxconn 350
frontend frontend
mode http
@@ -372,7 +374,7 @@ frontend frontend
bind 0.0.0.0:80
bind 127.0.0.1:8080
timeout client 10000
timeout client 24h
option forwardfor except 127.0.0.1
option httpclose
@@ -381,13 +383,17 @@ frontend frontend
acl is-ssl dst_port 8080
reqadd X-Forwarded-Proto:\ https if is-ssl
# send websockets to sutro
acl is-websocket hdr(Upgrade) -i WebSocket
use_backend sutro if is-websocket
# send media stuff to the local nginx
acl is-media path_beg /media/
use_backend media if is-media
default_backend dynamic
default_backend reddit
backend dynamic
backend reddit
mode http
timeout connect 4000
timeout server 30000
@@ -396,6 +402,14 @@ backend dynamic
server app01-8001 localhost:8001 maxconn 1
backend sutro
mode http
timeout connect 4s
timeout server 24h
balance roundrobin
server sutro localhost:8002 maxconn 250
backend media
mode http
timeout connect 4000
@@ -453,6 +467,81 @@ sed -i s/ENABLED=0/ENABLED=1/ /etc/default/stunnel4
service stunnel4 restart
###############################################################################
# sutro (websocket server)
###############################################################################
if [ ! -f /etc/sutro.ini ]; then
cat > /etc/sutro.ini <<SUTRO
[app:main]
paste.app_factory = sutro.app:make_app
amqp.host = localhost
amqp.port = 5672
amqp.vhost = /
amqp.username = reddit
amqp.password = reddit
web.allowed_origins = $REDDIT_DOMAIN
web.mac_secret = YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXowMTIzNDU2Nzg5
web.ping_interval = 300
stats.host =
stats.port = 0
[server:main]
use = egg:gunicorn#main
worker_class = sutro.socketserver.SutroWorker
workers = 1
worker_connections = 250
host = 127.0.0.1
port = 8002
graceful_timeout = 5
forward_allow_ips = 127.0.0.1
[loggers]
keys = root
[handlers]
keys = syslog
[formatters]
keys = generic
[logger_root]
level = INFO
handlers = syslog
[handler_syslog]
class = handlers.SysLogHandler
args = ("/dev/log", "local7")
formatter = generic
level = NOTSET
[formatter_generic]
format = [%(name)s] %(message)s
SUTRO
fi
if [ ! -f /etc/init/sutro.conf ]; then
cat > /etc/init/sutro.conf << UPSTART_SUTRO
description "sutro websocket server"
stop on runlevel [!2345]
start on runlevel [2345]
respawn
respawn limit 10 5
kill timeout 15
limit nofile 65535 65535
exec gunicorn_paster /etc/sutro.ini
UPSTART_SUTRO
fi
start sutro
###############################################################################
# Upstart Environment
###############################################################################

View File

@@ -15,6 +15,8 @@ SECRET = YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXowMTIzNDU2Nzg5
FEEDSECRET = YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXowMTIzNDU2Nzg5
# used for authenticating admin API calls w/o cookie
ADMINSECRET = YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXowMTIzNDU2Nzg5
# used to securely authenticate websocket requests to sutro
websocket = YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXowMTIzNDU2Nzg5
#
@@ -420,6 +422,9 @@ media_fs_base_url_https =
media_domain = localhost
# the hostname to connect to for websockets
websocket_host =
# -- prices --
gold_month_price = 3.99
gold_year_price = 29.99

View File

@@ -153,7 +153,8 @@ DELIVERY_TRANSIENT = 1
DELIVERY_DURABLE = 2
def _add_item(routing_key, body, message_id = None,
delivery_mode = DELIVERY_DURABLE, headers=None):
delivery_mode=DELIVERY_DURABLE, headers=None,
exchange=amqp_exchange):
"""adds an item onto a queue. If the connection to amqp is lost it
will try to reconnect and then call itself again."""
if not amqp_host:
@@ -173,7 +174,7 @@ def _add_item(routing_key, body, message_id = None,
event_name = 'amqp.%s' % routing_key
try:
chan.basic_publish(msg,
exchange = amqp_exchange,
exchange=exchange,
routing_key = routing_key)
except Exception as e:
stats.event_count(event_name, 'enqueue_failed')
@@ -186,12 +187,13 @@ def _add_item(routing_key, body, message_id = None,
stats.event_count(event_name, 'enqueue')
def add_item(routing_key, body, message_id=None,
delivery_mode=DELIVERY_DURABLE, headers=None):
delivery_mode=DELIVERY_DURABLE, headers=None,
exchange=amqp_exchange):
if amqp_host and amqp_logging:
log.debug("amqp: adding item %r to %r" % (body, routing_key))
worker.do(_add_item, routing_key, body, message_id = message_id,
delivery_mode = delivery_mode, headers=headers)
delivery_mode=delivery_mode, headers=headers, exchange=exchange)
def add_kw(routing_key, **kw):
add_item(routing_key, pickle.dumps(kw))

68
r2/r2/lib/websockets.py Normal file
View File

@@ -0,0 +1,68 @@
# The contents of this file are subject to the Common Public Attribution
# License Version 1.0. (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
# http://code.reddit.com/LICENSE. The License is based on the Mozilla Public
# License Version 1.1, but Sections 14 and 15 have been added to cover use of
# software over a computer network and provide for limited attribution for the
# Original Developer. In addition, Exhibit A has been modified to be consistent
# with Exhibit B.
#
# Software distributed under the License is distributed on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for
# the specific language governing rights and limitations under the License.
#
# The Original Code is reddit.
#
# The Original Developer is the Initial Developer. The Initial Developer of
# the Original Code is reddit Inc.
#
# All portions of the code written by reddit are Copyright (c) 2006-2013 reddit
# Inc. All Rights Reserved.
###############################################################################
"""Utilities for interfacing with the WebSocket server Sutro."""
import hashlib
import hmac
import json
import time
import urllib
import urlparse
from pylons import g
from r2.lib import amqp
_WEBSOCKET_EXCHANGE = "sutro"
def send_broadcast(namespace, message):
"""Broadcast an object to all WebSocket listeners in a namespace.
The message will be encoded as a JSON object before being sent to the
client.
"""
amqp.add_item(routing_key=namespace, body=json.dumps(message),
exchange=_WEBSOCKET_EXCHANGE)
def make_url(namespace, max_age):
"""Return a signed URL for the client to use for websockets.
The namespace determines which messages the client receives and max_age is
the number of seconds the URL is valid for.
"""
expires = str(int(time.time() + max_age))
mac = hmac.new(g.secrets["websocket"], expires + namespace,
hashlib.sha1).hexdigest()
query_string = urllib.urlencode({
"h": mac,
"e": expires,
})
return urlparse.urlunparse(("wss", g.websocket_host, namespace,
None, query_string, None))