Merge PR 716 into devel: returning multiple cursors from publish.

This commit is contained in:
David Glasser
2013-03-04 18:30:51 -08:00
6 changed files with 133 additions and 7 deletions

View File

@@ -1,6 +1,9 @@
## vNEXT
* Publish functions may now return an array of cursors to publish. Currently,
the cursors must all be from different collections.
* User documents have id's when onCreateUser and validateNewUser hooks run.
* Removed all restrictions on EJSON types in MongoDB, even user-defined ones.

View File

@@ -45,7 +45,15 @@ that Meteor will call each time a client subscribes to the name.
Publish functions can return a
[`Collection.Cursor`](#meteor_collection_cursor), in which case Meteor
will publish that cursor's documents.
will publish that cursor's documents. You can also return an array of
`Collection.Cursor`s, in which case Meteor will publish all of the
cursors.
{{#warning}}
If you return multiple cursors in an array, they currently must all be from
different collections. We hope to lift this restriction in a future release.
cursors.
{{/warning}}
// server: publish the rooms collection, minus secret info.
Meteor.publish("rooms", function () {
@@ -59,6 +67,14 @@ will publish that cursor's documents.
return Rooms.find({admin: this.userId}, {fields: {secretInfo: 1}});
});
// publish dependent documents and simulate joins
Meteor.publish("roomAndMessages", function (roomId) {
return [
Rooms.find({_id: roomId}, {fields: {secretInfo: 0}}),
Messages.find({roomId: roomId})
];
});
Otherwise, the publish function should call the functions
[`added`](#publish_added) (when a new document is added to the published record
set), [`changed`](#publish_changed) (when some fields on a document in the

View File

@@ -821,8 +821,9 @@ _.extend(Meteor._LivedataSubscription.prototype, {
// SPECIAL CASE: Instead of writing their own callbacks that invoke
// this.added/changed/ready/etc, the user can just return a collection
// cursor from the publish function; we call its _publishCursor method which
// starts observing the cursor and publishes the results.
// cursor or array of cursors from the publish function; we call their
// _publishCursor method which starts observing the cursor and publishes the
// results. Note that _publishCursor does NOT call ready().
//
// XXX This uses an undocumented interface which only the Mongo cursor
// interface publishes. Should we make this interface public and encourage
@@ -834,8 +835,40 @@ _.extend(Meteor._LivedataSubscription.prototype, {
// reactiveThingy.publishMe();
// });
// };
if (res && res._publishCursor)
var isCursor = function (c) {
return c && c._publishCursor;
};
if (isCursor(res)) {
res._publishCursor(self);
// _publishCursor only returns after the initial added callbacks have run.
// mark subscription as ready.
self.ready();
} else if (_.isArray(res)) {
// check all the elements are cursors
if (! _.all(res, isCursor)) {
self.error(new Error("Publish function returned an array of non-Cursors"));
return;
}
// find duplicate collection names
// XXX we should support overlapping cursors, but that would require the
// merge box to allow overlap within a subscription
var collectionNames = {};
for (var i = 0; i < res.length; ++i) {
var collectionName = res[i]._getCollectionName();
if (_.has(collectionNames, collectionName)) {
self.error(new Error(
"Publish function returned multiple cursors for collection " +
collectionName));
return;
}
collectionNames[collectionName] = true;
};
_.each(res, function (cur) {
cur._publishCursor(self);
});
self.ready();
}
},
// This calls all stop callbacks and prevents the handler from updating any

View File

@@ -277,4 +277,47 @@ if (Meteor.isServer) {
}
/*****/
/// Helpers for "livedata - publish multiple cursors"
One = new Meteor.Collection("collectionOne");
Two = new Meteor.Collection("collectionTwo");
if (Meteor.isServer) {
One.remove({});
One.insert({name: "value1"});
One.insert({name: "value2"});
Two.remove({});
Two.insert({name: "value3"});
Two.insert({name: "value4"});
Two.insert({name: "value5"});
Meteor.publish("multiPublish", function (options) {
if (options.normal) {
return [
One.find(),
Two.find()
];
} else if (options.dup) {
// Suppress the log of the expected internal error.
Meteor._suppress_log(1);
return [
One.find(),
One.find({name: "value2"}), // multiple cursors for one collection - error
Two.find()
];
} else if (options.notCursor) {
// Suppress the log of the expected internal error.
Meteor._suppress_log(1);
return [
One.find(),
"not a cursor",
Two.find()
];
} else
throw "unexpected options";
});
}
})();

View File

@@ -507,6 +507,30 @@ if (Meteor.isClient) {
conn._stream.forceDisconnect();
}
];})());
testAsyncMulti("livedata - publish multiple cursors", [
function (test, expect) {
Meteor.subscribe("multiPublish", {normal: 1}, {
onReady: expect(function () {
test.equal(One.find().count(), 2);
test.equal(Two.find().count(), 3);
}),
onError: failure()
});
},
function (test, expect) {
Meteor.subscribe("multiPublish", {dup: 1}, {
onReady: failure(),
onError: expect(failure(test, 500, "Internal server error"))
});
},
function (test, expect) {
Meteor.subscribe("multiPublish", {notCursor: 1}, {
onReady: failure(),
onError: expect(failure(test, 500, "Internal server error"))
});
}
]);
}
// XXX some things to test in greater detail:

View File

@@ -398,14 +398,21 @@ Cursor.prototype._publishCursor = function (sub) {
}
});
// observeChanges only returns after the initial added callbacks have run.
// mark subscription as ready.
sub.ready();
// We don't call sub.ready() here: it gets called in livedata_server, after
// possibly calling _publishCursor on multiple returned cursors.
// register stop callback (expects lambda w/ no args).
sub.onStop(function () {observeHandle.stop();});
};
// Used to guarantee that publish functions return at most one cursor per
// collection. Private, because we might later have cursors that include
// documents from multiple collections somehow.
Cursor.prototype._getCollectionName = function () {
var self = this;
return self._cursorDescription.collectionName;
}
Cursor.prototype.observe = function (callbacks) {
var self = this;
return LocalCollection._observeFromObserveChanges(self, callbacks);