From 1fc6509a51165e9d9d00c66d43f29ef3e322eb8f Mon Sep 17 00:00:00 2001 From: Kevin Sawicki & Nathan Sobo Date: Wed, 24 Jul 2013 17:20:01 -0600 Subject: [PATCH] Use Redis pub/sub for channels. :zap: to @mcolyer for the idea! --- package.json | 3 +- .../collaboration/lib/guest-session.coffee | 2 +- .../collaboration/lib/host-session.coffee | 4 +-- .../collaboration/lib/redis-channel.coffee | 31 +++++++++++++++++++ src/packages/collaboration/lib/session.coffee | 4 +-- .../spec/collaboration-spec.coffee | 6 ++-- 6 files changed, 41 insertions(+), 9 deletions(-) create mode 100644 src/packages/collaboration/lib/redis-channel.coffee diff --git a/package.json b/package.json index f3c1534d3..957359a2e 100644 --- a/package.json +++ b/package.json @@ -68,7 +68,8 @@ "todo-tmbundle": "1.0.0", "xml-tmbundle": "1.0.0", "yaml-tmbundle": "1.0.0", - "nslog": "0.1.0" + "nslog": "0.1.0", + "redis": "0.8.4" }, "devDependencies": { "biscotto": "0.0.12", diff --git a/src/packages/collaboration/lib/guest-session.coffee b/src/packages/collaboration/lib/guest-session.coffee index 098b531f2..7e236bbb7 100644 --- a/src/packages/collaboration/lib/guest-session.coffee +++ b/src/packages/collaboration/lib/guest-session.coffee @@ -19,7 +19,7 @@ class GuestSession extends Session start: -> channel = @subscribe("presence-atom") - channel.on 'pusher:subscription_succeeded', => + channel.on 'channel:open', => console.log 'in the channel', arguments channel.one 'client-welcome', ({doc, siteId, repoSnapshot}) => diff --git a/src/packages/collaboration/lib/host-session.coffee b/src/packages/collaboration/lib/host-session.coffee index b40f50e9f..5936dbf0f 100644 --- a/src/packages/collaboration/lib/host-session.coffee +++ b/src/packages/collaboration/lib/host-session.coffee @@ -41,11 +41,11 @@ class HostSession extends Session @doc = @createDocument() channel = @subscribe("presence-atom") - channel.on 'pusher:subscription_succeeded', => + channel.on 'channel:opened', => @trigger 'started' @connectDocument(@doc, channel) - channel.on 'pusher:member_added', => + channel.on 'channel:participant-joined', => @snapshotRepository (repoSnapshot) => welcomePackage = siteId: @nextGuestSiteId++ diff --git a/src/packages/collaboration/lib/redis-channel.coffee b/src/packages/collaboration/lib/redis-channel.coffee new file mode 100644 index 000000000..2b95ada19 --- /dev/null +++ b/src/packages/collaboration/lib/redis-channel.coffee @@ -0,0 +1,31 @@ +_ = require 'underscore' +redis = require 'redis' +guid = require 'guid' + +module.exports = +class RedisChannel + _.extend @prototype, require('event-emitter') + + constructor: (@name) -> + @clientId = guid.create().toString() + + @subscribeClient = redis.createClient() + @sendClient = redis.createClient() + @subscribeClient.on 'error', (error) -> console.error("Error on subscribe client", error) + @sendClient.on 'error', (error) -> console.error("Error on send client", error) + + @subscribeClient.subscribe @name, => + console.log "subscribed to channel", @name + @send 'channel:participant-joined', @clientId + @trigger 'channel:opened' + + @subscribeClient.on 'message', (channelName, message) => + return unless channelName is @name + [senderId, eventName, args...] = JSON.parse(message) + unless senderId is @clientId + console.log "message on channel", eventName, args... + @trigger(eventName, args...) + + send: (eventName, args...) -> + message = JSON.stringify([@clientId, eventName, args...]) + @sendClient.publish(@name, message) diff --git a/src/packages/collaboration/lib/session.coffee b/src/packages/collaboration/lib/session.coffee index 88a3b6426..5a6f61758 100644 --- a/src/packages/collaboration/lib/session.coffee +++ b/src/packages/collaboration/lib/session.coffee @@ -1,14 +1,14 @@ _ = require 'underscore' keytar = require 'keytar' -RateLimitedChannel = require './rate-limited-channel' +RedisChannel = require './redis-channel' module.exports = class Session _.extend @prototype, require('event-emitter') subscribe: (channelName) -> - new RateLimitedChannel(@getPusherConnection().subscribe(channelName)) + new RedisChannel(channelName) getPusherConnection: -> @pusher ?= new Pusher '490be67c75616316d386', diff --git a/src/packages/collaboration/spec/collaboration-spec.coffee b/src/packages/collaboration/spec/collaboration-spec.coffee index 8496c4bc9..d4cae07fd 100644 --- a/src/packages/collaboration/spec/collaboration-spec.coffee +++ b/src/packages/collaboration/spec/collaboration-spec.coffee @@ -24,9 +24,9 @@ class ChannelServer setTimeout => for client in @getChannelClients() if client is channelClient - client.trigger 'pusher:subscription_succeeded' + client.trigger 'channel:opened' else - client.trigger 'pusher:member_added' + client.trigger 'channel:participant-joined' channelClient getChannelClients: -> _.values(@channelClients) @@ -53,7 +53,7 @@ class ChannelClient send: (eventName, eventData) -> @channelServer.send(this, eventName, eventData) -fdescribe "Collaboration", -> +describe "Collaboration", -> describe "joining a host session", -> [hostSession, guestSession, pusher, repositoryMirrored] = []