Merge branch 'oplog' into devel

This commit is contained in:
David Glasser
2013-12-05 02:03:47 -08:00
36 changed files with 2910 additions and 817 deletions

View File

@@ -288,6 +288,7 @@ shell-quote: https://github.com/substack/node-shell-quote
deep-equal: https://github.com/substack/node-deep-equal
editor: https://github.com/substack/node-editor
minimist: https://github.com/substack/node-minimist
quotemeta: https://github.com/substack/quotemeta
----------
Copyright 2010, 2011, 2012, 2013 James Halliday (mail@substack.net)

View File

@@ -57,7 +57,8 @@ try {
settings: settings,
packages: {
'mongo-livedata': {
url: process.env.MONGO_URL
url: process.env.MONGO_URL,
oplog: process.env.MONGO_OPLOG_URL
}
}
};

1
packages/disable-oplog/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
.build*

View File

@@ -0,0 +1,6 @@
Package.describe({
summary: "Disables oplog tailing",
internal: true
});
// This package is empty; its presence is detected by mongo-livedata.

View File

@@ -63,7 +63,9 @@ if (Meteor.isServer) {
});
} else {
Facts.server = new Meteor.Collection(serverFactsCollection);
Meteor.subscribe("facts");
// XXX making all clients subscribe all the time is wasteful.
// add an interface here
// Meteor.subscribe("facts");
Template.serverFacts.factsByPackage = function () {
return Facts.server.find();

View File

@@ -1,13 +1,20 @@
DDPServer._InvalidationCrossbar = function () {
var self = this;
// A "crossbar" is a class that provides structured notification registration.
// The "invalidation crossbar" is a specific instance used by the DDP server to
// implement write fence notifications.
self.next_id = 1;
DDPServer._Crossbar = function (options) {
var self = this;
options = options || {};
self.nextId = 1;
// map from listener id to object. each object has keys 'trigger',
// 'callback'.
self.listeners = {};
self.factPackage = options.factPackage || "livedata";
self.factName = options.factName || null;
};
_.extend(DDPServer._InvalidationCrossbar.prototype, {
_.extend(DDPServer._Crossbar.prototype, {
// Listen for notification that match 'trigger'. A notification
// matches if it has the key-value pairs in trigger as a
// subset. When a notification matches, call 'callback', passing two
@@ -20,19 +27,20 @@ _.extend(DDPServer._InvalidationCrossbar.prototype, {
//
// XXX It should be legal to call fire() from inside a listen()
// callback?
//
// Note: the LiveResultsSet constructor assumes that a call to listen() never
// yields.
listen: function (trigger, callback) {
var self = this;
var id = self.next_id++;
var id = self.nextId++;
self.listeners[id] = {trigger: EJSON.clone(trigger), callback: callback};
Package.facts && Package.facts.Facts.incrementServerFact(
"livedata", "crossbar-listeners", 1);
if (self.factName && Package.facts) {
Package.facts.Facts.incrementServerFact(
self.factPackage, self.factName, 1);
}
return {
stop: function () {
Package.facts && Package.facts.Facts.incrementServerFact(
"livedata", "crossbar-listeners", -1);
if (self.factName && Package.facts) {
Package.facts.Facts.incrementServerFact(
self.factPackage, self.factName, -1);
}
delete self.listeners[id];
}
};
@@ -50,6 +58,7 @@ _.extend(DDPServer._InvalidationCrossbar.prototype, {
fire: function (notification, onComplete) {
var self = this;
var callbacks = [];
// XXX consider refactoring to "index" on "collection"
_.each(self.listeners, function (l) {
if (self._matches(notification, l.trigger))
callbacks.push(l.callback);
@@ -57,7 +66,7 @@ _.extend(DDPServer._InvalidationCrossbar.prototype, {
if (onComplete)
onComplete = Meteor.bindEnvironment(onComplete, function (e) {
Meteor._debug("Exception in InvalidationCrossbar fire complete " +
Meteor._debug("Exception in Crossbar fire complete " +
"callback", e.stack);
});
@@ -99,5 +108,6 @@ _.extend(DDPServer._InvalidationCrossbar.prototype, {
}
});
// singleton
DDPServer._InvalidationCrossbar = new DDPServer._InvalidationCrossbar;
DDPServer._InvalidationCrossbar = new DDPServer._Crossbar({
factName: "invalidation-crossbar-listeners"
});

View File

@@ -6,15 +6,16 @@
// deep meaning to the matching function, and it could be changed later
// as long as it preserves that property.
Tinytest.add('livedata - crossbar', function (test) {
test.isTrue(DDPServer._InvalidationCrossbar._matches(
{collection: "C"}, {collection: "C"}));
test.isTrue(DDPServer._InvalidationCrossbar._matches(
{collection: "C", id: "X"}, {collection: "C"}));
test.isTrue(DDPServer._InvalidationCrossbar._matches(
{collection: "C"}, {collection: "C", id: "X"}));
test.isTrue(DDPServer._InvalidationCrossbar._matches(
{collection: "C", id: "X"}, {collection: "C"}));
var crossbar = new DDPServer._Crossbar;
test.isTrue(crossbar._matches({collection: "C"},
{collection: "C"}));
test.isTrue(crossbar._matches({collection: "C", id: "X"},
{collection: "C"}));
test.isTrue(crossbar._matches({collection: "C"},
{collection: "C", id: "X"}));
test.isTrue(crossbar._matches({collection: "C", id: "X"},
{collection: "C"}));
test.isFalse(DDPServer._InvalidationCrossbar._matches(
{collection: "C", id: "X"}, {collection: "C", id: "Y"}));
test.isFalse(crossbar._matches({collection: "C", id: "X"},
{collection: "C", id: "Y"}));
});

View File

@@ -3,8 +3,6 @@
// old_results and new_results: collections of documents.
// if ordered, they are arrays.
// if unordered, they are maps {_id: doc}.
// observer: object with 'added', 'changed', 'removed',
// and (if ordered) 'moved' functions (each optional)
LocalCollection._diffQueryChanges = function (ordered, oldResults, newResults,
observer) {
if (ordered)
@@ -17,8 +15,8 @@ LocalCollection._diffQueryChanges = function (ordered, oldResults, newResults,
LocalCollection._diffQueryUnorderedChanges = function (oldResults, newResults,
observer) {
if (observer.moved) {
throw new Error("_diffQueryUnordered called with a moved observer!");
if (observer.movedBefore) {
throw new Error("_diffQueryUnordered called with a movedBefore observer!");
}
_.each(newResults, function (newDoc) {

View File

@@ -0,0 +1,56 @@
LocalCollection._IdMap = function () {
var self = this;
self._map = {};
};
// Some of these methods are designed to match methods on OrderedDict, since
// (eg) ObserveMultiplex and _CachingChangeObserver use them interchangeably.
// (Conceivably, this should be replaced with "UnorderedDict" with a specific
// set of methods that overlap between the two.)
_.extend(LocalCollection._IdMap.prototype, {
get: function (id) {
var self = this;
var key = LocalCollection._idStringify(id);
return self._map[key];
},
set: function (id, value) {
var self = this;
var key = LocalCollection._idStringify(id);
self._map[key] = value;
},
remove: function (id) {
var self = this;
var key = LocalCollection._idStringify(id);
delete self._map[key];
},
has: function (id) {
var self = this;
var key = LocalCollection._idStringify(id);
return _.has(self._map, key);
},
empty: function () {
var self = this;
return _.isEmpty(self._map);
},
clear: function () {
var self = this;
self._map = {};
},
forEach: function (iterator) {
var self = this;
_.each(self._map, function (value, key, obj) {
var context = this;
iterator.call(context, value, LocalCollection._idParse(key), obj);
});
},
// XXX used?
setDefault: function (id, def) {
var self = this;
var key = LocalCollection._idStringify(id);
if (_.has(self._map, key))
return self._map[key];
self._map[key] = def;
return def;
}
});

View File

@@ -5,7 +5,7 @@
// Cursor: a specification for a particular subset of documents, w/
// a defined order, limit, and offset. creating a Cursor with LocalCollection.find(),
// LiveResultsSet: the return value of a live query.
// ObserveHandle: the return value of a live query.
LocalCollection = function (name) {
this.name = name;
@@ -16,8 +16,7 @@ LocalCollection = function (name) {
this.next_qid = 1; // live query id generator
// qid -> live query object. keys:
// ordered: bool. ordered queries have moved callbacks and callbacks
// take indices.
// ordered: bool. ordered queries have addedBefore/movedBefore callbacks.
// results: array (ordered) or object (unordered) of current results
// results_snapshot: snapshot of results. null if not paused.
// cursor: Cursor object for the query.
@@ -32,6 +31,9 @@ LocalCollection = function (name) {
this.paused = false;
};
// Object exported only for unit testing.
// Use it to export private functions to test in Tinytest.
MinimongoTest = {};
LocalCollection._applyChanges = function (doc, changeFields) {
_.each(changeFields, function (value, key) {
@@ -42,7 +44,7 @@ LocalCollection._applyChanges = function (doc, changeFields) {
});
};
var MinimongoError = function (message) {
MinimongoError = function (message) {
var e = new Error(message);
e.name = "MinimongoError";
return e;
@@ -216,15 +218,26 @@ LocalCollection.Cursor.prototype._publishCursor = function (sub) {
return Meteor.Collection._publishCursor(self, sub, collection);
};
LocalCollection._isOrderedChanges = function (callbacks) {
LocalCollection._observeChangesCallbacksAreOrdered = function (callbacks) {
if (callbacks.added && callbacks.addedBefore)
throw new Error("Please specify only one of added() and addedBefore()");
return typeof callbacks.addedBefore == 'function' ||
typeof callbacks.movedBefore === 'function';
return !!(callbacks.addedBefore || callbacks.movedBefore);
};
LocalCollection._observeCallbacksAreOrdered = function (callbacks) {
if (callbacks.addedAt && callbacks.added)
throw new Error("Please specify only one of added() and addedAt()");
if (callbacks.changedAt && callbacks.changed)
throw new Error("Please specify only one of changed() and changedAt()");
if (callbacks.removed && callbacks.removedAt)
throw new Error("Please specify only one of removed() and removedAt()");
return !!(callbacks.addedAt || callbacks.movedTo || callbacks.changedAt
|| callbacks.removedAt);
};
// the handle that comes back from observe.
LocalCollection.LiveResultsSet = function () {};
LocalCollection.ObserveHandle = function () {};
// options to contain:
// * callbacks for observe():
@@ -241,7 +254,7 @@ LocalCollection.LiveResultsSet = function () {};
// * collection: the collection this query is querying
//
// iff x is a returned query handle, (x instanceof
// LocalCollection.LiveResultsSet) is true
// LocalCollection.ObserveHandle) is true
//
// initial results delivered through added callback
// XXX maybe callbacks should take a list of objects, to expose transactions?
@@ -255,7 +268,7 @@ _.extend(LocalCollection.Cursor.prototype, {
observeChanges: function (options) {
var self = this;
var ordered = LocalCollection._isOrderedChanges(options);
var ordered = LocalCollection._observeChangesCallbacksAreOrdered(options);
if (!options._allow_unordered && !ordered && (self.skip || self.limit))
throw new Error("must use ordered observe with skip or limit");
@@ -284,8 +297,7 @@ _.extend(LocalCollection.Cursor.prototype, {
query.results_snapshot = (ordered ? [] : {});
// wrap callbacks we were passed. callbacks only fire when not paused and
// are never undefined (except that query.moved is undefined for unordered
// callbacks).
// are never undefined
// Filters out blacklisted fields according to cursor's projection.
// XXX wrong place for this?
@@ -315,7 +327,6 @@ _.extend(LocalCollection.Cursor.prototype, {
query.changed = wrapCallback(options.changed, 1, true);
query.removed = wrapCallback(options.removed);
if (ordered) {
query.moved = wrapCallback(options.moved);
query.addedBefore = wrapCallback(options.addedBefore, 1);
query.movedBefore = wrapCallback(options.movedBefore);
}
@@ -331,7 +342,7 @@ _.extend(LocalCollection.Cursor.prototype, {
});
}
var handle = new LocalCollection.LiveResultsSet;
var handle = new LocalCollection.ObserveHandle;
_.extend(handle, {
collection: self.collection,
stop: function () {
@@ -964,231 +975,6 @@ LocalCollection._makeChangedFields = function (newDoc, oldDoc) {
return fields;
};
LocalCollection._observeFromObserveChanges = function (cursor, callbacks) {
var transform = cursor.getTransform();
if (!transform)
transform = function (doc) {return doc;};
if (callbacks.addedAt && callbacks.added)
throw new Error("Please specify only one of added() and addedAt()");
if (callbacks.changedAt && callbacks.changed)
throw new Error("Please specify only one of changed() and changedAt()");
if (callbacks.removed && callbacks.removedAt)
throw new Error("Please specify only one of removed() and removedAt()");
if (callbacks.addedAt || callbacks.movedTo ||
callbacks.changedAt || callbacks.removedAt)
return LocalCollection._observeOrderedFromObserveChanges(cursor, callbacks, transform);
else
return LocalCollection._observeUnorderedFromObserveChanges(cursor, callbacks, transform);
};
LocalCollection._observeUnorderedFromObserveChanges =
function (cursor, callbacks, transform) {
var docs = {};
var suppressed = !!callbacks._suppress_initial;
var handle = cursor.observeChanges({
added: function (id, fields) {
var strId = LocalCollection._idStringify(id);
var doc = EJSON.clone(fields);
doc._id = id;
docs[strId] = doc;
suppressed || callbacks.added && callbacks.added(transform(doc));
},
changed: function (id, fields) {
var strId = LocalCollection._idStringify(id);
var doc = docs[strId];
var oldDoc = EJSON.clone(doc);
// writes through to the doc set
LocalCollection._applyChanges(doc, fields);
suppressed || callbacks.changed && callbacks.changed(transform(doc), transform(oldDoc));
},
removed: function (id) {
var strId = LocalCollection._idStringify(id);
var doc = docs[strId];
delete docs[strId];
suppressed || callbacks.removed && callbacks.removed(transform(doc));
}
});
suppressed = false;
return handle;
};
LocalCollection._observeOrderedFromObserveChanges =
function (cursor, callbacks, transform) {
var docs = new OrderedDict(LocalCollection._idStringify);
var suppressed = !!callbacks._suppress_initial;
// The "_no_indices" option sets all index arguments to -1
// and skips the linear scans required to generate them.
// This lets observers that don't need absolute indices
// benefit from the other features of this API --
// relative order, transforms, and applyChanges -- without
// the speed hit.
var indices = !callbacks._no_indices;
var handle = cursor.observeChanges({
addedBefore: function (id, fields, before) {
var doc = EJSON.clone(fields);
doc._id = id;
// XXX could `before` be a falsy ID? Technically
// idStringify seems to allow for them -- though
// OrderedDict won't call stringify on a falsy arg.
docs.putBefore(id, doc, before || null);
if (!suppressed) {
if (callbacks.addedAt) {
var index = indices ? docs.indexOf(id) : -1;
callbacks.addedAt(transform(EJSON.clone(doc)),
index, before);
} else if (callbacks.added) {
callbacks.added(transform(EJSON.clone(doc)));
}
}
},
changed: function (id, fields) {
var doc = docs.get(id);
if (!doc)
throw new Error("Unknown id for changed: " + id);
var oldDoc = EJSON.clone(doc);
// writes through to the doc set
LocalCollection._applyChanges(doc, fields);
if (callbacks.changedAt) {
var index = indices ? docs.indexOf(id) : -1;
callbacks.changedAt(transform(EJSON.clone(doc)),
transform(oldDoc), index);
} else if (callbacks.changed) {
callbacks.changed(transform(EJSON.clone(doc)),
transform(oldDoc));
}
},
movedBefore: function (id, before) {
var doc = docs.get(id);
var from;
// only capture indexes if we're going to call the callback that needs them.
if (callbacks.movedTo)
from = indices ? docs.indexOf(id) : -1;
docs.moveBefore(id, before || null);
if (callbacks.movedTo) {
var to = indices ? docs.indexOf(id) : -1;
callbacks.movedTo(transform(EJSON.clone(doc)), from, to,
before || null);
} else if (callbacks.moved) {
callbacks.moved(transform(EJSON.clone(doc)));
}
},
removed: function (id) {
var doc = docs.get(id);
var index;
if (callbacks.removedAt)
index = indices ? docs.indexOf(id) : -1;
docs.remove(id);
callbacks.removedAt && callbacks.removedAt(transform(doc), index);
callbacks.removed && callbacks.removed(transform(doc));
}
});
suppressed = false;
return handle;
};
LocalCollection._compileProjection = function (fields) {
if (!_.isObject(fields))
throw MinimongoError("fields option must be an object");
if (_.any(_.values(fields), function (x) {
return _.indexOf([1, 0, true, false], x) === -1; }))
throw MinimongoError("Projection values should be one of 1, 0, true, or false");
var _idProjection = _.isUndefined(fields._id) ? true : fields._id;
// Find the non-_id keys (_id is handled specially because it is included unless
// explicitly excluded). Sort the keys, so that our code to detect overlaps
// like 'foo' and 'foo.bar' can assume that 'foo' comes first.
var fieldsKeys = _.keys(fields).sort();
// If there are other rules other than '_id', treat '_id' differently in a
// separate case. If '_id' is the only rule, use it to understand if it is
// including/excluding projection.
if (fieldsKeys.length > 0 && !(fieldsKeys.length === 1 && fieldsKeys[0] === '_id'))
fieldsKeys = _.reject(fieldsKeys, function (key) { return key === '_id'; });
var including = null; // Unknown
var projectionRulesTree = {}; // Tree represented as nested objects
_.each(fieldsKeys, function (keyPath) {
var rule = !!fields[keyPath];
if (including === null)
including = rule;
if (including !== rule)
// This error message is copies from MongoDB shell
throw MinimongoError("You cannot currently mix including and excluding fields.");
var treePos = projectionRulesTree;
keyPath = keyPath.split('.');
_.each(keyPath.slice(0, -1), function (key, idx) {
if (!_.has(treePos, key))
treePos[key] = {};
else if (_.isBoolean(treePos[key])) {
// Check passed projection fields' keys: If you have two rules such as
// 'foo.bar' and 'foo.bar.baz', then the result becomes ambiguous. If
// that happens, there is a probability you are doing something wrong,
// framework should notify you about such mistake earlier on cursor
// compilation step than later during runtime. Note, that real mongo
// doesn't do anything about it and the later rule appears in projection
// project, more priority it takes.
//
// Example, assume following in mongo shell:
// > db.coll.insert({ a: { b: 23, c: 44 } })
// > db.coll.find({}, { 'a': 1, 'a.b': 1 })
// { "_id" : ObjectId("520bfe456024608e8ef24af3"), "a" : { "b" : 23 } }
// > db.coll.find({}, { 'a.b': 1, 'a': 1 })
// { "_id" : ObjectId("520bfe456024608e8ef24af3"), "a" : { "b" : 23, "c" : 44 } }
//
// Note, how second time the return set of keys is different.
var currentPath = keyPath.join('.');
var anotherPath = keyPath.slice(0, idx + 1).join('.');
throw MinimongoError("both " + currentPath + " and " + anotherPath +
" found in fields option, using both of them may trigger " +
"unexpected behavior. Did you mean to use only one of them?");
}
treePos = treePos[key];
});
treePos[_.last(keyPath)] = including;
});
// returns transformed doc according to ruleTree
var transform = function (doc, ruleTree) {
// Special case for "sets"
if (_.isArray(doc))
return _.map(doc, function (subdoc) { return transform(subdoc, ruleTree); });
var res = including ? {} : EJSON.clone(doc);
_.each(ruleTree, function (rule, key) {
if (!_.has(doc, key))
return;
if (_.isObject(rule)) {
// For sub-objects/subsets we branch
if (_.isObject(doc[key]))
res[key] = transform(doc[key], rule);
// Otherwise we don't even touch this subfield
} else if (including)
res[key] = doc[key];
else
delete res[key];
});
return res;
};
return function (obj) {
var res = transform(obj, projectionRulesTree);
if (_idProjection && _.has(obj, '_id'))
res._id = obj._id;
if (!_idProjection && _.has(res, '_id'))
delete res._id;
return res;
};
};
// Searches $near operator in the selector recursively
// (including all $or/$and/$nor/$not branches)
var isGeoQuery = function (selector) {

View File

@@ -0,0 +1,457 @@
Tinytest.add("minimongo - modifier affects selector", function (test) {
function testSelectorPaths (sel, paths, desc) {
test.isTrue(_.isEqual(MinimongoTest.getSelectorPaths(sel), paths), desc);
}
testSelectorPaths({
foo: {
bar: 3,
baz: 42
}
}, ['foo'], "literal");
testSelectorPaths({
foo: 42,
bar: 33
}, ['foo', 'bar'], "literal");
testSelectorPaths({
foo: [ 'something' ],
bar: "asdf"
}, ['foo', 'bar'], "literal");
testSelectorPaths({
a: { $lt: 3 },
b: "you know, literal",
'path.is.complicated': { $not: { $regex: 'acme.*corp' } }
}, ['a', 'b', 'path.is.complicated'], "literal + operators");
testSelectorPaths({
$or: [{ 'a.b': 1 }, { 'a.b.c': { $lt: 22 } },
{$and: [{ 'x.d': { $ne: 5, $gte: 433 } }, { 'a.b': 234 }]}]
}, ['a.b', 'a.b.c', 'x.d'], 'group operators + duplicates');
// When top-level value is an object, it is treated as a literal,
// so when you query col.find({ a: { foo: 1, bar: 2 } })
// it doesn't mean you are looking for anything that has 'a.foo' to be 1 and
// 'a.bar' to be 2, instead you are looking for 'a' to be exatly that object
// with exatly that order of keys. { a: { foo: 1, bar: 2, baz: 3 } } wouldn't
// match it. That's why in this selector 'a' would be important key, not a.foo
// and a.bar.
testSelectorPaths({
a: {
foo: 1,
bar: 2
},
'b.c': {
literal: "object",
but: "we still observe any changes in 'b.c'"
}
}, ['a', 'b.c'], "literal object");
function testSelectorAffectedByModifier (sel, mod, yes, desc) {
if (yes)
test.isTrue(LocalCollection._isSelectorAffectedByModifier(sel, mod, desc));
else
test.isFalse(LocalCollection._isSelectorAffectedByModifier(sel, mod, desc));
}
function affected(sel, mod, desc) {
testSelectorAffectedByModifier(sel, mod, 1, desc);
}
function notAffected(sel, mod, desc) {
testSelectorAffectedByModifier(sel, mod, 0, desc);
}
notAffected({ foo: 0 }, { $set: { bar: 1 } }, "simplest");
affected({ foo: 0 }, { $set: { foo: 1 } }, "simplest");
affected({ foo: 0 }, { $set: { 'foo.bar': 1 } }, "simplest");
notAffected({ 'foo.bar': 0 }, { $set: { 'foo.baz': 1 } }, "simplest");
affected({ 'foo.bar': 0 }, { $set: { 'foo.1': 1 } }, "simplest");
affected({ 'foo.bar': 0 }, { $set: { 'foo.2.bar': 1 } }, "simplest");
notAffected({ 'foo': 0 }, { $set: { 'foobaz': 1 } }, "correct prefix check");
notAffected({ 'foobar': 0 }, { $unset: { 'foo': 1 } }, "correct prefix check");
notAffected({ 'foo.bar': 0 }, { $unset: { 'foob': 1 } }, "correct prefix check");
notAffected({ 'foo.Infinity.x': 0 }, { $unset: { 'foo.x': 1 } }, "we convert integer fields correctly");
notAffected({ 'foo.1e3.x': 0 }, { $unset: { 'foo.x': 1 } }, "we convert integer fields correctly");
affected({ 'foo.3.bar': 0 }, { $set: { 'foo.3.bar': 1 } }, "observe for an array element");
notAffected({ 'foo.4.bar.baz': 0 }, { $unset: { 'foo.3.bar': 1 } }, "delicate work with numeric fields in selector");
notAffected({ 'foo.4.bar.baz': 0 }, { $unset: { 'foo.bar': 1 } }, "delicate work with numeric fields in selector");
affected({ 'foo.4.bar.baz': 0 }, { $unset: { 'foo.4.bar': 1 } }, "delicate work with numeric fields in selector");
affected({ 'foo.bar.baz': 0 }, { $unset: { 'foo.3.bar': 1 } }, "delicate work with numeric fields in selector");
affected({ 'foo.0.bar': 0 }, { $set: { 'foo.0.0.bar': 1 } }, "delicate work with nested arrays and selectors by indecies");
});
Tinytest.add("minimongo - selector and projection combination", function (test) {
function testSelProjectionComb (sel, proj, expected, desc) {
test.equal(LocalCollection._combineSelectorAndProjection(sel, proj), expected, desc);
}
// Test with inclusive projection
testSelProjectionComb({ a: 1, b: 2 }, { b: 1, c: 1, d: 1 }, { a: true, b: true, c: true, d: true }, "simplest incl");
testSelProjectionComb({ $or: [{ a: 1234, e: {$lt: 5} }], b: 2 }, { b: 1, c: 1, d: 1 }, { a: true, b: true, c: true, d: true, e: true }, "simplest incl, branching");
testSelProjectionComb({
'a.b': { $lt: 3 },
'y.0': -1,
'a.c': 15
}, {
'd': 1,
'z': 1
}, {
'a.b': true,
'y': true,
'a.c': true,
'd': true,
'z': true
}, "multikey paths in selector - incl");
testSelProjectionComb({
foo: 1234,
$and: [{ k: -1 }, { $or: [{ b: 15 }] }]
}, {
'foo.bar': 1,
'foo.zzz': 1,
'b.asdf': 1
}, {
foo: true,
b: true,
k: true
}, "multikey paths in fields - incl");
testSelProjectionComb({
'a.b.c': 123,
'a.b.d': 321,
'b.c.0': 111,
'a.e': 12345
}, {
'a.b.z': 1,
'a.b.d.g': 1,
'c.c.c': 1
}, {
'a.b.c': true,
'a.b.d': true,
'a.b.z': true,
'b.c': true,
'a.e': true,
'c.c.c': true
}, "multikey both paths - incl");
testSelProjectionComb({
'a.b.c.d': 123,
'a.b1.c.d': 421,
'a.b.c.e': 111
}, {
'a.b': 1
}, {
'a.b': true,
'a.b1.c.d': true
}, "shadowing one another - incl");
testSelProjectionComb({
'a.b': 123,
'foo.bar': false
}, {
'a.b.c.d': 1,
'foo': 1
}, {
'a.b': true,
'foo': true
}, "shadowing one another - incl");
testSelProjectionComb({
'a.b.c': 1
}, {
'a.b.c': 1
}, {
'a.b.c': true
}, "same paths - incl");
testSelProjectionComb({
'x.4.y': 42,
'z.0.1': 33
}, {
'x.x': 1
}, {
'x.x': true,
'x.y': true,
'z': true
}, "numbered keys in selector - incl");
testSelProjectionComb({
'a.b.c': 42,
$where: function () { return true; }
}, {
'a.b': 1,
'z.z': 1
}, {}, "$where in the selector - incl");
testSelProjectionComb({
$or: [
{'a.b.c': 42},
{$where: function () { return true; } }
]
}, {
'a.b': 1,
'z.z': 1
}, {}, "$where in the selector - incl");
// Test with exclusive projection
testSelProjectionComb({ a: 1, b: 2 }, { b: 0, c: 0, d: 0 }, { c: false, d: false }, "simplest excl");
testSelProjectionComb({ $or: [{ a: 1234, e: {$lt: 5} }], b: 2 }, { b: 0, c: 0, d: 0 }, { c: false, d: false }, "simplest excl, branching");
testSelProjectionComb({
'a.b': { $lt: 3 },
'y.0': -1,
'a.c': 15
}, {
'd': 0,
'z': 0
}, {
d: false,
z: false
}, "multikey paths in selector - excl");
testSelProjectionComb({
foo: 1234,
$and: [{ k: -1 }, { $or: [{ b: 15 }] }]
}, {
'foo.bar': 0,
'foo.zzz': 0,
'b.asdf': 0
}, {
}, "multikey paths in fields - excl");
testSelProjectionComb({
'a.b.c': 123,
'a.b.d': 321,
'b.c.0': 111,
'a.e': 12345
}, {
'a.b.z': 0,
'a.b.d.g': 0,
'c.c.c': 0
}, {
'a.b.z': false,
'c.c.c': false
}, "multikey both paths - excl");
testSelProjectionComb({
'a.b.c.d': 123,
'a.b1.c.d': 421,
'a.b.c.e': 111
}, {
'a.b': 0
}, {
}, "shadowing one another - excl");
testSelProjectionComb({
'a.b': 123,
'foo.bar': false
}, {
'a.b.c.d': 0,
'foo': 0
}, {
}, "shadowing one another - excl");
testSelProjectionComb({
'a.b.c': 1
}, {
'a.b.c': 0
}, {
}, "same paths - excl");
testSelProjectionComb({
'a.b': 123,
'a.c.d': 222,
'ddd': 123
}, {
'a.b': 0,
'a.c.e': 0,
'asdf': 0
}, {
'a.c.e': false,
'asdf': false
}, "intercept the selector path - excl");
testSelProjectionComb({
'a.b.c': 14
}, {
'a.b.d': 0
}, {
'a.b.d': false
}, "different branches - excl");
testSelProjectionComb({
'a.b.c.d': "124",
'foo.bar.baz.que': "some value"
}, {
'a.b.c.d.e': 0,
'foo.bar': 0
}, {
}, "excl on incl paths - excl");
testSelProjectionComb({
'x.4.y': 42,
'z.0.1': 33
}, {
'x.x': 0,
'x.y': 0
}, {
'x.x': false,
}, "numbered keys in selector - excl");
testSelProjectionComb({
'a.b.c': 42,
$where: function () { return true; }
}, {
'a.b': 0,
'z.z': 0
}, {}, "$where in the selector - excl");
testSelProjectionComb({
$or: [
{'a.b.c': 42},
{$where: function () { return true; } }
]
}, {
'a.b': 0,
'z.z': 0
}, {}, "$where in the selector - excl");
});
(function () {
// TODO: Tests for "can selector become true by modifier" are incomplete,
// absent or test the functionality of "not ideal" implementation (test checks
// that certain case always returns true as implementation is incomplete)
// - tests with $and/$or/$nor/$not branches (are absent)
// - more tests with arrays fields and numeric keys (incomplete and test "not
// ideal" implementation)
// - tests when numeric keys actually mean numeric keys, not array indexes
// (are absent)
// - tests with $-operators in the selector (are incomplete and test "not
// ideal" implementation)
var test = null; // set this global in the beginning of every test
// T - should return true
// F - should return false
function T (sel, mod, desc) {
test.isTrue(LocalCollection._canSelectorBecomeTrueByModifier(sel, mod), desc);
}
function F (sel, mod, desc) {
test.isFalse(LocalCollection._canSelectorBecomeTrueByModifier(sel, mod), desc);
}
Tinytest.add("minimongo - can selector become true by modifier - literals (structured tests)", function (t) {
test = t;
var selector = {
'a.b.c': 2,
'foo.bar': {
z: { y: 1 }
},
'foo.baz': [ {ans: 42}, "string", false, undefined ],
'empty.field': null
};
T(selector, {$set:{ 'a.b.c': 2 }});
F(selector, {$unset:{ 'a': 1 }});
F(selector, {$unset:{ 'a.b': 1 }});
F(selector, {$unset:{ 'a.b.c': 1 }});
T(selector, {$set:{ 'a.b': { c: 2 } }});
F(selector, {$set:{ 'a.b': {} }});
T(selector, {$set:{ 'a.b': { c: 2, x: 5 } }});
F(selector, {$set:{ 'a.b.c.k': 3 }});
F(selector, {$set:{ 'a.b.c.k': {} }});
F(selector, {$unset:{ 'foo': 1 }});
F(selector, {$unset:{ 'foo.bar': 1 }});
F(selector, {$unset:{ 'foo.bar.z': 1 }});
F(selector, {$unset:{ 'foo.bar.z.y': 1 }});
F(selector, {$set:{ 'foo.bar.x': 1 }});
F(selector, {$set:{ 'foo.bar': {} }});
F(selector, {$set:{ 'foo.bar': 3 }});
T(selector, {$set:{ 'foo.bar': { z: { y: 1 } } }});
T(selector, {$set:{ 'foo.bar.z': { y: 1 } }});
T(selector, {$set:{ 'foo.bar.z.y': 1 }});
F(selector, {$set:{ 'empty.field': {} }});
T(selector, {$set:{ 'empty': {} }});
T(selector, {$set:{ 'empty.field': null }});
T(selector, {$set:{ 'empty.field': undefined }});
F(selector, {$set:{ 'empty.field.a': 3 }});
});
Tinytest.add("minimongo - can selector become true by modifier - literals (adhoc tests)", function (t) {
test = t;
T({x:1}, {$set:{x:1}}, "simple set scalar");
T({x:"a"}, {$set:{x:"a"}}, "simple set scalar");
T({x:false}, {$set:{x:false}}, "simple set scalar");
F({x:true}, {$set:{x:false}}, "simple set scalar");
F({x:2}, {$set:{x:3}}, "simple set scalar");
F({'foo.bar.baz': 1, x:1}, {$unset:{'foo.bar.baz': 1}, $set:{x:1}}, "simple unset of the interesting path");
F({'foo.bar.baz': 1, x:1}, {$unset:{'foo.bar': 1}, $set:{x:1}}, "simple unset of the interesting path prefix");
F({'foo.bar.baz': 1, x:1}, {$unset:{'foo': 1}, $set:{x:1}}, "simple unset of the interesting path prefix");
F({'foo.bar.baz': 1}, {$unset:{'foo.baz': 1}}, "simple unset of the interesting path prefix");
F({'foo.bar.baz': 1}, {$unset:{'foo.bar.bar': 1}}, "simple unset of the interesting path prefix");
});
Tinytest.add("minimongo - can selector become true by modifier - regexps", function (t) {
test = t;
// Regexp
T({ 'foo.bar': /^[0-9]+$/i }, { $set: {'foo.bar': '01233'} }, "set of regexp");
// XXX this test should be False, should be fixed within improved implementation
T({ 'foo.bar': /^[0-9]+$/i, x: 1 }, { $set: {'foo.bar': '0a1233', x: 1} }, "set of regexp");
// XXX this test should be False, should be fixed within improved implementation
T({ 'foo.bar': /^[0-9]+$/i, x: 1 }, { $unset: {'foo.bar': 1}, $set: { x: 1 } }, "unset of regexp");
T({ 'foo.bar': /^[0-9]+$/i, x: 1 }, { $set: { x: 1 } }, "don't touch regexp");
});
Tinytest.add("minimongo - can selector become true by modifier - undefined/null", function (t) {
test = t;
// Nulls / Undefined
T({ 'foo.bar': null }, {$set:{'foo.bar': null}}, "set of null looking for null");
T({ 'foo.bar': null }, {$set:{'foo.bar': undefined}}, "set of undefined looking for null");
T({ 'foo.bar': undefined }, {$set:{'foo.bar': null}}, "set of null looking for undefined");
T({ 'foo.bar': undefined }, {$set:{'foo.bar': undefined}}, "set of undefined looking for undefined");
T({ 'foo.bar': null }, {$set:{'foo': null}}, "set of null of parent path looking for null");
F({ 'foo.bar': null }, {$set:{'foo.bar.baz': null}}, "set of null of different path looking for null");
T({ 'foo.bar': null }, { $unset: { 'foo': 1 } }, "unset the parent");
T({ 'foo.bar': null }, { $unset: { 'foo.bar': 1 } }, "unset tracked path");
T({ 'foo.bar': null }, { $set: { 'foo': 3 } }, "set the parent");
T({ 'foo.bar': null }, { $set: { 'foo': {baz:1} } }, "set the parent");
});
Tinytest.add("minimongo - can selector become true by modifier - literals with arrays", function (t) {
test = t;
// These tests are incomplete and in theory they all should return true as we
// don't support any case with numeric fields yet.
T({'a.1.b': 1, x:1}, {$unset:{'a.1.b': 1}, $set:{x:1}}, "unset of array element's field with exactly the same index as selector");
F({'a.2.b': 1}, {$unset:{'a.1.b': 1}}, "unset of array element's field with different index as selector");
// This is false, because if you are looking for array but in reality it is an
// object, it just can't get to true.
F({'a.2.b': 1}, {$unset:{'a.b': 1}}, "unset of field while selector is looking for index");
T({ 'foo.bar': null }, {$set:{'foo.1.bar': null}}, "set array's element's field to null looking for null");
T({ 'foo.bar': null }, {$set:{'foo.0.bar': 1, 'foo.1.bar': null}}, "set array's element's field to null looking for null");
// This is false, because there may remain other array elements that match
// but we modified this test as we don't support this case yet
T({'a.b': 1}, {$unset:{'a.1.b': 1}}, "unset of array element's field");
});
Tinytest.add("minimongo - can selector become true by modifier - set an object literal whose fields are selected", function (t) {
test = t;
T({ 'a.b.c': 1 }, { $set: { 'a.b': { c: 1 } } }, "a simple scalar selector and simple set");
F({ 'a.b.c': 1 }, { $set: { 'a.b': { c: 2 } } }, "a simple scalar selector and simple set to false");
F({ 'a.b.c': 1 }, { $set: { 'a.b': { d: 1 } } }, "a simple scalar selector and simple set a wrong literal");
F({ 'a.b.c': 1 }, { $set: { 'a.b': 222 } }, "a simple scalar selector and simple set a wrong type");
});
})();

View File

@@ -264,7 +264,7 @@ Tinytest.add("minimongo - lookup", function (test) {
Tinytest.add("minimongo - selector_compiler", function (test) {
var matches = function (should_match, selector, doc) {
var does_match = LocalCollection._matches(selector, doc);
var does_match = MinimongoTest.matches(selector, doc);
if (does_match != should_match) {
// XXX super janky
test.fail({type: "minimongo-ordering",
@@ -1016,6 +1016,12 @@ Tinytest.add("minimongo - projection_compiler", function (test) {
"_id blacklisted, no _id"]
]);
testProjection({}, [
[{ a: 1, b: 2, c: "3" },
{ a: 1, b: 2, c: "3" },
"empty projection"]
]);
test.throws(function () {
testProjection({ 'inc': 1, 'excl': 0 }, [
[ { inc: 42, excl: 42 }, { inc: 42 }, "Can't combine incl/excl rules" ]
@@ -1116,6 +1122,18 @@ Tinytest.add("minimongo - fetch with fields", function (test) {
if (!i) return;
test.isTrue(x.i === arr[i-1].i + 1);
});
// Temporary unsupported operators
// queries are taken from MongoDB docs examples
test.throws(function () {
c.find({}, { fields: { 'grades.$': 1 } });
});
test.throws(function () {
c.find({}, { fields: { grades: { $elemMatch: { mean: 70 } } } });
});
test.throws(function () {
c.find({}, { fields: { grades: { $slice: [20, 10] } } });
});
});
Tinytest.add("minimongo - fetch with projection, subarrays", function (test) {
@@ -1175,6 +1193,37 @@ Tinytest.add("minimongo - fetch with projection, subarrays", function (test) {
{a: [ [ { c: 2 }, { c: 4 } ], { c: 5 }, [ { c: 9 } ] ] });
});
Tinytest.add("minimongo - fetch with projection, deep copy", function (test) {
// Compiled fields projection defines the contract: returned document doesn't
// retain anything from the passed argument.
var doc = {
a: { x: 42 },
b: {
y: { z: 33 }
},
c: "asdf"
};
var fields = {
'a': 1,
'b.y': 1
};
var projectionFn = LocalCollection._compileProjection(fields);
var filteredDoc = projectionFn(doc);
doc.a.x++;
doc.b.y.z--;
test.equal(filteredDoc.a.x, 42, "projection returning deep copy - including");
test.equal(filteredDoc.b.y.z, 33, "projection returning deep copy - including");
fields = { c: 0 };
projectionFn = LocalCollection._compileProjection(fields);
filteredDoc = projectionFn(doc);
doc.a.x = 5;
test.equal(filteredDoc.a.x, 43, "projection returning deep copy - excluding");
});
Tinytest.add("minimongo - observe ordered with projection", function (test) {
// These tests are copy-paste from "minimongo -observe ordered",
// slightly modified to test projection

View File

@@ -23,14 +23,16 @@ LocalCollection._modify = function (doc, mod, isInsert) {
if (!is_modifier) {
if (mod._id && !EJSON.equals(doc._id, mod._id))
throw Error("Cannot change the _id of a document");
throw MinimongoError("Cannot change the _id of a document");
// replace the whole document
for (var k in mod) {
if (k.substr(0, 1) === '$')
throw Error("When replacing document, field name may not start with '$'");
throw MinimongoError(
"When replacing document, field name may not start with '$'");
if (/\./.test(k))
throw Error("When replacing document, field name may not contain '.'");
throw MinimongoError(
"When replacing document, field name may not contain '.'");
}
new_doc = mod;
} else {
@@ -43,12 +45,13 @@ LocalCollection._modify = function (doc, mod, isInsert) {
if (isInsert && op === '$setOnInsert')
mod_func = LocalCollection._modifiers['$set'];
if (!mod_func)
throw Error("Invalid modifier specified " + op);
throw MinimongoError("Invalid modifier specified " + op);
for (var keypath in mod[op]) {
// XXX mongo doesn't allow mod field names to end in a period,
// but I don't see why.. it allows '' as a key, as does JS
if (keypath.length && keypath[keypath.length-1] === '.')
throw Error("Invalid mod field name, may not end in a period");
throw MinimongoError(
"Invalid mod field name, may not end in a period");
var arg = mod[op][keypath];
var keyparts = keypath.split('.');
@@ -101,7 +104,8 @@ LocalCollection._findModTarget = function (doc, keyparts, no_create,
if (forbid_array)
return null;
if (!numeric)
throw Error("can't append to array using string field name ["
throw MinimongoError(
"can't append to array using string field name ["
+ keypart + "]");
keypart = parseInt(keypart);
if (last)
@@ -113,7 +117,7 @@ LocalCollection._findModTarget = function (doc, keyparts, no_create,
if (doc.length === keypart)
doc.push({});
else if (typeof doc[keypart] !== "object")
throw Error("can't modify field '" + keyparts[i + 1] +
throw MinimongoError("can't modify field '" + keyparts[i + 1] +
"' of list value " + JSON.stringify(doc[keypart]));
}
} else {
@@ -141,18 +145,28 @@ LocalCollection._noCreateModifiers = {
LocalCollection._modifiers = {
$inc: function (target, field, arg) {
if (typeof arg !== "number")
throw Error("Modifier $inc allowed for numbers only");
throw MinimongoError("Modifier $inc allowed for numbers only");
if (field in target) {
if (typeof target[field] !== "number")
throw Error("Cannot apply $inc modifier to non-number");
throw MinimongoError("Cannot apply $inc modifier to non-number");
target[field] += arg;
} else {
target[field] = arg;
}
},
$set: function (target, field, arg) {
if (!_.isObject(target)) { // not an array or an object
var e = MinimongoError("Cannot set property on non-object field");
e.setPropertyError = true;
throw e;
}
if (target === null) {
var e = MinimongoError("Cannot set property on null");
e.setPropertyError = true;
throw e;
}
if (field === '_id' && !EJSON.equals(arg, target._id))
throw Error("Cannot change the _id of a document");
throw MinimongoError("Cannot change the _id of a document");
target[field] = EJSON.clone(arg);
},
@@ -172,7 +186,7 @@ LocalCollection._modifiers = {
if (target[field] === undefined)
target[field] = [];
if (!(target[field] instanceof Array))
throw Error("Cannot apply $push modifier to non-array");
throw MinimongoError("Cannot apply $push modifier to non-array");
if (!(arg && arg.$each)) {
// Simple mode: not $each
@@ -183,16 +197,16 @@ LocalCollection._modifiers = {
// Fancy mode: $each (and maybe $slice and $sort)
var toPush = arg.$each;
if (!(toPush instanceof Array))
throw Error("$each must be an array");
throw MinimongoError("$each must be an array");
// Parse $slice.
var slice = undefined;
if ('$slice' in arg) {
if (typeof arg.$slice !== "number")
throw Error("$slice must be a numeric value");
throw MinimongoError("$slice must be a numeric value");
// XXX should check to make sure integer
if (arg.$slice > 0)
throw Error("$slice in $push must be zero or negative");
throw MinimongoError("$slice in $push must be zero or negative");
slice = arg.$slice;
}
@@ -200,14 +214,14 @@ LocalCollection._modifiers = {
var sortFunction = undefined;
if (arg.$sort) {
if (slice === undefined)
throw Error("$sort requires $slice to be present");
throw MinimongoError("$sort requires $slice to be present");
// XXX this allows us to use a $sort whose value is an array, but that's
// actually an extension of the Node driver, so it won't work
// server-side. Could be confusing!
sortFunction = LocalCollection._compileSort(arg.$sort);
for (var i = 0; i < toPush.length; i++) {
if (LocalCollection._f._type(toPush[i]) !== 3) {
throw Error("$push like modifiers using $sort " +
throw MinimongoError("$push like modifiers using $sort " +
"require all elements to be objects");
}
}
@@ -231,12 +245,12 @@ LocalCollection._modifiers = {
},
$pushAll: function (target, field, arg) {
if (!(typeof arg === "object" && arg instanceof Array))
throw Error("Modifier $pushAll/pullAll allowed for arrays only");
throw MinimongoError("Modifier $pushAll/pullAll allowed for arrays only");
var x = target[field];
if (x === undefined)
target[field] = arg;
else if (!(x instanceof Array))
throw Error("Cannot apply $pushAll modifier to non-array");
throw MinimongoError("Cannot apply $pushAll modifier to non-array");
else {
for (var i = 0; i < arg.length; i++)
x.push(arg[i]);
@@ -247,7 +261,7 @@ LocalCollection._modifiers = {
if (x === undefined)
target[field] = [arg];
else if (!(x instanceof Array))
throw Error("Cannot apply $addToSet modifier to non-array");
throw MinimongoError("Cannot apply $addToSet modifier to non-array");
else {
var isEach = false;
if (typeof arg === "object") {
@@ -273,7 +287,7 @@ LocalCollection._modifiers = {
if (x === undefined)
return;
else if (!(x instanceof Array))
throw Error("Cannot apply $pop modifier to non-array");
throw MinimongoError("Cannot apply $pop modifier to non-array");
else {
if (typeof arg === 'number' && arg < 0)
x.splice(0, 1);
@@ -288,7 +302,7 @@ LocalCollection._modifiers = {
if (x === undefined)
return;
else if (!(x instanceof Array))
throw Error("Cannot apply $pull/pullAll modifier to non-array");
throw MinimongoError("Cannot apply $pull/pullAll modifier to non-array");
else {
var out = []
if (typeof arg === "object" && !(arg instanceof Array)) {
@@ -315,14 +329,14 @@ LocalCollection._modifiers = {
},
$pullAll: function (target, field, arg) {
if (!(typeof arg === "object" && arg instanceof Array))
throw Error("Modifier $pushAll/pullAll allowed for arrays only");
throw MinimongoError("Modifier $pushAll/pullAll allowed for arrays only");
if (target === undefined)
return;
var x = target[field];
if (x === undefined)
return;
else if (!(x instanceof Array))
throw Error("Cannot apply $pull/pullAll modifier to non-array");
throw MinimongoError("Cannot apply $pull/pullAll modifier to non-array");
else {
var out = []
for (var i = 0; i < x.length; i++) {
@@ -342,11 +356,11 @@ LocalCollection._modifiers = {
$rename: function (target, field, arg, keypath, doc) {
if (keypath === arg)
// no idea why mongo has this restriction..
throw Error("$rename source must differ from target");
throw MinimongoError("$rename source must differ from target");
if (target === null)
throw Error("$rename source field invalid");
throw MinimongoError("$rename source field invalid");
if (typeof arg !== "string")
throw Error("$rename target must be a string");
throw MinimongoError("$rename target must be a string");
if (target === undefined)
return;
var v = target[field];
@@ -355,14 +369,14 @@ LocalCollection._modifiers = {
var keyparts = arg.split('.');
var target2 = LocalCollection._findModTarget(doc, keyparts, false, true);
if (target2 === null)
throw Error("$rename target field invalid");
throw MinimongoError("$rename target field invalid");
var field2 = keyparts.pop();
target2[field2] = v;
},
$bit: function (target, field, arg) {
// XXX mongo only supports $bit on integers, and we only support
// native javascript numbers (doubles) so far, so we can't support $bit
throw Error("$bit is not supported");
throw MinimongoError("$bit is not supported");
}
};
@@ -373,3 +387,4 @@ LocalCollection._removeDollarOperators = function (selector) {
selectorDoc[k] = selector[k];
return selectorDoc;
};

View File

@@ -0,0 +1,179 @@
// XXX maybe move these into another ObserveHelpers package or something
// _CachingChangeObserver is an object which receives observeChanges callbacks
// and keeps a cache of the current cursor state up to date in self.docs. Users
// of this class should read the docs field but not modify it. You should pass
// the "applyChange" field as the callbacks to the underlying observeChanges
// call. Optionally, you can specify your own observeChanges callbacks which are
// invoked immediately before the docs field is updated; this object is made
// available as `this` to those callbacks.
LocalCollection._CachingChangeObserver = function (options) {
var self = this;
options = options || {};
var orderedFromCallbacks = options.callbacks &&
LocalCollection._observeChangesCallbacksAreOrdered(options.callbacks);
if (_.has(options, 'ordered')) {
self.ordered = options.ordered;
if (options.callbacks && options.ordered !== orderedFromCallbacks)
throw Error("ordered option doesn't match callbacks");
} else if (options.callbacks) {
self.ordered = orderedFromCallbacks;
} else {
throw Error("must provide ordered or callbacks");
}
var callbacks = options.callbacks || {};
if (self.ordered) {
self.docs = new OrderedDict(LocalCollection._idStringify);
self.applyChange = {
addedBefore: function (id, fields, before) {
var doc = EJSON.clone(fields);
doc._id = id;
callbacks.addedBefore && callbacks.addedBefore.call(
self, id, fields, before);
// This line triggers if we provide added with movedBefore.
callbacks.added && callbacks.added.call(self, id, fields);
// XXX could `before` be a falsy ID? Technically
// idStringify seems to allow for them -- though
// OrderedDict won't call stringify on a falsy arg.
self.docs.putBefore(id, doc, before || null);
},
movedBefore: function (id, before) {
var doc = self.docs.get(id);
callbacks.movedBefore && callbacks.movedBefore.call(self, id, before);
self.docs.moveBefore(id, before || null);
}
};
} else {
self.docs = new LocalCollection._IdMap;
self.applyChange = {
added: function (id, fields) {
var doc = EJSON.clone(fields);
callbacks.added && callbacks.added.call(self, id, fields);
doc._id = id;
self.docs.set(id, doc);
}
};
}
// The methods in _IdMap and OrderedDict used by these callbacks are
// identical.
self.applyChange.changed = function (id, fields) {
var doc = self.docs.get(id);
if (!doc)
throw new Error("Unknown id for changed: " + id);
callbacks.changed && callbacks.changed.call(
self, id, EJSON.clone(fields));
LocalCollection._applyChanges(doc, fields);
};
self.applyChange.removed = function (id) {
callbacks.removed && callbacks.removed.call(self, id);
self.docs.remove(id);
};
};
LocalCollection._observeFromObserveChanges = function (cursor, observeCallbacks) {
var transform = cursor.getTransform() || function (doc) {return doc;};
var suppressed = !!observeCallbacks._suppress_initial;
var observeChangesCallbacks;
if (LocalCollection._observeCallbacksAreOrdered(observeCallbacks)) {
// The "_no_indices" option sets all index arguments to -1 and skips the
// linear scans required to generate them. This lets observers that don't
// need absolute indices benefit from the other features of this API --
// relative order, transforms, and applyChanges -- without the speed hit.
var indices = !observeCallbacks._no_indices;
observeChangesCallbacks = {
addedBefore: function (id, fields, before) {
var self = this;
if (suppressed || !(observeCallbacks.addedAt || observeCallbacks.added))
return;
var doc = transform(_.extend(fields, {_id: id}));
if (observeCallbacks.addedAt) {
var index = indices
? (before ? self.docs.indexOf(before) : self.docs.size()) : -1;
observeCallbacks.addedAt(doc, index, before);
} else {
observeCallbacks.added(doc);
}
},
changed: function (id, fields) {
var self = this;
if (!(observeCallbacks.changedAt || observeCallbacks.changed))
return;
var doc = EJSON.clone(self.docs.get(id));
if (!doc)
throw new Error("Unknown id for changed: " + id);
var oldDoc = transform(EJSON.clone(doc));
LocalCollection._applyChanges(doc, fields);
doc = transform(doc);
if (observeCallbacks.changedAt) {
var index = indices ? self.docs.indexOf(id) : -1;
observeCallbacks.changedAt(doc, oldDoc, index);
} else {
observeCallbacks.changed(doc, oldDoc);
}
},
movedBefore: function (id, before) {
var self = this;
if (!observeCallbacks.movedTo)
return;
var from = indices ? self.docs.indexOf(id) : -1;
var to = indices
? (before ? self.docs.indexOf(before) : self.docs.size()) : -1;
// When not moving backwards, adjust for the fact that removing the
// document slides everything back one slot.
if (to > from)
--to;
observeCallbacks.movedTo(transform(EJSON.clone(self.docs.get(id))),
from, to, before || null);
},
removed: function (id) {
var self = this;
if (!(observeCallbacks.removedAt || observeCallbacks.removed))
return;
// technically maybe there should be an EJSON.clone here, but it's about
// to be removed from self.docs!
var doc = transform(self.docs.get(id));
if (observeCallbacks.removedAt) {
var index = indices ? self.docs.indexOf(id) : -1;
observeCallbacks.removedAt(doc, index);
} else {
observeCallbacks.removed(doc);
}
}
};
} else {
observeChangesCallbacks = {
added: function (id, fields) {
if (!suppressed && observeCallbacks.added) {
var doc = _.extend(fields, {_id: id});
observeCallbacks.added(transform(doc));
}
},
changed: function (id, fields) {
var self = this;
if (observeCallbacks.changed) {
var oldDoc = self.docs.get(id);
var doc = EJSON.clone(oldDoc);
LocalCollection._applyChanges(doc, fields);
observeCallbacks.changed(transform(doc), transform(oldDoc));
}
},
removed: function (id) {
var self = this;
if (observeCallbacks.removed) {
observeCallbacks.removed(transform(self.docs.get(id)));
}
}
};
}
var changeObserver = new LocalCollection._CachingChangeObserver(
{callbacks: observeChangesCallbacks});
var handle = cursor.observeChanges(changeObserver.applyChange);
suppressed = false;
return handle;
};

View File

@@ -5,6 +5,7 @@ Package.describe({
Package.on_use(function (api) {
api.export('LocalCollection');
api.export('MinimongoTest', { testOnly: true });
api.use(['underscore', 'json', 'ejson', 'ordered-dict', 'deps',
'random', 'ordered-dict']);
// This package is used for geo-location queries such as $near
@@ -12,17 +13,26 @@ Package.on_use(function (api) {
api.add_files([
'minimongo.js',
'selector.js',
'projection.js',
'modify.js',
'diff.js',
'id_map.js',
'observe.js',
'objectid.js'
]);
// Functionality used only by oplog tailing on the server side
api.add_files([
'selector_projection.js',
'selector_modifier.js'
], 'server');
});
Package.on_test(function (api) {
api.use('geojson-utils', 'client');
api.use('minimongo', 'client');
api.use('minimongo', ['client', 'server']);
api.use('test-helpers', 'client');
api.use(['tinytest', 'underscore', 'ejson', 'ordered-dict',
'random', 'deps']);
api.add_files('minimongo_tests.js', 'client');
api.add_files('minimongo_server_tests.js', 'server');
});

View File

@@ -0,0 +1,168 @@
// Knows how to compile a fields projection to a predicate function.
// @returns - Function: a closure that filters out an object according to the
// fields projection rules:
// @param obj - Object: MongoDB-styled document
// @returns - Object: a document with the fields filtered out
// according to projection rules. Doesn't retain subfields
// of passed argument.
LocalCollection._compileProjection = function (fields) {
LocalCollection._checkSupportedProjection(fields);
var _idProjection = _.isUndefined(fields._id) ? true : fields._id;
var details = projectionDetails(fields);
// returns transformed doc according to ruleTree
var transform = function (doc, ruleTree) {
// Special case for "sets"
if (_.isArray(doc))
return _.map(doc, function (subdoc) { return transform(subdoc, ruleTree); });
var res = details.including ? {} : EJSON.clone(doc);
_.each(ruleTree, function (rule, key) {
if (!_.has(doc, key))
return;
if (_.isObject(rule)) {
// For sub-objects/subsets we branch
if (_.isObject(doc[key]))
res[key] = transform(doc[key], rule);
// Otherwise we don't even touch this subfield
} else if (details.including)
res[key] = EJSON.clone(doc[key]);
else
delete res[key];
});
return res;
};
return function (obj) {
var res = transform(obj, details.tree);
if (_idProjection && _.has(obj, '_id'))
res._id = obj._id;
if (!_idProjection && _.has(res, '_id'))
delete res._id;
return res;
};
};
// Traverses the keys of passed projection and constructs a tree where all
// leaves are either all True or all False
// @returns Object:
// - tree - Object - tree representation of keys involved in projection
// (exception for '_id' as it is a special case handled separately)
// - including - Boolean - "take only certain fields" type of projection
projectionDetails = function (fields) {
// Find the non-_id keys (_id is handled specially because it is included unless
// explicitly excluded). Sort the keys, so that our code to detect overlaps
// like 'foo' and 'foo.bar' can assume that 'foo' comes first.
var fieldsKeys = _.keys(fields).sort();
// If there are other rules other than '_id', treat '_id' differently in a
// separate case. If '_id' is the only rule, use it to understand if it is
// including/excluding projection.
if (fieldsKeys.length > 0 && !(fieldsKeys.length === 1 && fieldsKeys[0] === '_id'))
fieldsKeys = _.reject(fieldsKeys, function (key) { return key === '_id'; });
var including = null; // Unknown
_.each(fieldsKeys, function (keyPath) {
var rule = !!fields[keyPath];
if (including === null)
including = rule;
if (including !== rule)
// This error message is copies from MongoDB shell
throw MinimongoError("You cannot currently mix including and excluding fields.");
});
var projectionRulesTree = pathsToTree(
fieldsKeys,
function (path) { return including; },
function (node, path, fullPath) {
// Check passed projection fields' keys: If you have two rules such as
// 'foo.bar' and 'foo.bar.baz', then the result becomes ambiguous. If
// that happens, there is a probability you are doing something wrong,
// framework should notify you about such mistake earlier on cursor
// compilation step than later during runtime. Note, that real mongo
// doesn't do anything about it and the later rule appears in projection
// project, more priority it takes.
//
// Example, assume following in mongo shell:
// > db.coll.insert({ a: { b: 23, c: 44 } })
// > db.coll.find({}, { 'a': 1, 'a.b': 1 })
// { "_id" : ObjectId("520bfe456024608e8ef24af3"), "a" : { "b" : 23 } }
// > db.coll.find({}, { 'a.b': 1, 'a': 1 })
// { "_id" : ObjectId("520bfe456024608e8ef24af3"), "a" : { "b" : 23, "c" : 44 } }
//
// Note, how second time the return set of keys is different.
var currentPath = fullPath;
var anotherPath = path;
throw MinimongoError("both " + currentPath + " and " + anotherPath +
" found in fields option, using both of them may trigger " +
"unexpected behavior. Did you mean to use only one of them?");
});
return {
tree: projectionRulesTree,
including: including
};
};
// paths - Array: list of mongo style paths
// newLeafFn - Function: of form function(path) should return a scalar value to
// put into list created for that path
// conflictFn - Function: of form function(node, path, fullPath) is called
// when building a tree path for 'fullPath' node on
// 'path' was already a leaf with a value. Must return a
// conflict resolution.
// initial tree - Optional Object: starting tree.
// @returns - Object: tree represented as a set of nested objects
pathsToTree = function (paths, newLeafFn, conflictFn, tree) {
tree = tree || {};
_.each(paths, function (keyPath) {
var treePos = tree;
var pathArr = keyPath.split('.');
// use _.all just for iteration with break
var success = _.all(pathArr.slice(0, -1), function (key, idx) {
if (!_.has(treePos, key))
treePos[key] = {};
else if (!_.isObject(treePos[key])) {
treePos[key] = conflictFn(treePos[key],
pathArr.slice(0, idx + 1).join('.'),
keyPath);
// break out of loop if we are failing for this path
if (!_.isObject(treePos[key]))
return false;
}
treePos = treePos[key];
return true;
});
if (success) {
var lastKey = _.last(pathArr);
if (!_.has(treePos, lastKey))
treePos[lastKey] = newLeafFn(keyPath);
else
treePos[lastKey] = conflictFn(treePos[lastKey], keyPath, keyPath);
}
});
return tree;
};
LocalCollection._checkSupportedProjection = function (fields) {
if (!_.isObject(fields) || _.isArray(fields))
throw MinimongoError("fields option must be an object");
_.each(fields, function (val, keyPath) {
if (_.contains(keyPath.split('.'), '$'))
throw MinimongoError("Minimongo doesn't support $ operator in projections yet.");
if (_.indexOf([1, 0, true, false], val) === -1)
throw MinimongoError("Projection values should be one of 1, 0, true, or false");
});
};

View File

@@ -540,7 +540,7 @@ LocalCollection._f = {
// For unit tests. True if the given document matches the given
// selector.
LocalCollection._matches = function (selector, doc) {
MinimongoTest.matches = function (selector, doc) {
return (LocalCollection._compileSelector(selector))(doc);
};

View File

@@ -0,0 +1,137 @@
// Returns true if the modifier applied to some document may change the result
// of matching the document by selector
// The modifier is always in a form of Object:
// - $set
// - 'a.b.22.z': value
// - 'foo.bar': 42
// - $unset
// - 'abc.d': 1
LocalCollection._isSelectorAffectedByModifier = function (selector, modifier) {
// safe check for $set/$unset being objects
modifier = _.extend({ $set: {}, $unset: {} }, modifier);
var modifiedPaths = _.keys(modifier.$set).concat(_.keys(modifier.$unset));
var meaningfulPaths = getPaths(selector);
return _.any(modifiedPaths, function (path) {
var mod = path.split('.');
return _.any(meaningfulPaths, function (meaningfulPath) {
var sel = meaningfulPath.split('.');
var i = 0, j = 0;
while (i < sel.length && j < mod.length) {
if (numericKey(sel[i]) && numericKey(mod[j])) {
// foo.4.bar selector affected by foo.4 modifier
// foo.3.bar selector unaffected by foo.4 modifier
if (sel[i] === mod[j])
i++, j++;
else
return false;
} else if (numericKey(sel[i])) {
// foo.4.bar selector unaffected by foo.bar modifier
return false;
} else if (numericKey(mod[j])) {
j++;
} else if (sel[i] === mod[j])
i++, j++;
else
return false;
}
// One is a prefix of another, taking numeric fields into account
return true;
});
});
};
getPathsWithoutNumericKeys = function (sel) {
return _.map(getPaths(sel), function (path) {
return _.reject(path.split('.'), numericKey).join('.');
});
};
// @param selector - Object: MongoDB selector. Currently doesn't support
// $-operators and arrays well.
// @param modifier - Object: MongoDB-styled modifier with `$set`s and `$unsets`
// only. (assumed to come from oplog)
// @returns - Boolean: if after applying the modifier, selector can start
// accepting the modified value.
LocalCollection._canSelectorBecomeTrueByModifier = function (selector, modifier)
{
if (!LocalCollection._isSelectorAffectedByModifier(selector, modifier))
return false;
modifier = _.extend({$set:{}, $unset:{}}, modifier);
if (_.any(_.keys(selector), pathHasNumericKeys) ||
_.any(_.keys(modifier.$unset), pathHasNumericKeys) ||
_.any(_.keys(modifier.$set), pathHasNumericKeys))
return true;
if (!isLiteralSelector(selector))
return true;
// convert a selector into an object matching the selector
// { 'a.b': { ans: 42 }, 'foo.bar': null, 'foo.baz': "something" }
// => { a: { b: { ans: 42 } }, foo: { bar: null, baz: "something" } }
var doc = pathsToTree(_.keys(selector),
function (path) { return selector[path]; },
_.identity /*conflict resolution is no resolution*/);
var selectorFn = LocalCollection._compileSelector(selector);
try {
LocalCollection._modify(doc, modifier);
} catch (e) {
// Couldn't set a property on a field which is a scalar or null in the
// selector.
// Example:
// real document: { 'a.b': 3 }
// selector: { 'a': 12 }
// converted selector (ideal document): { 'a': 12 }
// modifier: { $set: { 'a.b': 4 } }
// We don't know what real document was like but from the error raised by
// $set on a scalar field we can reason that the structure of real document
// is completely different.
if (e.name === "MinimongoError" && e.setPropertyError)
return false;
throw e;
}
return selectorFn(doc);
};
// Returns a list of key paths the given selector is looking for
var getPaths = MinimongoTest.getSelectorPaths = function (sel) {
return _.chain(sel).map(function (v, k) {
// we don't know how to handle $where because it can be anything
if (k === "$where")
return ''; // matches everything
// we branch from $or/$and/$nor operator
if (_.contains(['$or', '$and', '$nor'], k))
return _.map(v, getPaths);
// the value is a literal or some comparison operator
return k;
}).flatten().uniq().value();
};
function pathHasNumericKeys (path) {
return _.any(path.split('.'), numericKey);
}
// string can be converted to integer
function numericKey (s) {
return /^[0-9]+$/.test(s);
}
function isLiteralSelector (selector) {
return _.all(selector, function (subSelector, keyPath) {
if (keyPath.substr(0, 1) === "$" || _.isRegExp(subSelector))
return false;
if (!_.isObject(subSelector) || _.isArray(subSelector))
return true;
return _.all(subSelector, function (value, key) {
return key.substr(0, 1) !== "$";
});
});
}

View File

@@ -0,0 +1,58 @@
// Knows how to combine a mongo selector and a fields projection to a new fields
// projection taking into account active fields from the passed selector.
// @returns Object - projection object (same as fields option of mongo cursor)
LocalCollection._combineSelectorAndProjection = function (selector, projection)
{
var selectorPaths = getPathsWithoutNumericKeys(selector);
// Special case for $where operator in the selector - projection should depend
// on all fields of the document. getSelectorPaths returns a list of paths
// selector depends on. If one of the paths is '' (empty string) representing
// the root or the whole document, complete projection should be returned.
if (_.contains(selectorPaths, ''))
return {};
var prjDetails = projectionDetails(projection);
var tree = prjDetails.tree;
var mergedProjection = {};
// merge the paths to include
tree = pathsToTree(selectorPaths,
function (path) { return true; },
function (node, path, fullPath) { return true; },
tree);
mergedProjection = treeToPaths(tree);
if (prjDetails.including) {
// both selector and projection are pointing on fields to include
// so we can just return the merged tree
return mergedProjection;
} else {
// selector is pointing at fields to include
// projection is pointing at fields to exclude
// make sure we don't exclude important paths
var mergedExclProjection = {};
_.each(mergedProjection, function (incl, path) {
if (!incl)
mergedExclProjection[path] = false;
});
return mergedExclProjection;
}
};
// Returns a set of key paths similar to
// { 'foo.bar': 1, 'a.b.c': 1 }
var treeToPaths = function (tree, prefix) {
prefix = prefix || '';
var result = {};
_.each(tree, function (val, key) {
if (_.isObject(val))
_.extend(result, treeToPaths(val, prefix + key + '.'));
else
result[prefix + key] = val;
});
return result;
};

View File

@@ -1,7 +1,7 @@
{
"dependencies": {
"mongodb": {
"version": "1.3.19",
"from": "https://github.com/meteor/node-mongodb-native/tarball/779bbac916a751f305d84c727a6cc7dfddab7924",
"dependencies": {
"bson": {
"version": "0.2.2"

View File

@@ -0,0 +1,61 @@
var Fiber = Npm.require('fibers');
var Future = Npm.require('fibers/future');
DocFetcher = function (mongoConnection) {
var self = this;
self._mongoConnection = mongoConnection;
// Map from cache key -> [callback]
self._callbacksForCacheKey = {};
};
_.extend(DocFetcher.prototype, {
// Fetches document "id" from collectionName, returning it or null if not
// found.
//
// If you make multiple calls to fetch() with the same cacheKey (a string),
// DocFetcher may assume that they all return the same document. (It does
// not check to see if collectionName/id match.)
fetch: function (collectionName, id, cacheKey, callback) {
var self = this;
check(collectionName, String);
// id is some sort of scalar
check(cacheKey, String);
// If there's already an in-progress fetch for this cache key, yield until
// it's done and return whatever it returns.
if (_.has(self._callbacksForCacheKey, cacheKey)) {
self._callbacksForCacheKey[cacheKey].push(callback);
return;
}
var callbacks = self._callbacksForCacheKey[cacheKey] = [callback];
Fiber(function () {
try {
var doc = self._mongoConnection.findOne(
collectionName, {_id: id}) || null;
// Return doc to all relevant callbacks. Note that this array can
// continue to grow during callback excecution.
while (!_.isEmpty(callbacks)) {
// Clone the document so that the various calls to fetch don't return
// objects that are intertwingled with each other. Clone before
// popping the future, so that if clone throws, the error gets passed
// to the next callback.
var clonedDoc = EJSON.clone(doc);
callbacks.pop()(null, clonedDoc);
}
} catch (e) {
while (!_.isEmpty(callbacks)) {
callbacks.pop()(e);
}
} finally {
// XXX consider keeping the doc around for a period of time before
// removing from the cache
delete self._callbacksForCacheKey[cacheKey];
}
}).run();
}
});
MongoTest.DocFetcher = DocFetcher;

View File

@@ -0,0 +1,38 @@
var Fiber = Npm.require('fibers');
var Future = Npm.require('fibers/future');
testAsyncMulti("mongo-livedata - doc fetcher", [
function (test, expect) {
var self = this;
var collName = "docfetcher-" + Random.id();
var collection = new Meteor.Collection(collName);
var id1 = collection.insert({x: 1});
var id2 = collection.insert({y: 2});
var fetcher = new MongoTest.DocFetcher(
MongoInternals.defaultRemoteCollectionDriver().mongo);
// Test basic operation.
fetcher.fetch(collName, id1, Random.id(), expect(null, {_id: id1, x: 1}));
fetcher.fetch(collName, "nonexistent!", Random.id(), expect(null, null));
var fetched = false;
var cacheKey = Random.id();
var expected = {_id: id2, y: 2};
fetcher.fetch(collName, id2, cacheKey, expect(function (e, d) {
fetched = true;
test.isFalse(e);
test.equal(d, expected);
}));
// The fetcher yields.
test.isFalse(fetched);
// Now ask for another document with the same cache key. Because a fetch for
// that cache key is in flight, we will get the other fetch's document, not
// this random document.
fetcher.fetch(collName, Random.id(), cacheKey, expect(function (e, d) {
test.isFalse(e);
test.equal(d, expected);
}));
}
]);

View File

@@ -13,6 +13,7 @@ var Fiber = Npm.require('fibers');
var Future = Npm.require(path.join('fibers', 'future'));
MongoInternals = {};
MongoTest = {};
var replaceNames = function (filter, thing) {
if (typeof thing === "object") {
@@ -28,6 +29,14 @@ var replaceNames = function (filter, thing) {
return thing;
};
// Ensure that EJSON.clone keeps a Timestamp as a Timestamp (instead of just
// doing a structural clone).
// XXX how ok is this? what if there are multiple copies of MongoDB loaded?
MongoDB.Timestamp.prototype.clone = function () {
// Timestamps should be immutable.
return this;
};
var makeMongoLegal = function (name) { return "EJSON" + name; };
var unmakeMongoLegal = function (name) { return name.substr(5); };
@@ -42,6 +51,13 @@ var replaceMongoAtomWithMeteor = function (document) {
if (document["EJSON$type"] && document["EJSON$value"]) {
return EJSON.fromJSONValue(replaceNames(unmakeMongoLegal, document));
}
if (document instanceof MongoDB.Timestamp) {
// For now, the Meteor representation of a Mongo timestamp type (not a date!
// this is a weird internal thing used in the oplog!) is the same as the
// Mongo representation. We need to do this explicitly or else we would do a
// structural clone and lose the prototype.
return document;
}
return undefined;
};
@@ -54,7 +70,15 @@ var replaceMeteorAtomWithMongo = function (document) {
}
if (document instanceof Meteor.Collection.ObjectID) {
return new MongoDB.ObjectID(document.toHexString());
} else if (EJSON._isCustomType(document)) {
}
if (document instanceof MongoDB.Timestamp) {
// For now, the Meteor representation of a Mongo timestamp type (not a date!
// this is a weird internal thing used in the oplog!) is the same as the
// Mongo representation. We need to do this explicitly or else we would do a
// structural clone and lose the prototype.
return document;
}
if (EJSON._isCustomType(document)) {
return replaceNames(makeMongoLegal, EJSON.toJSONValue(document));
}
// It is not ordinarily possible to stick dollar-sign keys into mongo
@@ -84,18 +108,19 @@ var replaceTypes = function (document, atomTransformer) {
};
MongoConnection = function (url) {
MongoConnection = function (url, options) {
var self = this;
options = options || {};
self._connectCallbacks = [];
self._liveResultsSets = {};
self._observeMultiplexers = {};
var options = {db: {safe: true}};
var mongoOptions = {db: {safe: true}, server: {}, replSet: {}};
// Set autoReconnect to true, unless passed on the URL. Why someone
// would want to set autoReconnect to false, I'm not really sure, but
// keeping this for backwards compatibility for now.
if (!(/[\?&]auto_?[rR]econnect=/.test(url))) {
options.server = {auto_reconnect: true};
mongoOptions.server.auto_reconnect = true;
}
// Disable the native parser by default, unless specifically enabled
@@ -107,10 +132,19 @@ MongoConnection = function (url) {
// to a different platform (aka deploy)
// We should revisit this after binary npm module support lands.
if (!(/[\?&]native_?[pP]arser=/.test(url))) {
options.db.native_parser = false;
mongoOptions.db.native_parser = false;
}
MongoDB.connect(url, options, function(err, db) {
// XXX maybe we should have a better way of allowing users to configure the
// underlying Mongo driver
if (_.has(options, 'poolSize')) {
// If we just set this for "server", replSet will override it. If we just
// set it for replSet, it will be ignored if we're not using a replSet.
mongoOptions.server.poolSize = options.poolSize;
mongoOptions.replSet.poolSize = options.poolSize;
}
MongoDB.connect(url, mongoOptions, function(err, db) {
if (err)
throw err;
self.db = db;
@@ -122,10 +156,28 @@ MongoConnection = function (url) {
});
}).run();
});
self._docFetcher = new DocFetcher(self);
self._oplogHandle = null;
if (options.oplogUrl && !Package['disable-oplog']) {
var dbNameFuture = new Future;
self._withDb(function (db) {
dbNameFuture.return(db.databaseName);
});
self._oplogHandle = new OplogHandle(options.oplogUrl, dbNameFuture);
}
};
MongoConnection.prototype.close = function() {
var self = this;
// XXX probably untested
var oplogHandle = self._oplogHandle;
self._oplogHandle = null;
if (oplogHandle)
oplogHandle.stop();
// Use Future.wrap so that errors get thrown. This happens to
// work even outside a fiber since the 'close' method is not
// actually asynchronous.
@@ -177,6 +229,7 @@ MongoConnection.prototype._maybeBeginWrite = function () {
return {committed: function () {}};
};
//////////// Public API //////////
// The write methods block until the database has confirmed the write (it may
@@ -575,22 +628,28 @@ MongoConnection.prototype._dropIndex = function (collectionName, index) {
// like fetch or forEach on it).
//
// ObserveHandle is the "observe handle" returned from observeChanges. It has a
// reference to a LiveResultsSet.
// reference to an ObserveMultiplexer.
//
// LiveResultsSet caches the results of a query and reruns it when necessary.
// It is hooked up to one or more ObserveHandles; a single LiveResultsSet
// can drive multiple sets of observation callbacks if they are for the
// same query.
// ObserveMultiplexer allows multiple identical ObserveHandles to be driven by a
// single observe driver.
//
// There are two "observe drivers" which drive ObserveMultiplexers:
// - PollingObserveDriver caches the results of a query and reruns it when
// necessary.
// - OplogObserveDriver follows the Mongo operation log to directly observe
// database changes.
// Both implementations follow the same simple interface: when you create them,
// they start sending observeChanges callbacks (and a ready() invocation) to
// their ObserveMultiplexer, and you stop them by calling their stop() method.
var CursorDescription = function (collectionName, selector, options) {
CursorDescription = function (collectionName, selector, options) {
var self = this;
self.collectionName = collectionName;
self.selector = Meteor.Collection._rewriteSelector(selector);
self.options = options || {};
};
var Cursor = function (mongo, cursorDescription) {
Cursor = function (mongo, cursorDescription) {
var self = this;
self._mongo = mongo;
@@ -651,7 +710,7 @@ Cursor.prototype.observe = function (callbacks) {
Cursor.prototype.observeChanges = function (callbacks) {
var self = this;
var ordered = LocalCollection._isOrderedChanges(callbacks);
var ordered = LocalCollection._observeChangesCallbacksAreOrdered(callbacks);
return self._mongo._observeChanges(
self._cursorDescription, ordered, callbacks);
};
@@ -679,6 +738,11 @@ MongoConnection.prototype._createSynchronousCursor = function(
// ... and to keep querying the server indefinitely rather than just 5 times
// if there's no more data.
mongoOptions.numberOfRetries = -1;
// And if this cursor specifies a 'ts', then set the undocumented oplog
// replay flag, which does a special scan to find the first document
// (instead of creating an index on ts).
if (cursorDescription.selector.ts)
mongoOptions.oplogReplay = true;
}
var dbCursor = collection.find(
@@ -717,16 +781,20 @@ var SynchronousCursor = function (dbCursor, cursorDescription, options) {
_.extend(SynchronousCursor.prototype, {
_nextObject: function () {
var self = this;
while (true) {
var doc = self._synchronousNextObject().wait();
if (!doc || typeof doc._id === 'undefined') return null;
if (!doc) return null;
doc = replaceTypes(doc, replaceMongoAtomWithMeteor);
if (!self._cursorDescription.options.tailable) {
if (!self._cursorDescription.options.tailable && _.has(doc, '_id')) {
// Did Mongo give us duplicate documents in the same cursor? If so,
// ignore this one. (Do this before the transform, since transform might
// return some unrelated value.) We don't do this for tailable cursors,
// because we want to maintain O(1) memory usage.
// because we want to maintain O(1) memory usage. And if there isn't _id
// for some reason (maybe it's the oplog), then we don't do this either.
// (Be careful to do this for falsey but existing _id, though.)
var strId = LocalCollection._idStringify(doc._id);
if (self._visitedIds[strId]) continue;
self._visitedIds[strId] = true;
@@ -804,22 +872,57 @@ _.extend(SynchronousCursor.prototype, {
}
});
var nextObserveHandleId = 1;
var ObserveHandle = function (liveResultsSet, callbacks) {
MongoConnection.prototype.tail = function (cursorDescription, docCallback) {
var self = this;
self._liveResultsSet = liveResultsSet;
self._added = callbacks.added;
self._addedBefore = callbacks.addedBefore;
self._changed = callbacks.changed;
self._removed = callbacks.removed;
self._moved = callbacks.moved;
self._movedBefore = callbacks.movedBefore;
self._observeHandleId = nextObserveHandleId++;
};
ObserveHandle.prototype.stop = function () {
var self = this;
self._liveResultsSet._removeObserveHandle(self);
self._liveResultsSet = null;
if (!cursorDescription.options.tailable)
throw new Error("Can only tail a tailable cursor");
var cursor = self._createSynchronousCursor(cursorDescription);
var stopped = false;
var lastTS = undefined;
Meteor.defer(function () {
while (true) {
if (stopped)
return;
try {
var doc = cursor._nextObject();
} catch (err) {
// There's no good way to figure out if this was actually an error
// from Mongo. Ah well. But either way, we need to retry the cursor
// (unless the failure was because the observe got stopped).
doc = null;
}
// Since cursor._nextObject can yield, we need to check again to see if
// we've been stopped before calling the callback.
if (stopped)
return;
if (doc) {
// If a tailable cursor contains a "ts" field, use it to recreate the
// cursor on error. ("ts" is a standard that Mongo uses internally for
// the oplog, and there's a special flag that lets you do binary search
// on it instead of needing to use an index.)
lastTS = doc.ts;
docCallback(doc);
} else {
var newSelector = _.clone(cursorDescription.selector);
if (lastTS) {
newSelector.ts = {$gt: lastTS};
}
cursor = self._createSynchronousCursor(new CursorDescription(
cursorDescription.collectionName,
newSelector,
cursorDescription.options));
}
}
});
return {
stop: function () {
stopped = true;
cursor.close();
}
};
};
MongoConnection.prototype._observeChanges = function (
@@ -833,342 +936,96 @@ MongoConnection.prototype._observeChanges = function (
var observeKey = JSON.stringify(
_.extend({ordered: ordered}, cursorDescription));
var liveResultsSet;
var observeHandle;
var newlyCreated = false;
var multiplexer, observeDriver;
var firstHandle = false;
// Find a matching LiveResultsSet, or create a new one. This next block is
// Find a matching ObserveMultiplexer, or create a new one. This next block is
// guaranteed to not yield (and it doesn't call anything that can observe a
// new query), so no other calls to this function can interleave with it.
Meteor._noYieldsAllowed(function () {
if (_.has(self._liveResultsSets, observeKey)) {
liveResultsSet = self._liveResultsSets[observeKey];
if (_.has(self._observeMultiplexers, observeKey)) {
multiplexer = self._observeMultiplexers[observeKey];
} else {
// Create a new LiveResultsSet. It is created "locked": no polling can
// take place.
liveResultsSet = new LiveResultsSet(
cursorDescription,
self,
ordered,
function () {
delete self._liveResultsSets[observeKey];
},
callbacks._testOnlyPollCallback);
self._liveResultsSets[observeKey] = liveResultsSet;
newlyCreated = true;
firstHandle = true;
// Create a new ObserveMultiplexer.
multiplexer = new ObserveMultiplexer({
ordered: ordered,
onStop: function () {
observeDriver.stop();
delete self._observeMultiplexers[observeKey];
}
});
self._observeMultiplexers[observeKey] = multiplexer;
}
observeHandle = new ObserveHandle(liveResultsSet, callbacks);
});
if (newlyCreated) {
// This is the first ObserveHandle on this LiveResultsSet. Add it and run
// the initial synchronous poll (which may yield).
liveResultsSet._addFirstObserveHandle(observeHandle);
} else {
// Not the first ObserveHandle. Add it to the LiveResultsSet. This call
// yields until we're not in the middle of a poll, and its invocation of the
// initial 'added' callbacks may yield as well. It blocks until the 'added'
// callbacks have fired.
liveResultsSet._addObserveHandleAndSendInitialAdds(observeHandle);
var observeHandle = new ObserveHandle(multiplexer, callbacks);
if (firstHandle) {
var driverClass = PollingObserveDriver;
if (self._oplogHandle && !ordered && !callbacks._testOnlyPollCallback
&& OplogObserveDriver.cursorSupported(cursorDescription)) {
driverClass = OplogObserveDriver;
}
observeDriver = new driverClass({
cursorDescription: cursorDescription,
mongoHandle: self,
multiplexer: multiplexer,
ordered: ordered,
_testOnlyPollCallback: callbacks._testOnlyPollCallback
});
// This field is only set for the first ObserveHandle in an
// ObserveMultiplexer. It is only there for use tests.
observeHandle._observeDriver = observeDriver;
}
// Blocks until the initial adds have been sent.
multiplexer.addHandleAndSendInitialAdds(observeHandle);
return observeHandle;
};
var LiveResultsSet = function (cursorDescription, mongoHandle, ordered,
stopCallback, testOnlyPollCallback) {
var self = this;
// Listen for the invalidation messages that will trigger us to poll the
// database for changes. If this selector specifies specific IDs, specify them
// here, so that updates to different specific IDs don't cause us to poll.
// listenCallback is the same kind of (notification, complete) callback passed
// to InvalidationCrossbar.listen.
self._cursorDescription = cursorDescription;
self._mongoHandle = mongoHandle;
self._ordered = ordered;
self._stopCallbacks = [stopCallback];
listenAll = function (cursorDescription, listenCallback) {
var listeners = [];
forEachTrigger(cursorDescription, function (trigger) {
// The "drop collection" event is used by the oplog crossbar, not the
// invalidation crossbar.
if (trigger.dropCollection)
return;
listeners.push(DDPServer._InvalidationCrossbar.listen(
trigger, listenCallback));
});
// This constructor cannot yield, so we don't create the synchronousCursor yet
// (since that can yield).
self._synchronousCursor = null;
// previous results snapshot. on each poll cycle, diffs against
// results drives the callbacks.
self._results = ordered ? [] : {};
// The number of _pollMongo calls that have been added to self._taskQueue but
// have not started running. Used to make sure we never schedule more than one
// _pollMongo (other than possibly the one that is currently running). It's
// also used by _suspendPolling to pretend there's a poll scheduled. Usually,
// it's either 0 (for "no polls scheduled other than maybe one currently
// running") or 1 (for "a poll scheduled that isn't running yet"), but it can
// also be 2 if incremented by _suspendPolling.
self._pollsScheduledButNotStarted = 0;
// Number of _addObserveHandleAndSendInitialAdds tasks scheduled but not yet
// running. _removeObserveHandle uses this to know if it's safe to shut down
// this LiveResultsSet.
self._addHandleTasksScheduledButNotPerformed = 0;
self._pendingWrites = []; // people to notify when polling completes
// Make sure to create a separately throttled function for each LiveResultsSet
// object.
self._ensurePollIsScheduled = _.throttle(
self._unthrottledEnsurePollIsScheduled, 50 /* ms */);
self._taskQueue = new Meteor._SynchronousQueue();
// Listen for the invalidation messages that will trigger us to poll the
// database for changes. If this selector specifies specific IDs, specify them
// here, so that updates to different specific IDs don't cause us to poll.
var listenOnTrigger = function (trigger) {
var listener = DDPServer._InvalidationCrossbar.listen(
trigger, function (notification, complete) {
// When someone does a transaction that might affect us, schedule a poll
// of the database. If that transaction happens inside of a write fence,
// block the fence until we've polled and notified observers.
var fence = DDPServer._CurrentWriteFence.get();
if (fence)
self._pendingWrites.push(fence.beginWrite());
// Ensure a poll is scheduled... but if we already know that one is,
// don't hit the throttled _ensurePollIsScheduled function (which might
// lead to us calling it unnecessarily in 50ms).
if (self._pollsScheduledButNotStarted === 0)
self._ensurePollIsScheduled();
complete();
return {
stop: function () {
_.each(listeners, function (listener) {
listener.stop();
});
self._stopCallbacks.push(function () { listener.stop(); });
}
};
};
forEachTrigger = function (cursorDescription, triggerCallback) {
var key = {collection: cursorDescription.collectionName};
var specificIds = LocalCollection._idsMatchedBySelector(
cursorDescription.selector);
if (specificIds) {
_.each(specificIds, function (id) {
listenOnTrigger(_.extend({id: id}, key));
triggerCallback(_.extend({id: id}, key));
});
triggerCallback(_.extend({dropCollection: true}, key));
} else {
listenOnTrigger(key);
triggerCallback(key);
}
// Map from handle ID to ObserveHandle.
self._observeHandles = {};
self._callbackMultiplexer = {};
var callbackNames = ['added', 'changed', 'removed'];
if (self._ordered) {
callbackNames.push('moved');
callbackNames.push('addedBefore');
callbackNames.push('movedBefore');
}
_.each(callbackNames, function (callback) {
var handleCallback = '_' + callback;
self._callbackMultiplexer[callback] = function () {
var args = _.toArray(arguments);
// Because callbacks can yield and _removeObserveHandle() (ie,
// handle.stop()) doesn't synchronize its actions with _taskQueue,
// ObserveHandles can disappear from self._observeHandles during this
// dispatch. Thus, we save a copy of the keys of self._observeHandles
// before we start to iterate, and we check to see if the handle is still
// there each time.
_.each(_.keys(self._observeHandles), function (handleId) {
var handle = self._observeHandles[handleId];
if (handle && handle[handleCallback])
handle[handleCallback].apply(null, EJSON.clone(args));
});
};
});
// every once and a while, poll even if we don't think we're dirty, for
// eventual consistency with database writes from outside the Meteor
// universe.
//
// For testing, there's an undocumented callback argument to observeChanges
// which disables time-based polling and gets called at the beginning of each
// poll.
if (testOnlyPollCallback) {
self._testOnlyPollCallback = testOnlyPollCallback;
} else {
var intervalHandle = Meteor.setInterval(
_.bind(self._ensurePollIsScheduled, self), 10 * 1000);
self._stopCallbacks.push(function () {
Meteor.clearInterval(intervalHandle);
});
}
Package.facts && Package.facts.Facts.incrementServerFact(
"mongo-livedata", "live-results-sets", 1);
};
_.extend(LiveResultsSet.prototype, {
_addFirstObserveHandle: function (handle) {
var self = this;
if (! _.isEmpty(self._observeHandles))
throw new Error("Not the first observe handle!");
if (! _.isEmpty(self._results))
throw new Error("Call _addFirstObserveHandle before polling!");
self._observeHandles[handle._observeHandleId] = handle;
Package.facts && Package.facts.Facts.incrementServerFact(
"mongo-livedata", "observe-handles", 1);
// Run the first _poll() cycle synchronously (delivering results to the
// first ObserveHandle).
++self._pollsScheduledButNotStarted;
self._taskQueue.runTask(function () {
self._pollMongo();
});
},
// This is always called through _.throttle.
_unthrottledEnsurePollIsScheduled: function () {
var self = this;
if (self._pollsScheduledButNotStarted > 0)
return;
++self._pollsScheduledButNotStarted;
self._taskQueue.queueTask(function () {
self._pollMongo();
});
},
// test-only interface for controlling polling.
//
// _suspendPolling blocks until any currently running and scheduled polls are
// done, and prevents any further polls from being scheduled. (new
// ObserveHandles can be added and receive their initial added callbacks,
// though.)
//
// _resumePolling immediately polls, and allows further polls to occur.
_suspendPolling: function() {
var self = this;
// Pretend that there's another poll scheduled (which will prevent
// _ensurePollIsScheduled from queueing any more polls).
++self._pollsScheduledButNotStarted;
// Now block until all currently running or scheduled polls are done.
self._taskQueue.runTask(function() {});
// Confirm that there is only one "poll" (the fake one we're pretending to
// have) scheduled.
if (self._pollsScheduledButNotStarted !== 1)
throw new Error("_pollsScheduledButNotStarted is " +
self._pollsScheduledButNotStarted);
},
_resumePolling: function() {
var self = this;
// We should be in the same state as in the end of _suspendPolling.
if (self._pollsScheduledButNotStarted !== 1)
throw new Error("_pollsScheduledButNotStarted is " +
self._pollsScheduledButNotStarted);
// Run a poll synchronously (which will counteract the
// ++_pollsScheduledButNotStarted from _suspendPolling).
self._taskQueue.runTask(function () {
self._pollMongo();
});
},
_pollMongo: function () {
var self = this;
--self._pollsScheduledButNotStarted;
self._testOnlyPollCallback && self._testOnlyPollCallback();
// Save the list of pending writes which this round will commit.
var writesForCycle = self._pendingWrites;
self._pendingWrites = [];
// Get the new query results. (These calls can yield.)
if (self._synchronousCursor) {
self._synchronousCursor.rewind();
} else {
self._synchronousCursor = self._mongoHandle._createSynchronousCursor(
self._cursorDescription);
}
var newResults = self._synchronousCursor.getRawObjects(self._ordered);
var oldResults = self._results;
// Run diffs. (This can yield too.)
if (!_.isEmpty(self._observeHandles)) {
LocalCollection._diffQueryChanges(
self._ordered, oldResults, newResults, self._callbackMultiplexer);
}
// Replace self._results atomically.
self._results = newResults;
// Mark all the writes which existed before this call as commmitted. (If new
// writes have shown up in the meantime, there'll already be another
// _pollMongo task scheduled.)
_.each(writesForCycle, function (w) {w.committed();});
},
// Adds the observe handle to this set and sends its initial added
// callbacks. Meteor._SynchronousQueue guarantees that this won't interleave
// with a call to _pollMongo or another call to this function.
_addObserveHandleAndSendInitialAdds: function (handle) {
var self = this;
// Check this before calling runTask (even though runTask does the same
// check) so that we don't leak a LiveResultsSet by incrementing
// _addHandleTasksScheduledButNotPerformed and never decrementing it.
if (!self._taskQueue.safeToRunTask())
throw new Error(
"Can't call observe() from an observe callback on the same query");
// Keep track of how many of these tasks are on the queue, so that
// _removeObserveHandle knows if it's safe to GC.
++self._addHandleTasksScheduledButNotPerformed;
self._taskQueue.runTask(function () {
if (!self._observeHandles)
throw new Error("Can't add observe handle to stopped LiveResultsSet");
if (_.has(self._observeHandles, handle._observeHandleId))
throw new Error("Duplicate observe handle ID");
self._observeHandles[handle._observeHandleId] = handle;
--self._addHandleTasksScheduledButNotPerformed;
Package.facts && Package.facts.Facts.incrementServerFact(
"mongo-livedata", "observe-handles", 1);
// Send initial adds.
if (handle._added || handle._addedBefore) {
_.each(self._results, function (doc, i) {
var fields = EJSON.clone(doc);
delete fields._id;
if (self._ordered) {
handle._added && handle._added(doc._id, fields);
handle._addedBefore && handle._addedBefore(doc._id, fields, null);
} else {
handle._added(doc._id, fields);
}
});
}
});
},
// Remove an observe handle. If it was the last observe handle, call all the
// stop callbacks; you cannot add any more observe handles after this.
//
// This is not synchronized with polls and handle additions: this means that
// you can safely call it from within an observe callback.
_removeObserveHandle: function (handle) {
var self = this;
if (!_.has(self._observeHandles, handle._observeHandleId))
throw new Error("Unknown observe handle ID " + handle._observeHandleId);
delete self._observeHandles[handle._observeHandleId];
Package.facts && Package.facts.Facts.incrementServerFact(
"mongo-livedata", "observe-handles", -1);
if (_.isEmpty(self._observeHandles) &&
self._addHandleTasksScheduledButNotPerformed === 0) {
// The last observe handle was stopped; call our stop callbacks, which:
// - removes us from the MongoConnection's _liveResultsSets map
// - stops the poll timer
// - removes us from the invalidation crossbar
_.each(self._stopCallbacks, function (c) { c(); });
Package.facts && Package.facts.Facts.incrementServerFact(
"mongo-livedata", "live-results-sets", -1);
// This will cause future _addObserveHandleAndSendInitialAdds calls to
// throw.
self._observeHandles = null;
}
}
});
// observeChanges for tailable cursors on capped collections.
//
// Some differences from normal cursors:
@@ -1209,59 +1066,18 @@ MongoConnection.prototype._observeChangesTailable = function (
+ " tailable cursor without a "
+ (ordered ? "addedBefore" : "added") + " callback");
}
var cursor = self._createSynchronousCursor(cursorDescription);
var stopped = false;
var lastTS = undefined;
Meteor.defer(function () {
while (true) {
if (stopped)
return;
try {
var doc = cursor._nextObject();
} catch (err) {
// There's no good way to figure out if this was actually an error from
// Mongo. Ah well. But either way, we need to retry the cursor (unless
// the failure was because the observe got stopped).
doc = null;
}
if (stopped)
return;
if (doc) {
var id = doc._id;
delete doc._id;
// If a tailable cursor contains a "ts" field, use it to recreate the
// cursor on error, and don't publish the field. ("ts" is a standard
// that Mongo uses internally for the oplog, and there's a special flag
// that lets you do binary search on it instead of needing to use an
// index.)
lastTS = doc.ts;
delete doc.ts;
if (ordered) {
callbacks.addedBefore(id, doc, null);
} else {
callbacks.added(id, doc);
}
} else {
var newSelector = _.clone(cursorDescription.selector);
if (lastTS) {
newSelector.ts = {$gt: lastTS};
}
// XXX maybe set replay flag
cursor = self._createSynchronousCursor(new CursorDescription(
cursorDescription.collectionName,
newSelector,
cursorDescription.options));
}
return self.tail(cursorDescription, function (doc) {
var id = doc._id;
delete doc._id;
// The ts is an implementation detail. Hide it.
delete doc.ts;
if (ordered) {
callbacks.addedBefore(id, doc, null);
} else {
callbacks.added(id, doc);
}
});
return {
stop: function () {
stopped = true;
cursor.close();
}
};
};
// XXX We probably need to find a better way to expose this. Right now
@@ -1270,3 +1086,4 @@ MongoConnection.prototype._observeChangesTailable = function (
MongoInternals.MongoTimestamp = MongoDB.Timestamp;
MongoInternals.Connection = MongoConnection;
MongoInternals.NpmModule = MongoDB;

View File

@@ -346,7 +346,7 @@ Tinytest.addAsync("mongo-livedata - basics, " + idGeneration, function (test, on
Tinytest.addAsync("mongo-livedata - fuzz test, " + idGeneration, function(test, onComplete) {
var run = test.runId();
var run = Random.id();
var coll;
if (Meteor.isClient) {
coll = new Meteor.Collection(null, collectionOptions); // local, unmanaged
@@ -382,6 +382,15 @@ Tinytest.addAsync("mongo-livedata - fuzz test, " + idGeneration, function(test,
}
});
// XXX What if there are multiple observe handles on the ObserveMultiplexer?
// There shouldn't be because the collection has a name unique to this
// run.
if (Meteor.isServer) {
// For now, has to be polling (not oplog).
test.isTrue(obs._observeDriver);
test.isTrue(obs._observeDriver._suspendPolling);
}
var step = 0;
// Use non-deterministic randomness so we can have a shorter fuzz
@@ -413,11 +422,8 @@ Tinytest.addAsync("mongo-livedata - fuzz test, " + idGeneration, function(test,
var max_counters = _.clone(counters);
finishObserve(function () {
// XXX What if there are multiple observe handles on the LiveResultsSet?
// There shouldn't be because the collection has a name unique to this
// run.
if (Meteor.isServer)
obs._liveResultsSet._suspendPolling();
obs._observeDriver._suspendPolling();
// Do a batch of 1-10 operations
var batch_count = rnd(10) + 1;
@@ -450,7 +456,7 @@ Tinytest.addAsync("mongo-livedata - fuzz test, " + idGeneration, function(test,
}
}
if (Meteor.isServer)
obs._liveResultsSet._resumePolling();
obs._observeDriver._resumePolling();
});
@@ -513,7 +519,7 @@ Tinytest.addAsync("mongo-livedata - scribbling, " + idGeneration, function (test
});
Tinytest.addAsync("mongo-livedata - stop handle in callback, " + idGeneration, function (test, onComplete) {
var run = test.runId();
var run = Random.id();
var coll;
if (Meteor.isClient) {
coll = new Meteor.Collection(null, collectionOptions); // local, unmanaged
@@ -572,11 +578,11 @@ if (Meteor.isServer) {
var coll = new Meteor.Collection("observeInCallback-"+run, collectionOptions);
var callbackCalled = false;
var handle = coll.find().observe({
var handle = coll.find({}).observe({
added: function (newDoc) {
callbackCalled = true;
test.throws(function () {
coll.find().observe({});
coll.find({}).observe();
});
}
});
@@ -599,12 +605,12 @@ if (Meteor.isServer) {
var observer = function (noAdded) {
var output = [];
var callbacks = {
changedAt: function (newDoc) {
changed: function (newDoc) {
output.push({changed: newDoc._id});
}
};
if (!noAdded) {
callbacks.addedAt = function (doc) {
callbacks.added = function (doc) {
output.push({added: doc._id});
};
}
@@ -639,11 +645,10 @@ if (Meteor.isServer) {
// Original observe not affected.
test.length(o1.output, 0);
// White-box test: both observes should have the same underlying
// LiveResultsSet.
var liveResultsSet = o1.handle._liveResultsSet;
test.isTrue(liveResultsSet);
test.isTrue(liveResultsSet === o2.handle._liveResultsSet);
// White-box test: both observes should share an ObserveMultiplexer.
var observeMultiplexer = o1.handle._multiplexer;
test.isTrue(observeMultiplexer);
test.isTrue(observeMultiplexer === o2.handle._multiplexer);
// Update. Both observes fire.
runInFence(function () {
@@ -667,14 +672,15 @@ if (Meteor.isServer) {
test.length(o2.output, 1);
test.equal(o2.output.shift(), {changed: docId2});
// Stop second handle. Nothing should happen, but the liveResultsSet should
// Stop second handle. Nothing should happen, but the multiplexer should
// be stopped.
test.isTrue(observeMultiplexer._handles); // This will change.
o2.handle.stop();
test.length(o1.output, 0);
test.length(o2.output, 0);
// White-box: liveResultsSet has nulled its _observeHandles so you can't
// White-box: ObserveMultiplexer has nulled its _handles so you can't
// accidentally join to it.
test.isNull(liveResultsSet._observeHandles);
test.isNull(observeMultiplexer._handles);
// Start yet another handle on the same query.
var o3 = observer();
@@ -686,8 +692,8 @@ if (Meteor.isServer) {
// Old observers not called.
test.length(o1.output, 0);
test.length(o2.output, 0);
// White-box: Different LiveResultsSet.
test.isTrue(liveResultsSet !== o3.handle._liveResultsSet);
// White-box: Different ObserveMultiplexer.
test.isTrue(observeMultiplexer !== o3.handle._multiplexer);
// Start another handle with no added callback. Regression test for #589.
var o4 = observer(true);
@@ -966,8 +972,9 @@ if (Meteor.isServer) {
var handlesToStop = [];
var observe = function (name, query) {
var handle = coll.find(query).observeChanges({
// Make sure that we only poll on invalidation, not due to time,
// and keep track of when we do.
// Make sure that we only poll on invalidation, not due to time, and
// keep track of when we do. Note: this option disables the use of
// oplogs (which admittedly is somewhat irrelevant to this feature).
_testOnlyPollCallback: function () {
polls[name] = (name in polls ? polls[name] + 1 : 1);
}
@@ -1872,3 +1879,21 @@ if (Meteor.isServer) {
elements: ['Y', 'A', 'B', 'C']});
});
}
// This is a VERY white-box test.
Meteor.isServer && Tinytest.add("mongo-livedata - oplog - _disableOplog", function (test) {
var collName = Random.id();
var coll = new Meteor.Collection(collName);
if (MongoInternals.defaultRemoteCollectionDriver().mongo._oplogHandle) {
var observeWithOplog = coll.find({x: 5})
.observeChanges({added: function () {}});
test.isTrue(observeWithOplog._observeDriver);
test.isTrue(observeWithOplog._observeDriver._usesOplog);
observeWithOplog.stop();
}
var observeWithoutOplog = coll.find({x: 6}, {_disableOplog: true})
.observeChanges({added: function () {}});
test.isTrue(observeWithoutOplog._observeDriver);
test.isFalse(observeWithoutOplog._observeDriver._usesOplog);
observeWithoutOplog.stop();
});

View File

@@ -20,7 +20,7 @@ _.each ([{added:'added', forceOrdered: true},
if (forceOrdered)
callbacks.push("movedBefore");
withCallbackLogger(test,
[added, "changed", "removed"],
callbacks,
Meteor.isServer,
function (logger) {
var barid = c.insert({thing: "stuff"});
@@ -168,6 +168,63 @@ if (Meteor.isServer) {
onComplete();
});
});
Tinytest.addAsync("observeChanges - unordered - specific fields + selector on excluded fields", function (test, onComplete) {
var c = makeCollection();
withCallbackLogger(test, ["added", "changed", "removed"], Meteor.isServer, function (logger) {
var handle = c.find({ mac: 1, cheese: 2 },
{fields:{noodles: 1, bacon: 1, eggs: 1}}).observeChanges(logger);
var barid = c.insert({thing: "stuff", mac: 1, cheese: 2});
logger.expectResultOnly("added", [barid, {}]);
var fooid = c.insert({noodles: "good", bacon: "bad", apples: "ok", mac: 1, cheese: 2});
logger.expectResultOnly("added", [fooid, {noodles: "good", bacon: "bad"}]);
c.update(fooid, {noodles: "alright", potatoes: "tasty", apples: "ok", mac: 1, cheese: 2});
logger.expectResultOnly("changed",
[fooid, {noodles: "alright", bacon: undefined}]);
// Doesn't get update event, since modifies only hidden fields
c.update(fooid, {noodles: "alright", potatoes: "meh", apples: "ok", mac: 1, cheese: 2});
logger.expectNoResult();
c.remove(fooid);
logger.expectResultOnly("removed", [fooid]);
c.remove(barid);
logger.expectResultOnly("removed", [barid]);
fooid = c.insert({noodles: "good", bacon: "bad", mac: 1, cheese: 2});
logger.expectResult("added", [fooid, {noodles: "good", bacon: "bad"}]);
logger.expectNoResult();
handle.stop();
onComplete();
});
});
Tinytest.addAsync("observeChanges - unordered - specific fields + modify on excluded fields", function (test, onComplete) {
var c = makeCollection();
withCallbackLogger(test, ["added", "changed", "removed"], Meteor.isServer, function (logger) {
var handle = c.find({ mac: 1, cheese: 2 },
{fields:{noodles: 1, bacon: 1, eggs: 1}}).observeChanges(logger);
var fooid = c.insert({noodles: "good", bacon: "bad", apples: "ok", mac: 1, cheese: 2});
logger.expectResultOnly("added", [fooid, {noodles: "good", bacon: "bad"}]);
// Noodles go into shadow, mac appears as eggs
c.update(fooid, {$rename: { noodles: 'shadow', apples: 'eggs' }});
logger.expectResultOnly("changed",
[fooid, {eggs:"ok", noodles: undefined}]);
c.remove(fooid);
logger.expectResultOnly("removed", [fooid]);
logger.expectNoResult();
handle.stop();
onComplete();
});
});
}

View File

@@ -0,0 +1,218 @@
var Future = Npm.require('fibers/future');
ObserveMultiplexer = function (options) {
var self = this;
if (!options || !_.has(options, 'ordered'))
throw Error("must specified ordered");
Package.facts && Package.facts.Facts.incrementServerFact(
"mongo-livedata", "observe-multiplexers", 1);
self._ordered = options.ordered;
self._onStop = options.onStop || function () {};
self._queue = new Meteor._SynchronousQueue();
self._handles = {};
self._readyFuture = new Future;
self._cache = new LocalCollection._CachingChangeObserver({
ordered: options.ordered});
// Number of addHandleAndSendInitialAdds tasks scheduled but not yet
// running. removeHandle uses this to know if it's time to call the onStop
// callback.
self._addHandleTasksScheduledButNotPerformed = 0;
_.each(self.callbackNames(), function (callbackName) {
self[callbackName] = function (/* ... */) {
self._applyCallback(callbackName, _.toArray(arguments));
};
});
};
_.extend(ObserveMultiplexer.prototype, {
addHandleAndSendInitialAdds: function (handle) {
var self = this;
// Check this before calling runTask (even though runTask does the same
// check) so that we don't leak an ObserveMultiplexer on error by
// incrementing _addHandleTasksScheduledButNotPerformed and never
// decrementing it.
if (!self._queue.safeToRunTask())
throw new Error(
"Can't call observeChanges from an observe callback on the same query");
++self._addHandleTasksScheduledButNotPerformed;
Package.facts && Package.facts.Facts.incrementServerFact(
"mongo-livedata", "observe-handles", 1);
self._queue.runTask(function () {
self._handles[handle._id] = handle;
// Send out whatever adds we have so far (whether or not we the
// multiplexer is ready).
self._sendAdds(handle);
--self._addHandleTasksScheduledButNotPerformed;
});
// *outside* the task, since otherwise we'd deadlock
self._readyFuture.wait();
},
// Remove an observe handle. If it was the last observe handle, call the
// onStop callback; you cannot add any more observe handles after this.
//
// This is not synchronized with polls and handle additions: this means that
// you can safely call it from within an observe callback, but it also means
// that we have to be careful when we iterate over _handles.
removeHandle: function (id) {
var self = this;
// This should not be possible: you can only call removeHandle by having
// access to the ObserveHandle, which isn't returned to user code until the
// multiplex is ready.
if (!self._ready())
throw new Error("Can't remove handles until the multiplex is ready");
delete self._handles[id];
Package.facts && Package.facts.Facts.incrementServerFact(
"mongo-livedata", "observe-handles", -1);
if (_.isEmpty(self._handles) &&
self._addHandleTasksScheduledButNotPerformed === 0) {
self._stop();
}
},
_stop: function () {
var self = this;
// It shouldn't be possible for us to stop when all our handles still
// haven't been returned from observeChanges!
if (!self._ready())
throw Error("surprising _stop: not ready");
// Call stop callback (which kills the underlying process which sends us
// callbacks and removes us from the connection's dictionary).
self._onStop();
Package.facts && Package.facts.Facts.incrementServerFact(
"mongo-livedata", "observe-multiplexers", -1);
// Cause future addHandleAndSendInitialAdds calls to throw (but the onStop
// callback should make our connection forget about us).
self._handles = null;
},
// Allows all addHandleAndSendInitialAdds calls to return, once all preceding
// adds have been processed. Does not block.
ready: function () {
var self = this;
self._queue.queueTask(function () {
if (self._ready())
throw Error("can't make ObserveMultiplex ready twice!");
self._readyFuture.return();
});
},
// Calls "cb" once the effects of all "ready", "addHandleAndSendInitialAdds"
// and observe callbacks which came before this call have been propagated to
// all handles. "ready" must have already been called on this multiplexer.
onFlush: function (cb) {
var self = this;
self._queue.queueTask(function () {
if (!self._ready())
throw Error("only call onFlush on a multiplexer that will be ready");
cb();
});
},
callbackNames: function () {
var self = this;
if (self._ordered)
return ["addedBefore", "changed", "movedBefore", "removed"];
else
return ["added", "changed", "removed"];
},
_ready: function () {
return this._readyFuture.isResolved();
},
_applyCallback: function (callbackName, args) {
var self = this;
self._queue.queueTask(function () {
// First, apply the change to the cache.
// XXX We could make applyChange callbacks promise not to hang on to any
// state from their arguments (assuming that their supplied callbacks
// don't) and skip this clone. Currently 'changed' hangs on to state
// though.
self._cache.applyChange[callbackName].apply(null, EJSON.clone(args));
// If we haven't finished the initial adds, then we should only be getting
// adds.
if (!self._ready() &&
(callbackName !== 'added' && callbackName !== 'addedBefore')) {
throw new Error("Got " + callbackName + " during initial adds");
}
// Now multiplex the callbacks out to all observe handles. It's OK if
// these calls yield; since we're inside a task, no other use of our queue
// can continue until these are done. (But we do have to be careful to not
// use a handle that got removed, because removeHandle does not use the
// queue; thus, we iterate over an array of keys that we control.)
_.each(_.keys(self._handles), function (handleId) {
var handle = self._handles[handleId];
if (!handle)
return;
var callback = handle['_' + callbackName];
// clone arguments so that callbacks can mutate their arguments
callback && callback.apply(null, EJSON.clone(args));
});
});
},
// Sends initial adds to a handle. It should only be called from within a task
// (the task that is processing the addHandleAndSendInitialAdds call). It
// synchronously invokes the handle's added or addedBefore; there's no need to
// flush the queue afterwards to ensure that the callbacks get out.
_sendAdds: function (handle) {
var self = this;
if (self._queue.safeToRunTask())
throw Error("_sendAdds may only be called from within a task!");
var add = self._ordered ? handle._addedBefore : handle._added;
if (!add)
return;
// note: docs may be an _IdMap or an OrderedDict
self._cache.docs.forEach(function (doc, id) {
if (!_.has(self._handles, handle._id))
throw Error("handle got removed before sending initial adds!");
var fields = EJSON.clone(doc);
delete fields._id;
if (self._ordered)
add(id, fields, null); // we're going in order, so add at end
else
add(id, fields);
});
}
});
var nextObserveHandleId = 1;
ObserveHandle = function (multiplexer, callbacks) {
var self = this;
// The end user is only supposed to call stop(). The other fields are
// accessible to the multiplexer, though.
self._multiplexer = multiplexer;
_.each(multiplexer.callbackNames(), function (name) {
if (callbacks[name]) {
self['_' + name] = callbacks[name];
} else if (name === "addedBefore" && callbacks.added) {
// Special case: if you specify "added" and "movedBefore", you get an
// ordered observe where for some reason you don't get ordering data on
// the adds. I dunno, we wrote tests for it, there must have been a
// reason.
self._addedBefore = function (id, fields, before) {
callbacks.added(id, fields);
};
}
});
self._stopped = false;
self._id = nextObserveHandleId++;
};
ObserveHandle.prototype.stop = function () {
var self = this;
if (self._stopped)
return;
self._stopped = true;
self._multiplexer.removeHandle(self._id);
};

View File

@@ -0,0 +1,364 @@
var Fiber = Npm.require('fibers');
var Future = Npm.require('fibers/future');
var PHASE = {
INITIALIZING: 1,
FETCHING: 2,
STEADY: 3
};
// OplogObserveDriver is an alternative to PollingObserveDriver which follows
// the Mongo operation log instead of just re-polling the query. It obeys the
// same simple interface: constructing it starts sending observeChanges
// callbacks (and a ready() invocation) to the ObserveMultiplexer, and you stop
// it by calling the stop() method.
OplogObserveDriver = function (options) {
var self = this;
self._usesOplog = true; // tests look at this
self._cursorDescription = options.cursorDescription;
self._mongoHandle = options.mongoHandle;
self._multiplexer = options.multiplexer;
if (options.ordered)
throw Error("OplogObserveDriver only supports unordered observeChanges");
self._stopped = false;
self._stopHandles = [];
Package.facts && Package.facts.Facts.incrementServerFact(
"mongo-livedata", "oplog-observers", 1);
self._phase = PHASE.INITIALIZING;
self._published = new LocalCollection._IdMap;
var selector = self._cursorDescription.selector;
self._selectorFn = LocalCollection._compileSelector(
self._cursorDescription.selector);
var projection = self._cursorDescription.options.fields || {};
self._projectionFn = LocalCollection._compileProjection(projection);
// Projection function, result of combining important fields for selector and
// existing fields projection
var sharedProjection = LocalCollection._combineSelectorAndProjection(
selector, projection);
self._sharedProjectionFn = LocalCollection._compileProjection(
sharedProjection);
self._needToFetch = new LocalCollection._IdMap;
self._currentlyFetching = new LocalCollection._IdMap;
self._writesToCommitWhenWeReachSteady = [];
forEachTrigger(self._cursorDescription, function (trigger) {
self._stopHandles.push(self._mongoHandle._oplogHandle.onOplogEntry(
trigger, function (notification) {
var op = notification.op;
if (op.op === 'c') {
// XXX actually, drop collection needs to be handled by doing a
// re-query
self._published.forEach(function (fields, id) {
self._remove(id);
});
} else {
// All other operators should be handled depending on phase
if (self._phase === PHASE.INITIALIZING)
self._handleOplogEntryInitializing(op);
else
self._handleOplogEntrySteadyOrFetching(op);
}
}
));
});
// XXX ordering w.r.t. everything else?
self._stopHandles.push(listenAll(
self._cursorDescription, function (notification, complete) {
// If we're not in a write fence, we don't have to do anything.
var fence = DDPServer._CurrentWriteFence.get();
if (!fence) {
complete();
return;
}
var write = fence.beginWrite();
// This write cannot complete until we've caught up to "this point" in the
// oplog, and then made it back to the steady state.
Meteor.defer(complete);
self._mongoHandle._oplogHandle.waitUntilCaughtUp();
if (self._stopped) {
// We're stopped, so just immediately commit.
write.committed();
} else if (self._phase === PHASE.STEADY) {
// Make sure that all of the callbacks have made it through the
// multiplexer and been delivered to ObserveHandles before committing
// writes.
self._multiplexer.onFlush(function () {
write.committed();
});
} else {
self._writesToCommitWhenWeReachSteady.push(write);
}
}
));
// Give _observeChanges a chance to add the new ObserveHandle to our
// multiplexer, so that the added calls get streamed.
Meteor.defer(function () {
self._runInitialQuery();
});
};
_.extend(OplogObserveDriver.prototype, {
_add: function (doc) {
var self = this;
var id = doc._id;
var fields = _.clone(doc);
delete fields._id;
if (self._published.has(id))
throw Error("tried to add something already published " + id);
self._published.set(id, self._sharedProjectionFn(fields));
self._multiplexer.added(id, self._projectionFn(fields));
},
_remove: function (id) {
var self = this;
if (!self._published.has(id))
throw Error("tried to remove something unpublished " + id);
self._published.remove(id);
self._multiplexer.removed(id);
},
_handleDoc: function (id, newDoc) {
var self = this;
newDoc = _.clone(newDoc);
var matchesNow = newDoc && self._selectorFn(newDoc);
var matchedBefore = self._published.has(id);
if (matchesNow && !matchedBefore) {
self._add(newDoc);
} else if (matchedBefore && !matchesNow) {
self._remove(id);
} else if (matchesNow) {
var oldDoc = self._published.get(id);
if (!oldDoc)
throw Error("thought that " + id + " was there!");
delete newDoc._id;
self._published.set(id, self._sharedProjectionFn(newDoc));
var changed = LocalCollection._makeChangedFields(_.clone(newDoc), oldDoc);
changed = self._projectionFn(changed);
if (!_.isEmpty(changed))
self._multiplexer.changed(id, changed);
}
},
_fetchModifiedDocuments: function () {
var self = this;
self._phase = PHASE.FETCHING;
while (!self._stopped && !self._needToFetch.empty()) {
if (self._phase !== PHASE.FETCHING)
throw new Error("phase in fetchModifiedDocuments: " + self._phase);
self._currentlyFetching = self._needToFetch;
self._needToFetch = new LocalCollection._IdMap;
var waiting = 0;
var error = null;
var fut = new Future;
Fiber(function () {
self._currentlyFetching.forEach(function (cacheKey, id) {
// currentlyFetching will not be updated during this loop.
waiting++;
self._mongoHandle._docFetcher.fetch(
self._cursorDescription.collectionName, id, cacheKey,
function (err, doc) {
if (err) {
if (!error)
error = err;
} else if (!self._stopped) {
self._handleDoc(id, doc);
}
waiting--;
if (waiting == 0)
fut.return();
});
});
}).run();
fut.wait();
if (error)
throw error;
self._currentlyFetching = new LocalCollection._IdMap;
}
self._beSteady();
},
_beSteady: function () {
var self = this;
self._phase = PHASE.STEADY;
var writes = self._writesToCommitWhenWeReachSteady;
self._writesToCommitWhenWeReachSteady = [];
self._multiplexer.onFlush(function () {
_.each(writes, function (w) {
w.committed();
});
});
},
_handleOplogEntryInitializing: function (op) {
var self = this;
self._needToFetch.set(idForOp(op), op.ts.toString());
},
_handleOplogEntrySteadyOrFetching: function (op) {
var self = this;
var id = idForOp(op);
// If we're already fetching this one, or about to, we can't optimize; make
// sure that we fetch it again if necessary.
if (self._currentlyFetching.has(id) || self._needToFetch.has(id)) {
if (self._phase !== PHASE.FETCHING)
throw Error("map not empty during steady phase");
self._needToFetch.set(id, op.ts.toString());
return;
}
if (op.op === 'd') {
if (self._published.has(id))
self._remove(id);
} else if (op.op === 'i') {
if (self._published.has(id))
throw new Error("insert found for already-existing ID");
// XXX what if selector yields? for now it can't but later it could have
// $where
if (self._selectorFn(op.o))
self._add(op.o);
} else if (op.op === 'u') {
// Is this a modifier ($set/$unset, which may require us to poll the
// database to figure out if the whole document matches the selector) or a
// replacement (in which case we can just directly re-evaluate the
// selector)?
var isReplace = !_.has(op.o, '$set') && !_.has(op.o, '$unset');
if (isReplace) {
self._handleDoc(id, _.extend({_id: id}, op.o));
} else if (self._published.has(id)) {
// Oh great, we actually know what the document is, so we can apply
// this directly.
var newDoc = EJSON.clone(self._published.get(id));
newDoc._id = id;
LocalCollection._modify(newDoc, op.o);
self._handleDoc(id, self._sharedProjectionFn(newDoc));
} else if (LocalCollection._canSelectorBecomeTrueByModifier(
self._cursorDescription.selector, op.o)) {
self._needToFetch.set(id, op.ts.toString());
if (self._phase === PHASE.STEADY)
self._fetchModifiedDocuments();
}
} else {
throw Error("XXX SURPRISING OPERATION: " + op);
}
},
_runInitialQuery: function () {
var self = this;
if (self._stopped)
throw new Error("oplog stopped surprisingly early");
var initialCursor = new Cursor(self._mongoHandle, self._cursorDescription);
initialCursor.forEach(function (initialDoc) {
self._add(initialDoc);
});
if (self._stopped)
throw new Error("oplog stopped quite early");
// Allow observeChanges calls to return. (After this, it's possible for
// stop() to be called.)
self._multiplexer.ready();
if (self._stopped)
return;
self._mongoHandle._oplogHandle.waitUntilCaughtUp();
if (self._stopped)
return;
if (self._phase !== PHASE.INITIALIZING)
throw Error("Phase unexpectedly " + self._phase);
if (self._needToFetch.empty()) {
self._beSteady();
} else {
self._fetchModifiedDocuments();
}
},
// This stop function is invoked from the onStop of the ObserveMultiplexer, so
// it shouldn't actually be possible to call it until the multiplexer is
// ready.
stop: function () {
var self = this;
if (self._stopped)
return;
self._stopped = true;
_.each(self._stopHandles, function (handle) {
handle.stop();
});
// Note: we *don't* use multiplexer.onFlush here because this stop
// callback is actually invoked by the multiplexer itself when it has
// determined that there are no handles left. So nothing is actually going
// to get flushed (and it's probably not valid to call methods on the
// dying multiplexer).
_.each(self._writesToCommitWhenWeReachSteady, function (w) {
w.committed();
});
self._writesToCommitWhenWeReachSteady = null;
// Proactively drop references to potentially big things.
self._published = null;
self._needToFetch = null;
self._currentlyFetching = null;
self._oplogEntryHandle = null;
self._listenersHandle = null;
Package.facts && Package.facts.Facts.incrementServerFact(
"mongo-livedata", "oplog-observers", -1);
}
});
// Does our oplog tailing code support this cursor? For now, we are being very
// conservative and allowing only simple queries with simple options.
// (This is a "static method".)
OplogObserveDriver.cursorSupported = function (cursorDescription) {
// First, check the options.
var options = cursorDescription.options;
// Did the user say no explicitly?
if (options._disableOplog)
return false;
// This option (which are mostly used for sorted cursors) require us to figure
// out where a given document fits in an order to know if it's included or
// not, and we don't track that information when doing oplog tailing.
if (options.limit || options.skip) return false;
// If a fields projection option is given check if it is supported by
// minimongo (some operators are not supported).
if (options.fields) {
try {
LocalCollection._checkSupportedProjection(options.fields);
} catch (e) {
if (e.name === "MinimongoError")
return false;
else
throw e;
}
}
// For now, we're just dealing with equality queries: no $operators, regexps,
// or $and/$or/$where/etc clauses. We can expand the scope of what we're
// comfortable processing later. ($where will get pretty scary since it will
// allow selector processing to yield!)
return _.all(cursorDescription.selector, function (value, field) {
// No logical operators like $and.
if (field.substr(0, 1) === '$')
return false;
// We only allow scalars, not sub-documents or $operators or RegExp.
// XXX Date would be easy too, though I doubt anyone is doing equality
// lookups on dates
return typeof value === "string" ||
typeof value === "number" ||
typeof value === "boolean" ||
value === null ||
value instanceof Meteor.Collection.ObjectID;
});
};
MongoTest.OplogObserveDriver = OplogObserveDriver;

View File

@@ -0,0 +1,235 @@
var Future = Npm.require('fibers/future');
var OPLOG_COLLECTION = 'oplog.rs';
// Like Perl's quotemeta: quotes all regexp metacharacters. See
// https://github.com/substack/quotemeta/blob/master/index.js
// XXX this is duplicated with accounts_server.js
var quotemeta = function (str) {
return String(str).replace(/(\W)/g, '\\$1');
};
var showTS = function (ts) {
return "Timestamp(" + ts.getHighBits() + ", " + ts.getLowBits() + ")";
};
idForOp = function (op) {
if (op.op === 'd')
return op.o._id;
else if (op.op === 'i')
return op.o._id;
else if (op.op === 'u')
return op.o2._id;
else if (op.op === 'c')
throw Error("Operator 'c' doesn't supply an object with id: " +
EJSON.stringify(op));
else
throw Error("Unknown op: " + EJSON.stringify(op));
};
OplogHandle = function (oplogUrl, dbNameFuture) {
var self = this;
self._oplogUrl = oplogUrl;
self._dbNameFuture = dbNameFuture;
self._oplogLastEntryConnection = null;
self._oplogTailConnection = null;
self._stopped = false;
self._tailHandle = null;
self._readyFuture = new Future();
self._crossbar = new DDPServer._Crossbar({
factPackage: "mongo-livedata", factName: "oplog-watchers"
});
self._lastProcessedTS = null;
// Lazily calculate the basic selector. Don't call _baseOplogSelector() at the
// top level of the constructor, because we don't want the constructor to
// block. Note that the _.once is per-handle.
self._baseOplogSelector = _.once(function () {
return {
ns: new RegExp('^' + quotemeta(self._dbNameFuture.wait()) + '\\.'),
$or: [
{ op: {$in: ['i', 'u', 'd']} },
// If it is not db.collection.drop(), ignore it
{ op: 'c', 'o.drop': { $exists: true } }]
};
});
// XXX doc
self._catchingUpFutures = [];
// Setting up the connections and tail handler is a blocking operation, so we
// do it "later".
Meteor.defer(function () {
self._startTailing();
});
};
_.extend(OplogHandle.prototype, {
stop: function () {
var self = this;
if (self._stopped)
return;
self._stopped = true;
if (self._tailHandle)
self._tailHandle.stop();
// XXX should close connections too
},
onOplogEntry: function (trigger, callback) {
var self = this;
if (self._stopped)
throw new Error("Called onOplogEntry on stopped handle!");
// Calling onOplogEntry requires us to wait for the tailing to be ready.
self._readyFuture.wait();
var originalCallback = callback;
callback = Meteor.bindEnvironment(function (notification, onComplete) {
// XXX can we avoid this clone by making oplog.js careful?
try {
originalCallback(EJSON.clone(notification));
} finally {
onComplete();
}
}, function (err) {
Meteor._debug("Error in oplog callback", err.stack);
});
var listenHandle = self._crossbar.listen(trigger, callback);
return {
stop: function () {
listenHandle.stop();
}
};
},
// Calls `callback` once the oplog has been processed up to a point that is
// roughly "now": specifically, once we've processed all ops that are
// currently visible.
// XXX become convinced that this is actually safe even if oplogConnection
// is some kind of pool
waitUntilCaughtUp: function () {
var self = this;
if (self._stopped)
throw new Error("Called waitUntilCaughtUp on stopped handle!");
// Calling waitUntilCaughtUp requries us to wait for the oplog connection to
// be ready.
self._readyFuture.wait();
// We need to make the selector at least as restrictive as the actual
// tailing selector (ie, we need to specify the DB name) or else we might
// find a TS that won't show up in the actual tail stream.
var lastEntry = self._oplogLastEntryConnection.findOne(
OPLOG_COLLECTION, self._baseOplogSelector(),
{fields: {ts: 1}, sort: {$natural: -1}});
if (!lastEntry) {
// Really, nothing in the oplog? Well, we've processed everything.
return;
}
var ts = lastEntry.ts;
if (!ts)
throw Error("oplog entry without ts: " + EJSON.stringify(lastEntry));
if (self._lastProcessedTS && ts.lessThanOrEqual(self._lastProcessedTS)) {
// We've already caught up to here.
return;
}
var insertAfter = self._catchingUpFutures.length;
while (insertAfter - 1 > 0
&& self._catchingUpFutures[insertAfter - 1].ts.greaterThan(ts)) {
insertAfter--;
}
// XXX this can occur if we fail over from one primary to another. so this
// check needs to be removed before we merge oplog. that said, it has been
// helpful so far at proving that we are properly using poolSize 1. Also, we
// could keep something like it if we could actually detect failover; see
// https://github.com/mongodb/node-mongodb-native/issues/1120
if (insertAfter !== self._catchingUpFutures.length) {
throw Error("found misordered oplog: "
+ showTS(_.last(self._catchingUpFutures).ts) + " vs "
+ showTS(ts));
}
var f = new Future;
self._catchingUpFutures.splice(insertAfter, 0, {ts: ts, future: f});
f.wait();
},
_startTailing: function () {
var self = this;
// We make two separate connections to Mongo. The Node Mongo driver
// implements a naive round-robin connection pool: each "connection" is a
// pool of several (5 by default) TCP connections, and each request is
// rotated through the pools. Tailable cursor queries block on the server
// until there is some data to return (or until a few seconds have
// passed). So if the connection pool used for tailing cursors is the same
// pool used for other queries, the other queries will be delayed by seconds
// 1/5 of the time.
//
// The tail connection will only ever be running a single tail command, so
// it only needs to make one underlying TCP connection.
self._oplogTailConnection = new MongoConnection(
self._oplogUrl, {poolSize: 1});
// XXX better docs, but: it's to get monotonic results
// XXX is it safe to say "if there's an in flight query, just use its
// results"? I don't think so but should consider that
self._oplogLastEntryConnection = new MongoConnection(
self._oplogUrl, {poolSize: 1});
// Find the last oplog entry. Blocks until the connection is ready.
var lastOplogEntry = self._oplogLastEntryConnection.findOne(
OPLOG_COLLECTION, {}, {sort: {$natural: -1}});
var dbName = self._dbNameFuture.wait();
var oplogSelector = _.clone(self._baseOplogSelector());
if (lastOplogEntry) {
// Start after the last entry that currently exists.
oplogSelector.ts = {$gt: lastOplogEntry.ts};
// If there are any calls to callWhenProcessedLatest before any other
// oplog entries show up, allow callWhenProcessedLatest to call its
// callback immediately.
self._lastProcessedTS = lastOplogEntry.ts;
}
var cursorDescription = new CursorDescription(
OPLOG_COLLECTION, oplogSelector, {tailable: true});
self._tailHandle = self._oplogTailConnection.tail(
cursorDescription, function (doc) {
if (!(doc.ns && doc.ns.length > dbName.length + 1 &&
doc.ns.substr(0, dbName.length + 1) === (dbName + '.')))
throw new Error("Unexpected ns");
var trigger = {collection: doc.ns.substr(dbName.length + 1),
dropCollection: false,
op: doc};
// Is it a special command and the collection name is hidden somewhere
// in operator?
if (trigger.collection === "$cmd") {
trigger.collection = doc.o.drop;
trigger.dropCollection = true;
trigger.id = null;
} else {
// All other ops have an id.
trigger.id = idForOp(doc);
}
var f = new Future;
self._crossbar.fire(trigger, f.resolver());
f.wait();
// Now that we've processed this operation, process pending sequencers.
if (!doc.ts)
throw Error("oplog entry without ts: " + EJSON.stringify(doc));
self._lastProcessedTS = doc.ts;
while (!_.isEmpty(self._catchingUpFutures)
&& self._catchingUpFutures[0].ts.lessThanOrEqual(
self._lastProcessedTS)) {
var sequencer = self._catchingUpFutures.shift();
sequencer.future.return();
}
});
self._readyFuture.return();
}
});

View File

@@ -0,0 +1,32 @@
var OplogCollection = new Meteor.Collection("oplog-" + Random.id());
Tinytest.add("mongo-livedata - oplog - cursorSupported", function (test) {
var supported = function (expected, selector) {
var cursor = OplogCollection.find(selector);
test.equal(
MongoTest.OplogObserveDriver.cursorSupported(cursor._cursorDescription),
expected);
};
supported(true, "asdf");
supported(true, 1234);
supported(true, new Meteor.Collection.ObjectID());
supported(true, {_id: "asdf"});
supported(true, {_id: 1234});
supported(true, {_id: new Meteor.Collection.ObjectID()});
supported(true, {foo: "asdf",
bar: 1234,
baz: new Meteor.Collection.ObjectID(),
eeney: true,
miney: false,
moe: null});
supported(true, {});
supported(false, {$and: [{foo: "asdf"}, {bar: "baz"}]});
supported(false, {foo: {x: 1}});
supported(false, {foo: {$gt: 1}});
supported(false, {foo: [1, 2, 3]});
});

View File

@@ -12,7 +12,11 @@ Package.describe({
internal: true
});
Npm.depends({mongodb: "1.3.19"});
Npm.depends({
// 1.3.19, plus a patch to add oplogReplay flag:
// https://github.com/mongodb/node-mongodb-native/pull/1108
mongodb: "https://github.com/meteor/node-mongodb-native/tarball/779bbac916a751f305d84c727a6cc7dfddab7924"
});
Package.on_use(function (api) {
api.use(['random', 'ejson', 'json', 'underscore', 'minimongo', 'logging',
@@ -26,6 +30,11 @@ Package.on_use(function (api) {
// Allow us to detect 'autopublish', and publish collections if it's loaded.
api.use('autopublish', 'server', {weak: true});
// Allow us to detect 'disable-oplog', which turns off oplog tailing for your
// app even if it's configured in the environment. (This package will be
// probably be removed before 1.0.)
api.use('disable-oplog', 'server', {weak: true});
// defaultRemoteCollectionDriver gets its deployConfig from something that is
// (for questionable reasons) initialized by the webapp package.
api.use('webapp', 'server', {weak: true});
@@ -35,8 +44,13 @@ Package.on_use(function (api) {
// Stuff that should be exposed via a real API, but we haven't yet.
api.export('MongoInternals', 'server');
// For tests only.
api.export('MongoTest', 'server', {testOnly: true});
api.add_files('mongo_driver.js', 'server');
api.add_files(['mongo_driver.js', 'oplog_tailing.js',
'observe_multiplex.js', 'doc_fetcher.js',
'polling_observe_driver.js','oplog_observe_driver.js'],
'server');
api.add_files('local_collection_driver.js', ['client', 'server']);
api.add_files('remote_collection_driver.js', 'server');
api.add_files('collection.js', ['client', 'server']);
@@ -53,4 +67,6 @@ Package.on_test(function (api) {
api.add_files('allow_tests.js', ['client', 'server']);
api.add_files('collection_tests.js', ['client', 'server']);
api.add_files('observe_changes_tests.js', ['client', 'server']);
api.add_files('oplog_tests.js', 'server');
api.add_files('doc_fetcher_tests.js', 'server');
});

View File

@@ -0,0 +1,179 @@
PollingObserveDriver = function (options) {
var self = this;
self._cursorDescription = options.cursorDescription;
self._mongoHandle = options.mongoHandle;
self._ordered = options.ordered;
self._multiplexer = options.multiplexer;
self._stopCallbacks = [];
self._stopped = false;
self._synchronousCursor = self._mongoHandle._createSynchronousCursor(
self._cursorDescription);
// previous results snapshot. on each poll cycle, diffs against
// results drives the callbacks.
self._results = null;
// The number of _pollMongo calls that have been added to self._taskQueue but
// have not started running. Used to make sure we never schedule more than one
// _pollMongo (other than possibly the one that is currently running). It's
// also used by _suspendPolling to pretend there's a poll scheduled. Usually,
// it's either 0 (for "no polls scheduled other than maybe one currently
// running") or 1 (for "a poll scheduled that isn't running yet"), but it can
// also be 2 if incremented by _suspendPolling.
self._pollsScheduledButNotStarted = 0;
self._pendingWrites = []; // people to notify when polling completes
// Make sure to create a separately throttled function for each
// PollingObserveDriver object.
self._ensurePollIsScheduled = _.throttle(
self._unthrottledEnsurePollIsScheduled, 50 /* ms */);
// XXX figure out if we still need a queue
self._taskQueue = new Meteor._SynchronousQueue();
var listenersHandle = listenAll(
self._cursorDescription, function (notification, complete) {
// When someone does a transaction that might affect us, schedule a poll
// of the database. If that transaction happens inside of a write fence,
// block the fence until we've polled and notified observers.
var fence = DDPServer._CurrentWriteFence.get();
if (fence)
self._pendingWrites.push(fence.beginWrite());
// Ensure a poll is scheduled... but if we already know that one is,
// don't hit the throttled _ensurePollIsScheduled function (which might
// lead to us calling it unnecessarily in 50ms).
if (self._pollsScheduledButNotStarted === 0)
self._ensurePollIsScheduled();
complete();
}
);
self._stopCallbacks.push(function () { listenersHandle.stop(); });
// every once and a while, poll even if we don't think we're dirty, for
// eventual consistency with database writes from outside the Meteor
// universe.
//
// For testing, there's an undocumented callback argument to observeChanges
// which disables time-based polling and gets called at the beginning of each
// poll.
if (options._testOnlyPollCallback) {
self._testOnlyPollCallback = options._testOnlyPollCallback;
} else {
var intervalHandle = Meteor.setInterval(
_.bind(self._ensurePollIsScheduled, self), 10 * 1000);
self._stopCallbacks.push(function () {
Meteor.clearInterval(intervalHandle);
});
}
// Make sure we actually poll soon!
self._unthrottledEnsurePollIsScheduled();
Package.facts && Package.facts.Facts.incrementServerFact(
"mongo-livedata", "mongo-pollsters", 1);
};
_.extend(PollingObserveDriver.prototype, {
// This is always called through _.throttle (except once at startup).
_unthrottledEnsurePollIsScheduled: function () {
var self = this;
if (self._pollsScheduledButNotStarted > 0)
return;
++self._pollsScheduledButNotStarted;
self._taskQueue.queueTask(function () {
self._pollMongo();
});
},
// test-only interface for controlling polling.
//
// _suspendPolling blocks until any currently running and scheduled polls are
// done, and prevents any further polls from being scheduled. (new
// ObserveHandles can be added and receive their initial added callbacks,
// though.)
//
// _resumePolling immediately polls, and allows further polls to occur.
_suspendPolling: function() {
var self = this;
// Pretend that there's another poll scheduled (which will prevent
// _ensurePollIsScheduled from queueing any more polls).
++self._pollsScheduledButNotStarted;
// Now block until all currently running or scheduled polls are done.
self._taskQueue.runTask(function() {});
// Confirm that there is only one "poll" (the fake one we're pretending to
// have) scheduled.
if (self._pollsScheduledButNotStarted !== 1)
throw new Error("_pollsScheduledButNotStarted is " +
self._pollsScheduledButNotStarted);
},
_resumePolling: function() {
var self = this;
// We should be in the same state as in the end of _suspendPolling.
if (self._pollsScheduledButNotStarted !== 1)
throw new Error("_pollsScheduledButNotStarted is " +
self._pollsScheduledButNotStarted);
// Run a poll synchronously (which will counteract the
// ++_pollsScheduledButNotStarted from _suspendPolling).
self._taskQueue.runTask(function () {
self._pollMongo();
});
},
_pollMongo: function () {
var self = this;
--self._pollsScheduledButNotStarted;
var first = false;
if (!self._results) {
first = true;
// XXX maybe use _IdMap/OrderedDict instead?
self._results = self._ordered ? [] : {};
}
self._testOnlyPollCallback && self._testOnlyPollCallback();
// Save the list of pending writes which this round will commit.
var writesForCycle = self._pendingWrites;
self._pendingWrites = [];
// Get the new query results. (These calls can yield.)
if (!first)
self._synchronousCursor.rewind();
var newResults = self._synchronousCursor.getRawObjects(self._ordered);
var oldResults = self._results;
// Run diffs. (This can yield too.)
if (!self._stopped) {
LocalCollection._diffQueryChanges(
self._ordered, oldResults, newResults, self._multiplexer);
}
// Replace self._results atomically.
self._results = newResults;
// Signals the multiplexer to call all initial adds.
if (first)
self._multiplexer.ready();
// Once the ObserveMultiplexer has processed everything we've done in this
// round, mark all the writes which existed before this call as
// commmitted. (If new writes have shown up in the meantime, there'll
// already be another _pollMongo task scheduled.)
self._multiplexer.onFlush(function () {
_.each(writesForCycle, function (w) {
w.committed();
});
});
},
stop: function () {
var self = this;
self._stopped = true;
_.each(self._stopCallbacks, function (c) { c(); });
Package.facts && Package.facts.Facts.incrementServerFact(
"mongo-livedata", "mongo-pollsters", -1);
}
});

View File

@@ -1,6 +1,7 @@
MongoInternals.RemoteCollectionDriver = function (mongo_url) {
MongoInternals.RemoteCollectionDriver = function (
mongo_url, options) {
var self = this;
self.mongo = new MongoConnection(mongo_url);
self.mongo = new MongoConnection(mongo_url, options);
};
_.extend(MongoInternals.RemoteCollectionDriver.prototype, {
@@ -23,14 +24,21 @@ _.extend(MongoInternals.RemoteCollectionDriver.prototype, {
// you're only trying to receive data from a remote DDP server.)
MongoInternals.defaultRemoteCollectionDriver = _.once(function () {
var mongoUrl;
var connectionOptions = {};
AppConfig.configurePackage("mongo-livedata", function (config) {
// This will keep running if mongo gets reconfigured. That's not ideal, but
// should be ok for now.
mongoUrl = config.url;
if (config.oplog)
connectionOptions.oplogUrl = config.oplog;
});
// XXX bad error since it could also be set directly in METEOR_DEPLOY_CONFIG
if (! mongoUrl)
throw new Error("MONGO_URL must be set in environment");
return new MongoInternals.RemoteCollectionDriver(mongoUrl);
return new MongoInternals.RemoteCollectionDriver(mongoUrl, connectionOptions);
});

View File

@@ -41,7 +41,9 @@ elif [ "$METEOR_WAREHOUSE_DIR" ]; then
INSTALLED_METEOR=t
export METEOR_TEST_NO_SPRINGBOARD=t
if [ -z "$TEST_RELEASE" ]; then
TEST_RELEASE="0.6.5-rc12"
# We need a release whose mongo-livedata exports
# MongoInternals.NpmModule.
TEST_RELEASE="oplog-alpha1"
fi
METEOR="$METEOR --release=$TEST_RELEASE" # some random non-official release

View File

@@ -1120,6 +1120,10 @@ Fiber(function () {
.boolean('production')
.describe('production', 'Run in production mode. Minify and bundle CSS and JS files.')
.boolean('once') // See #Once
// To ensure that QA covers both PollingObserveDriver and
// OplogObserveDriver, this option disables oplog for tests.
// (It still creates a replset, it just doesn't do oplog tailing.)
.boolean('disable-oplog')
.describe('settings', 'Set optional data for Meteor.settings on the server')
.usage(
"Usage: meteor test-packages [--release <release>] [options] [package...]\n" +
@@ -1201,6 +1205,7 @@ Fiber(function () {
port: argv.port,
minify: argv.production,
once: argv.once,
disableOplog: argv['disable-oplog'],
testPackages: testPackages,
settingsFile: argv.settings,
banner: "Tests"

View File

@@ -4,7 +4,8 @@ var path = require("path");
var files = require('./files.js');
var _ = require('underscore');
var unipackage = require('./unipackage.js');
var Fiber = require('fibers');
/** Internal.
*
@@ -24,7 +25,7 @@ var find_mongo_pids = function (app_dir, port, callback) {
_.each(stdout.split('\n'), function (ps_line) {
// matches mongos we start.
var m = ps_line.match(/^\s*(\d+).+mongod .+--port (\d+) --dbpath (.+)(?:\/|\\)\.meteor(?:\/|\\)local(?:\/|\\)db\s*$/);
var m = ps_line.match(/^\s*(\d+).+mongod .+--port (\d+) --dbpath (.+)(?:\/|\\)\.meteor(?:\/|\\)local(?:\/|\\)db --replSet /);
if (m && m.length === 4) {
var found_pid = parseInt(m[1]);
var found_port = parseInt(m[2]);
@@ -125,10 +126,10 @@ var find_mongo_and_kill_it_dead = function (port, callback) {
});
};
exports.launch_mongo = function (app_dir, port, launch_callback, on_exit_callback) {
exports.launchMongo = function (options) {
var handle = {stop: function (callback) { callback(); } };
launch_callback = launch_callback || function () {};
on_exit_callback = on_exit_callback || function () {};
var onListen = options.onListen || function () {};
var onExit = options.onExit || function () {};
// If we are passed an external mongo, assume it is launched and never
// exits. Matches code in run.js:exports.run.
@@ -136,7 +137,7 @@ exports.launch_mongo = function (app_dir, port, launch_callback, on_exit_callbac
// Since it is externally managed, asking it to actually stop would be
// impolite, so our stoppable handle is a noop
if (process.env.MONGO_URL) {
launch_callback();
onListen();
return handle;
}
@@ -146,51 +147,129 @@ exports.launch_mongo = function (app_dir, port, launch_callback, on_exit_callbac
'mongod');
// store data in app_dir
var data_path = path.join(app_dir, '.meteor', 'local', 'db');
files.mkdir_p(data_path, 0755);
var dbPath = path.join(options.context.appDir, '.meteor', 'local', 'db');
files.mkdir_p(dbPath, 0755);
// add .gitignore if needed.
files.add_to_gitignore(path.join(app_dir, '.meteor'), 'local');
files.add_to_gitignore(path.join(options.context.appDir, '.meteor'), 'local');
find_mongo_and_kill_it_dead(port, function (err) {
if (err) {
launch_callback({reason: "Can't kill running mongo: " + err.reason});
return;
}
find_mongo_and_kill_it_dead(options.port, function (err) {
Fiber(function (){
if (err) {
// XXX this was being passed to onListen and ignored before. should do
// something better.
throw {reason: "Can't kill running mongo: " + err.reason};
}
var child_process = require('child_process');
var proc = child_process.spawn(mongod_path, [
'--bind_ip', '127.0.0.1',
'--smallfiles',
'--nohttpinterface',
'--port', port,
'--dbpath', data_path
]);
var callOnExit = function (code, signal) {
on_exit_callback(code, signal, stderrOutput);
};
handle.stop = function (callback) {
var tries = 0;
var exited = false;
proc.removeListener('exit', callOnExit);
proc.kill('SIGINT');
callback && callback(err);
};
var portFile = path.join(dbPath, 'METEOR-PORT');
var createReplSet = true;
try {
createReplSet = +(fs.readFileSync(portFile)) !== options.port;
} catch (e) {
if (!e || e.code !== 'ENOENT')
throw e;
}
var stderrOutput = '';
// If this is the first time we're using this DB, or we changed port since
// the last time, then we want to destroying any existing replSet
// configuration and create a new one. First we delete the "local" database
// if it exists. (It's a pain and slow to change the port in an existing
// replSet configuration. It's also a little slow to initiate a new replSet,
// thus the attempt to not do it unless the port changes.)
if (createReplSet) {
try {
var dbFiles = fs.readdirSync(dbPath);
} catch (e) {
if (!e || e.code !== 'ENOENT')
throw e;
}
_.each(dbFiles, function (dbFile) {
if (/^local\./.test(dbFile))
fs.unlinkSync(path.join(dbPath, dbFile));
});
proc.stderr.setEncoding('utf8');
proc.stderr.on('data', function (data) {
stderrOutput += data;
});
// Load mongo-livedata so we'll be able to talk to it.
var mongoNpmModule = unipackage.load({
library: options.context.library,
packages: [ 'mongo-livedata' ],
release: options.context.releaseVersion
})['mongo-livedata'].MongoInternals.NpmModule;
}
proc.on('exit', callOnExit);
// Start mongod with a dummy replSet and wait for it to listen.
var child_process = require('child_process');
var replSetName = 'meteor';
var proc = child_process.spawn(mongod_path, [
// nb: cli-test.sh and find_mongo_pids assume that the next four arguments
// exist in this order without anything in between
'--bind_ip', '127.0.0.1',
'--smallfiles',
'--nohttpinterface',
'--port', options.port,
'--dbpath', dbPath,
'--replSet', replSetName
]);
proc.stdout.setEncoding('utf8');
proc.stdout.on('data', function (data) {
// process.stdout.write(data);
if (/ \[initandlisten\] waiting for connections on port/.test(data))
launch_callback();
});
var stderrOutput = '';
proc.stderr.setEncoding('utf8');
proc.stderr.on('data', function (data) {
stderrOutput += data;
});
var callOnExit = function (code, signal) {
onExit(code, signal, stderrOutput);
};
proc.on('exit', callOnExit);
handle.stop = function (callback) {
var tries = 0;
var exited = false;
proc.removeListener('exit', callOnExit);
proc.kill('SIGINT');
callback && callback(err);
};
proc.stdout.setEncoding('utf8');
var listening = false;
var replSetReady = false;
var maybeCallOnListen = function () {
if (listening && replSetReady) {
if (createReplSet)
fs.writeFileSync(portFile, options.port);
onListen();
}
};
proc.stdout.on('data', function (data) {
if (/ \[initandlisten\] waiting for connections on port/.test(data)) {
if (createReplSet) {
// Connect to it and start a replset.
var db = new mongoNpmModule.Db(
'meteor', new mongoNpmModule.Server('127.0.0.1', options.port),
{safe: true});
db.open(function(err, db) {
if (err)
throw err;
db.admin().command({
replSetInitiate: {
_id: replSetName,
members: [{_id : 0, host: '127.0.0.1:' + options.port}]
}
}, function (err, result) {
if (err)
throw err;
db.close(true);
});
});
}
listening = true;
maybeCallOnListen();
}
if (/ \[rsMgr\] replSet PRIMARY/.test(data)) {
replSetReady = true;
maybeCallOnListen();
}
});
}).run();
});
return handle;
};

View File

@@ -20,6 +20,7 @@ var unipackage = require('./unipackage.js');
var _ = require('underscore');
var inFiber = require('./fiber-helpers.js').inFiber;
var Future = require('fibers/future');
var Fiber = require('fibers');
////////// Globals //////////
//XXX: Refactor to not have globals anymore?
@@ -242,6 +243,8 @@ var startServer = function (options) {
env.PORT = options.innerPort;
env.MONGO_URL = options.mongoUrl;
if (options.oplogUrl)
env.MONGO_OPLOG_URL = options.oplogUrl;
env.ROOT_URL = options.rootUrl;
if (options.settings)
env.METEOR_SETTINGS = options.settings;
@@ -412,6 +415,15 @@ exports.run = function (context, options) {
// Allow override and use of external mongo. Matches code in launch_mongo.
var mongoUrl = process.env.MONGO_URL ||
("mongodb://127.0.0.1:" + mongoPort + "/meteor");
// Allow people to specify an MONGO_OPLOG_URL override. If someone specifies a
// MONGO_URL but not an MONGO_OPLOG_URL, disable the oplog. If neither is
// specified, use the default internal mongo oplog.
var oplogUrl = undefined;
if (!options.disableOplog) {
oplogUrl = process.env.MONGO_OPLOG_URL ||
(process.env.MONGO_URL ? undefined
: "mongodb://127.0.0.1:" + mongoPort + "/local");
}
var firstRun = true;
var serverHandle;
@@ -564,6 +576,7 @@ exports.run = function (context, options) {
outerPort: outerPort,
innerPort: innerPort,
mongoUrl: mongoUrl,
oplogUrl: oplogUrl,
rootUrl: rootUrl,
library: context.library,
rawLogs: options.rawLogs,
@@ -598,55 +611,64 @@ exports.run = function (context, options) {
var mongoErrorTimer;
var mongoStartupPrintTimer;
var launch = function () {
Status.mongoHandle = mongo_runner.launch_mongo(
context.appDir,
mongoPort,
function () { // On Mongo startup complete
// don't print mongo startup is slow warning.
if (mongoStartupPrintTimer) {
clearTimeout(mongoStartupPrintTimer);
mongoStartupPrintTimer = null;
Fiber(function () {
Status.mongoHandle = mongo_runner.launchMongo({
context: context,
port: mongoPort,
onListen: function () { // On Mongo startup complete
// don't print mongo startup is slow warning.
if (mongoStartupPrintTimer) {
clearTimeout(mongoStartupPrintTimer);
mongoStartupPrintTimer = null;
}
restartServer();
},
onExit: function (code, signal, stderr) { // On Mongo dead
if (Status.shuttingDown) {
return;
}
// Print only last 20 lines of stderr.
stderr = stderr.split('\n').slice(-20).join('\n');
console.log(
stderr + "Unexpected mongo exit code " + code + ". Restarting.\n");
// if mongo dies 3 times with less than 5 seconds between each,
// declare it failed and die.
mongoErrorCount += 1;
if (mongoErrorCount >= 3) {
var explanation = mongoExitCodes.Codes[code];
console.log("Can't start mongod\n");
if (explanation)
console.log(explanation.longText);
if (explanation === mongoExitCodes.EXIT_NET_ERROR) {
console.log(
"\nCheck for other processes listening on port " + mongoPort +
"\nor other meteors running in the same project.");
}
if (!explanation && /GLIBC/i.test(stderr)) {
console.log(
"\nLooks like you are trying to run Meteor on an old Linux " +
"distribution. Meteor on Linux requires glibc version 2.9 " +
"or above. Try upgrading your distribution to the latest " +
"version.");
}
process.exit(1);
}
if (mongoErrorTimer)
clearTimeout(mongoErrorTimer);
mongoErrorTimer = setTimeout(function () {
mongoErrorCount = 0;
mongoErrorTimer = null;
}, 5000);
// Wait a sec to restart.
setTimeout(launch, 1000);
}
restartServer();
},
function (code, signal, stderr) { // On Mongo dead
if (Status.shuttingDown) {
return;
}
// Print only last 20 lines of stderr.
stderr = stderr.split('\n').slice(-20).join('\n');
console.log(stderr + "Unexpected mongo exit code " + code + ". Restarting.\n");
// if mongo dies 3 times with less than 5 seconds between each,
// declare it failed and die.
mongoErrorCount += 1;
if (mongoErrorCount >= 3) {
var explanation = mongoExitCodes.Codes[code];
console.log("Can't start mongod\n");
if (explanation)
console.log(explanation.longText);
if (explanation === mongoExitCodes.EXIT_NET_ERROR)
console.log("\nCheck for other processes listening on port " + mongoPort +
"\nor other meteors running in the same project.");
if (!explanation && /GLIBC/i.test(stderr))
console.log("\nLooks like you are trying to run Meteor on an old Linux " +
"distribution. Meteor on Linux only supports Linux with glibc " +
"version 2.9 and above. Try upgrading your distribution " +
"to the latest version.");
process.exit(1);
}
if (mongoErrorTimer)
clearTimeout(mongoErrorTimer);
mongoErrorTimer = setTimeout(function () {
mongoErrorCount = 0;
mongoErrorTimer = null;
}, 5000);
// Wait a sec to restart.
setTimeout(launch, 1000);
});
}).run();
};
startProxy(outerPort, innerPort, function () {