mirror of
https://github.com/farcasterxyz/hub-monorepo.git
synced 2026-04-18 03:00:22 -04:00
[WIP] perf: Prevent unnecessary network calls when syncing (#1096)
* fix: Strip out any SyncIDs we already have * changeset * optimization * test
This commit is contained in:
5
.changeset/forty-onions-grin.md
Normal file
5
.changeset/forty-onions-grin.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
"@farcaster/hubble": patch
|
||||
---
|
||||
|
||||
fix: Prevent unnecessary sync and log messages by filtering out SyncIDs our node already has
|
||||
@@ -162,8 +162,6 @@ class MerkleTrie {
|
||||
|
||||
/**
|
||||
* Check if the SyncId exists in the trie.
|
||||
*
|
||||
* Note: This method is only used in tests and benchmarks, and should not be needed in production.
|
||||
*/
|
||||
public async exists(id: SyncId): Promise<boolean> {
|
||||
return new Promise((resolve) => {
|
||||
@@ -178,6 +176,22 @@ class MerkleTrie {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if we already have this syncID (expressed as bytes)
|
||||
*/
|
||||
public async existsByBytes(id: Uint8Array): Promise<boolean> {
|
||||
return new Promise((resolve) => {
|
||||
this._lock.readLock(async (release) => {
|
||||
const r = await this._root.exists(id, this._db);
|
||||
|
||||
await this._unloadFromMemory(false);
|
||||
|
||||
resolve(r);
|
||||
release();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a snapshot of the trie at a given prefix.
|
||||
*/
|
||||
|
||||
@@ -303,6 +303,33 @@ describe("Multi peer sync engine", () => {
|
||||
await engine2.stop();
|
||||
});
|
||||
|
||||
test("shouldn't fetch messages that already exist", async () => {
|
||||
// Engine1 has 1 message
|
||||
await engine1.mergeIdRegistryEvent(custodyEvent);
|
||||
await engine1.mergeMessage(signerAdd);
|
||||
|
||||
const engine2 = new Engine(testDb2, network);
|
||||
const hub2 = new MockHub(testDb2, engine2);
|
||||
const syncEngine2 = new SyncEngine(hub2, testDb2);
|
||||
|
||||
// Engine2 has 2 messages
|
||||
await engine2.mergeIdRegistryEvent(custodyEvent);
|
||||
await engine2.mergeMessage(signerAdd);
|
||||
await addMessagesWithTimeDelta(engine2, [167]);
|
||||
|
||||
// Syncing engine2 --> engine1 should not fetch any additional messages, since engine2 already
|
||||
// has all the messages
|
||||
{
|
||||
const fetchMessagesSpy = jest.spyOn(syncEngine1, "getAllMessagesBySyncIds");
|
||||
await syncEngine2.performSync("engine1", (await syncEngine1.getSnapshot())._unsafeUnwrap(), clientForServer1);
|
||||
|
||||
expect(fetchMessagesSpy).not.toHaveBeenCalled();
|
||||
}
|
||||
|
||||
await syncEngine2.stop();
|
||||
await engine2.stop();
|
||||
});
|
||||
|
||||
test("retries the id registry event if it is missing", async () => {
|
||||
await engine1.mergeIdRegistryEvent(custodyEvent);
|
||||
await engine1.mergeMessage(signerAdd);
|
||||
|
||||
@@ -650,7 +650,17 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
|
||||
if (result.isErr()) {
|
||||
log.warn(result.error, `Error fetching ids for prefix ${theirNode.prefix}`);
|
||||
} else {
|
||||
await onMissingHashes(result.value.syncIds);
|
||||
// Strip out all syncIds that we already have. This can happen if our node has more messages than the other
|
||||
// hub at this node.
|
||||
// Note that we can optimize this check for the common case of a single missing syncId, since the diff
|
||||
// algorithm will drill down right to the missing syncId.
|
||||
let missingHashes = result.value.syncIds;
|
||||
if (result.value.syncIds.length === 1) {
|
||||
if (await this._trie.existsByBytes(missingHashes[0] as Uint8Array)) {
|
||||
missingHashes = [];
|
||||
}
|
||||
}
|
||||
await onMissingHashes(missingHashes);
|
||||
}
|
||||
} else if (theirNode.children) {
|
||||
const promises = [];
|
||||
|
||||
Reference in New Issue
Block a user