mirror of
https://github.com/farcasterxyz/hub-monorepo.git
synced 2026-01-23 12:07:59 -05:00
fix: handle edge cases of rocksdb instantiation (#1844)
## Motivation

- There were certain edge cases in instantiation that could cause snapshots to be uploaded with 0 messages
- If the snapshot flag is enabled but zero messages are found, we now return an error
- Path checks for the trie DB use normalized paths and properly handle cases where there's a `.rocks` prefix
- If the trie DB directory exists, but hasn't loaded any messages yet, we catch the error when getting the root node and return 0

## Change Summary

- If the snapshot flag is enabled but zero messages are found, we now return an error
- Path checks for the trie DB use normalized paths and properly handle cases where there's a `.rocks` prefix
- If the trie DB directory exists, but hasn't loaded any messages yet, we catch the error when getting the root node and return 0

## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time before submitting for review_

- [x] PR title adheres to the [conventional commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [x] PR has a [changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [x] PR has been tagged with a change label(s) (i.e. documentation, feature, bugfix, or chore)
- [ ] PR includes [documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs) if necessary.
- [x] All [commits have been signed](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#22-signing-commits)

## Additional Context

If this is a relatively large or complex change, provide more details here that will help reviewers

<!-- start pr-codex -->

---

## PR-Codex overview

The focus of this PR is to enhance the `hubble` app by handling edge cases of rocksdb instantiation for snapshot uploads.
### Detailed summary

- Handle edge cases of rocksdb instantiation for snapshot uploads in `hubble`
- Synchronously fetch the number of elements in the trie DB
- Throw an error if message count is not obtained for snapshot upload
- Support catch up sync with snapshot only on mainnet
- Improve logging for catchup sync using snapshot
- Refactor trie database instantiation for consistency and error handling

> ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your question}`

<!-- end pr-codex -->
This commit is contained in:
5
.changeset/modern-planes-nail.md
Normal file
5
.changeset/modern-planes-nail.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
"@farcaster/hubble": patch
|
||||
---
|
||||
|
||||
fix(hubble): handle edge cases of rocksdb instantiation for snapshot uploads
|
||||
@@ -17,6 +17,7 @@ use neon::types::{
|
||||
};
|
||||
use rocksdb::{Options, TransactionDB};
|
||||
use slog::{info, o};
|
||||
use std::fmt::format;
|
||||
use std::fs::{self, File};
|
||||
use std::path::Path;
|
||||
use std::sync::{Arc, RwLock, RwLockReadGuard};
|
||||
|
||||
@@ -563,18 +563,34 @@ export class Hub implements HubInterface {
|
||||
const tarResult = await ResultAsync.fromPromise(rsCreateTarBackup(this.rocksDB.rustDb), (e) => e as Error);
|
||||
|
||||
if (tarResult.isOk()) {
|
||||
// Upload to S3. Run this in the background so we don't block startup.
|
||||
setTimeout(async () => {
|
||||
const messages = await MerkleTrie.numItems(this.syncEngine.trie);
|
||||
if (messages.isErr()) {
|
||||
// Fetch the number of elements in the trie DB synchronously, to avoid race conditions with other services
|
||||
// that may open and access the DB.
|
||||
const messages = await MerkleTrie.numItems(this.syncEngine.trie);
|
||||
let messageCount = 0;
|
||||
messages.match(
|
||||
(numMessages) => {
|
||||
messageCount = numMessages;
|
||||
},
|
||||
(error) => {
|
||||
log.error(
|
||||
{
|
||||
error: messages.error,
|
||||
error: error,
|
||||
},
|
||||
"failed to get message count from sync engine trie",
|
||||
);
|
||||
}
|
||||
const messageCount = messages.isErr() ? -1 : messages.value;
|
||||
// Throw an error if message count is not obtained, since it's required for snapshot upload
|
||||
throw error;
|
||||
},
|
||||
);
|
||||
// If snapshot to S3 flag is explicitly set, we throw an error if message count is zero,
|
||||
// since it would be atypical to set this flag when there are no messages in the trie
|
||||
if (messageCount <= 0) {
|
||||
log.error("no messages found in sync engine trie, cannot upload snapshot");
|
||||
throw new HubError("unavailable", "no messages found in sync engine trie, snapshot upload failed");
|
||||
}
|
||||
|
||||
// Upload to S3. Run this in the background so we don't block startup.
|
||||
setTimeout(async () => {
|
||||
log.info({ messageCount }, "uploading snapshot to S3");
|
||||
const s3Result = await uploadToS3(
|
||||
this.options.network,
|
||||
@@ -599,7 +615,8 @@ export class Hub implements HubInterface {
|
||||
|
||||
// Check if we need to catchup sync using snapshot
|
||||
let catchupSyncResult: Result<boolean, Error> = ok(false);
|
||||
if (this.options.catchupSyncWithSnapshot) {
|
||||
// NOTE: catch up sync with snapshot is only supported on mainnet
|
||||
if (this.options.catchupSyncWithSnapshot && this.options.network === FarcasterNetwork.MAINNET) {
|
||||
log.info("attempting catchup sync with snapshot");
|
||||
catchupSyncResult = await this.attemptCatchupSyncWithSnapshot();
|
||||
if (catchupSyncResult.isErr()) {
|
||||
@@ -874,7 +891,7 @@ export class Hub implements HubInterface {
|
||||
|
||||
delta = snapshotMetadata.numMessages - currentItemCount;
|
||||
if (delta > limit) {
|
||||
log.info({ delta, limit }, "catchup sync using snapshot");
|
||||
log.info({ delta, limit, current_item_count: currentItemCount }, "catchup sync using snapshot");
|
||||
shouldCatchupSync = true;
|
||||
}
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ import {
|
||||
import { logger } from "../../utils/logger.js";
|
||||
import { getStatsdInitialization } from "../../utils/statsd.js";
|
||||
import { messageDecode } from "../../storage/db/message.js";
|
||||
import path from "path";
|
||||
import path, { dirname } from "path";
|
||||
import fs from "fs";
|
||||
|
||||
/**
|
||||
@@ -206,43 +206,48 @@ class MerkleTrie {
|
||||
// If there are any other processes that operate on RocksDB while this is running, there may be
|
||||
// inconsistencies or errors.
|
||||
public static async numItems(trie: MerkleTrie): HubAsyncResult<number> {
|
||||
// MerkleTrie has rocksdb instance, however the merkle trie worker
|
||||
// uses a separate instance under trieDb prefix which needs to be used here instead.
|
||||
const location = `${path.basename(trie._db.location)}/${TrieDBPathPrefix}`;
|
||||
if (!fs.existsSync(path.basename(`.rocks/${location}`))) {
|
||||
// The trie database is instantiated with new Rocksdb, which will prefix an input path with ".rocks"
|
||||
const fullPath = path.join(trie._db.location, TrieDBPathPrefix);
|
||||
const normalizedPath = path.normalize(fullPath);
|
||||
const parts = normalizedPath.split(path.sep);
|
||||
// Remove the first directory. Note that the first element might be empty
|
||||
// if the path starts with a separator, indicating it's an absolute path.
|
||||
// In such a case, remove the second element instead.
|
||||
if (parts[0] === "") {
|
||||
parts.splice(1, 1); // Remove the second element for absolute paths
|
||||
} else {
|
||||
parts.splice(0, 1); // Remove the first element for relative paths
|
||||
}
|
||||
|
||||
// NOTE: trie._db.location has `.rocks` prefix. If we don't remove it, calling new RocksDB will end up with
|
||||
// `.rocks/.rocks` prefix. This will throw an error because RocksDB won't be able to find the parent path.
|
||||
const location = parts.join(path.sep);
|
||||
if (!fs.existsSync(dirname(normalizedPath))) {
|
||||
return ok(0);
|
||||
}
|
||||
|
||||
const db = new RocksDB(location);
|
||||
if (!db) {
|
||||
return err(new HubError("unavailable", "RocksDB not provided"));
|
||||
}
|
||||
const isEmptyDir = await fs.promises.readdir(path.resolve(db.location)).then((files) => files.length === 0);
|
||||
if (isEmptyDir) {
|
||||
await db.open();
|
||||
|
||||
const rootResult = await ResultAsync.fromPromise(
|
||||
db.get(TrieNode.makePrimaryKey(new Uint8Array())),
|
||||
(e) => e as HubError,
|
||||
);
|
||||
db.close();
|
||||
|
||||
// If the root key was not found, return 0
|
||||
if (rootResult.isErr()) {
|
||||
return ok(0);
|
||||
}
|
||||
|
||||
let wasOpen = true;
|
||||
if (db.status !== "open") {
|
||||
wasOpen = false;
|
||||
await db.open();
|
||||
const rootBytes = rootResult.value;
|
||||
// If the root is empty, return 0
|
||||
if (!(rootBytes && rootBytes.length > 0)) {
|
||||
return ok(0);
|
||||
}
|
||||
|
||||
if (db.status === "open") {
|
||||
const rootBytes = await db.get(TrieNode.makePrimaryKey(new Uint8Array()));
|
||||
if (!(rootBytes && rootBytes.length > 0)) {
|
||||
return ok(0);
|
||||
}
|
||||
|
||||
const root = TrieNode.deserialize(rootBytes);
|
||||
// If db was open prior to this method call, leave it open after the method call.
|
||||
// Otherwise, close the db.
|
||||
if (!wasOpen) {
|
||||
db.close();
|
||||
}
|
||||
return ok(root.items);
|
||||
}
|
||||
|
||||
return err(new HubError("unavailable", "Unable to open RocksDB"));
|
||||
const root = TrieNode.deserialize(rootBytes);
|
||||
return ok(root.items);
|
||||
}
|
||||
|
||||
public async stop(): Promise<void> {
|
||||
|
||||
Reference in New Issue
Block a user