From 1c8c7d171cf4fa32c1aa63edfc71ee89481db871 Mon Sep 17 00:00:00 2001 From: David Glasser Date: Mon, 19 Aug 2013 20:23:52 -0700 Subject: [PATCH] very basic insert support --- packages/mongo-livedata/id_map.js | 27 ++++++++++++++ packages/mongo-livedata/mongo_driver.js | 48 +++++++++++++++++++++---- packages/mongo-livedata/package.js | 1 + 3 files changed, 69 insertions(+), 7 deletions(-) create mode 100644 packages/mongo-livedata/id_map.js diff --git a/packages/mongo-livedata/id_map.js b/packages/mongo-livedata/id_map.js new file mode 100644 index 0000000000..fa093c305e --- /dev/null +++ b/packages/mongo-livedata/id_map.js @@ -0,0 +1,27 @@ +IdMap = function () { + var self = this; + self.map = {}; +}; + +_.extend(IdMap.prototype, { + get: function (id) { + var self = this; + var key = LocalCollection._idStringify(id); + return self.map[key]; + }, + set: function (id, value) { + var self = this; + var key = LocalCollection._idStringify(id); + self.map[key] = value; + }, + remove: function(id) { + var self = this; + var key = LocalCollection._idStringify(id); + delete self.map[key]; + }, + has: function(id) { + var self = this; + var key = LocalCollection._idStringify(id); + return _.has(self.map, key); + } +}); diff --git a/packages/mongo-livedata/mongo_driver.js b/packages/mongo-livedata/mongo_driver.js index d84c486e3a..5d73a2dcc0 100644 --- a/packages/mongo-livedata/mongo_driver.js +++ b/packages/mongo-livedata/mongo_driver.js @@ -237,7 +237,7 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl, dbName) { var collectionName = doc.ns.substr(dbName.length + 1); _.each(callbacksByCollection[collectionName], function (callback) { - callback(doc); + callback(EJSON.clone(doc)); }); }); @@ -260,18 +260,52 @@ MongoConnection.prototype._observeChangesWithOplog = function ( var self = this; // XXX let's do this with race conditions first! + // + // the real way will involve special oplog handling during the initial cursor + // read. specifically: + // + // 1) start reading the oplog. for every document that could conceivably be + // relevant, cache a bit of information about what we saw. (eg, cache + // document for inserts, removal fact for removes, "needs poll" for updates. + // most recent overrides.) + // + // 2) read the initial set and send added messages. + // + // 3) write a sentinel to some field. + // + // 4) wait until that sentinel comes up through the oplog. + // + // 5) use the cached information (compared to what we already know) to send + // messages about things that changed right about then + // + // 6) now that we're in the "steady state", process ops more directly - var idSet = {}; + var idSet = new IdMap; + + var selector = LocalCollection._compileSelector(cursorDescription.selector); var oplogHandle = self._oplogHandle.onOplogEntry(cursorDescription.collectionName, function (op) { + var id; if (op.op === 'd') { - // XXX check that ObjectId works here. (ie use idStringify or something) - var id = op.o._id; - if (_.has(idSet, id)) { - delete idSet[id]; + // XXX check that ObjectId works here + id = op.o._id; + if (idSet.has(id)) { + idSet.remove(id); if (callbacks.removed) callbacks.removed(id); } + } else if (op.op ==='i') { + id = op.o._id; + if (idSet.has(id)) + throw new Error("insert found for already-existing ID"); + + if (selector(op.o)) { + idSet.set(id, true); + if (callbacks.added) { + delete op.o._id; + callbacks.added(id, op.o); + } + } } else { console.log("A CHANGE TO THE DOC", op); } @@ -282,7 +316,7 @@ MongoConnection.prototype._observeChangesWithOplog = function ( initialCursor.forEach(function (initialDoc) { var id = initialDoc._id; delete initialDoc._id; - idSet[id] = true; + idSet.set(id, true); callbacks.added(id, initialDoc); }); } diff --git a/packages/mongo-livedata/package.js b/packages/mongo-livedata/package.js index 194ac21192..3827dbb929 100644 --- a/packages/mongo-livedata/package.js +++ b/packages/mongo-livedata/package.js @@ -38,6 +38,7 @@ Package.on_use(function (api) { // For tests only. api.export('MongoTest', 'server'); + api.add_files('id_map.js', 'server'); api.add_files('mongo_driver.js', 'server'); api.add_files('local_collection_driver.js', ['client', 'server']); api.add_files('remote_collection_driver.js', 'server');