mirror of
https://github.com/farcasterxyz/hub-monorepo.git
synced 2026-04-18 03:00:22 -04:00
fix: Store peer contacts instead of RPC proxy objects (#581)
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
import * as protobufs from '@farcaster/protobufs';
|
||||
import { bytesToUtf8String, hexStringToBytes, HubAsyncResult, toFarcasterTime } from '@farcaster/utils';
|
||||
import { bytesToUtf8String, hexStringToBytes, HubAsyncResult, HubError, toFarcasterTime } from '@farcaster/utils';
|
||||
import { BigNumber, Contract, Event, providers } from 'ethers';
|
||||
import { err, ok, Result, ResultAsync } from 'neverthrow';
|
||||
import { IdRegistry, NameRegistry } from '~/eth/abis';
|
||||
@@ -137,8 +137,13 @@ export class EthEventsProvider {
|
||||
if (encodedFname.isErr()) {
|
||||
return err(encodedFname.error);
|
||||
}
|
||||
const expiry: BigNumber = await this._nameRegistryContract['expiryOf'](encodedFname.value);
|
||||
return ok(expiry.toNumber() * 1000);
|
||||
|
||||
const expiryResult: Result<BigNumber, HubError> = await ResultAsync.fromPromise(
|
||||
this._nameRegistryContract['expiryOf'](encodedFname.value),
|
||||
(err) => new HubError('unavailable.network_failure', err as Error)
|
||||
);
|
||||
|
||||
return expiryResult.map((expiry) => expiry.toNumber() * 1000);
|
||||
}
|
||||
|
||||
/* -------------------------------------------------------------------------- */
|
||||
|
||||
@@ -42,6 +42,7 @@ import {
|
||||
ipFamilyToString,
|
||||
p2pMultiAddrStr,
|
||||
} from '~/utils/p2p';
|
||||
import { PeriodicSyncJobScheduler } from './network/sync/periodicSyncJob';
|
||||
import { RootPrefix } from './storage/db/types';
|
||||
import {
|
||||
UpdateNameRegistryEventExpiryJobQueue,
|
||||
@@ -136,6 +137,7 @@ export class Hub implements HubInterface {
|
||||
private revokeSignerJobQueue: RevokeSignerJobQueue;
|
||||
private revokeSignerJobScheduler: RevokeSignerJobScheduler;
|
||||
private pruneMessagesJobScheduler: PruneMessagesJobScheduler;
|
||||
private periodSyncJobScheduler: PeriodicSyncJobScheduler;
|
||||
private updateNameRegistryEventExpiryJobQueue: UpdateNameRegistryEventExpiryJobQueue;
|
||||
private updateNameRegistryEventExpiryJobWorker: UpdateNameRegistryEventExpiryJobWorker;
|
||||
|
||||
@@ -166,6 +168,7 @@ export class Hub implements HubInterface {
|
||||
// Setup job schedulers/workers
|
||||
this.revokeSignerJobScheduler = new RevokeSignerJobScheduler(this.revokeSignerJobQueue, this.engine);
|
||||
this.pruneMessagesJobScheduler = new PruneMessagesJobScheduler(this.engine);
|
||||
this.periodSyncJobScheduler = new PeriodicSyncJobScheduler(this, this.syncEngine);
|
||||
this.updateNameRegistryEventExpiryJobWorker = new UpdateNameRegistryEventExpiryJobWorker(
|
||||
this.updateNameRegistryEventExpiryJobQueue,
|
||||
this.rocksDB,
|
||||
@@ -236,6 +239,7 @@ export class Hub implements HubInterface {
|
||||
// Start cron tasks
|
||||
this.revokeSignerJobScheduler.start(this.options.revokeSignerJobCron);
|
||||
this.pruneMessagesJobScheduler.start(this.options.pruneMessagesJobCron);
|
||||
this.periodSyncJobScheduler.start();
|
||||
|
||||
// When we startup, we write into the DB that we have not yet cleanly shutdown. And when we do
|
||||
// shutdown, we'll write "true" to this key, indicating that we've cleanly shutdown.
|
||||
@@ -275,6 +279,7 @@ export class Hub implements HubInterface {
|
||||
// Stop cron tasks
|
||||
this.revokeSignerJobScheduler.stop();
|
||||
this.pruneMessagesJobScheduler.stop();
|
||||
this.periodSyncJobScheduler.stop();
|
||||
|
||||
// Stop sync and gossip
|
||||
await this.gossipNode.stop();
|
||||
@@ -321,11 +326,17 @@ export class Hub implements HubInterface {
|
||||
const message = gossipMessage.message;
|
||||
|
||||
// Get the RPC Client to use to merge this message
|
||||
const rpcClient = this.syncEngine.getRpcClientForPeerId(peerId.toString());
|
||||
if (rpcClient) {
|
||||
return this.syncEngine.mergeMessages([message], rpcClient).then((result) => result[0] as HubResult<void>);
|
||||
const contactInfo = this.syncEngine.getContactInfoForPeerId(peerId.toString());
|
||||
if (contactInfo) {
|
||||
const rpcClient = await this.getRPCClientForPeer(peerId, contactInfo);
|
||||
if (rpcClient) {
|
||||
return this.syncEngine.mergeMessages([message], rpcClient).then((result) => result[0] as HubResult<void>);
|
||||
} else {
|
||||
log.error('No RPC clients available to merge message, attempting to merge directly into the engine');
|
||||
return this.submitMessage(message, 'gossip');
|
||||
}
|
||||
} else {
|
||||
log.error('No RPC clients available to merge message, attempting to merge directly into the engine');
|
||||
log.error('No contact info available for peer, attempting to merge directly into the engine');
|
||||
return this.submitMessage(message, 'gossip');
|
||||
}
|
||||
} else if (gossipMessage.idRegistryEvent) {
|
||||
@@ -375,17 +386,17 @@ export class Hub implements HubInterface {
|
||||
|
||||
// Check if we already have this client
|
||||
if (rpcClient) {
|
||||
if (this.syncEngine.getRpcClientForPeerId(peerId.toString())) {
|
||||
if (this.syncEngine.getContactInfoForPeerId(peerId.toString())) {
|
||||
log.info('Already have this client, skipping sync');
|
||||
return;
|
||||
} else {
|
||||
this.syncEngine.addRpcClientForPeerId(peerId.toString(), rpcClient);
|
||||
await this.syncEngine.diffSyncIfRequired(peerId.toString());
|
||||
this.syncEngine.addContactInfoForPeerId(peerId.toString(), message);
|
||||
await this.syncEngine.diffSyncIfRequired(this, peerId.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async getRPCClientForPeer(peerId: PeerId, peer: ContactInfoContent): Promise<HubRpcClient | undefined> {
|
||||
public async getRPCClientForPeer(peerId: PeerId, peer: ContactInfoContent): Promise<HubRpcClient | undefined> {
|
||||
/*
|
||||
* Find the peer's addrs from our peer list because we cannot use the address
|
||||
* in the contact info directly
|
||||
@@ -408,10 +419,7 @@ export class Hub implements HubInterface {
|
||||
log.info({ peerId }, 'falling back to addressbook lookup for peer');
|
||||
const peerInfo = await this.gossipNode.getPeerInfo(peerId);
|
||||
if (!peerInfo) {
|
||||
log.info(
|
||||
{ function: 'getRPCClientForPeer', identity: this.identity, peer: peer },
|
||||
`failed to find peer's address to request simple sync`
|
||||
);
|
||||
log.info({ function: 'getRPCClientForPeer', peer }, `failed to find peer's address to request simple sync`);
|
||||
|
||||
return;
|
||||
}
|
||||
@@ -419,10 +427,7 @@ export class Hub implements HubInterface {
|
||||
// sorts addresses by Public IPs first
|
||||
const addr = peerInfo.addresses.sort((a, b) => publicAddressesFirst(a, b))[0];
|
||||
if (addr === undefined) {
|
||||
log.info(
|
||||
{ function: 'getRPCClientForPeer', identity: this.identity, peer: peer },
|
||||
`peer found but no address is available to request sync`
|
||||
);
|
||||
log.info({ function: 'getRPCClientForPeer', peer }, `peer found but no address is available to request sync`);
|
||||
|
||||
return;
|
||||
}
|
||||
@@ -499,7 +504,7 @@ export class Hub implements HubInterface {
|
||||
|
||||
this.gossipNode.on('peerDisconnect', async (connection) => {
|
||||
// Remove this peer's connection
|
||||
this.syncEngine.removeRpcClientForPeerId(connection.remotePeer.toString());
|
||||
this.syncEngine.removeContactInfoForPeerId(connection.remotePeer.toString());
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
import cron from 'node-cron';
|
||||
import { Hub } from '~/hubble';
|
||||
import { logger } from '~/utils/logger';
|
||||
import SyncEngine from './syncEngine';
|
||||
|
||||
const log = logger.child({
|
||||
component: 'PeriodSyncJob',
|
||||
component: 'PeriodicSyncJob',
|
||||
});
|
||||
|
||||
type SchedulerStatus = 'started' | 'stopped';
|
||||
@@ -11,10 +12,12 @@ type SchedulerStatus = 'started' | 'stopped';
|
||||
const DEFAULT_PERIODIC_JOB_CRON = '*/2 * * * *'; // Every 2 minutes
|
||||
|
||||
export class PeriodicSyncJobScheduler {
|
||||
private _hub: Hub;
|
||||
private _syncEngine: SyncEngine;
|
||||
private _cronTask?: cron.ScheduledTask;
|
||||
|
||||
constructor(_syncEngine: SyncEngine) {
|
||||
constructor(_hub: Hub, _syncEngine: SyncEngine) {
|
||||
this._hub = _hub;
|
||||
this._syncEngine = _syncEngine;
|
||||
}
|
||||
|
||||
@@ -38,6 +41,6 @@ export class PeriodicSyncJobScheduler {
|
||||
log.info('starting doJobs');
|
||||
|
||||
// Do a diff sync
|
||||
await this._syncEngine.diffSyncIfRequired();
|
||||
await this._syncEngine.diffSyncIfRequired(this._hub);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,8 +7,10 @@ import {
|
||||
HubResult,
|
||||
HubRpcClient,
|
||||
} from '@farcaster/utils';
|
||||
import { err, ok } from 'neverthrow';
|
||||
import { peerIdFromString } from '@libp2p/peer-id';
|
||||
import { err, ok, Result } from 'neverthrow';
|
||||
import { TypedEmitter } from 'tiny-typed-emitter';
|
||||
import { Hub } from '~/hubble';
|
||||
import { MerkleTrie, NodeMetadata } from '~/network/sync/merkleTrie';
|
||||
import { SyncId, timestampToPaddedTimestampPrefix } from '~/network/sync/syncId';
|
||||
import { TrieSnapshot } from '~/network/sync/trieNode';
|
||||
@@ -17,7 +19,6 @@ import RocksDB from '~/storage/db/rocksdb';
|
||||
import Engine from '~/storage/engine';
|
||||
import { sleepWhile } from '~/utils/crypto';
|
||||
import { logger } from '~/utils/logger';
|
||||
import { PeriodicSyncJobScheduler } from './periodicSyncJob';
|
||||
|
||||
// Number of seconds to wait for the network to "settle" before syncing. We will only
|
||||
// attempt to sync messages that are older than this time.
|
||||
@@ -44,8 +45,7 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
|
||||
private _isSyncing = false;
|
||||
private _interruptSync = false;
|
||||
|
||||
private currentHubRpcClients: Map<string, HubRpcClient> = new Map();
|
||||
private periodSyncJobScheduler: PeriodicSyncJobScheduler;
|
||||
private currentHubPeerContacts: Map<string, protobufs.ContactInfoContent> = new Map();
|
||||
|
||||
private _messagesQueuedForSync = 0;
|
||||
|
||||
@@ -55,8 +55,6 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
|
||||
this._trie = new MerkleTrie(rocksDb);
|
||||
this.engine = engine;
|
||||
|
||||
this.periodSyncJobScheduler = new PeriodicSyncJobScheduler(this);
|
||||
|
||||
this.engine.eventHandler.on(
|
||||
'mergeMessage',
|
||||
async (message: protobufs.Message, deletedMessages?: protobufs.Message[]) => {
|
||||
@@ -104,8 +102,6 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
|
||||
}
|
||||
const rootHash = await this._trie.rootHash();
|
||||
|
||||
this.periodSyncJobScheduler.start();
|
||||
|
||||
log.info({ rootHash }, 'Sync engine initialized');
|
||||
}
|
||||
|
||||
@@ -113,8 +109,6 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
|
||||
// Interrupt any ongoing sync
|
||||
this._interruptSync = true;
|
||||
|
||||
this.periodSyncJobScheduler.stop();
|
||||
|
||||
// Wait for syncing to stop.
|
||||
await sleepWhile(() => this._isSyncing, SYNC_INTERRUPT_TIMEOUT);
|
||||
await sleepWhile(() => this.messagesQueuedForSync > 0, SYNC_INTERRUPT_TIMEOUT);
|
||||
@@ -126,49 +120,64 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
|
||||
return this._isSyncing;
|
||||
}
|
||||
|
||||
public getRpcClientForPeerId(peerId: string): HubRpcClient | undefined {
|
||||
return this.currentHubRpcClients.get(peerId);
|
||||
public getContactInfoForPeerId(peerId: string): protobufs.ContactInfoContent | undefined {
|
||||
return this.currentHubPeerContacts.get(peerId);
|
||||
}
|
||||
|
||||
public addRpcClientForPeerId(peerId: string, rpcClient: HubRpcClient) {
|
||||
this.currentHubRpcClients.set(peerId, rpcClient);
|
||||
public addContactInfoForPeerId(peerId: string, contactInfo: protobufs.ContactInfoContent) {
|
||||
this.currentHubPeerContacts.set(peerId, contactInfo);
|
||||
}
|
||||
|
||||
public removeRpcClientForPeerId(peerId: string) {
|
||||
this.currentHubRpcClients.delete(peerId);
|
||||
public removeContactInfoForPeerId(peerId: string) {
|
||||
this.currentHubPeerContacts.delete(peerId);
|
||||
}
|
||||
|
||||
/** ---------------------------------------------------------------------------------- */
|
||||
/** Sync Methods */
|
||||
/** ---------------------------------------------------------------------------------- */
|
||||
|
||||
public async diffSyncIfRequired(peerId?: string) {
|
||||
public async diffSyncIfRequired(hub: Hub, peerId?: string) {
|
||||
this.emit('syncStart');
|
||||
|
||||
let rpcClient;
|
||||
|
||||
if (peerId) {
|
||||
rpcClient = this.currentHubRpcClients.get(peerId);
|
||||
}
|
||||
|
||||
if (this.currentHubRpcClients.size === 0) {
|
||||
log.warn(`No RPC clients, skipping sync`);
|
||||
if (this.currentHubPeerContacts.size === 0) {
|
||||
log.warn(`No peer contacts, skipping sync`);
|
||||
this.emit('syncComplete', false);
|
||||
return;
|
||||
}
|
||||
|
||||
// If we don't have an RPC client, get a random one from the current clients
|
||||
if (!rpcClient) {
|
||||
// Pick a random key from the current clients
|
||||
const randomPeer = Array.from(this.currentHubRpcClients.keys())[
|
||||
Math.floor(Math.random() * this.currentHubRpcClients.size)
|
||||
] as string;
|
||||
rpcClient = this.currentHubRpcClients.get(randomPeer);
|
||||
let peerContact;
|
||||
|
||||
if (peerId) {
|
||||
peerContact = this.currentHubPeerContacts.get(peerId);
|
||||
}
|
||||
|
||||
// If we still don't have an RPC client, skip the sync
|
||||
// If we don't have a peer contact, get a random one from the current list
|
||||
if (!peerContact) {
|
||||
// Pick a random key
|
||||
const randomPeer = Array.from(this.currentHubPeerContacts.keys())[
|
||||
Math.floor(Math.random() * this.currentHubPeerContacts.size)
|
||||
] as string;
|
||||
peerContact = this.currentHubPeerContacts.get(randomPeer);
|
||||
}
|
||||
|
||||
// If we still don't have a peer, skip the sync
|
||||
if (!peerContact) {
|
||||
log.warn(`No contact info for peer, skipping sync`);
|
||||
this.emit('syncComplete', false);
|
||||
return;
|
||||
}
|
||||
|
||||
const peerIdResult = Result.fromThrowable(
|
||||
() => peerIdFromString(peerId ?? ''),
|
||||
(error) => new HubError('bad_request.parse_failure', error as Error)
|
||||
)();
|
||||
if (peerIdResult.isErr()) {
|
||||
return Promise.resolve(err(peerIdResult.error));
|
||||
}
|
||||
|
||||
const rpcClient = await hub.getRPCClientForPeer(peerIdResult.value, peerContact);
|
||||
if (!rpcClient) {
|
||||
log.warn(`No RPC client for peer, skipping sync`);
|
||||
log.warn(`Failed to get RPC client for peer, skipping sync`);
|
||||
this.emit('syncComplete', false);
|
||||
return;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user