mirror of
https://github.com/farcasterxyz/hub-monorepo.git
synced 2026-01-09 13:18:04 -05:00
Clear storage cache to workaround race conditions
This commit is contained in:
@@ -540,6 +540,7 @@ class Engine extends TypedEmitter<EngineEvents> {
|
||||
}
|
||||
|
||||
async pruneMessages(fid: number): HubAsyncResult<number> {
|
||||
await this.clearStorageCacheForFid(fid);
|
||||
const logPruneResult = (result: HubResult<number[]>, store: string): number => {
|
||||
return result.match(
|
||||
(ids) => {
|
||||
@@ -968,6 +969,7 @@ class Engine extends TypedEmitter<EngineEvents> {
|
||||
return err(validatedFid.error);
|
||||
}
|
||||
|
||||
await this.clearStorageCacheForFid(fid);
|
||||
const slot = await this.eventHandler.getCurrentStorageSlotForFid(fid);
|
||||
|
||||
if (slot.isErr()) {
|
||||
@@ -1004,6 +1006,14 @@ class Engine extends TypedEmitter<EngineEvents> {
|
||||
});
|
||||
}
|
||||
|
||||
async clearStorageCacheForFid(fid: number): HubAsyncResult<void> {
|
||||
const limits = getStoreLimits([]);
|
||||
for (const limit of limits) {
|
||||
await this.eventHandler.clearCachedMessageCount(fid, limit.storeType);
|
||||
}
|
||||
return ok(undefined);
|
||||
}
|
||||
|
||||
async getUserNameProof(name: Uint8Array, retries = 1): HubAsyncResult<UserNameProof> {
|
||||
const nameString = bytesToUtf8String(name);
|
||||
if (nameString.isErr()) {
|
||||
|
||||
@@ -17,7 +17,7 @@ import {
|
||||
} from "@farcaster/hub-nodejs";
|
||||
import { err, ok } from "neverthrow";
|
||||
import RocksDB from "../db/rocksdb.js";
|
||||
import { FID_BYTES, OnChainEventPostfix, RootPrefix, UserMessagePostfix } from "../db/types.js";
|
||||
import { FID_BYTES, OnChainEventPostfix, RootPrefix, UserMessagePostfix, UserPostfix } from "../db/types.js";
|
||||
import { logger } from "../../utils/logger.js";
|
||||
import { makeFidKey, makeMessagePrimaryKey, makeTsHash, typeToSetPostfix } from "../db/message.js";
|
||||
import { bytesCompare, getFarcasterTime, HubAsyncResult } from "@farcaster/core";
|
||||
@@ -151,6 +151,12 @@ export class StorageCache {
|
||||
}
|
||||
}
|
||||
|
||||
async clearMessageCount(fid: number, set: UserMessagePostfix): Promise<void> {
|
||||
this._counts.delete(makeKey(fid, set));
|
||||
this._earliestTsHashes.delete(makeKey(fid, set));
|
||||
await this.getMessageCount(fid, set, true);
|
||||
}
|
||||
|
||||
async getCurrentStorageSlotForFid(fid: number): HubAsyncResult<StorageSlot> {
|
||||
let slot = this._activeStorageSlots.get(fid);
|
||||
|
||||
|
||||
@@ -227,6 +227,15 @@ class StoreEventHandler extends TypedEmitter<StoreEvents> {
|
||||
return await this._storageCache.getMessageCount(fid, set, forceFetch);
|
||||
}
|
||||
|
||||
async clearCachedMessageCount(fid: number, store: StoreType): HubAsyncResult<void> {
|
||||
const set = STORE_TO_SET[store];
|
||||
if (!set) {
|
||||
return err(new HubError("bad_request.invalid_param", `invalid store type ${store}`));
|
||||
}
|
||||
await this._storageCache.clearMessageCount(fid, set);
|
||||
return ok(undefined);
|
||||
}
|
||||
|
||||
async getMaxMessageCount(fid: number, set: UserMessagePostfix): HubAsyncResult<number> {
|
||||
const slot = await this.getCurrentStorageSlotForFid(fid);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user