feat: Use bytes instead of hex string for SyncIds (#483)

This commit is contained in:
adityapk00
2023-01-30 17:20:33 -06:00
committed by GitHub
parent ed7ef3ca0f
commit b5daeebc62
14 changed files with 244 additions and 194 deletions

View File

@@ -20,7 +20,7 @@ export class SyncTrieCommand implements ConsoleCommandInterface {
}
rootHash = async () => {
const result = await this.rpcClient.getSyncSnapshotByPrefix(TrieNodePrefix.create({ prefix: '' }));
const result = await this.rpcClient.getSyncSnapshotByPrefix(TrieNodePrefix.create({ prefix: new Uint8Array() }));
return result.match(
(snapshot) => {
return snapshot.rootHash;
@@ -31,8 +31,10 @@ export class SyncTrieCommand implements ConsoleCommandInterface {
);
};
snapshot = async (prefix?: string) => {
const result = await this.rpcClient.getSyncSnapshotByPrefix(TrieNodePrefix.create({ prefix: prefix ?? '' }));
snapshot = async (prefix?: Uint8Array) => {
const result = await this.rpcClient.getSyncSnapshotByPrefix(
TrieNodePrefix.create({ prefix: prefix ?? new Uint8Array() })
);
return result.match<any>(
(snapshot) => {
return snapshot;
@@ -43,8 +45,10 @@ export class SyncTrieCommand implements ConsoleCommandInterface {
);
};
metadata = async (prefix?: string) => {
const result = await this.rpcClient.getSyncMetadataByPrefix(TrieNodePrefix.create({ prefix: prefix ?? '' }));
metadata = async (prefix?: Uint8Array) => {
const result = await this.rpcClient.getSyncMetadataByPrefix(
TrieNodePrefix.create({ prefix: prefix ?? new Uint8Array() })
);
return result.match<any>(
(metadata) => {
return metadata;
@@ -55,8 +59,10 @@ export class SyncTrieCommand implements ConsoleCommandInterface {
);
};
syncIds = async (prefix?: string) => {
const result = await this.rpcClient.getAllSyncIdsByPrefix(TrieNodePrefix.create({ prefix: prefix ?? '' }));
syncIds = async (prefix?: Uint8Array) => {
const result = await this.rpcClient.getAllSyncIdsByPrefix(
TrieNodePrefix.create({ prefix: prefix ?? new Uint8Array() })
);
return result.match<any>(
(syncIds) => {
return syncIds;

View File

@@ -355,7 +355,9 @@ export class Hub extends TypedEmitter<HubEvents> implements HubInterface {
}
// First, get the latest state from the peer
const peerStateResult = await rpcClient.getSyncSnapshotByPrefix(protobufs.TrieNodePrefix.create({ prefix: '' }));
const peerStateResult = await rpcClient.getSyncSnapshotByPrefix(
protobufs.TrieNodePrefix.create({ prefix: new Uint8Array() })
);
if (peerStateResult.isErr()) {
log.warn(`Failed to get peer state, skipping sync`);
this.emit('syncComplete', false);

View File

@@ -154,6 +154,23 @@ describe('MerkleTrie', () => {
expect(trie.exists(nonExistingSyncId)).toBeFalsy();
});
test('test multiple items with delete', async () => {
const trie = new MerkleTrie();
const syncIds = await NetworkFactories.SyncId.createList(20);
// Keep track of start time and memory used
syncIds.forEach((syncId) => trie.insert(syncId));
// Delete half of the items
syncIds.slice(0, syncIds.length / 2).forEach((syncId) => trie.delete(syncId));
// Check that the items are still there
syncIds.slice(0, syncIds.length / 2).forEach((syncId) => expect(trie.exists(syncId)).toBeFalsy());
syncIds.slice(syncIds.length / 2).forEach((syncId) => {
expect(trie.exists(syncId)).toBeTruthy();
});
});
test('value is always undefined for non-leaf nodes', async () => {
const trie = new MerkleTrie();
const syncId = await NetworkFactories.SyncId.create();
@@ -169,7 +186,7 @@ describe('MerkleTrie', () => {
const trie = new MerkleTrie();
trie.insert(syncId);
expect(trie.getTrieNodeMetadata('166518234')).toBeUndefined();
expect(trie.getTrieNodeMetadata(Buffer.from('166518234'))).toBeUndefined();
});
test('returns the root metadata if the prefix is empty', async () => {
@@ -177,24 +194,24 @@ describe('MerkleTrie', () => {
const trie = new MerkleTrie();
trie.insert(syncId);
const nodeMetadata = trie.getTrieNodeMetadata('');
const nodeMetadata = trie.getTrieNodeMetadata(new Uint8Array());
expect(nodeMetadata).toBeDefined();
expect(nodeMetadata?.numMessages).toEqual(1);
expect(nodeMetadata?.prefix).toEqual('');
expect(nodeMetadata?.prefix).toEqual(new Uint8Array());
expect(nodeMetadata?.children?.size).toEqual(1);
expect(nodeMetadata?.children?.get('1')).toBeDefined();
expect(nodeMetadata?.children?.get(syncId.syncId()[0] as number)).toBeDefined();
});
test('returns the correct metadata if prefix is present', async () => {
const trie = await trieWithIds([1665182332, 1665182343]);
const nodeMetadata = trie.getTrieNodeMetadata('16651823');
const nodeMetadata = trie.getTrieNodeMetadata(Buffer.from('16651823'));
expect(nodeMetadata).toBeDefined();
expect(nodeMetadata?.numMessages).toEqual(2);
expect(nodeMetadata?.prefix).toEqual('16651823');
expect(nodeMetadata?.prefix).toEqual(Buffer.from('16651823'));
expect(nodeMetadata?.children?.size).toEqual(2);
expect(nodeMetadata?.children?.get('3')).toBeDefined();
expect(nodeMetadata?.children?.get('4')).toBeDefined();
expect(nodeMetadata?.children?.get(Buffer.from('3')[0] as number)).toBeDefined();
expect(nodeMetadata?.children?.get(Buffer.from('4')[0] as number)).toBeDefined();
});
});
@@ -202,8 +219,8 @@ describe('MerkleTrie', () => {
test('returns basic information', async () => {
const trie = await trieWithIds([1665182332, 1665182343]);
const snapshot = trie.getSnapshot('1665182343');
expect(snapshot.prefix).toEqual('1665182343');
const snapshot = trie.getSnapshot(Buffer.from('1665182343'));
expect(snapshot.prefix).toEqual(Buffer.from('1665182343'));
expect(snapshot.numMessages).toEqual(1);
expect(snapshot.excludedHashes.length).toEqual('1665182343'.length);
});
@@ -211,22 +228,22 @@ describe('MerkleTrie', () => {
test('returns early when prefix is only partially present', async () => {
const trie = await trieWithIds([1665182332, 1665182343]);
const snapshot = trie.getSnapshot('1677123');
expect(snapshot.prefix).toEqual('167');
const snapshot = trie.getSnapshot(Buffer.from('1677123'));
expect(snapshot.prefix).toEqual(Buffer.from('167'));
expect(snapshot.numMessages).toEqual(2);
expect(snapshot.excludedHashes.length).toEqual('167'.length);
});
test('excluded hashes excludes the prefix char at every level', async () => {
const trie = await trieWithIds([1665182332, 1665182343, 1665182345, 1665182351]);
let snapshot = trie.getSnapshot('1665182351');
let node = trie.getTrieNodeMetadata('16651823');
let snapshot = trie.getSnapshot(Buffer.from('1665182351'));
let node = trie.getTrieNodeMetadata(Buffer.from('16651823'));
// We expect the excluded hash to be the hash of the 3 and 4 child nodes, and excludes the 5 child node
const expectedHash = Buffer.from(
blake3
.create({ dkLen: 20 })
.update(node?.children?.get('3')?.hash || '')
.update(node?.children?.get('4')?.hash || '')
.update(node?.children?.get(Buffer.from('3')[0] as number)?.hash || '')
.update(node?.children?.get(Buffer.from('4')[0] as number)?.hash || '')
.digest()
).toString('hex');
expect(snapshot.excludedHashes).toEqual([
@@ -242,15 +259,17 @@ describe('MerkleTrie', () => {
EMPTY_HASH, // 1
]);
snapshot = trie.getSnapshot('1665182343');
node = trie.getTrieNodeMetadata('166518234');
const expectedLastHash = Buffer.from(blake3(node?.children?.get('5')?.hash || '', { dkLen: 20 })).toString('hex');
node = trie.getTrieNodeMetadata('16651823');
snapshot = trie.getSnapshot(Buffer.from('1665182343'));
node = trie.getTrieNodeMetadata(Buffer.from('166518234'));
const expectedLastHash = Buffer.from(
blake3(node?.children?.get(Buffer.from('5')[0] as number)?.hash || '', { dkLen: 20 })
).toString('hex');
node = trie.getTrieNodeMetadata(Buffer.from('16651823'));
const expectedPenultimateHash = Buffer.from(
blake3
.create({ dkLen: 20 })
.update(node?.children?.get('3')?.hash || '')
.update(node?.children?.get('5')?.hash || '')
.update(node?.children?.get(Buffer.from('3')[0] as number)?.hash || '')
.update(node?.children?.get(Buffer.from('5')[0] as number)?.hash || '')
.digest()
).toString('hex');
expect(snapshot.excludedHashes).toEqual([
@@ -271,22 +290,22 @@ describe('MerkleTrie', () => {
test('getAllValues returns all values for child nodes', async () => {
const trie = await trieWithIds([1665182332, 1665182343, 1665182345]);
let values = trie.root.getNode('16651823')?.getAllValues();
let values = trie.root.getNode(Buffer.from('16651823'))?.getAllValues();
expect(values?.length).toEqual(3);
values = trie.root.getNode('166518233')?.getAllValues();
values = trie.root.getNode(Buffer.from('166518233'))?.getAllValues();
expect(values?.length).toEqual(1);
});
describe('getDivergencePrefix', () => {
test('returns the prefix with the most common excluded hashes', async () => {
const trie = await trieWithIds([1665182332, 1665182343, 1665182345]);
const prefixToTest = '1665182343';
const prefixToTest = Buffer.from('1665182343');
const oldSnapshot = trie.getSnapshot(prefixToTest);
trie.insert(await NetworkFactories.SyncId.create(undefined, { transient: { date: new Date(1665182353000) } }));
// Since message above was added at 1665182353, the two tries diverged at 16651823 for our prefix
let divergencePrefix = trie.getDivergencePrefix(prefixToTest, oldSnapshot.excludedHashes);
expect(divergencePrefix).toEqual('16651823');
expect(divergencePrefix).toEqual(Buffer.from('16651823'));
// divergence prefix should be the full prefix, if snapshots are the same
const currentSnapshot = trie.getSnapshot(prefixToTest);
@@ -295,10 +314,11 @@ describe('MerkleTrie', () => {
// divergence prefix should empty if excluded hashes are empty
divergencePrefix = trie.getDivergencePrefix(prefixToTest, []);
expect(divergencePrefix).toEqual('');
expect(divergencePrefix.length).toEqual(0);
// divergence prefix should be our prefix if provided hashes are longer
divergencePrefix = trie.getDivergencePrefix(prefixToTest + '5', [...currentSnapshot.excludedHashes, 'different']);
const with5 = Buffer.concat([prefixToTest, Buffer.from('5')]);
divergencePrefix = trie.getDivergencePrefix(with5, [...currentSnapshot.excludedHashes, 'different']);
expect(divergencePrefix).toEqual(prefixToTest);
});
});

View File

@@ -10,10 +10,10 @@ import { TrieNode, TrieSnapshot } from '~/network/sync/trieNode';
* @children - The immediate children of this node
*/
export type NodeMetadata = {
prefix: string;
prefix: Uint8Array;
numMessages: number;
hash: string;
children?: Map<string, NodeMetadata>;
children?: Map<number, NodeMetadata>;
};
/**
@@ -41,29 +41,28 @@ class MerkleTrie {
}
public insert(id: SyncId): boolean {
// TODO(aditya): We should insert Uint8Array instead of string
return this._root.insert(id.idString());
return this._root.insert(id.syncId());
}
public delete(id: SyncId): boolean {
return this._root.delete(id.idString());
return this._root.delete(id.syncId());
}
public exists(id: SyncId): boolean {
// NOTE: eslint falsely identifies as `fs.exists`.
// eslint-disable-next-line security/detect-non-literal-fs-filename
return this._root.exists(id.idString());
return this._root.exists(id.syncId());
}
// A snapshot captures the state of the trie excluding the nodes
// specified in the prefix. See TrieSnapshot for more
public getSnapshot(prefix: string): TrieSnapshot {
public getSnapshot(prefix: Uint8Array): TrieSnapshot {
return this._root.getSnapshot(prefix);
}
// Compares excluded hashes of another trie with this trie to determine at which prefix the
// states differ. Returns the subset of prefix that's common to both tries.
public getDivergencePrefix(prefix: string, excludedHashes: string[]): string {
public getDivergencePrefix(prefix: Uint8Array, excludedHashes: string[]): Uint8Array {
const ourExcludedHashes = this.getSnapshot(prefix).excludedHashes;
for (let i = 0; i < prefix.length; i++) {
// NOTE: `i` is controlled by for loop and hence not at risk of object injection.
@@ -75,17 +74,18 @@ class MerkleTrie {
return prefix;
}
public getTrieNodeMetadata(prefix: string): NodeMetadata | undefined {
public getTrieNodeMetadata(prefix: Uint8Array): NodeMetadata | undefined {
const node = this._root.getNode(prefix);
if (node === undefined) {
return undefined;
}
const children = node?.children || new Map();
const result = new Map<string, NodeMetadata>();
const result = new Map<number, NodeMetadata>();
for (const [char, child] of children) {
const newPrefix = Buffer.concat([prefix, Buffer.from([char])]);
result.set(char, {
numMessages: child.items,
prefix: prefix + char,
prefix: newPrefix,
hash: child.hash,
});
}

View File

@@ -186,7 +186,7 @@ describe('SyncEngine', () => {
// Return an empty child map so sync will finish with a noop
const emptyMetadata = protobufs.TrieNodeMetadataResponse.create({
prefix: '',
prefix: new Uint8Array(),
numMessages: 1000,
hash: '',
children: [],

View File

@@ -111,7 +111,7 @@ class SyncEngine {
}
}
public async fetchAndMergeMessages(syncIds: string[], rpcClient: HubRpcClient): Promise<boolean> {
public async fetchAndMergeMessages(syncIds: Uint8Array[], rpcClient: HubRpcClient): Promise<boolean> {
if (syncIds.length === 0) {
return false;
}
@@ -166,8 +166,8 @@ class SyncEngine {
theirNode: NodeMetadata,
ourNode: NodeMetadata | undefined,
rpcClient: HubRpcClient
): Promise<string[]> {
const missingHashes: string[] = [];
): Promise<Uint8Array[]> {
const missingHashes: Uint8Array[] = [];
// If the node has fewer than HASHES_PER_FETCH, just fetch them all in go, otherwise,
// iterate through the node's children and fetch them in batches.
if (theirNode.numMessages <= HASHES_PER_FETCH) {
@@ -193,11 +193,11 @@ class SyncEngine {
return missingHashes;
}
async fetchMissingHashesByPrefix(prefix: string, rpcClient: HubRpcClient): Promise<string[]> {
async fetchMissingHashesByPrefix(prefix: Uint8Array, rpcClient: HubRpcClient): Promise<Uint8Array[]> {
const ourNode = this._trie.getTrieNodeMetadata(prefix);
const theirNodeResult = await rpcClient.getSyncMetadataByPrefix(protobufs.TrieNodePrefix.create({ prefix }));
const missingHashes: string[] = [];
const missingHashes: Uint8Array[] = [];
await theirNodeResult.match(
async (theirNode) => {
missingHashes.push(
@@ -222,11 +222,11 @@ class SyncEngine {
this._trie.delete(new SyncId(message));
}
public getTrieNodeMetadata(prefix: string): NodeMetadata | undefined {
public getTrieNodeMetadata(prefix: Uint8Array): NodeMetadata | undefined {
return this._trie.getTrieNodeMetadata(prefix);
}
public getAllSyncIdsByPrefix(prefix: string): string[] {
public getAllSyncIdsByPrefix(prefix: Uint8Array): Uint8Array[] {
return this._trie.root.getNode(prefix)?.getAllValues() ?? [];
}
@@ -234,8 +234,8 @@ class SyncEngine {
return this._trie;
}
public getSnapshotByPrefix(prefix?: string): HubResult<TrieSnapshot> {
if (!prefix || prefix === '') {
public getSnapshotByPrefix(prefix?: Uint8Array): HubResult<TrieSnapshot> {
if (!prefix || prefix.length === 0) {
return this.snapshot;
} else {
return ok(this._trie.getSnapshot(prefix));
@@ -246,7 +246,7 @@ class SyncEngine {
return this.snapshotTimestamp.map((snapshotTimestamp) => {
// Ignore the least significant digit when fetching the snapshot timestamp because
// second resolution is too fine grained, and fall outside sync threshold anyway
return this._trie.getSnapshot(timestampToPaddedTimestampPrefix(snapshotTimestamp / 10).toString());
return this._trie.getSnapshot(Buffer.from(timestampToPaddedTimestampPrefix(snapshotTimestamp / 10)));
});
}
@@ -297,19 +297,21 @@ class SyncEngine {
}
const fromNodeMetadataResponse = (response: protobufs.TrieNodeMetadataResponse): NodeMetadata => {
const children = new Map<string, NodeMetadata>();
const children = new Map<number, NodeMetadata>();
for (let i = 0; i < response.children.length; i++) {
const child = response.children[i];
const prefix = child?.prefix ?? '';
// Char is the last char of prefix
const char = prefix[prefix.length - 1] ?? '';
if (child && child.prefix.length > 0) {
const prefix = child.prefix;
// Char is the last char of prefix
const char = prefix[prefix.length - 1] as number;
children.set(char, {
numMessages: Number(child?.numMessages),
prefix,
hash: child?.hash ?? '',
});
children.set(char, {
numMessages: Number(child?.numMessages),
prefix,
hash: child?.hash ?? '',
});
}
}
return {

View File

@@ -2,7 +2,7 @@ import * as protobufs from '@farcaster/protobufs';
import { makeUserKey, typeToSetPostfix } from '~/storage/db/message';
const TIMESTAMP_LENGTH = 10; // 10 bytes for timestamp in decimal
const HASH_LENGTH = 160; // We're using 20 byte blake2b hashes
const HASH_LENGTH = 20; // We're using 20 byte blake3 hashes
/**
* SyncId allows for a stable, time ordered lexicographic sorting of messages across hubs
@@ -11,32 +11,33 @@ const HASH_LENGTH = 160; // We're using 20 byte blake2b hashes
*/
class SyncId {
private readonly _fid: number;
private readonly _tsHash: Uint8Array;
private readonly _hash: Uint8Array;
private readonly _timestamp: number;
private readonly _type: number;
constructor(message: protobufs.Message) {
this._fid = message.data?.fid || 0;
this._tsHash = message.hash;
this._hash = message.hash;
this._timestamp = message.data?.timestamp || 0;
this._type = message.data?.type || 0;
}
public idString(): string {
public syncId(): Uint8Array {
// For our MerkleTrie, seconds is a good enough resolution
// We also want to normalize the length to 10 characters, so that the MerkleTrie
// will always have the same depth for any timestamp (even 0).
const timestampString = timestampToPaddedTimestampPrefix(this._timestamp);
const buf = makeMessagePrimaryKey(this._fid, this._type, this._tsHash);
return timestampString + buf.toString('hex');
const buf = makeMessagePrimaryKey(this._fid, this._type, this._hash);
// We prepend the timestamp to the hash so that the MerkleTrie is sorted by timestamp
return Buffer.concat([Buffer.from(timestampString), buf]);
}
static pkFromIdString(idString: string): Buffer {
static pkFromSyncId(syncId: Uint8Array): Buffer {
// The first 10 bytes are the timestamp, so we skip them
const pk = idString.slice(TIMESTAMP_LENGTH);
const pk = syncId.slice(TIMESTAMP_LENGTH);
return Buffer.from(pk, 'hex');
return Buffer.from(pk);
}
}

View File

@@ -1,4 +1,4 @@
import { Factories } from '@farcaster/utils';
import { Factories, hexStringToBytes } from '@farcaster/utils';
import { TIMESTAMP_LENGTH } from '~/network/sync/syncId';
import { EMPTY_HASH, TrieNode } from '~/network/sync/trieNode';
import { NetworkFactories } from '~/network/utils/factories';
@@ -20,7 +20,7 @@ describe('TrieNode', () => {
if (children.length > 1) {
return node;
}
return traverse((children[0] as [string, TrieNode])[1]);
return traverse((children[0] as [number, TrieNode])[1]);
};
describe('insert', () => {
@@ -31,7 +31,7 @@ describe('TrieNode', () => {
expect(root.items).toEqual(0);
expect(root.hash).toEqual('');
root.insert(id.idString());
root.insert(id.syncId());
expect(root.items).toEqual(1);
expect(root.hash).toBeTruthy();
@@ -41,10 +41,10 @@ describe('TrieNode', () => {
const root = new TrieNode();
const id = await NetworkFactories.SyncId.create();
root.insert(id.idString());
root.insert(id.syncId());
expect(root.items).toEqual(1);
const previousHash = root.hash;
root.insert(id.idString());
root.insert(id.syncId());
expect(root.hash).toEqual(previousHash);
expect(root.items).toEqual(1);
@@ -54,18 +54,18 @@ describe('TrieNode', () => {
const root = new TrieNode();
const id = await NetworkFactories.SyncId.create();
root.insert(id.idString());
root.insert(id.syncId());
let node = root;
// Timestamp portion of the key is not collapsed, but the hash portion is
for (let i = 0; i < TIMESTAMP_LENGTH; i++) {
const children = Array.from(node.children);
const firstChild = children[0] as [string, TrieNode];
const firstChild = children[0] as [number, TrieNode];
expect(children.length).toEqual(1);
node = firstChild[1];
}
expect(node.isLeaf).toEqual(true);
expect(node.value).toEqual(id.idString());
expect(node.value).toEqual(id.syncId());
});
test('inserting another key with a common prefix splits the node', async () => {
@@ -74,37 +74,37 @@ describe('TrieNode', () => {
const id1 = await NetworkFactories.SyncId.create(undefined, {
transient: { date: sharedDate, hash: sharedPrefixHashA, fid },
});
const hash1 = id1.idString();
const hash1 = id1.syncId();
const id2 = await NetworkFactories.SyncId.create(undefined, {
transient: { date: sharedDate, hash: sharedPrefixHashB, fid },
});
const hash2 = id2.idString();
const hash2 = id2.syncId();
// The node at which the trie splits should be the first character that differs between the two hashes
// eslint-disable-next-line security/detect-object-injection
const firstDiffPos = hash1.split('').findIndex((c, i) => c !== hash2[i]);
const firstDiffPos = hash1.findIndex((c, i) => c !== hash2[i]);
const root = new TrieNode();
root.insert(id1.idString());
root.insert(id2.idString());
root.insert(id1.syncId());
root.insert(id2.syncId());
const splitNode = traverse(root);
expect(splitNode.items).toEqual(2);
const children = Array.from(splitNode.children);
const firstChild = children[0] as [string, TrieNode];
const secondChild = children[1] as [string, TrieNode];
const firstChild = children[0] as [number, TrieNode];
const secondChild = children[1] as [number, TrieNode];
expect(children.length).toEqual(2);
// hash1 node
// eslint-disable-next-line security/detect-object-injection
expect(firstChild[0]).toEqual(hash1[firstDiffPos]);
expect(firstChild[1].isLeaf).toBeTruthy();
expect(firstChild[1].value).toEqual(id1.idString());
expect(firstChild[1].value).toEqual(id1.syncId());
// hash2 node
// eslint-disable-next-line security/detect-object-injection
expect(secondChild[0]).toEqual(hash2[firstDiffPos]);
expect(secondChild[1].isLeaf).toBeTruthy();
expect(secondChild[1].value).toEqual(id2.idString());
expect(secondChild[1].value).toEqual(id2.syncId());
});
});
@@ -113,10 +113,10 @@ describe('TrieNode', () => {
const root = new TrieNode();
const id = await NetworkFactories.SyncId.create();
root.insert(id.idString());
root.insert(id.syncId());
expect(root.items).toEqual(1);
root.delete(id.idString());
root.delete(id.syncId());
expect(root.items).toEqual(0);
expect(root.hash).toEqual(EMPTY_HASH);
});
@@ -126,15 +126,15 @@ describe('TrieNode', () => {
const id1 = await NetworkFactories.SyncId.create(undefined, { transient: { date: sharedDate } });
const id2 = await NetworkFactories.SyncId.create(undefined, { transient: { date: sharedDate } });
root.insert(id1.idString());
root.insert(id1.syncId());
const previousHash = root.hash;
root.insert(id2.idString());
root.insert(id2.syncId());
expect(root.items).toEqual(2);
root.delete(id2.idString());
root.delete(id2.syncId());
expect(root.items).toEqual(1);
// eslint-disable-next-line security/detect-non-literal-fs-filename
expect(root.exists(id2.idString())).toBeFalsy();
expect(root.exists(id2.syncId())).toBeFalsy();
expect(root.hash).toEqual(previousHash);
});
@@ -147,14 +147,14 @@ describe('TrieNode', () => {
});
const root = new TrieNode();
root.insert(id1.idString());
root.insert(id1.syncId());
const previousRootHash = root.hash;
const leafNode = traverse(root);
root.insert(id2.idString());
root.insert(id2.syncId());
expect(root.hash).not.toEqual(previousRootHash);
root.delete(id2.idString());
root.delete(id2.syncId());
const newLeafNode = traverse(root);
expect(newLeafNode).toEqual(leafNode);
@@ -163,25 +163,25 @@ describe('TrieNode', () => {
test('deleting item only compacts the branch of the trie with the deleted item', async () => {
const ids = [
'0'.padStart(TIMESTAMP_LENGTH, '0') + '01068',
'0'.padStart(TIMESTAMP_LENGTH, '0') + '010a1',
'0'.padStart(TIMESTAMP_LENGTH, '0') + '05d22',
];
'0'.padStart(TIMESTAMP_LENGTH * 2, '0') + '010680',
'0'.padStart(TIMESTAMP_LENGTH * 2, '0') + '010a10',
'0'.padStart(TIMESTAMP_LENGTH * 2, '0') + '05d220',
].map((id) => hexStringToBytes(id)._unsafeUnwrap());
const root = new TrieNode();
for (let i = 0; i < ids.length; i++) {
root.insert(ids[i] as string);
root.insert(ids[i] as Uint8Array);
}
// Remove the first id
root.delete(ids[0] as string);
root.delete(ids[0] as Uint8Array);
// Expect the other two ids to be present
// eslint-disable-next-line security/detect-non-literal-fs-filename
expect(root.exists(ids[1] as string)).toBeTruthy();
expect(root.exists(ids[1] as Uint8Array)).toBeTruthy();
// eslint-disable-next-line security/detect-non-literal-fs-filename
expect(root.exists(ids[2] as string)).toBeTruthy();
expect(root.exists(ids[2] as Uint8Array)).toBeTruthy();
expect(root.items).toEqual(2);
});
});
@@ -191,23 +191,23 @@ describe('TrieNode', () => {
const root = new TrieNode();
const id = await NetworkFactories.SyncId.create();
root.insert(id.idString());
root.insert(id.syncId());
expect(root.items).toEqual(1);
// eslint-disable-next-line security/detect-non-literal-fs-filename
expect(root.exists(id.idString())).toBeTruthy();
expect(root.exists(id.syncId())).toBeTruthy();
});
test('getting an item after deleting it returns undefined', async () => {
const root = new TrieNode();
const id = await NetworkFactories.SyncId.create();
root.insert(id.idString());
root.insert(id.syncId());
expect(root.items).toEqual(1);
root.delete(id.idString());
root.delete(id.syncId());
// eslint-disable-next-line security/detect-non-literal-fs-filename
expect(root.exists(id.idString())).toBeFalsy();
expect(root.exists(id.syncId())).toBeFalsy();
expect(root.items).toEqual(0);
});
@@ -220,11 +220,11 @@ describe('TrieNode', () => {
});
const root = new TrieNode();
root.insert(id1.idString());
root.insert(id1.syncId());
// id2 shares the same prefix, but doesn't exist, so it should return undefined
// eslint-disable-next-line security/detect-non-literal-fs-filename
expect(root.exists(id2.idString())).toBeFalsy();
expect(root.exists(id2.syncId())).toBeFalsy();
});
});
});

View File

@@ -1,5 +1,6 @@
import { HubError } from '@farcaster/utils';
import { bytesCompare, HubError } from '@farcaster/utils';
import { blake3 } from '@noble/hashes/blake3';
import { assert } from 'console';
import { TIMESTAMP_LENGTH } from '~/network/sync/syncId';
export const EMPTY_HASH = Buffer.from(blake3('', { dkLen: 20 })).toString('hex');
@@ -13,7 +14,7 @@ export const EMPTY_HASH = Buffer.from(blake3('', { dkLen: 20 })).toString('hex')
* @numMessages - The total number of messages captured in the snapshot (excludes the prefix nodes)
*/
export type TrieSnapshot = {
prefix: string;
prefix: Uint8Array;
excludedHashes: string[];
numMessages: number;
};
@@ -25,8 +26,8 @@ export type TrieSnapshot = {
class TrieNode {
private _hash: string;
private _items: number;
private _children: Map<string, TrieNode>;
private _key: string | undefined;
private _children: Map<number, TrieNode>;
private _key: Uint8Array | undefined;
constructor() {
this._hash = '';
@@ -45,8 +46,12 @@ class TrieNode {
* Recursively traverses the trie by prefix and inserts the value at the end. Updates the hashes for
* every node that was traversed.
*/
public insert(key: string, current_index = 0): boolean {
const char = key.charAt(current_index);
public insert(key: Uint8Array, current_index = 0): boolean {
assert(current_index < key.length, 'Key length exceeded');
if (current_index >= key.length) {
throw 'Key length exceeded';
}
const char = key.at(current_index) as number;
// Do not compact the timestamp portion of the trie, since it's used to compare snapshots
if (current_index >= TIMESTAMP_LENGTH && this.isLeaf && !this._key) {
@@ -57,7 +62,7 @@ class TrieNode {
}
if (current_index >= TIMESTAMP_LENGTH && this.isLeaf) {
if (this._key == key) {
if (bytesCompare(this._key ?? new Uint8Array(), key) === 0) {
// If the same key exists, do nothing
return false;
}
@@ -88,9 +93,9 @@ class TrieNode {
* Ensures that there are no empty nodes after deletion. This is important to make sure the hashes
* will match exactly with another trie that never had the value (e.g. in another hub).
*/
public delete(key: string, current_index = 0): boolean {
public delete(key: Uint8Array, current_index = 0): boolean {
if (this.isLeaf) {
if (this._key === key) {
if (bytesCompare(this._key ?? new Uint8Array(), key) === 0) {
this._items -= 1;
this._setKeyValue(undefined);
return true;
@@ -99,7 +104,11 @@ class TrieNode {
}
}
const char = key.charAt(current_index);
assert(current_index < key.length, 'Key length exceeded2');
if (current_index >= key.length) {
throw 'Key length exceeded2';
}
const char = key.at(current_index) as number;
if (!this._children.has(char)) {
return false;
}
@@ -134,12 +143,16 @@ class TrieNode {
* @param key - The key to look for
* @param current_index - The index of the current character in the key (only used internally)
*/
public exists(key: string, current_index = 0): boolean {
if (this.isLeaf && this._key === key) {
public exists(key: Uint8Array, current_index = 0): boolean {
if (this.isLeaf && bytesCompare(this._key ?? new Uint8Array(), key) === 0) {
return true;
}
const char = key.charAt(current_index);
assert(current_index < key.length, 'Key length exceeded3');
if (current_index >= key.length) {
throw 'Key length exceeded3';
}
const char = key.at(current_index) as number;
if (!this._children.has(char)) {
return false;
}
@@ -149,9 +162,10 @@ class TrieNode {
return this._children.get(char)?.exists(key, current_index + 1) || false;
}
// Generates a snapshot for the current node and below. current_index is the index of the prefix the method is operating on
public getSnapshot(prefix: string, current_index = 0): TrieSnapshot {
const char = prefix.charAt(current_index);
// Generates a snapshot for the current node and below. current_index is the index of the prefix the method
// is operating on
public getSnapshot(prefix: Uint8Array, current_index = 0): TrieSnapshot {
const char = prefix.at(current_index) as number;
if (current_index === prefix.length - 1) {
const excludedHash = this._excludedHash(char);
return {
@@ -183,33 +197,33 @@ class TrieNode {
}
// Only available on leaf nodes
public get value(): string | undefined {
public get value(): Uint8Array | undefined {
if (this.isLeaf) {
return this._key;
}
return undefined;
}
public getNode(prefix: string): TrieNode | undefined {
public getNode(prefix: Uint8Array): TrieNode | undefined {
if (prefix.length === 0) {
return this;
}
const char = prefix.charAt(0);
const char = prefix.at(0) as number;
if (!this._children.has(char)) {
return undefined;
}
return this._children.get(char)?.getNode(prefix.slice(1));
}
public get children(): IterableIterator<[string, TrieNode]> {
public get children(): IterableIterator<[number, TrieNode]> {
return this._children.entries();
}
public getAllValues(): string[] {
public getAllValues(): Uint8Array[] {
if (this.isLeaf) {
return this._key ? [this._key] : [];
}
const values: string[] = [];
const values: Uint8Array[] = [];
this._children.forEach((child) => {
values.push(...child.getAllValues());
});
@@ -218,7 +232,7 @@ class TrieNode {
/* Private methods */
private _excludedHash(char: string): { items: number; hash: string } {
private _excludedHash(char: number): { items: number; hash: string } {
// TODO: Cache this for performance
const hash = blake3.create({ dkLen: 20 });
let excludedItems = 0;
@@ -231,14 +245,14 @@ class TrieNode {
return { hash: Buffer.from(hash.digest()).toString('hex'), items: excludedItems };
}
private _addChild(char: string) {
private _addChild(char: number) {
this._children.set(char, new TrieNode());
// The hash requires the children to be sorted, and sorting on insert/update is cheaper than
// sorting each time we need to update the hash
this._children = new Map([...this._children.entries()].sort());
}
private _setKeyValue(key: string | undefined) {
private _setKeyValue(key: Uint8Array | undefined) {
this._key = key;
this._updateHash();
}
@@ -250,7 +264,10 @@ class TrieNode {
// This should never happen, check is here for type safety
throw new HubError('bad_request', 'Cannot split a leaf node without a key and value');
}
const newChildChar = this._key.charAt(current_index);
assert(current_index < this._key.length, 'Cannot split a leaf node at an index greater than its key length');
const newChildChar = this._key.at(current_index) as number;
this._addChild(newChildChar);
this._children.get(newChildChar)?.insert(this._key, current_index + 1);
this._setKeyValue(undefined);

View File

@@ -46,15 +46,14 @@ const SyncIdFactory = Factory.define<undefined, { date: Date; hash: string; fid:
({ onCreate, transientParams }) => {
onCreate(async () => {
const { date, hash, fid } = transientParams;
const hashBytes = hexStringToBytes(hash || faker.datatype.hexadecimal({ length: HASH_LENGTH }))._unsafeUnwrap();
const ethSigner = Factories.Eip712Signer.build();
const signerMessage = await Factories.SignerAddMessage.create(
{
hash: hashBytes,
data: { fid: fid || Factories.Fid.build(), timestamp: (date || faker.date.recent()).getTime() / 1000 },
},
{ transient: { signer: ethSigner } }
);
const hashBytes = hexStringToBytes(
hash || faker.datatype.hexadecimal({ length: HASH_LENGTH * 2 })
)._unsafeUnwrap();
const signerMessage = await Factories.SignerAddMessage.create({
hash: hashBytes,
data: { fid: fid || Factories.Fid.build(), timestamp: (date || faker.date.recent()).getTime() / 1000 },
});
return new SyncId(signerMessage);
});

View File

@@ -111,7 +111,7 @@ describe('getVerificationsByFid', () => {
expect(verifications._unsafeUnwrap().messages.map((m) => protobufs.Message.toJSON(m))).toEqual(
[verificationAdd].map((m) => protobufs.Message.toJSON(m))
);
}, 1000000);
});
test('returns empty array without messages', async () => {
const verifications = await client.getVerificationsByFid(protobufs.FidRequest.create({ fid }));

View File

@@ -153,8 +153,8 @@ class Engine {
callback(message);
}
}
async getAllMessagesBySyncIds(syncIds: string[]): HubAsyncResult<protobufs.Message[]> {
const hashesBuf = syncIds.map((syncIdHash) => SyncId.pkFromIdString(syncIdHash));
async getAllMessagesBySyncIds(syncIds: Uint8Array[]): HubAsyncResult<protobufs.Message[]> {
const hashesBuf = syncIds.map((syncIdHash) => SyncId.pkFromSyncId(syncIdHash));
const messages = await ResultAsync.fromPromise(getManyMessages(this._db, hashesBuf), (e) => e as HubError);
return messages;

View File

@@ -38,25 +38,25 @@ export interface HubInfoResponse {
}
export interface TrieNodeMetadataResponse {
prefix: string;
prefix: Uint8Array;
numMessages: number;
hash: string;
children: TrieNodeMetadataResponse[];
}
export interface TrieNodeSnapshotResponse {
prefix: string;
prefix: Uint8Array;
excludedHashes: string[];
numMessages: number;
rootHash: string;
}
export interface TrieNodePrefix {
prefix: string;
prefix: Uint8Array;
}
export interface SyncIds {
syncIds: string[];
syncIds: Uint8Array[];
}
export interface FidRequest {
@@ -235,13 +235,13 @@ export const HubInfoResponse = {
};
function createBaseTrieNodeMetadataResponse(): TrieNodeMetadataResponse {
return { prefix: "", numMessages: 0, hash: "", children: [] };
return { prefix: new Uint8Array(), numMessages: 0, hash: "", children: [] };
}
export const TrieNodeMetadataResponse = {
encode(message: TrieNodeMetadataResponse, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.prefix !== "") {
writer.uint32(10).string(message.prefix);
if (message.prefix.length !== 0) {
writer.uint32(10).bytes(message.prefix);
}
if (message.numMessages !== 0) {
writer.uint32(16).uint64(message.numMessages);
@@ -263,7 +263,7 @@ export const TrieNodeMetadataResponse = {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
message.prefix = reader.string();
message.prefix = reader.bytes();
break;
case 2:
message.numMessages = longToNumber(reader.uint64() as Long);
@@ -284,7 +284,7 @@ export const TrieNodeMetadataResponse = {
fromJSON(object: any): TrieNodeMetadataResponse {
return {
prefix: isSet(object.prefix) ? String(object.prefix) : "",
prefix: isSet(object.prefix) ? bytesFromBase64(object.prefix) : new Uint8Array(),
numMessages: isSet(object.numMessages) ? Number(object.numMessages) : 0,
hash: isSet(object.hash) ? String(object.hash) : "",
children: Array.isArray(object?.children)
@@ -295,7 +295,8 @@ export const TrieNodeMetadataResponse = {
toJSON(message: TrieNodeMetadataResponse): unknown {
const obj: any = {};
message.prefix !== undefined && (obj.prefix = message.prefix);
message.prefix !== undefined &&
(obj.prefix = base64FromBytes(message.prefix !== undefined ? message.prefix : new Uint8Array()));
message.numMessages !== undefined && (obj.numMessages = Math.round(message.numMessages));
message.hash !== undefined && (obj.hash = message.hash);
if (message.children) {
@@ -312,7 +313,7 @@ export const TrieNodeMetadataResponse = {
fromPartial<I extends Exact<DeepPartial<TrieNodeMetadataResponse>, I>>(object: I): TrieNodeMetadataResponse {
const message = createBaseTrieNodeMetadataResponse();
message.prefix = object.prefix ?? "";
message.prefix = object.prefix ?? new Uint8Array();
message.numMessages = object.numMessages ?? 0;
message.hash = object.hash ?? "";
message.children = object.children?.map((e) => TrieNodeMetadataResponse.fromPartial(e)) || [];
@@ -321,13 +322,13 @@ export const TrieNodeMetadataResponse = {
};
function createBaseTrieNodeSnapshotResponse(): TrieNodeSnapshotResponse {
return { prefix: "", excludedHashes: [], numMessages: 0, rootHash: "" };
return { prefix: new Uint8Array(), excludedHashes: [], numMessages: 0, rootHash: "" };
}
export const TrieNodeSnapshotResponse = {
encode(message: TrieNodeSnapshotResponse, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.prefix !== "") {
writer.uint32(10).string(message.prefix);
if (message.prefix.length !== 0) {
writer.uint32(10).bytes(message.prefix);
}
for (const v of message.excludedHashes) {
writer.uint32(18).string(v!);
@@ -349,7 +350,7 @@ export const TrieNodeSnapshotResponse = {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
message.prefix = reader.string();
message.prefix = reader.bytes();
break;
case 2:
message.excludedHashes.push(reader.string());
@@ -370,7 +371,7 @@ export const TrieNodeSnapshotResponse = {
fromJSON(object: any): TrieNodeSnapshotResponse {
return {
prefix: isSet(object.prefix) ? String(object.prefix) : "",
prefix: isSet(object.prefix) ? bytesFromBase64(object.prefix) : new Uint8Array(),
excludedHashes: Array.isArray(object?.excludedHashes) ? object.excludedHashes.map((e: any) => String(e)) : [],
numMessages: isSet(object.numMessages) ? Number(object.numMessages) : 0,
rootHash: isSet(object.rootHash) ? String(object.rootHash) : "",
@@ -379,7 +380,8 @@ export const TrieNodeSnapshotResponse = {
toJSON(message: TrieNodeSnapshotResponse): unknown {
const obj: any = {};
message.prefix !== undefined && (obj.prefix = message.prefix);
message.prefix !== undefined &&
(obj.prefix = base64FromBytes(message.prefix !== undefined ? message.prefix : new Uint8Array()));
if (message.excludedHashes) {
obj.excludedHashes = message.excludedHashes.map((e) => e);
} else {
@@ -396,7 +398,7 @@ export const TrieNodeSnapshotResponse = {
fromPartial<I extends Exact<DeepPartial<TrieNodeSnapshotResponse>, I>>(object: I): TrieNodeSnapshotResponse {
const message = createBaseTrieNodeSnapshotResponse();
message.prefix = object.prefix ?? "";
message.prefix = object.prefix ?? new Uint8Array();
message.excludedHashes = object.excludedHashes?.map((e) => e) || [];
message.numMessages = object.numMessages ?? 0;
message.rootHash = object.rootHash ?? "";
@@ -405,13 +407,13 @@ export const TrieNodeSnapshotResponse = {
};
function createBaseTrieNodePrefix(): TrieNodePrefix {
return { prefix: "" };
return { prefix: new Uint8Array() };
}
export const TrieNodePrefix = {
encode(message: TrieNodePrefix, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.prefix !== "") {
writer.uint32(10).string(message.prefix);
if (message.prefix.length !== 0) {
writer.uint32(10).bytes(message.prefix);
}
return writer;
},
@@ -424,7 +426,7 @@ export const TrieNodePrefix = {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
message.prefix = reader.string();
message.prefix = reader.bytes();
break;
default:
reader.skipType(tag & 7);
@@ -435,12 +437,13 @@ export const TrieNodePrefix = {
},
fromJSON(object: any): TrieNodePrefix {
return { prefix: isSet(object.prefix) ? String(object.prefix) : "" };
return { prefix: isSet(object.prefix) ? bytesFromBase64(object.prefix) : new Uint8Array() };
},
toJSON(message: TrieNodePrefix): unknown {
const obj: any = {};
message.prefix !== undefined && (obj.prefix = message.prefix);
message.prefix !== undefined &&
(obj.prefix = base64FromBytes(message.prefix !== undefined ? message.prefix : new Uint8Array()));
return obj;
},
@@ -450,7 +453,7 @@ export const TrieNodePrefix = {
fromPartial<I extends Exact<DeepPartial<TrieNodePrefix>, I>>(object: I): TrieNodePrefix {
const message = createBaseTrieNodePrefix();
message.prefix = object.prefix ?? "";
message.prefix = object.prefix ?? new Uint8Array();
return message;
},
};
@@ -462,7 +465,7 @@ function createBaseSyncIds(): SyncIds {
export const SyncIds = {
encode(message: SyncIds, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
for (const v of message.syncIds) {
writer.uint32(10).string(v!);
writer.uint32(10).bytes(v!);
}
return writer;
},
@@ -475,7 +478,7 @@ export const SyncIds = {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
message.syncIds.push(reader.string());
message.syncIds.push(reader.bytes());
break;
default:
reader.skipType(tag & 7);
@@ -486,13 +489,13 @@ export const SyncIds = {
},
fromJSON(object: any): SyncIds {
return { syncIds: Array.isArray(object?.syncIds) ? object.syncIds.map((e: any) => String(e)) : [] };
return { syncIds: Array.isArray(object?.syncIds) ? object.syncIds.map((e: any) => bytesFromBase64(e)) : [] };
},
toJSON(message: SyncIds): unknown {
const obj: any = {};
if (message.syncIds) {
obj.syncIds = message.syncIds.map((e) => e);
obj.syncIds = message.syncIds.map((e) => base64FromBytes(e !== undefined ? e : new Uint8Array()));
} else {
obj.syncIds = [];
}

View File

@@ -15,25 +15,25 @@ message HubInfoResponse {
}
message TrieNodeMetadataResponse {
string prefix = 1;
uint64 num_messages = 2;
string hash = 3;
repeated TrieNodeMetadataResponse children = 4;
bytes prefix = 1;
uint64 num_messages = 2;
string hash = 3;
repeated TrieNodeMetadataResponse children = 4;
}
message TrieNodeSnapshotResponse {
string prefix = 1;
repeated string excluded_hashes = 2;
uint64 num_messages = 3;
string root_hash = 4;
bytes prefix = 1;
repeated string excluded_hashes = 2;
uint64 num_messages = 3;
string root_hash = 4;
}
message TrieNodePrefix {
string prefix = 1;
bytes prefix = 1;
}
message SyncIds {
repeated string sync_ids = 1;
repeated bytes sync_ids = 1;
}
message FidRequest {