Merge branch 'devel' into shark

This commit is contained in:
Avital Oliver
2014-01-28 15:00:19 -08:00
9 changed files with 191 additions and 164 deletions

4
meteor
View File

@@ -1,6 +1,8 @@
#!/bin/bash
BUNDLE_VERSION=0.3.28
# danger will robinson! mother:config/download-dev-bundles.sh only goes up to
# 0.3.30!
BUNDLE_VERSION=0.3.29
# OS Check. Put here because here is where we download the precompiled
# bundles that are arch specific.

View File

@@ -6,71 +6,3 @@
// No-fibers environment: nothing here can yield, so enforcing
// "no yields" is trivial — just invoke f immediately and hand back
// whatever it returns.
Meteor._noYieldsAllowed = function (f) {
  var result = f();
  return result;
};
// An even simpler queue of tasks than the fiber-enabled one. This one just
// runs all the tasks when you call runTask or flush, synchronously.
//
Meteor._SynchronousQueue = function () {
  var self = this;
  self._tasks = [];      // tasks queued but not yet executed
  self._running = false; // true while a runTask/flush is in progress
};

_.extend(Meteor._SynchronousQueue.prototype, {
  // Push `task` onto the queue and synchronously run every queued task
  // (ending with `task` itself). Re-entrant calls from inside a running
  // task are an error.
  runTask: function (task) {
    var self = this;
    if (!self.safeToRunTask())
      throw new Error("Could not synchronously run a task from a running task");
    self._tasks.push(task);
    var pending = self._tasks;
    self._tasks = [];
    self._running = true;
    try {
      while (!_.isEmpty(pending)) {
        var current = pending.shift();
        try {
          current();
        } catch (err) {
          if (_.isEmpty(pending)) {
            // This was the final task — the very one runTask was invoked
            // for — so its exception belongs to our caller.
            throw err;
          }
          // An earlier queued task failed; log it and keep draining.
          Meteor._debug("Exception in queued task: " + err.stack);
        }
      }
    } finally {
      self._running = false;
    }
  },
  // Enqueue `task` to be run asynchronously (on the next tick, via a
  // scheduled flush). Only the first task added to an empty queue needs
  // to schedule the flush.
  queueTask: function (task) {
    var self = this;
    var mustSchedule = _.isEmpty(self._tasks);
    self._tasks.push(task);
    // Intentionally not using Meteor.setTimeout, because it doesn't like
    // running in stubs for now.
    if (mustSchedule)
      setTimeout(_.bind(self.flush, self), 0);
  },
  // Synchronously run everything queued so far (by running a no-op task,
  // which drags the rest of the queue along with it).
  flush: function () {
    this.runTask(function () {});
  },
  // Repeatedly flush until the queue stays empty. Does nothing if called
  // from within a running task.
  drain: function () {
    var self = this;
    if (!self.safeToRunTask())
      return;
    while (!_.isEmpty(self._tasks))
      self.flush();
  },
  // It is only safe to run a task when no other task is mid-execution.
  safeToRunTask: function () {
    return !this._running;
  }
});

View File

@@ -23,6 +23,7 @@ Package.on_use(function (api) {
api.add_files('errors.js', ['client', 'server']);
api.add_files('fiber_helpers.js', 'server');
api.add_files('fiber_stubs_client.js', 'client');
api.add_files('unyielding_queue.js');
api.add_files('startup_client.js', ['client']);
api.add_files('startup_server.js', ['server']);
api.add_files('debug.js', ['client', 'server']);

View File

@@ -0,0 +1,72 @@
// A simpler version of Meteor._SynchronousQueue with the same external
// interface. It runs on both client and server, unlike _SynchronousQueue which
// only runs on the server. When used on the server, tasks may not yield. This
// one just runs all the tasks when you call runTask or flush, synchronously.
// It itself also does not yield.
//
Meteor._UnyieldingQueue = function () {
  var self = this;
  self._tasks = [];      // tasks queued but not yet executed
  self._running = false; // true while a runTask/flush is in progress
};

_.extend(Meteor._UnyieldingQueue.prototype, {
  // Push `task` onto the queue and synchronously run every queued task
  // (ending with `task` itself), each wrapped in Meteor._noYieldsAllowed
  // so that no task may yield. Re-entrant calls from inside a running
  // task are an error.
  runTask: function (task) {
    var self = this;
    if (!self.safeToRunTask())
      throw new Error("Could not synchronously run a task from a running task");
    self._tasks.push(task);
    var pending = self._tasks;
    self._tasks = [];
    self._running = true;
    try {
      while (!_.isEmpty(pending)) {
        var current = pending.shift();
        try {
          Meteor._noYieldsAllowed(function () {
            current();
          });
        } catch (err) {
          if (_.isEmpty(pending)) {
            // This was the final task — the very one runTask was invoked
            // for — so its exception belongs to our caller.
            throw err;
          }
          // An earlier queued task failed; log it and keep draining.
          Meteor._debug("Exception in queued task: " + err.stack);
        }
      }
    } finally {
      self._running = false;
    }
  },
  // Enqueue `task` to be run asynchronously (on the next tick, via a
  // scheduled flush). Only the first task added to an empty queue needs
  // to schedule the flush.
  queueTask: function (task) {
    var self = this;
    var mustSchedule = _.isEmpty(self._tasks);
    self._tasks.push(task);
    // Intentionally not using Meteor.setTimeout, because it doesn't like
    // running in stubs for now.
    if (mustSchedule)
      setTimeout(_.bind(self.flush, self), 0);
  },
  // Synchronously run everything queued so far (by running a no-op task,
  // which drags the rest of the queue along with it).
  flush: function () {
    this.runTask(function () {});
  },
  // Repeatedly flush until the queue stays empty. Does nothing if called
  // from within a running task.
  drain: function () {
    var self = this;
    if (!self.safeToRunTask())
      return;
    while (!_.isEmpty(self._tasks))
      self.flush();
  },
  // It is only safe to run a task when no other task is mid-execution.
  safeToRunTask: function () {
    return !this._running;
  }
});

View File

@@ -7,13 +7,34 @@
// ObserveHandle: the return value of a live query.
LocalCollection = function (name) {
LocalCollection = function (options) {
var self = this;
self.name = name;
options = options || {};
self.name = options.name;
// _id -> document (also containing id)
self._docs = new LocalCollection._IdMap;
self._observeQueue = new Meteor._SynchronousQueue();
// When writing to this collection, we batch all observeChanges callbacks
// until the end of the write, and run them at this point. On the server, we
// use a single SynchronousQueue to do so, so that we never deliver callbacks
// out of order even if other writes occur during a yield. On the client, or
// on the server if we promise that our callbacks will never yield via an
// undocumented option, we use the simpler UnyieldingQueue.
//
// (What is the _observeCallbacksWillNeverYield option for? In some cases, it
// can be nice (on the server) to be able to write to a LocalCollection
// without yielding (eg, in a _noYieldsAllowed block). It's necessary to
// provide non-yielding allow callbacks in that case, but just doing that
// wouldn't be good enough if we always used SynchronousQueue on the server,
// since it tends to yield in order to run even non-yielding callbacks.)
var queueClass;
if (Meteor._SynchronousQueue && !options._observeCallbacksWillNeverYield) {
queueClass = Meteor._SynchronousQueue;
} else {
queueClass = Meteor._UnyieldingQueue;
}
self._observeQueue = new queueClass();
self.next_qid = 1; // live query id generator
@@ -26,8 +47,8 @@ LocalCollection = function (name) {
// selector, sorter, (callbacks): functions
self.queries = {};
// null if not saving originals; an IdMap from id to original document value if
// saving originals. See comments before saveOriginals().
// null if not saving originals; an IdMap from id to original document value
// if saving originals. See comments before saveOriginals().
self._savedOriginals = null;
// True when observers are paused and we should not send callbacks.

View File

@@ -48,11 +48,19 @@ Sorter = function (spec) {
// min/max.)
//
// XXX This is actually wrong! In fact, the whole attempt to compile sort
// functions independently of selectors is wrong. In MongoDB, if you have
// documents {_id: 'x', a: [1, 10]} and {_id: 'y', a: [5, 15]},
// then C.find({}, {sort: {a: 1}}) puts x before y (1 comes before 5).
// But C.find({a: {$gt: 3}}, {sort: {a: 1}}) puts y before x (1 does not match
// the selector, and 5 comes before 10).
// functions independently of selectors is wrong. In MongoDB, if you have
// documents {_id: 'x', a: [1, 10]} and {_id: 'y', a: [5, 15]}, then
// C.find({}, {sort: {a: 1}}) puts x before y (1 comes before 5). But
// C.find({a: {$gt: 3}}, {sort: {a: 1}}) puts y before x (1 does not match
// the selector, and 5 comes before 10).
//
// The way this works is pretty subtle! For example, if the documents are
// instead {_id: 'x', a: [{x: 1}, {x: 10}]}) and
// {_id: 'y', a: [{x: 5}, {x: 15}]}),
// then C.find({'a.x': {$gt: 3}}, {sort: {'a.x': 1}}) and
// C.find({a: {$elemMatch: {x: {$gt: 3}}}}, {sort: {'a.x': 1}})
// both follow this rule (y before x). ie, you do have to apply this
// through $elemMatch.
var reduceValue = function (branchValues, findMin) {
// Expand any leaf arrays that we find, and ignore those arrays themselves.
branchValues = expandArraysInBranches(branchValues, true);

View File

@@ -5,7 +5,7 @@ LocalCollectionDriver = function () {
var ensureCollection = function (name, collections) {
if (!(name in collections))
collections[name] = new LocalCollection(name);
collections[name] = new LocalCollection({name: name});
return collections[name];
};

View File

@@ -30,13 +30,24 @@ OplogObserveDriver = function (options) {
self._registerPhaseChange(PHASE.QUERYING);
self._published = new LocalCollection._IdMap;
// A minimongo LocalCollection containing the docs that match the selector,
// and maybe more. It is guaranteed to contain all the fields needed for the
// selector and the projection, and may have other fields too. (In the future
// we may try to make this collection be shared between multiple
// OplogObserveDrivers, but not currently.)
self._collection =
new LocalCollection({_observeCallbacksWillNeverYield: true});
// XXX think about what all the options are
var minimongoCursor = self._collection.find(
self._cursorDescription.selector, self._cursorDescription.options);
self._stopHandles.push(minimongoCursor.observeChanges(self._multiplexer));
var selector = self._cursorDescription.selector;
self._matcher = options.matcher;
var projection = self._cursorDescription.options.fields || {};
self._projectionFn = LocalCollection._compileProjection(projection);
// Projection function, result of combining important fields for selector and
// existing fields projection
var projection = self._cursorDescription.options.fields || {};
self._sharedProjection = self._matcher.combineIntoProjection(projection);
self._sharedProjectionFn = LocalCollection._compileProjection(
self._sharedProjection);
@@ -109,47 +120,51 @@ OplogObserveDriver = function (options) {
_.extend(OplogObserveDriver.prototype, {
_add: function (doc) {
var self = this;
var id = doc._id;
var fields = _.clone(doc);
delete fields._id;
if (self._published.has(id))
throw Error("tried to add something already published " + id);
self._published.set(id, self._sharedProjectionFn(fields));
self._multiplexer.added(id, self._projectionFn(fields));
doc = self._sharedProjectionFn(doc);
// XXX does _sharedProjection always preserve id?
if (!_.has(doc, '_id'))
throw Error("Can't add doc without _id");
self._collection.insert(doc);
},
_remove: function (id) {
_remove: function (id, options) {
var self = this;
if (!self._published.has(id))
options = options || {};
var removed = self._collection.remove({_id: id});
if (options.mustExist && removed !== 1)
throw Error("tried to remove something unpublished " + id);
self._published.remove(id);
self._multiplexer.removed(id);
},
_handleDoc: function (id, newDoc, mustMatchNow) {
var self = this;
newDoc = _.clone(newDoc);
newDoc = _.clone(newDoc); // *shallow* clone
// XXX this is just about "matching selector", not about skip/limit
var matchesNow = newDoc && self._matcher.documentMatches(newDoc).result;
if (mustMatchNow && !matchesNow) {
throw Error("expected " + EJSON.stringify(newDoc) + " to match "
+ EJSON.stringify(self._cursorDescription));
}
var matchedBefore = self._published.has(id);
var inCollection = !!self._collection.find(id).count();
if (matchesNow && !matchedBefore) {
if (matchesNow && !inCollection) {
// It matches the selector and it isn't in our collection, so add it.
// XXX once we add skip/limit, this may not always send an added, and
// we may need to do some GC
self._add(newDoc);
} else if (matchedBefore && !matchesNow) {
self._remove(id);
} else if (inCollection && !matchesNow) {
// We remove this from the collection to achieve two goals: (a) causing
// the observeChanges to fire removed() and (b) saving memory. That said,
// it would be legitimate (if !!newDoc) to update the collection instead
// of removing, if we thought we might need this doc again soon.
self._remove(id, {mustExist: true});
} else if (matchesNow) {
var oldDoc = self._published.get(id);
if (!oldDoc)
throw Error("thought that " + id + " was there!");
delete newDoc._id;
self._published.set(id, self._sharedProjectionFn(newDoc));
var changed = LocalCollection._makeChangedFields(_.clone(newDoc), oldDoc);
changed = self._projectionFn(changed);
if (!_.isEmpty(changed))
self._multiplexer.changed(id, changed);
// Replace the doc inside our collection, which may trigger a changed
// callback.
newDoc = self._sharedProjectionFn(newDoc);
// XXX does _sharedProjection always preserve id?
if (!_.has(newDoc, '_id'))
throw Error("Can't add newDoc without _id");
self._collection.update(id, newDoc);
}
},
_fetchModifiedDocuments: function () {
@@ -233,10 +248,9 @@ _.extend(OplogObserveDriver.prototype, {
}
if (op.op === 'd') {
if (self._published.has(id))
self._remove(id);
self._remove(id);
} else if (op.op === 'i') {
if (self._published.has(id))
if (self._collection.find(id).count())
throw new Error("insert found for already-existing ID");
// XXX what if selector yields? for now it can't but later it could have
@@ -258,18 +272,24 @@ _.extend(OplogObserveDriver.prototype, {
if (isReplace) {
self._handleDoc(id, _.extend({_id: id}, op.o));
} else if (self._published.has(id) && canDirectlyModifyDoc) {
// Oh great, we actually know what the document is, so we can apply
// this directly.
var newDoc = EJSON.clone(self._published.get(id));
newDoc._id = id;
LocalCollection._modify(newDoc, op.o);
self._handleDoc(id, self._sharedProjectionFn(newDoc));
} else if (!canDirectlyModifyDoc ||
self._matcher.canBecomeTrueByModifier(op.o)) {
self._needToFetch.set(id, op.ts.toString());
if (self._phase === PHASE.STEADY)
self._fetchModifiedDocuments();
} else {
var newDoc = self._collection.findOne(id);
if (newDoc && canDirectlyModifyDoc) {
// Oh great, we actually know what the document is, so we can apply
// this directly.
// XXX just send the modifier to _collection.update? but then
// we don't necessarily get to GC
// We can avoid another deep clone here since the findOne above would
// return a copy anyways
LocalCollection._modify(newDoc, op.o);
self._handleDoc(id, newDoc);
} else if (!canDirectlyModifyDoc ||
self._matcher.canBecomeTrueByModifier(op.o)) {
self._needToFetch.set(id, op.ts.toString());
if (self._phase === PHASE.STEADY)
self._fetchModifiedDocuments();
}
}
} else {
throw Error("XXX SURPRISING OPERATION: " + op);
@@ -318,18 +338,19 @@ _.extend(OplogObserveDriver.prototype, {
self._currentlyFetching = null;
++self._fetchGeneration; // ignore any in-flight fetches
self._registerPhaseChange(PHASE.QUERYING);
self._collection.pauseObservers();
// XXX this won't be quite correct for skip/limit
self._collection.remove({});
// Defer so that we don't block.
Meteor.defer(function () {
// subtle note: _published does not contain _id fields, but newResults
// does
var newResults = new LocalCollection._IdMap;
var cursor = self._cursorForQuery();
cursor.forEach(function (doc) {
newResults.set(doc._id, doc);
// Insert all the documents currently found by the query.
self._cursorForQuery().forEach(function (doc) {
self._collection.insert(doc);
});
self._publishNewResults(newResults);
// Allow observe callbacks (ie multiplexer invocations) to fire.
self._collection.resumeObservers();
self._doneQuerying();
});
@@ -399,34 +420,6 @@ _.extend(OplogObserveDriver.prototype, {
},
// Replace self._published with newResults (both are IdMaps), invoking observe
// callbacks on the multiplexer.
//
// XXX This is very similar to LocalCollection._diffQueryUnorderedChanges. We
// should really: (a) Unify IdMap and OrderedDict into Unordered/OrderedDict (b)
// Rewrite diff.js to use these classes instead of arrays and objects.
_publishNewResults: function (newResults) {
var self = this;
// First remove anything that's gone. Be careful not to modify
// self._published while iterating over it.
var idsToRemove = [];
self._published.forEach(function (doc, id) {
if (!newResults.has(id))
idsToRemove.push(id);
});
_.each(idsToRemove, function (id) {
self._remove(id);
});
// Now do adds and changes.
newResults.forEach(function (doc, id) {
// "true" here means to throw if we think this doc doesn't match the
// selector.
self._handleDoc(id, doc, true);
});
},
// This stop function is invoked from the onStop of the ObserveMultiplexer, so
// it shouldn't actually be possible to call it until the multiplexer is
// ready.
@@ -450,7 +443,6 @@ _.extend(OplogObserveDriver.prototype, {
self._writesToCommitWhenWeReachSteady = null;
// Proactively drop references to potentially big things.
self._published = null;
self._needToFetch = null;
self._currentlyFetching = null;
self._oplogEntryHandle = null;
@@ -464,6 +456,9 @@ _.extend(OplogObserveDriver.prototype, {
var self = this;
var now = new Date;
if (phase === self._phase)
return;
if (self._phase) {
var timeDiff = now - self._phaseStartTime;
Package.facts && Package.facts.Facts.incrementServerFact(

View File

@@ -109,11 +109,7 @@ npm install eachline@2.4.0
npm install source-map@0.1.31
npm install source-map-support@0.2.5
npm install bcrypt@0.7.7
# Based on 1.0.1; includes our PRs
# https://github.com/nodejitsu/node-http-proxy/pull/561 and
# https://github.com/nodejitsu/node-http-proxy/pull/560
npm install https://github.com/meteor/node-http-proxy/tarball/d8ea687936d6bed0f3e99849695cab2dcdccd6f4
npm install http-proxy@1.0.2
# Using the unreleased 1.1 branch. We can probably switch to a built NPM version
# when it gets released.