From a55fb80c34cd99b33c8c8a8d675aa284acb452b1 Mon Sep 17 00:00:00 2001 From: Paul Fletcher-Hill <1607180+pfletcherhill@users.noreply.github.com> Date: Mon, 3 Apr 2023 19:30:20 -0400 Subject: [PATCH] feat: add validate or revoke messages cleanup job (#801) * add validate or revoke messages cleanup job * make validateMessage private again --- apps/hubble/src/hubble.ts | 5 ++ apps/hubble/src/storage/engine/index.ts | 67 +++++++++++--- .../jobs/validateOrRevokeMessagesJob.ts | 88 +++++++++++++++++++ 3 files changed, 147 insertions(+), 13 deletions(-) create mode 100644 apps/hubble/src/storage/jobs/validateOrRevokeMessagesJob.ts diff --git a/apps/hubble/src/hubble.ts b/apps/hubble/src/hubble.ts index 02e99511..e48b05d8 100644 --- a/apps/hubble/src/hubble.ts +++ b/apps/hubble/src/hubble.ts @@ -52,6 +52,7 @@ import { import { PeriodicTestDataJobScheduler, TestUser } from '~/utils/periodicTestDataJob'; import { VersionSchedule } from '~/utils/versions'; import { CheckFarcasterVersionJobScheduler } from '~/storage/jobs/checkFarcasterVersionJob'; +import { ValidateOrRevokeMessagesJobScheduler } from '~/storage/jobs/validateOrRevokeMessagesJob'; export type HubSubmitSource = 'gossip' | 'rpc' | 'eth-provider'; @@ -179,6 +180,7 @@ export class Hub implements HubInterface { private pruneEventsJobScheduler: PruneEventsJobScheduler; private testDataJobScheduler?: PeriodicTestDataJobScheduler; private checkFarcasterVersionJobScheduler: CheckFarcasterVersionJobScheduler; + private validateOrRevokeMessagesJobScheduler: ValidateOrRevokeMessagesJobScheduler; private updateNameRegistryEventExpiryJobQueue: UpdateNameRegistryEventExpiryJobQueue; private updateNameRegistryEventExpiryJobWorker?: UpdateNameRegistryEventExpiryJobWorker; @@ -227,6 +229,7 @@ export class Hub implements HubInterface { this.periodSyncJobScheduler = new PeriodicSyncJobScheduler(this, this.syncEngine); this.pruneEventsJobScheduler = new PruneEventsJobScheduler(this.engine); this.checkFarcasterVersionJobScheduler = new CheckFarcasterVersionJobScheduler(this); + this.validateOrRevokeMessagesJobScheduler = new ValidateOrRevokeMessagesJobScheduler(this.rocksDB, this.engine); if (options.testUsers) { this.testDataJobScheduler = new PeriodicTestDataJobScheduler(this.rpcServer, options.testUsers as TestUser[]); @@ -358,6 +361,7 @@ export class Hub implements HubInterface { this.periodSyncJobScheduler.start(); this.pruneEventsJobScheduler.start(this.options.pruneEventsJobCron); this.checkFarcasterVersionJobScheduler.start(); + this.validateOrRevokeMessagesJobScheduler.start(); // Start the test data generator this.testDataJobScheduler?.start(); @@ -416,6 +420,7 @@ export class Hub implements HubInterface { this.checkFarcasterVersionJobScheduler.stop(); this.testDataJobScheduler?.stop(); this.updateNameRegistryEventExpiryJobWorker?.stop(); + this.validateOrRevokeMessagesJobScheduler.stop(); // Stop the ETH registry provider if (this.ethRegistryProvider) { diff --git a/apps/hubble/src/storage/engine/index.ts b/apps/hubble/src/storage/engine/index.ts index 512147cf..56988435 100644 --- a/apps/hubble/src/storage/engine/index.ts +++ b/apps/hubble/src/storage/engine/index.ts @@ -153,18 +153,25 @@ class Engine { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const setPostfix = typeToSetPostfix(message.data!.type); - if (setPostfix === UserPostfix.ReactionMessage) { - return ResultAsync.fromPromise(this._reactionStore.merge(message), (e) => e as HubError); - } else if (setPostfix === UserPostfix.SignerMessage) { - return ResultAsync.fromPromise(this._signerStore.merge(message), (e) => e as HubError); - } else if (setPostfix === UserPostfix.CastMessage) { - return ResultAsync.fromPromise(this._castStore.merge(message), (e) => e as HubError); - } else if (setPostfix === UserPostfix.UserDataMessage) { - return ResultAsync.fromPromise(this._userDataStore.merge(message), (e) => e as HubError); - } else if (setPostfix === UserPostfix.VerificationMessage) { - return ResultAsync.fromPromise(this._verificationStore.merge(message), (e) => e as HubError); - } else { - return err(new HubError('bad_request.validation_failure', 'invalid message type')); + switch (setPostfix) { + case UserPostfix.ReactionMessage: { + return ResultAsync.fromPromise(this._reactionStore.merge(message), (e) => e as HubError); + } + case UserPostfix.SignerMessage: { + return ResultAsync.fromPromise(this._signerStore.merge(message), (e) => e as HubError); + } + case UserPostfix.CastMessage: { + return ResultAsync.fromPromise(this._castStore.merge(message), (e) => e as HubError); + } + case UserPostfix.UserDataMessage: { + return ResultAsync.fromPromise(this._userDataStore.merge(message), (e) => e as HubError); + } + case UserPostfix.VerificationMessage: { + return ResultAsync.fromPromise(this._verificationStore.merge(message), (e) => e as HubError); + } + default: { + return err(new HubError('bad_request.validation_failure', 'invalid message type')); + } } } @@ -250,7 +257,9 @@ class Engine { ); } - log.info(`revoked ${revokedCount} messages from ${signerHex.value} and fid ${fid}`); + if (revokedCount > 0) { + log.info(`revoked ${revokedCount} messages from ${signerHex.value} and fid ${fid}`); + } return ok(undefined); } @@ -287,6 +296,38 @@ class Engine { return ok(undefined); } + /** revoke message if it is not valid */ + async validateOrRevokeMessage(message: protobufs.Message): HubAsyncResult { + const isValid = await this.validateMessage(message); + + if (isValid.isErr() && message.data) { + const setPostfix = typeToSetPostfix(message.data.type); + + switch (setPostfix) { + case UserPostfix.ReactionMessage: { + return this._reactionStore.revoke(message); + } + case UserPostfix.SignerMessage: { + return this._signerStore.revoke(message); + } + case UserPostfix.CastMessage: { + return this._castStore.revoke(message); + } + case UserPostfix.UserDataMessage: { + return this._userDataStore.revoke(message); + } + case UserPostfix.VerificationMessage: { + return this._verificationStore.revoke(message); + } + default: { + return err(new HubError('bad_request.invalid_param', 'invalid message type')); + } + } + } + + return ok(undefined); + } + /* -------------------------------------------------------------------------- */ /* Event Methods */ /* -------------------------------------------------------------------------- */ diff --git a/apps/hubble/src/storage/jobs/validateOrRevokeMessagesJob.ts b/apps/hubble/src/storage/jobs/validateOrRevokeMessagesJob.ts new file mode 100644 index 00000000..f5840201 --- /dev/null +++ b/apps/hubble/src/storage/jobs/validateOrRevokeMessagesJob.ts @@ -0,0 +1,88 @@ +import { bytesToHexString, HubAsyncResult, HubError } from '@farcaster/utils'; +import { ok, Result } from 'neverthrow'; +import cron from 'node-cron'; +import { Message } from '@farcaster/protobufs'; +import { logger } from '~/utils/logger'; +import { FID_BYTES, RootPrefix, TSHASH_LENGTH, UserMessagePostfixMax } from '~/storage/db/types'; +import RocksDB from '~/storage/db/rocksdb'; +import Engine from '~/storage/engine'; + +export const DEFAULT_VALIDATE_AND_REVOKE_MESSAGES_CRON = '0 1 * * *'; // Every day at 01:00 UTC + +const log = logger.child({ + component: 'ValidateOrRevokeMessagesJob', +}); + +type SchedulerStatus = 'started' | 'stopped'; + +export class ValidateOrRevokeMessagesJobScheduler { + private _db: RocksDB; + private _engine: Engine; + private _cronTask?: cron.ScheduledTask; + + constructor(db: RocksDB, engine: Engine) { + this._db = db; + this._engine = engine; + } + + start(cronSchedule?: string) { + this._cronTask = cron.schedule(cronSchedule ?? DEFAULT_VALIDATE_AND_REVOKE_MESSAGES_CRON, () => this.doJobs()); + } + + stop() { + if (this._cronTask) { + this._cronTask.stop(); + } + } + + status(): SchedulerStatus { + return this._cronTask ? 'started' : 'stopped'; + } + + async doJobs(): HubAsyncResult { + log.info({}, 'starting ValidateOrRevokeMessagesJob'); + + const allUserPrefix = Buffer.from([RootPrefix.User]); + + for await (const [key, value] of this._db.iteratorByPrefix(allUserPrefix)) { + if ((key as Buffer).length !== 1 + FID_BYTES + 1 + TSHASH_LENGTH) { + // Not a message key, so we can skip it. + continue; + } + + // Get the UserMessagePostfix from the key, which is the 1 + 32 bytes from the start + const postfix = (key as Buffer).readUint8(1 + FID_BYTES); + if (postfix > UserMessagePostfixMax) { + // Not a message key, so we can skip it. + continue; + } + + const message = Result.fromThrowable( + () => Message.decode(new Uint8Array(value as Buffer)), + (e) => e as HubError + )(); + + if (message.isOk()) { + const result = await this._engine.validateOrRevokeMessage(message.value); + result.match( + (result) => { + if (result !== undefined) { + log.info( + `revoked message ${bytesToHexString(message.value.hash)._unsafeUnwrap()} from fid ${ + message.value.data?.fid + }` + ); + } + }, + (e) => { + log.error({ errCode: e.errCode }, `error validating and revoking message: ${e.message}`); + } + ); + } + } + + log.info({}, 'finished ValidateOrRevokeMessagesJob'); + + return ok(undefined); + } +}