Merge branch 'devel' into shark

# Please enter a commit message to explain why this merge is necessary,
# especially if it merges an updated upstream into a topic branch.
#
# Lines starting with '#' will be ignored, and an empty message aborts
# the commit.
This commit is contained in:
David Greenspan
2014-03-06 16:21:33 -08:00
54 changed files with 1885 additions and 474 deletions

View File

@@ -1,5 +1,25 @@
## v.NEXT
* Use "faye-websocket" (0.7.2) npm module instead of "websocket" (1.0.8) for
server-to-server DDP.
* minimongo: Support {a: {$elemMatch: {x: 1, $or: [{a: 1}, {b: 1}]}}} #1875
* minimongo: Support {a: {$regex: '', $options: 'i'}} #1874
* Upgraded dependencies
- amplify: 1.1.2 (from 1.1.0)
## v0.7.1.2
* Fix bug in tool error handling that caused `meteor` to crash on Mac
OSX when no computer name is set.
* Work around a bug that caused MongoDB to fail an assertion when using
tailable cursors on non-oplog collections.
## v0.7.1.1
* Integrate with Meteor developer accounts, a new way of managing your
@@ -29,7 +49,7 @@
* Add and improve support for minimongo operators.
- Support `$comment`.
- Support `obj` name in `$where`.
- `$regexp` matches actual regexps properly.
- `$regex` matches actual regexps properly.
- Improve support for `$nin`, `$ne`, `$not`.
- Support using `{ $in: [/foo/, /bar/] }`. #1707
- Support `{$exists: false}`.

View File

@@ -1 +1 @@
0.7.1.1
0.7.1.2

View File

@@ -1585,11 +1585,19 @@ By default, the current user's `username`, `emails` and `profile` are
published to the client. You can publish additional fields for the
current user with:
// server
Meteor.publish("userData", function () {
return Meteor.users.find({_id: this.userId},
{fields: {'other': 1, 'things': 1}});
if (this.userId) {
return Meteor.users.find({_id: this.userId},
{fields: {'other': 1, 'things': 1}});
} else {
this.ready();
}
});
// client
Meteor.subscribe("userData");
If the autopublish package is installed, information about all users
on the system is published to all clients. This includes `username`,
`profile`, and any fields in `services` that are meant to be public

View File

@@ -1,5 +1,5 @@
// While galaxy apps are on their own special meteor releases, override
// Meteor.release here.
if (Meteor.isClient) {
Meteor.release = Meteor.release ? "0.7.1.1" : undefined;
Meteor.release = Meteor.release ? "0.7.1.2" : undefined;
}

View File

@@ -1 +1 @@
0.7.1.1
0.7.1.2

View File

@@ -1 +1 @@
0.7.1.1
0.7.1.2

View File

@@ -1 +1 @@
0.7.1.1
0.7.1.2

View File

@@ -1 +1 @@
0.7.1.1
0.7.1.2

2
meteor
View File

@@ -1,6 +1,6 @@
#!/bin/bash
BUNDLE_VERSION=0.3.32
BUNDLE_VERSION=0.3.33
# OS Check. Put here because here is where we download the precompiled
# bundles that are arch specific.

View File

@@ -1,19 +1,10 @@
/*!
* AmplifyJS 1.1.0 - Core, Store, Request
*
* Copyright 2011 appendTo LLC. (http://appendto.com/team)
* Amplify 1.1.2
*
* Copyright 2011 - 2013 appendTo LLC. (http://appendto.com/team)
* Dual licensed under the MIT or GPL licenses.
* http://appendto.com/open-source-licenses
*
* http://amplifyjs.com
*/
/*!
* Amplify Core 1.1.0
*
* Copyright 2011 appendTo LLC. (http://appendto.com/team)
* Dual licensed under the MIT or GPL licenses.
* http://appendto.com/open-source-licenses
*
*
* http://amplifyjs.com
*/
(function( global, undefined ) {
@@ -23,6 +14,10 @@ var slice = [].slice,
var amplify = global.amplify = {
publish: function( topic ) {
if ( typeof topic !== "string" ) {
throw new Error( "You must provide a valid topic to publish." );
}
var args = slice.call( arguments, 1 ),
topicSubscriptions,
subscription,
@@ -46,6 +41,10 @@ var amplify = global.amplify = {
},
subscribe: function( topic, context, callback, priority ) {
if ( typeof topic !== "string" ) {
throw new Error( "You must provide a valid topic to create a subscription." );
}
if ( arguments.length === 3 && typeof callback === "number" ) {
priority = callback;
callback = context;
@@ -67,14 +66,14 @@ var amplify = global.amplify = {
if ( !subscriptions[ topic ] ) {
subscriptions[ topic ] = [];
}
var i = subscriptions[ topic ].length - 1,
subscriptionInfo = {
callback: callback,
context: context,
priority: priority
};
for ( ; i >= 0; i-- ) {
if ( subscriptions[ topic ][ i ].priority <= priority ) {
subscriptions[ topic ].splice( i + 1, 0, subscriptionInfo );
@@ -91,7 +90,16 @@ var amplify = global.amplify = {
return callback;
},
unsubscribe: function( topic, callback ) {
unsubscribe: function( topic, context, callback ) {
if ( typeof topic !== "string" ) {
throw new Error( "You must provide a valid topic to remove a subscription." );
}
if ( arguments.length === 2 ) {
callback = context;
context = null;
}
if ( !subscriptions[ topic ] ) {
return;
}
@@ -101,26 +109,23 @@ var amplify = global.amplify = {
for ( ; i < length; i++ ) {
if ( subscriptions[ topic ][ i ].callback === callback ) {
subscriptions[ topic ].splice( i, 1 );
break;
if ( !context || subscriptions[ topic ][ i ].context === context ) {
subscriptions[ topic ].splice( i, 1 );
// Adjust counter and length for removed item
i--;
length--;
}
}
}
}
};
}( this ) );
/*!
* Amplify Store - Persistent Client-Side Storage 1.1.0
*
* Copyright 2011 appendTo LLC. (http://appendto.com/team)
* Dual licensed under the MIT or GPL licenses.
* http://appendto.com/open-source-licenses
*
* http://amplifyjs.com
*/
(function( amplify, undefined ) {
var store = amplify.store = function( key, value, options, type ) {
var store = amplify.store = function( key, value, options ) {
var type = store.type;
if ( options && options.type && options.type in store.types ) {
type = options.type;
@@ -141,9 +146,9 @@ store.addType = function( type, storage ) {
options.type = type;
return store( key, value, options );
};
}
};
store.error = function() {
return "amplify.store quota exceeded";
return "amplify.store quota exceeded";
};
var rprefix = /^__amplify__/;
@@ -223,18 +228,21 @@ function createFromStorageInterface( storageType, storage ) {
// localStorage + sessionStorage
// IE 8+, Firefox 3.5+, Safari 4+, Chrome 4+, Opera 10.5+, iPhone 2+, Android 2+
for ( var webStorageType in { localStorage: 1, sessionStorage: 1 } ) {
// try/catch for file protocol in Firefox
// try/catch for file protocol in Firefox and Private Browsing in Safari 5
try {
if ( window[ webStorageType ].getItem ) {
createFromStorageInterface( webStorageType, window[ webStorageType ] );
}
// Safari 5 in Private Browsing mode exposes localStorage
// but doesn't allow storing data, so we attempt to store and remove an item.
// This will unfortunately give us a false negative if we're at the limit.
window[ webStorageType ].setItem( "__amplify__", "x" );
window[ webStorageType ].removeItem( "__amplify__" );
createFromStorageInterface( webStorageType, window[ webStorageType ] );
} catch( e ) {}
}
// globalStorage
// non-standard: Firefox 2+
// https://developer.mozilla.org/en/dom/storage#globalStorage
if ( window.globalStorage ) {
if ( !store.types.localStorage && window.globalStorage ) {
// try/catch for file protocol in Firefox
try {
createFromStorageInterface( "globalStorage",
@@ -306,7 +314,9 @@ if ( window.globalStorage ) {
// http://www.w3.org/TR/REC-xml/#NT-Name
// simplified to assume the starting character is valid
// also removed colon as it is invalid in HTML attribute names
key = key.replace( /[^-._0-9A-Za-z\xb7\xc0-\xd6\xd8-\xf6\xf8-\u037d\u37f-\u1fff\u200c-\u200d\u203f\u2040\u2070-\u218f]/g, "-" );
key = key.replace( /[^\-._0-9A-Za-z\xb7\xc0-\xd6\xd8-\xf6\xf8-\u037d\u037f-\u1fff\u200c-\u200d\u203f\u2040\u2070-\u218f]/g, "-" );
// adjust invalid starting character to deal with our simplified sanitization
key = key.replace( /^-/, "_-" );
if ( value === undefined ) {
attr = div.getAttribute( key );
@@ -402,16 +412,11 @@ if ( window.globalStorage ) {
}() );
}( this.amplify = this.amplify || {} ) );
/*!
* Amplify Request 1.1.0
*
* Copyright 2011 appendTo LLC. (http://appendto.com/team)
* Dual licensed under the MIT or GPL licenses.
* http://appendto.com/open-source-licenses
*
* http://amplifyjs.com
*/
/*global amplify*/
(function( amplify, undefined ) {
'use strict';
function noop() {}
function isFunction( obj ) {
@@ -457,12 +462,14 @@ amplify.request = function( resourceId, data, callback ) {
resource = amplify.request.resources[ settings.resourceId ],
success = settings.success || noop,
error = settings.error || noop;
settings.success = async( function( data, status ) {
status = status || "success";
amplify.publish( "request.success", settings, data, status );
amplify.publish( "request.complete", settings, data, status );
success( data, status );
});
settings.error = async( function( data, status ) {
status = status || "error";
amplify.publish( "request.error", settings, data, status );
@@ -506,13 +513,11 @@ amplify.request.define = function( resourceId, type, settings ) {
}( amplify ) );
(function( amplify, $, undefined ) {
'use strict';
var xhrProps = [ "status", "statusText", "responseText", "responseXML", "readyState" ],
rurlData = /\{([^\}]+)\}/g;
rurlData = /\{([^\}]+)\}/g;
amplify.request.types.ajax = function( defnSettings ) {
defnSettings = $.extend({
@@ -520,7 +525,7 @@ amplify.request.types.ajax = function( defnSettings ) {
}, defnSettings );
return function( settings, request ) {
var xhr,
var xhr, handleResponse,
url = defnSettings.url,
abort = request.abort,
ajaxSettings = $.extend( true, {}, defnSettings, { data: settings.data } ),
@@ -537,7 +542,7 @@ amplify.request.types.ajax = function( defnSettings ) {
return xhr.getResponseHeader( key );
},
overrideMimeType: function( type ) {
return xhr.overrideMideType( type );
return xhr.overrideMimeType( type );
},
abort: function() {
aborted = true;
@@ -555,28 +560,7 @@ amplify.request.types.ajax = function( defnSettings ) {
}
};
amplify.publish( "request.ajax.preprocess",
defnSettings, settings, ajaxSettings, ampXHR );
$.extend( ajaxSettings, {
success: function( data, status ) {
handleResponse( data, status );
},
error: function( _xhr, status ) {
handleResponse( null, status );
},
beforeSend: function( _xhr, _ajaxSettings ) {
xhr = _xhr;
ajaxSettings = _ajaxSettings;
var ret = defnSettings.beforeSend ?
defnSettings.beforeSend.call( this, ampXHR, ajaxSettings ) : true;
return ret && amplify.publish( "request.before.ajax",
defnSettings, settings, ajaxSettings, ampXHR );
}
});
$.ajax( ajaxSettings );
function handleResponse( data, status ) {
handleResponse = function( data, status ) {
$.each( xhrProps, function( i, key ) {
try {
ampXHR[ key ] = xhr[ key ];
@@ -603,8 +587,62 @@ amplify.request.types.ajax = function( defnSettings ) {
// this can happen if a request is aborted
// TODO: figure out if this breaks polling or multi-part responses
handleResponse = $.noop;
};
amplify.publish( "request.ajax.preprocess",
defnSettings, settings, ajaxSettings, ampXHR );
$.extend( ajaxSettings, {
isJSONP: function () {
return (/jsonp/gi).test(this.dataType);
},
cacheURL: function () {
if (!this.isJSONP()) {
return this.url;
}
var callbackName = 'callback';
// possible for the callback function name to be overridden
if (this.hasOwnProperty('jsonp')) {
if (this.jsonp !== false) {
callbackName = this.jsonp;
} else {
if (this.hasOwnProperty('jsonpCallback')) {
callbackName = this.jsonpCallback;
}
}
}
// search and replace callback parameter in query string with empty string
var callbackRegex = new RegExp('&?' + callbackName + '=[^&]*&?', 'gi');
return this.url.replace(callbackRegex, '');
},
success: function( data, status ) {
handleResponse( data, status );
},
error: function( _xhr, status ) {
handleResponse( null, status );
},
beforeSend: function( _xhr, _ajaxSettings ) {
xhr = _xhr;
ajaxSettings = _ajaxSettings;
var ret = defnSettings.beforeSend ?
defnSettings.beforeSend.call( this, ampXHR, ajaxSettings ) : true;
return ret && amplify.publish( "request.before.ajax",
defnSettings, settings, ajaxSettings, ampXHR );
}
});
// cache all JSONP requests
if (ajaxSettings.cache && ajaxSettings.isJSONP()) {
$.extend(ajaxSettings, {
cache: true
});
}
$.ajax( ajaxSettings );
request.abort = function() {
ampXHR.abort();
abort.call( this );
@@ -626,8 +664,8 @@ amplify.subscribe( "request.ajax.preprocess", function( defnSettings, settings,
ajaxSettings.url = ajaxSettings.url.replace( rurlData, function ( m, key ) {
if ( key in data ) {
mappedKeys.push( key );
return data[ key ];
mappedKeys.push( key );
return data[ key ];
}
});
@@ -668,13 +706,9 @@ var cache = amplify.request.cache = {
_key: function( resourceId, url, data ) {
data = url + data;
var length = data.length,
i = 0,
checksum = chunk();
while ( i < length ) {
checksum ^= chunk();
}
i = 0;
/*jshint bitwise:false*/
function chunk() {
return data.charCodeAt( i++ ) << 24 |
data.charCodeAt( i++ ) << 16 |
@@ -682,6 +716,12 @@ var cache = amplify.request.cache = {
data.charCodeAt( i++ ) << 0;
}
var checksum = chunk();
while ( i < length ) {
checksum ^= chunk();
}
/*jshint bitwise:true*/
return "request-" + resourceId + "-" + checksum;
},
@@ -690,7 +730,7 @@ var cache = amplify.request.cache = {
return function( resource, settings, ajaxSettings, ampXHR ) {
// data is already converted to a string by the time we get here
var cacheKey = cache._key( settings.resourceId,
ajaxSettings.url, ajaxSettings.data ),
ajaxSettings.cacheURL(), ajaxSettings.data ),
duration = resource.cache;
if ( cacheKey in memoryStore ) {
@@ -715,7 +755,7 @@ if ( amplify.store ) {
$.each( amplify.store.types, function( type ) {
cache[ type ] = function( resource, settings, ajaxSettings, ampXHR ) {
var cacheKey = cache._key( settings.resourceId,
ajaxSettings.url, ajaxSettings.data ),
ajaxSettings.cacheURL(), ajaxSettings.data ),
cached = amplify.store[ type ]( cacheKey );
if ( cached ) {
@@ -723,7 +763,7 @@ if ( amplify.store ) {
return false;
}
var success = ampXHR.success;
ampXHR.success = function( data ) {
ampXHR.success = function( data ) {
amplify.store[ type ]( cacheKey, data, { expires: resource.cache.expires } );
success.apply( this, arguments );
};
@@ -754,6 +794,8 @@ amplify.request.decoders = {
} else if ( data.status === "error" ) {
delete data.status;
error( data, "error" );
} else {
error( null, "error" );
}
}
};
@@ -761,11 +803,11 @@ amplify.request.decoders = {
amplify.subscribe( "request.before.ajax", function( resource, settings, ajaxSettings, ampXHR ) {
var _success = ampXHR.success,
_error = ampXHR.error,
decoder = $.isFunction( resource.decoder )
? resource.decoder
: resource.decoder in amplify.request.decoders
? amplify.request.decoders[ resource.decoder ]
: amplify.request.decoders._default;
decoder = $.isFunction( resource.decoder ) ?
resource.decoder :
resource.decoder in amplify.request.decoders ?
amplify.request.decoders[ resource.decoder ] :
amplify.request.decoders._default;
if ( !decoder ) {
return;

1
packages/binary-heap/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
.build*

View File

@@ -0,0 +1,138 @@
Tinytest.add("binary-heap - simple max-heap tests", function (test) {
var h = new MaxHeap(function (a, b) { return a-b; });
h.set("a", 1);
h.set("b", 233);
h.set("c", -122);
h.set("d", 0);
h.set("e", 0);
test.equal(h.size(), 5);
test.equal(h.maxElementId(), "b");
test.equal(h.get("b"), 233);
h.remove("b");
test.equal(h.size(), 4);
test.equal(h.maxElementId(), "a");
h.set("e", 44);
test.equal(h.maxElementId(), "e");
test.equal(h.get("b"), null);
test.isTrue(h.has("a"));
test.isFalse(h.has("dd"));
h.clear();
test.isFalse(h.has("a"));
test.equal(h.size(), 0);
test.equal(h.setDefault("a", 12345), 12345);
test.equal(h.setDefault("a", 55555), 12345);
test.equal(h.size(), 1);
test.equal(h.maxElementId(), "a");
});
Tinytest.add("binary-heap - big test for max-heap", function (test) {
var positiveNumbers = _.shuffle(_.range(1, 41));
var negativeNumbers = _.shuffle(_.range(-1, -41, -1));
var allNumbers = negativeNumbers.concat(positiveNumbers);
var heap = new MaxHeap(function (a, b) { return a-b; });
var output = [];
_.each(allNumbers, function (n) { heap.set(n, n); });
_.times(positiveNumbers.length + negativeNumbers.length, function () {
var maxId = heap.maxElementId();
output.push(heap.get(maxId));
heap.remove(maxId);
});
allNumbers.sort(function (a, b) { return b-a; });
test.equal(output, allNumbers);
});
Tinytest.add("binary-heap - min-max heap tests", function (test) {
var h = new MinMaxHeap(function (a, b) { return a-b; });
h.set("a", 1);
h.set("b", 233);
h.set("c", -122);
h.set("d", 0);
h.set("e", 0);
test.equal(h.size(), 5);
test.equal(h.maxElementId(), "b");
test.equal(h.get("b"), 233);
test.equal(h.minElementId(), "c");
h.remove("b");
test.equal(h.size(), 4);
test.equal(h.minElementId(), "c");
h.set("e", -123);
test.equal(h.minElementId(), "e");
test.equal(h.get("b"), null);
test.isTrue(h.has("a"));
test.isFalse(h.has("dd"));
h.clear();
test.isFalse(h.has("a"));
test.equal(h.size(), 0);
test.equal(h.setDefault("a", 12345), 12345);
test.equal(h.setDefault("a", 55555), 12345);
test.equal(h.size(), 1);
test.equal(h.maxElementId(), "a");
test.equal(h.minElementId(), "a");
});
Tinytest.add("binary-heap - big test for min-max-heap", function (test) {
var N = 500;
var positiveNumbers = _.shuffle(_.range(1, N + 1));
var negativeNumbers = _.shuffle(_.range(-1, -N - 1, -1));
var allNumbers = positiveNumbers.concat(negativeNumbers);
var heap = new MinMaxHeap(function (a, b) { return a-b; });
var output = [];
var initialSets = _.clone(allNumbers);
_.each(allNumbers, function (n) {
heap.set(n, n);
heap._selfCheck();
heap._minHeap._selfCheck();
});
allNumbers = _.shuffle(allNumbers);
var secondarySets = _.clone(allNumbers);
_.each(allNumbers, function (n) {
heap.set(-n, n);
heap._selfCheck();
heap._minHeap._selfCheck();
});
_.times(positiveNumbers.length + negativeNumbers.length, function () {
var minId = heap.minElementId();
output.push(heap.get(minId));
heap.remove(minId);
heap._selfCheck(); heap._minHeap._selfCheck();
});
test.equal(heap.size(), 0);
allNumbers.sort(function (a, b) { return a-b; });
var initialTestText = "initial sets: " + initialSets.toString() +
"; secondary sets: " + secondarySets.toString();
test.equal(output, allNumbers, initialTestText);
_.each(initialSets, function (n) { heap.set(n, n); })
_.each(secondarySets, function (n) { heap.set(-n, n); });
allNumbers.sort(function (a, b) { return b-a; });
output = [];
_.times(positiveNumbers.length + negativeNumbers.length, function () {
var maxId = heap.maxElementId();
output.push(heap.get(maxId));
heap.remove(maxId);
heap._selfCheck(); heap._minHeap._selfCheck();
});
test.equal(output, allNumbers, initialTestText);
});

View File

@@ -0,0 +1,226 @@
// Constructor of Heap
// - comparator - Function - given two items returns a number
// - options:
// - initData - Array - Optional - the initial data in a format:
// Object:
// - id - String - unique id of the item
// - value - Any - the data value
// each value is retained
// - IdMap - Constructor - Optional - custom IdMap class to store id->index
// mappings internally. Standard IdMap is used by default.
MaxHeap = function (comparator, options) {
if (! _.isFunction(comparator))
throw new Error('Passed comparator is invalid, should be a comparison function');
var self = this;
// a C-style comparator that is given two values and returns a number,
// negative if the first value is less than the second, positive if the second
// value is greater than the first and zero if they are equal.
self._comparator = comparator;
options = _.defaults(options || {}, { IdMap: IdMap });
// _heapIdx maps an id to an index in the Heap array the corresponding value
// is located on.
self._heapIdx = new options.IdMap;
// The Heap data-structure implemented as a 0-based contiguous array where
// every item on index idx is a node in a complete binary tree. Every node can
// have children on indexes idx*2+1 and idx*2+2, except for the leaves. Every
// node has a parent on index (idx-1)/2;
self._heap = [];
// If the initial array is passed, we can build the heap in linear time
// complexity (O(N)) compared to linearithmic time complexity (O(nlogn)) if
// we push elements one by one.
if (_.isArray(options.initData))
self._initFromData(options.initData);
};
_.extend(MaxHeap.prototype, {
// Builds a new heap in-place in linear time based on passed data
_initFromData: function (data) {
var self = this;
self._heap = _.map(data, function (o) {
return { id: o.id, value: o.value };
});
_.each(data, function (o, i) {
self._heapIdx.set(o.id, i);
});
if (! data.length)
return;
// start from the first non-leaf - the parent of the last leaf
for (var i = parentIdx(data.length - 1); i >= 0; i--)
self._downHeap(i);
},
_downHeap: function (idx) {
var self = this;
while (leftChildIdx(idx) < self.size()) {
var left = leftChildIdx(idx);
var right = rightChildIdx(idx);
var largest = idx;
if (left < self.size()) {
largest = self._maxIndex(largest, left);
}
if (right < self.size()) {
largest = self._maxIndex(largest, right);
}
if (largest === idx)
break;
self._swap(largest, idx);
idx = largest;
}
},
_upHeap: function (idx) {
var self = this;
while (idx > 0) {
var parent = parentIdx(idx);
if (self._maxIndex(parent, idx) === idx) {
self._swap(parent, idx)
idx = parent;
} else {
break;
}
}
},
_maxIndex: function (idxA, idxB) {
var self = this;
var valueA = self._get(idxA);
var valueB = self._get(idxB);
return self._comparator(valueA, valueB) >= 0 ? idxA : idxB;
},
// Internal: gets raw data object placed on idxth place in heap
_get: function (idx) {
var self = this;
return self._heap[idx].value;
},
_swap: function (idxA, idxB) {
var self = this;
var recA = self._heap[idxA];
var recB = self._heap[idxB];
self._heapIdx.set(recA.id, idxB);
self._heapIdx.set(recB.id, idxA);
self._heap[idxA] = recB;
self._heap[idxB] = recA;
},
get: function (id) {
var self = this;
if (! self.has(id))
return null;
return self._get(self._heapIdx.get(id));
},
set: function (id, value) {
var self = this;
if (self.has(id)) {
if (self.get(id) === value)
return;
var idx = self._heapIdx.get(id);
self._heap[idx].value = value;
// Fix the new value's position
// Either bubble new value up if it is greater than its parent
self._upHeap(idx);
// or bubble it down if it is smaller than one of its children
self._downHeap(idx);
} else {
self._heapIdx.set(id, self._heap.length);
self._heap.push({ id: id, value: value });
self._upHeap(self._heap.length - 1);
}
},
remove: function (id) {
var self = this;
if (self.has(id)) {
var last = self._heap.length - 1;
var idx = self._heapIdx.get(id);
if (idx !== last) {
self._swap(idx, last);
self._heap.pop();
self._heapIdx.remove(id);
// Fix the swapped value's position
self._upHeap(idx);
self._downHeap(idx);
} else {
self._heap.pop();
self._heapIdx.remove(id);
}
}
},
has: function (id) {
var self = this;
return self._heapIdx.has(id);
},
empty: function (id) {
var self = this;
return !self.size();
},
clear: function () {
var self = this;
self._heap = [];
self._heapIdx.clear();
},
// iterate over values in no particular order
forEach: function (iterator) {
var self = this;
_.each(self._heap, function (obj) {
return iterator(obj.value, obj.id);
});
},
size: function () {
var self = this;
return self._heap.length;
},
setDefault: function (id, def) {
var self = this;
if (self.has(id))
return self.get(id);
self.set(id, def);
return def;
},
clone: function () {
var self = this;
var clone = new MaxHeap(self._comparator, self._heap);
return clone;
},
maxElementId: function () {
var self = this;
return self.size() ? self._heap[0].id : null;
},
_selfCheck: function () {
var self = this;
for (var i = 1; i < self._heap.length; i++)
if (self._maxIndex(parentIdx(i), i) !== parentIdx(i))
throw new Error("An item with id " + self._heap[i].id +
" has a parent younger than it: " +
self._heap[parentIdx(i)].id);
}
});
function leftChildIdx (i) { return i * 2 + 1; }
function rightChildIdx (i) { return i * 2 + 2; }
function parentIdx (i) { return (i - 1) >> 1; }

View File

@@ -0,0 +1,55 @@
// This implementation of Min/Max-Heap is just a subclass of Max-Heap
// with a Min-Heap as an encapsulated property.
//
// Most of the operations are just proxy methods to call the same method on both
// heaps.
//
// This implementation takes 2*N memory but is fairly simple to write and
// understand. And the constant factor of a simple Heap is usually smaller
// compared to other two-way priority queues like Min/Max Heaps
// (http://www.cs.otago.ac.nz/staffpriv/mike/Papers/MinMaxHeaps/MinMaxHeaps.pdf)
// and Interval Heaps
// (http://www.cise.ufl.edu/~sahni/dsaac/enrich/c13/double.htm)
MinMaxHeap = function (comparator, options) {
var self = this;
MaxHeap.call(self, comparator, options);
self._minHeap = new MaxHeap(function (a, b) {
return -comparator(a, b);
}, options);
};
Meteor._inherits(MinMaxHeap, MaxHeap);
_.extend(MinMaxHeap.prototype, {
set: function (id, value) {
var self = this;
MaxHeap.prototype.set.apply(self, arguments);
self._minHeap.set(id, value);
},
remove: function (id) {
var self = this;
MaxHeap.prototype.remove.apply(self, arguments);
self._minHeap.remove(id);
},
clear: function () {
var self = this;
MaxHeap.prototype.clear.apply(self, arguments);
self._minHeap.clear();
},
setDefault: function (id, def) {
var self = this;
MaxHeap.prototype.setDefault.apply(self, arguments);
return self._minHeap.setDefault(id, def);
},
clone: function () {
var self = this;
var clone = new MinMaxHeap(self._comparator, self._heap);
return clone;
},
minElementId: function () {
var self = this;
return self._minHeap.maxElementId();
}
});

View File

@@ -0,0 +1,18 @@
Package.describe({
summary: "Binary Heap datastructure implementation",
internal: true
});
Package.on_use(function (api) {
api.export('MaxHeap');
api.export('MinMaxHeap');
api.use(['underscore', 'id-map']);
api.add_files(['max-heap.js', 'min-max-heap.js']);
});
Package.on_test(function (api) {
api.use('tinytest');
api.use('binary-heap');
api.add_files('binary-heap-tests.js');
});

View File

@@ -7,16 +7,21 @@
Visit <a href="https://developers.facebook.com/apps" target="_blank">https://developers.facebook.com/apps</a>
</li>
<li>
Create New App (Only a name is required.)
Select "Apps", then "Create a New App". (You don't need to enter a namespace.)
</li>
<li>
Set "Sandbox Mode" to "Disabled"
</li>
<li>
Under "Select how your app integrates with Facebook", expand "Website with Facebook Login".
Select "Settings" and enter a "Contact Email". Then select "Add Platform"
and choose "Website".
</li>
<li>
Set Site URL to: <span class="url">{{siteUrl}}</span>
</li>
<li>
Select "Status" and make the app and all its live features available to
the general public.
</li>
<li>
Select "Dashboard".
</li>
</ol>
</template>

View File

@@ -19,6 +19,9 @@ if (Meteor.isServer) {
var factsByPackage = {};
var activeSubscriptions = [];
// Make factsByPackage data available to the server environment
Facts._factsByPackage = factsByPackage;
Facts.incrementServerFact = function (pkg, fact, increment) {
if (!_.has(factsByPackage, pkg)) {
factsByPackage[pkg] = {};

View File

@@ -10,28 +10,22 @@
If necessary, "Create Project"
</li>
<li>
Click "APIs &amp; auth" and "Registered apps" on the left
Click "APIs &amp; auth" and "Credentials" on the left
</li>
<li>
Click the "Register App" button
Click the "Create New Client ID" button
</li>
<li>
Choose "Web application" as the type
</li>
<li>
Click "Register"
Set Authorized Javascript Origins to: <span class="url">{{siteUrl}}</span>
</li>
<li>
Expand the "OAuth 2.0 Client ID section"
Set Authorized Redirect URI to: <span class="url">{{siteUrl}}_oauth/google?close</span>
</li>
<li>
Set Web Origin to: <span class="url">{{siteUrl}}</span>
</li>
<li>
Set Redirect URI to: <span class="url">{{siteUrl}}_oauth/google?close</span>
</li>
<li>
Click "Generate"
Click "Create Client ID"
</li>
</ol>
</template>

1
packages/id-map/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
.build*

77
packages/id-map/id-map.js Normal file
View File

@@ -0,0 +1,77 @@
IdMap = function (idStringify, idParse) {
var self = this;
self._map = {};
self._idStringify = idStringify || JSON.stringify;
self._idParse = idParse || JSON.parse;
};
// Some of these methods are designed to match methods on OrderedDict, since
// (eg) ObserveMultiplex and _CachingChangeObserver use them interchangeably.
// (Conceivably, this should be replaced with "UnorderedDict" with a specific
// set of methods that overlap between the two.)
_.extend(IdMap.prototype, {
get: function (id) {
var self = this;
var key = self._idStringify(id);
return self._map[key];
},
set: function (id, value) {
var self = this;
var key = self._idStringify(id);
self._map[key] = value;
},
remove: function (id) {
var self = this;
var key = self._idStringify(id);
delete self._map[key];
},
has: function (id) {
var self = this;
var key = self._idStringify(id);
return _.has(self._map, key);
},
empty: function () {
var self = this;
return _.isEmpty(self._map);
},
clear: function () {
var self = this;
self._map = {};
},
// Iterates over the items in the map. Return `false` to break the loop.
forEach: function (iterator) {
var self = this;
// don't use _.each, because we can't break out of it.
var keys = _.keys(self._map);
for (var i = 0; i < keys.length; i++) {
var breakIfFalse = iterator.call(null, self._map[keys[i]],
self._idParse(keys[i]));
if (breakIfFalse === false)
return;
}
},
size: function () {
var self = this;
return _.size(self._map);
},
setDefault: function (id, def) {
var self = this;
var key = self._idStringify(id);
if (_.has(self._map, key))
return self._map[key];
self._map[key] = def;
return def;
},
// Assumes that values are EJSON-cloneable, and that we don't need to clone
// IDs (ie, that nobody is going to mutate an ObjectId).
clone: function () {
var self = this;
var clone = new IdMap(self._idStringify, self._idParse);
self.forEach(function (value, id) {
clone.set(id, EJSON.clone(value));
});
return clone;
}
});

View File

@@ -0,0 +1,11 @@
Package.describe({
summary: "Dictionary data structure allowing non-string keys",
internal: true
});
Package.on_use(function (api) {
api.export('IdMap');
api.use(['underscore', 'json', 'ejson']);
api.add_files([ 'id-map.js' ]);
});

View File

@@ -25,10 +25,17 @@ Plugin.registerSourceHandler("less", function (compileStep) {
var parser = new less.Parser(options);
var astFuture = new Future;
var ast;
var sourceMap = null;
try {
parser.parse(source, astFuture.resolver());
ast = astFuture.wait();
var ast = astFuture.wait();
var css = ast.toCSS({
sourceMap: true,
writeSourceMap: function (sm) {
sourceMap = JSON.parse(sm);
}
});
} catch (e) {
// less.Parser.parse is supposed to report any errors via its
// callback. But sometimes, it throws them instead. This is
@@ -42,14 +49,6 @@ Plugin.registerSourceHandler("less", function (compileStep) {
return;
}
var sourceMap = null;
var css = ast.toCSS({
sourceMap: true,
writeSourceMap: function (sm) {
sourceMap = JSON.parse(sm);
}
});
if (sourceMap) {
sourceMap.sources = [compileStep.inputPath];

View File

@@ -16,8 +16,13 @@
}
}
},
"websocket": {
"version": "1.0.8"
"faye-websocket": {
"version": "0.7.2",
"dependencies": {
"websocket-driver": {
"version": "0.3.2"
}
}
}
}
}

View File

@@ -49,7 +49,10 @@ var Connection = function (url, options) {
self._stream = new LivedataTest.ClientStream(url, {
retry: options.retry,
headers: options.headers,
_sockjsOptions: options._sockjsOptions
_sockjsOptions: options._sockjsOptions,
// To keep some tests quiet (because we don't have a real API for handling
// client-stream-level errors).
_dontPrintErrors: options._dontPrintErrors
});
}

View File

@@ -839,6 +839,13 @@ var Subscription = function (
_.extend(Subscription.prototype, {
_runHandler: function () {
// XXX should we unblock() here? Either before running the publish
// function, or before running _publishCursor.
//
// Right now, each publish function blocks all future publishes and
// methods waiting on data from Mongo (or whatever else the function
// blocks on). This probably slows page load in common cases.
var self = this;
try {
var res = maybeAuditArgumentChecks(
@@ -1160,7 +1167,7 @@ _.extend(Server.prototype, {
// drop all future data coming over this connection on the
// floor. We don't want to confuse things.
socket.removeAllListeners('data');
setTimeout(function () {
Meteor.setTimeout(function () {
socket.send(stringifyDDP({msg: 'failed', version: version}));
socket.close();
}, timeout);

View File

@@ -683,9 +683,10 @@ if (Meteor.isServer) {
testAsyncMulti("livedata - connect fails to unknown place", [
function (test, expect) {
var self = this;
self.conn = DDP.connect("example.com");
self.conn = DDP.connect("example.com", {_dontPrintErrors: true});
Meteor.setTimeout(expect(function () {
test.isFalse(self.conn.status().connected, "Not connected");
self.conn.close();
}), 500);
}
]);

View File

@@ -3,7 +3,11 @@ Package.describe({
internal: true
});
Npm.depends({sockjs: "0.3.8", websocket: "1.0.8"});
// We use 'faye-websocket' for connections in server-to-server DDP, mostly
// because it's the same library used as a server in sockjs, and it's easiest to
// deal with a single websocket implementation. (Plus, its maintainer is easy
// to work with on pull requests.)
Npm.depends({sockjs: "0.3.8", "faye-websocket": "0.7.2"});
Package.on_use(function (api) {
api.use(['check', 'random', 'ejson', 'json', 'underscore', 'deps',

View File

@@ -11,43 +11,16 @@
// ping frames or with DDP-level messages.)
LivedataTest.ClientStream = function (endpoint, options) {
var self = this;
options = options || {};
self.options = _.extend({
retry: true
}, options);
// WebSocket-Node https://github.com/Worlize/WebSocket-Node
// Chosen because it can run without native components. It has a
// somewhat idiosyncratic API. We may want to use 'ws' instead in the
// future.
//
// Since server-to-server DDP is still an experimental feature, we only
// require the module if we actually create a server-to-server
// connection. This is a minor efficiency improvement, but moreover: while
// 'websocket' doesn't require native components, it tries to use some
// optional native components and prints a warning if it can't load
// them. Since native components in packages don't work when transferred to
// other architectures yet, this means that require('websocket') prints a
// spammy log message when deployed to another architecture. Delaying the
// require means you only get the log message if you're actually using the
// feature.
self.client = new (Npm.require('websocket').client)();
self.client = null; // created in _launchConnection
self.endpoint = endpoint;
self.currentConnection = null;
options = options || {};
self.headers = options.headers || {};
self.client.on('connect', Meteor.bindEnvironment(
function (connection) {
return self._onConnect(connection);
},
"stream connect callback"
));
self.client.on('connectFailed', function (error) {
// XXX: Make this do something better than make the tests hang if it does not work.
return self._lostConnection();
});
self.headers = self.options.headers || {};
self._initCommon();
@@ -63,7 +36,7 @@ _.extend(LivedataTest.ClientStream.prototype, {
send: function (data) {
var self = this;
if (self.currentStatus.connected) {
self.currentConnection.send(data);
self.client.send(data);
}
},
@@ -73,80 +46,37 @@ _.extend(LivedataTest.ClientStream.prototype, {
self.endpoint = url;
},
_onConnect: function (connection) {
_onConnect: function (client) {
var self = this;
if (client !== self.client) {
// This connection is not from the last call to _launchConnection.
// But _launchConnection calls _cleanup which closes previous connections.
// It's our belief that this stifles future 'open' events, but maybe
// we are wrong?
throw new Error("Got open from inactive client");
}
if (self._forcedToDisconnect) {
// We were asked to disconnect between trying to open the connection and
// actually opening it. Let's just pretend this never happened.
connection.close();
self.client.close();
self.client = null;
return;
}
if (self.currentStatus.connected) {
// We already have a connection. It must have been the case that
// we started two parallel connection attempts (because we
// wanted to 'reconnect now' on a hanging connection and we had
// no way to cancel the connection attempt.) Just ignore/close
// the latecomer.
connection.close();
return;
// We already have a connection. It must have been the case that we
// started two parallel connection attempts (because we wanted to
// 'reconnect now' on a hanging connection and we had no way to cancel the
// connection attempt.) But this shouldn't happen (similarly to the client
// !== self.client check above).
throw new Error("Two parallel connections?");
}
if (self.connectionTimer) {
clearTimeout(self.connectionTimer);
self.connectionTimer = null;
}
var onError = Meteor.bindEnvironment(
function (_this, error) {
if (self.currentConnection !== _this)
return;
Meteor._debug("stream error", error.toString(),
(new Date()).toDateString());
self._lostConnection();
},
"stream error callback"
);
connection.on('error', function (error) {
// We have to pass in `this` explicitly because bindEnvironment
// doesn't propagate it for us.
onError(this, error);
});
var onClose = Meteor.bindEnvironment(
function (_this) {
if (self.options._testOnClose)
self.options._testOnClose();
if (self.currentConnection !== _this)
return;
self._lostConnection();
},
"stream close callback"
);
connection.on('close', function () {
// We have to pass in `this` explicitly because bindEnvironment
// doesn't propagate it for us.
onClose(this);
});
connection.on('message', function (message) {
if (self.currentConnection !== this)
return; // old connection still emitting messages
if (message.type === "utf8") // ignore binary frames
_.each(self.eventCallbacks.message, function (callback) {
callback(message.utf8Data);
});
});
self._clearConnectionTimer();
// update status
self.currentConnection = connection;
self.currentStatus.status = "connected";
self.currentStatus.connected = true;
self.currentStatus.retryCount = 0;
@@ -161,10 +91,10 @@ _.extend(LivedataTest.ClientStream.prototype, {
var self = this;
self._clearConnectionTimer();
if (self.currentConnection) {
var conn = self.currentConnection;
self.currentConnection = null;
conn.close();
if (self.client) {
var client = self.client;
self.client = null;
client.close();
}
},
@@ -181,25 +111,63 @@ _.extend(LivedataTest.ClientStream.prototype, {
var self = this;
self._cleanup(); // cleanup the old socket, if there was one.
// launch a connect attempt. we have no way to track it. we either
// get an _onConnect event, or we don't.
// Since server-to-server DDP is still an experimental feature, we only
// require the module if we actually create a server-to-server
// connection.
var FayeWebSocket = Npm.require('faye-websocket');
// XXX: set up a timeout on this.
// We would like to specify 'ddp' as the subprotocol here. The npm module we
// used to use as a client would fail the handshake if we ask for a
// subprotocol and the server doesn't send one back (and sockjs doesn't).
// Faye doesn't have that behavior; it's unclear from reading RFC 6455 if
// Faye is erroneous or not. So for now, we don't specify protocols.
var client = self.client = new FayeWebSocket.Client(
toWebsocketUrl(self.endpoint),
[/*no subprotocols*/],
{headers: self.headers}
);
// we would like to specify 'ddp' as the protocol here, but
// unfortunately WebSocket-Node fails the handshake if we ask for
// a protocol and the server doesn't send one back (and sockjs
// doesn't). also, related: I guess we have to accept that
// 'stream' is ddp-specific
self.client.connect(toWebsocketUrl(self.endpoint),
undefined, // protocols
undefined, // origin
self.headers);
if (self.connectionTimer)
clearTimeout(self.connectionTimer);
self.connectionTimer = setTimeout(
self._clearConnectionTimer();
self.connectionTimer = Meteor.setTimeout(
_.bind(self._lostConnection, self),
self.CONNECT_TIMEOUT);
self.client.on('open', Meteor.bindEnvironment(function () {
return self._onConnect(client);
}, "stream connect callback"));
var clientOnIfCurrent = function (event, description, f) {
self.client.on(event, Meteor.bindEnvironment(function () {
// Ignore events from any connection we've already cleaned up.
if (client !== self.client)
return;
f.apply(this, arguments);
}, description));
};
clientOnIfCurrent('error', 'stream error callback', function (error) {
if (!self.options._dontPrintErrors)
Meteor._debug("stream error", error.message);
// XXX: Make this do something better than make the tests hang if it does
// not work.
self._lostConnection();
});
clientOnIfCurrent('close', 'stream close callback', function () {
self._lostConnection();
});
clientOnIfCurrent('message', 'stream message callback', function (message) {
// Ignore binary frames, where message.data is a Buffer
if (typeof message.data !== "string")
return;
_.each(self.eventCallbacks.message, function (callback) {
callback(message.data);
});
});
}
});

View File

@@ -1,17 +1,24 @@
var Fiber = Npm.require('fibers');
Tinytest.addAsync("stream client - callbacks run in a fiber", function (test, onComplete) {
stream = new LivedataTest.ClientStream(
Meteor.absoluteUrl(),
{
_testOnClose: function () {
test.isTrue(Fiber.current);
onComplete();
}
}
);
stream.on('reset', function () {
test.isTrue(Fiber.current);
stream.disconnect();
});
});
testAsyncMulti("stream client - callbacks run in a fiber", [
function (test, expect) {
var stream = new LivedataTest.ClientStream(Meteor.absoluteUrl());
var messageFired = false;
var resetFired = false;
stream.on('message', expect(function () {
test.isTrue(Fiber.current);
if (resetFired)
stream.disconnect();
messageFired = true;
}));
stream.on('reset', expect(function () {
test.isTrue(Fiber.current);
if (messageFired)
stream.disconnect();
resetFired = true;
}));
}
]);

View File

@@ -111,5 +111,26 @@ _.extend(Meteor, {
return fut.wait();
return result;
};
},
// Sets child's prototype to a new object whose prototype is parent's
// prototype. Used as:
// Meteor._inherit(ClassB, ClassA).
// _.extend(ClassB.prototype, { ... })
// Inspired by CoffeeScript's `extend` and Google Closure's `goog.inherits`.
_inherits: function (Child, Parent) {
// copy static fields
_.each(Parent, function (prop, field) {
Child[field] = prop;
});
// a middle member of prototype chain: takes the prototype from the Parent
var Middle = function () {
this.constructor = Child;
};
Middle.prototype = Parent.prototype;
Child.prototype = new Middle();
Child.__super__ = Parent.prototype;
return Child;
}
});

View File

@@ -16,7 +16,10 @@ isIndexable = function (x) {
return isArray(x) || isPlainObject(x);
};
isOperatorObject = function (valueSelector) {
// Returns true if this is an object with at least one key and all keys begin
// with $. Unless inconsistentOK is set, throws if some keys begin with $ and
// others don't.
isOperatorObject = function (valueSelector, inconsistentOK) {
if (!isPlainObject(valueSelector))
return false;
@@ -26,7 +29,10 @@ isOperatorObject = function (valueSelector) {
if (theseAreOperators === undefined) {
theseAreOperators = thisIsOperator;
} else if (theseAreOperators !== thisIsOperator) {
throw new Error("Inconsistent operator: " + valueSelector);
if (!inconsistentOK)
throw new Error("Inconsistent operator: " +
JSON.stringify(valueSelector));
theseAreOperators = false;
}
});
return !!theseAreOperators; // {} has no operators

View File

@@ -1,74 +1,7 @@
LocalCollection._IdMap = function () {
var self = this;
self._map = {};
IdMap.call(self, LocalCollection._idStringify, LocalCollection._idParse);
};
// Some of these methods are designed to match methods on OrderedDict, since
// (eg) ObserveMultiplex and _CachingChangeObserver use them interchangeably.
// (Conceivably, this should be replaced with "UnorderedDict" with a specific
// set of methods that overlap between the two.)
Meteor._inherits(LocalCollection._IdMap, IdMap);
_.extend(LocalCollection._IdMap.prototype, {
get: function (id) {
var self = this;
var key = LocalCollection._idStringify(id);
return self._map[key];
},
set: function (id, value) {
var self = this;
var key = LocalCollection._idStringify(id);
self._map[key] = value;
},
remove: function (id) {
var self = this;
var key = LocalCollection._idStringify(id);
delete self._map[key];
},
has: function (id) {
var self = this;
var key = LocalCollection._idStringify(id);
return _.has(self._map, key);
},
empty: function () {
var self = this;
return _.isEmpty(self._map);
},
clear: function () {
var self = this;
self._map = {};
},
// Iterates over the items in the map. Return `false` to break the loop.
forEach: function (iterator) {
var self = this;
// don't use _.each, because we can't break out of it.
var keys = _.keys(self._map);
for (var i = 0; i < keys.length; i++) {
var breakIfFalse = iterator.call(null, self._map[keys[i]],
LocalCollection._idParse(keys[i]));
if (breakIfFalse === false)
return;
}
},
size: function () {
var self = this;
return _.size(self._map);
},
setDefault: function (id, def) {
var self = this;
var key = LocalCollection._idStringify(id);
if (_.has(self._map, key))
return self._map[key];
self._map[key] = def;
return def;
},
// Assumes that values are EJSON-cloneable, and that we don't need to clone
// IDs (ie, that nobody is going to mutate an ObjectId).
clone: function () {
var self = this;
var clone = new LocalCollection._IdMap;
self.forEach(function (value, id) {
clone.set(id, EJSON.clone(value));
});
return clone;
}
});

View File

@@ -332,6 +332,31 @@ Tinytest.add("minimongo - selector and projection combination", function (test)
});
Tinytest.add("minimongo - sorter and projection combination", function (test) {
function testSorterProjectionComb (sortSpec, proj, expected, desc) {
var sorter = new Minimongo.Sorter(sortSpec);
test.equal(sorter.combineIntoProjection(proj), expected, desc);
}
// Test with inclusive projection
testSorterProjectionComb({ a: 1, b: 1 }, { b: 1, c: 1, d: 1 }, { a: true, b: true, c: true, d: true }, "simplest incl");
testSorterProjectionComb({ a: 1, b: -1 }, { b: 1, c: 1, d: 1 }, { a: true, b: true, c: true, d: true }, "simplest incl");
testSorterProjectionComb({ 'a.c': 1 }, { b: 1 }, { 'a.c': true, b: true }, "dot path incl");
testSorterProjectionComb({ 'a.1.c': 1 }, { b: 1 }, { 'a.c': true, b: true }, "dot num path incl");
testSorterProjectionComb({ 'a.1.c': 1 }, { b: 1, a: 1 }, { a: true, b: true }, "dot num path incl overlap");
testSorterProjectionComb({ 'a.1.c': 1, 'a.2.b': -1 }, { b: 1 }, { 'a.c': true, 'a.b': true, b: true }, "dot num path incl");
testSorterProjectionComb({ 'a.1.c': 1, 'a.2.b': -1 }, {}, {}, "dot num path with empty incl");
// Test with exclusive projection
testSorterProjectionComb({ a: 1, b: 1 }, { b: 0, c: 0, d: 0 }, { c: false, d: false }, "simplest excl");
testSorterProjectionComb({ a: 1, b: -1 }, { b: 0, c: 0, d: 0 }, { c: false, d: false }, "simplest excl");
testSorterProjectionComb({ 'a.c': 1 }, { b: 0 }, { b: false }, "dot path excl");
testSorterProjectionComb({ 'a.1.c': 1 }, { b: 0 }, { b: false }, "dot num path excl");
testSorterProjectionComb({ 'a.1.c': 1 }, { b: 0, a: 0 }, { b: false }, "dot num path excl overlap");
testSorterProjectionComb({ 'a.1.c': 1, 'a.2.b': -1 }, { b: 0 }, { b: false }, "dot num path excl");
});
(function () {
// TODO: Tests for "can selector become true by modifier" are incomplete,
// absent or test the functionality of "not ideal" implementation (test checks

View File

@@ -678,6 +678,9 @@ Tinytest.add("minimongo - selector_compiler", function (test) {
nomatch({a: {$regex: 'a'}}, {a: 'cut'});
nomatch({a: {$regex: 'a'}}, {a: 'CAT'});
match({a: {$regex: 'a', $options: 'i'}}, {a: 'CAT'});
match({a: {$regex: '', $options: 'i'}}, {a: 'foo'});
nomatch({a: {$regex: '', $options: 'i'}}, {});
nomatch({a: {$regex: '', $options: 'i'}}, {a: 5});
nomatch({a: /undefined/}, {});
nomatch({a: {$regex: 'undefined'}}, {});
nomatch({a: /xxx/}, {});
@@ -817,6 +820,12 @@ Tinytest.add("minimongo - selector_compiler", function (test) {
nomatch({$or: [{a: 2}, {a: 3}], b: 2}, {a: 1, b: 2});
nomatch({$or: [{a: 1}, {a: 2}], b: 3}, {a: 1, b: 2});
// Combining $or with equality
match({x: 1, $or: [{a: 1}, {b: 1}]}, {x: 1, b: 1});
match({$or: [{a: 1}, {b: 1}], x: 1}, {x: 1, b: 1});
nomatch({x: 1, $or: [{a: 1}, {b: 1}]}, {b: 1});
nomatch({x: 1, $or: [{a: 1}, {b: 1}]}, {x: 1});
// $or and $lt, $lte, $gt, $gte
match({$or: [{a: {$lte: 1}}, {a: 2}]}, {a: 1});
nomatch({$or: [{a: {$lt: 1}}, {a: 2}]}, {a: 1});
@@ -1100,6 +1109,16 @@ Tinytest.add("minimongo - selector_compiler", function (test) {
nomatch({a: {$elemMatch: {x: 5}}}, {a: {x: 5}});
match({a: {$elemMatch: {0: {$gt: 5, $lt: 9}}}}, {a: [[6]]});
match({a: {$elemMatch: {'0.b': {$gt: 5, $lt: 9}}}}, {a: [[{b:6}]]});
match({a: {$elemMatch: {x: 1, $or: [{a: 1}, {b: 1}]}}},
{a: [{x: 1, b: 1}]});
match({a: {$elemMatch: {$or: [{a: 1}, {b: 1}], x: 1}}},
{a: [{x: 1, b: 1}]});
nomatch({a: {$elemMatch: {x: 1, $or: [{a: 1}, {b: 1}]}}},
{a: [{b: 1}]});
nomatch({a: {$elemMatch: {x: 1, $or: [{a: 1}, {b: 1}]}}},
{a: [{x: 1}]});
nomatch({a: {$elemMatch: {x: 1, $or: [{a: 1}, {b: 1}]}}},
{a: [{x: 1}, {b: 1}]});
// $comment
match({a: 5, $comment: "asdf"}, {a: 5});
@@ -1549,7 +1568,7 @@ Tinytest.add("minimongo - ordering", function (test) {
// document ordering under a sort specification
var verify = function (sorts, docs) {
_.each(_.isArray(sorts) ? sorts : [sorts], function (sort) {
var sorter = new MinimongoTest.Sorter(sort);
var sorter = new Minimongo.Sorter(sort);
assert_ordering(test, sorter.getComparator(), docs);
});
};
@@ -1577,15 +1596,15 @@ Tinytest.add("minimongo - ordering", function (test) {
[{c: 1}, {a: 1, b: 2}, {a: 1, b: 3}, {a: 2, b: 0}]);
test.throws(function () {
new MinimongoTest.Sorter("a");
new Minimongo.Sorter("a");
});
test.throws(function () {
new MinimongoTest.Sorter(123);
new Minimongo.Sorter(123);
});
// No sort spec implies everything equal.
test.equal(new MinimongoTest.Sorter({}).getComparator()({a:1}, {a:2}), 0);
test.equal(new Minimongo.Sorter({}).getComparator()({a:1}, {a:2}), 0);
// All sorts of array edge cases!
// Increasing sort sorts by the smallest element it finds; 1 < 2.

View File

@@ -7,7 +7,7 @@ Package.on_use(function (api) {
api.export('LocalCollection');
api.export('Minimongo');
api.export('MinimongoTest', { testOnly: true });
api.use(['underscore', 'json', 'ejson', 'ordered-dict', 'deps',
api.use(['underscore', 'json', 'ejson', 'id-map', 'ordered-dict', 'deps',
'random', 'ordered-dict']);
// This package is used for geo-location queries such as $near
api.use('geojson-utils');
@@ -28,7 +28,8 @@ Package.on_use(function (api) {
// Functionality used only by oplog tailing on the server side
api.add_files([
'selector_projection.js',
'selector_modifier.js'
'selector_modifier.js',
'sorter_projection.js'
], 'server');
});

View File

@@ -384,7 +384,7 @@ var VALUE_OPERATORS = {
},
// $options just provides options for $regex; its logic is inside $regex
$options: function (operand, valueSelector) {
if (!valueSelector.$regex)
if (!_.has(valueSelector, '$regex'))
throw Error("$options needs a $regex");
return everythingMatcher;
},
@@ -653,7 +653,7 @@ var ELEMENT_OPERATORS = {
throw Error("$elemMatch need an object");
var subMatcher, isDocMatcher;
if (isOperatorObject(operand)) {
if (isOperatorObject(operand, true)) {
subMatcher = compileValueSelector(operand, matcher);
isDocMatcher = false;
} else {

View File

@@ -3,7 +3,7 @@
// @returns Object - projection object (same as fields option of mongo cursor)
Minimongo.Matcher.prototype.combineIntoProjection = function (projection) {
var self = this;
var selectorPaths = self._getPathsElidingNumericKeys();
var selectorPaths = Minimongo._pathsElidingNumericKeys(self._getPaths());
// Special case for $where operator in the selector - projection should depend
// on all fields of the document. getSelectorPaths returns a list of paths
@@ -12,12 +12,23 @@ Minimongo.Matcher.prototype.combineIntoProjection = function (projection) {
if (_.contains(selectorPaths, ''))
return {};
return combineImportantPathsIntoProjection(selectorPaths, projection);
};
Minimongo._pathsElidingNumericKeys = function (paths) {
var self = this;
return _.map(paths, function (path) {
return _.reject(path.split('.'), isNumericKey).join('.');
});
};
combineImportantPathsIntoProjection = function (paths, projection) {
var prjDetails = projectionDetails(projection);
var tree = prjDetails.tree;
var mergedProjection = {};
// merge the paths to include
tree = pathsToTree(selectorPaths,
tree = pathsToTree(paths,
function (path) { return true; },
function (node, path, fullPath) { return true; },
tree);
@@ -40,13 +51,6 @@ Minimongo.Matcher.prototype.combineIntoProjection = function (projection) {
}
};
Minimongo.Matcher.prototype._getPathsElidingNumericKeys = function () {
var self = this;
return _.map(self._getPaths(), function (path) {
return _.reject(path.split('.'), isNumericKey).join('.');
});
};
// Returns a set of key paths similar to
// { 'foo.bar': 1, 'a.b.c': 1 }
var treeToPaths = function (tree, prefix) {

View File

@@ -14,17 +14,19 @@
Sorter = function (spec) {
var self = this;
var sortSpecParts = [];
var sortSpecParts = self._sortSpecParts = [];
if (spec instanceof Array) {
for (var i = 0; i < spec.length; i++) {
if (typeof spec[i] === "string") {
sortSpecParts.push({
path: spec[i],
lookup: makeLookupFunction(spec[i]),
ascending: true
});
} else {
sortSpecParts.push({
path: spec[i][0],
lookup: makeLookupFunction(spec[i][0]),
ascending: spec[i][1] !== "desc"
});
@@ -33,12 +35,13 @@ Sorter = function (spec) {
} else if (typeof spec === "object") {
for (var key in spec) {
sortSpecParts.push({
path: key,
lookup: makeLookupFunction(key),
ascending: spec[key] >= 0
});
}
} else {
throw Error("Bad sort specification: ", JSON.stringify(spec));
throw Error("Bad sort specification: " + JSON.stringify(spec));
}
// reduceValue takes in all the possible values for the sort key along various
@@ -118,7 +121,12 @@ Sorter.prototype.getComparator = function (options) {
}]);
};
MinimongoTest.Sorter = Sorter;
Sorter.prototype._getPaths = function () {
var self = this;
return _.pluck(self._sortSpecParts, 'path');
};
Minimongo.Sorter = Sorter;
// Given an array of comparators
// (functions (a,b)->(negative or positive or zero)), returns a single

View File

@@ -0,0 +1,6 @@
Sorter.prototype.combineIntoProjection = function (projection) {
var self = this;
var specPaths = Minimongo._pathsElidingNumericKeys(self._getPaths());
return combineImportantPathsIntoProjection(specPaths, projection);
};

View File

@@ -754,11 +754,15 @@ MongoConnection.prototype._createSynchronousCursor = function(
// ... and to keep querying the server indefinitely rather than just 5 times
// if there's no more data.
mongoOptions.numberOfRetries = -1;
// And if this cursor specifies a 'ts', then set the undocumented oplog
// replay flag, which does a special scan to find the first document
// (instead of creating an index on ts).
if (cursorDescription.selector.ts)
// And if this is on the oplog collection and the cursor specifies a 'ts',
// then set the undocumented oplog replay flag, which does a special scan to
// find the first document (instead of creating an index on ts). This is a
// very hard-coded Mongo flag which only works on the oplog collection and
// only works with the ts field.
if (cursorDescription.collectionName === OPLOG_COLLECTION &&
cursorDescription.selector.ts) {
mongoOptions.oplogReplay = true;
}
}
var dbCursor = collection.find(

View File

@@ -745,6 +745,425 @@ if (Meteor.isServer) {
});
x++;
});
// compares arrays a and b w/o looking at order
var setsEqual = function (a, b) {
a = _.map(a, EJSON.stringify);
b = _.map(b, EJSON.stringify);
return _.isEmpty(_.difference(a, b)) && _.isEmpty(_.difference(b, a));
};
// This test mainly checks the correctness of oplog code dealing with limited
// queries. Compitablity with poll-diff is added as well.
Tinytest.addAsync("mongo-livedata - observe sorted, limited " + idGeneration, function (test, onComplete) {
var run = test.runId();
var coll = new Meteor.Collection("observeLimit-"+run, collectionOptions);
var observer = function () {
var state = {};
var output = [];
var callbacks = {
changed: function (newDoc) {
output.push({changed: newDoc._id});
state[newDoc._id] = newDoc;
},
added: function (newDoc) {
output.push({added: newDoc._id});
state[newDoc._id] = newDoc;
},
removed: function (oldDoc) {
output.push({removed: oldDoc._id});
delete state[oldDoc._id];
}
};
var handle = coll.find({foo: 22},
{sort: {bar: 1}, limit: 3}).observe(callbacks);
return {output: output, handle: handle, state: state};
};
var clearOutput = function (o) { o.output.splice(0, o.output.length); };
var ins = function (doc) {
var id; runInFence(function () { id = coll.insert(doc); });
return id;
};
var rem = function (sel) { runInFence(function () { coll.remove(sel); }); };
var upd = function (sel, mod, opt) {
runInFence(function () {
coll.update(sel, mod, opt);
});
};
// tests '_id' subfields for all documents in oplog buffer
var testOplogBufferIds = function (ids) {
var bufferIds = [];
o.handle._multiplexer._observeDriver._unpublishedBuffer.forEach(function (x, id) {
bufferIds.push(id);
});
test.isTrue(setsEqual(ids, bufferIds), "expected: " + ids + "; got: " + bufferIds);
};
var testSafeAppendToBufferFlag = function (expected) {
if (expected)
test.isTrue(o.handle._multiplexer._observeDriver._safeAppendToBuffer);
else
test.isFalse(o.handle._multiplexer._observeDriver._safeAppendToBuffer);
};
// Insert a doc and start observing.
var docId1 = ins({foo: 22, bar: 5});
var o = observer();
var usesOplog = o.handle._multiplexer._observeDriver._usesOplog;
// Initial add.
test.length(o.output, 1);
test.equal(o.output.shift(), {added: docId1});
// Insert another doc (blocking until observes have fired).
var docId2 = ins({foo: 22, bar: 6});
// Observed add.
test.length(o.output, 1);
test.equal(o.output.shift(), {added: docId2});
var docId3 = ins({ foo: 22, bar: 3 });
test.length(o.output, 1);
test.equal(o.output.shift(), {added: docId3});
// Add a non-matching document
ins({ foo: 13 });
// It shouldn't be added
test.length(o.output, 0);
// Add something that matches but is too big to fit in
var docId4 = ins({ foo: 22, bar: 7 });
// It shouldn't be added
test.length(o.output, 0);
// Let's add something small enough to fit in
var docId5 = ins({ foo: 22, bar: -1 });
// We should get an added and a removed events
test.length(o.output, 2);
// doc 2 was removed from the published set as it is too big to be in
test.isTrue(setsEqual(o.output, [{added: docId5}, {removed: docId2}]));
clearOutput(o);
// Now remove something and that doc 2 should be right back
rem(docId5);
test.length(o.output, 2);
test.isTrue(setsEqual(o.output, [{removed: docId5}, {added: docId2}]));
clearOutput(o);
usesOplog && testOplogBufferIds([docId4]);
usesOplog && testSafeAppendToBufferFlag(true);
// Current state is [3 5 6 | 7]
// Add some negative numbers overflowing the buffer.
// New documents will take the published place, [3 5 6] will take the buffer
// and 7 will be outside of the buffer in MongoDB.
var docId6 = ins({ foo: 22, bar: -1 });
var docId7 = ins({ foo: 22, bar: -2 });
var docId8 = ins({ foo: 22, bar: -3 });
test.length(o.output, 6);
var expected = [{added: docId6}, {removed: docId2},
{added: docId7}, {removed: docId1},
{added: docId8}, {removed: docId3}];
test.isTrue(setsEqual(o.output, expected));
clearOutput(o);
usesOplog && testOplogBufferIds([docId1, docId2, docId3]);
usesOplog && testSafeAppendToBufferFlag(false);
// Now the state is [-3 -2 -1 | 3 5 6] 7
// If we update first 3 docs (increment them by 20), it would be
// interesting.
upd({ bar: { $lt: 0 }}, { $inc: { bar: 20 } }, { multi: true });
// The updated documents can't find their place in published and they can't
// be buffered as we are not aware of the situation outside of the buffer.
// But since our buffer becomes empty, it will be refilled partially with
// updated documents.
test.length(o.output, 6);
var expectedRemoves = [{removed: docId6},
{removed: docId7},
{removed: docId8}];
var expectedAdds = [{added: docId3},
{added: docId1},
{added: docId2}];
test.isTrue(setsEqual(o.output, expectedAdds.concat(expectedRemoves)));
clearOutput(o);
usesOplog && testOplogBufferIds([docId4, docId7, docId8]);
usesOplog && testSafeAppendToBufferFlag(false);
// The new arrangement is [3 5 6 | 7 17 18] 19
// By ids: [docId3, docId1, docId2] docId4] docId6 docId7 docId8
// Remove first 4 docs (3, 1, 2, 4) forcing buffer to become empty and
// schedule a repoll.
rem({ bar: { $lt: 10 } });
// XXX the oplog code analyzes the events one by one: one remove after
// another. Poll-n-diff code, on the other side, analyzes the batch action
// of multiple remove. Because of that difference, expected outputs differ.
if (usesOplog) {
var expectedRemoves = [{removed: docId3}, {removed: docId1},
{removed: docId2}, {removed: docId4}];
var expectedAdds = [{added: docId4}, {added: docId8},
{added: docId7}, {added: docId6}];
test.length(o.output, 8);
} else {
var expectedRemoves = [{removed: docId3}, {removed: docId1},
{removed: docId2}];
var expectedAdds = [{added: docId8}, {added: docId7}, {added: docId6}];
test.length(o.output, 6);
}
test.isTrue(setsEqual(o.output, expectedAdds.concat(expectedRemoves)));
clearOutput(o);
usesOplog && testOplogBufferIds([]);
usesOplog && testSafeAppendToBufferFlag(true);
// The new arrangement is [17 18 19] or [docId6 docId7 docId8]
var docId9 = ins({ foo: 22, bar: 21 });
var docId10 = ins({ foo: 22, bar: 31 });
var docId11 = ins({ foo: 22, bar: 41 });
var docId12 = ins({ foo: 22, bar: 51 });
// Becomes [17 18 19 | 21 31 41] 51
usesOplog && testOplogBufferIds([docId9, docId10, docId11]);
usesOplog && testSafeAppendToBufferFlag(false);
test.length(o.output, 0);
upd({ bar: { $lt: 20 } }, { $inc: { bar: 5 } }, { multi: true });
// Becomes [21 22 23 | 24 31 41] 51
test.length(o.output, 4);
test.isTrue(setsEqual(o.output, [{removed: docId6},
{added: docId9},
{changed: docId7},
{changed: docId8}]));
clearOutput(o);
usesOplog && testOplogBufferIds([docId6, docId10, docId11]);
usesOplog && testSafeAppendToBufferFlag(false);
rem(docId9);
// Becomes [22 23 24 | 31 41] 51
test.length(o.output, 2);
test.isTrue(setsEqual(o.output, [{removed: docId9}, {added: docId6}]));
clearOutput(o);
usesOplog && testOplogBufferIds([docId10, docId11]);
usesOplog && testSafeAppendToBufferFlag(false);
upd({ bar: { $gt: 25 } }, { $inc: { bar: -7.5 } }, { multi: true });
// Becomes [22 23 23.5 | 24] 33.5 43.5 - 33.5 doesn't update in-place in
// buffer, because it the driver is not sure it can do it and there is no a
// different doc which is less than 33.5.
test.length(o.output, 2);
test.isTrue(setsEqual(o.output, [{removed: docId6}, {added: docId10}]));
clearOutput(o);
usesOplog && testOplogBufferIds([docId6]);
usesOplog && testSafeAppendToBufferFlag(false);
// Force buffer objects to be moved into published set so we can check them
rem(docId7);
rem(docId8);
rem(docId10);
// Becomes [24 33.5 43.5]
test.length(o.output, 6);
test.isTrue(setsEqual(o.output, [{removed: docId7}, {removed: docId8},
{removed: docId10}, {added: docId6},
{added: docId11}, {added: docId12}]));
test.length(_.keys(o.state), 3);
test.equal(o.state[docId6], { _id: docId6, foo: 22, bar: 24 });
test.equal(o.state[docId11], { _id: docId11, foo: 22, bar: 33.5 });
test.equal(o.state[docId12], { _id: docId12, foo: 22, bar: 43.5 });
clearOutput(o);
usesOplog && testOplogBufferIds([]);
usesOplog && testSafeAppendToBufferFlag(true);
o.handle.stop();
onComplete();
});
Tinytest.addAsync("mongo-livedata - observe sorted, limited, sort fields " + idGeneration, function (test, onComplete) {
var run = test.runId();
var coll = new Meteor.Collection("observeLimit-"+run, collectionOptions);
var observer = function () {
var state = {};
var output = [];
var callbacks = {
changed: function (newDoc) {
output.push({changed: newDoc._id});
state[newDoc._id] = newDoc;
},
added: function (newDoc) {
output.push({added: newDoc._id});
state[newDoc._id] = newDoc;
},
removed: function (oldDoc) {
output.push({removed: oldDoc._id});
delete state[oldDoc._id];
}
};
var handle = coll.find({}, {sort: {x: 1},
limit: 2,
fields: {y: 1}}).observe(callbacks);
return {output: output, handle: handle, state: state};
};
var clearOutput = function (o) { o.output.splice(0, o.output.length); };
var ins = function (doc) {
var id; runInFence(function () { id = coll.insert(doc); });
return id;
};
var rem = function (id) {
runInFence(function () { coll.remove(id); });
};
var o = observer();
var docId1 = ins({ x: 1, y: 1222 });
var docId2 = ins({ x: 5, y: 5222 });
test.length(o.output, 2);
test.equal(o.output, [{added: docId1}, {added: docId2}]);
clearOutput(o);
var docId3 = ins({ x: 7, y: 7222 });
test.length(o.output, 0);
var docId4 = ins({ x: -1, y: -1222 });
// Becomes [docId4 docId1 | docId2 docId3]
test.length(o.output, 2);
test.isTrue(setsEqual(o.output, [{added: docId4}, {removed: docId2}]));
test.equal(_.size(o.state), 2);
test.equal(o.state[docId4], {_id: docId4, y: -1222});
test.equal(o.state[docId1], {_id: docId1, y: 1222});
clearOutput(o);
rem(docId2);
// Becomes [docId4 docId1 | docId3]
test.length(o.output, 0);
rem(docId4);
// Becomes [docId1 docId3]
test.length(o.output, 2);
test.isTrue(setsEqual(o.output, [{added: docId3}, {removed: docId4}]));
test.equal(_.size(o.state), 2);
test.equal(o.state[docId3], {_id: docId3, y: 7222});
test.equal(o.state[docId1], {_id: docId1, y: 1222});
clearOutput(o);
onComplete();
});
Tinytest.addAsync("mongo-livedata - observe sorted, limited, big initial set" + idGeneration, function (test, onComplete) {
var run = test.runId();
var coll = new Meteor.Collection("observeLimit-"+run, collectionOptions);
var observer = function () {
var state = {};
var output = [];
var callbacks = {
changed: function (newDoc) {
output.push({changed: newDoc._id});
state[newDoc._id] = newDoc;
},
added: function (newDoc) {
output.push({added: newDoc._id});
state[newDoc._id] = newDoc;
},
removed: function (oldDoc) {
output.push({removed: oldDoc._id});
delete state[oldDoc._id];
}
};
var handle = coll.find({}, {sort: {x: 1, y: 1}, limit: 3})
.observe(callbacks);
return {output: output, handle: handle, state: state};
};
var clearOutput = function (o) { o.output.splice(0, o.output.length); };
var ins = function (doc) {
var id; runInFence(function () { id = coll.insert(doc); });
return id;
};
var rem = function (id) {
runInFence(function () { coll.remove(id); });
};
// tests '_id' subfields for all documents in oplog buffer
var testOplogBufferIds = function (ids) {
var bufferIds = [];
o.handle._multiplexer._observeDriver._unpublishedBuffer.forEach(function (x, id) {
bufferIds.push(id);
});
test.isTrue(setsEqual(ids, bufferIds), "expected: " + ids + "; got: " + bufferIds);
};
var testSafeAppendToBufferFlag = function (expected) {
if (expected)
test.isTrue(o.handle._multiplexer._observeDriver._safeAppendToBuffer);
else
test.isFalse(o.handle._multiplexer._observeDriver._safeAppendToBuffer);
};
var ids = {};
_.each([2, 4, 1, 3, 5, 5, 9, 1, 3, 2, 5], function (x, i) {
ids[i] = ins({ x: x, y: i });
});
var o = observer();
var usesOplog = o.handle._multiplexer._observeDriver._usesOplog;
// x: [1 1 2 | 2 3 3] 4 5 5 5 9
// id: [2 7 0 | 9 3 8] 1 4 5 10 6
test.length(o.output, 3);
test.isTrue(setsEqual([{added: ids[2]}, {added: ids[7]}, {added: ids[0]}], o.output));
usesOplog && testOplogBufferIds([ids[9], ids[3], ids[8]]);
usesOplog && testSafeAppendToBufferFlag(false);
clearOutput(o);
rem(ids[0]);
// x: [1 1 2 | 3 3] 4 5 5 5 9
// id: [2 7 9 | 3 8] 1 4 5 10 6
test.length(o.output, 2);
test.isTrue(setsEqual([{removed: ids[0]}, {added: ids[9]}], o.output));
usesOplog && testOplogBufferIds([ids[3], ids[8]]);
usesOplog && testSafeAppendToBufferFlag(false);
clearOutput(o);
rem(ids[7]);
// x: [1 2 3 | 3] 4 5 5 5 9
// id: [2 9 3 | 8] 1 4 5 10 6
test.length(o.output, 2);
test.isTrue(setsEqual([{removed: ids[7]}, {added: ids[3]}], o.output));
usesOplog && testOplogBufferIds([ids[8]]);
usesOplog && testSafeAppendToBufferFlag(false);
clearOutput(o);
rem(ids[3]);
// x: [1 2 3 | 4 5 5] 5 9
// id: [2 9 8 | 1 4 5] 10 6
test.length(o.output, 2);
test.isTrue(setsEqual([{removed: ids[3]}, {added: ids[8]}], o.output));
usesOplog && testOplogBufferIds([ids[1], ids[4], ids[5]]);
usesOplog && testSafeAppendToBufferFlag(false);
clearOutput(o);
rem({ x: {$lt: 4} });
// x: [4 5 5 | 5 9]
// id: [1 4 5 | 10 6]
test.length(o.output, 6);
test.isTrue(setsEqual([{removed: ids[2]}, {removed: ids[9]}, {removed: ids[8]},
{added: ids[5]}, {added: ids[4]}, {added: ids[1]}], o.output));
usesOplog && testOplogBufferIds([ids[10], ids[6]]);
usesOplog && testSafeAppendToBufferFlag(true);
clearOutput(o);
onComplete();
});
}

View File

@@ -7,6 +7,20 @@ var PHASE = {
STEADY: "STEADY"
};
// Exception thrown by _needToPollQuery which unrolls the stack up to the
// enclosing call to finishIfNeedToPollQuery.
var SwitchedToQuery = function () {};
var finishIfNeedToPollQuery = function (f) {
return function () {
try {
f.apply(this, arguments);
} catch (e) {
if (!(e instanceof SwitchedToQuery))
throw e;
}
};
};
// OplogObserveDriver is an alternative to PollingObserveDriver which follows
// the Mongo operation log instead of just re-polling the query. It obeys the
// same simple interface: constructing it starts sending observeChanges
@@ -19,8 +33,44 @@ OplogObserveDriver = function (options) {
self._cursorDescription = options.cursorDescription;
self._mongoHandle = options.mongoHandle;
self._multiplexer = options.multiplexer;
if (options.ordered)
if (options.ordered) {
throw Error("OplogObserveDriver only supports unordered observeChanges");
}
var sortSpec = options.cursorDescription.options.sort;
var sorter = sortSpec && new Minimongo.Sorter(sortSpec);
// We don't support $near and other geo-queries so it's OK to initialize the
// comparator only once in the constructor.
var comparator = sorter && sorter.getComparator();
if (options.cursorDescription.options.limit) {
// There are several properties ordered driver implements:
// - _limit is a positive number
// - _comparator is a function-comparator by which the query is ordered
// - _unpublishedBuffer is non-null Min/Max Heap,
// the empty buffer in STEADY phase implies that the
// everything that matches the queries selector fits
// into published set.
// - _published - Min Heap (also implements IdMap methods)
var heapOptions = { IdMap: LocalCollection._IdMap };
self._limit = self._cursorDescription.options.limit;
self._comparator = comparator;
self._unpublishedBuffer = new MinMaxHeap(comparator, heapOptions);
// We need something that can find Max value in addition to IdMap interface
self._published = new MaxHeap(comparator, heapOptions);
} else {
self._limit = 0;
self._comparator = null;
self._unpublishedBuffer = null;
self._published = new LocalCollection._IdMap;
}
// Indicates if it is safe to insert a new document at the end of the buffer
// for this query. i.e. it is known that there are no documents matching the
// selector those are not in published or buffer.
self._safeAppendToBuffer = false;
self._stopped = false;
self._stopHandles = [];
@@ -30,7 +80,6 @@ OplogObserveDriver = function (options) {
self._registerPhaseChange(PHASE.QUERYING);
self._published = new LocalCollection._IdMap;
var selector = self._cursorDescription.selector;
self._matcher = options.matcher;
var projection = self._cursorDescription.options.fields || {};
@@ -38,6 +87,8 @@ OplogObserveDriver = function (options) {
// Projection function, result of combining important fields for selector and
// existing fields projection
self._sharedProjection = self._matcher.combineIntoProjection(projection);
if (sorter)
self._sharedProjection = sorter.combineIntoProjection(self._sharedProjection);
self._sharedProjectionFn = LocalCollection._compileProjection(
self._sharedProjection);
@@ -51,7 +102,7 @@ OplogObserveDriver = function (options) {
forEachTrigger(self._cursorDescription, function (trigger) {
self._stopHandles.push(self._mongoHandle._oplogHandle.onOplogEntry(
trigger, function (notification) {
Meteor._noYieldsAllowed(function () {
Meteor._noYieldsAllowed(finishIfNeedToPollQuery(function () {
var op = notification.op;
if (notification.dropCollection) {
// Note: this call is not allowed to block on anything (especially
@@ -65,7 +116,7 @@ OplogObserveDriver = function (options) {
else
self._handleOplogEntrySteadyOrFetching(op);
}
});
}));
}
));
});
@@ -101,55 +152,261 @@ OplogObserveDriver = function (options) {
// Give _observeChanges a chance to add the new ObserveHandle to our
// multiplexer, so that the added calls get streamed.
Meteor.defer(function () {
Meteor.defer(finishIfNeedToPollQuery(function () {
self._runInitialQuery();
});
}));
};
_.extend(OplogObserveDriver.prototype, {
_add: function (doc) {
_addPublished: function (id, doc) {
var self = this;
var id = doc._id;
var fields = _.clone(doc);
delete fields._id;
if (self._published.has(id))
throw Error("tried to add something already published " + id);
self._published.set(id, self._sharedProjectionFn(fields));
self._published.set(id, self._sharedProjectionFn(doc));
self._multiplexer.added(id, self._projectionFn(fields));
// After adding this document, the published set might be overflowed
// (exceeding capacity specified by limit). If so, push the maximum element
// to the buffer, we might want to save it in memory to reduce the amount of
// Mongo lookups in the future.
if (self._limit && self._published.size() > self._limit) {
// XXX in theory the size of published is no more than limit+1
if (self._published.size() !== self._limit + 1) {
throw new Error("After adding to published, " +
(self._published.size() - self._limit) +
" documents are overflowing the set");
}
var overflowingDocId = self._published.maxElementId();
var overflowingDoc = self._published.get(overflowingDocId);
if (EJSON.equals(overflowingDocId, id)) {
throw new Error("The document just added is overflowing the published set");
}
self._published.remove(overflowingDocId);
self._multiplexer.removed(overflowingDocId);
self._addBuffered(overflowingDocId, overflowingDoc);
}
},
_remove: function (id) {
_removePublished: function (id) {
var self = this;
if (!self._published.has(id))
throw Error("tried to remove something unpublished " + id);
self._published.remove(id);
self._multiplexer.removed(id);
},
_handleDoc: function (id, newDoc, mustMatchNow) {
var self = this;
newDoc = _.clone(newDoc);
if (! self._limit || self._published.size() === self._limit)
return;
var matchesNow = newDoc && self._matcher.documentMatches(newDoc).result;
if (mustMatchNow && !matchesNow) {
throw Error("expected " + EJSON.stringify(newDoc) + " to match "
+ EJSON.stringify(self._cursorDescription));
if (self._published.size() > self._limit)
throw Error("self._published got too big");
// OK, we are publishing less than the limit. Maybe we should look in the
// buffer to find the next element past what we were publishing before.
if (!self._unpublishedBuffer.empty()) {
// There's something in the buffer; move the first thing in it to
// _published.
var newDocId = self._unpublishedBuffer.minElementId();
var newDoc = self._unpublishedBuffer.get(newDocId);
self._removeBuffered(newDocId);
self._addPublished(newDocId, newDoc);
return;
}
var matchedBefore = self._published.has(id);
// There's nothing in the buffer. This could mean one of a few things.
if (matchesNow && !matchedBefore) {
self._add(newDoc);
} else if (matchedBefore && !matchesNow) {
self._remove(id);
} else if (matchesNow) {
// (a) We could be in the middle of re-running the query (specifically, we
// could be in _publishNewResults). In that case, _unpublishedBuffer is
// empty because we clear it at the beginning of _publishNewResults. In this
// case, our caller already knows the entire answer to the query and we
// don't need to do anything fancy here. Just return.
if (self._phase === PHASE.QUERYING)
return;
// (b) We're pretty confident that the union of _published and
// _unpublishedBuffer contain all documents that match selector. Because
// _unpublishedBuffer is empty, that means we're confident that _published
// contains all documents that match selector. So we have nothing to do.
if (self._safeAppendToBuffer)
return;
// (c) Maybe there are other documents out there that should be in our
// buffer. But in that case, when we emptied _unpublishedBuffer in
// _removeBuffered, we should have called _needToPollQuery, which will
// either put something in _unpublishedBuffer or set _safeAppendToBuffer (or
// both), and it will put us in QUERYING for that whole time. So in fact, we
// shouldn't be able to get here.
throw new Error("Buffer inexplicably empty");
},
_changePublished: function (id, oldDoc, newDoc) {
var self = this;
self._published.set(id, self._sharedProjectionFn(newDoc));
var changed = LocalCollection._makeChangedFields(_.clone(newDoc), oldDoc);
changed = self._projectionFn(changed);
if (!_.isEmpty(changed))
self._multiplexer.changed(id, changed);
},
_addBuffered: function (id, doc) {
var self = this;
self._unpublishedBuffer.set(id, self._sharedProjectionFn(doc));
// If something is overflowing the buffer, we just remove it from cache
if (self._unpublishedBuffer.size() > self._limit) {
var maxBufferedId = self._unpublishedBuffer.maxElementId();
self._unpublishedBuffer.remove(maxBufferedId);
// Since something matching is removed from cache (both published set and
// buffer), set flag to false
self._safeAppendToBuffer = false;
}
},
// Is called either to remove the doc completely from matching set or to move
// it to the published set later.
_removeBuffered: function (id) {
var self = this;
self._unpublishedBuffer.remove(id);
// To keep the contract "buffer is never empty in STEADY phase unless the
// everything matching fits into published" true, we poll everything as soon
// as we see the buffer becoming empty.
if (! self._unpublishedBuffer.size() && ! self._safeAppendToBuffer)
self._needToPollQuery();
},
// Called when a document has joined the "Matching" results set.
// Takes responsibility of keeping _unpublishedBuffer in sync with _published
// and the effect of limit enforced.
_addMatching: function (doc) {
var self = this;
var id = doc._id;
if (self._published.has(id))
throw Error("tried to add something already published " + id);
if (self._limit && self._unpublishedBuffer.has(id))
throw Error("tried to add something already existed in buffer " + id);
var limit = self._limit;
var comparator = self._comparator;
var maxPublished = (limit && self._published.size() > 0) ?
self._published.get(self._published.maxElementId()) : null;
var maxBuffered = (limit && self._unpublishedBuffer.size() > 0) ?
self._unpublishedBuffer.get(self._unpublishedBuffer.maxElementId()) : null;
// The query is unlimited or didn't publish enough documents yet or the new
// document would fit into published set pushing the maximum element out,
// then we need to publish the doc.
var toPublish = ! limit || self._published.size() < limit ||
comparator(doc, maxPublished) < 0;
// Otherwise we might need to buffer it (only in case of limited query).
// Buffering is allowed if the buffer is not filled up yet and all matching
// docs are either in the published set or in the buffer.
var canAppendToBuffer = !toPublish && self._safeAppendToBuffer &&
self._unpublishedBuffer.size() < limit;
// Or if it is small enough to be safely inserted to the middle or the
// beginning of the buffer.
var canInsertIntoBuffer = !toPublish && maxBuffered &&
comparator(doc, maxBuffered) <= 0;
var toBuffer = canAppendToBuffer || canInsertIntoBuffer;
if (toPublish) {
self._addPublished(id, doc);
} else if (toBuffer) {
self._addBuffered(id, doc);
} else {
// dropping it and not saving to the cache
self._safeAppendToBuffer = false;
}
},
// Called when a document leaves the "Matching" results set.
// Takes responsibility of keeping _unpublishedBuffer in sync with _published
// and the effect of limit enforced.
_removeMatching: function (id) {
var self = this;
if (! self._published.has(id) && ! self._limit)
throw Error("tried to remove something matching but not cached " + id);
if (self._published.has(id)) {
self._removePublished(id);
} else if (self._unpublishedBuffer.has(id)) {
self._removeBuffered(id);
}
},
_handleDoc: function (id, newDoc) {
var self = this;
var matchesNow = newDoc && self._matcher.documentMatches(newDoc).result;
var publishedBefore = self._published.has(id);
var bufferedBefore = self._limit && self._unpublishedBuffer.has(id);
var cachedBefore = publishedBefore || bufferedBefore;
if (matchesNow && !cachedBefore) {
self._addMatching(newDoc);
} else if (cachedBefore && !matchesNow) {
self._removeMatching(id);
} else if (cachedBefore && matchesNow) {
var oldDoc = self._published.get(id);
if (!oldDoc)
throw Error("thought that " + id + " was there!");
delete newDoc._id;
self._published.set(id, self._sharedProjectionFn(newDoc));
var changed = LocalCollection._makeChangedFields(_.clone(newDoc), oldDoc);
changed = self._projectionFn(changed);
if (!_.isEmpty(changed))
self._multiplexer.changed(id, changed);
var comparator = self._comparator;
var minBuffered = self._limit && self._unpublishedBuffer.size() &&
self._unpublishedBuffer.get(self._unpublishedBuffer.minElementId());
if (publishedBefore) {
// Unlimited case where the document stays in published once it matches
// or the case when we don't have enough matching docs to publish or the
// changed but matching doc will stay in published anyways.
// XXX: We rely on the emptiness of buffer. Be sure to maintain the fact
// that buffer can't be empty if there are matching documents not
// published. Notably, we don't want to schedule repoll and continue
// relying on this property.
var staysInPublished = ! self._limit ||
self._unpublishedBuffer.size() === 0 ||
comparator(newDoc, minBuffered) <= 0;
if (staysInPublished) {
self._changePublished(id, oldDoc, newDoc);
} else {
// after the change doc doesn't stay in the published, remove it
self._removePublished(id);
// but it can move into buffered now, check it
var maxBuffered = self._unpublishedBuffer.get(self._unpublishedBuffer.maxElementId());
var toBuffer = self._safeAppendToBuffer ||
(maxBuffered && comparator(newDoc, maxBuffered) <= 0);
if (toBuffer) {
self._addBuffered(id, newDoc);
} else {
// Throw away from both published set and buffer
self._safeAppendToBuffer = false;
}
}
} else if (bufferedBefore) {
oldDoc = self._unpublishedBuffer.get(id);
// remove the old version manually so we don't trigger the querying
// immediately
self._unpublishedBuffer.remove(id);
var maxPublished = self._published.get(self._published.maxElementId());
var maxBuffered = self._unpublishedBuffer.size() && self._unpublishedBuffer.get(self._unpublishedBuffer.maxElementId());
// the buffered doc was updated, it could move to published
var toPublish = comparator(newDoc, maxPublished) < 0;
// or stays in buffer even after the change
var staysInBuffer = (! toPublish && self._safeAppendToBuffer) ||
(!toPublish && maxBuffered && comparator(newDoc, maxBuffered) <= 0);
if (toPublish) {
self._addPublished(id, newDoc);
} else if (staysInBuffer) {
// stays in buffer but changes
self._unpublishedBuffer.set(id, newDoc);
} else {
// Throw away from both published set and buffer
self._safeAppendToBuffer = false;
}
} else {
throw new Error("cachedBefore implies either of publishedBefore or bufferedBefore is true.");
}
}
},
_fetchModifiedDocuments: function () {
@@ -157,7 +414,7 @@ _.extend(OplogObserveDriver.prototype, {
self._registerPhaseChange(PHASE.FETCHING);
// Defer, because nothing called from the oplog entry handler may yield, but
// fetch() yields.
Meteor.defer(function () {
Meteor.defer(finishIfNeedToPollQuery(function () {
while (!self._stopped && !self._needToFetch.empty()) {
if (self._phase !== PHASE.FETCHING)
throw new Error("phase in fetchModifiedDocuments: " + self._phase);
@@ -174,36 +431,40 @@ _.extend(OplogObserveDriver.prototype, {
waiting++;
self._mongoHandle._docFetcher.fetch(
self._cursorDescription.collectionName, id, cacheKey,
function (err, doc) {
if (err) {
if (!anyError)
anyError = err;
} else if (!self._stopped && self._phase === PHASE.FETCHING
&& self._fetchGeneration === thisGeneration) {
// We re-check the generation in case we've had an explicit
// _pollQuery call which should effectively cancel this round of
// fetches. (_pollQuery increments the generation.)
self._handleDoc(id, doc);
finishIfNeedToPollQuery(function (err, doc) {
try {
if (err) {
if (!anyError)
anyError = err;
} else if (!self._stopped && self._phase === PHASE.FETCHING
&& self._fetchGeneration === thisGeneration) {
// We re-check the generation in case we've had an explicit
// _pollQuery call (eg, in another fiber) which should
// effectively cancel this round of fetches. (_pollQuery
// increments the generation.)
self._handleDoc(id, doc);
}
} finally {
waiting--;
// Because fetch() never calls its callback synchronously, this
// is safe (ie, we won't call fut.return() before the forEach is
// done).
if (waiting === 0)
fut.return();
}
waiting--;
// Because fetch() never calls its callback synchronously, this is
// safe (ie, we won't call fut.return() before the forEach is
// done).
if (waiting === 0)
fut.return();
});
}));
});
fut.wait();
// XXX do this even if we've switched to PHASE.QUERYING?
if (anyError)
throw anyError;
// Exit now if we've had a _pollQuery call.
// Exit now if we've had a _pollQuery call (here or in another fiber).
if (self._phase === PHASE.QUERYING)
return;
self._currentlyFetching = null;
}
self._beSteady();
});
}));
},
_beSteady: function () {
var self = this;
@@ -233,16 +494,18 @@ _.extend(OplogObserveDriver.prototype, {
}
if (op.op === 'd') {
if (self._published.has(id))
self._remove(id);
if (self._published.has(id) || (self._limit && self._unpublishedBuffer.has(id)))
self._removeMatching(id);
} else if (op.op === 'i') {
if (self._published.has(id))
throw new Error("insert found for already-existing ID");
throw new Error("insert found for already-existing ID in published");
if (self._unpublishedBuffer && self._unpublishedBuffer.has(id))
throw new Error("insert found for already-existing ID in buffer");
// XXX what if selector yields? for now it can't but later it could have
// $where
if (self._matcher.documentMatches(op.o).result)
self._add(op.o);
self._addMatching(op.o);
} else if (op.op === 'u') {
// Is this a modifier ($set/$unset, which may require us to poll the
// database to figure out if the whole document matches the selector) or a
@@ -256,12 +519,19 @@ _.extend(OplogObserveDriver.prototype, {
var canDirectlyModifyDoc =
!isReplace && modifierCanBeDirectlyApplied(op.o);
var publishedBefore = self._published.has(id);
var bufferedBefore = self._limit && self._unpublishedBuffer.has(id);
if (isReplace) {
self._handleDoc(id, _.extend({_id: id}, op.o));
} else if (self._published.has(id) && canDirectlyModifyDoc) {
} else if ((publishedBefore || bufferedBefore) && canDirectlyModifyDoc) {
// Oh great, we actually know what the document is, so we can apply
// this directly.
var newDoc = EJSON.clone(self._published.get(id));
var newDoc = self._published.has(id) ?
self._published.get(id) :
self._unpublishedBuffer.get(id);
newDoc = EJSON.clone(newDoc);
newDoc._id = id;
LocalCollection._modify(newDoc, op.o);
self._handleDoc(id, self._sharedProjectionFn(newDoc));
@@ -280,10 +550,8 @@ _.extend(OplogObserveDriver.prototype, {
if (self._stopped)
throw new Error("oplog stopped surprisingly early");
var initialCursor = self._cursorForQuery();
initialCursor.forEach(function (initialDoc) {
self._add(initialDoc);
});
self._runQuery();
if (self._stopped)
throw new Error("oplog stopped quite early");
// Allow observeChanges calls to return. (After this, it's possible for
@@ -319,27 +587,48 @@ _.extend(OplogObserveDriver.prototype, {
++self._fetchGeneration; // ignore any in-flight fetches
self._registerPhaseChange(PHASE.QUERYING);
// Defer so that we don't block.
// Defer so that we don't block. We don't need finishIfNeedToPollQuery here
// because SwitchedToQuery is not called in QUERYING mode.
Meteor.defer(function () {
// subtle note: _published does not contain _id fields, but newResults
// does
var newResults = new LocalCollection._IdMap;
var cursor = self._cursorForQuery();
cursor.forEach(function (doc) {
newResults.set(doc._id, doc);
});
self._publishNewResults(newResults);
self._runQuery();
self._doneQuerying();
});
},
_runQuery: function () {
var self = this;
var newResults = new LocalCollection._IdMap;
var newBuffer = new LocalCollection._IdMap;
// Query 2x documents as the half excluded from the original query will go
// into unpublished buffer to reduce additional Mongo lookups in cases when
// documents are removed from the published set and need a replacement.
// XXX needs more thought on non-zero skip
// XXX 2 is a "magic number" meaning there is an extra chunk of docs for
// buffer if such is needed.
var cursor = self._cursorForQuery({ limit: self._limit * 2 });
cursor.forEach(function (doc, i) {
if (!self._limit || i < self._limit)
newResults.set(doc._id, doc);
else
newBuffer.set(doc._id, doc);
});
self._publishNewResults(newResults, newBuffer);
},
// Transitions to QUERYING and runs another query, or (if already in QUERYING)
// ensures that we will query again later.
//
// This function may not block, because it is called from an oplog entry
// handler.
// handler. However, if we were not already in the QUERYING phase, it throws
// an exception that is caught by the closest surrounding
// finishIfNeedToPollQuery call; this ensures that we don't continue running
// close that was designed for another phase inside PHASE.QUERYING.
//
// (It's also necessary whenever logic in this file yields to check that other
// phases haven't put us into QUERYING mode, though; eg,
// _fetchModifiedDocuments does this.)
_needToPollQuery: function () {
var self = this;
if (self._stopped)
@@ -349,7 +638,7 @@ _.extend(OplogObserveDriver.prototype, {
// pausing FETCHING).
if (self._phase !== PHASE.QUERYING) {
self._pollQuery();
return;
throw new SwitchedToQuery;
}
// We're currently in QUERYING. Set a flag to ensure that we run another
@@ -379,7 +668,7 @@ _.extend(OplogObserveDriver.prototype, {
}
},
_cursorForQuery: function () {
_cursorForQuery: function (optionsOverwrite) {
var self = this;
// The query we run is almost the same as the cursor we are observing, with
@@ -388,6 +677,11 @@ _.extend(OplogObserveDriver.prototype, {
// "shared" projection). And we don't want to apply any transform in the
// cursor, because observeChanges shouldn't use the transform.
var options = _.clone(self._cursorDescription.options);
// Allow the caller to modify the options. Useful to specify different skip
// and limit values.
_.extend(options, optionsOverwrite);
options.fields = self._sharedProjection;
delete options.transform;
// We are NOT deep cloning fields or selector here, which should be OK.
@@ -401,13 +695,20 @@ _.extend(OplogObserveDriver.prototype, {
// Replace self._published with newResults (both are IdMaps), invoking observe
// callbacks on the multiplexer.
// Replace self._unpublishedBuffer with newBuffer.
//
// XXX This is very similar to LocalCollection._diffQueryUnorderedChanges. We
// should really: (a) Unify IdMap and OrderedDict into Unordered/OrderedDict (b)
// Rewrite diff.js to use these classes instead of arrays and objects.
_publishNewResults: function (newResults) {
_publishNewResults: function (newResults, newBuffer) {
var self = this;
// If the query is limited and there is a buffer, shut down so it doesn't
// stay in a way.
if (self._limit) {
self._unpublishedBuffer.clear();
}
// First remove anything that's gone. Be careful not to modify
// self._published while iterating over it.
var idsToRemove = [];
@@ -416,15 +717,33 @@ _.extend(OplogObserveDriver.prototype, {
idsToRemove.push(id);
});
_.each(idsToRemove, function (id) {
self._remove(id);
self._removePublished(id);
});
// Now do adds and changes.
// If self has a buffer and limit, the new fetched result will be
// limited correctly as the query has sort specifier.
newResults.forEach(function (doc, id) {
// "true" here means to throw if we think this doc doesn't match the
// selector.
self._handleDoc(id, doc, true);
self._handleDoc(id, doc);
});
// Sanity-check that everything we tried to put into _published ended up
// there.
// XXX if this is slow, remove it later
if (self._published.size() !== newResults.size()) {
throw Error("failed to copy newResults into _published!");
}
self._published.forEach(function (doc, id) {
if (!newResults.has(id))
throw Error("_published has a doc that newResults doesn't; " + id);
});
// Finally, replace the buffer
newBuffer.forEach(function (doc, id) {
self._addBuffered(id, doc);
});
self._safeAppendToBuffer = newBuffer.size() < self._limit;
},
// This stop function is invoked from the onStop of the ObserveMultiplexer, so
@@ -451,6 +770,7 @@ _.extend(OplogObserveDriver.prototype, {
// Proactively drop references to potentially big things.
self._published = null;
self._unpublishedBuffer = null;
self._needToFetch = null;
self._currentlyFetching = null;
self._oplogEntryHandle = null;
@@ -486,10 +806,11 @@ OplogObserveDriver.cursorSupported = function (cursorDescription, matcher) {
if (options._disableOplog)
return false;
// This option (which are mostly used for sorted cursors) require us to figure
// out where a given document fits in an order to know if it's included or
// not, and we don't track that information when doing oplog tailing.
if (options.limit || options.skip) return false;
// skip is not supported: to support it we would need to keep track of all
// "skipped" documents or at least their ids.
// limit w/o a sort specifier is not supported: current implementation needs a
// deterministic way to order documents.
if (options.skip || (options.limit && !options.sort)) return false;
// If a fields projection option is given check if it is supported by
// minimongo (some operators are not supported).
@@ -509,7 +830,9 @@ OplogObserveDriver.cursorSupported = function (cursorDescription, matcher) {
// as Mongo, and can yield!)
// - $near (has "interesting" properties in MongoDB, like the possibility
// of returning an ID multiple times, though even polling maybe
// have a bug there
// have a bug there)
// XXX: once we support it, we would need to think more on how we
// initialize the comparators when we create the driver.
return !matcher.hasWhere() && !matcher.hasGeoQuery();
};

View File

@@ -1,6 +1,6 @@
var Future = Npm.require('fibers/future');
var OPLOG_COLLECTION = 'oplog.rs';
OPLOG_COLLECTION = 'oplog.rs';
var REPLSET_COLLECTION = 'system.replset';
// Like Perl's quotemeta: quotes all regexp metacharacters. See

View File

@@ -4,8 +4,8 @@ Tinytest.add("mongo-livedata - oplog - cursorSupported", function (test) {
var oplogEnabled =
!!MongoInternals.defaultRemoteCollectionDriver().mongo._oplogHandle;
var supported = function (expected, selector) {
var cursor = OplogCollection.find(selector);
var supported = function (expected, selector, options) {
var cursor = OplogCollection.find(selector, options);
var handle = cursor.observeChanges({added: function () {}});
// If there's no oplog at all, we shouldn't ever use it.
if (!oplogEnabled)
@@ -44,4 +44,10 @@ Tinytest.add("mongo-livedata - oplog - cursorSupported", function (test) {
// Nothing Minimongo doesn't understand. (Minimongo happens to fail to
// implement $elemMatch inside $all which MongoDB supports.)
supported(false, {x: {$all: [{$elemMatch: {y: 2}}]}});
supported(true, {}, { sort: {x:1} });
supported(true, {}, { sort: {x:1}, limit: 5 });
supported(false, {}, { limit: 5 });
supported(false, {}, { skip: 2, limit: 5 });
supported(false, {}, { skip: 2 });
});

View File

@@ -24,6 +24,10 @@ Package.on_use(function (api) {
['client', 'server']);
api.use('check', ['client', 'server']);
// Binary Heap data structure is used to optimize oplog observe driver
// performance.
api.use('binary-heap', 'server');
// Allow us to detect 'insecure'.
api.use('insecure', {weak: true});

View File

@@ -57,7 +57,7 @@ _.extend(Retry.prototype, {
var timeout = self._timeout(count);
if (self.retryTimer)
clearTimeout(self.retryTimer);
self.retryTimer = setTimeout(fn, timeout);
self.retryTimer = Meteor.setTimeout(fn, timeout);
return timeout;
}

View File

@@ -24,6 +24,13 @@ var REQUEST_TIMEOUT = 15*1000;
var MAX_BUFFER = 5*1024*1024; // 5MB
WebApp.connectHandlers.use(function (req, res, next) {
// _escaped_fragment_ comes from Google's AJAX crawling spec:
// https://developers.google.com/webmasters/ajax-crawling/docs/specification
// This spec was designed during the brief era where using "#!" URLs was
// common, so it mostly describes how to translate "#!" URLs into
// _escaped_fragment_ URLs. Since then, "#!" URLs have gone out of style, but
// the <meta name="fragment" content="!"> (see spiderable.html) approach also
// described in the spec is still common and used by several crawlers.
if (/\?.*_escaped_fragment_=/.test(req.url) ||
_.any(Spiderable.userAgentRegExps, function (re) {
return re.test(req.headers['user-agent']); })) {

View File

@@ -26,3 +26,10 @@ Meteor.methods({
return null;
}
});
// provide some notification we're started. This is to allow use
// in automated scripts with `meteor run --once` which does not
// print when the proxy is listening.
Meteor.startup(function () {
Meteor._debug("test-in-console listening");
});

View File

@@ -1,7 +1,4 @@
=> Meteor 0.7.1.1: Extend oplog tailing driver to support most common
MongoDB queries. Introduce Meteor developer accounts, a new way of
managing your meteor.com deployed sites. When you use `meteor
deploy`, you will be prompted to create a developer account.
=> Meteor 0.7.1.2: Fix crash on OSX machines with no hostname set.
This release is being downloaded in the background. Update your
project to Meteor 0.7.1.1 by running 'meteor update'.
project to Meteor 0.7.1.2 by running 'meteor update'.

View File

@@ -88,6 +88,9 @@
"http://jquery.com/upgrade-guide/1.9/"]
}
},
{
"release": "0.7.1.2"
},
{
"release": "NEXT"
}

View File

@@ -107,9 +107,11 @@ npm install kexec@0.2.0
npm install source-map@0.1.32
npm install source-map-support@0.2.5
npm install bcrypt@0.7.7
npm install http-proxy@1.0.2
npm install heapdump@0.2.5
# Fork of 1.0.2 with https://github.com/nodejitsu/node-http-proxy/pull/592
npm install https://github.com/meteor/node-http-proxy/tarball/99f757251b42aeb5d26535a7363c96804ee057f0
# Using the unreleased 1.1 branch. We can probably switch to a built NPM version
# when it gets released.
npm install https://github.com/ariya/esprima/tarball/5044b87f94fb802d9609f1426c838874ec2007b3

View File

@@ -78,7 +78,7 @@ var ServiceConnection = function (galaxy, service) {
// from the hostname of endpointUrl, and run the login command for
// that galaxy.
if (! authToken)
throw new Error("not logged in to galaxy?")
throw new Error("not logged in to galaxy?");
self.connection = Package.livedata.DDP.connect(endpointUrl, {
headers: {
@@ -117,10 +117,12 @@ _.extend(ServiceConnection.prototype, {
var args = _.toArray(arguments);
var name = args.shift();
self.connection.apply(name, args, function (err, result) {
if (err)
if (err) {
fut['throw'](err);
else
} else {
self._cleanUpTimer();
fut['return'](result);
}
});
return fut.wait();
@@ -141,6 +143,7 @@ _.extend(ServiceConnection.prototype, {
args.push({
onReady: function () {
ready = true;
self._cleanUpTimer();
fut['return']();
},
onError: function (e) {
@@ -151,8 +154,16 @@ _.extend(ServiceConnection.prototype, {
}
});
self.connection.subscribe.apply(self.connection, args);
return fut.wait();
var sub = self.connection.subscribe.apply(self.connection, args);
fut.wait();
return sub;
},
_cleanUpTimer: function () {
var self = this;
var Package = getPackage();
Package.meteor.Meteor.clearTimeout(self.connectionTimer);
self.connectionTimer = null;
},
close: function () {
@@ -163,9 +174,7 @@ _.extend(ServiceConnection.prototype, {
}
if (self.connectionTimer) {
// Clean up the timer so that Node can exit cleanly
var Package = getPackage();
Package.meteor.Meteor.clearTimeout(self.connectionTimer);
self.connectionTimer = null;
self._cleanUpTimer();
}
}
});
@@ -456,17 +465,6 @@ exports.logs = function (options) {
throw new Error("Can't listen to messages on the logs collection");
var logsSubscription = null;
// In case of reconnect recover the state so user sees only new logs
logReader.connection.onReconnect = function () {
logsSubscription && logsSubscription.stop();
var opts = { streaming: options.streaming };
if (lastLogId)
opts.resumeAfterId = lastLogId;
// XXX correctly handle errors on resubscribe
logsSubscription = logReader.subscribeAndWait("logsForApp",
options.app, opts);
};
try {
logsSubscription =
logReader.subscribeAndWait("logsForApp", options.app,
@@ -477,6 +475,29 @@ exports.logs = function (options) {
});
}
// In case of reconnect recover the state so user sees only new logs.
// Only set up the onReconnect handler after the subscribe and wait
// has returned; if we set it up before, then we'll end up with two
// subscriptions, because the onReconnect handler will run for the
// first time before the subscribeAndWait returns.
logReader.connection.onReconnect = function () {
logsSubscription && logsSubscription.stop();
var opts = { streaming: options.streaming };
if (lastLogId)
opts.resumeAfterId = lastLogId;
// Don't use subscribeAndWait here; it'll deadlock. We can't
// process the sub messages until `onReconnect` returns, and
// `onReconnect` won't return unless the sub messages have been
// processed. There's no reason we need to wait for the sub to be
// ready here anyway.
// XXX correctly handle errors on resubscribe
logsSubscription = logReader.connection.subscribe(
"logsForApp",
options.app,
opts
);
};
return options.streaming ? null : 0;
} finally {
// If not streaming, close the connection to log-reader so that

View File

@@ -55,6 +55,7 @@ selftest.define("run", function () {
run.waitSecs(5);
run.match("restarted (x2)"); // see that restart counter reset
s.write("crash.js", "process.kill(process.pid, 'SIGKILL');");
run.waitSecs(5);
run.match("from signal: SIGKILL");
run.waitSecs(5);
run.match("is crashing");
@@ -82,6 +83,7 @@ selftest.define("run", function () {
" fs.writeFileSync(crashmark);\n" +
" process.exit(137);\n" +
"}\n");
run.waitSecs(5);
run.match("with code: 137");
run.match("restarted");
run.stop();

View File

@@ -191,7 +191,7 @@ WatchSet.fromJSON = function (json) {
set.files = _.clone(json.files);
var reFromJSON = function (j) {
if (j.$regex)
if (_.has(j, '$regex'))
return new RegExp(j.$regex, j.$options);
return new RegExp(j);
};