very basic insert support

This commit is contained in:
David Glasser
2013-08-19 20:23:52 -07:00
parent cae886b2f0
commit 1c8c7d171c
3 changed files with 69 additions and 7 deletions

View File

@@ -0,0 +1,27 @@
IdMap = function () {
var self = this;
self.map = {};
};
_.extend(IdMap.prototype, {
get: function (id) {
var self = this;
var key = LocalCollection._idStringify(id);
return self.map[key];
},
set: function (id, value) {
var self = this;
var key = LocalCollection._idStringify(id);
self.map[key] = value;
},
remove: function(id) {
var self = this;
var key = LocalCollection._idStringify(id);
delete self.map[key];
},
has: function(id) {
var self = this;
var key = LocalCollection._idStringify(id);
return _.has(self.map, key);
}
});

View File

@@ -237,7 +237,7 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl, dbName) {
var collectionName = doc.ns.substr(dbName.length + 1);
_.each(callbacksByCollection[collectionName], function (callback) {
callback(doc);
callback(EJSON.clone(doc));
});
});
@@ -260,18 +260,52 @@ MongoConnection.prototype._observeChangesWithOplog = function (
var self = this;
// XXX let's do this with race conditions first!
//
// the real way will involve special oplog handling during the initial cursor
// read. specifically:
//
// 1) start reading the oplog. for every document that could conceivably be
// relevant, cache a bit of information about what we saw. (eg, cache
// document for inserts, removal fact for removes, "needs poll" for updates.
// most recent overrides.)
//
// 2) read the initial set and send added messages.
//
// 3) write a sentinel to some field.
//
// 4) wait until that sentinel comes up through the oplog.
//
// 5) use the cached information (compared to what we already know) to send
// messages about things that changed right about then
//
// 6) now that we're in the "steady state", process ops more directly
var idSet = {};
var idSet = new IdMap;
var selector = LocalCollection._compileSelector(cursorDescription.selector);
var oplogHandle = self._oplogHandle.onOplogEntry(cursorDescription.collectionName, function (op) {
var id;
if (op.op === 'd') {
// XXX check that ObjectId works here. (ie use idStringify or something)
var id = op.o._id;
if (_.has(idSet, id)) {
delete idSet[id];
// XXX check that ObjectId works here
id = op.o._id;
if (idSet.has(id)) {
idSet.remove(id);
if (callbacks.removed)
callbacks.removed(id);
}
} else if (op.op ==='i') {
id = op.o._id;
if (idSet.has(id))
throw new Error("insert found for already-existing ID");
if (selector(op.o)) {
idSet.set(id, true);
if (callbacks.added) {
delete op.o._id;
callbacks.added(id, op.o);
}
}
} else {
console.log("A CHANGE TO THE DOC", op);
}
@@ -282,7 +316,7 @@ MongoConnection.prototype._observeChangesWithOplog = function (
initialCursor.forEach(function (initialDoc) {
var id = initialDoc._id;
delete initialDoc._id;
idSet[id] = true;
idSet.set(id, true);
callbacks.added(id, initialDoc);
});
}

View File

@@ -38,6 +38,7 @@ Package.on_use(function (api) {
// For tests only.
api.export('MongoTest', 'server');
api.add_files('id_map.js', 'server');
api.add_files('mongo_driver.js', 'server');
api.add_files('local_collection_driver.js', ['client', 'server']);
api.add_files('remote_collection_driver.js', 'server');