diff --git a/History.md b/History.md index ca5574123c..2e8be40749 100644 --- a/History.md +++ b/History.md @@ -1,5 +1,25 @@ ## v.NEXT +* Use "faye-websocket" (0.7.2) npm module instead of "websocket" (1.0.8) for + server-to-server DDP. + +* minimongo: Support {a: {$elemMatch: {x: 1, $or: [{a: 1}, {b: 1}]}}} #1875 + +* minimongo: Support {a: {$regex: '', $options: 'i'}} #1874 + +* Upgraded dependencies + - amplify: 1.1.2 (from 1.1.0) + + +## v0.7.1.2 + +* Fix bug in tool error handling that caused `meteor` to crash on Mac + OSX when no computer name is set. + +* Work around a bug that caused MongoDB to fail an assertion when using + tailable cursors on non-oplog collections. + + ## v0.7.1.1 * Integrate with Meteor developer accounts, a new way of managing your @@ -29,7 +49,7 @@ * Add and improve support for minimongo operators. - Support `$comment`. - Support `obj` name in `$where`. - - `$regexp` matches actual regexps properly. + - `$regex` matches actual regexps properly. - Improve support for `$nin`, `$ne`, `$not`. - Support using `{ $in: [/foo/, /bar/] }`. #1707 - Support `{$exists: false}`. diff --git a/docs/.meteor/release b/docs/.meteor/release index 7fce57eeba..5a848a1d77 100644 --- a/docs/.meteor/release +++ b/docs/.meteor/release @@ -1 +1 @@ -0.7.1.1 +0.7.1.2 diff --git a/docs/client/api.html b/docs/client/api.html index 18ef696f5d..9f21784252 100644 --- a/docs/client/api.html +++ b/docs/client/api.html @@ -1585,11 +1585,19 @@ By default, the current user's `username`, `emails` and `profile` are published to the client. You can publish additional fields for the current user with: + // server Meteor.publish("userData", function () { - return Meteor.users.find({_id: this.userId}, - {fields: {'other': 1, 'things': 1}}); + if (this.userId) { + return Meteor.users.find({_id: this.userId}, + {fields: {'other': 1, 'things': 1}}); + } else { + this.ready(); + } }); + // client + Meteor.subscribe("userData"); + If the autopublish package is installed, information about all users on the system is published to all clients. This includes `username`, `profile`, and any fields in `services` that are meant to be public diff --git a/docs/lib/release-override.js b/docs/lib/release-override.js index 646ed35154..523bf17b11 100644 --- a/docs/lib/release-override.js +++ b/docs/lib/release-override.js @@ -1,5 +1,5 @@ // While galaxy apps are on their own special meteor releases, override // Meteor.release here. if (Meteor.isClient) { - Meteor.release = Meteor.release ? "0.7.1.1" : undefined; + Meteor.release = Meteor.release ? "0.7.1.2" : undefined; } diff --git a/examples/leaderboard/.meteor/release b/examples/leaderboard/.meteor/release index 7fce57eeba..5a848a1d77 100644 --- a/examples/leaderboard/.meteor/release +++ b/examples/leaderboard/.meteor/release @@ -1 +1 @@ -0.7.1.1 +0.7.1.2 diff --git a/examples/parties/.meteor/release b/examples/parties/.meteor/release index 7fce57eeba..5a848a1d77 100644 --- a/examples/parties/.meteor/release +++ b/examples/parties/.meteor/release @@ -1 +1 @@ -0.7.1.1 +0.7.1.2 diff --git a/examples/todos/.meteor/release b/examples/todos/.meteor/release index 7fce57eeba..5a848a1d77 100644 --- a/examples/todos/.meteor/release +++ b/examples/todos/.meteor/release @@ -1 +1 @@ -0.7.1.1 +0.7.1.2 diff --git a/examples/wordplay/.meteor/release b/examples/wordplay/.meteor/release index 7fce57eeba..5a848a1d77 100644 --- a/examples/wordplay/.meteor/release +++ b/examples/wordplay/.meteor/release @@ -1 +1 @@ -0.7.1.1 +0.7.1.2 diff --git a/meteor b/meteor index 94e8ff0660..29fe3a330c 100755 --- a/meteor +++ b/meteor @@ -1,6 +1,6 @@ #!/bin/bash -BUNDLE_VERSION=0.3.32 +BUNDLE_VERSION=0.3.33 # OS Check. Put here because here is where we download the precompiled # bundles that are arch specific. diff --git a/packages/amplify/amplify.js b/packages/amplify/amplify.js index 00252de12c..a7b584f122 100644 --- a/packages/amplify/amplify.js +++ b/packages/amplify/amplify.js @@ -1,19 +1,10 @@ /*! - * AmplifyJS 1.1.0 - Core, Store, Request - * - * Copyright 2011 appendTo LLC. (http://appendto.com/team) + * Amplify 1.1.2 + * + * Copyright 2011 - 2013 appendTo LLC. (http://appendto.com/team) * Dual licensed under the MIT or GPL licenses. * http://appendto.com/open-source-licenses - * - * http://amplifyjs.com - */ -/*! - * Amplify Core 1.1.0 - * - * Copyright 2011 appendTo LLC. (http://appendto.com/team) - * Dual licensed under the MIT or GPL licenses. - * http://appendto.com/open-source-licenses - * + * * http://amplifyjs.com */ (function( global, undefined ) { @@ -23,6 +14,10 @@ var slice = [].slice, var amplify = global.amplify = { publish: function( topic ) { + if ( typeof topic !== "string" ) { + throw new Error( "You must provide a valid topic to publish." ); + } + var args = slice.call( arguments, 1 ), topicSubscriptions, subscription, @@ -46,6 +41,10 @@ var amplify = global.amplify = { }, subscribe: function( topic, context, callback, priority ) { + if ( typeof topic !== "string" ) { + throw new Error( "You must provide a valid topic to create a subscription." ); + } + if ( arguments.length === 3 && typeof callback === "number" ) { priority = callback; callback = context; @@ -67,14 +66,14 @@ var amplify = global.amplify = { if ( !subscriptions[ topic ] ) { subscriptions[ topic ] = []; } - + var i = subscriptions[ topic ].length - 1, subscriptionInfo = { callback: callback, context: context, priority: priority }; - + for ( ; i >= 0; i-- ) { if ( subscriptions[ topic ][ i ].priority <= priority ) { subscriptions[ topic ].splice( i + 1, 0, subscriptionInfo ); @@ -91,7 +90,16 @@ var amplify = global.amplify = { return callback; }, - unsubscribe: function( topic, callback ) { + unsubscribe: function( topic, context, callback ) { + if ( typeof topic !== "string" ) { + throw new Error( "You must provide a valid topic to remove a subscription." ); + } + + if ( arguments.length === 2 ) { + callback = context; + context = null; + } + if ( !subscriptions[ topic ] ) { return; } @@ -101,26 +109,23 @@ var amplify = global.amplify = { for ( ; i < length; i++ ) { if ( subscriptions[ topic ][ i ].callback === callback ) { - subscriptions[ topic ].splice( i, 1 ); - break; + if ( !context || subscriptions[ topic ][ i ].context === context ) { + subscriptions[ topic ].splice( i, 1 ); + + // Adjust counter and length for removed item + i--; + length--; + } } } } }; }( this ) ); -/*! - * Amplify Store - Persistent Client-Side Storage 1.1.0 - * - * Copyright 2011 appendTo LLC. (http://appendto.com/team) - * Dual licensed under the MIT or GPL licenses. - * http://appendto.com/open-source-licenses - * - * http://amplifyjs.com - */ + (function( amplify, undefined ) { -var store = amplify.store = function( key, value, options, type ) { +var store = amplify.store = function( key, value, options ) { var type = store.type; if ( options && options.type && options.type in store.types ) { type = options.type; @@ -141,9 +146,9 @@ store.addType = function( type, storage ) { options.type = type; return store( key, value, options ); }; -} +}; store.error = function() { - return "amplify.store quota exceeded"; + return "amplify.store quota exceeded"; }; var rprefix = /^__amplify__/; @@ -223,18 +228,21 @@ function createFromStorageInterface( storageType, storage ) { // localStorage + sessionStorage // IE 8+, Firefox 3.5+, Safari 4+, Chrome 4+, Opera 10.5+, iPhone 2+, Android 2+ for ( var webStorageType in { localStorage: 1, sessionStorage: 1 } ) { - // try/catch for file protocol in Firefox + // try/catch for file protocol in Firefox and Private Browsing in Safari 5 try { - if ( window[ webStorageType ].getItem ) { - createFromStorageInterface( webStorageType, window[ webStorageType ] ); - } + // Safari 5 in Private Browsing mode exposes localStorage + // but doesn't allow storing data, so we attempt to store and remove an item. + // This will unfortunately give us a false negative if we're at the limit. + window[ webStorageType ].setItem( "__amplify__", "x" ); + window[ webStorageType ].removeItem( "__amplify__" ); + createFromStorageInterface( webStorageType, window[ webStorageType ] ); } catch( e ) {} } // globalStorage // non-standard: Firefox 2+ // https://developer.mozilla.org/en/dom/storage#globalStorage -if ( window.globalStorage ) { +if ( !store.types.localStorage && window.globalStorage ) { // try/catch for file protocol in Firefox try { createFromStorageInterface( "globalStorage", @@ -306,7 +314,9 @@ if ( window.globalStorage ) { // http://www.w3.org/TR/REC-xml/#NT-Name // simplified to assume the starting character is valid // also removed colon as it is invalid in HTML attribute names - key = key.replace( /[^-._0-9A-Za-z\xb7\xc0-\xd6\xd8-\xf6\xf8-\u037d\u37f-\u1fff\u200c-\u200d\u203f\u2040\u2070-\u218f]/g, "-" ); + key = key.replace( /[^\-._0-9A-Za-z\xb7\xc0-\xd6\xd8-\xf6\xf8-\u037d\u037f-\u1fff\u200c-\u200d\u203f\u2040\u2070-\u218f]/g, "-" ); + // adjust invalid starting character to deal with our simplified sanitization + key = key.replace( /^-/, "_-" ); if ( value === undefined ) { attr = div.getAttribute( key ); @@ -402,16 +412,11 @@ if ( window.globalStorage ) { }() ); }( this.amplify = this.amplify || {} ) ); -/*! - * Amplify Request 1.1.0 - * - * Copyright 2011 appendTo LLC. (http://appendto.com/team) - * Dual licensed under the MIT or GPL licenses. - * http://appendto.com/open-source-licenses - * - * http://amplifyjs.com - */ + +/*global amplify*/ + (function( amplify, undefined ) { +'use strict'; function noop() {} function isFunction( obj ) { @@ -457,12 +462,14 @@ amplify.request = function( resourceId, data, callback ) { resource = amplify.request.resources[ settings.resourceId ], success = settings.success || noop, error = settings.error || noop; + settings.success = async( function( data, status ) { status = status || "success"; amplify.publish( "request.success", settings, data, status ); amplify.publish( "request.complete", settings, data, status ); success( data, status ); }); + settings.error = async( function( data, status ) { status = status || "error"; amplify.publish( "request.error", settings, data, status ); @@ -506,13 +513,11 @@ amplify.request.define = function( resourceId, type, settings ) { }( amplify ) ); - - - (function( amplify, $, undefined ) { +'use strict'; var xhrProps = [ "status", "statusText", "responseText", "responseXML", "readyState" ], - rurlData = /\{([^\}]+)\}/g; + rurlData = /\{([^\}]+)\}/g; amplify.request.types.ajax = function( defnSettings ) { defnSettings = $.extend({ @@ -520,7 +525,7 @@ amplify.request.types.ajax = function( defnSettings ) { }, defnSettings ); return function( settings, request ) { - var xhr, + var xhr, handleResponse, url = defnSettings.url, abort = request.abort, ajaxSettings = $.extend( true, {}, defnSettings, { data: settings.data } ), @@ -537,7 +542,7 @@ amplify.request.types.ajax = function( defnSettings ) { return xhr.getResponseHeader( key ); }, overrideMimeType: function( type ) { - return xhr.overrideMideType( type ); + return xhr.overrideMimeType( type ); }, abort: function() { aborted = true; @@ -555,28 +560,7 @@ amplify.request.types.ajax = function( defnSettings ) { } }; - amplify.publish( "request.ajax.preprocess", - defnSettings, settings, ajaxSettings, ampXHR ); - - $.extend( ajaxSettings, { - success: function( data, status ) { - handleResponse( data, status ); - }, - error: function( _xhr, status ) { - handleResponse( null, status ); - }, - beforeSend: function( _xhr, _ajaxSettings ) { - xhr = _xhr; - ajaxSettings = _ajaxSettings; - var ret = defnSettings.beforeSend ? - defnSettings.beforeSend.call( this, ampXHR, ajaxSettings ) : true; - return ret && amplify.publish( "request.before.ajax", - defnSettings, settings, ajaxSettings, ampXHR ); - } - }); - $.ajax( ajaxSettings ); - - function handleResponse( data, status ) { + handleResponse = function( data, status ) { $.each( xhrProps, function( i, key ) { try { ampXHR[ key ] = xhr[ key ]; @@ -603,8 +587,62 @@ amplify.request.types.ajax = function( defnSettings ) { // this can happen if a request is aborted // TODO: figure out if this breaks polling or multi-part responses handleResponse = $.noop; + }; + + amplify.publish( "request.ajax.preprocess", + defnSettings, settings, ajaxSettings, ampXHR ); + + $.extend( ajaxSettings, { + isJSONP: function () { + return (/jsonp/gi).test(this.dataType); + }, + cacheURL: function () { + if (!this.isJSONP()) { + return this.url; + } + + var callbackName = 'callback'; + + // possible for the callback function name to be overridden + if (this.hasOwnProperty('jsonp')) { + if (this.jsonp !== false) { + callbackName = this.jsonp; + } else { + if (this.hasOwnProperty('jsonpCallback')) { + callbackName = this.jsonpCallback; + } + } + } + + // search and replace callback parameter in query string with empty string + var callbackRegex = new RegExp('&?' + callbackName + '=[^&]*&?', 'gi'); + return this.url.replace(callbackRegex, ''); + }, + success: function( data, status ) { + handleResponse( data, status ); + }, + error: function( _xhr, status ) { + handleResponse( null, status ); + }, + beforeSend: function( _xhr, _ajaxSettings ) { + xhr = _xhr; + ajaxSettings = _ajaxSettings; + var ret = defnSettings.beforeSend ? + defnSettings.beforeSend.call( this, ampXHR, ajaxSettings ) : true; + return ret && amplify.publish( "request.before.ajax", + defnSettings, settings, ajaxSettings, ampXHR ); + } + }); + + // cache all JSONP requests + if (ajaxSettings.cache && ajaxSettings.isJSONP()) { + $.extend(ajaxSettings, { + cache: true + }); } + $.ajax( ajaxSettings ); + request.abort = function() { ampXHR.abort(); abort.call( this ); @@ -626,8 +664,8 @@ amplify.subscribe( "request.ajax.preprocess", function( defnSettings, settings, ajaxSettings.url = ajaxSettings.url.replace( rurlData, function ( m, key ) { if ( key in data ) { - mappedKeys.push( key ); - return data[ key ]; + mappedKeys.push( key ); + return data[ key ]; } }); @@ -668,13 +706,9 @@ var cache = amplify.request.cache = { _key: function( resourceId, url, data ) { data = url + data; var length = data.length, - i = 0, - checksum = chunk(); - - while ( i < length ) { - checksum ^= chunk(); - } + i = 0; + /*jshint bitwise:false*/ function chunk() { return data.charCodeAt( i++ ) << 24 | data.charCodeAt( i++ ) << 16 | @@ -682,6 +716,12 @@ var cache = amplify.request.cache = { data.charCodeAt( i++ ) << 0; } + var checksum = chunk(); + while ( i < length ) { + checksum ^= chunk(); + } + /*jshint bitwise:true*/ + return "request-" + resourceId + "-" + checksum; }, @@ -690,7 +730,7 @@ var cache = amplify.request.cache = { return function( resource, settings, ajaxSettings, ampXHR ) { // data is already converted to a string by the time we get here var cacheKey = cache._key( settings.resourceId, - ajaxSettings.url, ajaxSettings.data ), + ajaxSettings.cacheURL(), ajaxSettings.data ), duration = resource.cache; if ( cacheKey in memoryStore ) { @@ -715,7 +755,7 @@ if ( amplify.store ) { $.each( amplify.store.types, function( type ) { cache[ type ] = function( resource, settings, ajaxSettings, ampXHR ) { var cacheKey = cache._key( settings.resourceId, - ajaxSettings.url, ajaxSettings.data ), + ajaxSettings.cacheURL(), ajaxSettings.data ), cached = amplify.store[ type ]( cacheKey ); if ( cached ) { @@ -723,7 +763,7 @@ if ( amplify.store ) { return false; } var success = ampXHR.success; - ampXHR.success = function( data ) { + ampXHR.success = function( data ) { amplify.store[ type ]( cacheKey, data, { expires: resource.cache.expires } ); success.apply( this, arguments ); }; @@ -754,6 +794,8 @@ amplify.request.decoders = { } else if ( data.status === "error" ) { delete data.status; error( data, "error" ); + } else { + error( null, "error" ); } } }; @@ -761,11 +803,11 @@ amplify.request.decoders = { amplify.subscribe( "request.before.ajax", function( resource, settings, ajaxSettings, ampXHR ) { var _success = ampXHR.success, _error = ampXHR.error, - decoder = $.isFunction( resource.decoder ) - ? resource.decoder - : resource.decoder in amplify.request.decoders - ? amplify.request.decoders[ resource.decoder ] - : amplify.request.decoders._default; + decoder = $.isFunction( resource.decoder ) ? + resource.decoder : + resource.decoder in amplify.request.decoders ? + amplify.request.decoders[ resource.decoder ] : + amplify.request.decoders._default; if ( !decoder ) { return; diff --git a/packages/binary-heap/.gitignore b/packages/binary-heap/.gitignore new file mode 100644 index 0000000000..677a6fc263 --- /dev/null +++ b/packages/binary-heap/.gitignore @@ -0,0 +1 @@ +.build* diff --git a/packages/binary-heap/binary-heap-tests.js b/packages/binary-heap/binary-heap-tests.js new file mode 100644 index 0000000000..0a487b7b2f --- /dev/null +++ b/packages/binary-heap/binary-heap-tests.js @@ -0,0 +1,138 @@ +Tinytest.add("binary-heap - simple max-heap tests", function (test) { + var h = new MaxHeap(function (a, b) { return a-b; }); + h.set("a", 1); + h.set("b", 233); + h.set("c", -122); + h.set("d", 0); + h.set("e", 0); + + test.equal(h.size(), 5); + test.equal(h.maxElementId(), "b"); + test.equal(h.get("b"), 233); + + h.remove("b"); + test.equal(h.size(), 4); + test.equal(h.maxElementId(), "a"); + h.set("e", 44); + test.equal(h.maxElementId(), "e"); + test.equal(h.get("b"), null); + test.isTrue(h.has("a")); + test.isFalse(h.has("dd")); + + h.clear(); + test.isFalse(h.has("a")); + test.equal(h.size(), 0); + test.equal(h.setDefault("a", 12345), 12345); + test.equal(h.setDefault("a", 55555), 12345); + test.equal(h.size(), 1); + test.equal(h.maxElementId(), "a"); +}); + +Tinytest.add("binary-heap - big test for max-heap", function (test) { + var positiveNumbers = _.shuffle(_.range(1, 41)); + var negativeNumbers = _.shuffle(_.range(-1, -41, -1)); + var allNumbers = negativeNumbers.concat(positiveNumbers); + + var heap = new MaxHeap(function (a, b) { return a-b; }); + var output = []; + + _.each(allNumbers, function (n) { heap.set(n, n); }); + + _.times(positiveNumbers.length + negativeNumbers.length, function () { + var maxId = heap.maxElementId(); + output.push(heap.get(maxId)); + heap.remove(maxId); + }); + + allNumbers.sort(function (a, b) { return b-a; }); + + test.equal(output, allNumbers); +}); + +Tinytest.add("binary-heap - min-max heap tests", function (test) { + var h = new MinMaxHeap(function (a, b) { return a-b; }); + h.set("a", 1); + h.set("b", 233); + h.set("c", -122); + h.set("d", 0); + h.set("e", 0); + + test.equal(h.size(), 5); + test.equal(h.maxElementId(), "b"); + test.equal(h.get("b"), 233); + test.equal(h.minElementId(), "c"); + + h.remove("b"); + test.equal(h.size(), 4); + test.equal(h.minElementId(), "c"); + h.set("e", -123); + test.equal(h.minElementId(), "e"); + test.equal(h.get("b"), null); + test.isTrue(h.has("a")); + test.isFalse(h.has("dd")); + + h.clear(); + test.isFalse(h.has("a")); + test.equal(h.size(), 0); + test.equal(h.setDefault("a", 12345), 12345); + test.equal(h.setDefault("a", 55555), 12345); + test.equal(h.size(), 1); + test.equal(h.maxElementId(), "a"); + test.equal(h.minElementId(), "a"); +}); + +Tinytest.add("binary-heap - big test for min-max-heap", function (test) { + var N = 500; + var positiveNumbers = _.shuffle(_.range(1, N + 1)); + var negativeNumbers = _.shuffle(_.range(-1, -N - 1, -1)); + var allNumbers = positiveNumbers.concat(negativeNumbers); + + var heap = new MinMaxHeap(function (a, b) { return a-b; }); + var output = []; + + var initialSets = _.clone(allNumbers); + _.each(allNumbers, function (n) { + heap.set(n, n); + heap._selfCheck(); + heap._minHeap._selfCheck(); + }); + + allNumbers = _.shuffle(allNumbers); + var secondarySets = _.clone(allNumbers); + + _.each(allNumbers, function (n) { + heap.set(-n, n); + heap._selfCheck(); + heap._minHeap._selfCheck(); + }); + + _.times(positiveNumbers.length + negativeNumbers.length, function () { + var minId = heap.minElementId(); + output.push(heap.get(minId)); + heap.remove(minId); + heap._selfCheck(); heap._minHeap._selfCheck(); + }); + + test.equal(heap.size(), 0); + + allNumbers.sort(function (a, b) { return a-b; }); + + var initialTestText = "initial sets: " + initialSets.toString() + + "; secondary sets: " + secondarySets.toString(); + test.equal(output, allNumbers, initialTestText); + + _.each(initialSets, function (n) { heap.set(n, n); }) + _.each(secondarySets, function (n) { heap.set(-n, n); }); + + allNumbers.sort(function (a, b) { return b-a; }); + output = []; + _.times(positiveNumbers.length + negativeNumbers.length, function () { + var maxId = heap.maxElementId(); + output.push(heap.get(maxId)); + heap.remove(maxId); + heap._selfCheck(); heap._minHeap._selfCheck(); + }); + + test.equal(output, allNumbers, initialTestText); +}); + diff --git a/packages/binary-heap/max-heap.js b/packages/binary-heap/max-heap.js new file mode 100644 index 0000000000..aa08a3fad5 --- /dev/null +++ b/packages/binary-heap/max-heap.js @@ -0,0 +1,226 @@ +// Constructor of Heap +// - comparator - Function - given two items returns a number +// - options: +// - initData - Array - Optional - the initial data in a format: +// Object: +// - id - String - unique id of the item +// - value - Any - the data value +// each value is retained +// - IdMap - Constructor - Optional - custom IdMap class to store id->index +// mappings internally. Standard IdMap is used by default. +MaxHeap = function (comparator, options) { + if (! _.isFunction(comparator)) + throw new Error('Passed comparator is invalid, should be a comparison function'); + var self = this; + + // a C-style comparator that is given two values and returns a number, + // negative if the first value is less than the second, positive if the second + // value is greater than the first and zero if they are equal. + self._comparator = comparator; + + options = _.defaults(options || {}, { IdMap: IdMap }); + + // _heapIdx maps an id to an index in the Heap array the corresponding value + // is located on. + self._heapIdx = new options.IdMap; + + // The Heap data-structure implemented as a 0-based contiguous array where + // every item on index idx is a node in a complete binary tree. Every node can + // have children on indexes idx*2+1 and idx*2+2, except for the leaves. Every + // node has a parent on index (idx-1)/2; + self._heap = []; + + // If the initial array is passed, we can build the heap in linear time + // complexity (O(N)) compared to linearithmic time complexity (O(nlogn)) if + // we push elements one by one. + if (_.isArray(options.initData)) + self._initFromData(options.initData); +}; + +_.extend(MaxHeap.prototype, { + // Builds a new heap in-place in linear time based on passed data + _initFromData: function (data) { + var self = this; + + self._heap = _.map(data, function (o) { + return { id: o.id, value: o.value }; + }); + + _.each(data, function (o, i) { + self._heapIdx.set(o.id, i); + }); + + if (! data.length) + return; + + // start from the first non-leaf - the parent of the last leaf + for (var i = parentIdx(data.length - 1); i >= 0; i--) + self._downHeap(i); + }, + + _downHeap: function (idx) { + var self = this; + + while (leftChildIdx(idx) < self.size()) { + var left = leftChildIdx(idx); + var right = rightChildIdx(idx); + var largest = idx; + + if (left < self.size()) { + largest = self._maxIndex(largest, left); + } + if (right < self.size()) { + largest = self._maxIndex(largest, right); + } + + if (largest === idx) + break; + + self._swap(largest, idx); + idx = largest; + } + }, + + _upHeap: function (idx) { + var self = this; + + while (idx > 0) { + var parent = parentIdx(idx); + if (self._maxIndex(parent, idx) === idx) { + self._swap(parent, idx) + idx = parent; + } else { + break; + } + } + }, + + _maxIndex: function (idxA, idxB) { + var self = this; + var valueA = self._get(idxA); + var valueB = self._get(idxB); + return self._comparator(valueA, valueB) >= 0 ? idxA : idxB; + }, + + // Internal: gets raw data object placed on idxth place in heap + _get: function (idx) { + var self = this; + return self._heap[idx].value; + }, + + _swap: function (idxA, idxB) { + var self = this; + var recA = self._heap[idxA]; + var recB = self._heap[idxB]; + + self._heapIdx.set(recA.id, idxB); + self._heapIdx.set(recB.id, idxA); + + self._heap[idxA] = recB; + self._heap[idxB] = recA; + }, + + get: function (id) { + var self = this; + if (! self.has(id)) + return null; + return self._get(self._heapIdx.get(id)); + }, + set: function (id, value) { + var self = this; + + if (self.has(id)) { + if (self.get(id) === value) + return; + + var idx = self._heapIdx.get(id); + self._heap[idx].value = value; + + // Fix the new value's position + // Either bubble new value up if it is greater than its parent + self._upHeap(idx); + // or bubble it down if it is smaller than one of its children + self._downHeap(idx); + } else { + self._heapIdx.set(id, self._heap.length); + self._heap.push({ id: id, value: value }); + self._upHeap(self._heap.length - 1); + } + }, + remove: function (id) { + var self = this; + + if (self.has(id)) { + var last = self._heap.length - 1; + var idx = self._heapIdx.get(id); + + if (idx !== last) { + self._swap(idx, last); + self._heap.pop(); + self._heapIdx.remove(id); + + // Fix the swapped value's position + self._upHeap(idx); + self._downHeap(idx); + } else { + self._heap.pop(); + self._heapIdx.remove(id); + } + } + }, + has: function (id) { + var self = this; + return self._heapIdx.has(id); + }, + empty: function (id) { + var self = this; + return !self.size(); + }, + clear: function () { + var self = this; + self._heap = []; + self._heapIdx.clear(); + }, + // iterate over values in no particular order + forEach: function (iterator) { + var self = this; + _.each(self._heap, function (obj) { + return iterator(obj.value, obj.id); + }); + }, + size: function () { + var self = this; + return self._heap.length; + }, + setDefault: function (id, def) { + var self = this; + if (self.has(id)) + return self.get(id); + self.set(id, def); + return def; + }, + clone: function () { + var self = this; + var clone = new MaxHeap(self._comparator, self._heap); + return clone; + }, + + maxElementId: function () { + var self = this; + return self.size() ? self._heap[0].id : null; + }, + + _selfCheck: function () { + var self = this; + for (var i = 1; i < self._heap.length; i++) + if (self._maxIndex(parentIdx(i), i) !== parentIdx(i)) + throw new Error("An item with id " + self._heap[i].id + + " has a parent younger than it: " + + self._heap[parentIdx(i)].id); + } +}); + +function leftChildIdx (i) { return i * 2 + 1; } +function rightChildIdx (i) { return i * 2 + 2; } +function parentIdx (i) { return (i - 1) >> 1; } + diff --git a/packages/binary-heap/min-max-heap.js b/packages/binary-heap/min-max-heap.js new file mode 100644 index 0000000000..990a71402d --- /dev/null +++ b/packages/binary-heap/min-max-heap.js @@ -0,0 +1,55 @@ +// This implementation of Min/Max-Heap is just a subclass of Max-Heap +// with a Min-Heap as an encapsulated property. +// +// Most of the operations are just proxy methods to call the same method on both +// heaps. +// +// This implementation takes 2*N memory but is fairly simple to write and +// understand. And the constant factor of a simple Heap is usually smaller +// compared to other two-way priority queues like Min/Max Heaps +// (http://www.cs.otago.ac.nz/staffpriv/mike/Papers/MinMaxHeaps/MinMaxHeaps.pdf) +// and Interval Heaps +// (http://www.cise.ufl.edu/~sahni/dsaac/enrich/c13/double.htm) +MinMaxHeap = function (comparator, options) { + var self = this; + + MaxHeap.call(self, comparator, options); + self._minHeap = new MaxHeap(function (a, b) { + return -comparator(a, b); + }, options); +}; + +Meteor._inherits(MinMaxHeap, MaxHeap); + +_.extend(MinMaxHeap.prototype, { + set: function (id, value) { + var self = this; + MaxHeap.prototype.set.apply(self, arguments); + self._minHeap.set(id, value); + }, + remove: function (id) { + var self = this; + MaxHeap.prototype.remove.apply(self, arguments); + self._minHeap.remove(id); + }, + clear: function () { + var self = this; + MaxHeap.prototype.clear.apply(self, arguments); + self._minHeap.clear(); + }, + setDefault: function (id, def) { + var self = this; + MaxHeap.prototype.setDefault.apply(self, arguments); + return self._minHeap.setDefault(id, def); + }, + clone: function () { + var self = this; + var clone = new MinMaxHeap(self._comparator, self._heap); + return clone; + }, + minElementId: function () { + var self = this; + return self._minHeap.maxElementId(); + } +}); + diff --git a/packages/binary-heap/package.js b/packages/binary-heap/package.js new file mode 100644 index 0000000000..f8c4ae4613 --- /dev/null +++ b/packages/binary-heap/package.js @@ -0,0 +1,18 @@ +Package.describe({ + summary: "Binary Heap datastructure implementation", + internal: true +}); + +Package.on_use(function (api) { + api.export('MaxHeap'); + api.export('MinMaxHeap'); + api.use(['underscore', 'id-map']); + api.add_files(['max-heap.js', 'min-max-heap.js']); +}); + +Package.on_test(function (api) { + api.use('tinytest'); + api.use('binary-heap'); + api.add_files('binary-heap-tests.js'); +}); + diff --git a/packages/facebook/facebook_configure.html b/packages/facebook/facebook_configure.html index 223b8adf2b..57285081e0 100644 --- a/packages/facebook/facebook_configure.html +++ b/packages/facebook/facebook_configure.html @@ -7,16 +7,21 @@ Visit https://developers.facebook.com/apps
  • - Create New App (Only a name is required.) + Select "Apps", then "Create a New App". (You don't need to enter a namespace.)
  • - Set "Sandbox Mode" to "Disabled" -
  • -
  • - Under "Select how your app integrates with Facebook", expand "Website with Facebook Login". + Select "Settings" and enter a "Contact Email". Then select "Add Platform" + and choose "Website".
  • Set Site URL to: {{siteUrl}}
  • +
  • + Select "Status" and make the app and all its live features available to + the general public. +
  • +
  • + Select "Dashboard". +
  • diff --git a/packages/facts/facts.js b/packages/facts/facts.js index 543023b271..d2f8d3f979 100644 --- a/packages/facts/facts.js +++ b/packages/facts/facts.js @@ -19,6 +19,9 @@ if (Meteor.isServer) { var factsByPackage = {}; var activeSubscriptions = []; + // Make factsByPackage data available to the server environment + Facts._factsByPackage = factsByPackage; + Facts.incrementServerFact = function (pkg, fact, increment) { if (!_.has(factsByPackage, pkg)) { factsByPackage[pkg] = {}; diff --git a/packages/google/google_configure.html b/packages/google/google_configure.html index 6bae558fb9..83917941b2 100644 --- a/packages/google/google_configure.html +++ b/packages/google/google_configure.html @@ -10,28 +10,22 @@ If necessary, "Create Project"
  • - Click "APIs & auth" and "Registered apps" on the left + Click "APIs & auth" and "Credentials" on the left
  • - Click the "Register App" button + Click the "Create New Client ID" button
  • Choose "Web application" as the type
  • - Click "Register" + Set Authorized Javascript Origins to: {{siteUrl}}
  • - Expand the "OAuth 2.0 Client ID section" + Set Authorized Redirect URI to: {{siteUrl}}_oauth/google?close
  • - Set Web Origin to: {{siteUrl}} -
  • -
  • - Set Redirect URI to: {{siteUrl}}_oauth/google?close -
  • -
  • - Click "Generate" + Click "Create Client ID"
  • diff --git a/packages/id-map/.gitignore b/packages/id-map/.gitignore new file mode 100644 index 0000000000..677a6fc263 --- /dev/null +++ b/packages/id-map/.gitignore @@ -0,0 +1 @@ +.build* diff --git a/packages/id-map/id-map.js b/packages/id-map/id-map.js new file mode 100644 index 0000000000..888fdee63d --- /dev/null +++ b/packages/id-map/id-map.js @@ -0,0 +1,77 @@ +IdMap = function (idStringify, idParse) { + var self = this; + self._map = {}; + self._idStringify = idStringify || JSON.stringify; + self._idParse = idParse || JSON.parse; +}; + +// Some of these methods are designed to match methods on OrderedDict, since +// (eg) ObserveMultiplex and _CachingChangeObserver use them interchangeably. +// (Conceivably, this should be replaced with "UnorderedDict" with a specific +// set of methods that overlap between the two.) + +_.extend(IdMap.prototype, { + get: function (id) { + var self = this; + var key = self._idStringify(id); + return self._map[key]; + }, + set: function (id, value) { + var self = this; + var key = self._idStringify(id); + self._map[key] = value; + }, + remove: function (id) { + var self = this; + var key = self._idStringify(id); + delete self._map[key]; + }, + has: function (id) { + var self = this; + var key = self._idStringify(id); + return _.has(self._map, key); + }, + empty: function () { + var self = this; + return _.isEmpty(self._map); + }, + clear: function () { + var self = this; + self._map = {}; + }, + // Iterates over the items in the map. Return `false` to break the loop. + forEach: function (iterator) { + var self = this; + // don't use _.each, because we can't break out of it. + var keys = _.keys(self._map); + for (var i = 0; i < keys.length; i++) { + var breakIfFalse = iterator.call(null, self._map[keys[i]], + self._idParse(keys[i])); + if (breakIfFalse === false) + return; + } + }, + size: function () { + var self = this; + return _.size(self._map); + }, + setDefault: function (id, def) { + var self = this; + var key = self._idStringify(id); + if (_.has(self._map, key)) + return self._map[key]; + self._map[key] = def; + return def; + }, + // Assumes that values are EJSON-cloneable, and that we don't need to clone + // IDs (ie, that nobody is going to mutate an ObjectId). + clone: function () { + var self = this; + var clone = new IdMap(self._idStringify, self._idParse); + self.forEach(function (value, id) { + clone.set(id, EJSON.clone(value)); + }); + return clone; + } +}); + diff --git a/packages/id-map/package.js b/packages/id-map/package.js new file mode 100644 index 0000000000..876d3406a6 --- /dev/null +++ b/packages/id-map/package.js @@ -0,0 +1,11 @@ +Package.describe({ + summary: "Dictionary data structure allowing non-string keys", + internal: true +}); + +Package.on_use(function (api) { + api.export('IdMap'); + api.use(['underscore', 'json', 'ejson']); + api.add_files([ 'id-map.js' ]); +}); + diff --git a/packages/less/plugin/compile-less.js b/packages/less/plugin/compile-less.js index fb8265abe7..a1e15445b5 100644 --- a/packages/less/plugin/compile-less.js +++ b/packages/less/plugin/compile-less.js @@ -25,10 +25,17 @@ Plugin.registerSourceHandler("less", function (compileStep) { var parser = new less.Parser(options); var astFuture = new Future; - var ast; + var sourceMap = null; try { parser.parse(source, astFuture.resolver()); - ast = astFuture.wait(); + var ast = astFuture.wait(); + + var css = ast.toCSS({ + sourceMap: true, + writeSourceMap: function (sm) { + sourceMap = JSON.parse(sm); + } + }); } catch (e) { // less.Parser.parse is supposed to report any errors via its // callback. But sometimes, it throws them instead. This is @@ -42,14 +49,6 @@ Plugin.registerSourceHandler("less", function (compileStep) { return; } - var sourceMap = null; - var css = ast.toCSS({ - sourceMap: true, - writeSourceMap: function (sm) { - sourceMap = JSON.parse(sm); - } - }); - if (sourceMap) { sourceMap.sources = [compileStep.inputPath]; diff --git a/packages/livedata/.npm/package/npm-shrinkwrap.json b/packages/livedata/.npm/package/npm-shrinkwrap.json index f48e03a46b..92d8fd541c 100644 --- a/packages/livedata/.npm/package/npm-shrinkwrap.json +++ b/packages/livedata/.npm/package/npm-shrinkwrap.json @@ -16,8 +16,13 @@ } } }, - "websocket": { - "version": "1.0.8" + "faye-websocket": { + "version": "0.7.2", + "dependencies": { + "websocket-driver": { + "version": "0.3.2" + } + } } } } diff --git a/packages/livedata/livedata_connection.js b/packages/livedata/livedata_connection.js index 30ff1ef377..be5b79e15c 100644 --- a/packages/livedata/livedata_connection.js +++ b/packages/livedata/livedata_connection.js @@ -49,7 +49,10 @@ var Connection = function (url, options) { self._stream = new LivedataTest.ClientStream(url, { retry: options.retry, headers: options.headers, - _sockjsOptions: options._sockjsOptions + _sockjsOptions: options._sockjsOptions, + // To keep some tests quiet (because we don't have a real API for handling + // client-stream-level errors). + _dontPrintErrors: options._dontPrintErrors }); } diff --git a/packages/livedata/livedata_server.js b/packages/livedata/livedata_server.js index 95ddab9e6f..84dc2bb68f 100644 --- a/packages/livedata/livedata_server.js +++ b/packages/livedata/livedata_server.js @@ -839,6 +839,13 @@ var Subscription = function ( _.extend(Subscription.prototype, { _runHandler: function () { + // XXX should we unblock() here? Either before running the publish + // function, or before running _publishCursor. + // + // Right now, each publish function blocks all future publishes and + // methods waiting on data from Mongo (or whatever else the function + // blocks on). This probably slows page load in common cases. + var self = this; try { var res = maybeAuditArgumentChecks( @@ -1160,7 +1167,7 @@ _.extend(Server.prototype, { // drop all future data coming over this connection on the // floor. We don't want to confuse things. socket.removeAllListeners('data'); - setTimeout(function () { + Meteor.setTimeout(function () { socket.send(stringifyDDP({msg: 'failed', version: version})); socket.close(); }, timeout); diff --git a/packages/livedata/livedata_tests.js b/packages/livedata/livedata_tests.js index 65736d76f8..07bd42e779 100644 --- a/packages/livedata/livedata_tests.js +++ b/packages/livedata/livedata_tests.js @@ -683,9 +683,10 @@ if (Meteor.isServer) { testAsyncMulti("livedata - connect fails to unknown place", [ function (test, expect) { var self = this; - self.conn = DDP.connect("example.com"); + self.conn = DDP.connect("example.com", {_dontPrintErrors: true}); Meteor.setTimeout(expect(function () { test.isFalse(self.conn.status().connected, "Not connected"); + self.conn.close(); }), 500); } ]); diff --git a/packages/livedata/package.js b/packages/livedata/package.js index d2b407ae13..0e585b8467 100644 --- a/packages/livedata/package.js +++ b/packages/livedata/package.js @@ -3,7 +3,11 @@ Package.describe({ internal: true }); -Npm.depends({sockjs: "0.3.8", websocket: "1.0.8"}); +// We use 'faye-websocket' for connections in server-to-server DDP, mostly +// because it's the same library used as a server in sockjs, and it's easiest to +// deal with a single websocket implementation. (Plus, its maintainer is easy +// to work with on pull requests.) +Npm.depends({sockjs: "0.3.8", "faye-websocket": "0.7.2"}); Package.on_use(function (api) { api.use(['check', 'random', 'ejson', 'json', 'underscore', 'deps', diff --git a/packages/livedata/stream_client_nodejs.js b/packages/livedata/stream_client_nodejs.js index 708189deea..a3bc996d27 100644 --- a/packages/livedata/stream_client_nodejs.js +++ b/packages/livedata/stream_client_nodejs.js @@ -11,43 +11,16 @@ // ping frames or with DDP-level messages.) LivedataTest.ClientStream = function (endpoint, options) { var self = this; + options = options || {}; + self.options = _.extend({ retry: true }, options); - // WebSocket-Node https://github.com/Worlize/WebSocket-Node - // Chosen because it can run without native components. It has a - // somewhat idiosyncratic API. We may want to use 'ws' instead in the - // future. - // - // Since server-to-server DDP is still an experimental feature, we only - // require the module if we actually create a server-to-server - // connection. This is a minor efficiency improvement, but moreover: while - // 'websocket' doesn't require native components, it tries to use some - // optional native components and prints a warning if it can't load - // them. Since native components in packages don't work when transferred to - // other architectures yet, this means that require('websocket') prints a - // spammy log message when deployed to another architecture. Delaying the - // require means you only get the log message if you're actually using the - // feature. - self.client = new (Npm.require('websocket').client)(); + self.client = null; // created in _launchConnection self.endpoint = endpoint; - self.currentConnection = null; - options = options || {}; - self.headers = options.headers || {}; - - self.client.on('connect', Meteor.bindEnvironment( - function (connection) { - return self._onConnect(connection); - }, - "stream connect callback" - )); - - self.client.on('connectFailed', function (error) { - // XXX: Make this do something better than make the tests hang if it does not work. - return self._lostConnection(); - }); + self.headers = self.options.headers || {}; self._initCommon(); @@ -63,7 +36,7 @@ _.extend(LivedataTest.ClientStream.prototype, { send: function (data) { var self = this; if (self.currentStatus.connected) { - self.currentConnection.send(data); + self.client.send(data); } }, @@ -73,80 +46,37 @@ _.extend(LivedataTest.ClientStream.prototype, { self.endpoint = url; }, - _onConnect: function (connection) { + _onConnect: function (client) { var self = this; + if (client !== self.client) { + // This connection is not from the last call to _launchConnection. + // But _launchConnection calls _cleanup which closes previous connections. + // It's our belief that this stifles future 'open' events, but maybe + // we are wrong? + throw new Error("Got open from inactive client"); + } + if (self._forcedToDisconnect) { // We were asked to disconnect between trying to open the connection and // actually opening it. Let's just pretend this never happened. - connection.close(); + self.client.close(); + self.client = null; return; } if (self.currentStatus.connected) { - // We already have a connection. It must have been the case that - // we started two parallel connection attempts (because we - // wanted to 'reconnect now' on a hanging connection and we had - // no way to cancel the connection attempt.) Just ignore/close - // the latecomer. - connection.close(); - return; + // We already have a connection. It must have been the case that we + // started two parallel connection attempts (because we wanted to + // 'reconnect now' on a hanging connection and we had no way to cancel the + // connection attempt.) But this shouldn't happen (similarly to the client + // !== self.client check above). + throw new Error("Two parallel connections?"); } - if (self.connectionTimer) { - clearTimeout(self.connectionTimer); - self.connectionTimer = null; - } - - var onError = Meteor.bindEnvironment( - function (_this, error) { - if (self.currentConnection !== _this) - return; - - Meteor._debug("stream error", error.toString(), - (new Date()).toDateString()); - self._lostConnection(); - }, - "stream error callback" - ); - - connection.on('error', function (error) { - // We have to pass in `this` explicitly because bindEnvironment - // doesn't propagate it for us. - onError(this, error); - }); - - var onClose = Meteor.bindEnvironment( - function (_this) { - if (self.options._testOnClose) - self.options._testOnClose(); - - if (self.currentConnection !== _this) - return; - - self._lostConnection(); - }, - "stream close callback" - ); - - connection.on('close', function () { - // We have to pass in `this` explicitly because bindEnvironment - // doesn't propagate it for us. - onClose(this); - }); - - connection.on('message', function (message) { - if (self.currentConnection !== this) - return; // old connection still emitting messages - - if (message.type === "utf8") // ignore binary frames - _.each(self.eventCallbacks.message, function (callback) { - callback(message.utf8Data); - }); - }); + self._clearConnectionTimer(); // update status - self.currentConnection = connection; self.currentStatus.status = "connected"; self.currentStatus.connected = true; self.currentStatus.retryCount = 0; @@ -161,10 +91,10 @@ _.extend(LivedataTest.ClientStream.prototype, { var self = this; self._clearConnectionTimer(); - if (self.currentConnection) { - var conn = self.currentConnection; - self.currentConnection = null; - conn.close(); + if (self.client) { + var client = self.client; + self.client = null; + client.close(); } }, @@ -181,25 +111,63 @@ _.extend(LivedataTest.ClientStream.prototype, { var self = this; self._cleanup(); // cleanup the old socket, if there was one. - // launch a connect attempt. we have no way to track it. we either - // get an _onConnect event, or we don't. + // Since server-to-server DDP is still an experimental feature, we only + // require the module if we actually create a server-to-server + // connection. + var FayeWebSocket = Npm.require('faye-websocket'); - // XXX: set up a timeout on this. + // We would like to specify 'ddp' as the subprotocol here. The npm module we + // used to use as a client would fail the handshake if we ask for a + // subprotocol and the server doesn't send one back (and sockjs doesn't). + // Faye doesn't have that behavior; it's unclear from reading RFC 6455 if + // Faye is erroneous or not. So for now, we don't specify protocols. + var client = self.client = new FayeWebSocket.Client( + toWebsocketUrl(self.endpoint), + [/*no subprotocols*/], + {headers: self.headers} + ); - // we would like to specify 'ddp' as the protocol here, but - // unfortunately WebSocket-Node fails the handshake if we ask for - // a protocol and the server doesn't send one back (and sockjs - // doesn't). also, related: I guess we have to accept that - // 'stream' is ddp-specific - self.client.connect(toWebsocketUrl(self.endpoint), - undefined, // protocols - undefined, // origin - self.headers); - - if (self.connectionTimer) - clearTimeout(self.connectionTimer); - self.connectionTimer = setTimeout( + self._clearConnectionTimer(); + self.connectionTimer = Meteor.setTimeout( _.bind(self._lostConnection, self), self.CONNECT_TIMEOUT); + + self.client.on('open', Meteor.bindEnvironment(function () { + return self._onConnect(client); + }, "stream connect callback")); + + var clientOnIfCurrent = function (event, description, f) { + self.client.on(event, Meteor.bindEnvironment(function () { + // Ignore events from any connection we've already cleaned up. + if (client !== self.client) + return; + f.apply(this, arguments); + }, description)); + }; + + clientOnIfCurrent('error', 'stream error callback', function (error) { + if (!self.options._dontPrintErrors) + Meteor._debug("stream error", error.message); + + // XXX: Make this do something better than make the tests hang if it does + // not work. + self._lostConnection(); + }); + + + clientOnIfCurrent('close', 'stream close callback', function () { + self._lostConnection(); + }); + + + clientOnIfCurrent('message', 'stream message callback', function (message) { + // Ignore binary frames, where message.data is a Buffer + if (typeof message.data !== "string") + return; + + _.each(self.eventCallbacks.message, function (callback) { + callback(message.data); + }); + }); } }); diff --git a/packages/livedata/stream_client_tests.js b/packages/livedata/stream_client_tests.js index dbb675852d..d9402a2157 100644 --- a/packages/livedata/stream_client_tests.js +++ b/packages/livedata/stream_client_tests.js @@ -1,17 +1,24 @@ var Fiber = Npm.require('fibers'); -Tinytest.addAsync("stream client - callbacks run in a fiber", function (test, onComplete) { - stream = new LivedataTest.ClientStream( - Meteor.absoluteUrl(), - { - _testOnClose: function () { - test.isTrue(Fiber.current); - onComplete(); - } - } - ); - stream.on('reset', function () { - test.isTrue(Fiber.current); - stream.disconnect(); - }); -}); +testAsyncMulti("stream client - callbacks run in a fiber", [ + function (test, expect) { + var stream = new LivedataTest.ClientStream(Meteor.absoluteUrl()); + + var messageFired = false; + var resetFired = false; + + stream.on('message', expect(function () { + test.isTrue(Fiber.current); + if (resetFired) + stream.disconnect(); + messageFired = true; + })); + + stream.on('reset', expect(function () { + test.isTrue(Fiber.current); + if (messageFired) + stream.disconnect(); + resetFired = true; + })); + } +]); diff --git a/packages/meteor/helpers.js b/packages/meteor/helpers.js index 9b05246669..341df557b9 100644 --- a/packages/meteor/helpers.js +++ b/packages/meteor/helpers.js @@ -111,5 +111,26 @@ _.extend(Meteor, { return fut.wait(); return result; }; + }, + + // Sets child's prototype to a new object whose prototype is parent's + // prototype. Used as: + // Meteor._inherit(ClassB, ClassA). + // _.extend(ClassB.prototype, { ... }) + // Inspired by CoffeeScript's `extend` and Google Closure's `goog.inherits`. + _inherits: function (Child, Parent) { + // copy static fields + _.each(Parent, function (prop, field) { + Child[field] = prop; + }); + + // a middle member of prototype chain: takes the prototype from the Parent + var Middle = function () { + this.constructor = Child; + }; + Middle.prototype = Parent.prototype; + Child.prototype = new Middle(); + Child.__super__ = Parent.prototype; + return Child; } }); diff --git a/packages/minimongo/helpers.js b/packages/minimongo/helpers.js index d4046124f2..ab8cc78b6f 100644 --- a/packages/minimongo/helpers.js +++ b/packages/minimongo/helpers.js @@ -16,7 +16,10 @@ isIndexable = function (x) { return isArray(x) || isPlainObject(x); }; -isOperatorObject = function (valueSelector) { +// Returns true if this is an object with at least one key and all keys begin +// with $. Unless inconsistentOK is set, throws if some keys begin with $ and +// others don't. +isOperatorObject = function (valueSelector, inconsistentOK) { if (!isPlainObject(valueSelector)) return false; @@ -26,7 +29,10 @@ isOperatorObject = function (valueSelector) { if (theseAreOperators === undefined) { theseAreOperators = thisIsOperator; } else if (theseAreOperators !== thisIsOperator) { - throw new Error("Inconsistent operator: " + valueSelector); + if (!inconsistentOK) + throw new Error("Inconsistent operator: " + + JSON.stringify(valueSelector)); + theseAreOperators = false; } }); return !!theseAreOperators; // {} has no operators diff --git a/packages/minimongo/id_map.js b/packages/minimongo/id_map.js index 5c2b628155..ba4880980b 100644 --- a/packages/minimongo/id_map.js +++ b/packages/minimongo/id_map.js @@ -1,74 +1,7 @@ LocalCollection._IdMap = function () { var self = this; - self._map = {}; + IdMap.call(self, LocalCollection._idStringify, LocalCollection._idParse); }; -// Some of these methods are designed to match methods on OrderedDict, since -// (eg) ObserveMultiplex and _CachingChangeObserver use them interchangeably. -// (Conceivably, this should be replaced with "UnorderedDict" with a specific -// set of methods that overlap between the two.) +Meteor._inherits(LocalCollection._IdMap, IdMap); -_.extend(LocalCollection._IdMap.prototype, { - get: function (id) { - var self = this; - var key = LocalCollection._idStringify(id); - return self._map[key]; - }, - set: function (id, value) { - var self = this; - var key = LocalCollection._idStringify(id); - self._map[key] = value; - }, - remove: function (id) { - var self = this; - var key = LocalCollection._idStringify(id); - delete self._map[key]; - }, - has: function (id) { - var self = this; - var key = LocalCollection._idStringify(id); - return _.has(self._map, key); - }, - empty: function () { - var self = this; - return _.isEmpty(self._map); - }, - clear: function () { - var self = this; - self._map = {}; - }, - // Iterates over the items in the map. Return `false` to break the loop. - forEach: function (iterator) { - var self = this; - // don't use _.each, because we can't break out of it. - var keys = _.keys(self._map); - for (var i = 0; i < keys.length; i++) { - var breakIfFalse = iterator.call(null, self._map[keys[i]], - LocalCollection._idParse(keys[i])); - if (breakIfFalse === false) - return; - } - }, - size: function () { - var self = this; - return _.size(self._map); - }, - setDefault: function (id, def) { - var self = this; - var key = LocalCollection._idStringify(id); - if (_.has(self._map, key)) - return self._map[key]; - self._map[key] = def; - return def; - }, - // Assumes that values are EJSON-cloneable, and that we don't need to clone - // IDs (ie, that nobody is going to mutate an ObjectId). - clone: function () { - var self = this; - var clone = new LocalCollection._IdMap; - self.forEach(function (value, id) { - clone.set(id, EJSON.clone(value)); - }); - return clone; - } -}); diff --git a/packages/minimongo/minimongo_server_tests.js b/packages/minimongo/minimongo_server_tests.js index efd7808b4c..127e30a8e6 100644 --- a/packages/minimongo/minimongo_server_tests.js +++ b/packages/minimongo/minimongo_server_tests.js @@ -332,6 +332,31 @@ Tinytest.add("minimongo - selector and projection combination", function (test) }); +Tinytest.add("minimongo - sorter and projection combination", function (test) { + function testSorterProjectionComb (sortSpec, proj, expected, desc) { + var sorter = new Minimongo.Sorter(sortSpec); + test.equal(sorter.combineIntoProjection(proj), expected, desc); + } + + // Test with inclusive projection + testSorterProjectionComb({ a: 1, b: 1 }, { b: 1, c: 1, d: 1 }, { a: true, b: true, c: true, d: true }, "simplest incl"); + testSorterProjectionComb({ a: 1, b: -1 }, { b: 1, c: 1, d: 1 }, { a: true, b: true, c: true, d: true }, "simplest incl"); + testSorterProjectionComb({ 'a.c': 1 }, { b: 1 }, { 'a.c': true, b: true }, "dot path incl"); + testSorterProjectionComb({ 'a.1.c': 1 }, { b: 1 }, { 'a.c': true, b: true }, "dot num path incl"); + testSorterProjectionComb({ 'a.1.c': 1 }, { b: 1, a: 1 }, { a: true, b: true }, "dot num path incl overlap"); + testSorterProjectionComb({ 'a.1.c': 1, 'a.2.b': -1 }, { b: 1 }, { 'a.c': true, 'a.b': true, b: true }, "dot num path incl"); + testSorterProjectionComb({ 'a.1.c': 1, 'a.2.b': -1 }, {}, {}, "dot num path with empty incl"); + + // Test with exclusive projection + testSorterProjectionComb({ a: 1, b: 1 }, { b: 0, c: 0, d: 0 }, { c: false, d: false }, "simplest excl"); + testSorterProjectionComb({ a: 1, b: -1 }, { b: 0, c: 0, d: 0 }, { c: false, d: false }, "simplest excl"); + testSorterProjectionComb({ 'a.c': 1 }, { b: 0 }, { b: false }, "dot path excl"); + testSorterProjectionComb({ 'a.1.c': 1 }, { b: 0 }, { b: false }, "dot num path excl"); + testSorterProjectionComb({ 'a.1.c': 1 }, { b: 0, a: 0 }, { b: false }, "dot num path excl overlap"); + testSorterProjectionComb({ 'a.1.c': 1, 'a.2.b': -1 }, { b: 0 }, { b: false }, "dot num path excl"); +}); + + (function () { // TODO: Tests for "can selector become true by modifier" are incomplete, // absent or test the functionality of "not ideal" implementation (test checks diff --git a/packages/minimongo/minimongo_tests.js b/packages/minimongo/minimongo_tests.js index ef2fed60b1..f299f2206f 100644 --- a/packages/minimongo/minimongo_tests.js +++ b/packages/minimongo/minimongo_tests.js @@ -678,6 +678,9 @@ Tinytest.add("minimongo - selector_compiler", function (test) { nomatch({a: {$regex: 'a'}}, {a: 'cut'}); nomatch({a: {$regex: 'a'}}, {a: 'CAT'}); match({a: {$regex: 'a', $options: 'i'}}, {a: 'CAT'}); + match({a: {$regex: '', $options: 'i'}}, {a: 'foo'}); + nomatch({a: {$regex: '', $options: 'i'}}, {}); + nomatch({a: {$regex: '', $options: 'i'}}, {a: 5}); nomatch({a: /undefined/}, {}); nomatch({a: {$regex: 'undefined'}}, {}); nomatch({a: /xxx/}, {}); @@ -817,6 +820,12 @@ Tinytest.add("minimongo - selector_compiler", function (test) { nomatch({$or: [{a: 2}, {a: 3}], b: 2}, {a: 1, b: 2}); nomatch({$or: [{a: 1}, {a: 2}], b: 3}, {a: 1, b: 2}); + // Combining $or with equality + match({x: 1, $or: [{a: 1}, {b: 1}]}, {x: 1, b: 1}); + match({$or: [{a: 1}, {b: 1}], x: 1}, {x: 1, b: 1}); + nomatch({x: 1, $or: [{a: 1}, {b: 1}]}, {b: 1}); + nomatch({x: 1, $or: [{a: 1}, {b: 1}]}, {x: 1}); + // $or and $lt, $lte, $gt, $gte match({$or: [{a: {$lte: 1}}, {a: 2}]}, {a: 1}); nomatch({$or: [{a: {$lt: 1}}, {a: 2}]}, {a: 1}); @@ -1100,6 +1109,16 @@ Tinytest.add("minimongo - selector_compiler", function (test) { nomatch({a: {$elemMatch: {x: 5}}}, {a: {x: 5}}); match({a: {$elemMatch: {0: {$gt: 5, $lt: 9}}}}, {a: [[6]]}); match({a: {$elemMatch: {'0.b': {$gt: 5, $lt: 9}}}}, {a: [[{b:6}]]}); + match({a: {$elemMatch: {x: 1, $or: [{a: 1}, {b: 1}]}}}, + {a: [{x: 1, b: 1}]}); + match({a: {$elemMatch: {$or: [{a: 1}, {b: 1}], x: 1}}}, + {a: [{x: 1, b: 1}]}); + nomatch({a: {$elemMatch: {x: 1, $or: [{a: 1}, {b: 1}]}}}, + {a: [{b: 1}]}); + nomatch({a: {$elemMatch: {x: 1, $or: [{a: 1}, {b: 1}]}}}, + {a: [{x: 1}]}); + nomatch({a: {$elemMatch: {x: 1, $or: [{a: 1}, {b: 1}]}}}, + {a: [{x: 1}, {b: 1}]}); // $comment match({a: 5, $comment: "asdf"}, {a: 5}); @@ -1549,7 +1568,7 @@ Tinytest.add("minimongo - ordering", function (test) { // document ordering under a sort specification var verify = function (sorts, docs) { _.each(_.isArray(sorts) ? sorts : [sorts], function (sort) { - var sorter = new MinimongoTest.Sorter(sort); + var sorter = new Minimongo.Sorter(sort); assert_ordering(test, sorter.getComparator(), docs); }); }; @@ -1577,15 +1596,15 @@ Tinytest.add("minimongo - ordering", function (test) { [{c: 1}, {a: 1, b: 2}, {a: 1, b: 3}, {a: 2, b: 0}]); test.throws(function () { - new MinimongoTest.Sorter("a"); + new Minimongo.Sorter("a"); }); test.throws(function () { - new MinimongoTest.Sorter(123); + new Minimongo.Sorter(123); }); // No sort spec implies everything equal. - test.equal(new MinimongoTest.Sorter({}).getComparator()({a:1}, {a:2}), 0); + test.equal(new Minimongo.Sorter({}).getComparator()({a:1}, {a:2}), 0); // All sorts of array edge cases! // Increasing sort sorts by the smallest element it finds; 1 < 2. diff --git a/packages/minimongo/package.js b/packages/minimongo/package.js index 8791cbfe77..4dca1e8433 100644 --- a/packages/minimongo/package.js +++ b/packages/minimongo/package.js @@ -7,7 +7,7 @@ Package.on_use(function (api) { api.export('LocalCollection'); api.export('Minimongo'); api.export('MinimongoTest', { testOnly: true }); - api.use(['underscore', 'json', 'ejson', 'ordered-dict', 'deps', + api.use(['underscore', 'json', 'ejson', 'id-map', 'ordered-dict', 'deps', 'random', 'ordered-dict']); // This package is used for geo-location queries such as $near api.use('geojson-utils'); @@ -28,7 +28,8 @@ Package.on_use(function (api) { // Functionality used only by oplog tailing on the server side api.add_files([ 'selector_projection.js', - 'selector_modifier.js' + 'selector_modifier.js', + 'sorter_projection.js' ], 'server'); }); diff --git a/packages/minimongo/selector.js b/packages/minimongo/selector.js index 87abd1ca27..80636bb12d 100644 --- a/packages/minimongo/selector.js +++ b/packages/minimongo/selector.js @@ -384,7 +384,7 @@ var VALUE_OPERATORS = { }, // $options just provides options for $regex; its logic is inside $regex $options: function (operand, valueSelector) { - if (!valueSelector.$regex) + if (!_.has(valueSelector, '$regex')) throw Error("$options needs a $regex"); return everythingMatcher; }, @@ -653,7 +653,7 @@ var ELEMENT_OPERATORS = { throw Error("$elemMatch need an object"); var subMatcher, isDocMatcher; - if (isOperatorObject(operand)) { + if (isOperatorObject(operand, true)) { subMatcher = compileValueSelector(operand, matcher); isDocMatcher = false; } else { diff --git a/packages/minimongo/selector_projection.js b/packages/minimongo/selector_projection.js index 73a2727a8e..5f6e101b5b 100644 --- a/packages/minimongo/selector_projection.js +++ b/packages/minimongo/selector_projection.js @@ -3,7 +3,7 @@ // @returns Object - projection object (same as fields option of mongo cursor) Minimongo.Matcher.prototype.combineIntoProjection = function (projection) { var self = this; - var selectorPaths = self._getPathsElidingNumericKeys(); + var selectorPaths = Minimongo._pathsElidingNumericKeys(self._getPaths()); // Special case for $where operator in the selector - projection should depend // on all fields of the document. getSelectorPaths returns a list of paths @@ -12,12 +12,23 @@ Minimongo.Matcher.prototype.combineIntoProjection = function (projection) { if (_.contains(selectorPaths, '')) return {}; + return combineImportantPathsIntoProjection(selectorPaths, projection); +}; + +Minimongo._pathsElidingNumericKeys = function (paths) { + var self = this; + return _.map(paths, function (path) { + return _.reject(path.split('.'), isNumericKey).join('.'); + }); +}; + +combineImportantPathsIntoProjection = function (paths, projection) { var prjDetails = projectionDetails(projection); var tree = prjDetails.tree; var mergedProjection = {}; // merge the paths to include - tree = pathsToTree(selectorPaths, + tree = pathsToTree(paths, function (path) { return true; }, function (node, path, fullPath) { return true; }, tree); @@ -40,13 +51,6 @@ Minimongo.Matcher.prototype.combineIntoProjection = function (projection) { } }; -Minimongo.Matcher.prototype._getPathsElidingNumericKeys = function () { - var self = this; - return _.map(self._getPaths(), function (path) { - return _.reject(path.split('.'), isNumericKey).join('.'); - }); -}; - // Returns a set of key paths similar to // { 'foo.bar': 1, 'a.b.c': 1 } var treeToPaths = function (tree, prefix) { diff --git a/packages/minimongo/sort.js b/packages/minimongo/sort.js index 02218593ec..879d0e48c4 100644 --- a/packages/minimongo/sort.js +++ b/packages/minimongo/sort.js @@ -14,17 +14,19 @@ Sorter = function (spec) { var self = this; - var sortSpecParts = []; + var sortSpecParts = self._sortSpecParts = []; if (spec instanceof Array) { for (var i = 0; i < spec.length; i++) { if (typeof spec[i] === "string") { sortSpecParts.push({ + path: spec[i], lookup: makeLookupFunction(spec[i]), ascending: true }); } else { sortSpecParts.push({ + path: spec[i][0], lookup: makeLookupFunction(spec[i][0]), ascending: spec[i][1] !== "desc" }); @@ -33,12 +35,13 @@ Sorter = function (spec) { } else if (typeof spec === "object") { for (var key in spec) { sortSpecParts.push({ + path: key, lookup: makeLookupFunction(key), ascending: spec[key] >= 0 }); } } else { - throw Error("Bad sort specification: ", JSON.stringify(spec)); + throw Error("Bad sort specification: " + JSON.stringify(spec)); } // reduceValue takes in all the possible values for the sort key along various @@ -118,7 +121,12 @@ Sorter.prototype.getComparator = function (options) { }]); }; -MinimongoTest.Sorter = Sorter; +Sorter.prototype._getPaths = function () { + var self = this; + return _.pluck(self._sortSpecParts, 'path'); +}; + +Minimongo.Sorter = Sorter; // Given an array of comparators // (functions (a,b)->(negative or positive or zero)), returns a single diff --git a/packages/minimongo/sorter_projection.js b/packages/minimongo/sorter_projection.js new file mode 100644 index 0000000000..f02f388f7e --- /dev/null +++ b/packages/minimongo/sorter_projection.js @@ -0,0 +1,6 @@ +Sorter.prototype.combineIntoProjection = function (projection) { + var self = this; + var specPaths = Minimongo._pathsElidingNumericKeys(self._getPaths()); + return combineImportantPathsIntoProjection(specPaths, projection); +}; + diff --git a/packages/mongo-livedata/mongo_driver.js b/packages/mongo-livedata/mongo_driver.js index b3469a09be..caf1c927fb 100644 --- a/packages/mongo-livedata/mongo_driver.js +++ b/packages/mongo-livedata/mongo_driver.js @@ -754,11 +754,15 @@ MongoConnection.prototype._createSynchronousCursor = function( // ... and to keep querying the server indefinitely rather than just 5 times // if there's no more data. mongoOptions.numberOfRetries = -1; - // And if this cursor specifies a 'ts', then set the undocumented oplog - // replay flag, which does a special scan to find the first document - // (instead of creating an index on ts). - if (cursorDescription.selector.ts) + // And if this is on the oplog collection and the cursor specifies a 'ts', + // then set the undocumented oplog replay flag, which does a special scan to + // find the first document (instead of creating an index on ts). This is a + // very hard-coded Mongo flag which only works on the oplog collection and + // only works with the ts field. + if (cursorDescription.collectionName === OPLOG_COLLECTION && + cursorDescription.selector.ts) { mongoOptions.oplogReplay = true; + } } var dbCursor = collection.find( diff --git a/packages/mongo-livedata/mongo_livedata_tests.js b/packages/mongo-livedata/mongo_livedata_tests.js index 032e0dbaa7..f155b85fc5 100644 --- a/packages/mongo-livedata/mongo_livedata_tests.js +++ b/packages/mongo-livedata/mongo_livedata_tests.js @@ -745,6 +745,425 @@ if (Meteor.isServer) { }); x++; }); + + // compares arrays a and b w/o looking at order + var setsEqual = function (a, b) { + a = _.map(a, EJSON.stringify); + b = _.map(b, EJSON.stringify); + return _.isEmpty(_.difference(a, b)) && _.isEmpty(_.difference(b, a)); + }; + + // This test mainly checks the correctness of oplog code dealing with limited + // queries. Compitablity with poll-diff is added as well. + Tinytest.addAsync("mongo-livedata - observe sorted, limited " + idGeneration, function (test, onComplete) { + var run = test.runId(); + var coll = new Meteor.Collection("observeLimit-"+run, collectionOptions); + + var observer = function () { + var state = {}; + var output = []; + var callbacks = { + changed: function (newDoc) { + output.push({changed: newDoc._id}); + state[newDoc._id] = newDoc; + }, + added: function (newDoc) { + output.push({added: newDoc._id}); + state[newDoc._id] = newDoc; + }, + removed: function (oldDoc) { + output.push({removed: oldDoc._id}); + delete state[oldDoc._id]; + } + }; + var handle = coll.find({foo: 22}, + {sort: {bar: 1}, limit: 3}).observe(callbacks); + + return {output: output, handle: handle, state: state}; + }; + var clearOutput = function (o) { o.output.splice(0, o.output.length); }; + + var ins = function (doc) { + var id; runInFence(function () { id = coll.insert(doc); }); + return id; + }; + var rem = function (sel) { runInFence(function () { coll.remove(sel); }); }; + var upd = function (sel, mod, opt) { + runInFence(function () { + coll.update(sel, mod, opt); + }); + }; + // tests '_id' subfields for all documents in oplog buffer + var testOplogBufferIds = function (ids) { + var bufferIds = []; + o.handle._multiplexer._observeDriver._unpublishedBuffer.forEach(function (x, id) { + bufferIds.push(id); + }); + + test.isTrue(setsEqual(ids, bufferIds), "expected: " + ids + "; got: " + bufferIds); + }; + var testSafeAppendToBufferFlag = function (expected) { + if (expected) + test.isTrue(o.handle._multiplexer._observeDriver._safeAppendToBuffer); + else + test.isFalse(o.handle._multiplexer._observeDriver._safeAppendToBuffer); + }; + + // Insert a doc and start observing. + var docId1 = ins({foo: 22, bar: 5}); + var o = observer(); + var usesOplog = o.handle._multiplexer._observeDriver._usesOplog; + // Initial add. + test.length(o.output, 1); + test.equal(o.output.shift(), {added: docId1}); + + // Insert another doc (blocking until observes have fired). + var docId2 = ins({foo: 22, bar: 6}); + // Observed add. + test.length(o.output, 1); + test.equal(o.output.shift(), {added: docId2}); + + var docId3 = ins({ foo: 22, bar: 3 }); + test.length(o.output, 1); + test.equal(o.output.shift(), {added: docId3}); + + // Add a non-matching document + ins({ foo: 13 }); + // It shouldn't be added + test.length(o.output, 0); + + // Add something that matches but is too big to fit in + var docId4 = ins({ foo: 22, bar: 7 }); + // It shouldn't be added + test.length(o.output, 0); + + // Let's add something small enough to fit in + var docId5 = ins({ foo: 22, bar: -1 }); + // We should get an added and a removed events + test.length(o.output, 2); + // doc 2 was removed from the published set as it is too big to be in + test.isTrue(setsEqual(o.output, [{added: docId5}, {removed: docId2}])); + clearOutput(o); + + // Now remove something and that doc 2 should be right back + rem(docId5); + test.length(o.output, 2); + test.isTrue(setsEqual(o.output, [{removed: docId5}, {added: docId2}])); + clearOutput(o); + usesOplog && testOplogBufferIds([docId4]); + usesOplog && testSafeAppendToBufferFlag(true); + + // Current state is [3 5 6 | 7] + // Add some negative numbers overflowing the buffer. + // New documents will take the published place, [3 5 6] will take the buffer + // and 7 will be outside of the buffer in MongoDB. + var docId6 = ins({ foo: 22, bar: -1 }); + var docId7 = ins({ foo: 22, bar: -2 }); + var docId8 = ins({ foo: 22, bar: -3 }); + test.length(o.output, 6); + var expected = [{added: docId6}, {removed: docId2}, + {added: docId7}, {removed: docId1}, + {added: docId8}, {removed: docId3}]; + + test.isTrue(setsEqual(o.output, expected)); + clearOutput(o); + usesOplog && testOplogBufferIds([docId1, docId2, docId3]); + usesOplog && testSafeAppendToBufferFlag(false); + + // Now the state is [-3 -2 -1 | 3 5 6] 7 + // If we update first 3 docs (increment them by 20), it would be + // interesting. + upd({ bar: { $lt: 0 }}, { $inc: { bar: 20 } }, { multi: true }); + + // The updated documents can't find their place in published and they can't + // be buffered as we are not aware of the situation outside of the buffer. + // But since our buffer becomes empty, it will be refilled partially with + // updated documents. + test.length(o.output, 6); + var expectedRemoves = [{removed: docId6}, + {removed: docId7}, + {removed: docId8}]; + var expectedAdds = [{added: docId3}, + {added: docId1}, + {added: docId2}]; + + test.isTrue(setsEqual(o.output, expectedAdds.concat(expectedRemoves))); + clearOutput(o); + usesOplog && testOplogBufferIds([docId4, docId7, docId8]); + usesOplog && testSafeAppendToBufferFlag(false); + + // The new arrangement is [3 5 6 | 7 17 18] 19 + // By ids: [docId3, docId1, docId2] docId4] docId6 docId7 docId8 + // Remove first 4 docs (3, 1, 2, 4) forcing buffer to become empty and + // schedule a repoll. + rem({ bar: { $lt: 10 } }); + + // XXX the oplog code analyzes the events one by one: one remove after + // another. Poll-n-diff code, on the other side, analyzes the batch action + // of multiple remove. Because of that difference, expected outputs differ. + if (usesOplog) { + var expectedRemoves = [{removed: docId3}, {removed: docId1}, + {removed: docId2}, {removed: docId4}]; + var expectedAdds = [{added: docId4}, {added: docId8}, + {added: docId7}, {added: docId6}]; + + test.length(o.output, 8); + } else { + var expectedRemoves = [{removed: docId3}, {removed: docId1}, + {removed: docId2}]; + var expectedAdds = [{added: docId8}, {added: docId7}, {added: docId6}]; + + test.length(o.output, 6); + } + + test.isTrue(setsEqual(o.output, expectedAdds.concat(expectedRemoves))); + clearOutput(o); + usesOplog && testOplogBufferIds([]); + usesOplog && testSafeAppendToBufferFlag(true); + + // The new arrangement is [17 18 19] or [docId6 docId7 docId8] + var docId9 = ins({ foo: 22, bar: 21 }); + var docId10 = ins({ foo: 22, bar: 31 }); + var docId11 = ins({ foo: 22, bar: 41 }); + var docId12 = ins({ foo: 22, bar: 51 }); + + // Becomes [17 18 19 | 21 31 41] 51 + usesOplog && testOplogBufferIds([docId9, docId10, docId11]); + usesOplog && testSafeAppendToBufferFlag(false); + test.length(o.output, 0); + upd({ bar: { $lt: 20 } }, { $inc: { bar: 5 } }, { multi: true }); + // Becomes [21 22 23 | 24 31 41] 51 + test.length(o.output, 4); + test.isTrue(setsEqual(o.output, [{removed: docId6}, + {added: docId9}, + {changed: docId7}, + {changed: docId8}])); + clearOutput(o); + usesOplog && testOplogBufferIds([docId6, docId10, docId11]); + usesOplog && testSafeAppendToBufferFlag(false); + + rem(docId9); + // Becomes [22 23 24 | 31 41] 51 + test.length(o.output, 2); + test.isTrue(setsEqual(o.output, [{removed: docId9}, {added: docId6}])); + clearOutput(o); + usesOplog && testOplogBufferIds([docId10, docId11]); + usesOplog && testSafeAppendToBufferFlag(false); + + upd({ bar: { $gt: 25 } }, { $inc: { bar: -7.5 } }, { multi: true }); + // Becomes [22 23 23.5 | 24] 33.5 43.5 - 33.5 doesn't update in-place in + // buffer, because it the driver is not sure it can do it and there is no a + // different doc which is less than 33.5. + test.length(o.output, 2); + test.isTrue(setsEqual(o.output, [{removed: docId6}, {added: docId10}])); + clearOutput(o); + usesOplog && testOplogBufferIds([docId6]); + usesOplog && testSafeAppendToBufferFlag(false); + + // Force buffer objects to be moved into published set so we can check them + rem(docId7); + rem(docId8); + rem(docId10); + // Becomes [24 33.5 43.5] + test.length(o.output, 6); + test.isTrue(setsEqual(o.output, [{removed: docId7}, {removed: docId8}, + {removed: docId10}, {added: docId6}, + {added: docId11}, {added: docId12}])); + + test.length(_.keys(o.state), 3); + test.equal(o.state[docId6], { _id: docId6, foo: 22, bar: 24 }); + test.equal(o.state[docId11], { _id: docId11, foo: 22, bar: 33.5 }); + test.equal(o.state[docId12], { _id: docId12, foo: 22, bar: 43.5 }); + clearOutput(o); + usesOplog && testOplogBufferIds([]); + usesOplog && testSafeAppendToBufferFlag(true); + + o.handle.stop(); + onComplete(); + }); + + Tinytest.addAsync("mongo-livedata - observe sorted, limited, sort fields " + idGeneration, function (test, onComplete) { + var run = test.runId(); + var coll = new Meteor.Collection("observeLimit-"+run, collectionOptions); + + var observer = function () { + var state = {}; + var output = []; + var callbacks = { + changed: function (newDoc) { + output.push({changed: newDoc._id}); + state[newDoc._id] = newDoc; + }, + added: function (newDoc) { + output.push({added: newDoc._id}); + state[newDoc._id] = newDoc; + }, + removed: function (oldDoc) { + output.push({removed: oldDoc._id}); + delete state[oldDoc._id]; + } + }; + var handle = coll.find({}, {sort: {x: 1}, + limit: 2, + fields: {y: 1}}).observe(callbacks); + + return {output: output, handle: handle, state: state}; + }; + var clearOutput = function (o) { o.output.splice(0, o.output.length); }; + var ins = function (doc) { + var id; runInFence(function () { id = coll.insert(doc); }); + return id; + }; + var rem = function (id) { + runInFence(function () { coll.remove(id); }); + }; + + var o = observer(); + + var docId1 = ins({ x: 1, y: 1222 }); + var docId2 = ins({ x: 5, y: 5222 }); + + test.length(o.output, 2); + test.equal(o.output, [{added: docId1}, {added: docId2}]); + clearOutput(o); + + var docId3 = ins({ x: 7, y: 7222 }); + test.length(o.output, 0); + + var docId4 = ins({ x: -1, y: -1222 }); + + // Becomes [docId4 docId1 | docId2 docId3] + test.length(o.output, 2); + test.isTrue(setsEqual(o.output, [{added: docId4}, {removed: docId2}])); + + test.equal(_.size(o.state), 2); + test.equal(o.state[docId4], {_id: docId4, y: -1222}); + test.equal(o.state[docId1], {_id: docId1, y: 1222}); + clearOutput(o); + + rem(docId2); + // Becomes [docId4 docId1 | docId3] + test.length(o.output, 0); + + rem(docId4); + // Becomes [docId1 docId3] + test.length(o.output, 2); + test.isTrue(setsEqual(o.output, [{added: docId3}, {removed: docId4}])); + + test.equal(_.size(o.state), 2); + test.equal(o.state[docId3], {_id: docId3, y: 7222}); + test.equal(o.state[docId1], {_id: docId1, y: 1222}); + clearOutput(o); + + onComplete(); + }); + + Tinytest.addAsync("mongo-livedata - observe sorted, limited, big initial set" + idGeneration, function (test, onComplete) { + var run = test.runId(); + var coll = new Meteor.Collection("observeLimit-"+run, collectionOptions); + + var observer = function () { + var state = {}; + var output = []; + var callbacks = { + changed: function (newDoc) { + output.push({changed: newDoc._id}); + state[newDoc._id] = newDoc; + }, + added: function (newDoc) { + output.push({added: newDoc._id}); + state[newDoc._id] = newDoc; + }, + removed: function (oldDoc) { + output.push({removed: oldDoc._id}); + delete state[oldDoc._id]; + } + }; + var handle = coll.find({}, {sort: {x: 1, y: 1}, limit: 3}) + .observe(callbacks); + + return {output: output, handle: handle, state: state}; + }; + var clearOutput = function (o) { o.output.splice(0, o.output.length); }; + var ins = function (doc) { + var id; runInFence(function () { id = coll.insert(doc); }); + return id; + }; + var rem = function (id) { + runInFence(function () { coll.remove(id); }); + }; + // tests '_id' subfields for all documents in oplog buffer + var testOplogBufferIds = function (ids) { + var bufferIds = []; + o.handle._multiplexer._observeDriver._unpublishedBuffer.forEach(function (x, id) { + bufferIds.push(id); + }); + + test.isTrue(setsEqual(ids, bufferIds), "expected: " + ids + "; got: " + bufferIds); + }; + var testSafeAppendToBufferFlag = function (expected) { + if (expected) + test.isTrue(o.handle._multiplexer._observeDriver._safeAppendToBuffer); + else + test.isFalse(o.handle._multiplexer._observeDriver._safeAppendToBuffer); + }; + + var ids = {}; + _.each([2, 4, 1, 3, 5, 5, 9, 1, 3, 2, 5], function (x, i) { + ids[i] = ins({ x: x, y: i }); + }); + + var o = observer(); + var usesOplog = o.handle._multiplexer._observeDriver._usesOplog; + // x: [1 1 2 | 2 3 3] 4 5 5 5 9 + // id: [2 7 0 | 9 3 8] 1 4 5 10 6 + + test.length(o.output, 3); + test.isTrue(setsEqual([{added: ids[2]}, {added: ids[7]}, {added: ids[0]}], o.output)); + usesOplog && testOplogBufferIds([ids[9], ids[3], ids[8]]); + usesOplog && testSafeAppendToBufferFlag(false); + clearOutput(o); + + rem(ids[0]); + // x: [1 1 2 | 3 3] 4 5 5 5 9 + // id: [2 7 9 | 3 8] 1 4 5 10 6 + test.length(o.output, 2); + test.isTrue(setsEqual([{removed: ids[0]}, {added: ids[9]}], o.output)); + usesOplog && testOplogBufferIds([ids[3], ids[8]]); + usesOplog && testSafeAppendToBufferFlag(false); + clearOutput(o); + + rem(ids[7]); + // x: [1 2 3 | 3] 4 5 5 5 9 + // id: [2 9 3 | 8] 1 4 5 10 6 + test.length(o.output, 2); + test.isTrue(setsEqual([{removed: ids[7]}, {added: ids[3]}], o.output)); + usesOplog && testOplogBufferIds([ids[8]]); + usesOplog && testSafeAppendToBufferFlag(false); + clearOutput(o); + + rem(ids[3]); + // x: [1 2 3 | 4 5 5] 5 9 + // id: [2 9 8 | 1 4 5] 10 6 + test.length(o.output, 2); + test.isTrue(setsEqual([{removed: ids[3]}, {added: ids[8]}], o.output)); + usesOplog && testOplogBufferIds([ids[1], ids[4], ids[5]]); + usesOplog && testSafeAppendToBufferFlag(false); + clearOutput(o); + + rem({ x: {$lt: 4} }); + // x: [4 5 5 | 5 9] + // id: [1 4 5 | 10 6] + test.length(o.output, 6); + test.isTrue(setsEqual([{removed: ids[2]}, {removed: ids[9]}, {removed: ids[8]}, + {added: ids[5]}, {added: ids[4]}, {added: ids[1]}], o.output)); + usesOplog && testOplogBufferIds([ids[10], ids[6]]); + usesOplog && testSafeAppendToBufferFlag(true); + clearOutput(o); + + + onComplete(); + }); } diff --git a/packages/mongo-livedata/oplog_observe_driver.js b/packages/mongo-livedata/oplog_observe_driver.js index 2cfa2a7775..5c83afdd6e 100644 --- a/packages/mongo-livedata/oplog_observe_driver.js +++ b/packages/mongo-livedata/oplog_observe_driver.js @@ -7,6 +7,20 @@ var PHASE = { STEADY: "STEADY" }; +// Exception thrown by _needToPollQuery which unrolls the stack up to the +// enclosing call to finishIfNeedToPollQuery. +var SwitchedToQuery = function () {}; +var finishIfNeedToPollQuery = function (f) { + return function () { + try { + f.apply(this, arguments); + } catch (e) { + if (!(e instanceof SwitchedToQuery)) + throw e; + } + }; +}; + // OplogObserveDriver is an alternative to PollingObserveDriver which follows // the Mongo operation log instead of just re-polling the query. It obeys the // same simple interface: constructing it starts sending observeChanges @@ -19,8 +33,44 @@ OplogObserveDriver = function (options) { self._cursorDescription = options.cursorDescription; self._mongoHandle = options.mongoHandle; self._multiplexer = options.multiplexer; - if (options.ordered) + + if (options.ordered) { throw Error("OplogObserveDriver only supports unordered observeChanges"); + } + + var sortSpec = options.cursorDescription.options.sort; + var sorter = sortSpec && new Minimongo.Sorter(sortSpec); + // We don't support $near and other geo-queries so it's OK to initialize the + // comparator only once in the constructor. + var comparator = sorter && sorter.getComparator(); + + if (options.cursorDescription.options.limit) { + // There are several properties ordered driver implements: + // - _limit is a positive number + // - _comparator is a function-comparator by which the query is ordered + // - _unpublishedBuffer is non-null Min/Max Heap, + // the empty buffer in STEADY phase implies that the + // everything that matches the queries selector fits + // into published set. + // - _published - Min Heap (also implements IdMap methods) + + var heapOptions = { IdMap: LocalCollection._IdMap }; + self._limit = self._cursorDescription.options.limit; + self._comparator = comparator; + self._unpublishedBuffer = new MinMaxHeap(comparator, heapOptions); + // We need something that can find Max value in addition to IdMap interface + self._published = new MaxHeap(comparator, heapOptions); + } else { + self._limit = 0; + self._comparator = null; + self._unpublishedBuffer = null; + self._published = new LocalCollection._IdMap; + } + + // Indicates if it is safe to insert a new document at the end of the buffer + // for this query. i.e. it is known that there are no documents matching the + // selector those are not in published or buffer. + self._safeAppendToBuffer = false; self._stopped = false; self._stopHandles = []; @@ -30,7 +80,6 @@ OplogObserveDriver = function (options) { self._registerPhaseChange(PHASE.QUERYING); - self._published = new LocalCollection._IdMap; var selector = self._cursorDescription.selector; self._matcher = options.matcher; var projection = self._cursorDescription.options.fields || {}; @@ -38,6 +87,8 @@ OplogObserveDriver = function (options) { // Projection function, result of combining important fields for selector and // existing fields projection self._sharedProjection = self._matcher.combineIntoProjection(projection); + if (sorter) + self._sharedProjection = sorter.combineIntoProjection(self._sharedProjection); self._sharedProjectionFn = LocalCollection._compileProjection( self._sharedProjection); @@ -51,7 +102,7 @@ OplogObserveDriver = function (options) { forEachTrigger(self._cursorDescription, function (trigger) { self._stopHandles.push(self._mongoHandle._oplogHandle.onOplogEntry( trigger, function (notification) { - Meteor._noYieldsAllowed(function () { + Meteor._noYieldsAllowed(finishIfNeedToPollQuery(function () { var op = notification.op; if (notification.dropCollection) { // Note: this call is not allowed to block on anything (especially @@ -65,7 +116,7 @@ OplogObserveDriver = function (options) { else self._handleOplogEntrySteadyOrFetching(op); } - }); + })); } )); }); @@ -101,55 +152,261 @@ OplogObserveDriver = function (options) { // Give _observeChanges a chance to add the new ObserveHandle to our // multiplexer, so that the added calls get streamed. - Meteor.defer(function () { + Meteor.defer(finishIfNeedToPollQuery(function () { self._runInitialQuery(); - }); + })); }; _.extend(OplogObserveDriver.prototype, { - _add: function (doc) { + _addPublished: function (id, doc) { var self = this; - var id = doc._id; var fields = _.clone(doc); delete fields._id; - if (self._published.has(id)) - throw Error("tried to add something already published " + id); - self._published.set(id, self._sharedProjectionFn(fields)); + self._published.set(id, self._sharedProjectionFn(doc)); self._multiplexer.added(id, self._projectionFn(fields)); + + // After adding this document, the published set might be overflowed + // (exceeding capacity specified by limit). If so, push the maximum element + // to the buffer, we might want to save it in memory to reduce the amount of + // Mongo lookups in the future. + if (self._limit && self._published.size() > self._limit) { + // XXX in theory the size of published is no more than limit+1 + if (self._published.size() !== self._limit + 1) { + throw new Error("After adding to published, " + + (self._published.size() - self._limit) + + " documents are overflowing the set"); + } + + var overflowingDocId = self._published.maxElementId(); + var overflowingDoc = self._published.get(overflowingDocId); + + if (EJSON.equals(overflowingDocId, id)) { + throw new Error("The document just added is overflowing the published set"); + } + + self._published.remove(overflowingDocId); + self._multiplexer.removed(overflowingDocId); + self._addBuffered(overflowingDocId, overflowingDoc); + } }, - _remove: function (id) { + _removePublished: function (id) { var self = this; - if (!self._published.has(id)) - throw Error("tried to remove something unpublished " + id); self._published.remove(id); self._multiplexer.removed(id); - }, - _handleDoc: function (id, newDoc, mustMatchNow) { - var self = this; - newDoc = _.clone(newDoc); + if (! self._limit || self._published.size() === self._limit) + return; - var matchesNow = newDoc && self._matcher.documentMatches(newDoc).result; - if (mustMatchNow && !matchesNow) { - throw Error("expected " + EJSON.stringify(newDoc) + " to match " - + EJSON.stringify(self._cursorDescription)); + if (self._published.size() > self._limit) + throw Error("self._published got too big"); + + // OK, we are publishing less than the limit. Maybe we should look in the + // buffer to find the next element past what we were publishing before. + + if (!self._unpublishedBuffer.empty()) { + // There's something in the buffer; move the first thing in it to + // _published. + var newDocId = self._unpublishedBuffer.minElementId(); + var newDoc = self._unpublishedBuffer.get(newDocId); + self._removeBuffered(newDocId); + self._addPublished(newDocId, newDoc); + return; } - var matchedBefore = self._published.has(id); + // There's nothing in the buffer. This could mean one of a few things. - if (matchesNow && !matchedBefore) { - self._add(newDoc); - } else if (matchedBefore && !matchesNow) { - self._remove(id); - } else if (matchesNow) { + // (a) We could be in the middle of re-running the query (specifically, we + // could be in _publishNewResults). In that case, _unpublishedBuffer is + // empty because we clear it at the beginning of _publishNewResults. In this + // case, our caller already knows the entire answer to the query and we + // don't need to do anything fancy here. Just return. + if (self._phase === PHASE.QUERYING) + return; + + // (b) We're pretty confident that the union of _published and + // _unpublishedBuffer contain all documents that match selector. Because + // _unpublishedBuffer is empty, that means we're confident that _published + // contains all documents that match selector. So we have nothing to do. + if (self._safeAppendToBuffer) + return; + + // (c) Maybe there are other documents out there that should be in our + // buffer. But in that case, when we emptied _unpublishedBuffer in + // _removeBuffered, we should have called _needToPollQuery, which will + // either put something in _unpublishedBuffer or set _safeAppendToBuffer (or + // both), and it will put us in QUERYING for that whole time. So in fact, we + // shouldn't be able to get here. + + throw new Error("Buffer inexplicably empty"); + }, + _changePublished: function (id, oldDoc, newDoc) { + var self = this; + self._published.set(id, self._sharedProjectionFn(newDoc)); + var changed = LocalCollection._makeChangedFields(_.clone(newDoc), oldDoc); + changed = self._projectionFn(changed); + if (!_.isEmpty(changed)) + self._multiplexer.changed(id, changed); + }, + _addBuffered: function (id, doc) { + var self = this; + self._unpublishedBuffer.set(id, self._sharedProjectionFn(doc)); + + // If something is overflowing the buffer, we just remove it from cache + if (self._unpublishedBuffer.size() > self._limit) { + var maxBufferedId = self._unpublishedBuffer.maxElementId(); + + self._unpublishedBuffer.remove(maxBufferedId); + + // Since something matching is removed from cache (both published set and + // buffer), set flag to false + self._safeAppendToBuffer = false; + } + }, + // Is called either to remove the doc completely from matching set or to move + // it to the published set later. + _removeBuffered: function (id) { + var self = this; + self._unpublishedBuffer.remove(id); + // To keep the contract "buffer is never empty in STEADY phase unless the + // everything matching fits into published" true, we poll everything as soon + // as we see the buffer becoming empty. + if (! self._unpublishedBuffer.size() && ! self._safeAppendToBuffer) + self._needToPollQuery(); + }, + // Called when a document has joined the "Matching" results set. + // Takes responsibility of keeping _unpublishedBuffer in sync with _published + // and the effect of limit enforced. + _addMatching: function (doc) { + var self = this; + var id = doc._id; + if (self._published.has(id)) + throw Error("tried to add something already published " + id); + if (self._limit && self._unpublishedBuffer.has(id)) + throw Error("tried to add something already existed in buffer " + id); + + var limit = self._limit; + var comparator = self._comparator; + var maxPublished = (limit && self._published.size() > 0) ? + self._published.get(self._published.maxElementId()) : null; + var maxBuffered = (limit && self._unpublishedBuffer.size() > 0) ? + self._unpublishedBuffer.get(self._unpublishedBuffer.maxElementId()) : null; + // The query is unlimited or didn't publish enough documents yet or the new + // document would fit into published set pushing the maximum element out, + // then we need to publish the doc. + var toPublish = ! limit || self._published.size() < limit || + comparator(doc, maxPublished) < 0; + + // Otherwise we might need to buffer it (only in case of limited query). + // Buffering is allowed if the buffer is not filled up yet and all matching + // docs are either in the published set or in the buffer. + var canAppendToBuffer = !toPublish && self._safeAppendToBuffer && + self._unpublishedBuffer.size() < limit; + + // Or if it is small enough to be safely inserted to the middle or the + // beginning of the buffer. + var canInsertIntoBuffer = !toPublish && maxBuffered && + comparator(doc, maxBuffered) <= 0; + + var toBuffer = canAppendToBuffer || canInsertIntoBuffer; + + if (toPublish) { + self._addPublished(id, doc); + } else if (toBuffer) { + self._addBuffered(id, doc); + } else { + // dropping it and not saving to the cache + self._safeAppendToBuffer = false; + } + }, + // Called when a document leaves the "Matching" results set. + // Takes responsibility of keeping _unpublishedBuffer in sync with _published + // and the effect of limit enforced. + _removeMatching: function (id) { + var self = this; + if (! self._published.has(id) && ! self._limit) + throw Error("tried to remove something matching but not cached " + id); + + if (self._published.has(id)) { + self._removePublished(id); + } else if (self._unpublishedBuffer.has(id)) { + self._removeBuffered(id); + } + }, + _handleDoc: function (id, newDoc) { + var self = this; + var matchesNow = newDoc && self._matcher.documentMatches(newDoc).result; + + var publishedBefore = self._published.has(id); + var bufferedBefore = self._limit && self._unpublishedBuffer.has(id); + var cachedBefore = publishedBefore || bufferedBefore; + + if (matchesNow && !cachedBefore) { + self._addMatching(newDoc); + } else if (cachedBefore && !matchesNow) { + self._removeMatching(id); + } else if (cachedBefore && matchesNow) { var oldDoc = self._published.get(id); - if (!oldDoc) - throw Error("thought that " + id + " was there!"); - delete newDoc._id; - self._published.set(id, self._sharedProjectionFn(newDoc)); - var changed = LocalCollection._makeChangedFields(_.clone(newDoc), oldDoc); - changed = self._projectionFn(changed); - if (!_.isEmpty(changed)) - self._multiplexer.changed(id, changed); + var comparator = self._comparator; + var minBuffered = self._limit && self._unpublishedBuffer.size() && + self._unpublishedBuffer.get(self._unpublishedBuffer.minElementId()); + + if (publishedBefore) { + // Unlimited case where the document stays in published once it matches + // or the case when we don't have enough matching docs to publish or the + // changed but matching doc will stay in published anyways. + // XXX: We rely on the emptiness of buffer. Be sure to maintain the fact + // that buffer can't be empty if there are matching documents not + // published. Notably, we don't want to schedule repoll and continue + // relying on this property. + var staysInPublished = ! self._limit || + self._unpublishedBuffer.size() === 0 || + comparator(newDoc, minBuffered) <= 0; + + if (staysInPublished) { + self._changePublished(id, oldDoc, newDoc); + } else { + // after the change doc doesn't stay in the published, remove it + self._removePublished(id); + // but it can move into buffered now, check it + var maxBuffered = self._unpublishedBuffer.get(self._unpublishedBuffer.maxElementId()); + + var toBuffer = self._safeAppendToBuffer || + (maxBuffered && comparator(newDoc, maxBuffered) <= 0); + + if (toBuffer) { + self._addBuffered(id, newDoc); + } else { + // Throw away from both published set and buffer + self._safeAppendToBuffer = false; + } + } + } else if (bufferedBefore) { + oldDoc = self._unpublishedBuffer.get(id); + // remove the old version manually so we don't trigger the querying + // immediately + self._unpublishedBuffer.remove(id); + + var maxPublished = self._published.get(self._published.maxElementId()); + var maxBuffered = self._unpublishedBuffer.size() && self._unpublishedBuffer.get(self._unpublishedBuffer.maxElementId()); + + // the buffered doc was updated, it could move to published + var toPublish = comparator(newDoc, maxPublished) < 0; + + // or stays in buffer even after the change + var staysInBuffer = (! toPublish && self._safeAppendToBuffer) || + (!toPublish && maxBuffered && comparator(newDoc, maxBuffered) <= 0); + + if (toPublish) { + self._addPublished(id, newDoc); + } else if (staysInBuffer) { + // stays in buffer but changes + self._unpublishedBuffer.set(id, newDoc); + } else { + // Throw away from both published set and buffer + self._safeAppendToBuffer = false; + } + } else { + throw new Error("cachedBefore implies either of publishedBefore or bufferedBefore is true."); + } } }, _fetchModifiedDocuments: function () { @@ -157,7 +414,7 @@ _.extend(OplogObserveDriver.prototype, { self._registerPhaseChange(PHASE.FETCHING); // Defer, because nothing called from the oplog entry handler may yield, but // fetch() yields. - Meteor.defer(function () { + Meteor.defer(finishIfNeedToPollQuery(function () { while (!self._stopped && !self._needToFetch.empty()) { if (self._phase !== PHASE.FETCHING) throw new Error("phase in fetchModifiedDocuments: " + self._phase); @@ -174,36 +431,40 @@ _.extend(OplogObserveDriver.prototype, { waiting++; self._mongoHandle._docFetcher.fetch( self._cursorDescription.collectionName, id, cacheKey, - function (err, doc) { - if (err) { - if (!anyError) - anyError = err; - } else if (!self._stopped && self._phase === PHASE.FETCHING - && self._fetchGeneration === thisGeneration) { - // We re-check the generation in case we've had an explicit - // _pollQuery call which should effectively cancel this round of - // fetches. (_pollQuery increments the generation.) - self._handleDoc(id, doc); + finishIfNeedToPollQuery(function (err, doc) { + try { + if (err) { + if (!anyError) + anyError = err; + } else if (!self._stopped && self._phase === PHASE.FETCHING + && self._fetchGeneration === thisGeneration) { + // We re-check the generation in case we've had an explicit + // _pollQuery call (eg, in another fiber) which should + // effectively cancel this round of fetches. (_pollQuery + // increments the generation.) + self._handleDoc(id, doc); + } + } finally { + waiting--; + // Because fetch() never calls its callback synchronously, this + // is safe (ie, we won't call fut.return() before the forEach is + // done). + if (waiting === 0) + fut.return(); } - waiting--; - // Because fetch() never calls its callback synchronously, this is - // safe (ie, we won't call fut.return() before the forEach is - // done). - if (waiting === 0) - fut.return(); - }); + })); }); fut.wait(); // XXX do this even if we've switched to PHASE.QUERYING? if (anyError) throw anyError; - // Exit now if we've had a _pollQuery call. + // Exit now if we've had a _pollQuery call (here or in another fiber). if (self._phase === PHASE.QUERYING) return; self._currentlyFetching = null; } self._beSteady(); - }); + })); }, _beSteady: function () { var self = this; @@ -233,16 +494,18 @@ _.extend(OplogObserveDriver.prototype, { } if (op.op === 'd') { - if (self._published.has(id)) - self._remove(id); + if (self._published.has(id) || (self._limit && self._unpublishedBuffer.has(id))) + self._removeMatching(id); } else if (op.op === 'i') { if (self._published.has(id)) - throw new Error("insert found for already-existing ID"); + throw new Error("insert found for already-existing ID in published"); + if (self._unpublishedBuffer && self._unpublishedBuffer.has(id)) + throw new Error("insert found for already-existing ID in buffer"); // XXX what if selector yields? for now it can't but later it could have // $where if (self._matcher.documentMatches(op.o).result) - self._add(op.o); + self._addMatching(op.o); } else if (op.op === 'u') { // Is this a modifier ($set/$unset, which may require us to poll the // database to figure out if the whole document matches the selector) or a @@ -256,12 +519,19 @@ _.extend(OplogObserveDriver.prototype, { var canDirectlyModifyDoc = !isReplace && modifierCanBeDirectlyApplied(op.o); + var publishedBefore = self._published.has(id); + var bufferedBefore = self._limit && self._unpublishedBuffer.has(id); + if (isReplace) { self._handleDoc(id, _.extend({_id: id}, op.o)); - } else if (self._published.has(id) && canDirectlyModifyDoc) { + } else if ((publishedBefore || bufferedBefore) && canDirectlyModifyDoc) { // Oh great, we actually know what the document is, so we can apply // this directly. - var newDoc = EJSON.clone(self._published.get(id)); + var newDoc = self._published.has(id) ? + self._published.get(id) : + self._unpublishedBuffer.get(id); + newDoc = EJSON.clone(newDoc); + newDoc._id = id; LocalCollection._modify(newDoc, op.o); self._handleDoc(id, self._sharedProjectionFn(newDoc)); @@ -280,10 +550,8 @@ _.extend(OplogObserveDriver.prototype, { if (self._stopped) throw new Error("oplog stopped surprisingly early"); - var initialCursor = self._cursorForQuery(); - initialCursor.forEach(function (initialDoc) { - self._add(initialDoc); - }); + self._runQuery(); + if (self._stopped) throw new Error("oplog stopped quite early"); // Allow observeChanges calls to return. (After this, it's possible for @@ -319,27 +587,48 @@ _.extend(OplogObserveDriver.prototype, { ++self._fetchGeneration; // ignore any in-flight fetches self._registerPhaseChange(PHASE.QUERYING); - // Defer so that we don't block. + // Defer so that we don't block. We don't need finishIfNeedToPollQuery here + // because SwitchedToQuery is not called in QUERYING mode. Meteor.defer(function () { - // subtle note: _published does not contain _id fields, but newResults - // does - var newResults = new LocalCollection._IdMap; - var cursor = self._cursorForQuery(); - cursor.forEach(function (doc) { - newResults.set(doc._id, doc); - }); - - self._publishNewResults(newResults); - + self._runQuery(); self._doneQuerying(); }); }, + _runQuery: function () { + var self = this; + var newResults = new LocalCollection._IdMap; + var newBuffer = new LocalCollection._IdMap; + + // Query 2x documents as the half excluded from the original query will go + // into unpublished buffer to reduce additional Mongo lookups in cases when + // documents are removed from the published set and need a replacement. + // XXX needs more thought on non-zero skip + // XXX 2 is a "magic number" meaning there is an extra chunk of docs for + // buffer if such is needed. + var cursor = self._cursorForQuery({ limit: self._limit * 2 }); + cursor.forEach(function (doc, i) { + if (!self._limit || i < self._limit) + newResults.set(doc._id, doc); + else + newBuffer.set(doc._id, doc); + }); + + self._publishNewResults(newResults, newBuffer); + }, + // Transitions to QUERYING and runs another query, or (if already in QUERYING) // ensures that we will query again later. // // This function may not block, because it is called from an oplog entry - // handler. + // handler. However, if we were not already in the QUERYING phase, it throws + // an exception that is caught by the closest surrounding + // finishIfNeedToPollQuery call; this ensures that we don't continue running + // close that was designed for another phase inside PHASE.QUERYING. + // + // (It's also necessary whenever logic in this file yields to check that other + // phases haven't put us into QUERYING mode, though; eg, + // _fetchModifiedDocuments does this.) _needToPollQuery: function () { var self = this; if (self._stopped) @@ -349,7 +638,7 @@ _.extend(OplogObserveDriver.prototype, { // pausing FETCHING). if (self._phase !== PHASE.QUERYING) { self._pollQuery(); - return; + throw new SwitchedToQuery; } // We're currently in QUERYING. Set a flag to ensure that we run another @@ -379,7 +668,7 @@ _.extend(OplogObserveDriver.prototype, { } }, - _cursorForQuery: function () { + _cursorForQuery: function (optionsOverwrite) { var self = this; // The query we run is almost the same as the cursor we are observing, with @@ -388,6 +677,11 @@ _.extend(OplogObserveDriver.prototype, { // "shared" projection). And we don't want to apply any transform in the // cursor, because observeChanges shouldn't use the transform. var options = _.clone(self._cursorDescription.options); + + // Allow the caller to modify the options. Useful to specify different skip + // and limit values. + _.extend(options, optionsOverwrite); + options.fields = self._sharedProjection; delete options.transform; // We are NOT deep cloning fields or selector here, which should be OK. @@ -401,13 +695,20 @@ _.extend(OplogObserveDriver.prototype, { // Replace self._published with newResults (both are IdMaps), invoking observe // callbacks on the multiplexer. + // Replace self._unpublishedBuffer with newBuffer. // // XXX This is very similar to LocalCollection._diffQueryUnorderedChanges. We // should really: (a) Unify IdMap and OrderedDict into Unordered/OrderedDict (b) // Rewrite diff.js to use these classes instead of arrays and objects. - _publishNewResults: function (newResults) { + _publishNewResults: function (newResults, newBuffer) { var self = this; + // If the query is limited and there is a buffer, shut down so it doesn't + // stay in a way. + if (self._limit) { + self._unpublishedBuffer.clear(); + } + // First remove anything that's gone. Be careful not to modify // self._published while iterating over it. var idsToRemove = []; @@ -416,15 +717,33 @@ _.extend(OplogObserveDriver.prototype, { idsToRemove.push(id); }); _.each(idsToRemove, function (id) { - self._remove(id); + self._removePublished(id); }); // Now do adds and changes. + // If self has a buffer and limit, the new fetched result will be + // limited correctly as the query has sort specifier. newResults.forEach(function (doc, id) { - // "true" here means to throw if we think this doc doesn't match the - // selector. - self._handleDoc(id, doc, true); + self._handleDoc(id, doc); }); + + // Sanity-check that everything we tried to put into _published ended up + // there. + // XXX if this is slow, remove it later + if (self._published.size() !== newResults.size()) { + throw Error("failed to copy newResults into _published!"); + } + self._published.forEach(function (doc, id) { + if (!newResults.has(id)) + throw Error("_published has a doc that newResults doesn't; " + id); + }); + + // Finally, replace the buffer + newBuffer.forEach(function (doc, id) { + self._addBuffered(id, doc); + }); + + self._safeAppendToBuffer = newBuffer.size() < self._limit; }, // This stop function is invoked from the onStop of the ObserveMultiplexer, so @@ -451,6 +770,7 @@ _.extend(OplogObserveDriver.prototype, { // Proactively drop references to potentially big things. self._published = null; + self._unpublishedBuffer = null; self._needToFetch = null; self._currentlyFetching = null; self._oplogEntryHandle = null; @@ -486,10 +806,11 @@ OplogObserveDriver.cursorSupported = function (cursorDescription, matcher) { if (options._disableOplog) return false; - // This option (which are mostly used for sorted cursors) require us to figure - // out where a given document fits in an order to know if it's included or - // not, and we don't track that information when doing oplog tailing. - if (options.limit || options.skip) return false; + // skip is not supported: to support it we would need to keep track of all + // "skipped" documents or at least their ids. + // limit w/o a sort specifier is not supported: current implementation needs a + // deterministic way to order documents. + if (options.skip || (options.limit && !options.sort)) return false; // If a fields projection option is given check if it is supported by // minimongo (some operators are not supported). @@ -509,7 +830,9 @@ OplogObserveDriver.cursorSupported = function (cursorDescription, matcher) { // as Mongo, and can yield!) // - $near (has "interesting" properties in MongoDB, like the possibility // of returning an ID multiple times, though even polling maybe - // have a bug there + // have a bug there) + // XXX: once we support it, we would need to think more on how we + // initialize the comparators when we create the driver. return !matcher.hasWhere() && !matcher.hasGeoQuery(); }; diff --git a/packages/mongo-livedata/oplog_tailing.js b/packages/mongo-livedata/oplog_tailing.js index b8c044ce3e..b203bfa412 100644 --- a/packages/mongo-livedata/oplog_tailing.js +++ b/packages/mongo-livedata/oplog_tailing.js @@ -1,6 +1,6 @@ var Future = Npm.require('fibers/future'); -var OPLOG_COLLECTION = 'oplog.rs'; +OPLOG_COLLECTION = 'oplog.rs'; var REPLSET_COLLECTION = 'system.replset'; // Like Perl's quotemeta: quotes all regexp metacharacters. See diff --git a/packages/mongo-livedata/oplog_tests.js b/packages/mongo-livedata/oplog_tests.js index b46c00d799..8ecc1099c5 100644 --- a/packages/mongo-livedata/oplog_tests.js +++ b/packages/mongo-livedata/oplog_tests.js @@ -4,8 +4,8 @@ Tinytest.add("mongo-livedata - oplog - cursorSupported", function (test) { var oplogEnabled = !!MongoInternals.defaultRemoteCollectionDriver().mongo._oplogHandle; - var supported = function (expected, selector) { - var cursor = OplogCollection.find(selector); + var supported = function (expected, selector, options) { + var cursor = OplogCollection.find(selector, options); var handle = cursor.observeChanges({added: function () {}}); // If there's no oplog at all, we shouldn't ever use it. if (!oplogEnabled) @@ -44,4 +44,10 @@ Tinytest.add("mongo-livedata - oplog - cursorSupported", function (test) { // Nothing Minimongo doesn't understand. (Minimongo happens to fail to // implement $elemMatch inside $all which MongoDB supports.) supported(false, {x: {$all: [{$elemMatch: {y: 2}}]}}); + + supported(true, {}, { sort: {x:1} }); + supported(true, {}, { sort: {x:1}, limit: 5 }); + supported(false, {}, { limit: 5 }); + supported(false, {}, { skip: 2, limit: 5 }); + supported(false, {}, { skip: 2 }); }); diff --git a/packages/mongo-livedata/package.js b/packages/mongo-livedata/package.js index f0b1f9e3d2..4f1dfa8866 100644 --- a/packages/mongo-livedata/package.js +++ b/packages/mongo-livedata/package.js @@ -24,6 +24,10 @@ Package.on_use(function (api) { ['client', 'server']); api.use('check', ['client', 'server']); + // Binary Heap data structure is used to optimize oplog observe driver + // performance. + api.use('binary-heap', 'server'); + // Allow us to detect 'insecure'. api.use('insecure', {weak: true}); diff --git a/packages/retry/retry.js b/packages/retry/retry.js index a4407b5bdb..4a377bca4f 100644 --- a/packages/retry/retry.js +++ b/packages/retry/retry.js @@ -57,7 +57,7 @@ _.extend(Retry.prototype, { var timeout = self._timeout(count); if (self.retryTimer) clearTimeout(self.retryTimer); - self.retryTimer = setTimeout(fn, timeout); + self.retryTimer = Meteor.setTimeout(fn, timeout); return timeout; } diff --git a/packages/spiderable/spiderable.js b/packages/spiderable/spiderable.js index 0a899f6efc..3f8f40bc2c 100644 --- a/packages/spiderable/spiderable.js +++ b/packages/spiderable/spiderable.js @@ -24,6 +24,13 @@ var REQUEST_TIMEOUT = 15*1000; var MAX_BUFFER = 5*1024*1024; // 5MB WebApp.connectHandlers.use(function (req, res, next) { + // _escaped_fragment_ comes from Google's AJAX crawling spec: + // https://developers.google.com/webmasters/ajax-crawling/docs/specification + // This spec was designed during the brief era where using "#!" URLs was + // common, so it mostly describes how to translate "#!" URLs into + // _escaped_fragment_ URLs. Since then, "#!" URLs have gone out of style, but + // the (see spiderable.html) approach also + // described in the spec is still common and used by several crawlers. if (/\?.*_escaped_fragment_=/.test(req.url) || _.any(Spiderable.userAgentRegExps, function (re) { return re.test(req.headers['user-agent']); })) { diff --git a/packages/test-in-console/reporter.js b/packages/test-in-console/reporter.js index ef7e83734a..5f12d912f9 100644 --- a/packages/test-in-console/reporter.js +++ b/packages/test-in-console/reporter.js @@ -26,3 +26,10 @@ Meteor.methods({ return null; } }); + +// provide some notification we're started. This is to allow use +// in automated scripts with `meteor run --once` which does not +// print when the proxy is listening. +Meteor.startup(function () { + Meteor._debug("test-in-console listening"); +}); diff --git a/scripts/admin/banner.txt b/scripts/admin/banner.txt index cf9d026ea5..e672e9ae21 100644 --- a/scripts/admin/banner.txt +++ b/scripts/admin/banner.txt @@ -1,7 +1,4 @@ -=> Meteor 0.7.1.1: Extend oplog tailing driver to support most common - MongoDB queries. Introduce Meteor developer accounts, a new way of - managing your meteor.com deployed sites. When you use `meteor - deploy`, you will be prompted to create a developer account. +=> Meteor 0.7.1.2: Fix crash on OSX machines with no hostname set. This release is being downloaded in the background. Update your - project to Meteor 0.7.1.1 by running 'meteor update'. + project to Meteor 0.7.1.2 by running 'meteor update'. diff --git a/scripts/admin/notices.json b/scripts/admin/notices.json index a7dfc4f8d5..835f91f1f8 100644 --- a/scripts/admin/notices.json +++ b/scripts/admin/notices.json @@ -88,6 +88,9 @@ "http://jquery.com/upgrade-guide/1.9/"] } }, + { + "release": "0.7.1.2" + }, { "release": "NEXT" } diff --git a/scripts/generate-dev-bundle.sh b/scripts/generate-dev-bundle.sh index b1b16f1d45..942a092caf 100755 --- a/scripts/generate-dev-bundle.sh +++ b/scripts/generate-dev-bundle.sh @@ -107,9 +107,11 @@ npm install kexec@0.2.0 npm install source-map@0.1.32 npm install source-map-support@0.2.5 npm install bcrypt@0.7.7 -npm install http-proxy@1.0.2 npm install heapdump@0.2.5 +# Fork of 1.0.2 with https://github.com/nodejitsu/node-http-proxy/pull/592 +npm install https://github.com/meteor/node-http-proxy/tarball/99f757251b42aeb5d26535a7363c96804ee057f0 + # Using the unreleased 1.1 branch. We can probably switch to a built NPM version # when it gets released. npm install https://github.com/ariya/esprima/tarball/5044b87f94fb802d9609f1426c838874ec2007b3 diff --git a/tools/deploy-galaxy.js b/tools/deploy-galaxy.js index 6095545a7b..53319a1379 100644 --- a/tools/deploy-galaxy.js +++ b/tools/deploy-galaxy.js @@ -78,7 +78,7 @@ var ServiceConnection = function (galaxy, service) { // from the hostname of endpointUrl, and run the login command for // that galaxy. if (! authToken) - throw new Error("not logged in to galaxy?") + throw new Error("not logged in to galaxy?"); self.connection = Package.livedata.DDP.connect(endpointUrl, { headers: { @@ -117,10 +117,12 @@ _.extend(ServiceConnection.prototype, { var args = _.toArray(arguments); var name = args.shift(); self.connection.apply(name, args, function (err, result) { - if (err) + if (err) { fut['throw'](err); - else + } else { + self._cleanUpTimer(); fut['return'](result); + } }); return fut.wait(); @@ -141,6 +143,7 @@ _.extend(ServiceConnection.prototype, { args.push({ onReady: function () { ready = true; + self._cleanUpTimer(); fut['return'](); }, onError: function (e) { @@ -151,8 +154,16 @@ _.extend(ServiceConnection.prototype, { } }); - self.connection.subscribe.apply(self.connection, args); - return fut.wait(); + var sub = self.connection.subscribe.apply(self.connection, args); + fut.wait(); + return sub; + }, + + _cleanUpTimer: function () { + var self = this; + var Package = getPackage(); + Package.meteor.Meteor.clearTimeout(self.connectionTimer); + self.connectionTimer = null; }, close: function () { @@ -163,9 +174,7 @@ _.extend(ServiceConnection.prototype, { } if (self.connectionTimer) { // Clean up the timer so that Node can exit cleanly - var Package = getPackage(); - Package.meteor.Meteor.clearTimeout(self.connectionTimer); - self.connectionTimer = null; + self._cleanUpTimer(); } } }); @@ -456,17 +465,6 @@ exports.logs = function (options) { throw new Error("Can't listen to messages on the logs collection"); var logsSubscription = null; - // In case of reconnect recover the state so user sees only new logs - logReader.connection.onReconnect = function () { - logsSubscription && logsSubscription.stop(); - var opts = { streaming: options.streaming }; - if (lastLogId) - opts.resumeAfterId = lastLogId; - // XXX correctly handle errors on resubscribe - logsSubscription = logReader.subscribeAndWait("logsForApp", - options.app, opts); - }; - try { logsSubscription = logReader.subscribeAndWait("logsForApp", options.app, @@ -477,6 +475,29 @@ exports.logs = function (options) { }); } + // In case of reconnect recover the state so user sees only new logs. + // Only set up the onReconnect handler after the subscribe and wait + // has returned; if we set it up before, then we'll end up with two + // subscriptions, because the onReconnect handler will run for the + // first time before the subscribeAndWait returns. + logReader.connection.onReconnect = function () { + logsSubscription && logsSubscription.stop(); + var opts = { streaming: options.streaming }; + if (lastLogId) + opts.resumeAfterId = lastLogId; + // Don't use subscribeAndWait here; it'll deadlock. We can't + // process the sub messages until `onReconnect` returns, and + // `onReconnect` won't return unless the sub messages have been + // processed. There's no reason we need to wait for the sub to be + // ready here anyway. + // XXX correctly handle errors on resubscribe + logsSubscription = logReader.connection.subscribe( + "logsForApp", + options.app, + opts + ); + }; + return options.streaming ? null : 0; } finally { // If not streaming, close the connection to log-reader so that diff --git a/tools/tests/run.js b/tools/tests/run.js index db0ff667e4..45883021c1 100644 --- a/tools/tests/run.js +++ b/tools/tests/run.js @@ -55,6 +55,7 @@ selftest.define("run", function () { run.waitSecs(5); run.match("restarted (x2)"); // see that restart counter reset s.write("crash.js", "process.kill(process.pid, 'SIGKILL');"); + run.waitSecs(5); run.match("from signal: SIGKILL"); run.waitSecs(5); run.match("is crashing"); @@ -82,6 +83,7 @@ selftest.define("run", function () { " fs.writeFileSync(crashmark);\n" + " process.exit(137);\n" + "}\n"); + run.waitSecs(5); run.match("with code: 137"); run.match("restarted"); run.stop(); diff --git a/tools/watch.js b/tools/watch.js index f71cb7805c..a66d265be9 100644 --- a/tools/watch.js +++ b/tools/watch.js @@ -191,7 +191,7 @@ WatchSet.fromJSON = function (json) { set.files = _.clone(json.files); var reFromJSON = function (j) { - if (j.$regex) + if (_.has(j, '$regex')) return new RegExp(j.$regex, j.$options); return new RegExp(j); };