mirror of
https://github.com/atom/atom.git
synced 2026-04-28 03:01:47 -04:00
Create media connection for each other participant
This commit is contained in:
@@ -1,73 +1,67 @@
|
||||
$ = require 'jquery'
|
||||
_ = require 'underscore'
|
||||
|
||||
sessionUtils = require './session-utils'
|
||||
|
||||
module.exports =
|
||||
class MediaConnection
|
||||
_.extend @prototype, require('event-emitter')
|
||||
_.extend(@prototype, require 'event-emitter')
|
||||
|
||||
channel: null
|
||||
connection: null
|
||||
stream: null
|
||||
isLeader: null
|
||||
constructor: (@remoteParticipant) ->
|
||||
@inboundStreamPromise = $.Deferred()
|
||||
|
||||
constructor: (@channel, {@isLeader}={}) ->
|
||||
@remoteParticipant.on 'add-ice-candidate', (candidate) =>
|
||||
@getOutboundStreamPromise().done =>
|
||||
@getPeerConnection().addIceCandidate(new RTCIceCandidate(candidate))
|
||||
|
||||
start: ->
|
||||
@on 'connected', => @connected = true
|
||||
|
||||
getInboundStreamPromise: -> @inboundStreamPromise
|
||||
|
||||
getOutboundStreamPromise: ->
|
||||
@outboundStreamPromise ?= @createOutboundStreamPromise()
|
||||
|
||||
createOutboundStreamPromise: ->
|
||||
deferred = $.Deferred()
|
||||
video = config.get('collaboration.enableVideo') ? mandatory: { maxWidth: 320, maxHeight: 240 }, optional: []
|
||||
audio = config.get('collaboration.enableAudio') ? true
|
||||
navigator.webkitGetUserMedia({video, audio}, @onUserMediaAvailable, @onUserMediaUnavailable)
|
||||
success = (stream) =>
|
||||
@getPeerConnection().addStream(stream)
|
||||
deferred.resolve(stream)
|
||||
error = (args...) ->
|
||||
deferred.reject(args...)
|
||||
navigator.webkitGetUserMedia({video, audio}, success, error)
|
||||
deferred.promise()
|
||||
|
||||
waitForStream: (callback) ->
|
||||
if @stream
|
||||
callback(@stream)
|
||||
else
|
||||
@on 'stream-ready', callback
|
||||
sendOffer: ->
|
||||
@getOutboundStreamPromise().done =>
|
||||
@getPeerConnection().createOffer (localDescription) =>
|
||||
@getPeerConnection().setLocalDescription(localDescription)
|
||||
@remoteParticipant.send('offer-media-connection', localDescription)
|
||||
@remoteParticipant.one 'answer-media-connection', (remoteDescription) =>
|
||||
@getPeerConnection().setRemoteDescription(new RTCSessionDescription(remoteDescription))
|
||||
@trigger 'connected'
|
||||
|
||||
onUserMediaUnavailable: (args...) =>
|
||||
console.error "User's webcam is unavailable.", args...
|
||||
waitForOffer: ->
|
||||
@remoteParticipant.one 'offer-media-connection', (remoteDescription) =>
|
||||
@getOutboundStreamPromise().done =>
|
||||
@getPeerConnection().setRemoteDescription(new RTCSessionDescription(remoteDescription))
|
||||
@getPeerConnection().createAnswer (localDescription) =>
|
||||
@getPeerConnection().setLocalDescription(localDescription)
|
||||
@remoteParticipant.send('answer-media-connection', localDescription)
|
||||
@trigger 'connected'
|
||||
|
||||
onUserMediaAvailable: (stream) =>
|
||||
@connection = new webkitRTCPeerConnection(sessionUtils.getIceServers())
|
||||
@connection.addStream(stream)
|
||||
@channel.on 'media-handshake', (event) =>
|
||||
try
|
||||
@onSignal(event)
|
||||
catch e
|
||||
console.error event
|
||||
throw e
|
||||
isConnected: -> @connected
|
||||
|
||||
@connection.onicecandidate = (event) =>
|
||||
return unless event.candidate?
|
||||
@channel.send 'media-handshake', {candidate: event.candidate}
|
||||
getPeerConnection: ->
|
||||
@peerConnection ?= @createPeerConnection()
|
||||
|
||||
@connection.onaddstream = (event) =>
|
||||
@stream = event.stream
|
||||
@trigger 'stream-ready', @stream
|
||||
createPeerConnection: ->
|
||||
stunServer = {url: "stun:54.218.196.152:3478"}
|
||||
turnServer = {url: "turn:ninefingers@54.218.196.152:3478", credential:"youhavetoberealistic"}
|
||||
iceServers = [stunServer, turnServer]
|
||||
|
||||
unless @isLeader
|
||||
@channel.send 'media-handshake', {ready: true}
|
||||
|
||||
onSignal: (event) =>
|
||||
if value = event.ready
|
||||
success = (description) =>
|
||||
@connection.setLocalDescription(description)
|
||||
@channel.send 'media-handshake', {description}
|
||||
@connection.createOffer success, console.error
|
||||
|
||||
else if value = event.description
|
||||
remoteDescription = value
|
||||
sessionDescription = new RTCSessionDescription(remoteDescription)
|
||||
@connection.setRemoteDescription(sessionDescription)
|
||||
|
||||
if not @isLeader
|
||||
success = (localDescription) =>
|
||||
@connection.setLocalDescription(localDescription)
|
||||
@channel.send 'media-handshake', {description: localDescription}
|
||||
@connection.createAnswer success, console.error
|
||||
|
||||
else if value = event.candidate
|
||||
remoteCandidate = new RTCIceCandidate value
|
||||
@connection.addIceCandidate(remoteCandidate)
|
||||
else
|
||||
throw new Error("Unknown remote key '#{event}'")
|
||||
peerConnection = new webkitRTCPeerConnection({iceServers})
|
||||
peerConnection.onaddstream = ({stream}) =>
|
||||
@inboundStreamPromise.resolve(stream)
|
||||
peerConnection.onicecandidate = ({candidate}) =>
|
||||
@remoteParticipant.send('add-ice-candidate', candidate) if candidate?
|
||||
peerConnection
|
||||
|
||||
@@ -15,7 +15,7 @@ class ParticipantView extends View
|
||||
@div class: 'volume', outlet: 'volume'
|
||||
|
||||
initialize: (@session, @participant) ->
|
||||
@session.waitForStream (stream) =>
|
||||
@participant.getMediaConnection().getInboundStreamPromise().done (stream) =>
|
||||
@video[0].src = URL.createObjectURL(stream)
|
||||
|
||||
@video.click =>
|
||||
|
||||
@@ -1,12 +1,21 @@
|
||||
_ = require 'underscore'
|
||||
|
||||
MediaConnection = require './media-connection'
|
||||
|
||||
module.exports =
|
||||
class Participant
|
||||
constructor: (@state) ->
|
||||
_.extend(@prototype, require 'event-emitter')
|
||||
|
||||
constructor: (@channel, @state) ->
|
||||
{@clientId} = @state
|
||||
@mediaConnection = new MediaConnection(this)
|
||||
|
||||
getState: -> @state
|
||||
|
||||
send: (data...) -> @channel.send(@clientId, data...)
|
||||
|
||||
getMediaConnection: -> @mediaConnection
|
||||
|
||||
isEqual: (other) ->
|
||||
if other instanceof @constructor
|
||||
otherState = other.getState()
|
||||
|
||||
@@ -4,7 +4,6 @@ keytar = require 'keytar'
|
||||
patrick = require 'patrick'
|
||||
{Site} = require 'telepath'
|
||||
|
||||
MediaConnection = require './media-connection'
|
||||
Project = require 'project'
|
||||
WsChannel = require './ws-channel'
|
||||
Participant = require './participant'
|
||||
@@ -47,10 +46,11 @@ class Session
|
||||
@channel.on 'channel:participant-exited', (participantState) =>
|
||||
@trigger 'participant-exited', @removeParticipant(participantState)
|
||||
|
||||
@channel.on 'channel:direct-message', (senderId, data...) =>
|
||||
@participantForClientId(senderId)?.trigger(data...)
|
||||
|
||||
if @isLeader()
|
||||
@doc = @createDocument()
|
||||
@mediaConnection = @createMediaConnection()
|
||||
@mediaConnection.start()
|
||||
|
||||
@getClientIdToSiteIdMap().set(@clientId, @site.id)
|
||||
|
||||
@@ -71,7 +71,7 @@ class Session
|
||||
siteId: guestSiteId
|
||||
doc: @doc.serialize()
|
||||
repoSnapshot: repoSnapshot
|
||||
@channel.send 'welcome', welcomePackage
|
||||
@channel.broadcast 'welcome', welcomePackage
|
||||
|
||||
else
|
||||
@channel.one 'channel:subscribed', (participantStates) =>
|
||||
@@ -80,22 +80,15 @@ class Session
|
||||
@site = new Site(siteId)
|
||||
@doc = @site.deserializeDocument(doc)
|
||||
@connectDocument()
|
||||
@mediaConnection = @createMediaConnection()
|
||||
@mediaConnection.start()
|
||||
|
||||
repoUrl = @doc.get('collaborationState.repositoryState.url')
|
||||
@mirrorRepository repoUrl, repoSnapshot, =>
|
||||
@sendMediaConnectionOffers()
|
||||
@trigger 'started', @getParticipants()
|
||||
|
||||
createMediaConnection: ->
|
||||
new MediaConnection(@channel, isLeader: @isLeader())
|
||||
|
||||
copySessionId: ->
|
||||
pasteboard.write(getSessionUrl(@id)) if @id
|
||||
|
||||
waitForStream: (callback) ->
|
||||
@mediaConnection.waitForStream callback
|
||||
|
||||
createDocument: ->
|
||||
@site.createDocument
|
||||
windowState: atom.windowState
|
||||
@@ -115,14 +108,18 @@ class Session
|
||||
|
||||
getId: -> @id
|
||||
|
||||
participantForClientId: (targetClientId) ->
|
||||
_.find @getParticipants(), ({clientId}) -> clientId is targetClientId
|
||||
|
||||
getParticipants: -> _.clone(@participants)
|
||||
|
||||
getOtherParticipants: ->
|
||||
@getParticipants().filter ({clientId}) => clientId isnt @clientId
|
||||
|
||||
addParticipant: (participantState) ->
|
||||
participant = new Participant(participantState)
|
||||
participant = new Participant(@channel, participantState)
|
||||
@participants.push(participant)
|
||||
participant.getMediaConnection().waitForOffer()
|
||||
participant
|
||||
|
||||
removeParticipant: (participantState) ->
|
||||
@@ -132,7 +129,11 @@ class Session
|
||||
participant
|
||||
|
||||
setParticipantStates: (participantStates) ->
|
||||
@participants = participantStates.map (state) -> new Participant(state)
|
||||
@participants = participantStates.map (state) => new Participant(@channel, state)
|
||||
|
||||
sendMediaConnectionOffers: ->
|
||||
for participant in @getOtherParticipants()
|
||||
participant.getMediaConnection().sendOffer()
|
||||
|
||||
# TODO: move this functionality into the Participant Object
|
||||
getClientIdToSiteIdMap: -> @doc.get('collaborationState.clientIdToSiteId')
|
||||
@@ -157,7 +158,7 @@ class Session
|
||||
connectDocument: ->
|
||||
@doc.on 'replicate-change', (event) =>
|
||||
@stampEvent(event)
|
||||
@channel.send('document-changed', event)
|
||||
@channel.broadcast('document-changed', event)
|
||||
|
||||
@channel.on 'document-changed', (event) =>
|
||||
@verifyEvent(event)
|
||||
|
||||
@@ -25,8 +25,11 @@ class WsChannel
|
||||
stop: ->
|
||||
@socket.close()
|
||||
|
||||
send: (data...) ->
|
||||
broadcast: (data...) ->
|
||||
@rawSend('broadcast', data)
|
||||
|
||||
send: (data...) ->
|
||||
@rawSend('send', [@clientId, data...])
|
||||
|
||||
rawSend: (args...) ->
|
||||
@socket.send(JSON.stringify(args))
|
||||
|
||||
@@ -8,9 +8,11 @@ Session = require '../lib/session'
|
||||
ServerHost = 'localhost'
|
||||
ServerPort = 8081
|
||||
|
||||
describe "Collaboration", ->
|
||||
fdescribe "when a host and a guest join a channel", ->
|
||||
[server, leaderSession, guestSession, leaderStartedHandler, guestStartedHandler, guestStoppedHandler, token, userDataByToken] = []
|
||||
fdescribe "Collaboration", ->
|
||||
describe "when a host and a guest join a channel", ->
|
||||
[server, token, userDataByToken] = []
|
||||
[leaderSession, leaderStartedHandler, leaderParticipantEnteredHandler, leaderParticipantExitedHandler] = []
|
||||
[guestSession, guestStartedHandler, guestStoppedHandler] = []
|
||||
|
||||
beforeEach ->
|
||||
jasmine.unspy(window, 'setTimeout')
|
||||
@@ -37,10 +39,13 @@ describe "Collaboration", ->
|
||||
|
||||
runs ->
|
||||
leaderSession = new Session(site: new Site(1), host: ServerHost, port: ServerPort, secure: false)
|
||||
guestSession = new Session(id: leaderSession.getId(), host: ServerHost, port: ServerPort, secure: false)
|
||||
leaderSession.one 'started', leaderStartedHandler = jasmine.createSpy("leaderStartedHandler")
|
||||
leaderSession.on 'participant-entered', leaderParticipantEnteredHandler = jasmine.createSpy("leaderParticipantEnteredHandler")
|
||||
leaderSession.on 'participant-exited', leaderParticipantExitedHandler = jasmine.createSpy("leaderParticipantExitedHandler")
|
||||
|
||||
guestSession = new Session(id: leaderSession.getId(), host: ServerHost, port: ServerPort, secure: false)
|
||||
guestSession.one 'started', guestStartedHandler = jasmine.createSpy("guestStartedHandler")
|
||||
guestSession.one 'stopped', guestStoppedHandler = jasmine.createSpy("guestS")
|
||||
guestSession.one 'stopped', guestStoppedHandler = jasmine.createSpy("guestStoppedHandler")
|
||||
|
||||
spyOn(leaderSession, 'snapshotRepository').andCallFake (callback) -> callback({url: 'git://server/repo.git'})
|
||||
|
||||
@@ -78,9 +83,6 @@ describe "Collaboration", ->
|
||||
runs -> expect(guestSession.repositoryMirrored).toBe true
|
||||
|
||||
it "reports on the participants of the channel", ->
|
||||
leaderSession.on 'participant-entered', hostParticipantEnteredHandler = jasmine.createSpy("hostParticipantEnteredHandler")
|
||||
leaderSession.on 'participant-exited', hostParticipantExitedHandler = jasmine.createSpy("hostParticipantExitedHandler")
|
||||
|
||||
leaderSession.start()
|
||||
waitsFor "leader session to start", -> leaderStartedHandler.callCount > 0
|
||||
|
||||
@@ -106,10 +108,10 @@ describe "Collaboration", ->
|
||||
{ login: 'hubot', clientId: leaderSession.clientId }
|
||||
]
|
||||
|
||||
waitsFor "host to see guest enter", -> hostParticipantEnteredHandler.callCount > 0
|
||||
waitsFor "leader to see guest enter", -> leaderParticipantEnteredHandler.callCount > 0
|
||||
|
||||
runs ->
|
||||
expect(hostParticipantEnteredHandler).toHaveBeenCalledWith(login: 'octocat', clientId: guestSession.clientId)
|
||||
expect(leaderParticipantEnteredHandler).toHaveBeenCalledWith(login: 'octocat', clientId: guestSession.clientId)
|
||||
expect(leaderSession.getParticipants()).toEqual [
|
||||
{ login: 'hubot', clientId: leaderSession.clientId }
|
||||
{ login: 'octocat', clientId: guestSession.clientId }
|
||||
@@ -120,13 +122,36 @@ describe "Collaboration", ->
|
||||
guestSession.stop()
|
||||
|
||||
waitsFor "guest session to stop", -> guestStoppedHandler.callCount > 0
|
||||
waitsFor "host to see guest exit", -> hostParticipantExitedHandler.callCount > 0
|
||||
waitsFor "host to see guest exit", -> leaderParticipantExitedHandler.callCount > 0
|
||||
|
||||
runs ->
|
||||
expect(hostParticipantExitedHandler).toHaveBeenCalledWith(login: 'octocat', clientId: guestSession.clientId)
|
||||
expect(leaderParticipantExitedHandler).toHaveBeenCalledWith(login: 'octocat', clientId: guestSession.clientId)
|
||||
expect(leaderSession.getParticipants()).toEqual [login: 'hubot', clientId: leaderSession.clientId]
|
||||
expect(leaderSession.getOtherParticipants()).toEqual []
|
||||
|
||||
siteIdMap = leaderSession.getClientIdToSiteIdMap()
|
||||
expect(siteIdMap.get(leaderSession.clientId)).toEqual 1
|
||||
expect(siteIdMap.get(guestSession.clientId)).toEqual 2
|
||||
|
||||
it "performs a webrtc handshake between all participants", ->
|
||||
leaderSession.start()
|
||||
waitsFor "leader session to start", -> leaderStartedHandler.callCount > 0
|
||||
|
||||
runs ->
|
||||
token = 'octocat-token'
|
||||
guestSession.start()
|
||||
|
||||
waitsFor "leader to see guest enter", -> leaderParticipantEnteredHandler.callCount > 0
|
||||
|
||||
[leaderGuestMediaConnection, guestLeaderMediaConnection] = []
|
||||
runs ->
|
||||
leaderGuestMediaConnection = leaderSession.getOtherParticipants()[0].mediaConnection
|
||||
guestLeaderMediaConnection = guestSession.getOtherParticipants()[0].mediaConnection
|
||||
|
||||
waitsFor "media connection to be established between leader and guest", ->
|
||||
leaderGuestMediaConnection.isConnected() and guestLeaderMediaConnection.isConnected()
|
||||
|
||||
# This will fail when there is no internet connection
|
||||
waitsFor "media stream to be established on leader and guest", (stream1Established, stream2Established) ->
|
||||
leaderGuestMediaConnection.getInboundStreamPromise().done(stream1Established)
|
||||
guestLeaderMediaConnection.getInboundStreamPromise().done(stream2Established)
|
||||
|
||||
Reference in New Issue
Block a user