diff --git a/.changeset/smart-glasses-confess.md b/.changeset/smart-glasses-confess.md new file mode 100644 index 00000000..bd5bfdab --- /dev/null +++ b/.changeset/smart-glasses-confess.md @@ -0,0 +1,6 @@ +--- +'@farcaster/core': patch +'@farcaster/hubble': patch +--- + +Reject prunable messages on merge diff --git a/apps/hubble/src/network/sync/multiPeerSyncEngine.test.ts b/apps/hubble/src/network/sync/multiPeerSyncEngine.test.ts index 07a8996e..6824bec0 100644 --- a/apps/hubble/src/network/sync/multiPeerSyncEngine.test.ts +++ b/apps/hubble/src/network/sync/multiPeerSyncEngine.test.ts @@ -22,6 +22,7 @@ import { sleep, sleepWhile } from '~/utils/crypto'; import { EthEventsProvider } from '~/eth/ethEventsProvider'; import { Contract } from 'ethers'; import { IdRegistry, NameRegistry } from '~/eth/abis'; +import { getFarcasterTime } from '@farcaster/core'; /* eslint-disable security/detect-non-literal-fs-filename */ @@ -51,11 +52,12 @@ beforeAll(async () => { }); describe('Multi peer sync engine', () => { - const addMessagesWithTimestamps = async (engine: Engine, timestamps: number[]) => { + const addMessagesWithTimeDelta = async (engine: Engine, timeDelta: number[]) => { return await Promise.all( - timestamps.map(async (t) => { + timeDelta.map(async (t) => { + const farcasterTime = getFarcasterTime()._unsafeUnwrap(); const cast = await Factories.CastAddMessage.create( - { data: { fid, network, timestamp: t } }, + { data: { fid, network, timestamp: farcasterTime + t } }, { transient: { signer } } ); @@ -156,7 +158,7 @@ describe('Multi peer sync engine', () => { await engine1.mergeMessage(signerAdd); // Add messages to engine 1 - await addMessagesWithTimestamps(engine1, [30662167, 30662169, 30662172]); + await addMessagesWithTimeDelta(engine1, [167, 169, 172]); await sleepWhile(() => syncEngine1.syncTrieQSize > 0, 1000); const engine2 = new Engine(testDb2, network); @@ -185,7 +187,7 @@ describe('Multi peer sync engine', () => { ).toBeFalsy(); // Add more messages - await addMessagesWithTimestamps(engine1, [30663167, 30663169, 30663172]); + await addMessagesWithTimeDelta(engine1, [367, 369, 372]); await sleepWhile(() => syncEngine1.syncTrieQSize > 0, 1000); // grab a new snapshot from the RPC for engine1 @@ -220,7 +222,7 @@ describe('Multi peer sync engine', () => { await engine1.mergeMessage(signerAdd); // Add a cast to engine1 - const castAdd = (await addMessagesWithTimestamps(engine1, [30662167]))[0] as Message; + const castAdd = (await addMessagesWithTimeDelta(engine1, [167]))[0] as Message; await sleepWhile(() => syncEngine1.syncTrieQSize > 0, 1000); const engine2 = new Engine(testDb2, network); @@ -308,7 +310,7 @@ describe('Multi peer sync engine', () => { await engine1.mergeMessage(signerAdd); // Add a cast to engine1 - await addMessagesWithTimestamps(engine1, [30662167]); + await addMessagesWithTimeDelta(engine1, [167]); // Do not merge the custory event into engine2 const engine2 = new Engine(testDb2, network); @@ -408,8 +410,8 @@ describe('Multi peer sync engine', () => { expect(await syncEngine1.trie.rootHash()).toEqual(await syncEngine2.trie.rootHash()); // Add two different messages to engine1 and engine2 - await addMessagesWithTimestamps(engine1, [30662167]); - await addMessagesWithTimestamps(engine2, [30662169]); + await addMessagesWithTimeDelta(engine1, [167]); + await addMessagesWithTimeDelta(engine2, [169]); await sleepWhile(async () => (await syncEngine1.trie.items()) !== 2, 1000); await sleepWhile(async () => (await syncEngine2.trie.items()) !== 2, 1000); @@ -456,8 +458,8 
@@ describe('Multi peer sync engine', () => { expect(await syncEngine1.trie.rootHash()).toEqual(await syncEngine2.trie.rootHash()); // Add two different messages to engine1 and engine2 - await addMessagesWithTimestamps(engine1, [30662167, 30662168]); - await addMessagesWithTimestamps(engine2, [30662169]); + await addMessagesWithTimeDelta(engine1, [167, 168]); + await addMessagesWithTimeDelta(engine2, [169]); await sleepWhile(() => syncEngine1.syncTrieQSize > 0, 1000); await sleepWhile(() => syncEngine2.syncTrieQSize > 0, 1000); @@ -534,7 +536,7 @@ describe('Multi peer sync engine', () => { timestamps.push(msgTimestamp + j); } // console.log('adding batch', i, ' of ', numBatches); - const addedMessages = await addMessagesWithTimestamps(engine1, timestamps); + const addedMessages = await addMessagesWithTimeDelta(engine1, timestamps); await sleepWhile(() => syncEngine1.syncTrieQSize > 0, 1000); castMessagesToRemove = addedMessages.slice(0, 10); diff --git a/apps/hubble/src/network/sync/syncEngine.test.ts b/apps/hubble/src/network/sync/syncEngine.test.ts index 4f9f7e68..24e3e3ce 100644 --- a/apps/hubble/src/network/sync/syncEngine.test.ts +++ b/apps/hubble/src/network/sync/syncEngine.test.ts @@ -65,11 +65,12 @@ describe('SyncEngine', () => { await engine.stop(); }); - const addMessagesWithTimestamps = async (timestamps: number[]) => { + const addMessagesWithTimestamps = async (timeDelta: number[]) => { const results = await Promise.all( - timestamps.map(async (t) => { + timeDelta.map(async (t) => { + const farcasterTime = getFarcasterTime()._unsafeUnwrap(); const cast = await Factories.CastAddMessage.create( - { data: { fid, network, timestamp: t } }, + { data: { fid, network, timestamp: farcasterTime + t } }, { transient: { signer } } ); @@ -168,19 +169,21 @@ describe('SyncEngine', () => { const rsigneradd = await engine.mergeMessage(signerAdd); expect(rsigneradd.isOk()).toBeTruthy(); + const currentTime = getFarcasterTime()._unsafeUnwrap(); + // Reaction const reactionBody = { targetCastId: { fid, hash: castAdd.hash }, type: ReactionType.LIKE, }; const reaction1 = await Factories.ReactionAddMessage.create( - { data: { fid, network, timestamp: 30662167, reactionBody } }, + { data: { fid, network, timestamp: currentTime + 10, reactionBody } }, { transient: { signer } } ); // Same reaction, but with different timestamp const reaction2 = await Factories.ReactionAddMessage.create( - { data: { fid, network, timestamp: 30662168, reactionBody } }, + { data: { fid, network, timestamp: currentTime + 15, reactionBody } }, { transient: { signer } } ); @@ -280,7 +283,7 @@ describe('SyncEngine', () => { await engine.mergeIdRegistryEvent(custodyEvent); await engine.mergeMessage(signerAdd); - await addMessagesWithTimestamps([30662167, 30662169, 30662172]); + await addMessagesWithTimestamps([167, 169, 172]); expect( (await syncEngine.syncStatus('test', (await syncEngine.getSnapshot())._unsafeUnwrap()))._unsafeUnwrap().shouldSync ).toBeFalsy(); @@ -290,9 +293,9 @@ describe('SyncEngine', () => { await engine.mergeIdRegistryEvent(custodyEvent); await engine.mergeMessage(signerAdd); - await addMessagesWithTimestamps([30662167, 30662169, 30662172]); + await addMessagesWithTimestamps([167, 169, 172]); const oldSnapshot = (await syncEngine.getSnapshot())._unsafeUnwrap(); - await addMessagesWithTimestamps([30662372]); + await addMessagesWithTimestamps([372]); expect(oldSnapshot.excludedHashes).not.toEqual((await syncEngine.getSnapshot())._unsafeUnwrap().excludedHashes); expect((await syncEngine.syncStatus('test', 
oldSnapshot))._unsafeUnwrap().shouldSync).toBeTruthy(); }); @@ -327,7 +330,7 @@ describe('SyncEngine', () => { await engine.mergeNameRegistryEvent(Factories.NameRegistryEvent.build()); await engine.mergeNameRegistryEvent(Factories.NameRegistryEvent.build()); await engine.mergeMessage(signerAdd); - await addMessagesWithTimestamps([30662167, 30662169]); + await addMessagesWithTimestamps([167, 169]); const stats = await syncEngine.getSyncStats(); expect(stats.numFids).toEqual(1); @@ -339,7 +342,7 @@ describe('SyncEngine', () => { await engine.mergeIdRegistryEvent(custodyEvent); await engine.mergeMessage(signerAdd); - const messages = await addMessagesWithTimestamps([30662167, 30662169, 30662172]); + const messages = await addMessagesWithTimestamps([167, 169, 172]); expect(await syncEngine.trie.items()).toEqual(4); // signerAdd + 3 messages @@ -360,7 +363,7 @@ describe('SyncEngine', () => { await engine.mergeIdRegistryEvent(custodyEvent); await engine.mergeMessage(signerAdd); - const messages = await addMessagesWithTimestamps([30662167, 30662169, 30662172]); + const messages = await addMessagesWithTimestamps([167, 169, 172]); expect(await syncEngine.trie.items()).toEqual(4); // signerAdd + 3 messages @@ -381,13 +384,14 @@ describe('SyncEngine', () => { await engine.mergeIdRegistryEvent(custodyEvent); await engine.mergeMessage(signerAdd); - await addMessagesWithTimestamps([30662160, 30662169, 30662172]); const nowOrig = Date.now; - Date.now = () => 1609459200000 + 30662167 * 1000; + Date.now = () => 1683074200000; try { + await addMessagesWithTimestamps([160, 169, 172]); + Date.now = () => 1683074200000 + 167 * 1000; const result = await syncEngine.getSnapshot(); const snapshot = result._unsafeUnwrap(); - expect((snapshot.prefix as Buffer).toString('utf8')).toEqual('0030662160'); + expect((snapshot.prefix as Buffer).toString('utf8')).toEqual('0073615160'); } finally { Date.now = nowOrig; } diff --git a/apps/hubble/src/network/sync/syncEnginePerf.test.ts b/apps/hubble/src/network/sync/syncEnginePerf.test.ts index c76e139e..2f10a7df 100644 --- a/apps/hubble/src/network/sync/syncEnginePerf.test.ts +++ b/apps/hubble/src/network/sync/syncEnginePerf.test.ts @@ -13,6 +13,7 @@ import { jestRocksDB } from '~/storage/db/jestUtils'; import { MockHub } from '~/test/mocks'; import { MockRpcClient } from './mock'; import { EMPTY_HASH } from './trieNode'; +import { getFarcasterTime } from '@farcaster/core'; const testDb = jestRocksDB(`engine.syncEnginePerf.test`); const testDb2 = jestRocksDB(`engine2.syncEnginePerf.test`); @@ -36,11 +37,15 @@ beforeAll(async () => { ); }); -describe('SyncEngine', () => { - const makeMessagesWithTimestamps = async (timestamps: number[]): Promise<Message[]> => { +describe('SyncEnginePerfTest', () => { + const makeMessagesWithTimeDelta = async (timeDeltas: number[]): Promise<Message[]> => { return await Promise.all( - timestamps.map(async (t) => { - return Factories.CastAddMessage.create({ data: { fid, network, timestamp: t } }, { transient: { signer } }); + timeDeltas.map(async (t) => { + const farcasterTime = getFarcasterTime()._unsafeUnwrap(); + return Factories.CastAddMessage.create( + { data: { fid, network, timestamp: farcasterTime + t } }, + { transient: { signer } } + ); }) ); }; @@ -61,21 +66,24 @@ describe('SyncEngine', () => { await hub2.submitIdRegistryEvent(custodyEvent); await hub2.submitMessage(signerAdd); + Date.now = () => 1683074200000; // Merge the same messages into both engines.
- const messages = await makeMessagesWithTimestamps([30662167, 30662169, 30662172]); + const messages = await makeMessagesWithTimeDelta([167, 169, 172]); for (const message of messages) { - await hub1.submitMessage(message); - await hub2.submitMessage(message); + let res = await hub1.submitMessage(message); + expect(res.isOk()).toBeTruthy(); + res = await hub2.submitMessage(message); + expect(res.isOk()).toBeTruthy(); } // Sanity check, they should equal expect(await syncEngine1.trie.rootHash()).toEqual(await syncEngine2.trie.rootHash()); // A timestamp after all the messages - Date.now = () => 1609459200000 + 30662200 * 1000; + Date.now = () => 1683074200000 + 200 * 1000; const snapshot2 = (await syncEngine2.getSnapshot())._unsafeUnwrap(); - expect((snapshot2.prefix as Buffer).toString('utf8')).toEqual('0030662'); + expect((snapshot2.prefix as Buffer).toString('utf8')).toEqual('0073615'); // Force a non-existent prefix (the original bug #536 is fixed) snapshot2.prefix = Buffer.from('00306622', 'hex'); diff --git a/apps/hubble/src/rpc/test/castService.test.ts b/apps/hubble/src/rpc/test/castService.test.ts index ce799422..ba3c8829 100644 --- a/apps/hubble/src/rpc/test/castService.test.ts +++ b/apps/hubble/src/rpc/test/castService.test.ts @@ -18,6 +18,7 @@ import Server from '~/rpc/server'; import { jestRocksDB } from '~/storage/db/jestUtils'; import Engine from '~/storage/engine'; import { MockHub } from '~/test/mocks'; +import { getFarcasterTime } from '@farcaster/core'; const db = jestRocksDB('protobufs.rpc.castService.test'); const network = FarcasterNetwork.TESTNET; @@ -108,10 +109,11 @@ describe('getCast', () => { test('returns casts in chronological order', async () => { const castsAsJson = []; let latestCast; + const currentTime = getFarcasterTime()._unsafeUnwrap(); for (let i = 0; i < 4; i++) { latestCast = await Factories.CastAddMessage.create( { - data: { fid, network, timestamp: i }, + data: { fid, network, timestamp: currentTime + i }, }, { transient: { signer } } ); diff --git a/apps/hubble/src/storage/jobs/pruneMessagesJob.test.ts b/apps/hubble/src/storage/jobs/pruneMessagesJob.test.ts index 52aeb2fd..a3fb5a0b 100644 --- a/apps/hubble/src/storage/jobs/pruneMessagesJob.test.ts +++ b/apps/hubble/src/storage/jobs/pruneMessagesJob.test.ts @@ -3,6 +3,7 @@ import { jestRocksDB } from '~/storage/db/jestUtils'; import Engine from '~/storage/engine'; import { seedSigner } from '~/storage/engine/seed'; import { PruneMessagesJobScheduler } from '~/storage/jobs/pruneMessagesJob'; +import { FARCASTER_EPOCH, getFarcasterTime } from '@farcaster/core'; const db = jestRocksDB('jobs.pruneMessagesJob.test'); @@ -48,21 +49,21 @@ describe('doJobs', () => { test( 'prunes messages for all fids', async () => { - const timestampToPrune = 1; // 1 second after farcaster epoch (1/1/22) + const currentTime = getFarcasterTime()._unsafeUnwrap(); const fid1 = Factories.Fid.build(); const signer1 = Factories.Ed25519Signer.build(); const signer1Key = (await signer1.getSignerKey())._unsafeUnwrap(); await seedSigner(engine, fid1, signer1Key); - await seedMessagesFromTimestamp(engine, fid1, signer1, timestampToPrune); + await seedMessagesFromTimestamp(engine, fid1, signer1, currentTime); const fid2 = Factories.Fid.build(); const signer2 = Factories.Ed25519Signer.build(); const signer2Key = (await signer2.getSignerKey())._unsafeUnwrap(); await seedSigner(engine, fid2, signer2Key); - await seedMessagesFromTimestamp(engine, fid2, signer2, timestampToPrune); + await seedMessagesFromTimestamp(engine, fid2, signer2, 
currentTime); for (const fid of [fid1, fid2]) { const casts = await engine.getCastsByFid(fid); @@ -72,8 +73,14 @@ describe('doJobs', () => { expect(reactions._unsafeUnwrap().messages.length).toEqual(1); } - const result = await scheduler.doJobs(); - expect(result._unsafeUnwrap()).toEqual(undefined); + const nowOrig = Date.now; + Date.now = () => FARCASTER_EPOCH + (currentTime + 60 * 60 * 24 * 365 + 1) * 1000; // advance 1 year and 1 second + try { + const result = await scheduler.doJobs(); + expect(result._unsafeUnwrap()).toEqual(undefined); + } finally { + Date.now = nowOrig; + } for (const fid of [fid1, fid2]) { const casts = await engine.getCastsByFid(fid); diff --git a/apps/hubble/src/storage/stores/castStore.test.ts b/apps/hubble/src/storage/stores/castStore.test.ts index 31452ab6..e2e96244 100644 --- a/apps/hubble/src/storage/stores/castStore.test.ts +++ b/apps/hubble/src/storage/stores/castStore.test.ts @@ -18,8 +18,9 @@ import { UserPostfix } from '~/storage/db/types'; import CastStore from '~/storage/stores/castStore'; import StoreEventHandler from '~/storage/stores/storeEventHandler'; import { sleep } from '~/utils/crypto'; -import { err } from 'neverthrow'; +import { err, ok } from 'neverthrow'; import { faker } from '@faker-js/faker'; +import { FARCASTER_EPOCH } from '@farcaster/core'; const db = jestRocksDB('protobufs.castStore.test'); const eventHandler = new StoreEventHandler(db); @@ -662,14 +663,12 @@ describe('pruneMessages', () => { let add4: CastAddMessage; let add5: CastAddMessage; let addOld1: CastAddMessage; - let addOld2: CastAddMessage; let remove1: CastRemoveMessage; let remove2: CastRemoveMessage; let remove3: CastRemoveMessage; let remove4: CastRemoveMessage; let remove5: CastRemoveMessage; - let removeOld3: CastRemoveMessage; const generateAddWithTimestamp = async (fid: number, timestamp: number): Promise<CastAddMessage> => { return Factories.CastAddMessage.create({ @@ -695,14 +694,12 @@ describe('pruneMessages', () => { add4 = await generateAddWithTimestamp(fid, time + 4); add5 = await generateAddWithTimestamp(fid, time + 5); addOld1 = await generateAddWithTimestamp(fid, time - 60 * 60); - addOld2 = await generateAddWithTimestamp(fid, time - 60 * 60 + 1); remove1 = await generateRemoveWithTimestamp(fid, time + 1, add1); remove2 = await generateRemoveWithTimestamp(fid, time + 2, add2); remove3 = await generateRemoveWithTimestamp(fid, time + 3, add3); remove4 = await generateRemoveWithTimestamp(fid, time + 4, add4); remove5 = await generateRemoveWithTimestamp(fid, time + 5, add5); - removeOld3 = await generateRemoveWithTimestamp(fid, time - 60 * 60 + 2); }); beforeEach(async () => { @@ -775,27 +772,99 @@ describe('pruneMessages', () => { expect(prunedMessages).toEqual([]); }); + + test('fails to merge message which would be immediately pruned', async () => { + await expect(eventHandler.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(undefined)); + + await expect(sizePrunedStore.merge(add3)).resolves.toBeGreaterThan(0); + await expect(eventHandler.getCacheMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(1)); + await expect(eventHandler.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual( + makeTsHash(add3.data.timestamp, add3.hash) + ); + + await expect(sizePrunedStore.merge(add2)).resolves.toBeGreaterThan(0); + await expect(eventHandler.getCacheMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(2)); + await expect(eventHandler.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual( +
makeTsHash(add2.data.timestamp, add2.hash) + ); + + await expect(sizePrunedStore.merge(remove2)).resolves.toBeGreaterThan(0); + await expect(eventHandler.getCacheMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(2)); + await expect(eventHandler.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual( + makeTsHash(remove2.data.timestamp, remove2.hash) + ); + + await expect(sizePrunedStore.merge(add4)).resolves.toBeGreaterThan(0); + await expect(eventHandler.getCacheMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(3)); + await expect(eventHandler.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual( + makeTsHash(remove2.data.timestamp, remove2.hash) + ); + + // remove1 is older than remove2 and the store is at capacity so it's rejected + await expect(sizePrunedStore.merge(remove1)).rejects.toEqual( + new HubError('bad_request.prunable', 'message would be pruned') + ); + + // add1 is older than remove2 and the store is at capacity so it's rejected + await expect(sizePrunedStore.merge(add1)).rejects.toEqual( + new HubError('bad_request.prunable', 'message would be pruned') + ); + + // merging add5 succeeds because while the store is at capacity, add5 would not be pruned + await expect(sizePrunedStore.merge(add5)).resolves.toBeGreaterThan(0); + await expect(eventHandler.getCacheMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(4)); + await expect(eventHandler.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual( + makeTsHash(remove2.data.timestamp, remove2.hash) + ); + + const result = await sizePrunedStore.pruneMessages(fid); + expect(result.isOk()).toBeTruthy(); + + expect(prunedMessages).toEqual([remove2]); + }); }); describe('with time limit', () => { const timePrunedStore = new CastStore(db, eventHandler, { pruneTimeLimit: 60 * 60 - 1 }); test('prunes earliest messages', async () => { - const messages = [add1, remove2, addOld1, addOld2, removeOld3]; + const messages = [add1, add2, remove3, add4]; for (const message of messages) { await timePrunedStore.merge(message); } - const result = await timePrunedStore.pruneMessages(fid); - expect(result.isOk()).toBeTruthy(); + const nowOrig = Date.now; + Date.now = () => FARCASTER_EPOCH + (add4.data.timestamp - 1 + 60 * 60) * 1000; + try { + const result = await timePrunedStore.pruneMessages(fid); + expect(result.isOk()).toBeTruthy(); - expect(prunedMessages).toEqual([addOld1, addOld2, removeOld3]); + expect(prunedMessages).toEqual([add1, add2, remove3]); + } finally { + Date.now = nowOrig; + } - await expect(timePrunedStore.getCastAdd(fid, addOld1.hash)).rejects.toThrow(HubError); - await expect(timePrunedStore.getCastAdd(fid, addOld2.hash)).rejects.toThrow(HubError); - await expect(timePrunedStore.getCastRemove(fid, removeOld3.data.castRemoveBody.targetHash)).rejects.toThrow( + await expect(timePrunedStore.getCastAdd(fid, add1.hash)).rejects.toThrow(HubError); + await expect(timePrunedStore.getCastAdd(fid, add2.hash)).rejects.toThrow(HubError); + await expect(timePrunedStore.getCastRemove(fid, remove3.data.castRemoveBody.targetHash)).rejects.toThrow( HubError ); }); + + test('fails to merge message which would be immediately pruned', async () => { + const messages = [add1, add2]; + for (const message of messages) { + await timePrunedStore.merge(message); + } + + await expect(timePrunedStore.merge(addOld1)).rejects.toEqual( + new HubError('bad_request.prunable', 'message would be pruned') + ); + + const result = await timePrunedStore.pruneMessages(fid); +
expect(result.isOk()).toBeTruthy(); + + expect(prunedMessages).toEqual([]); + }); }); }); diff --git a/apps/hubble/src/storage/stores/castStore.ts b/apps/hubble/src/storage/stores/castStore.ts index c52dbfbe..b50db303 100644 --- a/apps/hubble/src/storage/stores/castStore.ts +++ b/apps/hubble/src/storage/stores/castStore.ts @@ -282,6 +282,18 @@ class CastStore { .acquire( message.data.fid.toString(), async () => { + const prunableResult = await this._eventHandler.isPrunable( + message, + UserPostfix.CastMessage, + this._pruneSizeLimit, + this._pruneTimeLimit + ); + if (prunableResult.isErr()) { + throw prunableResult.error; + } else if (prunableResult.value) { + throw new HubError('bad_request.prunable', 'message would be pruned'); + } + if (isCastAddMessage(message)) { return this.mergeAdd(message); } else if (isCastRemoveMessage(message)) { @@ -316,7 +328,7 @@ class CastStore { async pruneMessages(fid: number): HubAsyncResult<number[] | undefined> { const commits: number[] = []; - const cachedCount = this._eventHandler.getCacheMessageCount(fid, UserPostfix.CastMessage); + const cachedCount = await this._eventHandler.getCacheMessageCount(fid, UserPostfix.CastMessage); // Require storage cache to be synced to prune if (cachedCount.isErr()) { @@ -345,7 +357,7 @@ class CastStore { return ok(undefined); // Nothing left to prune } - const count = this._eventHandler.getCacheMessageCount(fid, UserPostfix.CastMessage); + const count = await this._eventHandler.getCacheMessageCount(fid, UserPostfix.CastMessage); if (count.isErr()) { return err(count.error); } diff --git a/apps/hubble/src/storage/stores/reactionStore.test.ts b/apps/hubble/src/storage/stores/reactionStore.test.ts index 45ab04da..97511f54 100644 --- a/apps/hubble/src/storage/stores/reactionStore.test.ts +++ b/apps/hubble/src/storage/stores/reactionStore.test.ts @@ -22,6 +22,7 @@ import { getMessage, makeTsHash } from '~/storage/db/message'; import { UserPostfix } from '~/storage/db/types'; import ReactionStore from '~/storage/stores/reactionStore'; import StoreEventHandler from '~/storage/stores/storeEventHandler'; +import { FARCASTER_EPOCH } from '@farcaster/core'; const db = jestRocksDB('protobufs.reactionStore.test'); const eventHandler = new StoreEventHandler(db); @@ -824,7 +825,6 @@ describe('pruneMessages', () => { let remove3: ReactionRemoveMessage; let remove4: ReactionRemoveMessage; let remove5: ReactionRemoveMessage; - let removeOld3: ReactionRemoveMessage; const generateAddWithTimestamp = async (fid: number, timestamp: number): Promise<ReactionAddMessage> => { return Factories.ReactionAddMessage.create({ data: { fid, timestamp } }); }; @@ -859,7 +859,6 @@ describe('pruneMessages', () => { remove3 = await generateRemoveWithTimestamp(fid, time + 3, add3.data.reactionBody); remove4 = await generateRemoveWithTimestamp(fid, time + 4, add4.data.reactionBody); remove5 = await generateRemoveWithTimestamp(fid, time + 5, add5.data.reactionBody); - removeOld3 = await generateRemoveWithTimestamp(fid, time - 60 * 60 + 2); }); beforeEach(async () => { @@ -945,43 +944,90 @@ describe('pruneMessages', () => { expect(prunedMessages).toEqual([]); }); + + test('fails to add messages older than the earliest message', async () => { + const messages = [add1, add2, add3]; + for (const message of messages) { + await sizePrunedStore.merge(message); + } + + // Older messages are rejected + await expect(sizePrunedStore.merge(addOld1)).rejects.toEqual( + new HubError('bad_request.prunable', 'message would be pruned') + ); + + // newer messages can still be added + await
expect(sizePrunedStore.merge(add4)).resolves.toBeGreaterThan(0); + + // Prune removes earliest + const result = await sizePrunedStore.pruneMessages(fid); + expect(result.isOk()).toBeTruthy(); + expect(result._unsafeUnwrap().length).toEqual(1); + + expect(prunedMessages).toEqual([add1]); + }); }); describe('with time limit', () => { const timePrunedStore = new ReactionStore(db, eventHandler, { pruneTimeLimit: 60 * 60 - 1 }); test('prunes earliest messages', async () => { - const messages = [add1, remove2, addOld1, addOld2, removeOld3]; + const messages = [add1, add2, remove3, add4]; for (const message of messages) { await timePrunedStore.merge(message); } - const result = await timePrunedStore.pruneMessages(fid); - expect(result.isOk()).toBeTruthy(); + const nowOrig = Date.now; + Date.now = () => FARCASTER_EPOCH + (add4.data.timestamp - 1 + 60 * 60) * 1000; + try { + const result = await timePrunedStore.pruneMessages(fid); + expect(result.isOk()).toBeTruthy(); + } finally { + Date.now = nowOrig; + } - expect(prunedMessages).toEqual([addOld1, addOld2, removeOld3]); + expect(prunedMessages).toEqual([add1, add2, remove3]); await expect( timePrunedStore.getReactionAdd( fid, - addOld1.data.reactionBody.type, - addOld1.data.reactionBody.targetCastId ?? Factories.CastId.build() + add1.data.reactionBody.type, + add1.data.reactionBody.targetCastId ?? Factories.CastId.build() ) ).rejects.toThrow(HubError); await expect( timePrunedStore.getReactionAdd( fid, - addOld2.data.reactionBody.type, - addOld2.data.reactionBody.targetCastId ?? Factories.CastId.build() + add2.data.reactionBody.type, + add2.data.reactionBody.targetCastId ?? Factories.CastId.build() ) ).rejects.toThrow(HubError); await expect( timePrunedStore.getReactionRemove( fid, - removeOld3.data.reactionBody.type, - removeOld3.data.reactionBody.targetCastId ?? Factories.CastId.build() + remove3.data.reactionBody.type, + remove3.data.reactionBody.targetCastId ?? 
Factories.CastId.build() ) ).rejects.toThrow(HubError); }); + + test('fails to merge messages that would be immediately pruned', async () => { + const messages = [add1, add2]; + for (const message of messages) { + await timePrunedStore.merge(message); + } + + await expect(timePrunedStore.merge(addOld1)).rejects.toEqual( + new HubError('bad_request.prunable', 'message would be pruned') + ); + await expect(timePrunedStore.merge(addOld2)).rejects.toEqual( + new HubError('bad_request.prunable', 'message would be pruned') + ); + + const result = await timePrunedStore.pruneMessages(fid); + expect(result.isOk()).toBeTruthy(); + + expect(prunedMessages).toEqual([]); + }); }); }); diff --git a/apps/hubble/src/storage/stores/reactionStore.ts b/apps/hubble/src/storage/stores/reactionStore.ts index 8b3a80ac..c42e6336 100644 --- a/apps/hubble/src/storage/stores/reactionStore.ts +++ b/apps/hubble/src/storage/stores/reactionStore.ts @@ -289,6 +289,18 @@ class ReactionStore { .acquire( message.data.fid.toString(), async () => { + const prunableResult = await this._eventHandler.isPrunable( + message, + UserPostfix.ReactionMessage, + this._pruneSizeLimit, + this._pruneTimeLimit + ); + if (prunableResult.isErr()) { + throw prunableResult.error; + } else if (prunableResult.value) { + throw new HubError('bad_request.prunable', 'message would be pruned'); + } + if (isReactionAddMessage(message)) { return this.mergeAdd(message); } else if (isReactionRemoveMessage(message)) { @@ -323,7 +335,7 @@ class ReactionStore { async pruneMessages(fid: number): HubAsyncResult<number[] | undefined> { const commits: number[] = []; - const cachedCount = this._eventHandler.getCacheMessageCount(fid, UserPostfix.ReactionMessage); + const cachedCount = await this._eventHandler.getCacheMessageCount(fid, UserPostfix.ReactionMessage); // Require storage cache to be synced to prune if (cachedCount.isErr()) { @@ -352,7 +364,7 @@ class ReactionStore { return ok(undefined); // Nothing left to prune } - const count = this._eventHandler.getCacheMessageCount(fid, UserPostfix.ReactionMessage); + const count = await this._eventHandler.getCacheMessageCount(fid, UserPostfix.ReactionMessage); if (count.isErr()) { return err(count.error); } diff --git a/apps/hubble/src/storage/stores/signerStore.test.ts b/apps/hubble/src/storage/stores/signerStore.test.ts index 973f6b09..2818ff98 100644 --- a/apps/hubble/src/storage/stores/signerStore.test.ts +++ b/apps/hubble/src/storage/stores/signerStore.test.ts @@ -997,6 +997,7 @@ describe('pruneMessages', () => { }); beforeEach(() => { + eventHandler.syncCache(); prunedMessages = []; }); @@ -1009,6 +1010,7 @@ describe('pruneMessages', () => { let add3: SignerAddMessage; let add4: SignerAddMessage; let add5: SignerAddMessage; + let addOld1: SignerAddMessage; let remove1: SignerRemoveMessage; let remove2: SignerRemoveMessage; @@ -1039,6 +1041,7 @@ describe('pruneMessages', () => { add3 = await generateAddWithTimestamp(fid, time + 3); add4 = await generateAddWithTimestamp(fid, time + 4); add5 = await generateAddWithTimestamp(fid, time + 5); + addOld1 = await generateAddWithTimestamp(fid, time - 60 * 60); remove1 = await generateRemoveWithTimestamp(fid, time + 1, add1.data.signerAddBody.signer); remove2 = await generateRemoveWithTimestamp(fid, time + 2, add2.data.signerAddBody.signer); @@ -1118,5 +1121,17 @@ describe('pruneMessages', () => { expect(prunedMessages).toEqual([]); }); + + test('fails to add messages older than the earliest message', async () => { + const messages = [add1, add2, add3]; + for (const message of messages) {
+ await sizePrunedStore.merge(message); + } + + // Older messages are rejected + await expect(sizePrunedStore.merge(addOld1)).rejects.toEqual( + new HubError('bad_request.prunable', 'message would be pruned') + ); + }); }); }); diff --git a/apps/hubble/src/storage/stores/signerStore.ts b/apps/hubble/src/storage/stores/signerStore.ts index d4dd84a4..eb30afa1 100644 --- a/apps/hubble/src/storage/stores/signerStore.ts +++ b/apps/hubble/src/storage/stores/signerStore.ts @@ -257,6 +257,17 @@ class SignerStore { .acquire( message.data.fid.toString(), async () => { + const prunableResult = await this._eventHandler.isPrunable( + message, + UserPostfix.SignerMessage, + this._pruneSizeLimit + ); + if (prunableResult.isErr()) { + throw prunableResult.error; + } else if (prunableResult.value) { + throw new HubError('bad_request.prunable', 'message would be pruned'); + } + if (isSignerAddMessage(message)) { return this.mergeAdd(message); } else if (isSignerRemoveMessage(message)) { @@ -291,7 +302,7 @@ class SignerStore { async pruneMessages(fid: number): HubAsyncResult<number[] | undefined> { const commits: number[] = []; - const cachedCount = this._eventHandler.getCacheMessageCount(fid, UserPostfix.SignerMessage); + const cachedCount = await this._eventHandler.getCacheMessageCount(fid, UserPostfix.SignerMessage); // Require storage cache to be synced to prune if (cachedCount.isErr()) { @@ -312,7 +323,7 @@ class SignerStore { return ok(undefined); // Nothing left to prune } - const count = this._eventHandler.getCacheMessageCount(fid, UserPostfix.SignerMessage); + const count = await this._eventHandler.getCacheMessageCount(fid, UserPostfix.SignerMessage); if (count.isErr()) { return err(count.error); } diff --git a/apps/hubble/src/storage/stores/storageCache.test.ts b/apps/hubble/src/storage/stores/storageCache.test.ts index e80a8cf4..703eb6a0 100644 --- a/apps/hubble/src/storage/stores/storageCache.test.ts +++ b/apps/hubble/src/storage/stores/storageCache.test.ts @@ -1,7 +1,7 @@ -import { ok, err } from 'neverthrow'; -import { HubEvent, HubEventType, Factories, HubError } from '@farcaster/hub-nodejs'; +import { ok } from 'neverthrow'; +import { Factories, HubEvent, HubEventType } from '@farcaster/hub-nodejs'; import { jestRocksDB } from '~/storage/db/jestUtils'; -import { putMessage } from '~/storage/db/message'; +import { makeTsHash, putMessage } from '~/storage/db/message'; import { UserPostfix } from '~/storage/db/types'; import { StorageCache } from '~/storage/stores/storageCache'; @@ -10,7 +10,7 @@ const db = jestRocksDB('engine.storageCache.test'); let cache: StorageCache; beforeEach(() => { - cache = new StorageCache(); + cache = new StorageCache(db); }); describe('syncFromDb', () => { @@ -45,29 +45,81 @@ describe('syncFromDb', () => { await putMessage(db, message); } } - await cache.syncFromDb(db); + await cache.syncFromDb(); for (const fidUsage of usage) { - expect(cache.getMessageCount(fidUsage.fid, UserPostfix.CastMessage)).toEqual(ok(fidUsage.usage.cast)); - expect(cache.getMessageCount(fidUsage.fid, UserPostfix.ReactionMessage)).toEqual(ok(fidUsage.usage.reaction)); - expect(cache.getMessageCount(fidUsage.fid, UserPostfix.VerificationMessage)).toEqual( + await expect(cache.getMessageCount(fidUsage.fid, UserPostfix.CastMessage)).resolves.toEqual( + ok(fidUsage.usage.cast) + ); + await expect(cache.getMessageCount(fidUsage.fid, UserPostfix.ReactionMessage)).resolves.toEqual( + ok(fidUsage.usage.reaction) + ); + await expect(cache.getMessageCount(fidUsage.fid, UserPostfix.VerificationMessage)).resolves.toEqual(
ok(fidUsage.usage.verification) ); - expect(cache.getMessageCount(fidUsage.fid, UserPostfix.UserDataMessage)).toEqual(ok(fidUsage.usage.userData)); - expect(cache.getMessageCount(fidUsage.fid, UserPostfix.SignerMessage)).toEqual(ok(fidUsage.usage.signer)); + await expect(cache.getMessageCount(fidUsage.fid, UserPostfix.UserDataMessage)).resolves.toEqual( + ok(fidUsage.usage.userData) + ); + await expect(cache.getMessageCount(fidUsage.fid, UserPostfix.SignerMessage)).resolves.toEqual( + ok(fidUsage.usage.signer) + ); } }); }); +describe('getMessageCount', () => { + test('returns the correct count even if the cache is not synced', async () => { + const fid = Factories.Fid.build(); + const message = await Factories.CastAddMessage.create({ data: { fid } }); + const message2 = await Factories.CastAddMessage.create({ data: { fid } }); + const message3_differnt_fid = await Factories.CastAddMessage.create(); + await putMessage(db, message); + await putMessage(db, message2); + await putMessage(db, message3_differnt_fid); + await expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(2)); + await expect(cache.getMessageCount(message3_differnt_fid.data.fid, UserPostfix.CastMessage)).resolves.toEqual( + ok(1) + ); + await expect(cache.getMessageCount(Factories.Fid.build(), UserPostfix.CastMessage)).resolves.toEqual(ok(0)); + }); +}); + +describe('getEarliestTsHash', () => { + test('returns undefined if there are no messages', async () => { + await expect(cache.getEarliestTsHash(Factories.Fid.build(), UserPostfix.CastMessage)).resolves.toEqual( + ok(undefined) + ); + }); + + test('returns the earliest tsHash by scanning the db on first use', async () => { + const fid = Factories.Fid.build(); + const first = await Factories.CastAddMessage.create({ data: { fid, timestamp: 123 } }); + const second = await Factories.CastAddMessage.create({ data: { fid, timestamp: 213 } }); + const third = await Factories.CastAddMessage.create({ data: { fid, timestamp: 321 } }); + await putMessage(db, second); + await putMessage(db, first); + + await expect(cache.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual( + makeTsHash(first.data.timestamp, first.hash) + ); + + await putMessage(db, third); + // Unchanged + await expect(cache.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual( + makeTsHash(first.data.timestamp, first.hash) + ); + }); +}); + describe('processEvent', () => { test('increments count with merge cast message event', async () => { const fid = Factories.Fid.build(); const message = await Factories.CastAddMessage.create({ data: { fid } }); const event = HubEvent.create({ type: HubEventType.MERGE_MESSAGE, mergeMessageBody: { message } }); - await cache.syncFromDb(db); - expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).toEqual(ok(0)); + await cache.syncFromDb(); + await expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(0)); cache.processEvent(event); - expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).toEqual(ok(1)); + await expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(1)); }); test('increments count with merge cast remove message event', async () => { @@ -75,10 +127,10 @@ describe('processEvent', () => { const message = await Factories.CastRemoveMessage.create({ data: { fid } }); const event = HubEvent.create({ type: HubEventType.MERGE_MESSAGE, mergeMessageBody: { message } }); - await cache.syncFromDb(db); - expect(cache.getMessageCount(fid, 
UserPostfix.CastMessage)).toEqual(ok(0)); + await cache.syncFromDb(); + await expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(0)); cache.processEvent(event); - expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).toEqual(ok(1)); + await expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(1)); }); test('count is unchanged when removing existing cast', async () => { @@ -93,10 +145,10 @@ describe('processEvent', () => { }); await putMessage(db, cast); - await cache.syncFromDb(db); - expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).toEqual(ok(1)); + await cache.syncFromDb(); + await expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(1)); cache.processEvent(event); - expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).toEqual(ok(1)); + await expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(1)); }); test('count is decremented with prune message event', async () => { @@ -105,10 +157,10 @@ describe('processEvent', () => { const event = HubEvent.create({ type: HubEventType.PRUNE_MESSAGE, pruneMessageBody: { message } }); await putMessage(db, message); - await cache.syncFromDb(db); - expect(cache.getMessageCount(fid, UserPostfix.ReactionMessage)).toEqual(ok(1)); + await cache.syncFromDb(); + await expect(cache.getMessageCount(fid, UserPostfix.ReactionMessage)).resolves.toEqual(ok(1)); cache.processEvent(event); - expect(cache.getMessageCount(fid, UserPostfix.ReactionMessage)).toEqual(ok(0)); + await expect(cache.getMessageCount(fid, UserPostfix.ReactionMessage)).resolves.toEqual(ok(0)); }); test('count is decremented with revoke message event', async () => { @@ -117,18 +169,78 @@ describe('processEvent', () => { const event = HubEvent.create({ type: HubEventType.REVOKE_MESSAGE, revokeMessageBody: { message } }); await putMessage(db, message); - await cache.syncFromDb(db); - expect(cache.getMessageCount(fid, UserPostfix.SignerMessage)).toEqual(ok(1)); + await cache.syncFromDb(); + await expect(cache.getMessageCount(fid, UserPostfix.SignerMessage)).resolves.toEqual(ok(1)); cache.processEvent(event); - expect(cache.getMessageCount(fid, UserPostfix.SignerMessage)).toEqual(ok(0)); + await expect(cache.getMessageCount(fid, UserPostfix.SignerMessage)).resolves.toEqual(ok(0)); }); - test('fails when cache is not synced', async () => { + test('sets earliest tsHash with merge cast message event', async () => { const fid = Factories.Fid.build(); - const message = await Factories.CastAddMessage.create({ data: { fid } }); - const event = HubEvent.create({ type: HubEventType.MERGE_MESSAGE, mergeMessageBody: { message } }); - expect(cache.processEvent(event)).toEqual( - err(new HubError('unavailable.storage_failure', 'storage cache is not synced with db')) + + const middleMessage = await Factories.CastAddMessage.create({ data: { fid } }); + let event = HubEvent.create({ type: HubEventType.MERGE_MESSAGE, mergeMessageBody: { message: middleMessage } }); + + // Earliest tsHash is undefined initially + await expect(cache.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(undefined)); + cache.processEvent(event); + + // Earliest tsHash is set + await expect(cache.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual( + makeTsHash(middleMessage.data.timestamp, middleMessage.hash) + ); + + // Adding a later messages does not change the earliest tsHash + const laterMessage = await Factories.CastAddMessage.create({ + data: { fid, timestamp: 
middleMessage.data.timestamp + 10 }, + }); + event = HubEvent.create({ type: HubEventType.MERGE_MESSAGE, mergeMessageBody: { message: laterMessage } }); + cache.processEvent(event); + await expect(cache.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual( + makeTsHash(middleMessage.data.timestamp, middleMessage.hash) + ); + + // Adding an earlier message changes the earliest tsHash + const earlierMessage = await Factories.CastAddMessage.create({ + data: { fid, timestamp: middleMessage.data.timestamp - 10 }, + }); + event = HubEvent.create({ type: HubEventType.MERGE_MESSAGE, mergeMessageBody: { message: earlierMessage } }); + cache.processEvent(event); + await expect(cache.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual( + makeTsHash(earlierMessage.data.timestamp, earlierMessage.hash) ); }); + + test('unsets the earliest tsHash if the earliest message is removed', async () => { + const fid = Factories.Fid.build(); + const firstMessage = await Factories.ReactionAddMessage.create({ data: { fid } }); + const laterMessage = await Factories.ReactionAddMessage.create({ + data: { fid, timestamp: firstMessage.data.timestamp + 10 }, + }); + const firstEvent = HubEvent.create({ + type: HubEventType.PRUNE_MESSAGE, + pruneMessageBody: { message: firstMessage }, + }); + const laterEvent = HubEvent.create({ + type: HubEventType.PRUNE_MESSAGE, + pruneMessageBody: { message: laterMessage }, + }); + + await putMessage(db, firstMessage); + await putMessage(db, laterMessage); + await cache.syncFromDb(); + await expect(cache.getEarliestTsHash(fid, UserPostfix.ReactionMessage)).resolves.toEqual( + makeTsHash(firstMessage.data.timestamp, firstMessage.hash) + ); + + cache.processEvent(laterEvent); + // Unchanged + await expect(cache.getEarliestTsHash(fid, UserPostfix.ReactionMessage)).resolves.toEqual( + makeTsHash(firstMessage.data.timestamp, firstMessage.hash) + ); + + cache.processEvent(firstEvent); + // Unset + await expect(cache.getEarliestTsHash(fid, UserPostfix.ReactionMessage)).resolves.toEqual(ok(undefined)); + }); }); diff --git a/apps/hubble/src/storage/stores/storageCache.ts b/apps/hubble/src/storage/stores/storageCache.ts index b0afa4d7..9091e228 100644 --- a/apps/hubble/src/storage/stores/storageCache.ts +++ b/apps/hubble/src/storage/stores/storageCache.ts @@ -1,17 +1,24 @@ import { + HubError, HubEvent, + HubResult, isMergeMessageHubEvent, isPruneMessageHubEvent, isRevokeMessageHubEvent, Message, - HubError, - HubResult, } from '@farcaster/hub-nodejs'; -import { ok, err } from 'neverthrow'; +import { err, ok } from 'neverthrow'; import RocksDB from '~/storage/db/rocksdb'; import { FID_BYTES, RootPrefix, UserMessagePostfix, UserMessagePostfixMax } from '~/storage/db/types'; import { logger } from '~/utils/logger'; -import { makeFidKey, typeToSetPostfix } from '~/storage/db/message'; +import { + getMessagesPruneIterator, + makeFidKey, + makeMessagePrimaryKey, + makeTsHash, + typeToSetPostfix, +} from '~/storage/db/message'; +import { bytesCompare, HubAsyncResult } from '@farcaster/core'; const makeKey = (fid: number, set: UserMessagePostfix): string => { return Buffer.concat([makeFidKey(fid), Buffer.from([set])]).toString('hex'); @@ -20,51 +27,87 @@ const makeKey = (fid: number, set: UserMessagePostfix): string => { const log = logger.child({ component: 'StorageCache' }); export class StorageCache { - private _usage: Map<string, number>; - private _synced = false; + private _db: RocksDB; + private _counts: Map<string, number>; + private _earliestTsHashes: Map<string, Uint8Array>; - constructor(usage?: Map<string, number>) { -
this._usage = usage ?? new Map(); + constructor(db: RocksDB, usage?: Map<string, number>) { + this._counts = usage ?? new Map(); + this._earliestTsHashes = new Map(); + this._db = db; } - async syncFromDb(db: RocksDB): Promise<void> { + async syncFromDb(): Promise<void> { log.info('starting storage cache sync'); const usage = new Map<string, number>(); const prefix = Buffer.from([RootPrefix.User]); - const iterator = db.iteratorByPrefix(prefix); + const iterator = this._db.iteratorByPrefix(prefix); for await (const [key] of iterator) { const postfix = (key as Buffer).readUint8(1 + FID_BYTES); if (postfix < UserMessagePostfixMax) { - const usageKey = (key as Buffer).subarray(1, 1 + FID_BYTES + 1).toString('hex'); - const count = usage.get(usageKey) ?? 0; - usage.set(usageKey, count + 1); + const lookupKey = (key as Buffer).subarray(1, 1 + FID_BYTES + 1).toString('hex'); + const count = usage.get(lookupKey) ?? 0; + if (this._earliestTsHashes.get(lookupKey) === undefined) { + const tsHash = Uint8Array.from((key as Buffer).subarray(1 + FID_BYTES + 1)); + this._earliestTsHashes.set(lookupKey, tsHash); + } + usage.set(lookupKey, count + 1); } } - this._usage = usage; - this._synced = true; + this._counts = usage; + this._earliestTsHashes = new Map(); log.info('storage cache synced'); } - getMessageCount(fid: number, set: UserMessagePostfix): HubResult<number> { - if (this._synced !== true) { - const error = new HubError('unavailable.storage_failure', 'storage cache is not synced with db'); - log.warn({ errCode: error.errCode }, `getMessageCount error: ${error.message}`); - return err(error); - } + async getMessageCount(fid: number, set: UserMessagePostfix): HubAsyncResult<number> { const key = makeKey(fid, set); - return ok(this._usage.get(key) ?? 0); + if (this._counts.get(key) === undefined) { + const iterator = getMessagesPruneIterator(this._db, fid, set); + for await (const [,] of iterator) { + const count = this._counts.get(key) ?? 0; + this._counts.set(key, count + 1); + } + } + return ok(this._counts.get(key) ??
0); + } + + async getEarliestTsHash(fid: number, set: UserMessagePostfix): HubAsyncResult<Uint8Array | undefined> { + const key = makeKey(fid, set); + const messageCount = await this.getMessageCount(fid, set); + if (messageCount.isErr()) { + return err(messageCount.error); + } + if (messageCount.value === 0) { + return ok(undefined); + } + const value = this._earliestTsHashes.get(key); + if (value === undefined) { + const prefix = makeMessagePrimaryKey(fid, set); + const iterator = this._db.iteratorByPrefix(prefix, { values: false }); + const [firstKey] = await iterator.next(); + await iterator.end(); + + if (firstKey === undefined) { + return ok(undefined); + } + + if (firstKey && firstKey.length === 0) { + return err(new HubError('unavailable.storage_failure', 'could not read earliest message from db')); + } + + const tsHash = Uint8Array.from(firstKey.subarray(1 + FID_BYTES + 1)); + this._earliestTsHashes.set(key, tsHash); + return ok(tsHash); + } else { + return ok(value); + } } processEvent(event: HubEvent): HubResult<void> { - if (this._synced !== true) { - const error = new HubError('unavailable.storage_failure', 'storage cache is not synced with db'); - log.error({ errCode: error.errCode }, `processEvent error: ${error.message}`); - return err(error); - } if (isMergeMessageHubEvent(event)) { this.addMessage(event.mergeMessageBody.message); for (const message of event.mergeMessageBody.deletedMessages) { @@ -83,8 +126,18 @@ export class StorageCache { const set = typeToSetPostfix(message.data.type); const fid = message.data.fid; const key = makeKey(fid, set); - const count = this._usage.get(key) ?? 0; - this._usage.set(key, count + 1); + const count = this._counts.get(key) ?? 0; + this._counts.set(key, count + 1); + + const tsHashResult = makeTsHash(message.data.timestamp, message.hash); + if (!tsHashResult.isOk()) { + log.error(`error: could not make ts hash for message ${message.hash}`); + return; + } + const currentEarliest = this._earliestTsHashes.get(key); + if (currentEarliest === undefined || bytesCompare(currentEarliest, tsHashResult.value) > 0) { + this._earliestTsHashes.set(key, tsHashResult.value); + } } } @@ -93,11 +146,21 @@ export class StorageCache { const set = typeToSetPostfix(message.data.type); const fid = message.data.fid; const key = makeKey(fid, set); - const count = this._usage.get(key) ?? 0; + const count = this._counts.get(key) ??
0; if (count === 0) { log.error(`error: ${set} store message count is already at 0 for fid ${fid}`); } else { - this._usage.set(key, count - 1); + this._counts.set(key, count - 1); + } + + const tsHashResult = makeTsHash(message.data.timestamp, message.hash); + if (!tsHashResult.isOk()) { + log.error(`error: could not make ts hash for message ${message.hash}`); + return; + } + const currentEarliest = this._earliestTsHashes.get(key); + if (currentEarliest === undefined || bytesCompare(currentEarliest, tsHashResult.value) === 0) { + this._earliestTsHashes.delete(key); } } } diff --git a/apps/hubble/src/storage/stores/storeEventHandler.test.ts b/apps/hubble/src/storage/stores/storeEventHandler.test.ts index 727b364e..9786434d 100644 --- a/apps/hubble/src/storage/stores/storeEventHandler.test.ts +++ b/apps/hubble/src/storage/stores/storeEventHandler.test.ts @@ -1,15 +1,17 @@ -import { CastAddMessage, HubEvent, HubEventType, Factories } from '@farcaster/hub-nodejs'; +import { CastAddMessage, Factories, HubEvent, HubEventType } from '@farcaster/hub-nodejs'; import { ok, Result } from 'neverthrow'; import { jestRocksDB } from '~/storage/db/jestUtils'; -import { getMessage, makeTsHash, putMessageTransaction } from '~/storage/db/message'; +import { getMessage, makeTsHash, putMessage, putMessageTransaction } from '~/storage/db/message'; import { UserPostfix } from '~/storage/db/types'; import StoreEventHandler, { HubEventArgs, HubEventIdGenerator } from '~/storage/stores/storeEventHandler'; import { sleep } from '~/utils/crypto'; +import { getFarcasterTime } from '@farcaster/core'; const db = jestRocksDB('stores.storeEventHandler.test'); const eventHandler = new StoreEventHandler(db); let events: HubEvent[] = []; +let currentTime = 0; const eventListener = (event: HubEvent) => { events.push(event); }; beforeAll(() => { @@ -21,6 +23,7 beforeEach(() => { events = []; + currentTime = getFarcasterTime()._unsafeUnwrap(); }); afterAll(() => { @@ -73,6 +76,50 @@ describe('commitTransaction', () => { }); }); +describe('isPrunable', () => { + test('returns true if message is earlier than prune time limit', async () => { + message = await Factories.CastAddMessage.create({ data: { timestamp: currentTime - 101 } }); + await expect(eventHandler.isPrunable(message, UserPostfix.CastMessage, 10, 100)).resolves.toEqual(ok(true)); + }); + test('returns false if there is no prune time limit', async () => { + message = await Factories.CastAddMessage.create({ data: { timestamp: currentTime - 101 } }); + await expect(eventHandler.isPrunable(message, UserPostfix.CastMessage, 10)).resolves.toEqual(ok(false)); + }); + test('returns false if message is later than prune time limit', async () => { + message = await Factories.CastAddMessage.create({ data: { timestamp: currentTime - 50 } }); + await expect(eventHandler.isPrunable(message, UserPostfix.CastMessage, 10, 100)).resolves.toEqual(ok(false)); + }); + test('returns false if under size limit', async () => { + message = await Factories.CastAddMessage.create({ data: { timestamp: currentTime - 50 } }); + await putMessage(db, message); + await expect(eventHandler.getCacheMessageCount(message.data.fid, UserPostfix.CastMessage)).resolves.toEqual(ok(1)); + await expect(eventHandler.isPrunable(message, UserPostfix.CastMessage, 1, 100)).resolves.toEqual(ok(false)); + }); + test('returns false if over size limit and message is later than earliest message', async () => { + message = await Factories.CastAddMessage.create({ data: { timestamp: currentTime - 50 } }); + await
putMessage(db, message); + await expect(eventHandler.getCacheMessageCount(message.data.fid, UserPostfix.CastMessage)).resolves.toEqual(ok(1)); + + const laterMessage = await Factories.CastAddMessage.create({ + data: { fid: message.data.fid, timestamp: currentTime + 50 }, + }); + await expect(eventHandler.isPrunable(laterMessage, UserPostfix.CastMessage, 1, 100)).resolves.toEqual(ok(false)); + }); + test('returns true if over size limit and message is earlier than earliest message', async () => { + message = await Factories.CastAddMessage.create({ data: { timestamp: currentTime - 50 } }); + await putMessage(db, message); + await expect(eventHandler.getCacheMessageCount(message.data.fid, UserPostfix.CastMessage)).resolves.toEqual(ok(1)); + await expect(eventHandler.getEarliestTsHash(message.data.fid, UserPostfix.CastMessage)).resolves.toEqual( + makeTsHash(message.data.timestamp, message.hash) + ); + + const earlierMessage = await Factories.CastAddMessage.create({ + data: { fid: message.data.fid, timestamp: currentTime - 75 }, + }); + await expect(eventHandler.isPrunable(earlierMessage, UserPostfix.CastMessage, 1, 100)).resolves.toEqual(ok(true)); + }); +}); + describe('pruneEvents', () => { test('deletes events based on time limit', async () => { const message1 = await Factories.Message.create(); diff --git a/apps/hubble/src/storage/stores/storeEventHandler.ts b/apps/hubble/src/storage/stores/storeEventHandler.ts index f755958e..b1ce995e 100644 --- a/apps/hubble/src/storage/stores/storeEventHandler.ts +++ b/apps/hubble/src/storage/stores/storeEventHandler.ts @@ -23,11 +23,36 @@ import { TypedEmitter } from 'tiny-typed-emitter'; import RocksDB, { Iterator, Transaction } from '~/storage/db/rocksdb'; import { RootPrefix, UserMessagePostfix } from '~/storage/db/types'; import { StorageCache } from '~/storage/stores/storageCache'; +import { makeTsHash } from '~/storage/db/message'; +import { + bytesCompare, + CastAddMessage, + CastRemoveMessage, + getFarcasterTime, + ReactionAddMessage, + ReactionRemoveMessage, + SignerAddMessage, + SignerRemoveMessage, + UserDataAddMessage, + VerificationAddEthAddressMessage, + VerificationRemoveMessage, +} from '@farcaster/core'; const PRUNE_TIME_LIMIT_DEFAULT = 60 * 60 * 24 * 3 * 1000; // 3 days in ms const DEFAULT_LOCK_MAX_PENDING = 1_000; const DEFAULT_LOCK_TIMEOUT = 500; // in ms +type PrunableMessage = + | CastAddMessage + | CastRemoveMessage + | ReactionAddMessage + | ReactionRemoveMessage + | SignerAddMessage + | SignerRemoveMessage + | UserDataAddMessage + | VerificationAddEthAddressMessage + | VerificationRemoveMessage; + export type StoreEvents = { /** * mergeMessage is emitted when a message is merged into one of the stores. If @@ -149,15 +174,19 @@ class StoreEventHandler extends TypedEmitter<StoreEvents> { timeout: options.lockTimeout ??
DEFAULT_LOCK_TIMEOUT, }); - this._storageCache = new StorageCache(); + this._storageCache = new StorageCache(this._db); } - getCacheMessageCount(fid: number, set: UserMessagePostfix): HubResult<number> { + async getCacheMessageCount(fid: number, set: UserMessagePostfix): HubAsyncResult<number> { return this._storageCache.getMessageCount(fid, set); } + async getEarliestTsHash(fid: number, set: UserMessagePostfix): HubAsyncResult<Uint8Array | undefined> { + return this._storageCache.getEarliestTsHash(fid, set); + } + async syncCache(): HubAsyncResult<void> { - return ResultAsync.fromPromise(this._storageCache.syncFromDb(this._db), (e) => e as HubError); + return ResultAsync.fromPromise(this._storageCache.syncFromDb(), (e) => e as HubError); } async getEvent(id: number): HubAsyncResult<HubEvent> { @@ -188,6 +217,52 @@ class StoreEventHandler extends TypedEmitter<StoreEvents> { return ok(events); } + public async isPrunable( + message: PrunableMessage, + set: UserMessagePostfix, + sizeLimit: number, + timeLimit: number | undefined = undefined + ): HubAsyncResult<boolean> { + const farcasterTime = getFarcasterTime(); + if (farcasterTime.isErr()) { + return err(farcasterTime.error); + } + + // Calculate the timestamp cut-off to prune if set supports time based expiry + if (timeLimit !== undefined) { + const timestampToPrune = farcasterTime.value - timeLimit; + + if (message.data.timestamp < timestampToPrune) { + return ok(true); + } + } + + const messageCount = await this.getCacheMessageCount(message.data.fid, set); + if (messageCount.isErr()) { + return err(messageCount.error); + } + if (messageCount.value < sizeLimit) { + return ok(false); + } + + const earliestTimestamp = await this.getEarliestTsHash(message.data.fid, set); + if (earliestTimestamp.isErr()) { + return err(earliestTimestamp.error); + } + const tsHash = makeTsHash(message.data.timestamp, message.hash); + if (tsHash.isErr()) { + return err(tsHash.error); + } + if (earliestTimestamp.value === undefined) { + return ok(false); + } + if (bytesCompare(tsHash.value, earliestTimestamp.value) < 0) { + return ok(true); + } else { + return ok(false); + } + } + async commitTransaction(txn: Transaction, eventArgs: HubEventArgs): HubAsyncResult<number> { return this._lock .acquire('commit', async () => { diff --git a/apps/hubble/src/storage/stores/userDataStore.test.ts b/apps/hubble/src/storage/stores/userDataStore.test.ts index 926795a0..075da7bb 100644 --- a/apps/hubble/src/storage/stores/userDataStore.test.ts +++ b/apps/hubble/src/storage/stores/userDataStore.test.ts @@ -275,6 +275,7 @@ describe('pruneMessages', () => { let add2: UserDataAddMessage; let add3: UserDataAddMessage; let add4: UserDataAddMessage; + let addOld1: UserDataAddMessage; const generateAddWithTimestamp = async ( fid: number, @@ -290,6 +291,7 @@ describe('pruneMessages', () => { add2 = await generateAddWithTimestamp(fid, time + 2, UserDataType.DISPLAY); add3 = await generateAddWithTimestamp(fid, time + 3, UserDataType.BIO); add4 = await generateAddWithTimestamp(fid, time + 5, UserDataType.URL); + addOld1 = await generateAddWithTimestamp(fid, time - 60 * 60, UserDataType.URL); }); beforeEach(async () => { @@ -321,5 +323,17 @@ describe('pruneMessages', () => { await expect(getAdd()).rejects.toThrow(HubError); } }); + + test('fails to add messages older than the earliest message', async () => { + const messages = [add1, add2, add3]; + for (const message of messages) { + await sizePrunedStore.merge(message); + } + + // Older messages are rejected + await expect(sizePrunedStore.merge(addOld1)).rejects.toEqual( + new HubError('bad_request.prunable', 'message
would be pruned') + ); + }); }); }); diff --git a/apps/hubble/src/storage/stores/userDataStore.ts b/apps/hubble/src/storage/stores/userDataStore.ts index 87c5ce0d..8a3ef559 100644 --- a/apps/hubble/src/storage/stores/userDataStore.ts +++ b/apps/hubble/src/storage/stores/userDataStore.ts @@ -138,6 +138,16 @@ class UserDataStore { .acquire( message.data.fid.toString(), async () => { + const prunableResult = await this._eventHandler.isPrunable( + message, + UserPostfix.UserDataMessage, + this._pruneSizeLimit + ); + if (prunableResult.isErr()) { + throw prunableResult.error; + } else if (prunableResult.value) { + throw new HubError('bad_request.prunable', 'message would be pruned'); + } return this.mergeDataAdd(message); }, { timeout: MERGE_TIMEOUT_DEFAULT } @@ -164,7 +174,7 @@ class UserDataStore { async pruneMessages(fid: number): HubAsyncResult<number[] | undefined> { const commits: number[] = []; - const cachedCount = this._eventHandler.getCacheMessageCount(fid, UserPostfix.UserDataMessage); + const cachedCount = await this._eventHandler.getCacheMessageCount(fid, UserPostfix.UserDataMessage); // Require storage cache to be synced to prune if (cachedCount.isErr()) { @@ -185,7 +195,7 @@ class UserDataStore { return ok(undefined); // Nothing left to prune } - const count = this._eventHandler.getCacheMessageCount(fid, UserPostfix.UserDataMessage); + const count = await this._eventHandler.getCacheMessageCount(fid, UserPostfix.UserDataMessage); if (count.isErr()) { return err(count.error); } diff --git a/apps/hubble/src/storage/stores/verificationStore.test.ts b/apps/hubble/src/storage/stores/verificationStore.test.ts index dc0f4d9d..3168a198 100644 --- a/apps/hubble/src/storage/stores/verificationStore.test.ts +++ b/apps/hubble/src/storage/stores/verificationStore.test.ts @@ -493,6 +493,7 @@ describe('pruneMessages', () => { let add3: VerificationAddEthAddressMessage; let add4: VerificationAddEthAddressMessage; let add5: VerificationAddEthAddressMessage; + let addOld1: VerificationAddEthAddressMessage; let remove1: VerificationRemoveMessage; let remove2: VerificationRemoveMessage; @@ -524,6 +525,7 @@ describe('pruneMessages', () => { add3 = await generateAddWithTimestamp(fid, time + 3); add4 = await generateAddWithTimestamp(fid, time + 4); add5 = await generateAddWithTimestamp(fid, time + 5); + addOld1 = await generateAddWithTimestamp(fid, time - 60 * 60); remove1 = await generateRemoveWithTimestamp(fid, time + 1, add1.data.verificationAddEthAddressBody.address); remove2 = await generateRemoveWithTimestamp(fid, time + 2, add2.data.verificationAddEthAddressBody.address); @@ -603,5 +605,17 @@ describe('pruneMessages', () => { expect(prunedMessages).toEqual([]); }); + + test('fails to add messages older than the earliest message', async () => { + const messages = [add1, add2, add3]; + for (const message of messages) { + await sizePrunedStore.merge(message); + } + + // Older messages are rejected + await expect(sizePrunedStore.merge(addOld1)).rejects.toEqual( + new HubError('bad_request.prunable', 'message would be pruned') + ); + }); }); }); diff --git a/apps/hubble/src/storage/stores/verificationStore.ts b/apps/hubble/src/storage/stores/verificationStore.ts index 9fa58a5e..bbcc161e 100644 --- a/apps/hubble/src/storage/stores/verificationStore.ts +++ b/apps/hubble/src/storage/stores/verificationStore.ts @@ -170,6 +170,16 @@ class VerificationStore { .acquire( message.data.fid.toString(), async () => { + const prunableResult = await this._eventHandler.isPrunable( + message, + UserPostfix.VerificationMessage, +
this._pruneSizeLimit + ); + if (prunableResult.isErr()) { + throw prunableResult.error; + } else if (prunableResult.value) { + throw new HubError('bad_request.prunable', 'message would be pruned'); + } if (isVerificationAddEthAddressMessage(message)) { return this.mergeAdd(message); } else if (isVerificationRemoveMessage(message)) { @@ -204,7 +214,7 @@ class VerificationStore { async pruneMessages(fid: number): HubAsyncResult<number[] | undefined> { const commits: number[] = []; - const cachedCount = this._eventHandler.getCacheMessageCount(fid, UserPostfix.VerificationMessage); + const cachedCount = await this._eventHandler.getCacheMessageCount(fid, UserPostfix.VerificationMessage); // Require storage cache to be synced to prune if (cachedCount.isErr()) { @@ -225,7 +235,7 @@ class VerificationStore { return ok(undefined); // Nothing left to prune } - const count = this._eventHandler.getCacheMessageCount(fid, UserPostfix.VerificationMessage); + const count = await this._eventHandler.getCacheMessageCount(fid, UserPostfix.VerificationMessage); if (count.isErr()) { return err(count.error); } diff --git a/packages/core/src/errors.ts b/packages/core/src/errors.ts index 97237453..c805eb47 100644 --- a/packages/core/src/errors.ts +++ b/packages/core/src/errors.ts @@ -66,6 +66,7 @@ export type HubErrorCode = | 'bad_request.validation_failure' | 'bad_request.duplicate' | 'bad_request.conflict' + | 'bad_request.prunable' /* The requested resource could not be found */ | 'not_found' /* The request could not be completed because the operation is not executable */
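Review note (editorial addition, not part of the applied diff): the heart of this change is the new isPrunable check in storeEventHandler.ts, which every store's merge() now runs before accepting a message. The rule has three steps: reject outright if the message falls outside the set's time-based prune window; accept if the set is under its size limit; otherwise reject only when the message's tsHash sorts before the earliest tsHash already in the set, since pruning always evicts from the front. The TypeScript sketch below restates that rule in isolation so the boundary conditions are easy to check; PruneCheckInput and compareBytes are local stand-ins for the real makeTsHash/bytesCompare plumbing, not the shipped API.

// Distilled restatement of the prunability rule added in this PR (illustrative only).
// Assumes tsHashes are byte arrays that order messages by (timestamp, hash), as
// makeTsHash/bytesCompare do in the real code.
type PruneCheckInput = {
  timestamp: number; // message timestamp, in Farcaster seconds
  tsHash: Uint8Array; // timestamp-prefixed message hash
  currentTime: number; // getFarcasterTime() at merge time
  messageCount: number; // cached message count for this fid + set
  earliestTsHash?: Uint8Array; // cached earliest tsHash in the set, if any
  sizeLimit: number;
  timeLimit?: number; // only sets with time-based expiry pass this
};

const compareBytes = (a: Uint8Array, b: Uint8Array): number => {
  const len = Math.min(a.length, b.length);
  for (let i = 0; i < len; i++) {
    const diff = (a[i] ?? 0) - (b[i] ?? 0);
    if (diff !== 0) {
      return diff < 0 ? -1 : 1;
    }
  }
  return a.length - b.length;
};

const wouldBePruned = (input: PruneCheckInput): boolean => {
  // 1. Time-based expiry: anything older than the prune window is rejected outright.
  if (input.timeLimit !== undefined && input.timestamp < input.currentTime - input.timeLimit) {
    return true;
  }
  // 2. Size-based: while the set is under capacity, everything is accepted.
  if (input.messageCount < input.sizeLimit) {
    return false;
  }
  // 3. At capacity: reject only messages that would sort before the current
  //    earliest entry, because pruning evicts from the front of the set.
  if (input.earliestTsHash === undefined) {
    return false;
  }
  return compareBytes(input.tsHash, input.earliestTsHash) < 0;
};

One consequence worth noting: a message whose tsHash ties the current earliest is still accepted, because only strictly smaller tsHashes are rejected, mirroring the bytesCompare(tsHash.value, earliestTimestamp.value) < 0 comparison in isPrunable.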