diff --git a/packages/livedata/livedata_connection.js b/packages/livedata/livedata_connection.js index 98218bf40c..f389afce28 100644 --- a/packages/livedata/livedata_connection.js +++ b/packages/livedata/livedata_connection.js @@ -12,23 +12,44 @@ Meteor._capture_subs = null; Meteor._LivedataConnection = function (url, restart_on_update) { var self = this; - self.url = url; + + // as a test hook, allow passing a stream instead of a url. + if (typeof url === "object") { + self.stream = url; + // if we have two test streams, auto reload stuff will break because + // the url is used as a key for the migration data. + self.url = "/debug"; + } else { + self.url = url; + } + self.last_session_id = null; self.stores = {}; // name -> object with methods self.method_handlers = {}; // name -> func self.next_method_id = 1; + // waiting for results of method self.outstanding_methods = []; // each item has keys: msg, callback + // waiting for data from method self.unsatisfied_methods = {}; // map from method_id -> true + // sub was ready, is no longer (due to reconnect) + self.unready_subscriptions = {}; // map from sub._id -> true + // messages from the server that have not been applied self.pending_data = []; // array of pending data messages - self.queued = {}; // name -> updates for (yet to be created) collection - self.quiesce_callbacks = []; - self.retry_migrate = null; // if we're blocking a migration, the retry func + // name -> updates for (yet to be created) collection + self.queued = {}; + // if we're blocking a migration, the retry func + self.retry_migrate = null; + // metadata for subscriptions self.subs = new LocalCollection; // keyed by subs._id. value is unset or an array. if set, sub is not // yet ready. self.sub_ready_callbacks = {}; + // just for testing + self.quiesce_callbacks = []; + + // Setup auto-reload persistence. var reload_key = "Server-" + url; var reload_data = Meteor._reload.migration_data(reload_key); @@ -55,8 +76,8 @@ Meteor._LivedataConnection = function (url, restart_on_update) { outstanding_methods: methods}]; }); - // Setup stream - self.stream = new Meteor._Stream(self.url); + // Setup stream (if not overriden above) + self.stream = self.stream || new Meteor._Stream(self.url); self.stream.on('message', function (raw_msg) { try { @@ -384,22 +405,23 @@ _.extend(Meteor._LivedataConnection.prototype, { // successful reconnection -- pick up where we left off. return; - // clear out the local database! + // Server doesn't have our data any more. Re-sync a new session. - // XXX this causes flicker ("database flap") and needs to be - // rewritten. we need to put a reset message in pending_data - // (optionally clearing pending_data and queued first, as an - // optimization), and defer processing pending_data until all of - // the subscriptions that we previously told the user were ready, - // are now once again ready. then, when we do go to process the - // messages, we need to do it in one atomic batch (the reset and - // the redeliveries together) so that livequeries don't observe - // spurious 'added' and 'removed' messages, which would cause, eg, - // DOM elements to fail to get semantically matched, leading to a - // loss of focus/input state. - _.each(self.stores, function (s) { s.reset(); }); - self.pending_data = []; + // Put a reset message into the pending data queue and discard any + // previous messages (they are unimportant now). + self.pending_data = ["reset"]; self.queued = {}; + + // Mark all currently ready subscriptions as 'unready'. + var all_subs = self.subs.find({}).fetch(); + self.unready_subscriptions = {}; + _.each(all_subs, function (sub) { + if (!self.sub_ready_callbacks[sub._id]) + self.unready_subscriptions[sub._id] = true; + }); + + // Do not clear the database here. That happens once all the subs + // are re-ready and we process pending_data. }, _livedata_data: function (msg) { @@ -408,19 +430,41 @@ _.extend(Meteor._LivedataConnection.prototype, { // Add the data message to the queue self.pending_data.push(msg); - // If there are still method invocations in flight, stop + // Process satisfied methods and subscriptions. + // NOTE: does not fire callbacks here, that happens when + // the data message is processed for real. This is just for + // quiescing. _.each(msg.methods || [], function (method_id) { delete self.unsatisfied_methods[method_id]; }); + _.each(msg.subs || [], function (sub_id) { + delete self.unready_subscriptions[sub_id]; + }); + + // If there are still method invocations in flight, stop for (var method_id in self.unsatisfied_methods) return; + // If there are still uncomplete subscriptions, stop + for (var sub_id in self.unready_subscriptions) + return; - // All methods have landed. Blow away local changes and replace + // We have quiesced. Blow away local changes and replace // with authoritative changes from server. _.each(self.stores, function (s) { s.beginUpdate(); }); _.each(self.pending_data, function (msg) { + // Reset message from reconnect. Blow away everything. + // + // XXX instead of reset message, we could have a flag, and pass + // that to beginUpdate. This would be more efficient since we don't + // have to restore a snapshot if we're just going to blow away the + // db. + if (msg === "reset") { + _.each(self.stores, function (s) { s.reset(); }); + return; + } + if (msg.collection && msg.id) { var store = self.stores[msg.collection]; diff --git a/packages/livedata/livedata_connection_tests.js b/packages/livedata/livedata_connection_tests.js new file mode 100644 index 0000000000..4c8129112e --- /dev/null +++ b/packages/livedata/livedata_connection_tests.js @@ -0,0 +1,373 @@ +var test_got_message = function (test, stream, expected) { + if (stream.sent.length === 0) { + test.fail({error: 'no message received', expected: expected}); + return; + } + + var got = stream.sent.shift(); + + if (typeof got === 'string' && typeof expected === 'object') + got = JSON.parse(got); + + test.equal(got, expected); +}; + +var SESSION_ID = '17'; + +Tinytest.add("livedata stub - receive data", function (test) { + var stream = new Meteor._StubStream(); + var conn = new Meteor._LivedataConnection(stream); + + stream.reset(); // initial connection start. + + test_got_message(test, stream, {msg: 'connect'}); + test.length(stream.sent, 0); + + stream.receive({msg: 'connected', session: SESSION_ID}); + test.length(stream.sent, 0); + + // data comes in for unknown collection. + var coll_name = Meteor.uuid(); + stream.receive({msg: 'data', collection: coll_name, id: '1234', + set: {a: 1}}); + // break throught the black box and test internal state + test.length(conn.queued[coll_name], 1); + + var coll = new Meteor.Collection(coll_name, conn); + + // queue has been emptied and doc is in db. + test.isUndefined(conn.queued[coll_name]); + test.equal(coll.find({}).fetch(), [{_id:'1234', a:1}]); + + // second message. applied directly to the db. + stream.receive({msg: 'data', collection: coll_name, id: '1234', + set: {a:2}}); + test.equal(coll.find({}).fetch(), [{_id:'1234', a:2}]); + test.isUndefined(conn.queued[coll_name]); +}); + + + +Tinytest.add("livedata stub - subscribe", function (test) { + var stream = new Meteor._StubStream(); + var conn = new Meteor._LivedataConnection(stream); + + stream.reset(); // initial connection start. + + test_got_message(test, stream, {msg: 'connect'}); + test.length(stream.sent, 0); + + stream.receive({msg: 'connected', session: SESSION_ID}); + test.length(stream.sent, 0); + + // subscribe + var callback_fired = false; + var sub = conn.subscribe('my_data', function () { + callback_fired = true; + }); + test.isFalse(callback_fired); + + var message = JSON.parse(stream.sent.shift()); + var id = message.id; + delete message.id; + test.equal(message, {msg: 'sub', name: 'my_data', params: []}); + + // get the sub satisfied. callback fires. + stream.receive({msg: 'data', 'subs': [id]}); + test.isTrue(callback_fired); +}); + + +Tinytest.add("livedata stub - methods", function (test) { + var stream = new Meteor._StubStream(); + var conn = new Meteor._LivedataConnection(stream); + + stream.reset(); // initial connection start. + + test_got_message(test, stream, {msg: 'connect'}); + test.length(stream.sent, 0); + + stream.receive({msg: 'connected', session: SESSION_ID}); + test.length(stream.sent, 0); + + var coll_name = Meteor.uuid(); + var coll = new Meteor.Collection(coll_name, conn); + + // setup method + conn.methods({do_something: function (x) { + coll.insert({value: x}); + }}); + + // setup observers + var counts = {added: 0, removed: 0, changed: 0, moved: 0}; + var handle = coll.find({}).observe( + { added: function () { counts.added += 1; }, + removed: function () { counts.removed += 1; }, + changed: function () { counts.changed += 1; }, + moved: function () { counts.moved += 1; } + }); + + + // call method with results callback + var callback_fired = false; + conn.call('do_something', 'friday!', function (err, res) { + test.isUndefined(err); + test.equal(res, '1234'); + callback_fired = true; + }); + test.isFalse(callback_fired); + + // observers saw the method run. + test.equal(counts, {added: 1, removed: 0, changed: 0, moved: 0}); + + // get response from server + var message = JSON.parse(stream.sent.shift()); + test.equal(message, {msg: 'method', method: 'do_something', + params: ['friday!'], id:message.id}); + + test.equal(coll.find({}).count(), 1); + test.equal(coll.find({value: 'friday!'}).count(), 1); + + // results result in callback + stream.receive({msg: 'result', id:message.id, result:"1234"}); + test.isTrue(callback_fired); + + // data methods do not show up (not quiescent yet) + stream.receive({msg: 'data', collection: coll_name, id: '1234', + set: {value: 'tuesday'}}); + + test.equal(coll.find({}).count(), 1); + test.equal(coll.find({value: 'friday!'}).count(), 1); + test.equal(counts, {added: 1, removed: 0, changed: 0, moved: 0}); + + // send another methods (unknown on client) + callback_fired = false; + conn.call('do_something_else', 'monday', function (err, res) { + callback_fired = true; + }); + test.isFalse(callback_fired); + + // test we still send a method request to server + var message_2 = JSON.parse(stream.sent.shift()); + test.equal(message_2, {msg: 'method', method: 'do_something_else', + params: ['monday'], id:message_2.id}); + + // get the first data satisfied message. changes are still not applied + // to database. + stream.receive({msg: 'data', 'methods': [message.id]}); + + test.equal(coll.find({}).count(), 1); + test.equal(coll.find({value: 'friday!'}).count(), 1); + test.equal(counts, {added: 1, removed: 0, changed: 0, moved: 0}); + + // second result + stream.receive({msg: 'result', id:message_2.id, result:"bupkis"}); + test.isTrue(callback_fired); + + // get second satisfied, now changes are applied. + stream.receive({msg: 'data', 'methods': [message_2.id]}); + + test.equal(coll.find({}).count(), 1); + test.equal(coll.find({value: 'friday!'}).count(), 0); + test.equal(coll.find({value: 'tuesday', _id: '1234'}).count(), 1); + test.equal(counts, {added: 2, removed: 1, changed: 0, moved: 0}); + + handle.stop(); +}); + + +// method calls another method in simulation. see not sent. +Tinytest.add("livedata stub - sub methods", function (test) { + var stream = new Meteor._StubStream(); + var conn = new Meteor._LivedataConnection(stream); + + stream.reset(); // initial connection start. + + test_got_message(test, stream, {msg: 'connect'}); + test.length(stream.sent, 0); + + stream.receive({msg: 'connected', session: SESSION_ID}); + test.length(stream.sent, 0); + + var coll_name = Meteor.uuid(); + var coll = new Meteor.Collection(coll_name, conn); + + // setup methods + conn.methods({ + do_something: function () { + conn.call('do_something_else'); + }, + do_something_else: function () { + coll.insert({a: 1}); + } + }); + + // setup observers + var counts = {added: 0, removed: 0, changed: 0, moved: 0}; + var handle = coll.find({}).observe( + { added: function () { counts.added += 1; }, + removed: function () { counts.removed += 1; }, + changed: function () { counts.changed += 1; }, + moved: function () { counts.moved += 1; } + }); + + + // call method. + conn.call('do_something'); + + // see we only send message for outer methods + var message = JSON.parse(stream.sent.shift()); + test.equal(message, {msg: 'method', method: 'do_something', + params: [], id:message.id}); + test.length(stream.sent, 0); + + // but inner method runs locally. + test.equal(counts, {added: 1, removed: 0, changed: 0, moved: 0}); + + // we get the results (this is important to make the test not block + // auto-reload!) + stream.receive({msg: 'result', id:message.id, result:"1234"}); + + // get data from the method. does not show up. + stream.receive({msg: 'data', collection: coll_name, id: '1234', + set: {value: 'tuesday'}}); + test.equal(counts, {added: 1, removed: 0, changed: 0, moved: 0}); + + // get method satisfied. data shows up. + stream.receive({msg: 'data', 'methods': [message.id]}); + test.equal(counts, {added: 2, removed: 1, changed: 0, moved: 0}); + + handle.stop(); +}); + + +// initial connect +// make a sub +// do a method +// satisfy sub +// reconnect +// method gets resent +// get data from server +// data NOT shown +// satisfy method +// data NOT shown +// resatisfy sub +// data is shown +Tinytest.add("livedata stub - reconnect", function (test) { + var stream = new Meteor._StubStream(); + var conn = new Meteor._LivedataConnection(stream); + + stream.reset(); // initial connection start. + + test_got_message(test, stream, {msg: 'connect'}); + test.length(stream.sent, 0); + + stream.receive({msg: 'connected', session: SESSION_ID}); + test.length(stream.sent, 0); + + var coll_name = Meteor.uuid(); + var coll = new Meteor.Collection(coll_name, conn); + + // setup observers + var counts = {added: 0, removed: 0, changed: 0, moved: 0}; + var handle = coll.find({}).observe( + { added: function () { counts.added += 1; }, + removed: function () { counts.removed += 1; }, + changed: function () { counts.changed += 1; }, + moved: function () { counts.moved += 1; } + }); + + // subscribe + var sub_callback_fired = false; + var sub = conn.subscribe('my_data', function () { + sub_callback_fired = true; + }); + test.isFalse(sub_callback_fired); + + var sub_message = JSON.parse(stream.sent.shift()); + test.equal(sub_message, {msg: 'sub', name: 'my_data', params: [], + id: sub_message.id}); + + + // get some data. it shows up. + stream.receive({msg: 'data', collection: coll_name, + id: '1234', set: {a:1}}); + + test.equal(coll.find({}).count(), 1); + test.equal(counts, {added: 1, removed: 0, changed: 0, moved: 0}); + test.isFalse(sub_callback_fired); + + stream.receive({msg: 'data', collection: coll_name, + id: '1234', set: {b:2}, + subs: [sub_message.id] // satisfy sub + }); + test.isTrue(sub_callback_fired); + sub_callback_fired = false; // re-arm for test that it doesn't fire again. + + test.equal(coll.find({a:1, b:2}).count(), 1); + test.equal(counts, {added: 1, removed: 0, changed: 1, moved: 0}); + + // call method. + var method_callback_fired = false; + conn.call('do_something', function () { + method_callback_fired = true; + }); + test.isFalse(method_callback_fired); + + var method_message = JSON.parse(stream.sent.shift()); + test.equal(method_message, {msg: 'method', method: 'do_something', + params: [], id:method_message.id}); + + // more data. doesn't show up. + stream.receive({msg: 'data', collection: coll_name, + id: '1234', set: {c:3}}); + test.equal(coll.find({c:3}).count(), 0); + test.equal(counts, {added: 1, removed: 0, changed: 1, moved: 0}); + + + // stream reset. reconnect! + // we send a connect, our pending messages, and our subs. + stream.reset(); + + test_got_message(test, stream, {msg: 'connect', session: SESSION_ID}); + test_got_message(test, stream, method_message); + test_got_message(test, stream, sub_message); + + // reconnect with different session id + stream.receive({msg: 'connected', session: SESSION_ID + 1}); + + // resend data. doesn't show up. + stream.receive({msg: 'data', collection: coll_name, + id: '1234', set: {a:1, b:2, c:3}}); + stream.receive({msg: 'data', collection: coll_name, + id: '2345', set: {d:4}}); + + test.equal(coll.find({c:3}).count(), 0); + test.equal(counts, {added: 1, removed: 0, changed: 1, moved: 0}); + + // satisfy and return method callback + stream.receive({msg: 'data', methods: [method_message.id]}); + + test.isFalse(method_callback_fired); + stream.receive({msg: 'result', id:method_message.id, result:"bupkis"}); + test.isTrue(method_callback_fired); + + // still no update. + test.equal(coll.find({c:3}).count(), 0); + test.equal(counts, {added: 1, removed: 0, changed: 1, moved: 0}); + + // re-satisfy sub + stream.receive({msg: 'data', subs: [sub_message.id]}); + + // now the doc changes + test.equal(coll.find({c:3}).count(), 1); + test.equal(counts, {added: 2, removed: 0, changed: 2, moved: 0}); + + + handle.stop(); +}); + +// XXX also test: +// - reconnect, with session resume. +// - restart on update flag +// - on_update event diff --git a/packages/livedata/package.js b/packages/livedata/package.js index cba9c74bb4..22929c3bde 100644 --- a/packages/livedata/package.js +++ b/packages/livedata/package.js @@ -30,6 +30,8 @@ Package.on_test(function (api) { api.use('mongo-livedata', ['client', 'server']); api.use('test-helpers', ['client', 'server']); api.use('tinytest'); + + api.add_files('livedata_connection_tests.js', ['client']); api.add_files('livedata_tests.js', ['client', 'server']); api.add_files('livedata_test_service.js', ['client', 'server']); }); diff --git a/packages/liveui/liveui_tests.js b/packages/liveui/liveui_tests.js index ed5431ad85..5aa644c260 100644 --- a/packages/liveui/liveui_tests.js +++ b/packages/liveui/liveui_tests.js @@ -91,36 +91,6 @@ OnscreenDiv.prototype.remove = function() { this.div.parentNode.removeChild(this.div); }; -///// SeededRandom ///// - -var SeededRandom = function(seed) { // seed may be a string or any type - if (! (this instanceof SeededRandom)) - return new SeededRandom(seed); - - this.gen = new Meteor._Alea(seed); // from uuid.js -}; -SeededRandom.prototype.next = function() { - return this.gen(); -}; -SeededRandom.prototype.nextBoolean = function() { - return this.next() >= 0.5; -}; -SeededRandom.prototype.nextIntBetween = function(min, max) { - // inclusive of min and max - return Math.floor(this.next() * (max-min+1)) + min; -}; -SeededRandom.prototype.nextIdentifier = function(optLen) { - var letters = []; - var len = (typeof optLen === "number" ? optLen : 12); - for(var i=0; i 0) { + if (old_idx_seq(seq_ends[j-1]) < old_idx_seq(i)) + break; + j--; + } + + ptrs[i] = (j === 0 ? -1 : seq_ends[j-1]); + seq_ends[j] = i; + if (j+1 > max_seq_len) + max_seq_len = j+1; + } + } + + // pull out the LCS/LIS into unmoved_set + var idx = (max_seq_len === 0 ? -1 : seq_ends[max_seq_len-1]); + while (idx >= 0) { + unmoved_set[idx] = true; + idx = ptrs[idx]; + } + + //////// Main Diff Algorithm + + var old_idx = 0; + var new_idx = 0; + var bump_list = []; + var bump_list_old_idx = []; + var taken_list = []; + + var scan_to = function(old_j) { + // old_j <= old_results.length (may scan to end) + while (old_idx < old_j) { + var old_doc = old_results[old_idx]; + var is_in_new = new_presence_of_id[old_doc._id]; + if (! is_in_new) { + observer.removed && observer.removed(old_doc, new_idx + bump_list.length); + } else { + if (taken_list.length >= 1 && taken_list[0] === old_idx) { + // already moved + taken_list.shift(); + } else { + // bump! + bump_list.push(new_idx); + bump_list_old_idx.push(old_idx); + } + } + old_idx++; + } + }; + + + while (new_idx <= new_results.length) { + if (new_idx < new_results.length) { + var new_doc = new_results[new_idx]; + var old_doc_idx = old_index_of_id[new_doc._id]; + if (old_doc_idx === undefined) { + // insert + observer.added && observer.added(mdc(new_doc), new_idx + bump_list.length); + } else { + var old_doc = old_results[old_doc_idx]; + //var is_unmoved = (old_doc_idx > old_idx); // greedy; not minimal + var is_unmoved = unmoved_set[new_idx]; + if (is_unmoved) { + if (old_doc_idx < old_idx) + Meteor._debug("Assertion failed while diffing: nonmonotonic lcs data"); + // no move + scan_to(old_doc_idx); + if (! _.isEqual(old_doc, new_doc)) { + observer.changed && observer.changed( + mdc(new_doc), new_idx + bump_list.length, old_doc); + } + old_idx++; + } else { + // move into place + var to_idx = new_idx + bump_list.length; + var from_idx; + if (old_doc_idx >= old_idx) { + // move backwards + from_idx = to_idx + old_doc_idx - old_idx; + // must take number of "taken" items into account; also use + // results of this binary search to insert new taken_list entry + var num_taken_before = _.sortedIndex(taken_list, old_doc_idx); + from_idx -= num_taken_before; + taken_list.splice(num_taken_before, 0, old_doc_idx); + } else { + // move forwards, from bump list + // (binary search applies) + var b = _.indexOf(bump_list_old_idx, old_doc_idx, true); + if (b < 0) + Meteor._debug("Assertion failed while diffing: no bumped item"); + from_idx = bump_list[b] + b; + to_idx--; + bump_list.splice(b, 1); + bump_list_old_idx.splice(b, 1); + } + if (from_idx != to_idx) + observer.moved && observer.moved(mdc(old_doc), from_idx, to_idx); + if (! _.isEqual(old_doc, new_doc)) { + observer.changed && observer.changed(mdc(new_doc), to_idx, old_doc); + } + } + } + } else { + scan_to(old_results.length); + } + new_idx++; + } + if (bump_list.length > 0) { + Meteor._debug(old_results); + Meteor._debug(new_results); + Meteor._debug("Assertion failed while diffing: leftover bump_list "+ + bump_list); + } + +}; diff --git a/packages/minimongo/minimongo.js b/packages/minimongo/minimongo.js index b7487a21df..ca02b7dadf 100644 --- a/packages/minimongo/minimongo.js +++ b/packages/minimongo/minimongo.js @@ -12,11 +12,18 @@ LocalCollection = function () { this.next_qid = 1; // live query id generator - // qid -> live query object. keys: results, selector_f, sort_f, cursor, (callbacks) + // qid -> live query object. keys: + // results: array of current results + // results_snapshot: snapshot of results. null if not paused. + // cursor: Cursor object for the query. + // selector_f, sort_f, (callbacks): functions this.queries = {}; // when we have a snapshot, this will contain a deep copy of 'docs'. this.current_snapshot = null; + + // True when observers are paused and we should not send callbacks. + this.paused = false; }; // options may include sort, skip, limit, reactive @@ -173,15 +180,27 @@ LocalCollection.Cursor.prototype.observe = function (options) { selector_f: self.selector_f, // not fast pathed sort_f: self.sort_f, results: [], + results_snapshot: self.collection.paused ? [] : null, cursor: this }; query.results = self._getRawObjects(); - query.added = options.added || function () {}; - query.changed = options.changed || function () {}; - query.moved = options.moved || function () {}; - query.removed = options.removed || function () {}; - if (!options._suppress_initial) + // wrap callbacks we were passed. callbacks only fire when not paused + // and are never undefined. + var if_not_paused = function (f) { + if (!f) + return function () {}; + return function (/*args*/) { + if (!self.collection.paused) + f.apply(this, arguments); + }; + }; + query.added = if_not_paused(options.added); + query.changed = if_not_paused(options.changed); + query.moved = if_not_paused(options.moved); + query.removed = if_not_paused(options.removed); + + if (!options._suppress_initial && !self.collection.paused) for (var i = 0; i < query.results.length; i++) query.added(LocalCollection._deepcopy(query.results[i]), i); @@ -451,15 +470,58 @@ LocalCollection.prototype.restore = function () { // Rerun all queries from scratch. (XXX should do something more // efficient -- diffing at least; ideally, take the snapshot in an // efficient way, say with an undo log, so that we can efficiently - // tell what changed) + // tell what changed). for (var qid in this.queries) { var query = this.queries[qid]; - for (var i = query.results.length - 1; i >= 0; i--) - query.removed(query.results[i]._id, i); + + var old_results = query.results; query.results = query.cursor._getRawObjects(); - for (var i = 0; i < query.results.length; i++) - query.added(LocalCollection._deepcopy(query.results[i]), i); + if (!this.paused) + LocalCollection._diffQuery(old_results, query.results, query, true); } }; + + +// Pause the observers. No callbacks from observers will fire until +// 'resumeObservers' is called. +LocalCollection.prototype.pauseObservers = function () { + // No-op if already paused. + if (this.paused) + return; + + // Set the 'paused' flag such that new observer messages don't fire. + this.paused = true; + + // Take a snapshot of the query results for each query. + for (var qid in this.queries) { + var query = this.queries[qid]; + + query.results_snapshot = LocalCollection._deepcopy(query.results); + } +}; + +// Resume the observers. Observers immediately receive change +// notifications to bring them to the current state of the +// database. Note that this is not just replaying all the changes that +// happened during the pause, it is a smarter 'coalesced' diff. +LocalCollection.prototype.resumeObservers = function () { + // No-op if not paused. + if (!this.paused) + return; + + // Unset the 'paused' flag. Make sure to do this first, otherwise + // observer methods won't actually fire when we trigger them. + this.paused = false; + + for (var qid in this.queries) { + var query = this.queries[qid]; + // Diff the current results against the snapshot and send to observers. + // pass the query object for its observer callbacks. + LocalCollection._diffQuery(query.results_snapshot, query.results, query, true); + query.results_snapshot = null; + } + +}; + diff --git a/packages/minimongo/minimongo_tests.js b/packages/minimongo/minimongo_tests.js index fc04e8d197..c5626eed82 100644 --- a/packages/minimongo/minimongo_tests.js +++ b/packages/minimongo/minimongo_tests.js @@ -1,6 +1,6 @@ // assert that f is a strcmp-style comparison function that puts // 'values' in the provided order -assert_ordering = function (test, f, values) { +var assert_ordering = function (test, f, values) { for (var i = 0; i < values.length; i++) { var x = f(values[i], values[i]); if (x !== 0) { @@ -35,6 +35,29 @@ assert_ordering = function (test, f, values) { } }; +var log_callbacks = function (operations) { + return { + added: function (obj, idx) { + delete obj._id; + operations.push(LocalCollection._deepcopy(['added', obj, idx])); + }, + changed: function (obj, at, old_obj) { + delete obj._id; + delete old_obj._id; + operations.push(LocalCollection._deepcopy(['changed', obj, at, old_obj])); + }, + moved: function (obj, old_at, new_at) { + delete obj._id; + operations.push(LocalCollection._deepcopy(['moved', obj, old_at, new_at])); + }, + removed: function (old_obj, at) { + var id = old_obj._id; + delete old_obj._id; + operations.push(LocalCollection._deepcopy(['removed', id, at, old_obj])); + } + }; +}; + // XXX test shared structure in all MM entrypoints Tinytest.add("minimongo - basics", function (test) { @@ -526,7 +549,7 @@ Tinytest.add("minimongo - ordering", function (test) { }; verify([{"a" : 1}, ["a"], [["a", "asc"]]], - [{c: 1}, {a: 1}, {a: {}}, {a: []}, {a: true}]) + [{c: 1}, {a: 1}, {a: {}}, {a: []}, {a: true}]); verify([{"a" : -1}, [["a", "desc"]]], [{a: true}, {a: []}, {a: {}}, {a: 1}, {c: 1}]); @@ -826,26 +849,7 @@ Tinytest.add("minimongo - modify", function (test) { Tinytest.add("minimongo - observe", function (test) { var operations = []; - var cbs = { - added: function (obj, idx) { - delete obj._id; - operations.push(LocalCollection._deepcopy(['added', obj, idx])); - }, - changed: function (obj, at, old_obj) { - delete obj._id; - delete old_obj._id; - operations.push(LocalCollection._deepcopy(['changed', obj, at, old_obj])); - }, - moved: function (obj, old_at, new_at) { - delete obj._id; - operations.push(LocalCollection._deepcopy(['moved', obj, old_at, new_at])); - }, - removed: function (old_obj, at) { - id = old_obj._id; - delete old_obj._id; - operations.push(LocalCollection._deepcopy(['removed', id, at, old_obj])); - } - }; + var cbs = log_callbacks(operations); var handle; var c = new LocalCollection(); @@ -888,3 +892,227 @@ Tinytest.add("minimongo - observe", function (test) { test.equal(operations.shift(), ['added', {a:100}, 0]); handle.stop(); }); + +Tinytest.add("minimongo - diff", function (test) { + + // test correctness + + var diff_test = function(orig_len, new_old_idx) { + var old_results = new Array(orig_len); + for(var i=1; i<=orig_len; i++) + old_results[i-1] = {_id: i}; + + var new_results = _.map(new_old_idx, function(n) { + var doc = {_id: Math.abs(n)}; + if (n < 0) + doc.changed = true; + return doc; + }); + + var results = _.clone(old_results); + var observer = { + added: function(doc, before_idx) { + test.isFalse(before_idx < 0 || before_idx > results.length); + results.splice(before_idx, 0, doc); + }, + removed: function(doc, at_idx) { + test.isFalse(at_idx < 0 || at_idx >= results.length); + test.equal(doc, results[at_idx]); + results.splice(at_idx, 1); + }, + changed: function(doc, at_idx) { + test.isFalse(at_idx < 0 || at_idx >= results.length); + results[at_idx] = doc; + }, + moved: function(doc, old_idx, new_idx) { + test.isFalse(old_idx < 0 || old_idx >= results.length); + test.isFalse(new_idx < 0 || new_idx >= results.length); + test.equal(doc, results[old_idx]); + results.splice(new_idx, 0, results.splice(old_idx, 1)[0]); + } + }; + + LocalCollection._diffQuery(old_results, new_results, observer); + test.equal(results, new_results); + }; + + // edge cases and cases run into during debugging + diff_test(5, [5, 1, 2, 3, 4]); + diff_test(0, [1, 2, 3, 4]); + diff_test(4, []); + diff_test(7, [4, 5, 6, 7, 1, 2, 3]); + diff_test(7, [5, 6, 7, 1, 2, 3, 4]); + diff_test(10, [7, 4, 11, 6, 12, 1, 5]); + diff_test(3, [3, 2, 1]); + diff_test(10, [2, 7, 4, 6, 11, 3, 8, 9]); + diff_test(0, []); + diff_test(1, []); + diff_test(0, [1]); + diff_test(1, [1]); + diff_test(5, [1, 2, 3, 4, 5]); + + // interaction between "changed" and other ops + diff_test(5, [-5, -1, 2, -3, 4]); + diff_test(7, [-4, -5, 6, 7, -1, 2, 3]); + diff_test(7, [5, 6, -7, 1, 2, -3, 4]); + diff_test(10, [7, -4, 11, 6, 12, -1, 5]); + diff_test(3, [-3, -2, -1]); + diff_test(10, [-2, 7, 4, 6, 11, -3, -8, 9]); + + +}); + + +Tinytest.add("minimongo - snapshot", function (test) { + var operations = []; + var cbs = log_callbacks(operations); + + var c = new LocalCollection(); + var h = c.find({}).observe(cbs); + + // snapshot empty, restore immediately. + + test.equal(c.find().count(), 0); + test.length(operations, 0); + c.snapshot(); + test.equal(c.find().count(), 0); + test.length(operations, 0); + c.restore(); + test.equal(c.find().count(), 0); + test.length(operations, 0); + + + // snapshot empty, add new docs + + test.equal(c.find().count(), 0); + test.length(operations, 0); + + c.snapshot(); + test.equal(c.find().count(), 0); + + c.insert({_id: 1, a: 1}); + test.equal(c.find().count(), 1); + test.equal(operations.shift(), ['added', {a:1}, 0]); + c.insert({_id: 2, b: 2}); + test.equal(c.find().count(), 2); + test.equal(operations.shift(), ['added', {b:2}, 1]); + + c.restore(); + + test.equal(c.find().count(), 0); + test.equal(operations.shift(), ['removed', 1, 0, {a:1}]); + test.equal(operations.shift(), ['removed', 2, 0, {b:2}]); + + + // snapshot with contents. see we get add, update and remove. + // depends on observer update order from diffQuery. + // reorder test statements if this changes. + + c.insert({_id: 1, a: 1}); + test.equal(c.find().count(), 1); + test.equal(operations.shift(), ['added', {a:1}, 0]); + c.insert({_id: 2, b: 2}); + test.equal(c.find().count(), 2); + test.equal(operations.shift(), ['added', {b:2}, 1]); + + c.snapshot(); + test.equal(c.find().count(), 2); + + c.remove({_id: 1}); + test.equal(c.find().count(), 1); + test.equal(operations.shift(), ['removed', 1, 0, {a:1}]); + c.insert({_id: 3, c: 3}); + test.equal(c.find().count(), 2); + test.equal(operations.shift(), ['added', {c:3}, 1]); + c.update({_id: 2}, {$set: {b: 4}}); + test.equal(operations.shift(), ['changed', {b:4}, 0, {b:2}]); + + c.restore(); + test.equal(c.find().count(), 2); + test.equal(operations.shift(), ['added', {a:1}, 0]); + test.equal(operations.shift(), ['changed', {b:2}, 1, {b:4}]); + test.equal(operations.shift(), ['removed', 3, 2, {c:3}]); + + + // snapshot with stuff. restore immediately. no changes. + + test.equal(c.find().count(), 2); + test.length(operations, 0); + c.snapshot(); + test.equal(c.find().count(), 2); + test.length(operations, 0); + c.restore(); + test.equal(c.find().count(), 2); + test.length(operations, 0); + + + + h.stop(); +}); + + +Tinytest.add("minimongo - pause", function (test) { + var operations = []; + var cbs = log_callbacks(operations); + + var c = new LocalCollection(); + var h = c.find({}).observe(cbs); + + // remove and add cancel out. + c.insert({_id: 1, a: 1}); + test.equal(operations.shift(), ['added', {a:1}, 0]); + + c.pauseObservers(); + + c.remove({_id: 1}); + test.length(operations, 0); + c.insert({_id: 1, a: 1}); + test.length(operations, 0); + + c.resumeObservers(); + test.length(operations, 0); + + + // two modifications become one + c.pauseObservers(); + + c.update({_id: 1}, {a: 2}); + c.update({_id: 1}, {a: 3}); + + c.resumeObservers(); + test.equal(operations.shift(), ['changed', {a:3}, 0, {a:1}]); + test.length(operations, 0); + + + // snapshot/restore, same results + c.snapshot(); + + c.insert({_id: 2, b: 2}); + test.equal(operations.shift(), ['added', {b:2}, 1]); + + c.pauseObservers(); + c.restore(); + c.insert({_id: 2, b: 2}); + test.length(operations, 0); + + c.resumeObservers(); + test.length(operations, 0); + + // snapshot/restore, different results + c.snapshot(); + + c.insert({_id: 3, c: 3}); + test.equal(operations.shift(), ['added', {c:3}, 2]); + + c.pauseObservers(); + c.restore(); + c.insert({_id: 3, c: 4}); + test.length(operations, 0); + + c.resumeObservers(); + test.equal(operations.shift(), ['changed', {c:4}, 2, {c:3}]); + test.length(operations, 0); + + + h.stop(); +}); diff --git a/packages/minimongo/package.js b/packages/minimongo/package.js index bd076249ff..3852607ee0 100644 --- a/packages/minimongo/package.js +++ b/packages/minimongo/package.js @@ -15,7 +15,8 @@ Package.on_use(function (api, where) { 'selector.js', 'sort.js', 'uuid.js', - 'modify.js' + 'modify.js', + 'diff.js' ], where); }); diff --git a/packages/mongo-livedata/collection.js b/packages/mongo-livedata/collection.js index ca74eb1f2f..1c9767414c 100644 --- a/packages/mongo-livedata/collection.js +++ b/packages/mongo-livedata/collection.js @@ -36,6 +36,10 @@ Meteor.Collection = function (name, manager, driver) { // to start by backing out any local writes and returning to the // last state delivered by the server. beginUpdate: function () { + // pause observers so users don't see flicker. + self._collection.pauseObservers(); + + // restore db snapshot if (self._was_snapshot) { self._collection.restore(); self._was_snapshot = false; @@ -66,9 +70,9 @@ Meteor.Collection = function (name, manager, driver) { } }, - // Called at the end of a batch of updates, just for symmetry, - // or in case some future database driver needs it. + // Called at the end of a batch of updates. endUpdate: function () { + self._collection.resumeObservers(); }, // Reset the collection to its original, empty state. @@ -136,6 +140,7 @@ _.extend(Meteor.Collection.prototype, { self._was_snapshot = true; } } + }); // 'insert' immediately returns the inserted document's new _id. The diff --git a/packages/mongo-livedata/mongo_driver.js b/packages/mongo-livedata/mongo_driver.js index 547438b636..94eefb633a 100644 --- a/packages/mongo-livedata/mongo_driver.js +++ b/packages/mongo-livedata/mongo_driver.js @@ -377,9 +377,6 @@ _Mongo.LiveResultsSet = function (cursor, options) { // expose collection name self.collection_name = cursor.collection_name; - // unique handle for this live query - self.qid = self.cursor.mongo.next_observer_id++; - // previous results snapshot. on each poll cycle, diffs against // results drives the callbacks. self.results = []; @@ -388,6 +385,7 @@ _Mongo.LiveResultsSet = function (cursor, options) { self.dirty = false; // do we need polling? self.pending_writes = []; // people to notify when polling completes self.poll_running = false; // is polling in progress now? + self.polling_suspended = false; // is polling temporarily suspended? // (each instance of the class needs to get a separate throttling // context -- we don't want to coalesce invocations of markDirty on @@ -434,6 +432,8 @@ _Mongo.LiveResultsSet.prototype._unthrottled_markDirty = function () { var self = this; self.dirty = true; + if (self.polling_suspended) + return; // don't poll when told not to if (self.poll_running) return; // only one instance can run at once. just tell it to re-cycle. self.poll_running = true; @@ -452,70 +452,27 @@ _Mongo.LiveResultsSet.prototype._unthrottled_markDirty = function () { }).run(); }; +// interface for tests to control when polling happens +_Mongo.LiveResultsSet.prototype._suspendPolling = function() { + this.polling_suspended = true; +}; +_Mongo.LiveResultsSet.prototype._resumePolling = function() { + this.polling_suspended = false; + this._unthrottled_markDirty(); // poll NOW, don't wait +}; + + _Mongo.LiveResultsSet.prototype._doPoll = function () { var self = this; // Get the new query results self.cursor.rewind(); var new_results = self.cursor.fetch(); - var present_in_new = {}, present_in_old = {}; + var old_results = self.results; - // Generate some indexes to speed up the process - _.each(new_results, function (doc) { - present_in_new[doc._id] = true; - }); - _.each(self.results, function (doc) { - present_in_old[doc._id] = true; - }); + LocalCollection._diffQuery(old_results, new_results, self); + self.results = new_results; - // If documents left the query, remove them from self.results - for (var i = 0; i < self.results.length; i++) { - if (!(self.results[i]._id in present_in_new)) { - self.removed && self.removed(self.results[i], i); - self.results.splice(i, 1); - i--; - } - } - - // Now new_results is a (non-strict) superset of self.results, so we - // can be sure that new_results is at least as long as self.results. - - // Transform self.results into new_results - // XXX this is O(N^2) in the worst case, but O(N) in typical cases - for (var i = 0; i < new_results.length; i++) { - // Newly added documents - if (!(new_results[i]._id in present_in_old)) { - self.added && self.added(new_results[i], i); - self.results.splice(i, 0, new_results[i]); - continue; - } - - // Find the offset of new_results[i] in self.results (if - // present). Note that we check the most likely case first - // (old_offset === i) - var old_offset; - for (var j = i; j < self.results.length; j++) - if (self.results[j]._id === new_results[i]._id) { - old_offset = j; - break; - } - if (old_offset === undefined) - throw new Error("Document in index, but missing from array?"); - - // Changed documents - if (!_.isEqual(self.results[old_offset], new_results[i])) { - self.changed && self.changed(new_results[i], old_offset, - self.results[old_offset]); - self.results[i] = new_results[i]; - } - - // Moved documents - if (old_offset !== i) { - self.moved && self.moved(new_results[i], old_offset, i); - self.results.splice(old_offset, 1); - self.results.splice(i, 0, new_results[i]); - } - } }; _Mongo.LiveResultsSet.prototype.stop = function () { diff --git a/packages/mongo-livedata/mongo_livedata_tests.js b/packages/mongo-livedata/mongo_livedata_tests.js index a1318ef720..9042806607 100644 --- a/packages/mongo-livedata/mongo_livedata_tests.js +++ b/packages/mongo-livedata/mongo_livedata_tests.js @@ -33,13 +33,15 @@ testAsyncMulti("mongo-livedata - database failure reporting", [ } ]); -// XXX namespacing -Meteor._LivedataTestCollection = - new Meteor.Collection("livedata_test_collection"); -Tinytest.add("mongo-livedata - basics", function (test) { - var coll = Meteor._LivedataTestCollection; +Tinytest.addAsync("mongo-livedata - basics", function (test, onComplete) { var run = test.runId(); + var coll; + if (Meteor.is_client) { + coll = new Meteor.Collection(null); // local, unmanaged + } else { + coll = new Meteor.Collection("livedata_test_collection_"+run); + } var log = ''; var obs = coll.find({run: run}, {sort: ["x"]}).observe({ @@ -57,7 +59,7 @@ Tinytest.add("mongo-livedata - basics", function (test) { } }); - var expectObserve = function (expected, f) { + var captureObserve = function (f) { if (Meteor.is_client) { f(); } else { @@ -66,11 +68,16 @@ Tinytest.add("mongo-livedata - basics", function (test) { fence.armAndWait(); } + var ret = log; + log = ''; + return ret; + }; + + var expectObserve = function (expected, f) { if (!(expected instanceof Array)) expected = [expected]; - test.include(expected, log); - log = ''; + test.include(expected, captureObserve(f)); }; test.equal(coll.find({run: run}).count(), 0); @@ -119,7 +126,8 @@ Tinytest.add("mongo-livedata - basics", function (test) { [6, 3]); }); - expectObserve(['c(13,0,3)m(13,0,1)', 'm(6,1,0)c(13,1,3)'], function () { + expectObserve(['c(13,0,3)m(13,0,1)', 'm(6,1,0)c(13,1,3)', + 'c(13,0,3)m(6,1,0)', 'm(3,0,1)c(13,1,3)'], function () { coll.update({run: run, x: 3}, {$inc: {x: 10}}, {multi: true}); test.equal(_.pluck(coll.find({run: run}, {sort: {x: -1}}).fetch(), "x"), [13, 6]); @@ -136,4 +144,128 @@ Tinytest.add("mongo-livedata - basics", function (test) { }); obs.stop(); -}); \ No newline at end of file + onComplete(); +}); + +Tinytest.addAsync("mongo-livedata - fuzz test", function(test, onComplete) { + + var run = test.runId(); + var coll; + if (Meteor.is_client) { + coll = new Meteor.Collection(null); // local, unmanaged + } else { + coll = new Meteor.Collection("livedata_test_collection_"+run); + } + + // fuzz test of observe(), especially the server-side diffing + var actual = []; + var correct = []; + var counters = {add: 0, change: 0, move: 0, remove: 0}; + + var obs = coll.find({run: run}, {sort: ["x"]}).observe({ + added: function (doc, before_index) { + counters.add++; + actual.splice(before_index, 0, doc.x); + }, + changed: function (new_doc, at_index, old_doc) { + counters.change++; + test.equal(actual[at_index], old_doc.x); + actual[at_index] = new_doc.x; + }, + moved: function (doc, old_index, new_index) { + counters.move++; + test.equal(actual[old_index], doc.x); + actual.splice(old_index, 1); + actual.splice(new_index, 0, doc.x); + }, + removed: function (doc, at_index) { + counters.remove++; + test.equal(actual[at_index], doc.x); + actual.splice(at_index, 1); + } + }); + + var step = 0; + + // Random integer in [0,n) + // use SeededRandom test helper for deterministic results + var seededRandom = new SeededRandom("foobard"); + var rnd = function (n) { + return seededRandom.nextIntBetween(0, n-1); + }; + + var finishObserve = function (f) { + if (Meteor.is_client) { + f(); + } else { + var fence = new Meteor._WriteFence; + Meteor._CurrentWriteFence.withValue(fence, f); + fence.armAndWait(); + } + }; + + var doStep = function () { + if (step++ === 100) { + obs.stop(); + onComplete(); + return; + } + + var max_counters = _.clone(counters); + + finishObserve(function () { + if (Meteor.is_server) + obs._suspendPolling(); + + // Do a batch of 1-5 operations + var batch_count = rnd(5) + 1; + for (var i = 0; i < batch_count; i++) { + // 25% add, 25% remove, 25% change in place, 25% change and move + var op = rnd(4); + var which = rnd(correct.length); + if (op === 0 || step < 2 || !correct.length) { + // Add + var x = rnd(1000000); + coll.insert({run: run, x: x}); + correct.push(x); + max_counters.add++; + } else if (op === 1 || op === 2) { + var x = correct[which]; + if (op === 1) + // Small change, not likely to cause a move + var val = x + (rnd(2) ? -1 : 1); + else + // Large change, likely to cause a move + var val = rnd(1000000); + coll.update({run: run, x: x}, {$set: {x: val}}); + correct[which] = val; + max_counters.change++; + max_counters.move++; + } else { + coll.remove({run: run, x: correct[which]}); + correct.splice(which, 1); + max_counters.remove++; + } + } + if (Meteor.is_server) + obs._resumePolling(); + + }); + + // Did we actually deliver messages that mutated the array in the + // right way? + correct.sort(function (a,b) {return a-b;}); + test.equal(actual, correct); + + // Did we limit ourselves to one 'moved' message per change, + // rather than O(results) moved messages? + _.each(max_counters, function (v, k) { + test.isTrue(max_counters[k] >= counters[k], k); + }); + + Meteor.defer(doStep); + }; + + doStep(); + +}); diff --git a/packages/stream/package.js b/packages/stream/package.js index 672f29b09e..befc26357f 100644 --- a/packages/stream/package.js +++ b/packages/stream/package.js @@ -16,5 +16,5 @@ Package.on_use(function (api) { Package.on_test(function (api) { api.use('stream', ['client', 'server']); api.use('tinytest'); - api.add_files('stream_tests.js', 'client'); + api.add_files(['stream_tests.js'], 'client'); }); diff --git a/packages/test-helpers/package.js b/packages/test-helpers/package.js index 9a4696db53..f967d96676 100644 --- a/packages/test-helpers/package.js +++ b/packages/test-helpers/package.js @@ -9,7 +9,9 @@ Package.on_use(function (api, where) { api.add_files('try_all_permutations.js', where); api.add_files('async_multi.js', where); api.add_files('simulate_event.js', where); + api.add_files('seeded_random.js', where); api.add_files('canonicalize_html.js', where); + api.add_files('stub_stream.js', where); }); Package.on_test(function (api) { diff --git a/packages/test-helpers/seeded_random.js b/packages/test-helpers/seeded_random.js new file mode 100644 index 0000000000..b712fd4fbf --- /dev/null +++ b/packages/test-helpers/seeded_random.js @@ -0,0 +1,30 @@ + + +var SeededRandom = function(seed) { // seed may be a string or any type + if (! (this instanceof SeededRandom)) + return new SeededRandom(seed); + + seed = seed || "seed"; + this.gen = new Meteor._Alea(seed); // from uuid.js +}; +SeededRandom.prototype.next = function() { + return this.gen(); +}; +SeededRandom.prototype.nextBoolean = function() { + return this.next() >= 0.5; +}; +SeededRandom.prototype.nextIntBetween = function(min, max) { + // inclusive of min and max + return Math.floor(this.next() * (max-min+1)) + min; +}; +SeededRandom.prototype.nextIdentifier = function(optLen) { + var letters = []; + var len = (typeof optLen === "number" ? optLen : 12); + for(var i=0; i