fix(replicator): handle Hub errors in backfill (#1809)

## Motivation

I tried multiple times to sync the replicator.
I found that we may have loss of data on the initial backfill: If the
grpc request has an error, they seems to be no retried and forgotten.

## Change Summary

I added an exponential backoff on backfill jobs.

## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [X] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [X] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [ ] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [ ] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.
- [X] All [commits have been
signed](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#22-signing-commits)

## Additional Context


I had a bunch if this kind of errors during the initial backfill in the
logs:
```
ERROR (1): Job failed {"jobName":"BackfillFidRegistration","jobId":"246281","reason":"Unable to backfill","errorName":"Error","errorMessage":"Unable to backfill","errorStack":"Error: Unable to backfill\n    at getOnChainEventsByFidInBatchesOf (/home/node/app/apps/replicator/build/hub.js:18:23)\n    at process.processTicksAndRejections (node:internal/process/task_queues:95:5)\n    at async Object.run (/home/node/app/apps/replicator/build/jobs/backfillFidRegistration.js:19:26)\n    at async /home/node/app/node_modules/bullmq/dist/cjs/classes/child-processor.js:69:33"}
ERROR (1): Job failed {"jobName":"BackfillFidUserData","jobId":"5069991","reason":"Unable to fetch UserData messages for FID 69477","errorName":"Error","errorMessage":"Unable to fetch UserData messages for FID 69477","errorStack":"Error: Unable to fetch UserData messages for FID 69477\n    at getUserDataByFidInBatchesOf (/home/node/app/apps/replicator/build/hub.js:129:19)\n    at process.processTicksAndRejections (node:internal/process/task_queues:95:5)\n    at async Object.run (/home/node/app/apps/replicator/build/jobs/backfillFidUserData.js:12:26)\n    at async /home/node/app/node_modules/bullmq/dist/cjs/classes/child-processor.js:69:33"}
```

Errors on constraints still appear after the backfill, but we need a
more resilient in the initial backfill.

<!-- start pr-codex -->

---

## PR-Codex overview
The focus of this PR is to enhance error handling in the `replicator`
app when interacting with the Hub.

### Detailed summary
- Added `retryHubCallWithExponentialBackoff` function for handling
errors with exponential backoff strategy.
- Updated functions to retry calls to Hub methods with backoff mechanism
in case of errors.
- Improved error messages for backfilling events, proofs, casts,
reactions, links, verifications, and user data.
- Added logging for error warnings during the retry process.

>  Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
This commit is contained in:
Jean Prat
2024-03-12 19:19:23 +01:00
committed by GitHub
parent 4fe57f84b8
commit 02bca881b1
2 changed files with 53 additions and 16 deletions

View File

@@ -0,0 +1,5 @@
---
"@farcaster/replicator": patch
---
fix(replicator): handle Hub errors in backfill

View File

@@ -8,15 +8,45 @@ import {
getSSLHubRpcClient,
isIdRegisterOnChainEvent,
isSignerOnChainEvent,
HubResult,
} from "@farcaster/hub-nodejs";
import { AssertionError } from "./error.js";
import { exhaustiveGuard } from "./util.js";
import { log } from "./log";
export function getHubClient(host: string, { ssl }: { ssl?: boolean }) {
const hub = ssl ? getSSLHubRpcClient(host) : getInsecureHubRpcClient(host);
return hub;
}
async function retryHubCallWithExponentialBackoff<T>(
fn: () => Promise<HubResult<T>>,
attempt = 1,
maxAttempts = 10,
baseDelayMs = 100,
): Promise<HubResult<T>> {
let currentAttempt = attempt;
try {
const result = await fn();
if (result.isErr()) {
throw new Error(`maybe retryable error : ${JSON.stringify(result.error)}`);
}
return result;
} catch (error) {
log.warn(error);
if (currentAttempt >= maxAttempts) {
throw error;
}
const delayMs = baseDelayMs * 2 ** currentAttempt;
log.warn(`Error in backfill, attempt ${currentAttempt}/${maxAttempts}, retrying in ${delayMs}ms`);
await new Promise((resolve) => setTimeout(resolve, delayMs));
currentAttempt++;
return retryHubCallWithExponentialBackoff(fn, currentAttempt, maxAttempts, delayMs);
}
}
export async function* getOnChainEventsByFidInBatchesOf(
hub: HubRpcClient,
{
@@ -36,10 +66,10 @@ export async function* getOnChainEventsByFidInBatchesOf(
const hasSubTypeFilter = signerEventTypes?.length || idRegisterEventTypes?.length;
for (const eventType of eventTypes) {
let result = await hub.getOnChainEvents({ pageSize, fid, eventType });
let result = await retryHubCallWithExponentialBackoff(() => hub.getOnChainEvents({ pageSize, fid, eventType }));
for (;;) {
if (result.isErr()) {
throw new Error("Unable to backfill", { cause: result.error });
throw new Error(`Unable to backfill events for FID ${fid} of type ${eventType}`, { cause: result.error });
}
const { events, nextPageToken: pageToken } = result.value;
@@ -52,7 +82,9 @@ export async function* getOnChainEventsByFidInBatchesOf(
}
if (!pageToken?.length) break;
result = await hub.getOnChainEvents({ pageSize, pageToken, fid, eventType });
result = await retryHubCallWithExponentialBackoff(() =>
hub.getOnChainEvents({ pageSize, pageToken, fid, eventType }),
);
}
}
}
@@ -85,18 +117,18 @@ export function filterEvents(
}
export async function getUserNameProofsByFid(hub: HubRpcClient, fid: number) {
const result = await hub.getUserNameProofsByFid({ fid });
const result = await retryHubCallWithExponentialBackoff(() => hub.getUserNameProofsByFid({ fid }));
if (result.isErr()) {
throw new Error("Unable to backfill", { cause: result.error });
throw new Error(`Unable to backfill proofs for FID ${fid}`, { cause: result.error });
}
return result.value.proofs;
}
export async function* getCastsByFidInBatchesOf(hub: HubRpcClient, fid: number, pageSize: number) {
let result = await hub.getCastsByFid({ pageSize, fid });
let result = await retryHubCallWithExponentialBackoff(() => hub.getCastsByFid({ pageSize, fid }));
for (;;) {
if (result.isErr()) {
throw new Error("Unable to backfill", { cause: result.error });
throw new Error(`Unable to backfill casts for FID ${fid}`, { cause: result.error });
}
const { messages, nextPageToken: pageToken } = result.value;
@@ -104,12 +136,12 @@ export async function* getCastsByFidInBatchesOf(hub: HubRpcClient, fid: number,
yield messages;
if (!pageToken?.length) break;
result = await hub.getCastsByFid({ pageSize, pageToken, fid });
result = await retryHubCallWithExponentialBackoff(() => hub.getCastsByFid({ pageSize, pageToken, fid }));
}
}
export async function* getReactionsByFidInBatchesOf(hub: HubRpcClient, fid: number, pageSize: number) {
let result = await hub.getReactionsByFid({ pageSize, fid });
let result = await retryHubCallWithExponentialBackoff(() => hub.getReactionsByFid({ pageSize, fid }));
for (;;) {
if (result.isErr()) {
throw new Error(`Unable to fetch Reaction messages for FID ${fid}`, { cause: result.error });
@@ -120,12 +152,12 @@ export async function* getReactionsByFidInBatchesOf(hub: HubRpcClient, fid: numb
yield messages;
if (!pageToken?.length) break;
result = await hub.getReactionsByFid({ pageSize, pageToken, fid });
result = await retryHubCallWithExponentialBackoff(() => hub.getReactionsByFid({ pageSize, pageToken, fid }));
}
}
export async function* getLinksByFidInBatchesOf(hub: HubRpcClient, fid: number, pageSize: number) {
let result = await hub.getLinksByFid({ pageSize, fid });
let result = await retryHubCallWithExponentialBackoff(() => hub.getLinksByFid({ pageSize, fid }));
for (;;) {
if (result.isErr()) {
throw new Error(`Unable to fetch Link messages for FID ${fid}`, { cause: result.error });
@@ -136,12 +168,12 @@ export async function* getLinksByFidInBatchesOf(hub: HubRpcClient, fid: number,
yield messages;
if (!pageToken?.length) break;
result = await hub.getLinksByFid({ pageSize, pageToken, fid });
result = await retryHubCallWithExponentialBackoff(() => hub.getLinksByFid({ pageSize, pageToken, fid }));
}
}
export async function* getVerificationsByFidInBatchesOf(hub: HubRpcClient, fid: number, pageSize: number) {
let result = await hub.getVerificationsByFid({ pageSize, fid });
let result = await retryHubCallWithExponentialBackoff(() => hub.getVerificationsByFid({ pageSize, fid }));
for (;;) {
if (result.isErr()) {
throw new Error(`Unable to fetch Verification messages for FID ${fid}`, { cause: result.error });
@@ -152,12 +184,12 @@ export async function* getVerificationsByFidInBatchesOf(hub: HubRpcClient, fid:
yield messages;
if (!pageToken?.length) break;
result = await hub.getVerificationsByFid({ pageSize, pageToken, fid });
result = await retryHubCallWithExponentialBackoff(() => hub.getVerificationsByFid({ pageSize, pageToken, fid }));
}
}
export async function* getUserDataByFidInBatchesOf(hub: HubRpcClient, fid: number, pageSize: number) {
let result = await hub.getUserDataByFid({ pageSize, fid });
let result = await retryHubCallWithExponentialBackoff(() => hub.getUserDataByFid({ pageSize, fid }));
for (;;) {
if (result.isErr()) {
throw new Error(`Unable to fetch UserData messages for FID ${fid}`, { cause: result.error });
@@ -168,6 +200,6 @@ export async function* getUserDataByFidInBatchesOf(hub: HubRpcClient, fid: numbe
yield messages;
if (!pageToken?.length) break;
result = await hub.getUserDataByFid({ pageSize, pageToken, fid });
result = await retryHubCallWithExponentialBackoff(() => hub.getUserDataByFid({ pageSize, pageToken, fid }));
}
}