mirror of
https://github.com/farcasterxyz/hub-monorepo.git
synced 2026-01-23 12:07:59 -05:00
chore: Migrate Trie data to TriDB (#1847)
## Motivation Migrate any remaining entries in the main DB for the sync trie into the Trie DB (and delete them from the main DB) ## Merge Checklist _Choose all relevant options below by adding an `x` now or at any time before submitting for review_ - [X] PR title adheres to the [conventional commits](https://www.conventionalcommits.org/en/v1.0.0/) standard - [X] PR has a [changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets) - [ ] PR has been tagged with a change label(s) (i.e. documentation, feature, bugfix, or chore) - [ ] PR includes [documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs) if necessary. - [X] All [commits have been signed](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#22-signing-commits) <!-- start pr-codex --> --- ## PR-Codex overview This PR migrates trie node data from the main DB to TrieDB in the `hubble` app. ### Detailed summary - Migrates key-values to TrieDB - Adds `migrate` method to `MerkleTrie` for migration - Implements key migration logic in batches - Initiates migration in Prod environment - Deletes migrated keys from the main DB > ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your question}` <!-- end pr-codex -->
This commit is contained in:
5
.changeset/nice-radios-mate.md
Normal file
5
.changeset/nice-radios-mate.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
"@farcaster/hubble": patch
|
||||
---
|
||||
|
||||
chore: Migrate trie node data to TrieDB
|
||||
@@ -14,6 +14,7 @@ import {
|
||||
import { logger } from "../../utils/logger.js";
|
||||
import { getStatsdInitialization } from "../../utils/statsd.js";
|
||||
import { messageDecode } from "../../storage/db/message.js";
|
||||
import { sleep } from "../../utils/crypto.js";
|
||||
import path, { dirname } from "path";
|
||||
import fs from "fs";
|
||||
|
||||
@@ -56,6 +57,7 @@ export interface MerkleTrieInterface {
|
||||
loggerFlush(): Promise<void>;
|
||||
unloadChildrenAtPrefix(prefix: Uint8Array): Promise<void>;
|
||||
stop(): Promise<void>;
|
||||
migrate(keys: Uint8Array[], values: Uint8Array[]): Promise<number>;
|
||||
}
|
||||
|
||||
// Typescript types to make sending messages to the worker thread type-safe
|
||||
@@ -265,6 +267,13 @@ class MerkleTrie {
|
||||
}
|
||||
|
||||
public async initialize(): Promise<void> {
|
||||
// We'll do a migration only in Prod (not in test)
|
||||
if (!(process.env["NODE_ENV"] === "test" || process.env["CI"])) {
|
||||
setTimeout(async () => {
|
||||
await this.doMigrate();
|
||||
}, 1000);
|
||||
}
|
||||
|
||||
return this.callMethod("initialize");
|
||||
}
|
||||
|
||||
@@ -342,10 +351,57 @@ class MerkleTrie {
|
||||
log.info({ count }, "Rebuilt fnmames trie");
|
||||
}
|
||||
|
||||
async doMigrate() {
|
||||
// We go over the trie keys in the DB and send them to the worker thread to migrate. When
|
||||
// the worker thread returns, we delete from the DB and continue. We do this until we have no
|
||||
// more keys left
|
||||
const start = Date.now();
|
||||
log.info("Starting migration of keys to new trie");
|
||||
|
||||
let keys: Uint8Array[] = [];
|
||||
let values: Uint8Array[] = [];
|
||||
|
||||
// Migrate and delete the keys in batches
|
||||
const migrateAndDelete = async () => {
|
||||
const migrated = await this.migrate(keys, values);
|
||||
|
||||
log.info({ migrated, total: keys.length }, "Migrated keys to new trie");
|
||||
|
||||
// Delete from the DB
|
||||
for (let i = 0; i < keys.length; i++) {
|
||||
await this._db.del(Buffer.from(keys[i] as Uint8Array));
|
||||
}
|
||||
|
||||
// Wait a bit before continuing
|
||||
await sleep(1000);
|
||||
|
||||
keys = [];
|
||||
values = [];
|
||||
};
|
||||
|
||||
await this._db.forEachIteratorByPrefix(Buffer.from([RootPrefix.SyncMerkleTrieNode]), async (key, value) => {
|
||||
keys.push(new Uint8Array(key as Buffer));
|
||||
values.push(new Uint8Array(value as Buffer));
|
||||
|
||||
if (keys.length >= 10_000) {
|
||||
await migrateAndDelete();
|
||||
}
|
||||
});
|
||||
|
||||
// Delete any remaining keys from the DB
|
||||
await migrateAndDelete();
|
||||
|
||||
log.info({ duration: Date.now() - start }, "Finished migration of keys to new trie");
|
||||
}
|
||||
|
||||
public async insert(id: SyncId): Promise<boolean> {
|
||||
return this.callMethod("insert", id.syncId());
|
||||
}
|
||||
|
||||
public async migrate(keys: Uint8Array[], values: Uint8Array[]): Promise<number> {
|
||||
return this.callMethod("migrate", keys, values);
|
||||
}
|
||||
|
||||
public async deleteBySyncId(id: SyncId): Promise<boolean> {
|
||||
return this.callMethod("delete", id.syncId());
|
||||
}
|
||||
|
||||
@@ -268,6 +268,36 @@ class MerkleTrieImpl {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Migrate a set of key-values from the main DB to the trie DB. If the key exists in the DB, we
|
||||
* skip it, otherwise we insert it into the trie DB.
|
||||
*
|
||||
* Return the number of keys migrated actually written to the trie DB.
|
||||
*/
|
||||
public async migrate(keys: Uint8Array[], values: Uint8Array[]): Promise<number> {
|
||||
return new Promise((resolve) => {
|
||||
this._lock.writeLock(async (release) => {
|
||||
let migrated = 0;
|
||||
for (let i = 0; i < keys.length; i++) {
|
||||
const key = keys[i] as Uint8Array;
|
||||
const value = values[i] as Uint8Array;
|
||||
|
||||
const dbValue = await this._dbGet(Buffer.from(key));
|
||||
if (dbValue && dbValue.length > 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const dbKeyValues = [{ key, value }];
|
||||
await this._dbPut(dbKeyValues);
|
||||
migrated++;
|
||||
}
|
||||
|
||||
resolve(migrated);
|
||||
release();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
public async delete(id: Uint8Array): Promise<boolean> {
|
||||
return new Promise((resolve) => {
|
||||
this._lock.writeLock(async (release) => {
|
||||
@@ -570,6 +600,13 @@ parentPort?.on(
|
||||
parentPort?.postMessage({ methodCallId, result: makeResult<"stop">(undefined) });
|
||||
break;
|
||||
}
|
||||
case "migrate": {
|
||||
const specificMsg = msg as MerkleTrieInterfaceMessage<"migrate">;
|
||||
const [keys, values] = specificMsg.args;
|
||||
const result = await merkleTrie.migrate(keys, values);
|
||||
parentPort?.postMessage({ methodCallId, result: makeResult<"migrate">(result) });
|
||||
break;
|
||||
}
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user