fix: handle edge cases of rocksdb instantiation (#1844)

## Motivation

- There were certain edge cases in instantiation that could cause
snapshots to be uploaded with 0 messages
- If the snapshot flag is enabled but zero messages are found, we now
return an error
- Path checks for the trie DB use normalized paths and correctly handle
cases where the path has a `.rocks` prefix
- If the trie DB directory exists but no messages have been loaded yet,
we catch the error from fetching the root node and return 0

## Change Summary

- If the snapshot flag is enabled but zero messages are found, we now
return an error
- Path checks for the trie DB use normalized paths and correctly handle
cases where the path has a `.rocks` prefix
- If the trie DB directory exists but no messages have been loaded yet,
we catch the error from fetching the root node and return 0


## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [x] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [x] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [x] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [ ] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.
- [x] All [commits have been
signed](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#22-signing-commits)

## Additional Context

If this is a relatively large or complex change, provide more details
here that will help reviewers

<!-- start pr-codex -->

---

## PR-Codex overview
The focus of this PR is to enhance the `hubble` app by handling edge
cases of rocksdb instantiation for snapshot uploads.

### Detailed summary
- Handle edge cases of rocksdb instantiation for snapshot uploads in
`hubble`
- Synchronously fetch the number of elements in the trie DB
- Throw an error if message count is not obtained for snapshot upload
- Support catch up sync with snapshot only on mainnet
- Improve logging for catchup sync using snapshot
- Refactor trie database instantiation for consistency and error
handling

>  Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
This commit is contained in:
Wasif Iqbal
2024-03-25 21:14:39 -05:00
committed by GitHub
parent 0e26c7643b
commit e3f4997615
4 changed files with 67 additions and 39 deletions

View File

@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---
fix(hubble): handle edge cases of rocksdb instantiation for snapshot uploads

View File

@@ -17,6 +17,7 @@ use neon::types::{
};
use rocksdb::{Options, TransactionDB};
use slog::{info, o};
use std::fmt::format;
use std::fs::{self, File};
use std::path::Path;
use std::sync::{Arc, RwLock, RwLockReadGuard};

View File

@@ -563,18 +563,34 @@ export class Hub implements HubInterface {
const tarResult = await ResultAsync.fromPromise(rsCreateTarBackup(this.rocksDB.rustDb), (e) => e as Error);
if (tarResult.isOk()) {
// Upload to S3. Run this in the background so we don't block startup.
setTimeout(async () => {
const messages = await MerkleTrie.numItems(this.syncEngine.trie);
if (messages.isErr()) {
// Fetch the number of elements in the trie DB synchronously, to avoid race conditions with other services
// that may open and access the DB.
const messages = await MerkleTrie.numItems(this.syncEngine.trie);
let messageCount = 0;
messages.match(
(numMessages) => {
messageCount = numMessages;
},
(error) => {
log.error(
{
error: messages.error,
error: error,
},
"failed to get message count from sync engine trie",
);
}
const messageCount = messages.isErr() ? -1 : messages.value;
// Throw an error if message count is not obtained, since it's required for snapshot upload
throw error;
},
);
// If snapshot to S3 flag is explicitly set, we throw an error if message count is zero,
// since it would be atypical to set this flag when there are no messages in the trie
if (messageCount <= 0) {
log.error("no messages found in sync engine trie, cannot upload snapshot");
throw new HubError("unavailable", "no messages found in sync engine trie, snapshot upload failed");
}
// Upload to S3. Run this in the background so we don't block startup.
setTimeout(async () => {
log.info({ messageCount }, "uploading snapshot to S3");
const s3Result = await uploadToS3(
this.options.network,
@@ -599,7 +615,8 @@ export class Hub implements HubInterface {
// Check if we need to catchup sync using snapshot
let catchupSyncResult: Result<boolean, Error> = ok(false);
if (this.options.catchupSyncWithSnapshot) {
// NOTE: catch up sync with snapshot is only supported on mainnet
if (this.options.catchupSyncWithSnapshot && this.options.network === FarcasterNetwork.MAINNET) {
log.info("attempting catchup sync with snapshot");
catchupSyncResult = await this.attemptCatchupSyncWithSnapshot();
if (catchupSyncResult.isErr()) {
@@ -874,7 +891,7 @@ export class Hub implements HubInterface {
delta = snapshotMetadata.numMessages - currentItemCount;
if (delta > limit) {
log.info({ delta, limit }, "catchup sync using snapshot");
log.info({ delta, limit, current_item_count: currentItemCount }, "catchup sync using snapshot");
shouldCatchupSync = true;
}

View File

@@ -14,7 +14,7 @@ import {
import { logger } from "../../utils/logger.js";
import { getStatsdInitialization } from "../../utils/statsd.js";
import { messageDecode } from "../../storage/db/message.js";
import path from "path";
import path, { dirname } from "path";
import fs from "fs";
/**
@@ -206,43 +206,48 @@ class MerkleTrie {
// If there are any other processes that operate on RocksDB while this is running, there may be
// inconsistencies or errors.
public static async numItems(trie: MerkleTrie): HubAsyncResult<number> {
// MerkleTrie has rocksdb instance, however the merkle trie worker
// uses a separate instance under trieDb prefix which needs to be used here instead.
const location = `${path.basename(trie._db.location)}/${TrieDBPathPrefix}`;
if (!fs.existsSync(path.basename(`.rocks/${location}`))) {
// The trie database is instantiated with new Rocksdb, which will prefix an input path with ".rocks"
const fullPath = path.join(trie._db.location, TrieDBPathPrefix);
const normalizedPath = path.normalize(fullPath);
const parts = normalizedPath.split(path.sep);
// Remove the first directory. Note that the first element might be empty
// if the path starts with a separator, indicating it's an absolute path.
// In such a case, remove the second element instead.
if (parts[0] === "") {
parts.splice(1, 1); // Remove the second element for absolute paths
} else {
parts.splice(0, 1); // Remove the first element for relative paths
}
// NOTE: trie._db.location has `.rocks` prefix. If we don't remove it, calling new RocksDB will end up with
// `.rocks/.rocks` prefix. This will throw an error because RocksDB won't be able to find the parent path.
const location = parts.join(path.sep);
if (!fs.existsSync(dirname(normalizedPath))) {
return ok(0);
}
const db = new RocksDB(location);
if (!db) {
return err(new HubError("unavailable", "RocksDB not provided"));
}
const isEmptyDir = await fs.promises.readdir(path.resolve(db.location)).then((files) => files.length === 0);
if (isEmptyDir) {
await db.open();
const rootResult = await ResultAsync.fromPromise(
db.get(TrieNode.makePrimaryKey(new Uint8Array())),
(e) => e as HubError,
);
db.close();
// If the root key was not found, return 0
if (rootResult.isErr()) {
return ok(0);
}
let wasOpen = true;
if (db.status !== "open") {
wasOpen = false;
await db.open();
const rootBytes = rootResult.value;
// If the root is empty, return 0
if (!(rootBytes && rootBytes.length > 0)) {
return ok(0);
}
if (db.status === "open") {
const rootBytes = await db.get(TrieNode.makePrimaryKey(new Uint8Array()));
if (!(rootBytes && rootBytes.length > 0)) {
return ok(0);
}
const root = TrieNode.deserialize(rootBytes);
// If db was open prior to this method call, leave it open after the method call.
// Otherwise, close the db.
if (!wasOpen) {
db.close();
}
return ok(root.items);
}
return err(new HubError("unavailable", "Unable to open RocksDB"));
const root = TrieNode.deserialize(rootBytes);
return ok(root.items);
}
public async stop(): Promise<void> {