chore: Migrate Trie data to TrieDB (#1847)

## Motivation

Migrate any remaining sync trie entries from the main DB into the
Trie DB, and delete them from the main DB once they have been migrated.


## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [X] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [X] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [ ] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [ ] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.
- [X] All [commits have been
signed](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#22-signing-commits)

<!-- start pr-codex -->

---

## PR-Codex overview
This PR migrates trie node data from the main DB to TrieDB in the
`hubble` app.

### Detailed summary
- Migrates key-values to TrieDB
- Adds `migrate` method to `MerkleTrie` for migration
- Implements key migration logic in batches
- Initiates migration in Prod environment
- Deletes migrated keys from the main DB

>  Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
This commit is contained in:
adityapk00
2024-03-26 09:16:05 -05:00
committed by GitHub
parent 3dad8049b4
commit 5ec735b494
3 changed files with 98 additions and 0 deletions

View File

@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---
chore: Migrate trie node data to TrieDB

View File

@@ -14,6 +14,7 @@ import {
import { logger } from "../../utils/logger.js";
import { getStatsdInitialization } from "../../utils/statsd.js";
import { messageDecode } from "../../storage/db/message.js";
import { sleep } from "../../utils/crypto.js";
import path, { dirname } from "path";
import fs from "fs";
@@ -56,6 +57,7 @@ export interface MerkleTrieInterface {
loggerFlush(): Promise<void>;
unloadChildrenAtPrefix(prefix: Uint8Array): Promise<void>;
stop(): Promise<void>;
migrate(keys: Uint8Array[], values: Uint8Array[]): Promise<number>;
}
// Typescript types to make sending messages to the worker thread type-safe
@@ -265,6 +267,13 @@ class MerkleTrie {
}
/**
 * Forwards the "initialize" call through `callMethod` and, unless running under test or CI,
 * schedules `doMigrate()` to start one second later.
 *
 * The migration runs in the background: this method does not wait for it, and a migration
 * failure is logged rather than propagated to the caller.
 */
public async initialize(): Promise<void> {
  // We'll do a migration only in Prod (not in test or CI)
  if (!(process.env["NODE_ENV"] === "test" || process.env["CI"])) {
    setTimeout(async () => {
      try {
        await this.doMigrate();
      } catch (e) {
        // Nothing awaits this timer callback, so without this catch a failed migration would
        // surface as an unhandled promise rejection.
        log.error({ err: e }, "Failed to migrate keys to new trie");
      }
    }, 1000);
  }

  return this.callMethod("initialize");
}
@@ -342,10 +351,57 @@ class MerkleTrie {
log.info({ count }, "Rebuilt fnmames trie");
}
/**
 * Walks every main-DB entry under the `RootPrefix.SyncMerkleTrieNode` prefix, hands the entries
 * to `migrate()` in batches, and deletes each batch from the main DB once `migrate()` returns.
 *
 * Keys are only deleted after `migrate()` has resolved for their batch, so a failed batch leaves
 * its keys in the main DB.
 */
async doMigrate() {
  const start = Date.now();
  log.info("Starting migration of keys to new trie");

  // Upper bound on how many entries are buffered before they are flushed to `migrate()`.
  const batchSize = 10_000;

  let keys: Uint8Array[] = [];
  let values: Uint8Array[] = [];

  // Migrate the buffered batch, delete it from the main DB, then reset the buffers.
  const migrateAndDelete = async () => {
    // Nothing buffered (e.g. the DB was already fully migrated, or the last full batch was just
    // flushed): skip the round-trip, the log line and the pause.
    if (keys.length === 0) {
      return;
    }

    const migrated = await this.migrate(keys, values);
    log.info({ migrated, total: keys.length }, "Migrated keys to new trie");

    // Delete from the DB
    for (const key of keys) {
      await this._db.del(Buffer.from(key));
    }

    // Wait a bit before continuing
    await sleep(1000);

    keys = [];
    values = [];
  };

  await this._db.forEachIteratorByPrefix(Buffer.from([RootPrefix.SyncMerkleTrieNode]), async (key, value) => {
    keys.push(new Uint8Array(key as Buffer));
    values.push(new Uint8Array(value as Buffer));

    if (keys.length >= batchSize) {
      await migrateAndDelete();
    }
  });

  // Flush whatever is left over after the last full batch
  await migrateAndDelete();

  log.info({ duration: Date.now() - start }, "Finished migration of keys to new trie");
}
/** Passes the value of `id.syncId()` to the "insert" method via `callMethod`. */
public async insert(id: SyncId): Promise<boolean> {
  const syncIdValue = id.syncId();
  return this.callMethod("insert", syncIdValue);
}
/**
 * Passes the parallel `keys` / `values` arrays to the "migrate" method via `callMethod` and
 * returns the count it reports.
 */
public async migrate(keys: Uint8Array[], values: Uint8Array[]): Promise<number> {
  const migratedCount = await this.callMethod("migrate", keys, values);
  return migratedCount;
}
/** Passes the value of `id.syncId()` to the "delete" method via `callMethod`. */
public async deleteBySyncId(id: SyncId): Promise<boolean> {
  const syncIdValue = id.syncId();
  return this.callMethod("delete", syncIdValue);
}

View File

@@ -268,6 +268,36 @@ class MerkleTrieImpl {
});
}
/**
 * Migrate a set of key-values from the main DB to the trie DB. `keys[i]` is paired with
 * `values[i]`. If `_dbGet` already returns a non-empty value for a key, that key is skipped;
 * otherwise the key-value pair is written with `_dbPut`.
 *
 * The whole batch runs under the write lock. The lock is always released, even if a read or
 * write throws; in that case the returned promise rejects with the error.
 *
 * Returns the number of keys actually written.
 */
public async migrate(keys: Uint8Array[], values: Uint8Array[]): Promise<number> {
  return new Promise((resolve, reject) => {
    this._lock.writeLock(async (release) => {
      try {
        let migrated = 0;

        for (let i = 0; i < keys.length; i++) {
          const key = keys[i] as Uint8Array;
          const value = values[i] as Uint8Array;

          // Already present with content: leave the existing value untouched.
          const dbValue = await this._dbGet(Buffer.from(key));
          if (dbValue && dbValue.length > 0) {
            continue;
          }

          const dbKeyValues = [{ key, value }];
          await this._dbPut(dbKeyValues);
          migrated++;
        }

        resolve(migrated);
      } catch (e) {
        // Settle the promise so the caller does not wait forever on a failed batch.
        reject(e);
      } finally {
        // Without this, a throw above would leave the write lock held indefinitely.
        release();
      }
    });
  });
}
public async delete(id: Uint8Array): Promise<boolean> {
return new Promise((resolve) => {
this._lock.writeLock(async (release) => {
@@ -570,6 +600,13 @@ parentPort?.on(
parentPort?.postMessage({ methodCallId, result: makeResult<"stop">(undefined) });
break;
}
case "migrate": {
const specificMsg = msg as MerkleTrieInterfaceMessage<"migrate">;
const [keys, values] = specificMsg.args;
const result = await merkleTrie.migrate(keys, values);
parentPort?.postMessage({ methodCallId, result: makeResult<"migrate">(result) });
break;
}
}
},
);