mirror of
https://github.com/farcasterxyz/hub-monorepo.git
synced 2026-02-11 13:35:05 -05:00
We had a report of one user on modern MacBook with 16GB of memory who was running into heap allocation issues. This was likely due to the excessive number of promises we were creating (as well as buffering all data for each call). Change the logic to fetch in batches of 1K records at most. This slows down the initial sync but should reduce the likelihood that someone will hit a memory limit. We also specify a custom limit in the `yarn start` command so that when we test locally we are using the same limit as everyone else.
543 lines
19 KiB
TypeScript
import {
|
|
CastAddMessage,
|
|
CastRemoveMessage,
|
|
HubRpcClient,
|
|
IdRegistryEvent,
|
|
Message,
|
|
NameRegistryEvent,
|
|
ReactionAddMessage,
|
|
ReactionRemoveMessage,
|
|
SignerAddMessage,
|
|
SignerRemoveMessage,
|
|
UserDataAddMessage,
|
|
VerificationAddEthAddressMessage,
|
|
VerificationRemoveMessage,
|
|
isCastAddMessage,
|
|
isCastRemoveMessage,
|
|
isMergeIdRegistryEventHubEvent,
|
|
isMergeMessageHubEvent,
|
|
isMergeNameRegistryEventHubEvent,
|
|
isPruneMessageHubEvent,
|
|
isReactionAddMessage,
|
|
isReactionRemoveMessage,
|
|
isRevokeMessageHubEvent,
|
|
isSignerAddMessage,
|
|
isSignerRemoveMessage,
|
|
isUserDataAddMessage,
|
|
isVerificationAddEthAddressMessage,
|
|
isVerificationRemoveMessage,
|
|
getSSLHubRpcClient,
|
|
getInsecureHubRpcClient,
|
|
} from '@farcaster/hub-nodejs';
|
|
import { HubSubscriber } from './hubSubscriber';
|
|
import { Logger } from 'pino';
|
|
import { Database } from './db';
|
|
import { Kysely, sql } from 'kysely';
|
|
import { bytesToHex, farcasterTimeToDate } from './util';
|
|
|
|
// How a message should be recorded in the `messages` table: merged (active),
// or soft-deleted via one of delete/prune/revoke (sets the matching *_At column).
type StoreMessageOperation = 'merge' | 'delete' | 'prune' | 'revoke';

// Maximum number of records requested per RPC page during backfill.
// If you're hitting out-of-memory errors, try decreasing this to reduce overall
// memory usage.
const MAX_PAGE_SIZE = 1_000;
|
|
|
|
export class HubReplicator {
|
|
private client: HubRpcClient;
|
|
private subscriber: HubSubscriber;
|
|
|
|
constructor(private hubAddress: string, private ssl: boolean, private db: Kysely<Database>, private log: Logger) {
|
|
this.client = this.ssl ? getSSLHubRpcClient(hubAddress) : getInsecureHubRpcClient(hubAddress);
|
|
this.subscriber = new HubSubscriber(this.client, log);
|
|
|
|
this.subscriber.on('event', async (hubEvent) => {
|
|
if (isMergeMessageHubEvent(hubEvent)) {
|
|
this.log.info(`Processing merge event ${hubEvent.id} from stream`);
|
|
await this.onMergeMessage(hubEvent.mergeMessageBody.message);
|
|
|
|
for (const deletedMessage of hubEvent.mergeMessageBody.deletedMessages) {
|
|
await this.storeMessage(deletedMessage, 'delete');
|
|
}
|
|
} else if (isPruneMessageHubEvent(hubEvent)) {
|
|
this.log.info(`Processing prune event ${hubEvent.id}`);
|
|
await this.onPruneMessage(hubEvent.pruneMessageBody.message);
|
|
} else if (isRevokeMessageHubEvent(hubEvent)) {
|
|
this.log.info(`Processing revoke event ${hubEvent.id}`);
|
|
await this.onRevokeMessage(hubEvent.revokeMessageBody.message);
|
|
} else if (isMergeIdRegistryEventHubEvent(hubEvent)) {
|
|
this.log.info(`Processing ID registry event ${hubEvent.id}`);
|
|
await this.onIdRegistryEvent(hubEvent.mergeIdRegistryEventBody.idRegistryEvent);
|
|
} else if (isMergeNameRegistryEventHubEvent(hubEvent)) {
|
|
this.log.info(`Processing name registry event ${hubEvent.id}`);
|
|
await this.onNameRegistryEvent(hubEvent.mergeNameRegistryEventBody.nameRegistryEvent);
|
|
} else {
|
|
this.log.warn(`Unknown type ${hubEvent.type} of event ${hubEvent.id}. Ignoring`);
|
|
}
|
|
|
|
// Keep track of how many events we've processed.
|
|
await this.db
|
|
.insertInto('hubSubscriptions')
|
|
.values({ host: this.hubAddress, lastEventId: hubEvent.id })
|
|
.onConflict((oc) =>
|
|
oc.columns(['host']).doUpdateSet({
|
|
lastEventId: hubEvent.id,
|
|
})
|
|
)
|
|
.execute();
|
|
});
|
|
}
|
|
|
|
public async start() {
|
|
const infoResult = await this.client.getInfo({ syncStats: true });
|
|
|
|
if (infoResult.isErr() || infoResult.value.syncStats === undefined) {
|
|
throw new Error(`Unable to get information about hub ${this.hubAddress}`);
|
|
}
|
|
|
|
const { numMessages } = infoResult.value.syncStats;
|
|
|
|
// Not technically true, since hubs don't return CastRemove/etc. messages,
|
|
// but at least gives a rough ballpark of order of magnitude.
|
|
this.log.info(`Syncing messages from hub ${this.hubAddress} (~${numMessages} messages)`);
|
|
|
|
// Start backfilling all historical data in the background
|
|
this.backfill();
|
|
|
|
// Process live events going forward, starting from the last event we
|
|
// processed (if there was one).
|
|
const subscription = await this.db
|
|
.selectFrom('hubSubscriptions')
|
|
.where('host', '=', this.hubAddress)
|
|
.select('lastEventId')
|
|
.executeTakeFirst();
|
|
this.subscriber.start(subscription?.lastEventId);
|
|
}
|
|
|
|
public stop() {
|
|
this.subscriber.stop();
|
|
}
|
|
|
|
public destroy() {
|
|
this.subscriber.destroy();
|
|
}
|
|
|
|
private async backfill() {
|
|
const maxFidResult = await this.client.getFids({ pageSize: 1, reverse: true });
|
|
if (maxFidResult.isErr()) throw new Error('Unable to backfill', { cause: maxFidResult.error });
|
|
|
|
const maxFid = maxFidResult.value.fids[0];
|
|
|
|
for (let fid = 1; fid <= maxFid; fid++) {
|
|
this.log.info(`Starting backfill for FID ${fid}`);
|
|
await this.processAllMessagesForFid(fid);
|
|
this.log.info(`Completed backfill for FID ${fid}`);
|
|
}
|
|
}
|
|
|
|
private async processAllMessagesForFid(fid: number) {
|
|
await this.client
|
|
.getIdRegistryEvent({ fid })
|
|
.then((result) => result.map((event) => this.onIdRegistryEvent(event)));
|
|
|
|
// Fetch all messages serially in batches to reduce memory consumption.
|
|
// Your implementation can likely do more in parallel, but we wanted an
|
|
// example that works on resource constrained hardware.
|
|
for (const fn of [
|
|
this.getCastsByFidInBatchesOf,
|
|
this.getReactionsByFidInBatchesOf,
|
|
this.getSignersByFidInBatchesOf,
|
|
this.getVerificationsByFidInBatchesOf,
|
|
this.getUserDataByFidInBatchesOf,
|
|
]) {
|
|
for await (const messages of fn.call(this, fid, MAX_PAGE_SIZE)) {
|
|
for (const message of messages) {
|
|
this.log.debug(`Backfilling message ${bytesToHex(message.hash)} (type ${message.data?.type})`);
|
|
await this.onMergeMessage(message);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private async *getCastsByFidInBatchesOf(fid: number, pageSize: number) {
|
|
let result = await this.client.getCastsByFid({ pageSize, fid });
|
|
for (;;) {
|
|
if (result.isErr()) {
|
|
throw new Error('Unable to backfill', { cause: result.error });
|
|
}
|
|
|
|
const { messages, nextPageToken: pageToken } = result.value;
|
|
|
|
yield messages;
|
|
|
|
if (!pageToken?.length) break;
|
|
result = await this.client.getCastsByFid({ pageSize, pageToken, fid });
|
|
}
|
|
}
|
|
|
|
private async *getReactionsByFidInBatchesOf(fid: number, pageSize: number) {
|
|
let result = await this.client.getReactionsByFid({ pageSize, fid });
|
|
for (;;) {
|
|
if (result.isErr()) {
|
|
throw new Error('Unable to backfill', { cause: result.error });
|
|
}
|
|
|
|
const { messages, nextPageToken: pageToken } = result.value;
|
|
|
|
yield messages;
|
|
|
|
if (!pageToken?.length) break;
|
|
result = await this.client.getReactionsByFid({ pageSize, pageToken, fid });
|
|
}
|
|
}
|
|
|
|
private async *getSignersByFidInBatchesOf(fid: number, pageSize: number) {
|
|
let result = await this.client.getSignersByFid({ pageSize, fid });
|
|
for (;;) {
|
|
if (result.isErr()) {
|
|
throw new Error('Unable to backfill', { cause: result.error });
|
|
}
|
|
|
|
const { messages, nextPageToken: pageToken } = result.value;
|
|
|
|
yield messages;
|
|
|
|
if (!pageToken?.length) break;
|
|
result = await this.client.getSignersByFid({ pageSize, pageToken, fid });
|
|
}
|
|
}
|
|
|
|
private async *getVerificationsByFidInBatchesOf(fid: number, pageSize: number) {
|
|
let result = await this.client.getVerificationsByFid({ pageSize, fid });
|
|
for (;;) {
|
|
if (result.isErr()) {
|
|
throw new Error('Unable to backfill', { cause: result.error });
|
|
}
|
|
|
|
const { messages, nextPageToken: pageToken } = result.value;
|
|
|
|
yield messages;
|
|
|
|
if (!pageToken?.length) break;
|
|
result = await this.client.getVerificationsByFid({ pageSize, pageToken, fid });
|
|
}
|
|
}
|
|
|
|
private async *getUserDataByFidInBatchesOf(fid: number, pageSize: number) {
|
|
let result = await this.client.getUserDataByFid({ pageSize, fid });
|
|
for (;;) {
|
|
if (result.isErr()) {
|
|
throw new Error('Unable to backfill', { cause: result.error });
|
|
}
|
|
|
|
const { messages, nextPageToken: pageToken } = result.value;
|
|
|
|
yield messages;
|
|
|
|
if (!pageToken?.length) break;
|
|
result = await this.client.getUserDataByFid({ pageSize, pageToken, fid });
|
|
}
|
|
}
|
|
|
|
private async storeMessage(message: Message, operation: StoreMessageOperation) {
|
|
if (!message.data) {
|
|
throw new Error('Message data is missing');
|
|
}
|
|
|
|
const now = new Date();
|
|
|
|
const messageRow = await this.db
|
|
.insertInto('messages')
|
|
.values({
|
|
fid: message.data.fid,
|
|
messageType: message.data.type,
|
|
timestamp: farcasterTimeToDate(message.data.timestamp),
|
|
hash: message.hash,
|
|
hashScheme: message.hashScheme,
|
|
signature: message.signature,
|
|
signatureScheme: message.signatureScheme,
|
|
signer: message.signer,
|
|
raw: Message.encode(message).finish(),
|
|
deletedAt: operation === 'delete' ? now : null,
|
|
prunedAt: operation === 'prune' ? now : null,
|
|
revokedAt: operation === 'revoke' ? now : null,
|
|
})
|
|
.onConflict((oc) =>
|
|
oc
|
|
.columns(['hash'])
|
|
.doUpdateSet({
|
|
updatedAt: now,
|
|
// Only the signer or message state could have changed
|
|
signature: message.signature,
|
|
signatureScheme: message.signatureScheme,
|
|
signer: message.signer,
|
|
deletedAt: operation === 'delete' ? now : null,
|
|
prunedAt: operation === 'prune' ? now : null,
|
|
revokedAt: operation === 'revoke' ? now : null,
|
|
})
|
|
.where(({ or, cmpr, ref }) =>
|
|
// Only update if a value has actually changed
|
|
or([
|
|
cmpr('excluded.signature', '!=', ref('messages.signature')),
|
|
cmpr('excluded.signatureScheme', '!=', ref('messages.signatureScheme')),
|
|
cmpr('excluded.signer', '!=', ref('messages.signer')),
|
|
cmpr('excluded.deletedAt', 'is', sql`distinct from ${ref('messages.deletedAt')}`),
|
|
cmpr('excluded.prunedAt', 'is', sql`distinct from ${ref('messages.prunedAt')}`),
|
|
cmpr('excluded.revokedAt', 'is', sql`distinct from ${ref('messages.revokedAt')}`),
|
|
])
|
|
)
|
|
)
|
|
.returning(['updatedAt', 'createdAt'])
|
|
.executeTakeFirst();
|
|
|
|
// Return boolean indicating whether this is a new message
|
|
return !!(messageRow && messageRow.updatedAt === messageRow.createdAt);
|
|
}
|
|
|
|
private async onIdRegistryEvent(event: IdRegistryEvent) {
|
|
await this.db
|
|
.insertInto('fids')
|
|
.values({ fid: event.fid, custodyAddress: event.to })
|
|
.onConflict((oc) => oc.columns(['fid']).doUpdateSet({ custodyAddress: event.to, updatedAt: new Date() }))
|
|
.execute();
|
|
}
|
|
|
|
private async onNameRegistryEvent(event: NameRegistryEvent) {
|
|
const custodyAddress = event.to;
|
|
const expiresAt = farcasterTimeToDate(event.expiry);
|
|
|
|
await this.db
|
|
.insertInto('fnames')
|
|
.values({
|
|
fname: Buffer.from(event.fname).toString('utf8'),
|
|
custodyAddress,
|
|
expiresAt,
|
|
})
|
|
.onConflict((oc) => oc.columns(['fname']).doUpdateSet({ custodyAddress, expiresAt, updatedAt: new Date() }))
|
|
.execute();
|
|
}
|
|
|
|
private async onMergeMessage(message: Message) {
|
|
this.log.debug(`Merging message ${bytesToHex(message.hash)} (type ${message.data?.type})`);
|
|
|
|
const isInitialCreation = await this.storeMessage(message, 'merge');
|
|
|
|
if (isCastAddMessage(message)) {
|
|
await this.onCastAdd(message, isInitialCreation);
|
|
} else if (isCastRemoveMessage(message)) {
|
|
await this.onCastRemove(message);
|
|
} else if (isReactionAddMessage(message)) {
|
|
await this.onReactionAdd(message, isInitialCreation);
|
|
} else if (isReactionRemoveMessage(message)) {
|
|
await this.onReactionRemove(message);
|
|
} else if (isVerificationAddEthAddressMessage(message)) {
|
|
await this.onVerificationAddEthAddress(message, isInitialCreation);
|
|
} else if (isVerificationRemoveMessage(message)) {
|
|
await this.onVerificationRemove(message);
|
|
} else if (isSignerAddMessage(message)) {
|
|
await this.onSignerAdd(message, isInitialCreation);
|
|
} else if (isSignerRemoveMessage(message)) {
|
|
await this.onSignerRemove(message);
|
|
} else if (isUserDataAddMessage(message)) {
|
|
await this.onUserDataAdd(message, isInitialCreation);
|
|
} else {
|
|
this.log.warn(`Ignoring unknown message type ${message.data?.type}`);
|
|
}
|
|
}
|
|
|
|
private async onPruneMessage(message: Message) {
|
|
this.log.debug(`Pruning message ${bytesToHex(message.hash)} (type ${message.data?.type})`);
|
|
this.storeMessage(message, 'prune');
|
|
}
|
|
|
|
private async onRevokeMessage(message: Message) {
|
|
this.log.debug(`Revoking message ${bytesToHex(message.hash)} (type ${message.data?.type})`);
|
|
this.storeMessage(message, 'revoke');
|
|
}
|
|
|
|
private async onCastAdd(message: CastAddMessage, isInitialCreation: boolean) {
|
|
await this.db
|
|
.insertInto('casts')
|
|
.values({
|
|
timestamp: farcasterTimeToDate(message.data.timestamp),
|
|
fid: message.data.fid,
|
|
text: message.data.castAddBody.text,
|
|
hash: message.hash,
|
|
parentHash: message.data.castAddBody.parentCastId?.hash,
|
|
parentFid: message.data.castAddBody.parentCastId?.fid,
|
|
parentUrl: message.data.castAddBody.parentUrl,
|
|
embeds: message.data.castAddBody.embedsDeprecated,
|
|
mentions: message.data.castAddBody.mentions,
|
|
mentionsPositions: message.data.castAddBody.mentionsPositions,
|
|
})
|
|
// Do nothing on conflict since nothing should have changed if hash is the same.
|
|
.onConflict((oc) => oc.columns(['hash']).doNothing())
|
|
.execute();
|
|
|
|
if (isInitialCreation) {
|
|
// TODO: Execute any one-time side effects, e.g. sending push
|
|
// notifications to user whose cast was replied to, etc.
|
|
}
|
|
}
|
|
|
|
private async onCastRemove(message: CastRemoveMessage) {
|
|
await this.db
|
|
.updateTable('casts')
|
|
.where('fid', '=', message.data.fid)
|
|
.where('hash', '=', message.data.castRemoveBody.targetHash)
|
|
.set({ deletedAt: farcasterTimeToDate(message.data.timestamp) })
|
|
.execute();
|
|
|
|
// TODO: Execute any cleanup side effects to remove the cast
|
|
}
|
|
|
|
private async onReactionAdd(message: ReactionAddMessage, isInitialCreation: boolean) {
|
|
await this.db
|
|
.insertInto('reactions')
|
|
.values({
|
|
fid: message.data.fid,
|
|
timestamp: farcasterTimeToDate(message.data.timestamp),
|
|
hash: message.hash,
|
|
reactionType: message.data.reactionBody.type,
|
|
targetHash: message.data.reactionBody.targetCastId?.hash,
|
|
targetFid: message.data.reactionBody.targetCastId?.fid,
|
|
targetUrl: message.data.reactionBody.targetUrl,
|
|
})
|
|
// Do nothing on conflict since nothing should have changed if hash is the same.
|
|
.onConflict((oc) => oc.columns(['hash']).doNothing())
|
|
.execute();
|
|
|
|
if (isInitialCreation) {
|
|
// TODO: Execute any one-time side effects, e.g. sending push
|
|
// notifications to user whose cast was liked, etc.
|
|
}
|
|
}
|
|
|
|
private async onReactionRemove(message: ReactionRemoveMessage) {
|
|
await this.db
|
|
.updateTable('reactions')
|
|
.where('fid', '=', message.data.fid)
|
|
.where((eb) => {
|
|
// Search based on the type of reaction
|
|
if (message.data.reactionBody.targetUrl) {
|
|
return eb.where('targetUrl', '=', message.data.reactionBody.targetUrl);
|
|
} else if (message.data.reactionBody.targetCastId) {
|
|
return eb
|
|
.where('targetFid', '=', message.data.reactionBody.targetCastId.fid)
|
|
.where('targetHash', '=', message.data.reactionBody.targetCastId.hash);
|
|
} else {
|
|
throw new Error('Reaction had neither targetUrl nor targetCastId');
|
|
}
|
|
})
|
|
.set({ deletedAt: farcasterTimeToDate(message.data.timestamp) })
|
|
.execute();
|
|
|
|
// TODO: Execute any cleanup side effects to remove the cast
|
|
}
|
|
|
|
private async onVerificationAddEthAddress(message: VerificationAddEthAddressMessage, isInitialCreation: boolean) {
|
|
await this.db
|
|
.insertInto('verifications')
|
|
.values({
|
|
fid: message.data.fid,
|
|
timestamp: farcasterTimeToDate(message.data.timestamp),
|
|
hash: message.hash,
|
|
claim: {
|
|
address: bytesToHex(message.data.verificationAddEthAddressBody.address),
|
|
ethSignature: bytesToHex(message.data.verificationAddEthAddressBody.ethSignature),
|
|
blockHash: bytesToHex(message.data.verificationAddEthAddressBody.blockHash),
|
|
},
|
|
})
|
|
// Do nothing on conflict since nothing should have changed if hash is the same.
|
|
.onConflict((oc) => oc.columns(['hash']).doNothing())
|
|
.execute();
|
|
|
|
if (isInitialCreation) {
|
|
// TODO: Execute any one-time side effects
|
|
}
|
|
|
|
// TODO: Execute any side effects that should happen any time the user
|
|
// connects the wallet (even if they are reconnecting a previously
|
|
// disconnected wallet), e.g. fetching token balances and NFTs for their
|
|
// wallet address.
|
|
}
|
|
|
|
private async onVerificationRemove(message: VerificationRemoveMessage) {
|
|
await this.db
|
|
.updateTable('verifications')
|
|
.where('fid', '=', message.data.fid)
|
|
.where(sql`claim ->> 'address'`, '=', bytesToHex(message.data.verificationRemoveBody.address))
|
|
.set({ deletedAt: farcasterTimeToDate(message.data.timestamp) })
|
|
.execute();
|
|
|
|
// TODO: Execute any cleanup side effects, e.g. updating NFT ownership
|
|
}
|
|
|
|
private async onSignerAdd(message: SignerAddMessage, isInitialCreation: boolean) {
|
|
const signerName = message.data.signerAddBody.name;
|
|
|
|
await this.db
|
|
.insertInto('signers')
|
|
.values({
|
|
fid: message.data.fid,
|
|
timestamp: farcasterTimeToDate(message.data.timestamp),
|
|
hash: message.hash,
|
|
custodyAddress: message.signer,
|
|
signer: message.data.signerAddBody.signer,
|
|
name: signerName?.length ? signerName : null, // Treat empty string signer names as not specified
|
|
})
|
|
// Do nothing on conflict since nothing should have changed if hash is the same.
|
|
.onConflict((oc) => oc.columns(['hash']).doNothing())
|
|
.execute();
|
|
|
|
if (isInitialCreation) {
|
|
// TODO: Execute any one-time side effects
|
|
}
|
|
}
|
|
|
|
private async onSignerRemove(message: SignerRemoveMessage) {
|
|
await this.db
|
|
.updateTable('signers')
|
|
.where('fid', '=', message.data.fid)
|
|
.where('signer', '=', message.data.signerRemoveBody.signer)
|
|
.set({ deletedAt: farcasterTimeToDate(message.data.timestamp) })
|
|
.execute();
|
|
|
|
// TODO: Execute any cleanup side effects
|
|
}
|
|
|
|
private async onUserDataAdd(message: UserDataAddMessage, isInitialCreation: boolean) {
|
|
await this.db
|
|
.insertInto('userData')
|
|
.values({
|
|
timestamp: farcasterTimeToDate(message.data.timestamp),
|
|
fid: message.data.fid,
|
|
hash: message.hash,
|
|
type: message.data.userDataBody.type,
|
|
value: message.data.userDataBody.value,
|
|
})
|
|
.onConflict((oc) =>
|
|
oc
|
|
.columns(['fid', 'type'])
|
|
.doUpdateSet({
|
|
hash: message.hash,
|
|
timestamp: farcasterTimeToDate(message.data.timestamp),
|
|
value: message.data.userDataBody.value,
|
|
updatedAt: new Date(),
|
|
})
|
|
.where(({ or, cmpr, ref }) =>
|
|
// Only update if a value has actually changed
|
|
or([
|
|
cmpr('excluded.hash', '!=', ref('userData.hash')),
|
|
cmpr('excluded.timestamp', '!=', ref('userData.timestamp')),
|
|
cmpr('excluded.value', '!=', ref('userData.value')),
|
|
cmpr('excluded.updatedAt', '!=', ref('userData.updatedAt')),
|
|
])
|
|
)
|
|
)
|
|
.execute();
|
|
|
|
if (isInitialCreation) {
|
|
// TODO: Execute any one-time side effects
|
|
}
|
|
}
|
|
}
|