Distribute events with pusher

This commit is contained in:
Kevin Sawicki & Nathan Sobo
2013-07-24 15:05:39 -07:00
parent 71cbcf9c4a
commit 1bfb10bf2b
13 changed files with 3954 additions and 89 deletions

View File

@@ -118,6 +118,7 @@ window.deserializeEditorWindow = ->
windowState = atom.getWindowState()
atom.packageStates = windowState.getObject('packageStates') ? {}
windowState.remove('packageStates')
window.project = deserialize(windowState.get('project'))
unless window.project?

View File

@@ -25,6 +25,8 @@ updateProgressBar = (message, percentDone) ->
guestSession = new GuestSession(sessionId)
guestSession.on 'started', ->
atom.windowState = guestSession.getDocument().get('windowState')
window.site = guestSession.getSite()
loadingView.remove()
window.startEditorWindow()
@@ -34,6 +36,8 @@ guestSession.on 'connection-opened', ->
guestSession.on 'connection-document-received', ->
updateProgressBar('Synchronizing repository', 50)
guestSession.start()
operationsDone = -1
guestSession.on 'mirror-progress', (message, command, operationCount) ->
operationsDone++

View File

@@ -11,7 +11,7 @@ module.exports =
if atom.getLoadSettings().sessionId
new GuestView(atom.guestSession)
else
hostSession = new HostSession()
hostSession = new HostSession(window.site)
copySession = ->
sessionId = hostSession.getId()

View File

@@ -5,64 +5,98 @@ telepath = require 'telepath'
Project = require 'project'
MediaConnection = require './media-connection'
sessionUtils = require './session-utils'
Pusher = require '../vendor/pusher'
Session = require './session'
module.exports =
class GuestSession
_.extend @prototype, require('event-emitter')
class GuestSession extends Session
participants: null
peer: null
mediaConnection: null
constructor: (sessionId) ->
@peer = sessionUtils.createPeer()
connection = @peer.connect(sessionId, reliable: true)
window.site = new telepath.Site(@getId())
constructor: (@hostId) ->
connection.on 'open', =>
@trigger 'connection-opened'
start: ->
channel = @subscribe("presence-atom")
connection.once 'data', (data) =>
@trigger 'connection-document-received'
channel.on 'pusher:subscription_succeeded', =>
console.log 'in the channel', arguments
doc = @createTelepathDocument(data, connection)
repoUrl = doc.get('collaborationState.repositoryState.url')
channel.one 'client-welcome', ({doc, siteId, repoSnapshot}) =>
console.log 'here? client welcome'
@site = new telepath.Site(siteId)
@doc = @site.deserializeDocument(doc)
@connectDocument(@doc, channel)
repoUrl = @doc.get('collaborationState.repositoryState.url')
@mirrorRepository repoUrl, repoSnapshot, => @trigger 'started'
@mirrorRepository(repoUrl, data.repoSnapshot)
getSite: -> @site
guest = doc.get('collaborationState.guest')
host = doc.get('collaborationState.host')
@mediaConnection = new MediaConnection(guest, host, isHost: false)
@mediaConnection.start()
getDocument: -> @doc
waitForStream: (callback) ->
@mediaConnection.waitForStream callback
getId: -> @peer.id
createTelepathDocument: (data, connection) ->
doc = window.site.deserializeDocument(data.doc)
sessionUtils.connectDocument(doc, connection)
atom.windowState = doc.get('windowState')
@participants = doc.get('collaborationState.participants')
@participants.on 'changed', =>
@trigger 'participants-changed', @participants.toObject()
doc
mirrorRepository: (repoUrl, repoSnapshot) ->
mirrorRepository: (repoUrl, repoSnapshot, callback) ->
repoPath = Project.pathForRepositoryUrl(repoUrl)
progressCallback = (args...) => @trigger 'mirror-progress', args...
patrick.mirror repoPath, repoSnapshot, {progressCallback}, (error) =>
throw new Error(error) if error
if error?
console.error(error)
else
callback()
# 'started' will trigger window.startEditorWindow() which creates the git global
@trigger 'started'
# id = @getId()
# email = project.getRepo().getConfigValue('user.email')
# @participants.push {id, email}
id = @getId()
email = project.getRepo().getConfigValue('user.email')
@participants.push {id, email}
# @peer = sessionUtils.createPeer()
# connection = @peer.connect(sessionId, reliable: true)
# window.site = new telepath.Site(@getId())
#
# connection.on 'open', =>
# @trigger 'connection-opened'
#
# connection.once 'data', (data) =>
# @trigger 'connection-document-received'
#
# doc = @createTelepathDocument(data, connection)
# repoUrl = doc.get('collaborationState.repositoryState.url')
#
# @mirrorRepository(repoUrl, data.repoSnapshot)
#
# guest = doc.get('collaborationState.guest')
# host = doc.get('collaborationState.host')
# @mediaConnection = new MediaConnection(guest, host, isHost: false)
# @mediaConnection.start()
#
# waitForStream: (callback) ->
# @mediaConnection.waitForStream callback
#
# getId: -> @peer.id
#
# createTelepathDocument: (data, connection) ->
# doc = window.site.deserializeDocument(data.doc)
# sessionUtils.connectDocument(doc, connection)
#
# atom.windowState = doc.get('windowState')
#
# @participants = doc.get('collaborationState.participants')
# @participants.on 'changed', =>
# @trigger 'participants-changed', @participants.toObject()
#
# doc
#
# mirrorRepository: (repoUrl, repoSnapshot) ->
# repoPath = Project.pathForRepositoryUrl(repoUrl)
#
# progressCallback = (args...) => @trigger 'mirror-progress', args...
#
# patrick.mirror repoPath, repoSnapshot, {progressCallback}, (error) =>
# throw new Error(error) if error
#
# # 'started' will trigger window.startEditorWindow() which creates the git global
# @trigger 'started'
#
# id = @getId()
# email = project.getRepo().getConfigValue('user.email')
# @participants.push {id, email}

View File

@@ -12,13 +12,13 @@ class GuestView extends View
guestSession: null
initialize: (@guestSession) ->
@guestSession.on 'participants-changed', (participants) =>
@updateParticipants(participants)
@updateParticipants(@guestSession.participants.toObject())
@guestSession.waitForStream (stream) =>
@video[0].src = URL.createObjectURL(stream)
# @guestSession.on 'participants-changed', (participants) =>
# @updateParticipants(participants)
#
# @updateParticipants(@guestSession.participants.toObject())
#
# @guestSession.waitForStream (stream) =>
# @video[0].src = URL.createObjectURL(stream)
@attach()

View File

@@ -1,23 +1,32 @@
fs = require 'fs'
_ = require 'underscore'
guid = require 'guid'
patrick = require 'patrick'
telepath = require 'telepath'
MediaConnection = require './media-connection'
sessionUtils = require './session-utils'
Pusher = require '../vendor/pusher'
Session = require './session'
module.exports =
class HostSession
_.extend @prototype, require('event-emitter')
class HostSession extends Session
participants: null
peer: null
mediaConnection: null
doc: null
constructor: ->
@doc = site.createDocument
constructor: (@site) ->
@id = guid.create().toString()
@nextGuestSiteId = @site.id + 1
getSite: -> @site
getDocument: -> @doc
createDocument: ->
@site.createDocument
windowState: atom.windowState
collaborationState:
guest: {description: '', candidate: '', ready: false}
@@ -27,41 +36,50 @@ class HostSession
url: project.getRepo().getConfigValue('remote.origin.url')
branch: project.getRepo().getShortHead()
host = @doc.get('collaborationState.host')
guest = @doc.get('collaborationState.guest')
@mediaConnection = new MediaConnection(host, guest, isHost: true)
@peer = sessionUtils.createPeer()
start: ->
return if @isSharing()
@mediaConnection.start()
patrick.snapshot project.getPath(), (error, repoSnapshot) =>
throw new Error(error) if error
@doc = @createDocument()
channel = @subscribe("presence-atom")
channel.on 'pusher:subscription_succeeded', =>
@trigger 'started'
@connectDocument(@doc, channel)
@participants = @doc.get('collaborationState.participants')
@participants.push
id: @getId()
email: project.getRepo().getConfigValue('user.email')
@participants.on 'changed', =>
@trigger 'participants-changed', @participants.toObject()
@peer.on 'connection', (connection) =>
connection.on 'open', =>
connection.send({repoSnapshot, doc: @doc.serialize()})
sessionUtils.connectDocument(@doc, connection)
@trigger 'started'
connection.on 'close', =>
@participants.each (participant, index) =>
if connection.peer is participant.get('id')
@participants.remove(index)
@trigger 'stopped'
channel.on 'pusher:member_added', =>
@snapshotRepository (repoSnapshot) =>
welcomePackage =
siteId: @nextGuestSiteId++
doc: @doc.serialize()
repoSnapshot: repoSnapshot
channel.send 'client-welcome', welcomePackage
# host = @doc.get('collaborationState.host')
# guest = @doc.get('collaborationState.guest')
# @mediaConnection = new MediaConnection(host, guest, isHost: true)
# @mediaConnection.start()
@getId()
snapshotRepository: (callback) ->
patrick.snapshot project.getPath(), (error, repoSnapshot) =>
if error
console.error(error)
else
callback(repoSnapshot)
# @participants = @doc.get('collaborationState.participants')
# @participants.push
# id: @getId()
# email: project.getRepo().getConfigValue('user.email')
#
# @participants.on 'changed', =>
# @trigger 'participants-changed', @participants.toObject()
# connection.on 'close', =>
# @participants.each (participant, index) =>
# if connection.peer is participant.get('id')
# @participants.remove(index)
# @trigger 'stopped'
stop: ->
return unless @peer?
@peer.destroy()
@@ -70,8 +88,7 @@ class HostSession
waitForStream: (callback) ->
@mediaConnection.waitForStream callback
getId: ->
@peer.id
getId: -> @id
isSharing: ->
@peer? and not _.isEmpty(@peer.connections)

View File

@@ -29,8 +29,8 @@ class HostView extends View
@hostSession.on 'participants-changed', (participants) =>
@updateParticipants(participants)
@hostSession.waitForStream (stream) =>
@video[0].src = URL.createObjectURL(stream)
# @hostSession.waitForStream (stream) =>
# @video[0].src = URL.createObjectURL(stream)
@attach()

View File

@@ -16,7 +16,7 @@ class MediaConnection
start: ->
constraints = {video: true, audio: true}
navigator.webkitGetUserMedia constraints, @onUserMediaAvailable, @onUserMediaUnavailable
# navigator.webkitGetUserMedia constraints, @onUserMediaAvailable, @onUserMediaUnavailable
waitForStream: (callback) ->
if @stream

View File

@@ -0,0 +1,39 @@
_ = require 'underscore'
module.exports =
class RateLimitedChannel
_.extend @prototype, require('event-emitter')
constructor: (@channel) ->
@queue = []
setInterval(@sendBatch, 200)
@channel.bind_all (eventName, args...) =>
if eventName is 'client-batch'
@receiveBatch(args...)
else
@trigger eventName, args...
receiveBatch: (batch) =>
@trigger event... for event in batch
sendBatch: =>
return if @queue.length is 0
batch = []
batchSize = 2
while event = @queue.shift()
eventJson = JSON.stringify(event)
if batchSize + eventJson.length > 10000
console.log 'over 10k in batch, bailing'
@queue.unshift(event)
break
else
batch.push(eventJson)
batchSize += eventJson.length + 1
console.log 'sending batch'
@channel.trigger 'client-batch', "[#{batch.join(',')}]"
send: (args...) ->
@queue.push(args)

View File

@@ -0,0 +1,51 @@
_ = require 'underscore'
keytar = require 'keytar'
RateLimitedChannel = require './rate-limited-channel'
module.exports =
class Session
_.extend @prototype, require('event-emitter')
subscribe: (channelName) ->
new RateLimitedChannel(@getPusherConnection().subscribe(channelName))
getPusherConnection: ->
@pusher ?= new Pusher '490be67c75616316d386',
encrypted: true
authEndpoint: 'https://fierce-caverns-8387.herokuapp.com/pusher/auth'
auth:
params:
oauth_token: keytar.getPassword('github.com', 'github')
connectDocument: (doc, channel) ->
nextOutputEventId = 1
outputListener = (event) ->
event.id = nextOutputEventId++
console.log 'sending event', event
channel.send('client-document-changed', event)
doc.on('replicate-change', outputListener)
queuedEvents = []
nextInputEventId = 1
handleInputEvent = (event) ->
console.log 'received event', event
doc.applyRemoteChange(event)
nextInputEventId = event.id + 1
flushQueuedEvents = ->
loop
eventHandled = false
for event, index in queuedEvents when event.id is nextInputEventId
handleInputEvent(event)
queuedEvents.splice(index, 1)
eventHandled = true
break
break unless eventHandled
channel.on 'client-document-changed', (event) ->
if event.id is nextInputEventId
handleInputEvent(event)
flushQueuedEvents()
else
console.log 'enqueing event', event
queuedEvents.push(event)

View File

@@ -0,0 +1,102 @@
_ = require 'underscore'
keytar = require 'keytar'
{Site} = require 'telepath'
GuestSession = require '../lib/guest-session'
HostSession = require '../lib/host-session'
class PusherServer
constructor: ->
@channels = {}
getChannel: (channelName) ->
@channels[channelName] ?= new ChannelServer(channelName)
createClient: -> new PusherClient(this)
class ChannelServer
constructor: (@name) ->
@channelClients = {}
subscribe: (subscribingClient) ->
channelClient = new ChannelClient(subscribingClient, this)
@channelClients[subscribingClient.id] = channelClient
setTimeout =>
for client in @getChannelClients()
if client is channelClient
client.trigger 'pusher:subscription_succeeded'
else
client.trigger 'pusher:member_added'
channelClient
getChannelClients: -> _.values(@channelClients)
send: (sendingClient, eventName, eventData) ->
setTimeout =>
for client in @getChannelClients() when client isnt sendingClient
client.trigger(eventName, eventData)
class PusherClient
@nextId: 1
constructor: (@server) ->
@id = @constructor.nextId++
subscribe: (channelName) ->
@server.getChannel(channelName).subscribe(this)
class ChannelClient
_.extend @prototype, require('event-emitter')
constructor: (@pusherClient, @channelServer) ->
send: (eventName, eventData) ->
@channelServer.send(this, eventName, eventData)
fdescribe "Collaboration", ->
describe "joining a host session", ->
[hostSession, guestSession, pusher, repositoryMirrored] = []
beforeEach ->
spyOn(keytar, 'getPassword')
jasmine.unspy(window, 'setTimeout')
pusherServer = new PusherServer()
hostSession = new HostSession(new Site(1))
spyOn(hostSession, 'snapshotRepository').andCallFake (callback) ->
callback({url: 'git://server/repo.git'})
spyOn(hostSession, 'subscribe').andCallFake (channelName) ->
pusherServer.createClient().subscribe(channelName)
guestSession = new GuestSession(hostSession.getId())
spyOn(guestSession, 'subscribe').andCallFake (channelName) ->
pusherServer.createClient().subscribe(channelName)
spyOn(guestSession, 'mirrorRepository').andCallFake (repoUrl, repoSnapshot, callback) ->
setTimeout ->
repositoryMirrored = true
callback()
it "sends the document from the host session to the guest session", ->
hostSession.start()
startedHandler = jasmine.createSpy('startedHandler')
guestSession.on 'started', startedHandler
waitsFor "host session to start", (started) -> hostSession.one 'started', started
runs ->
guestSession.start()
waitsFor "guest session to receive document", -> guestSession.getDocument()?
runs ->
expect(guestSession.mirrorRepository.argsForCall[0][1]).toEqual {url: 'git://server/repo.git'}
expect(guestSession.getSite().id).toBe 2
hostSession.getDocument().set('this should', 'replicate')
guestSession.getDocument().set('this also', 'replicates')
waitsFor "documents to replicate", ->
guestSession.getDocument().get('this should') is 'replicate' and
hostSession.getDocument().get('this also') is 'replicates'
waitsFor "guest session to start", -> startedHandler.callCount is 1
runs ->
expect(repositoryMirrored).toBe true

View File

@@ -904,6 +904,7 @@ Reliable.prototype._intervalSend = function(msg) {
var self = this;
msg = util.pack(msg);
util.blobToBinaryString(msg, function(str) {
console.log('sending', msg);
self._dc.send(str);
});
if (self._queue.length === 0) {
@@ -965,6 +966,7 @@ Reliable.prototype._handleMessage = function(msg) {
var idata = this._incoming[id];
var odata = this._outgoing[id];
var data;
console.log('handle message', msg);
switch (msg[0]) {
// No chunking was done.
case 'no':
@@ -984,7 +986,8 @@ Reliable.prototype._handleMessage = function(msg) {
break;
}
this._ack(id);
if (!window.disableEnd)
this._ack(id);
break;
case 'ack':
data = odata;

File diff suppressed because it is too large Load Diff