keep observeChanges return a query handler and implement observeChangesAsync

This commit is contained in:
Nacho Codoñer
2024-02-21 17:14:41 +01:00
parent 46339a919d
commit 7ae8141f16
6 changed files with 29 additions and 8 deletions

View File

@@ -999,7 +999,7 @@ export class AccountsServer extends AccountsCommon {
const observe = await this.users.find({
_id: userId,
'services.resume.loginTokens.hashedToken': newToken
}, { fields: { _id: 1 } }).observeChanges({
}, { fields: { _id: 1 } }).observeChangesAsync({
added: () => {
foundMatchingUser = true;
},
@@ -1860,4 +1860,3 @@ const generateCasePermutationsForString = string => {
}
return permutations;
}

View File

@@ -373,6 +373,13 @@ export default class Cursor {
return handle;
}
observeChangesAsync(options) {
return new Promise((resolve) => {
const handle = this.observeChanges(options);
handle.isReadyPromise.then(() => resolve(handle));
});
}
// XXX Maybe we need a version of observe that just calls a callback if
// anything changed.
_depend(changers, _allow_unordered) {

View File

@@ -565,7 +565,7 @@ Object.assign(Mongo.Collection.prototype, {
Object.assign(Mongo.Collection, {
async _publishCursor(cursor, sub, collection) {
var observeHandle = await cursor.observeChanges(
var observeHandle = await cursor.observeChangesAsync(
{
added: function(id, fields) {
sub.added(collection, id, fields);

View File

@@ -965,6 +965,13 @@ Cursor.prototype.observeChanges = function (callbacks, options = {}) {
self._cursorDescription, ordered, callbacks, options.nonMutatingCallbacks);
};
Cursor.prototype.observeChangesAsync = async function (callbacks, options = {}) {
var self = this;
var handler = self.observeChanges(callbacks, options);
await handler.ready();
return handler;
};
MongoConnection.prototype._createSynchronousCursor = function(
cursorDescription, options) {
var self = this;
@@ -1436,7 +1443,7 @@ MongoConnection.prototype.tail = function (cursorDescription, docCallback, timeo
};
Object.assign(MongoConnection.prototype, {
_observeChanges: async function (
_observeChanges: function (
cursorDescription, ordered, callbacks, nonMutatingCallbacks) {
var self = this;
@@ -1531,7 +1538,7 @@ Object.assign(MongoConnection.prototype, {
});
if (observeDriver._init) {
await observeDriver._init();
observeHandle.initObserver = observeDriver._init();
}
// This field is only set for use in tests.
@@ -1539,7 +1546,7 @@ Object.assign(MongoConnection.prototype, {
}
self._observeMultiplexers[observeKey] = multiplexer;
// Blocks until the initial adds have been sent.
await multiplexer.addHandleAndSendInitialAdds(observeHandle);
observeHandle.initHandler = multiplexer.addHandleAndSendInitialAdds(observeHandle);
return observeHandle;
},

View File

@@ -222,11 +222,19 @@ ObserveHandle = class {
this._stopped = false;
this._id = nextObserveHandleId++;
this.nonMutatingCallbacks = nonMutatingCallbacks;
this.initObserver = undefined;
this.initHandler = undefined;
}
async ready() {
if (this.initObserver) await this.initObserver;
if (this.initHandler) await this.initHandler;
}
async stop() {
if (this._stopped) return;
this._stopped = true;
await this.ready();
await this._multiplexer.removeHandle(this._id);
}
};

View File

@@ -8,7 +8,7 @@ Tinytest.addAsync('mongo-livedata - oplog - cursorSupported', async function(
var supported = async function(expected, selector, options) {
var cursor = OplogCollection.find(selector, options);
var handle = await cursor.observeChanges({ added: function() {} });
var handle = await cursor.observeChangesAsync({ added: function() {} });
// If there's no oplog at all, we shouldn't ever use it.
if (!oplogEnabled) expected = false;
test.equal(!!handle._multiplexer._observeDriver._usesOplog, expected);
@@ -125,7 +125,7 @@ process.env.MONGO_OPLOG_URL &&
species: 'dog',
color: 'blue',
})
.observeChanges({
.observeChangesAsync({
added(id, fields) {
if (fields.name === 'dog 5') {
blueDog5Id = id;