From 46effaa9b31deb074d442fbca627d6e7d00a4ac2 Mon Sep 17 00:00:00 2001 From: Nick Martin Date: Mon, 12 Dec 2011 20:56:12 -0800 Subject: [PATCH] Factor stream out of livedata. Client side only. No functional changes. --- packages/livedata/livedata_client.js | 38 +++++++++--------------- packages/livedata/package.js | 1 + packages/stream/package.js | 9 ++++++ packages/stream/stream_client.js | 44 ++++++++++++++++++++++++++++ packages/stream/stream_server.js | 1 + 5 files changed, 69 insertions(+), 24 deletions(-) create mode 100644 packages/stream/package.js create mode 100644 packages/stream/stream_client.js create mode 100644 packages/stream/stream_server.js diff --git a/packages/livedata/livedata_client.js b/packages/livedata/livedata_client.js index 134c96d613..2d30869930 100644 --- a/packages/livedata/livedata_client.js +++ b/packages/livedata/livedata_client.js @@ -1,10 +1,6 @@ if (typeof Sky === "undefined") Sky = {}; -// XXX right now, if we can't connect to the database, we silently -// drop writes on the floor!! that is very, very lame. - (function () { - var socket = io.connect(); var collections = {}; // name -> Collection-type object var subs = new Collection(); @@ -17,14 +13,7 @@ if (typeof Sky === "undefined") Sky = {}; // Sky.subscriptions(). But who wants that? What does that even mean? var capture_subs; - socket.on('connect', function () { - // XXX - }); - socket.on('disconnect', function () { - // XXX reconnect - }); - - socket.on('published', function (data) { + Sky._stream.on('published', function (data) { _.each(data, function (changes, collection_name) { var coll = collections[collection_name]; if (!coll) { @@ -53,7 +42,7 @@ if (typeof Sky === "undefined") Sky = {}; }); }); - socket.on('subscription_ready', function (id) { + Sky._stream.on('subscription_ready', function (id) { var arr = sub_ready_callbacks[id]; if (arr) _.each(arr, function (c) { c(); }); delete sub_ready_callbacks[id]; @@ -61,7 +50,7 @@ if (typeof Sky === "undefined") Sky = {}; var subsToken = subs.findLive({}, { added: function (sub) { - socket.emit('subscribe', { + Sky._stream.emit('subscribe', { _id: sub._id, name: sub.name, args: sub.args}); }, changed: function (sub) { @@ -71,7 +60,7 @@ if (typeof Sky === "undefined") Sky = {}; } }, removed: function (id) { - socket.emit('unsubscribe', {_id: id}); + Sky._stream.emit('unsubscribe', {_id: id}); } }); @@ -95,8 +84,8 @@ if (typeof Sky === "undefined") Sky = {}; obj._id = _id; if (this._name) - socket.emit('handle', {collection: this._name, type: 'insert', - args: obj}); + Sky._stream.emit('handle', { + collection: this._name, type: 'insert', args: obj}); this._collection.insert(obj); return obj; @@ -115,9 +104,9 @@ if (typeof Sky === "undefined") Sky = {}; selector = {_id: selector}; if (this._name) - socket.emit('handle', {collection: this._name, type: 'update', - selector: selector, mutator: mutator, - options: options}); + Sky._stream.emit('handle', { + collection: this._name, type: 'update', + selector: selector, mutator: mutator, options: options}); this._collection.update(selector, mutator, options); }, @@ -126,8 +115,8 @@ if (typeof Sky === "undefined") Sky = {}; selector = {_id: selector}; if (this._name) - socket.emit('handle', {collection: this._name, type: 'remove', - selector: selector}); + Sky._stream.emit('handle', { + collection: this._name, type: 'remove', selector: selector}); this._collection.remove(selector); }, @@ -147,8 +136,9 @@ if (typeof Sky === "undefined") Sky = {}; // tell the server to run the handler if (this._name) - socket.emit('handle', {collection: this._name, type: 'method', - method: method, args: args}); + Sky._stream.emit('handle', { + collection: this._name, type: 'method', + method: method, args: args}); }; }, this); } diff --git a/packages/livedata/package.js b/packages/livedata/package.js index e718cac885..7feea2674a 100644 --- a/packages/livedata/package.js +++ b/packages/livedata/package.js @@ -6,6 +6,7 @@ Package.describe({ Package.require('underscore'); Package.require('session'); Package.require('minimongo'); +Package.require('stream'); Package.client_file('livedata_client.js'); diff --git a/packages/stream/package.js b/packages/stream/package.js new file mode 100644 index 0000000000..5bdc0ba521 --- /dev/null +++ b/packages/stream/package.js @@ -0,0 +1,9 @@ +Package.describe({ + summary: "Skybreak's reliable message delivery module", + internal: true +}); + +Package.require('underscore'); + +Package.client_file('stream_client.js'); +Package.server_file('stream_server.js'); diff --git a/packages/stream/stream_client.js b/packages/stream/stream_client.js new file mode 100644 index 0000000000..0ed28e0e83 --- /dev/null +++ b/packages/stream/stream_client.js @@ -0,0 +1,44 @@ +if (typeof Sky === "undefined") Sky = {}; + +(function () { + + ////////// Internals ////////// + + var socket = io.connect(); + + + + socket.on('connect', function () { + // XXX + }); + socket.on('disconnect', function () { + // XXX reconnect + }); + + + ////////// User facing API ////////// + + Sky.status = function () { + // XXX implement + return {connected: true, its_all_a_lie: true}; + }; + + Sky.reconnect = function () { + // XXX implement + }; + + + ////////// API for other packages ////////// + + Sky._stream = { + on: function (name, callback) { + socket.on(name, callback) + }, + + emit: function (XXX) { + socket.emit.apply(socket, arguments); + } + }; + + +})(); diff --git a/packages/stream/stream_server.js b/packages/stream/stream_server.js new file mode 100644 index 0000000000..45a3f3427b --- /dev/null +++ b/packages/stream/stream_server.js @@ -0,0 +1 @@ +if (typeof Sky === "undefined") Sky = {};