feat: add validate or revoke messages cleanup job (#801)

* add validate or revoke messages cleanup job

* make validateMessage private again
This commit is contained in:
Paul Fletcher-Hill
2023-04-03 19:30:20 -04:00
committed by GitHub
parent 7c9ca5aac0
commit a55fb80c34
3 changed files with 147 additions and 13 deletions

View File

@@ -52,6 +52,7 @@ import {
import { PeriodicTestDataJobScheduler, TestUser } from '~/utils/periodicTestDataJob';
import { VersionSchedule } from '~/utils/versions';
import { CheckFarcasterVersionJobScheduler } from '~/storage/jobs/checkFarcasterVersionJob';
import { ValidateOrRevokeMessagesJobScheduler } from '~/storage/jobs/validateOrRevokeMessagesJob';
export type HubSubmitSource = 'gossip' | 'rpc' | 'eth-provider';
@@ -179,6 +180,7 @@ export class Hub implements HubInterface {
private pruneEventsJobScheduler: PruneEventsJobScheduler;
private testDataJobScheduler?: PeriodicTestDataJobScheduler;
private checkFarcasterVersionJobScheduler: CheckFarcasterVersionJobScheduler;
private validateOrRevokeMessagesJobScheduler: ValidateOrRevokeMessagesJobScheduler;
private updateNameRegistryEventExpiryJobQueue: UpdateNameRegistryEventExpiryJobQueue;
private updateNameRegistryEventExpiryJobWorker?: UpdateNameRegistryEventExpiryJobWorker;
@@ -227,6 +229,7 @@ export class Hub implements HubInterface {
this.periodSyncJobScheduler = new PeriodicSyncJobScheduler(this, this.syncEngine);
this.pruneEventsJobScheduler = new PruneEventsJobScheduler(this.engine);
this.checkFarcasterVersionJobScheduler = new CheckFarcasterVersionJobScheduler(this);
this.validateOrRevokeMessagesJobScheduler = new ValidateOrRevokeMessagesJobScheduler(this.rocksDB, this.engine);
if (options.testUsers) {
this.testDataJobScheduler = new PeriodicTestDataJobScheduler(this.rpcServer, options.testUsers as TestUser[]);
@@ -358,6 +361,7 @@ export class Hub implements HubInterface {
this.periodSyncJobScheduler.start();
this.pruneEventsJobScheduler.start(this.options.pruneEventsJobCron);
this.checkFarcasterVersionJobScheduler.start();
this.validateOrRevokeMessagesJobScheduler.start();
// Start the test data generator
this.testDataJobScheduler?.start();
@@ -416,6 +420,7 @@ export class Hub implements HubInterface {
this.checkFarcasterVersionJobScheduler.stop();
this.testDataJobScheduler?.stop();
this.updateNameRegistryEventExpiryJobWorker?.stop();
this.validateOrRevokeMessagesJobScheduler.stop();
// Stop the ETH registry provider
if (this.ethRegistryProvider) {

View File

@@ -153,18 +153,25 @@ class Engine {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const setPostfix = typeToSetPostfix(message.data!.type);
if (setPostfix === UserPostfix.ReactionMessage) {
return ResultAsync.fromPromise(this._reactionStore.merge(message), (e) => e as HubError);
} else if (setPostfix === UserPostfix.SignerMessage) {
return ResultAsync.fromPromise(this._signerStore.merge(message), (e) => e as HubError);
} else if (setPostfix === UserPostfix.CastMessage) {
return ResultAsync.fromPromise(this._castStore.merge(message), (e) => e as HubError);
} else if (setPostfix === UserPostfix.UserDataMessage) {
return ResultAsync.fromPromise(this._userDataStore.merge(message), (e) => e as HubError);
} else if (setPostfix === UserPostfix.VerificationMessage) {
return ResultAsync.fromPromise(this._verificationStore.merge(message), (e) => e as HubError);
} else {
return err(new HubError('bad_request.validation_failure', 'invalid message type'));
switch (setPostfix) {
case UserPostfix.ReactionMessage: {
return ResultAsync.fromPromise(this._reactionStore.merge(message), (e) => e as HubError);
}
case UserPostfix.SignerMessage: {
return ResultAsync.fromPromise(this._signerStore.merge(message), (e) => e as HubError);
}
case UserPostfix.CastMessage: {
return ResultAsync.fromPromise(this._castStore.merge(message), (e) => e as HubError);
}
case UserPostfix.UserDataMessage: {
return ResultAsync.fromPromise(this._userDataStore.merge(message), (e) => e as HubError);
}
case UserPostfix.VerificationMessage: {
return ResultAsync.fromPromise(this._verificationStore.merge(message), (e) => e as HubError);
}
default: {
return err(new HubError('bad_request.validation_failure', 'invalid message type'));
}
}
}
@@ -250,7 +257,9 @@ class Engine {
);
}
log.info(`revoked ${revokedCount} messages from ${signerHex.value} and fid ${fid}`);
if (revokedCount > 0) {
log.info(`revoked ${revokedCount} messages from ${signerHex.value} and fid ${fid}`);
}
return ok(undefined);
}
@@ -287,6 +296,38 @@ class Engine {
return ok(undefined);
}
/** revoke message if it is not valid */
async validateOrRevokeMessage(message: protobufs.Message): HubAsyncResult<number | undefined> {
const isValid = await this.validateMessage(message);
if (isValid.isErr() && message.data) {
const setPostfix = typeToSetPostfix(message.data.type);
switch (setPostfix) {
case UserPostfix.ReactionMessage: {
return this._reactionStore.revoke(message);
}
case UserPostfix.SignerMessage: {
return this._signerStore.revoke(message);
}
case UserPostfix.CastMessage: {
return this._castStore.revoke(message);
}
case UserPostfix.UserDataMessage: {
return this._userDataStore.revoke(message);
}
case UserPostfix.VerificationMessage: {
return this._verificationStore.revoke(message);
}
default: {
return err(new HubError('bad_request.invalid_param', 'invalid message type'));
}
}
}
return ok(undefined);
}
/* -------------------------------------------------------------------------- */
/* Event Methods */
/* -------------------------------------------------------------------------- */

View File

@@ -0,0 +1,88 @@
import { bytesToHexString, HubAsyncResult, HubError } from '@farcaster/utils';
import { ok, Result } from 'neverthrow';
import cron from 'node-cron';
import { Message } from '@farcaster/protobufs';
import { logger } from '~/utils/logger';
import { FID_BYTES, RootPrefix, TSHASH_LENGTH, UserMessagePostfixMax } from '~/storage/db/types';
import RocksDB from '~/storage/db/rocksdb';
import Engine from '~/storage/engine';
export const DEFAULT_VALIDATE_AND_REVOKE_MESSAGES_CRON = '0 1 * * *'; // Every day at 01:00 UTC
const log = logger.child({
component: 'ValidateOrRevokeMessagesJob',
});
type SchedulerStatus = 'started' | 'stopped';
export class ValidateOrRevokeMessagesJobScheduler {
private _db: RocksDB;
private _engine: Engine;
private _cronTask?: cron.ScheduledTask;
constructor(db: RocksDB, engine: Engine) {
this._db = db;
this._engine = engine;
}
start(cronSchedule?: string) {
this._cronTask = cron.schedule(cronSchedule ?? DEFAULT_VALIDATE_AND_REVOKE_MESSAGES_CRON, () => this.doJobs());
}
stop() {
if (this._cronTask) {
this._cronTask.stop();
}
}
status(): SchedulerStatus {
return this._cronTask ? 'started' : 'stopped';
}
async doJobs(): HubAsyncResult<void> {
log.info({}, 'starting ValidateOrRevokeMessagesJob');
const allUserPrefix = Buffer.from([RootPrefix.User]);
for await (const [key, value] of this._db.iteratorByPrefix(allUserPrefix)) {
if ((key as Buffer).length !== 1 + FID_BYTES + 1 + TSHASH_LENGTH) {
// Not a message key, so we can skip it.
continue;
}
// Get the UserMessagePostfix from the key, which is the 1 + 32 bytes from the start
const postfix = (key as Buffer).readUint8(1 + FID_BYTES);
if (postfix > UserMessagePostfixMax) {
// Not a message key, so we can skip it.
continue;
}
const message = Result.fromThrowable(
() => Message.decode(new Uint8Array(value as Buffer)),
(e) => e as HubError
)();
if (message.isOk()) {
const result = await this._engine.validateOrRevokeMessage(message.value);
result.match(
(result) => {
if (result !== undefined) {
log.info(
`revoked message ${bytesToHexString(message.value.hash)._unsafeUnwrap()} from fid ${
message.value.data?.fid
}`
);
}
},
(e) => {
log.error({ errCode: e.errCode }, `error validating and revoking message: ${e.message}`);
}
);
}
}
log.info({}, 'finished ValidateOrRevokeMessagesJob');
return ok(undefined);
}
}