feat: reject prunable messages on merge (#928)

* feat: don't merge messages that would immediately be pruned

* Fix tests and minor cleanup/review comments

* Support all other stores

* Add changeset

* use prune iterator with keys available and expand cast store tests

* re-add prune iterator args

* Additional tests

* Add tests for other stores

---------

Co-authored-by: Sanjay Raveendran <sanjayprabhu@gmail.com>
This commit is contained in:
Paul Fletcher-Hill
2023-05-04 13:35:32 -04:00
committed by GitHub
parent d2cb5e4e30
commit 2e633db09b
21 changed files with 681 additions and 141 deletions

View File

@@ -0,0 +1,6 @@
---
'@farcaster/core': patch
'@farcaster/hubble': patch
---
Reject prunable messages on merge

View File

@@ -22,6 +22,7 @@ import { sleep, sleepWhile } from '~/utils/crypto';
import { EthEventsProvider } from '~/eth/ethEventsProvider';
import { Contract } from 'ethers';
import { IdRegistry, NameRegistry } from '~/eth/abis';
import { getFarcasterTime } from '@farcaster/core';
/* eslint-disable security/detect-non-literal-fs-filename */
@@ -51,11 +52,12 @@ beforeAll(async () => {
});
describe('Multi peer sync engine', () => {
const addMessagesWithTimestamps = async (engine: Engine, timestamps: number[]) => {
const addMessagesWithTimeDelta = async (engine: Engine, timeDelta: number[]) => {
return await Promise.all(
timestamps.map(async (t) => {
timeDelta.map(async (t) => {
const farcasterTime = getFarcasterTime()._unsafeUnwrap();
const cast = await Factories.CastAddMessage.create(
{ data: { fid, network, timestamp: t } },
{ data: { fid, network, timestamp: farcasterTime + t } },
{ transient: { signer } }
);
@@ -156,7 +158,7 @@ describe('Multi peer sync engine', () => {
await engine1.mergeMessage(signerAdd);
// Add messages to engine 1
await addMessagesWithTimestamps(engine1, [30662167, 30662169, 30662172]);
await addMessagesWithTimeDelta(engine1, [167, 169, 172]);
await sleepWhile(() => syncEngine1.syncTrieQSize > 0, 1000);
const engine2 = new Engine(testDb2, network);
@@ -185,7 +187,7 @@ describe('Multi peer sync engine', () => {
).toBeFalsy();
// Add more messages
await addMessagesWithTimestamps(engine1, [30663167, 30663169, 30663172]);
await addMessagesWithTimeDelta(engine1, [367, 369, 372]);
await sleepWhile(() => syncEngine1.syncTrieQSize > 0, 1000);
// grab a new snapshot from the RPC for engine1
@@ -220,7 +222,7 @@ describe('Multi peer sync engine', () => {
await engine1.mergeMessage(signerAdd);
// Add a cast to engine1
const castAdd = (await addMessagesWithTimestamps(engine1, [30662167]))[0] as Message;
const castAdd = (await addMessagesWithTimeDelta(engine1, [167]))[0] as Message;
await sleepWhile(() => syncEngine1.syncTrieQSize > 0, 1000);
const engine2 = new Engine(testDb2, network);
@@ -308,7 +310,7 @@ describe('Multi peer sync engine', () => {
await engine1.mergeMessage(signerAdd);
// Add a cast to engine1
await addMessagesWithTimestamps(engine1, [30662167]);
await addMessagesWithTimeDelta(engine1, [167]);
// Do not merge the custory event into engine2
const engine2 = new Engine(testDb2, network);
@@ -408,8 +410,8 @@ describe('Multi peer sync engine', () => {
expect(await syncEngine1.trie.rootHash()).toEqual(await syncEngine2.trie.rootHash());
// Add two different messages to engine1 and engine2
await addMessagesWithTimestamps(engine1, [30662167]);
await addMessagesWithTimestamps(engine2, [30662169]);
await addMessagesWithTimeDelta(engine1, [167]);
await addMessagesWithTimeDelta(engine2, [169]);
await sleepWhile(async () => (await syncEngine1.trie.items()) !== 2, 1000);
await sleepWhile(async () => (await syncEngine2.trie.items()) !== 2, 1000);
@@ -456,8 +458,8 @@ describe('Multi peer sync engine', () => {
expect(await syncEngine1.trie.rootHash()).toEqual(await syncEngine2.trie.rootHash());
// Add two different messages to engine1 and engine2
await addMessagesWithTimestamps(engine1, [30662167, 30662168]);
await addMessagesWithTimestamps(engine2, [30662169]);
await addMessagesWithTimeDelta(engine1, [167, 168]);
await addMessagesWithTimeDelta(engine2, [169]);
await sleepWhile(() => syncEngine1.syncTrieQSize > 0, 1000);
await sleepWhile(() => syncEngine2.syncTrieQSize > 0, 1000);
@@ -534,7 +536,7 @@ describe('Multi peer sync engine', () => {
timestamps.push(msgTimestamp + j);
}
// console.log('adding batch', i, ' of ', numBatches);
const addedMessages = await addMessagesWithTimestamps(engine1, timestamps);
const addedMessages = await addMessagesWithTimeDelta(engine1, timestamps);
await sleepWhile(() => syncEngine1.syncTrieQSize > 0, 1000);
castMessagesToRemove = addedMessages.slice(0, 10);

View File

@@ -65,11 +65,12 @@ describe('SyncEngine', () => {
await engine.stop();
});
const addMessagesWithTimestamps = async (timestamps: number[]) => {
const addMessagesWithTimestamps = async (timeDelta: number[]) => {
const results = await Promise.all(
timestamps.map(async (t) => {
timeDelta.map(async (t) => {
const farcasterTime = getFarcasterTime()._unsafeUnwrap();
const cast = await Factories.CastAddMessage.create(
{ data: { fid, network, timestamp: t } },
{ data: { fid, network, timestamp: farcasterTime + t } },
{ transient: { signer } }
);
@@ -168,19 +169,21 @@ describe('SyncEngine', () => {
const rsigneradd = await engine.mergeMessage(signerAdd);
expect(rsigneradd.isOk()).toBeTruthy();
const currentTime = getFarcasterTime()._unsafeUnwrap();
// Reaction
const reactionBody = {
targetCastId: { fid, hash: castAdd.hash },
type: ReactionType.LIKE,
};
const reaction1 = await Factories.ReactionAddMessage.create(
{ data: { fid, network, timestamp: 30662167, reactionBody } },
{ data: { fid, network, timestamp: currentTime + 10, reactionBody } },
{ transient: { signer } }
);
// Same reaction, but with different timestamp
const reaction2 = await Factories.ReactionAddMessage.create(
{ data: { fid, network, timestamp: 30662168, reactionBody } },
{ data: { fid, network, timestamp: currentTime + 15, reactionBody } },
{ transient: { signer } }
);
@@ -280,7 +283,7 @@ describe('SyncEngine', () => {
await engine.mergeIdRegistryEvent(custodyEvent);
await engine.mergeMessage(signerAdd);
await addMessagesWithTimestamps([30662167, 30662169, 30662172]);
await addMessagesWithTimestamps([167, 169, 172]);
expect(
(await syncEngine.syncStatus('test', (await syncEngine.getSnapshot())._unsafeUnwrap()))._unsafeUnwrap().shouldSync
).toBeFalsy();
@@ -290,9 +293,9 @@ describe('SyncEngine', () => {
await engine.mergeIdRegistryEvent(custodyEvent);
await engine.mergeMessage(signerAdd);
await addMessagesWithTimestamps([30662167, 30662169, 30662172]);
await addMessagesWithTimestamps([167, 169, 172]);
const oldSnapshot = (await syncEngine.getSnapshot())._unsafeUnwrap();
await addMessagesWithTimestamps([30662372]);
await addMessagesWithTimestamps([372]);
expect(oldSnapshot.excludedHashes).not.toEqual((await syncEngine.getSnapshot())._unsafeUnwrap().excludedHashes);
expect((await syncEngine.syncStatus('test', oldSnapshot))._unsafeUnwrap().shouldSync).toBeTruthy();
});
@@ -327,7 +330,7 @@ describe('SyncEngine', () => {
await engine.mergeNameRegistryEvent(Factories.NameRegistryEvent.build());
await engine.mergeNameRegistryEvent(Factories.NameRegistryEvent.build());
await engine.mergeMessage(signerAdd);
await addMessagesWithTimestamps([30662167, 30662169]);
await addMessagesWithTimestamps([167, 169]);
const stats = await syncEngine.getSyncStats();
expect(stats.numFids).toEqual(1);
@@ -339,7 +342,7 @@ describe('SyncEngine', () => {
await engine.mergeIdRegistryEvent(custodyEvent);
await engine.mergeMessage(signerAdd);
const messages = await addMessagesWithTimestamps([30662167, 30662169, 30662172]);
const messages = await addMessagesWithTimestamps([167, 169, 172]);
expect(await syncEngine.trie.items()).toEqual(4); // signerAdd + 3 messages
@@ -360,7 +363,7 @@ describe('SyncEngine', () => {
await engine.mergeIdRegistryEvent(custodyEvent);
await engine.mergeMessage(signerAdd);
const messages = await addMessagesWithTimestamps([30662167, 30662169, 30662172]);
const messages = await addMessagesWithTimestamps([167, 169, 172]);
expect(await syncEngine.trie.items()).toEqual(4); // signerAdd + 3 messages
@@ -381,13 +384,14 @@ describe('SyncEngine', () => {
await engine.mergeIdRegistryEvent(custodyEvent);
await engine.mergeMessage(signerAdd);
await addMessagesWithTimestamps([30662160, 30662169, 30662172]);
const nowOrig = Date.now;
Date.now = () => 1609459200000 + 30662167 * 1000;
Date.now = () => 1683074200000;
try {
await addMessagesWithTimestamps([160, 169, 172]);
Date.now = () => 1683074200000 + 167 * 1000;
const result = await syncEngine.getSnapshot();
const snapshot = result._unsafeUnwrap();
expect((snapshot.prefix as Buffer).toString('utf8')).toEqual('0030662160');
expect((snapshot.prefix as Buffer).toString('utf8')).toEqual('0073615160');
} finally {
Date.now = nowOrig;
}

View File

@@ -13,6 +13,7 @@ import { jestRocksDB } from '~/storage/db/jestUtils';
import { MockHub } from '~/test/mocks';
import { MockRpcClient } from './mock';
import { EMPTY_HASH } from './trieNode';
import { getFarcasterTime } from '@farcaster/core';
const testDb = jestRocksDB(`engine.syncEnginePerf.test`);
const testDb2 = jestRocksDB(`engine2.syncEnginePerf.test`);
@@ -36,11 +37,15 @@ beforeAll(async () => {
);
});
describe('SyncEngine', () => {
const makeMessagesWithTimestamps = async (timestamps: number[]): Promise<CastAddMessage[]> => {
describe('SyncEnginePerfTest', () => {
const makeMessagesWithTimeDelta = async (timeDeltas: number[]): Promise<CastAddMessage[]> => {
return await Promise.all(
timestamps.map(async (t) => {
return Factories.CastAddMessage.create({ data: { fid, network, timestamp: t } }, { transient: { signer } });
timeDeltas.map(async (t) => {
const farcasterTime = getFarcasterTime()._unsafeUnwrap();
return Factories.CastAddMessage.create(
{ data: { fid, network, timestamp: farcasterTime + t } },
{ transient: { signer } }
);
})
);
};
@@ -61,21 +66,24 @@ describe('SyncEngine', () => {
await hub2.submitIdRegistryEvent(custodyEvent);
await hub2.submitMessage(signerAdd);
Date.now = () => 1683074200000;
// Merge the same messages into both engines.
const messages = await makeMessagesWithTimestamps([30662167, 30662169, 30662172]);
const messages = await makeMessagesWithTimeDelta([167, 169, 172]);
for (const message of messages) {
await hub1.submitMessage(message);
await hub2.submitMessage(message);
let res = await hub1.submitMessage(message);
expect(res.isOk()).toBeTruthy();
res = await hub2.submitMessage(message);
expect(res.isOk()).toBeTruthy();
}
// Sanity check, they should equal
expect(await syncEngine1.trie.rootHash()).toEqual(await syncEngine2.trie.rootHash());
// A timestamp after all the messages
Date.now = () => 1609459200000 + 30662200 * 1000;
Date.now = () => 1683074200000 + 200 * 1000;
const snapshot2 = (await syncEngine2.getSnapshot())._unsafeUnwrap();
expect((snapshot2.prefix as Buffer).toString('utf8')).toEqual('0030662');
expect((snapshot2.prefix as Buffer).toString('utf8')).toEqual('0073615');
// Force a non-existent prefix (the original bug #536 is fixed)
snapshot2.prefix = Buffer.from('00306622', 'hex');

View File

@@ -18,6 +18,7 @@ import Server from '~/rpc/server';
import { jestRocksDB } from '~/storage/db/jestUtils';
import Engine from '~/storage/engine';
import { MockHub } from '~/test/mocks';
import { getFarcasterTime } from '@farcaster/core';
const db = jestRocksDB('protobufs.rpc.castService.test');
const network = FarcasterNetwork.TESTNET;
@@ -108,10 +109,11 @@ describe('getCast', () => {
test('returns casts in chronological order', async () => {
const castsAsJson = [];
let latestCast;
const currentTime = getFarcasterTime()._unsafeUnwrap();
for (let i = 0; i < 4; i++) {
latestCast = await Factories.CastAddMessage.create(
{
data: { fid, network, timestamp: i },
data: { fid, network, timestamp: currentTime + i },
},
{ transient: { signer } }
);

View File

@@ -3,6 +3,7 @@ import { jestRocksDB } from '~/storage/db/jestUtils';
import Engine from '~/storage/engine';
import { seedSigner } from '~/storage/engine/seed';
import { PruneMessagesJobScheduler } from '~/storage/jobs/pruneMessagesJob';
import { FARCASTER_EPOCH, getFarcasterTime } from '@farcaster/core';
const db = jestRocksDB('jobs.pruneMessagesJob.test');
@@ -48,21 +49,21 @@ describe('doJobs', () => {
test(
'prunes messages for all fids',
async () => {
const timestampToPrune = 1; // 1 second after farcaster epoch (1/1/22)
const currentTime = getFarcasterTime()._unsafeUnwrap();
const fid1 = Factories.Fid.build();
const signer1 = Factories.Ed25519Signer.build();
const signer1Key = (await signer1.getSignerKey())._unsafeUnwrap();
await seedSigner(engine, fid1, signer1Key);
await seedMessagesFromTimestamp(engine, fid1, signer1, timestampToPrune);
await seedMessagesFromTimestamp(engine, fid1, signer1, currentTime);
const fid2 = Factories.Fid.build();
const signer2 = Factories.Ed25519Signer.build();
const signer2Key = (await signer2.getSignerKey())._unsafeUnwrap();
await seedSigner(engine, fid2, signer2Key);
await seedMessagesFromTimestamp(engine, fid2, signer2, timestampToPrune);
await seedMessagesFromTimestamp(engine, fid2, signer2, currentTime);
for (const fid of [fid1, fid2]) {
const casts = await engine.getCastsByFid(fid);
@@ -72,8 +73,14 @@ describe('doJobs', () => {
expect(reactions._unsafeUnwrap().messages.length).toEqual(1);
}
const result = await scheduler.doJobs();
expect(result._unsafeUnwrap()).toEqual(undefined);
const nowOrig = Date.now;
Date.now = () => FARCASTER_EPOCH + (currentTime + 60 * 60 * 24 * 365 + 1) * 1000; // advance 1 year and 1 second
try {
const result = await scheduler.doJobs();
expect(result._unsafeUnwrap()).toEqual(undefined);
} finally {
Date.now = nowOrig;
}
for (const fid of [fid1, fid2]) {
const casts = await engine.getCastsByFid(fid);

View File

@@ -18,8 +18,9 @@ import { UserPostfix } from '~/storage/db/types';
import CastStore from '~/storage/stores/castStore';
import StoreEventHandler from '~/storage/stores/storeEventHandler';
import { sleep } from '~/utils/crypto';
import { err } from 'neverthrow';
import { err, ok } from 'neverthrow';
import { faker } from '@faker-js/faker';
import { FARCASTER_EPOCH } from '@farcaster/core';
const db = jestRocksDB('protobufs.castStore.test');
const eventHandler = new StoreEventHandler(db);
@@ -662,14 +663,12 @@ describe('pruneMessages', () => {
let add4: CastAddMessage;
let add5: CastAddMessage;
let addOld1: CastAddMessage;
let addOld2: CastAddMessage;
let remove1: CastRemoveMessage;
let remove2: CastRemoveMessage;
let remove3: CastRemoveMessage;
let remove4: CastRemoveMessage;
let remove5: CastRemoveMessage;
let removeOld3: CastRemoveMessage;
const generateAddWithTimestamp = async (fid: number, timestamp: number): Promise<CastAddMessage> => {
return Factories.CastAddMessage.create({
@@ -695,14 +694,12 @@ describe('pruneMessages', () => {
add4 = await generateAddWithTimestamp(fid, time + 4);
add5 = await generateAddWithTimestamp(fid, time + 5);
addOld1 = await generateAddWithTimestamp(fid, time - 60 * 60);
addOld2 = await generateAddWithTimestamp(fid, time - 60 * 60 + 1);
remove1 = await generateRemoveWithTimestamp(fid, time + 1, add1);
remove2 = await generateRemoveWithTimestamp(fid, time + 2, add2);
remove3 = await generateRemoveWithTimestamp(fid, time + 3, add3);
remove4 = await generateRemoveWithTimestamp(fid, time + 4, add4);
remove5 = await generateRemoveWithTimestamp(fid, time + 5, add5);
removeOld3 = await generateRemoveWithTimestamp(fid, time - 60 * 60 + 2);
});
beforeEach(async () => {
@@ -775,27 +772,99 @@ describe('pruneMessages', () => {
expect(prunedMessages).toEqual([]);
});
test('fails to merge message which would be immediately pruned', async () => {
await expect(eventHandler.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(undefined));
await expect(sizePrunedStore.merge(add3)).resolves.toBeGreaterThan(0);
await expect(eventHandler.getCacheMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(1));
await expect(eventHandler.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual(
makeTsHash(add3.data.timestamp, add3.hash)
);
await expect(sizePrunedStore.merge(add2)).resolves.toBeGreaterThan(0);
await expect(eventHandler.getCacheMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(2));
await expect(eventHandler.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual(
makeTsHash(add2.data.timestamp, add2.hash)
);
await expect(sizePrunedStore.merge(remove2)).resolves.toBeGreaterThan(0);
await expect(eventHandler.getCacheMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(2));
await expect(eventHandler.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual(
makeTsHash(remove2.data.timestamp, remove2.hash)
);
await expect(sizePrunedStore.merge(add4)).resolves.toBeGreaterThan(0);
await expect(eventHandler.getCacheMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(3));
await expect(eventHandler.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual(
makeTsHash(remove2.data.timestamp, remove2.hash)
);
// remove1 is older than remove2 and the store is at capacity so it's rejected
await expect(sizePrunedStore.merge(remove1)).rejects.toEqual(
new HubError('bad_request.prunable', 'message would be pruned')
);
// add1 is older than remove2 and the store is at capacity so it's rejected
await expect(sizePrunedStore.merge(add1)).rejects.toEqual(
new HubError('bad_request.prunable', 'message would be pruned')
);
// merging add5 succeeds because while the store is at capacity, add5 would not be pruned
await expect(sizePrunedStore.merge(add5)).resolves.toBeGreaterThan(0);
await expect(eventHandler.getCacheMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(4));
await expect(eventHandler.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual(
makeTsHash(remove2.data.timestamp, remove2.hash)
);
const result = await sizePrunedStore.pruneMessages(fid);
expect(result.isOk()).toBeTruthy();
expect(prunedMessages).toEqual([remove2]);
});
});
describe('with time limit', () => {
const timePrunedStore = new CastStore(db, eventHandler, { pruneTimeLimit: 60 * 60 - 1 });
test('prunes earliest messages', async () => {
const messages = [add1, remove2, addOld1, addOld2, removeOld3];
const messages = [add1, add2, remove3, add4];
for (const message of messages) {
await timePrunedStore.merge(message);
}
const result = await timePrunedStore.pruneMessages(fid);
expect(result.isOk()).toBeTruthy();
const nowOrig = Date.now;
Date.now = () => FARCASTER_EPOCH + (add4.data.timestamp - 1 + 60 * 60) * 1000;
try {
const result = await timePrunedStore.pruneMessages(fid);
expect(result.isOk()).toBeTruthy();
expect(prunedMessages).toEqual([addOld1, addOld2, removeOld3]);
expect(prunedMessages).toEqual([add1, add2, remove3]);
} finally {
Date.now = nowOrig;
}
await expect(timePrunedStore.getCastAdd(fid, addOld1.hash)).rejects.toThrow(HubError);
await expect(timePrunedStore.getCastAdd(fid, addOld2.hash)).rejects.toThrow(HubError);
await expect(timePrunedStore.getCastRemove(fid, removeOld3.data.castRemoveBody.targetHash)).rejects.toThrow(
await expect(timePrunedStore.getCastAdd(fid, add1.hash)).rejects.toThrow(HubError);
await expect(timePrunedStore.getCastAdd(fid, add1.hash)).rejects.toThrow(HubError);
await expect(timePrunedStore.getCastRemove(fid, remove3.data.castRemoveBody.targetHash)).rejects.toThrow(
HubError
);
});
test('fails to merge message which would be immediately pruned', async () => {
const messages = [add1, add2];
for (const message of messages) {
await timePrunedStore.merge(message);
}
await expect(timePrunedStore.merge(addOld1)).rejects.toEqual(
new HubError('bad_request.prunable', 'message would be pruned')
);
const result = await timePrunedStore.pruneMessages(fid);
expect(result.isOk()).toBeTruthy();
expect(prunedMessages).toEqual([]);
});
});
});

View File

@@ -282,6 +282,18 @@ class CastStore {
.acquire(
message.data.fid.toString(),
async () => {
const prunableResult = await this._eventHandler.isPrunable(
message,
UserPostfix.CastMessage,
this._pruneSizeLimit,
this._pruneTimeLimit
);
if (prunableResult.isErr()) {
throw prunableResult.error;
} else if (prunableResult.value) {
throw new HubError('bad_request.prunable', 'message would be pruned');
}
if (isCastAddMessage(message)) {
return this.mergeAdd(message);
} else if (isCastRemoveMessage(message)) {
@@ -316,7 +328,7 @@ class CastStore {
async pruneMessages(fid: number): HubAsyncResult<number[]> {
const commits: number[] = [];
const cachedCount = this._eventHandler.getCacheMessageCount(fid, UserPostfix.CastMessage);
const cachedCount = await this._eventHandler.getCacheMessageCount(fid, UserPostfix.CastMessage);
// Require storage cache to be synced to prune
if (cachedCount.isErr()) {
@@ -345,7 +357,7 @@ class CastStore {
return ok(undefined); // Nothing left to prune
}
const count = this._eventHandler.getCacheMessageCount(fid, UserPostfix.CastMessage);
const count = await this._eventHandler.getCacheMessageCount(fid, UserPostfix.CastMessage);
if (count.isErr()) {
return err(count.error);
}

View File

@@ -22,6 +22,7 @@ import { getMessage, makeTsHash } from '~/storage/db/message';
import { UserPostfix } from '~/storage/db/types';
import ReactionStore from '~/storage/stores/reactionStore';
import StoreEventHandler from '~/storage/stores/storeEventHandler';
import { FARCASTER_EPOCH } from '@farcaster/core';
const db = jestRocksDB('protobufs.reactionStore.test');
const eventHandler = new StoreEventHandler(db);
@@ -824,7 +825,6 @@ describe('pruneMessages', () => {
let remove3: ReactionRemoveMessage;
let remove4: ReactionRemoveMessage;
let remove5: ReactionRemoveMessage;
let removeOld3: ReactionRemoveMessage;
const generateAddWithTimestamp = async (fid: number, timestamp: number): Promise<ReactionAddMessage> => {
return Factories.ReactionAddMessage.create({ data: { fid, timestamp } });
@@ -859,7 +859,6 @@ describe('pruneMessages', () => {
remove3 = await generateRemoveWithTimestamp(fid, time + 3, add3.data.reactionBody);
remove4 = await generateRemoveWithTimestamp(fid, time + 4, add4.data.reactionBody);
remove5 = await generateRemoveWithTimestamp(fid, time + 5, add5.data.reactionBody);
removeOld3 = await generateRemoveWithTimestamp(fid, time - 60 * 60 + 2);
});
beforeEach(async () => {
@@ -945,43 +944,90 @@ describe('pruneMessages', () => {
expect(prunedMessages).toEqual([]);
});
test('fails to add messages older than the earliest message', async () => {
const messages = [add1, add2, add3];
for (const message of messages) {
await sizePrunedStore.merge(message);
}
// Older messages are rejected
await expect(sizePrunedStore.merge(addOld1)).rejects.toEqual(
new HubError('bad_request.prunable', 'message would be pruned')
);
// newer messages can still be added
await expect(sizePrunedStore.merge(add4)).resolves.toBeGreaterThan(0);
// Prune removes earliest
const result = await sizePrunedStore.pruneMessages(fid);
expect(result.isOk()).toBeTruthy();
expect(result._unsafeUnwrap().length).toEqual(1);
expect(prunedMessages).toEqual([add1]);
});
});
describe('with time limit', () => {
const timePrunedStore = new ReactionStore(db, eventHandler, { pruneTimeLimit: 60 * 60 - 1 });
test('prunes earliest messages', async () => {
const messages = [add1, remove2, addOld1, addOld2, removeOld3];
const messages = [add1, add2, remove3, add4];
for (const message of messages) {
await timePrunedStore.merge(message);
}
const result = await timePrunedStore.pruneMessages(fid);
expect(result.isOk()).toBeTruthy();
const nowOrig = Date.now;
Date.now = () => FARCASTER_EPOCH + (add4.data.timestamp - 1 + 60 * 60) * 1000;
try {
const result = await timePrunedStore.pruneMessages(fid);
expect(result.isOk()).toBeTruthy();
} finally {
Date.now = nowOrig;
}
expect(prunedMessages).toEqual([addOld1, addOld2, removeOld3]);
expect(prunedMessages).toEqual([add1, add2, remove3]);
await expect(
timePrunedStore.getReactionAdd(
fid,
addOld1.data.reactionBody.type,
addOld1.data.reactionBody.targetCastId ?? Factories.CastId.build()
add1.data.reactionBody.type,
add1.data.reactionBody.targetCastId ?? Factories.CastId.build()
)
).rejects.toThrow(HubError);
await expect(
timePrunedStore.getReactionAdd(
fid,
addOld2.data.reactionBody.type,
addOld2.data.reactionBody.targetCastId ?? Factories.CastId.build()
add2.data.reactionBody.type,
add2.data.reactionBody.targetCastId ?? Factories.CastId.build()
)
).rejects.toThrow(HubError);
await expect(
timePrunedStore.getReactionRemove(
fid,
removeOld3.data.reactionBody.type,
removeOld3.data.reactionBody.targetCastId ?? Factories.CastId.build()
remove3.data.reactionBody.type,
remove3.data.reactionBody.targetCastId ?? Factories.CastId.build()
)
).rejects.toThrow(HubError);
});
test('fails to merge messages that would be immediately pruned', async () => {
const messages = [add1, add2];
for (const message of messages) {
await timePrunedStore.merge(message);
}
await expect(timePrunedStore.merge(addOld1)).rejects.toEqual(
new HubError('bad_request.prunable', 'message would be pruned')
);
await expect(timePrunedStore.merge(addOld2)).rejects.toEqual(
new HubError('bad_request.prunable', 'message would be pruned')
);
const result = await timePrunedStore.pruneMessages(fid);
expect(result.isOk()).toBeTruthy();
expect(prunedMessages).toEqual([]);
});
});
});

View File

@@ -289,6 +289,18 @@ class ReactionStore {
.acquire(
message.data.fid.toString(),
async () => {
const prunableResult = await this._eventHandler.isPrunable(
message,
UserPostfix.ReactionMessage,
this._pruneSizeLimit,
this._pruneTimeLimit
);
if (prunableResult.isErr()) {
throw prunableResult.error;
} else if (prunableResult.value) {
throw new HubError('bad_request.prunable', 'message would be pruned');
}
if (isReactionAddMessage(message)) {
return this.mergeAdd(message);
} else if (isReactionRemoveMessage(message)) {
@@ -323,7 +335,7 @@ class ReactionStore {
async pruneMessages(fid: number): HubAsyncResult<number[]> {
const commits: number[] = [];
const cachedCount = this._eventHandler.getCacheMessageCount(fid, UserPostfix.ReactionMessage);
const cachedCount = await this._eventHandler.getCacheMessageCount(fid, UserPostfix.ReactionMessage);
// Require storage cache to be synced to prune
if (cachedCount.isErr()) {
@@ -352,7 +364,7 @@ class ReactionStore {
return ok(undefined); // Nothing left to prune
}
const count = this._eventHandler.getCacheMessageCount(fid, UserPostfix.ReactionMessage);
const count = await this._eventHandler.getCacheMessageCount(fid, UserPostfix.ReactionMessage);
if (count.isErr()) {
return err(count.error);
}

View File

@@ -997,6 +997,7 @@ describe('pruneMessages', () => {
});
beforeEach(() => {
eventHandler.syncCache();
prunedMessages = [];
});
@@ -1009,6 +1010,7 @@ describe('pruneMessages', () => {
let add3: SignerAddMessage;
let add4: SignerAddMessage;
let add5: SignerAddMessage;
let addOld1: SignerAddMessage;
let remove1: SignerRemoveMessage;
let remove2: SignerRemoveMessage;
@@ -1039,6 +1041,7 @@ describe('pruneMessages', () => {
add3 = await generateAddWithTimestamp(fid, time + 3);
add4 = await generateAddWithTimestamp(fid, time + 4);
add5 = await generateAddWithTimestamp(fid, time + 5);
addOld1 = await generateAddWithTimestamp(fid, time - 60 * 60);
remove1 = await generateRemoveWithTimestamp(fid, time + 1, add1.data.signerAddBody.signer);
remove2 = await generateRemoveWithTimestamp(fid, time + 2, add2.data.signerAddBody.signer);
@@ -1118,5 +1121,17 @@ describe('pruneMessages', () => {
expect(prunedMessages).toEqual([]);
});
test('fails to add messages older than the earliest message', async () => {
const messages = [add1, add2, add3];
for (const message of messages) {
await sizePrunedStore.merge(message);
}
// Older messages are rejected
await expect(sizePrunedStore.merge(addOld1)).rejects.toEqual(
new HubError('bad_request.prunable', 'message would be pruned')
);
});
});
});

View File

@@ -257,6 +257,17 @@ class SignerStore {
.acquire(
message.data.fid.toString(),
async () => {
const prunableResult = await this._eventHandler.isPrunable(
message,
UserPostfix.SignerMessage,
this._pruneSizeLimit
);
if (prunableResult.isErr()) {
throw prunableResult.error;
} else if (prunableResult.value) {
throw new HubError('bad_request.prunable', 'message would be pruned');
}
if (isSignerAddMessage(message)) {
return this.mergeAdd(message);
} else if (isSignerRemoveMessage(message)) {
@@ -291,7 +302,7 @@ class SignerStore {
async pruneMessages(fid: number): HubAsyncResult<number[]> {
const commits: number[] = [];
const cachedCount = this._eventHandler.getCacheMessageCount(fid, UserPostfix.SignerMessage);
const cachedCount = await this._eventHandler.getCacheMessageCount(fid, UserPostfix.SignerMessage);
// Require storage cache to be synced to prune
if (cachedCount.isErr()) {
@@ -312,7 +323,7 @@ class SignerStore {
return ok(undefined); // Nothing left to prune
}
const count = this._eventHandler.getCacheMessageCount(fid, UserPostfix.SignerMessage);
const count = await this._eventHandler.getCacheMessageCount(fid, UserPostfix.SignerMessage);
if (count.isErr()) {
return err(count.error);
}

View File

@@ -1,7 +1,7 @@
import { ok, err } from 'neverthrow';
import { HubEvent, HubEventType, Factories, HubError } from '@farcaster/hub-nodejs';
import { ok } from 'neverthrow';
import { Factories, HubEvent, HubEventType } from '@farcaster/hub-nodejs';
import { jestRocksDB } from '~/storage/db/jestUtils';
import { putMessage } from '~/storage/db/message';
import { makeTsHash, putMessage } from '~/storage/db/message';
import { UserPostfix } from '~/storage/db/types';
import { StorageCache } from '~/storage/stores/storageCache';
@@ -10,7 +10,7 @@ const db = jestRocksDB('engine.storageCache.test');
let cache: StorageCache;
beforeEach(() => {
cache = new StorageCache();
cache = new StorageCache(db);
});
describe('syncFromDb', () => {
@@ -45,29 +45,81 @@ describe('syncFromDb', () => {
await putMessage(db, message);
}
}
await cache.syncFromDb(db);
await cache.syncFromDb();
for (const fidUsage of usage) {
expect(cache.getMessageCount(fidUsage.fid, UserPostfix.CastMessage)).toEqual(ok(fidUsage.usage.cast));
expect(cache.getMessageCount(fidUsage.fid, UserPostfix.ReactionMessage)).toEqual(ok(fidUsage.usage.reaction));
expect(cache.getMessageCount(fidUsage.fid, UserPostfix.VerificationMessage)).toEqual(
await expect(cache.getMessageCount(fidUsage.fid, UserPostfix.CastMessage)).resolves.toEqual(
ok(fidUsage.usage.cast)
);
await expect(cache.getMessageCount(fidUsage.fid, UserPostfix.ReactionMessage)).resolves.toEqual(
ok(fidUsage.usage.reaction)
);
await expect(cache.getMessageCount(fidUsage.fid, UserPostfix.VerificationMessage)).resolves.toEqual(
ok(fidUsage.usage.verification)
);
expect(cache.getMessageCount(fidUsage.fid, UserPostfix.UserDataMessage)).toEqual(ok(fidUsage.usage.userData));
expect(cache.getMessageCount(fidUsage.fid, UserPostfix.SignerMessage)).toEqual(ok(fidUsage.usage.signer));
await expect(cache.getMessageCount(fidUsage.fid, UserPostfix.UserDataMessage)).resolves.toEqual(
ok(fidUsage.usage.userData)
);
await expect(cache.getMessageCount(fidUsage.fid, UserPostfix.SignerMessage)).resolves.toEqual(
ok(fidUsage.usage.signer)
);
}
});
});
describe('getMessageCount', () => {
test('returns the correct count even if the cache is not synced', async () => {
const fid = Factories.Fid.build();
const message = await Factories.CastAddMessage.create({ data: { fid } });
const message2 = await Factories.CastAddMessage.create({ data: { fid } });
const message3_differnt_fid = await Factories.CastAddMessage.create();
await putMessage(db, message);
await putMessage(db, message2);
await putMessage(db, message3_differnt_fid);
await expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(2));
await expect(cache.getMessageCount(message3_differnt_fid.data.fid, UserPostfix.CastMessage)).resolves.toEqual(
ok(1)
);
await expect(cache.getMessageCount(Factories.Fid.build(), UserPostfix.CastMessage)).resolves.toEqual(ok(0));
});
});
describe('getEarliestTsHash', () => {
test('returns undefined if there are no messages', async () => {
await expect(cache.getEarliestTsHash(Factories.Fid.build(), UserPostfix.CastMessage)).resolves.toEqual(
ok(undefined)
);
});
test('returns the earliest tsHash by scanning the db on first use', async () => {
const fid = Factories.Fid.build();
const first = await Factories.CastAddMessage.create({ data: { fid, timestamp: 123 } });
const second = await Factories.CastAddMessage.create({ data: { fid, timestamp: 213 } });
const third = await Factories.CastAddMessage.create({ data: { fid, timestamp: 321 } });
await putMessage(db, second);
await putMessage(db, first);
await expect(cache.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual(
makeTsHash(first.data.timestamp, first.hash)
);
await putMessage(db, third);
// Unchanged
await expect(cache.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual(
makeTsHash(first.data.timestamp, first.hash)
);
});
});
describe('processEvent', () => {
test('increments count with merge cast message event', async () => {
const fid = Factories.Fid.build();
const message = await Factories.CastAddMessage.create({ data: { fid } });
const event = HubEvent.create({ type: HubEventType.MERGE_MESSAGE, mergeMessageBody: { message } });
await cache.syncFromDb(db);
expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).toEqual(ok(0));
await cache.syncFromDb();
await expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(0));
cache.processEvent(event);
expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).toEqual(ok(1));
await expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(1));
});
test('increments count with merge cast remove message event', async () => {
@@ -75,10 +127,10 @@ describe('processEvent', () => {
const message = await Factories.CastRemoveMessage.create({ data: { fid } });
const event = HubEvent.create({ type: HubEventType.MERGE_MESSAGE, mergeMessageBody: { message } });
await cache.syncFromDb(db);
expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).toEqual(ok(0));
await cache.syncFromDb();
await expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(0));
cache.processEvent(event);
expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).toEqual(ok(1));
await expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(1));
});
test('count is unchanged when removing existing cast', async () => {
@@ -93,10 +145,10 @@ describe('processEvent', () => {
});
await putMessage(db, cast);
await cache.syncFromDb(db);
expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).toEqual(ok(1));
await cache.syncFromDb();
await expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(1));
cache.processEvent(event);
expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).toEqual(ok(1));
await expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(1));
});
test('count is decremented with prune message event', async () => {
@@ -105,10 +157,10 @@ describe('processEvent', () => {
const event = HubEvent.create({ type: HubEventType.PRUNE_MESSAGE, pruneMessageBody: { message } });
await putMessage(db, message);
await cache.syncFromDb(db);
expect(cache.getMessageCount(fid, UserPostfix.ReactionMessage)).toEqual(ok(1));
await cache.syncFromDb();
await expect(cache.getMessageCount(fid, UserPostfix.ReactionMessage)).resolves.toEqual(ok(1));
cache.processEvent(event);
expect(cache.getMessageCount(fid, UserPostfix.ReactionMessage)).toEqual(ok(0));
await expect(cache.getMessageCount(fid, UserPostfix.ReactionMessage)).resolves.toEqual(ok(0));
});
test('count is decremented with revoke message event', async () => {
@@ -117,18 +169,78 @@ describe('processEvent', () => {
const event = HubEvent.create({ type: HubEventType.REVOKE_MESSAGE, revokeMessageBody: { message } });
await putMessage(db, message);
await cache.syncFromDb(db);
expect(cache.getMessageCount(fid, UserPostfix.SignerMessage)).toEqual(ok(1));
await cache.syncFromDb();
await expect(cache.getMessageCount(fid, UserPostfix.SignerMessage)).resolves.toEqual(ok(1));
cache.processEvent(event);
expect(cache.getMessageCount(fid, UserPostfix.SignerMessage)).toEqual(ok(0));
await expect(cache.getMessageCount(fid, UserPostfix.SignerMessage)).resolves.toEqual(ok(0));
});
test('fails when cache is not synced', async () => {
test('sets earliest tsHash with merge cast message event', async () => {
const fid = Factories.Fid.build();
const message = await Factories.CastAddMessage.create({ data: { fid } });
const event = HubEvent.create({ type: HubEventType.MERGE_MESSAGE, mergeMessageBody: { message } });
expect(cache.processEvent(event)).toEqual(
err(new HubError('unavailable.storage_failure', 'storage cache is not synced with db'))
const middleMessage = await Factories.CastAddMessage.create({ data: { fid } });
let event = HubEvent.create({ type: HubEventType.MERGE_MESSAGE, mergeMessageBody: { message: middleMessage } });
// Earliest tsHash is undefined initially
await expect(cache.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(undefined));
cache.processEvent(event);
// Earliest tsHash is set
await expect(cache.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual(
makeTsHash(middleMessage.data.timestamp, middleMessage.hash)
);
// Adding a later messages does not change the earliest tsHash
const laterMessage = await Factories.CastAddMessage.create({
data: { fid, timestamp: middleMessage.data.timestamp + 10 },
});
event = HubEvent.create({ type: HubEventType.MERGE_MESSAGE, mergeMessageBody: { message: laterMessage } });
cache.processEvent(event);
await expect(cache.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual(
makeTsHash(middleMessage.data.timestamp, middleMessage.hash)
);
// Adding an earlier message changes the earliest tsHash
const earlierMessage = await Factories.CastAddMessage.create({
data: { fid, timestamp: middleMessage.data.timestamp - 10 },
});
event = HubEvent.create({ type: HubEventType.MERGE_MESSAGE, mergeMessageBody: { message: earlierMessage } });
cache.processEvent(event);
await expect(cache.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual(
makeTsHash(earlierMessage.data.timestamp, earlierMessage.hash)
);
});
test('unsets the earliest tsHash if the earliest message is removed', async () => {
const fid = Factories.Fid.build();
const firstMessage = await Factories.ReactionAddMessage.create({ data: { fid } });
const laterMessage = await Factories.ReactionAddMessage.create({
data: { fid, timestamp: firstMessage.data.timestamp + 10 },
});
const firstEvent = HubEvent.create({
type: HubEventType.PRUNE_MESSAGE,
pruneMessageBody: { message: firstMessage },
});
const laterEvent = HubEvent.create({
type: HubEventType.PRUNE_MESSAGE,
pruneMessageBody: { message: laterMessage },
});
await putMessage(db, firstMessage);
await putMessage(db, laterMessage);
await cache.syncFromDb();
await expect(cache.getEarliestTsHash(fid, UserPostfix.ReactionMessage)).resolves.toEqual(
makeTsHash(firstMessage.data.timestamp, firstMessage.hash)
);
cache.processEvent(laterEvent);
// Unchanged
await expect(cache.getEarliestTsHash(fid, UserPostfix.ReactionMessage)).resolves.toEqual(
makeTsHash(firstMessage.data.timestamp, firstMessage.hash)
);
cache.processEvent(firstEvent);
// Unset
await expect(cache.getEarliestTsHash(fid, UserPostfix.ReactionMessage)).resolves.toEqual(ok(undefined));
});
});

View File

@@ -1,17 +1,24 @@
import {
HubError,
HubEvent,
HubResult,
isMergeMessageHubEvent,
isPruneMessageHubEvent,
isRevokeMessageHubEvent,
Message,
HubError,
HubResult,
} from '@farcaster/hub-nodejs';
import { ok, err } from 'neverthrow';
import { err, ok } from 'neverthrow';
import RocksDB from '~/storage/db/rocksdb';
import { FID_BYTES, RootPrefix, UserMessagePostfix, UserMessagePostfixMax } from '~/storage/db/types';
import { logger } from '~/utils/logger';
import { makeFidKey, typeToSetPostfix } from '~/storage/db/message';
import {
getMessagesPruneIterator,
makeFidKey,
makeMessagePrimaryKey,
makeTsHash,
typeToSetPostfix,
} from '~/storage/db/message';
import { bytesCompare, HubAsyncResult } from '@farcaster/core';
const makeKey = (fid: number, set: UserMessagePostfix): string => {
return Buffer.concat([makeFidKey(fid), Buffer.from([set])]).toString('hex');
@@ -20,51 +27,87 @@ const makeKey = (fid: number, set: UserMessagePostfix): string => {
const log = logger.child({ component: 'StorageCache' });
export class StorageCache {
private _usage: Map<string, number>;
private _synced = false;
private _db: RocksDB;
private _counts: Map<string, number>;
private _earliestTsHashes: Map<string, Uint8Array>;
constructor(usage?: Map<string, number>) {
this._usage = usage ?? new Map();
constructor(db: RocksDB, usage?: Map<string, number>) {
this._counts = usage ?? new Map();
this._earliestTsHashes = new Map();
this._db = db;
}
async syncFromDb(db: RocksDB): Promise<void> {
async syncFromDb(): Promise<void> {
log.info('starting storage cache sync');
const usage = new Map<string, number>();
const prefix = Buffer.from([RootPrefix.User]);
const iterator = db.iteratorByPrefix(prefix);
const iterator = this._db.iteratorByPrefix(prefix);
for await (const [key] of iterator) {
const postfix = (key as Buffer).readUint8(1 + FID_BYTES);
if (postfix < UserMessagePostfixMax) {
const usageKey = (key as Buffer).subarray(1, 1 + FID_BYTES + 1).toString('hex');
const count = usage.get(usageKey) ?? 0;
usage.set(usageKey, count + 1);
const lookupKey = (key as Buffer).subarray(1, 1 + FID_BYTES + 1).toString('hex');
const count = usage.get(lookupKey) ?? 0;
if (this._earliestTsHashes.get(lookupKey) === undefined) {
const tsHash = Uint8Array.from((key as Buffer).subarray(1 + FID_BYTES + 1));
this._earliestTsHashes.set(lookupKey, tsHash);
}
usage.set(lookupKey, count + 1);
}
}
this._usage = usage;
this._synced = true;
this._counts = usage;
this._earliestTsHashes = new Map();
log.info('storage cache synced');
}
getMessageCount(fid: number, set: UserMessagePostfix): HubResult<number> {
if (this._synced !== true) {
const error = new HubError('unavailable.storage_failure', 'storage cache is not synced with db');
log.warn({ errCode: error.errCode }, `getMessageCount error: ${error.message}`);
return err(error);
}
async getMessageCount(fid: number, set: UserMessagePostfix): HubAsyncResult<number> {
const key = makeKey(fid, set);
return ok(this._usage.get(key) ?? 0);
if (this._counts.get(key) === undefined) {
const iterator = getMessagesPruneIterator(this._db, fid, set);
for await (const [,] of iterator) {
const count = this._counts.get(key) ?? 0;
this._counts.set(key, count + 1);
}
}
return ok(this._counts.get(key) ?? 0);
}
async getEarliestTsHash(fid: number, set: UserMessagePostfix): HubAsyncResult<Uint8Array | undefined> {
const key = makeKey(fid, set);
const messageCount = await this.getMessageCount(fid, set);
if (messageCount.isErr()) {
return err(messageCount.error);
}
if (messageCount.value === 0) {
return ok(undefined);
}
const value = this._earliestTsHashes.get(key);
if (value === undefined) {
const prefix = makeMessagePrimaryKey(fid, set);
const iterator = this._db.iteratorByPrefix(prefix, { values: false });
const [firstKey] = await iterator.next();
await iterator.end();
if (firstKey === undefined) {
return ok(undefined);
}
if (firstKey && firstKey.length === 0) {
return err(new HubError('unavailable.storage_failure', 'could not read earliest message from db'));
}
const tsHash = Uint8Array.from(firstKey.subarray(1 + FID_BYTES + 1));
this._earliestTsHashes.set(key, tsHash);
return ok(tsHash);
} else {
return ok(value);
}
}
processEvent(event: HubEvent): HubResult<void> {
if (this._synced !== true) {
const error = new HubError('unavailable.storage_failure', 'storage cache is not synced with db');
log.error({ errCode: error.errCode }, `processEvent error: ${error.message}`);
return err(error);
}
if (isMergeMessageHubEvent(event)) {
this.addMessage(event.mergeMessageBody.message);
for (const message of event.mergeMessageBody.deletedMessages) {
@@ -83,8 +126,18 @@ export class StorageCache {
const set = typeToSetPostfix(message.data.type);
const fid = message.data.fid;
const key = makeKey(fid, set);
const count = this._usage.get(key) ?? 0;
this._usage.set(key, count + 1);
const count = this._counts.get(key) ?? 0;
this._counts.set(key, count + 1);
const tsHashResult = makeTsHash(message.data.timestamp, message.hash);
if (!tsHashResult.isOk()) {
log.error(`error: could not make ts hash for message ${message.hash}`);
return;
}
const currentEarliest = this._earliestTsHashes.get(key);
if (currentEarliest === undefined || bytesCompare(currentEarliest, tsHashResult.value) > 0) {
this._earliestTsHashes.set(key, tsHashResult.value);
}
}
}
@@ -93,11 +146,21 @@ export class StorageCache {
const set = typeToSetPostfix(message.data.type);
const fid = message.data.fid;
const key = makeKey(fid, set);
const count = this._usage.get(key) ?? 0;
const count = this._counts.get(key) ?? 0;
if (count === 0) {
log.error(`error: ${set} store message count is already at 0 for fid ${fid}`);
} else {
this._usage.set(key, count - 1);
this._counts.set(key, count - 1);
}
const tsHashResult = makeTsHash(message.data.timestamp, message.hash);
if (!tsHashResult.isOk()) {
log.error(`error: could not make ts hash for message ${message.hash}`);
return;
}
const currentEarliest = this._earliestTsHashes.get(key);
if (currentEarliest === undefined || bytesCompare(currentEarliest, tsHashResult.value) === 0) {
this._earliestTsHashes.delete(key);
}
}
}

View File

@@ -1,15 +1,17 @@
import { CastAddMessage, HubEvent, HubEventType, Factories } from '@farcaster/hub-nodejs';
import { CastAddMessage, Factories, HubEvent, HubEventType } from '@farcaster/hub-nodejs';
import { ok, Result } from 'neverthrow';
import { jestRocksDB } from '~/storage/db/jestUtils';
import { getMessage, makeTsHash, putMessageTransaction } from '~/storage/db/message';
import { getMessage, makeTsHash, putMessage, putMessageTransaction } from '~/storage/db/message';
import { UserPostfix } from '~/storage/db/types';
import StoreEventHandler, { HubEventArgs, HubEventIdGenerator } from '~/storage/stores/storeEventHandler';
import { sleep } from '~/utils/crypto';
import { getFarcasterTime } from '@farcaster/core';
const db = jestRocksDB('stores.storeEventHandler.test');
const eventHandler = new StoreEventHandler(db);
let events: HubEvent[] = [];
let currentTime = 0;
const eventListener = (event: HubEvent) => {
events.push(event);
@@ -21,6 +23,7 @@ beforeAll(() => {
beforeEach(() => {
events = [];
currentTime = getFarcasterTime()._unsafeUnwrap();
});
afterAll(() => {
@@ -73,6 +76,50 @@ describe('commitTransaction', () => {
});
});
describe('isPrunable', () => {
test('returns true if messsage is earlier than prune time limit', async () => {
message = await Factories.CastAddMessage.create({ data: { timestamp: currentTime - 101 } });
await expect(eventHandler.isPrunable(message, UserPostfix.CastMessage, 10, 100)).resolves.toEqual(ok(true));
});
test('returns false if there is no prune time limit', async () => {
message = await Factories.CastAddMessage.create({ data: { timestamp: currentTime - 101 } });
await expect(eventHandler.isPrunable(message, UserPostfix.CastMessage, 10)).resolves.toEqual(ok(false));
});
test('returns false if message is later than prune time limit', async () => {
message = await Factories.CastAddMessage.create({ data: { timestamp: currentTime - 50 } });
await expect(eventHandler.isPrunable(message, UserPostfix.CastMessage, 10, 100)).resolves.toEqual(ok(false));
});
test('returns false if under size limit', async () => {
message = await Factories.CastAddMessage.create({ data: { timestamp: currentTime - 50 } });
await putMessage(db, message);
await expect(eventHandler.getCacheMessageCount(message.data.fid, UserPostfix.CastMessage)).resolves.toEqual(ok(1));
await expect(eventHandler.isPrunable(message, UserPostfix.CastMessage, 1, 100)).resolves.toEqual(ok(false));
});
test('returns false if over size limit and message is later than earliest message', async () => {
message = await Factories.CastAddMessage.create({ data: { timestamp: currentTime - 50 } });
await putMessage(db, message);
await expect(eventHandler.getCacheMessageCount(message.data.fid, UserPostfix.CastMessage)).resolves.toEqual(ok(1));
const laterMessage = await Factories.CastAddMessage.create({
data: { fid: message.data.fid, timestamp: currentTime + 50 },
});
await expect(eventHandler.isPrunable(laterMessage, UserPostfix.CastMessage, 1, 100)).resolves.toEqual(ok(false));
});
test('returns true if over size limit and message is earlier than earliest message', async () => {
message = await Factories.CastAddMessage.create({ data: { timestamp: currentTime - 50 } });
await putMessage(db, message);
await expect(eventHandler.getCacheMessageCount(message.data.fid, UserPostfix.CastMessage)).resolves.toEqual(ok(1));
await expect(eventHandler.getEarliestTsHash(message.data.fid, UserPostfix.CastMessage)).resolves.toEqual(
makeTsHash(message.data.timestamp, message.hash)
);
const earlierMessage = await Factories.CastAddMessage.create({
data: { fid: message.data.fid, timestamp: currentTime - 75 },
});
await expect(eventHandler.isPrunable(earlierMessage, UserPostfix.CastMessage, 1, 100)).resolves.toEqual(ok(true));
});
});
describe('pruneEvents', () => {
test('deletes events based on time limit', async () => {
const message1 = await Factories.Message.create();

View File

@@ -23,11 +23,36 @@ import { TypedEmitter } from 'tiny-typed-emitter';
import RocksDB, { Iterator, Transaction } from '~/storage/db/rocksdb';
import { RootPrefix, UserMessagePostfix } from '~/storage/db/types';
import { StorageCache } from '~/storage/stores/storageCache';
import { makeTsHash } from '~/storage/db/message';
import {
bytesCompare,
CastAddMessage,
CastRemoveMessage,
getFarcasterTime,
ReactionAddMessage,
ReactionRemoveMessage,
SignerAddMessage,
SignerRemoveMessage,
UserDataAddMessage,
VerificationAddEthAddressMessage,
VerificationRemoveMessage,
} from '@farcaster/core';
const PRUNE_TIME_LIMIT_DEFAULT = 60 * 60 * 24 * 3 * 1000; // 3 days in ms
const DEFAULT_LOCK_MAX_PENDING = 1_000;
const DEFAULT_LOCK_TIMEOUT = 500; // in ms
type PrunableMessage =
| CastAddMessage
| CastRemoveMessage
| ReactionAddMessage
| ReactionRemoveMessage
| SignerAddMessage
| SignerRemoveMessage
| UserDataAddMessage
| VerificationAddEthAddressMessage
| VerificationRemoveMessage;
export type StoreEvents = {
/**
* mergeMessage is emitted when a message is merged into one of the stores. If
@@ -149,15 +174,19 @@ class StoreEventHandler extends TypedEmitter<StoreEvents> {
timeout: options.lockTimeout ?? DEFAULT_LOCK_TIMEOUT,
});
this._storageCache = new StorageCache();
this._storageCache = new StorageCache(this._db);
}
getCacheMessageCount(fid: number, set: UserMessagePostfix): HubResult<number> {
async getCacheMessageCount(fid: number, set: UserMessagePostfix): HubAsyncResult<number> {
return this._storageCache.getMessageCount(fid, set);
}
async getEarliestTsHash(fid: number, set: UserMessagePostfix): HubAsyncResult<Uint8Array | undefined> {
return this._storageCache.getEarliestTsHash(fid, set);
}
async syncCache(): HubAsyncResult<void> {
return ResultAsync.fromPromise(this._storageCache.syncFromDb(this._db), (e) => e as HubError);
return ResultAsync.fromPromise(this._storageCache.syncFromDb(), (e) => e as HubError);
}
async getEvent(id: number): HubAsyncResult<HubEvent> {
@@ -188,6 +217,52 @@ class StoreEventHandler extends TypedEmitter<StoreEvents> {
return ok(events);
}
public async isPrunable(
message: PrunableMessage,
set: UserMessagePostfix,
sizeLimit: number,
timeLimit: number | undefined = undefined
): HubAsyncResult<boolean> {
const farcasterTime = getFarcasterTime();
if (farcasterTime.isErr()) {
return err(farcasterTime.error);
}
// Calculate the timestamp cut-off to prune if set supports time based expiry
if (timeLimit !== undefined) {
const timestampToPrune = farcasterTime.value - timeLimit;
if (message.data.timestamp < timestampToPrune) {
return ok(true);
}
}
const messageCount = await this.getCacheMessageCount(message.data.fid, set);
if (messageCount.isErr()) {
return err(messageCount.error);
}
if (messageCount.value < sizeLimit) {
return ok(false);
}
const earliestTimestamp = await this.getEarliestTsHash(message.data.fid, set);
if (earliestTimestamp.isErr()) {
return err(earliestTimestamp.error);
}
const tsHash = makeTsHash(message.data.timestamp, message.hash);
if (tsHash.isErr()) {
return err(tsHash.error);
}
if (earliestTimestamp.value === undefined) {
return ok(false);
}
if (bytesCompare(tsHash.value, earliestTimestamp.value) < 0) {
return ok(true);
} else {
return ok(false);
}
}
async commitTransaction(txn: Transaction, eventArgs: HubEventArgs): HubAsyncResult<number> {
return this._lock
.acquire('commit', async () => {

View File

@@ -275,6 +275,7 @@ describe('pruneMessages', () => {
let add2: UserDataAddMessage;
let add3: UserDataAddMessage;
let add4: UserDataAddMessage;
let addOld1: UserDataAddMessage;
const generateAddWithTimestamp = async (
fid: number,
@@ -290,6 +291,7 @@ describe('pruneMessages', () => {
add2 = await generateAddWithTimestamp(fid, time + 2, UserDataType.DISPLAY);
add3 = await generateAddWithTimestamp(fid, time + 3, UserDataType.BIO);
add4 = await generateAddWithTimestamp(fid, time + 5, UserDataType.URL);
addOld1 = await generateAddWithTimestamp(fid, time - 60 * 60, UserDataType.URL);
});
beforeEach(async () => {
@@ -321,5 +323,17 @@ describe('pruneMessages', () => {
await expect(getAdd()).rejects.toThrow(HubError);
}
});
test('fails to add messages older than the earliest message', async () => {
const messages = [add1, add2, add3];
for (const message of messages) {
await sizePrunedStore.merge(message);
}
// Older messages are rejected
await expect(sizePrunedStore.merge(addOld1)).rejects.toEqual(
new HubError('bad_request.prunable', 'message would be pruned')
);
});
});
});

View File

@@ -138,6 +138,16 @@ class UserDataStore {
.acquire(
message.data.fid.toString(),
async () => {
const prunableResult = await this._eventHandler.isPrunable(
message,
UserPostfix.UserDataMessage,
this._pruneSizeLimit
);
if (prunableResult.isErr()) {
throw prunableResult.error;
} else if (prunableResult.value) {
throw new HubError('bad_request.prunable', 'message would be pruned');
}
return this.mergeDataAdd(message);
},
{ timeout: MERGE_TIMEOUT_DEFAULT }
@@ -164,7 +174,7 @@ class UserDataStore {
async pruneMessages(fid: number): HubAsyncResult<number[]> {
const commits: number[] = [];
const cachedCount = this._eventHandler.getCacheMessageCount(fid, UserPostfix.UserDataMessage);
const cachedCount = await this._eventHandler.getCacheMessageCount(fid, UserPostfix.UserDataMessage);
// Require storage cache to be synced to prune
if (cachedCount.isErr()) {
@@ -185,7 +195,7 @@ class UserDataStore {
return ok(undefined); // Nothing left to prune
}
const count = this._eventHandler.getCacheMessageCount(fid, UserPostfix.UserDataMessage);
const count = await this._eventHandler.getCacheMessageCount(fid, UserPostfix.UserDataMessage);
if (count.isErr()) {
return err(count.error);
}

View File

@@ -493,6 +493,7 @@ describe('pruneMessages', () => {
let add3: VerificationAddEthAddressMessage;
let add4: VerificationAddEthAddressMessage;
let add5: VerificationAddEthAddressMessage;
let addOld1: VerificationAddEthAddressMessage;
let remove1: VerificationRemoveMessage;
let remove2: VerificationRemoveMessage;
@@ -524,6 +525,7 @@ describe('pruneMessages', () => {
add3 = await generateAddWithTimestamp(fid, time + 3);
add4 = await generateAddWithTimestamp(fid, time + 4);
add5 = await generateAddWithTimestamp(fid, time + 5);
addOld1 = await generateAddWithTimestamp(fid, time - 60 * 60);
remove1 = await generateRemoveWithTimestamp(fid, time + 1, add1.data.verificationAddEthAddressBody.address);
remove2 = await generateRemoveWithTimestamp(fid, time + 2, add2.data.verificationAddEthAddressBody.address);
@@ -603,5 +605,17 @@ describe('pruneMessages', () => {
expect(prunedMessages).toEqual([]);
});
test('fails to add messages older than the earliest message', async () => {
const messages = [add1, add2, add3];
for (const message of messages) {
await sizePrunedStore.merge(message);
}
// Older messages are rejected
await expect(sizePrunedStore.merge(addOld1)).rejects.toEqual(
new HubError('bad_request.prunable', 'message would be pruned')
);
});
});
});

View File

@@ -170,6 +170,16 @@ class VerificationStore {
.acquire(
message.data.fid.toString(),
async () => {
const prunableResult = await this._eventHandler.isPrunable(
message,
UserPostfix.VerificationMessage,
this._pruneSizeLimit
);
if (prunableResult.isErr()) {
throw prunableResult.error;
} else if (prunableResult.value) {
throw new HubError('bad_request.prunable', 'message would be pruned');
}
if (isVerificationAddEthAddressMessage(message)) {
return this.mergeAdd(message);
} else if (isVerificationRemoveMessage(message)) {
@@ -204,7 +214,7 @@ class VerificationStore {
async pruneMessages(fid: number): HubAsyncResult<number[]> {
const commits: number[] = [];
const cachedCount = this._eventHandler.getCacheMessageCount(fid, UserPostfix.VerificationMessage);
const cachedCount = await this._eventHandler.getCacheMessageCount(fid, UserPostfix.VerificationMessage);
// Require storage cache to be synced to prune
if (cachedCount.isErr()) {
@@ -225,7 +235,7 @@ class VerificationStore {
return ok(undefined); // Nothing left to prune
}
const count = this._eventHandler.getCacheMessageCount(fid, UserPostfix.VerificationMessage);
const count = await this._eventHandler.getCacheMessageCount(fid, UserPostfix.VerificationMessage);
if (count.isErr()) {
return err(count.error);
}

View File

@@ -66,6 +66,7 @@ export type HubErrorCode =
| 'bad_request.validation_failure'
| 'bad_request.duplicate'
| 'bad_request.conflict'
| 'bad_request.prunable'
/* The requested resource could not be found */
| 'not_found'
/* The request could not be completed because the operation is not executable */