mirror of
https://github.com/farcasterxyz/hub-monorepo.git
synced 2026-04-18 03:00:22 -04:00
feat: revoke signers 1hr after custody event (#1177)
* feat: revoke signers 1hr after custody event * add changeset
This commit is contained in:
5
.changeset/fifty-feet-punch.md
Normal file
5
.changeset/fifty-feet-punch.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
"@farcaster/hubble": patch
|
||||
---
|
||||
|
||||
feat: revoke signers 1hr after custody event
|
||||
@@ -47,6 +47,7 @@ import { setReferenceDateForTest } from "../../utils/versions.js";
|
||||
import { getUserNameProof } from "../db/nameRegistryEvent.js";
|
||||
import { publicClient } from "../../test/utils.js";
|
||||
import { jest } from "@jest/globals";
|
||||
import { RevokeMessagesBySignerJobQueue, RevokeMessagesBySignerJobWorker } from "../jobs/revokeMessagesBySignerJob.js";
|
||||
|
||||
const db = jestRocksDB("protobufs.engine.test");
|
||||
const network = FarcasterNetwork.TESTNET;
|
||||
@@ -882,9 +883,22 @@ describe("with listeners and workers", () => {
|
||||
blockNumber: custodyEvent.blockNumber + 1,
|
||||
});
|
||||
await liveEngine.mergeIdRegistryEvent(custodyTransfer);
|
||||
await sleep(200);
|
||||
// Does not immediately revoke messages, will wait 1 hr
|
||||
expect(revokedMessages).toEqual([]);
|
||||
await sleep(200); // Wait for engine to revoke messages
|
||||
expect(revokedMessages).toEqual([signerAdd, castAdd, reactionAdd, linkAdd]);
|
||||
|
||||
// Manually trigger the job
|
||||
const queue = new RevokeMessagesBySignerJobQueue(db);
|
||||
const worker = new RevokeMessagesBySignerJobWorker(queue, db, liveEngine);
|
||||
await worker.processJobs(Date.now() + 1000 * 10 * 60 + 5000);
|
||||
expect(revokedMessages).toEqual([]); // No messages revoked yet, after 10 mins
|
||||
|
||||
// Revokes messages after 1 hr
|
||||
await worker.processJobs(Date.now() + 1000 * 60 * 60 + 5000);
|
||||
expect(revokedMessages).toContainEqual(signerAdd);
|
||||
expect(revokedMessages).toContainEqual(castAdd);
|
||||
expect(revokedMessages).toContainEqual(reactionAdd);
|
||||
expect(revokedMessages).toContainEqual(linkAdd);
|
||||
});
|
||||
|
||||
test("revokes messages when SignerAdd is pruned", async () => {
|
||||
|
||||
@@ -981,7 +981,8 @@ class Engine {
|
||||
fid: idRegistryEvent.fid,
|
||||
signer: fromAddress,
|
||||
});
|
||||
const enqueueRevoke = await this._revokeSignerQueue.enqueueJob(payload);
|
||||
const oneHourFromNow = Date.now() + 60 * 60 * 1000;
|
||||
const enqueueRevoke = await this._revokeSignerQueue.enqueueJob(payload, oneHourFromNow);
|
||||
if (enqueueRevoke.isErr()) {
|
||||
log.error(
|
||||
{ errCode: enqueueRevoke.error.errCode },
|
||||
|
||||
@@ -22,6 +22,7 @@ export class RevokeMessagesBySignerJobWorker {
|
||||
private _db: RocksDB;
|
||||
private _engine: Engine;
|
||||
private _status: "working" | "waiting";
|
||||
private _processJobs: () => Promise<void>;
|
||||
|
||||
constructor(queue: RevokeMessagesBySignerJobQueue, db: RocksDB, engine: Engine) {
|
||||
this._queue = queue;
|
||||
@@ -29,18 +30,21 @@ export class RevokeMessagesBySignerJobWorker {
|
||||
this._engine = engine;
|
||||
this._status = "waiting";
|
||||
|
||||
this.processJobs = this.processJobs.bind(this);
|
||||
this._processJobs = async () => {
|
||||
await this.processJobs();
|
||||
};
|
||||
}
|
||||
|
||||
start() {
|
||||
this._queue.on("enqueueJob", this.processJobs);
|
||||
this._queue.on("enqueueJob", this._processJobs);
|
||||
}
|
||||
|
||||
stop() {
|
||||
this._queue.off("enqueueJob", this.processJobs);
|
||||
this._queue.off("enqueueJob", this._processJobs);
|
||||
}
|
||||
|
||||
async processJobs(): HubAsyncResult<void> {
|
||||
async processJobs(doBefore?: number): HubAsyncResult<void> {
|
||||
const doBeforeTs = doBefore || Date.now() + 500; // Add a 500ms buffer for tests
|
||||
if (this._status === "working") {
|
||||
return err(new HubError("unavailable", "worker is already processing jobs"));
|
||||
}
|
||||
@@ -50,11 +54,11 @@ export class RevokeMessagesBySignerJobWorker {
|
||||
|
||||
this._status = "working";
|
||||
|
||||
let nextJob = await this._queue.popNextJob();
|
||||
let nextJob = await this._queue.popNextJob(doBeforeTs);
|
||||
while (nextJob.isOk()) {
|
||||
await this.processJob(nextJob.value);
|
||||
|
||||
nextJob = await this._queue.popNextJob();
|
||||
nextJob = await this._queue.popNextJob(doBeforeTs);
|
||||
}
|
||||
|
||||
this._status = "waiting";
|
||||
@@ -142,7 +146,13 @@ export class RevokeMessagesBySignerJobQueue extends TypedEmitter<JobQueueEvents>
|
||||
return err(result.error);
|
||||
}
|
||||
|
||||
this.emit("enqueueJob", key.value);
|
||||
if (doAt) {
|
||||
setTimeout(() => {
|
||||
this.emit("enqueueJob", key.value);
|
||||
}, doAt - Date.now() + 1000);
|
||||
} else {
|
||||
this.emit("enqueueJob", key.value);
|
||||
}
|
||||
|
||||
return ok(key.value);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user