mirror of
https://github.com/farcasterxyz/hub-monorepo.git
synced 2026-01-24 12:38:00 -05:00
fix(replicator): handle Hub errors in backfill (#1809)
## Motivation I tried multiple times to sync the replicator. I found that we may lose data during the initial backfill: if a gRPC request fails, it does not seem to be retried and the data is silently dropped. ## Change Summary I added an exponential backoff on backfill jobs. ## Merge Checklist _Choose all relevant options below by adding an `x` now or at any time before submitting for review_ - [X] PR title adheres to the [conventional commits](https://www.conventionalcommits.org/en/v1.0.0/) standard - [X] PR has a [changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets) - [ ] PR has been tagged with a change label(s) (i.e. documentation, feature, bugfix, or chore) - [ ] PR includes [documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs) if necessary. - [X] All [commits have been signed](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#22-signing-commits) ## Additional Context I saw many errors like these in the logs during the initial backfill: ``` ERROR (1): Job failed {"jobName":"BackfillFidRegistration","jobId":"246281","reason":"Unable to backfill","errorName":"Error","errorMessage":"Unable to backfill","errorStack":"Error: Unable to backfill\n at getOnChainEventsByFidInBatchesOf (/home/node/app/apps/replicator/build/hub.js:18:23)\n at process.processTicksAndRejections (node:internal/process/task_queues:95:5)\n at async Object.run (/home/node/app/apps/replicator/build/jobs/backfillFidRegistration.js:19:26)\n at async /home/node/app/node_modules/bullmq/dist/cjs/classes/child-processor.js:69:33"} ERROR (1): Job failed {"jobName":"BackfillFidUserData","jobId":"5069991","reason":"Unable to fetch UserData messages for FID 69477","errorName":"Error","errorMessage":"Unable to fetch UserData messages for FID 69477","errorStack":"Error: Unable to fetch UserData messages for FID 69477\n at getUserDataByFidInBatchesOf 
(/home/node/app/apps/replicator/build/hub.js:129:19)\n at process.processTicksAndRejections (node:internal/process/task_queues:95:5)\n at async Object.run (/home/node/app/apps/replicator/build/jobs/backfillFidUserData.js:12:26)\n at async /home/node/app/node_modules/bullmq/dist/cjs/classes/child-processor.js:69:33"} ``` Constraint errors still appear after the backfill, but the initial backfill needs to be more resilient. <!-- start pr-codex --> --- ## PR-Codex overview The focus of this PR is to enhance error handling in the `replicator` app when interacting with the Hub. ### Detailed summary - Added `retryHubCallWithExponentialBackoff` function for handling errors with an exponential backoff strategy. - Updated functions to retry calls to Hub methods with a backoff mechanism in case of errors. - Improved error messages for backfilling events, proofs, casts, reactions, links, verifications, and user data. - Added logging for error warnings during the retry process. > ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your question}` <!-- end pr-codex -->
This commit is contained in:
5
.changeset/witty-pants-pay.md
Normal file
5
.changeset/witty-pants-pay.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
"@farcaster/replicator": patch
|
||||
---
|
||||
|
||||
fix(replicator): handle Hub errors in backfill
|
||||
@@ -8,15 +8,45 @@ import {
|
||||
getSSLHubRpcClient,
|
||||
isIdRegisterOnChainEvent,
|
||||
isSignerOnChainEvent,
|
||||
HubResult,
|
||||
} from "@farcaster/hub-nodejs";
|
||||
import { AssertionError } from "./error.js";
|
||||
import { exhaustiveGuard } from "./util.js";
|
||||
import { log } from "./log";
|
||||
|
||||
/**
 * Builds an RPC client for the Hub at `host`.
 *
 * @param host - Address of the Hub, passed straight to the client factory.
 * @param options - `ssl`: when truthy the SSL client factory is used,
 *   otherwise the insecure one.
 * @returns The client produced by the selected factory.
 */
export function getHubClient(host: string, { ssl }: { ssl?: boolean }) {
  if (ssl) {
    return getSSLHubRpcClient(host);
  }
  return getInsecureHubRpcClient(host);
}
|
||||
|
||||
/**
 * Invokes a Hub call and retries it with exponential backoff when it fails.
 *
 * A call counts as failed when `fn` rejects/throws or when it resolves to an
 * `Err` result; an `Err` is converted into a thrown `Error` so both failure
 * modes share the same retry path.
 *
 * The wait before the next try is `baseDelayMs * 2 ** currentAttempt`, always
 * derived from the caller-supplied `baseDelayMs`. (Feeding the previous delay
 * back in as the new base makes the delay compound far faster than
 * exponentially and quickly exceeds the largest delay `setTimeout` supports.)
 *
 * @param fn - Thunk performing the Hub call; it is re-invoked on every attempt.
 * @param attempt - Number of the first attempt (1-based).
 * @param maxAttempts - Attempt number after which the failure is rethrown.
 * @param baseDelayMs - Base delay, in milliseconds, that is doubled per attempt.
 * @returns The first non-`Err` result produced by `fn`.
 * @throws The error of the last attempt once `maxAttempts` has been reached.
 */
async function retryHubCallWithExponentialBackoff<T>(
  fn: () => Promise<HubResult<T>>,
  attempt = 1,
  maxAttempts = 10,
  baseDelayMs = 100,
): Promise<HubResult<T>> {
  for (let currentAttempt = attempt; ; currentAttempt++) {
    try {
      const result = await fn();
      if (result.isErr()) {
        // Funnel Err results into the catch block so they are retried as well.
        throw new Error(`maybe retryable error : ${JSON.stringify(result.error)}`);
      }
      return result;
    } catch (error) {
      log.warn(error);
      if (currentAttempt >= maxAttempts) {
        throw error;
      }

      // Scale from the fixed base so the delay doubles once per attempt.
      const delayMs = baseDelayMs * 2 ** currentAttempt;
      log.warn(`Error in backfill, attempt ${currentAttempt}/${maxAttempts}, retrying in ${delayMs}ms`);
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
    }
  }
}
|
||||
|
||||
export async function* getOnChainEventsByFidInBatchesOf(
|
||||
hub: HubRpcClient,
|
||||
{
|
||||
@@ -36,10 +66,10 @@ export async function* getOnChainEventsByFidInBatchesOf(
|
||||
const hasSubTypeFilter = signerEventTypes?.length || idRegisterEventTypes?.length;
|
||||
|
||||
for (const eventType of eventTypes) {
|
||||
let result = await hub.getOnChainEvents({ pageSize, fid, eventType });
|
||||
let result = await retryHubCallWithExponentialBackoff(() => hub.getOnChainEvents({ pageSize, fid, eventType }));
|
||||
for (;;) {
|
||||
if (result.isErr()) {
|
||||
throw new Error("Unable to backfill", { cause: result.error });
|
||||
throw new Error(`Unable to backfill events for FID ${fid} of type ${eventType}`, { cause: result.error });
|
||||
}
|
||||
|
||||
const { events, nextPageToken: pageToken } = result.value;
|
||||
@@ -52,7 +82,9 @@ export async function* getOnChainEventsByFidInBatchesOf(
|
||||
}
|
||||
|
||||
if (!pageToken?.length) break;
|
||||
result = await hub.getOnChainEvents({ pageSize, pageToken, fid, eventType });
|
||||
result = await retryHubCallWithExponentialBackoff(() =>
|
||||
hub.getOnChainEvents({ pageSize, pageToken, fid, eventType }),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -85,18 +117,18 @@ export function filterEvents(
|
||||
}
|
||||
|
||||
/**
 * Fetches the username proofs of a single FID from the Hub.
 *
 * The request goes through `retryHubCallWithExponentialBackoff`, so a failing
 * call is retried before this function gives up.
 *
 * @param hub - Hub RPC client used to issue the request.
 * @param fid - FID whose username proofs are requested.
 * @returns The `proofs` field of the Hub response.
 * @throws Error naming the FID (with the Hub error as `cause`) when the
 *   result is an `Err`.
 */
export async function getUserNameProofsByFid(hub: HubRpcClient, fid: number) {
  const result = await retryHubCallWithExponentialBackoff(() => hub.getUserNameProofsByFid({ fid }));
  if (result.isErr()) {
    throw new Error(`Unable to backfill proofs for FID ${fid}`, { cause: result.error });
  }
  return result.value.proofs;
}
|
||||
|
||||
export async function* getCastsByFidInBatchesOf(hub: HubRpcClient, fid: number, pageSize: number) {
|
||||
let result = await hub.getCastsByFid({ pageSize, fid });
|
||||
let result = await retryHubCallWithExponentialBackoff(() => hub.getCastsByFid({ pageSize, fid }));
|
||||
for (;;) {
|
||||
if (result.isErr()) {
|
||||
throw new Error("Unable to backfill", { cause: result.error });
|
||||
throw new Error(`Unable to backfill casts for FID ${fid}`, { cause: result.error });
|
||||
}
|
||||
|
||||
const { messages, nextPageToken: pageToken } = result.value;
|
||||
@@ -104,12 +136,12 @@ export async function* getCastsByFidInBatchesOf(hub: HubRpcClient, fid: number,
|
||||
yield messages;
|
||||
|
||||
if (!pageToken?.length) break;
|
||||
result = await hub.getCastsByFid({ pageSize, pageToken, fid });
|
||||
result = await retryHubCallWithExponentialBackoff(() => hub.getCastsByFid({ pageSize, pageToken, fid }));
|
||||
}
|
||||
}
|
||||
|
||||
export async function* getReactionsByFidInBatchesOf(hub: HubRpcClient, fid: number, pageSize: number) {
|
||||
let result = await hub.getReactionsByFid({ pageSize, fid });
|
||||
let result = await retryHubCallWithExponentialBackoff(() => hub.getReactionsByFid({ pageSize, fid }));
|
||||
for (;;) {
|
||||
if (result.isErr()) {
|
||||
throw new Error(`Unable to fetch Reaction messages for FID ${fid}`, { cause: result.error });
|
||||
@@ -120,12 +152,12 @@ export async function* getReactionsByFidInBatchesOf(hub: HubRpcClient, fid: numb
|
||||
yield messages;
|
||||
|
||||
if (!pageToken?.length) break;
|
||||
result = await hub.getReactionsByFid({ pageSize, pageToken, fid });
|
||||
result = await retryHubCallWithExponentialBackoff(() => hub.getReactionsByFid({ pageSize, pageToken, fid }));
|
||||
}
|
||||
}
|
||||
|
||||
export async function* getLinksByFidInBatchesOf(hub: HubRpcClient, fid: number, pageSize: number) {
|
||||
let result = await hub.getLinksByFid({ pageSize, fid });
|
||||
let result = await retryHubCallWithExponentialBackoff(() => hub.getLinksByFid({ pageSize, fid }));
|
||||
for (;;) {
|
||||
if (result.isErr()) {
|
||||
throw new Error(`Unable to fetch Link messages for FID ${fid}`, { cause: result.error });
|
||||
@@ -136,12 +168,12 @@ export async function* getLinksByFidInBatchesOf(hub: HubRpcClient, fid: number,
|
||||
yield messages;
|
||||
|
||||
if (!pageToken?.length) break;
|
||||
result = await hub.getLinksByFid({ pageSize, pageToken, fid });
|
||||
result = await retryHubCallWithExponentialBackoff(() => hub.getLinksByFid({ pageSize, pageToken, fid }));
|
||||
}
|
||||
}
|
||||
|
||||
export async function* getVerificationsByFidInBatchesOf(hub: HubRpcClient, fid: number, pageSize: number) {
|
||||
let result = await hub.getVerificationsByFid({ pageSize, fid });
|
||||
let result = await retryHubCallWithExponentialBackoff(() => hub.getVerificationsByFid({ pageSize, fid }));
|
||||
for (;;) {
|
||||
if (result.isErr()) {
|
||||
throw new Error(`Unable to fetch Verification messages for FID ${fid}`, { cause: result.error });
|
||||
@@ -152,12 +184,12 @@ export async function* getVerificationsByFidInBatchesOf(hub: HubRpcClient, fid:
|
||||
yield messages;
|
||||
|
||||
if (!pageToken?.length) break;
|
||||
result = await hub.getVerificationsByFid({ pageSize, pageToken, fid });
|
||||
result = await retryHubCallWithExponentialBackoff(() => hub.getVerificationsByFid({ pageSize, pageToken, fid }));
|
||||
}
|
||||
}
|
||||
|
||||
export async function* getUserDataByFidInBatchesOf(hub: HubRpcClient, fid: number, pageSize: number) {
|
||||
let result = await hub.getUserDataByFid({ pageSize, fid });
|
||||
let result = await retryHubCallWithExponentialBackoff(() => hub.getUserDataByFid({ pageSize, fid }));
|
||||
for (;;) {
|
||||
if (result.isErr()) {
|
||||
throw new Error(`Unable to fetch UserData messages for FID ${fid}`, { cause: result.error });
|
||||
@@ -168,6 +200,6 @@ export async function* getUserDataByFidInBatchesOf(hub: HubRpcClient, fid: numbe
|
||||
yield messages;
|
||||
|
||||
if (!pageToken?.length) break;
|
||||
result = await hub.getUserDataByFid({ pageSize, pageToken, fid });
|
||||
result = await retryHubCallWithExponentialBackoff(() => hub.getUserDataByFid({ pageSize, pageToken, fid }));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user