diff --git a/packages/beacon-node/src/api/impl/beacon/blocks/index.ts b/packages/beacon-node/src/api/impl/beacon/blocks/index.ts index 84c55fbcea..126b714c54 100644 --- a/packages/beacon-node/src/api/impl/beacon/blocks/index.ts +++ b/packages/beacon-node/src/api/impl/beacon/blocks/index.ts @@ -27,7 +27,7 @@ import { isDenebBlockContents, sszTypesFor, } from "@lodestar/types"; -import {fromHex, sleep, toHex, toRootHex} from "@lodestar/utils"; +import {fromAsync, fromHex, sleep, toHex, toRootHex} from "@lodestar/utils"; import { BlobsSource, BlockInput, @@ -656,9 +656,9 @@ export function getBeaconBlockApi({ ); } - let dataColumnSidecars = await db.dataColumnSidecar.values(blockRoot); + let dataColumnSidecars = await fromAsync(db.dataColumnSidecar.valuesStream(blockRoot)); if (dataColumnSidecars.length === 0) { - dataColumnSidecars = await db.dataColumnSidecarArchive.values(block.message.slot); + dataColumnSidecars = await fromAsync(db.dataColumnSidecarArchive.valuesStream(block.message.slot)); } if (dataColumnSidecars.length === 0) { diff --git a/packages/beacon-node/src/api/impl/debug/index.ts b/packages/beacon-node/src/api/impl/debug/index.ts index 64969dced5..233e9633c8 100644 --- a/packages/beacon-node/src/api/impl/debug/index.ts +++ b/packages/beacon-node/src/api/impl/debug/index.ts @@ -3,7 +3,7 @@ import {ApplicationMethods} from "@lodestar/api/server"; import {ExecutionStatus} from "@lodestar/fork-choice"; import {ZERO_HASH_HEX} from "@lodestar/params"; import {BeaconState} from "@lodestar/types"; -import {toRootHex} from "@lodestar/utils"; +import {fromAsync, toRootHex} from "@lodestar/utils"; import {isOptimisticBlock} from "../../../util/forkChoice.js"; import {getStateSlotFromBytes} from "../../../util/multifork.js"; import {getBlockResponse} from "../beacon/blocks/utils.js"; @@ -96,10 +96,10 @@ export function getDebugApi({ const {block, executionOptimistic, finalized} = await getBlockResponse(chain, blockId); const blockRoot = config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message); - let dataColumnSidecars = await db.dataColumnSidecar.values(blockRoot); + let dataColumnSidecars = await fromAsync(db.dataColumnSidecar.valuesStream(blockRoot)); if (dataColumnSidecars.length === 0) { - dataColumnSidecars = await db.dataColumnSidecarArchive.values(block.message.slot); + dataColumnSidecars = await fromAsync(db.dataColumnSidecarArchive.valuesStream(block.message.slot)); } if (dataColumnSidecars.length === 0) { diff --git a/packages/beacon-node/src/chain/archiveStore/utils/archiveBlocks.ts b/packages/beacon-node/src/chain/archiveStore/utils/archiveBlocks.ts index 82dbad6a4f..34f2987aab 100644 --- a/packages/beacon-node/src/chain/archiveStore/utils/archiveBlocks.ts +++ b/packages/beacon-node/src/chain/archiveStore/utils/archiveBlocks.ts @@ -5,7 +5,7 @@ import {IForkChoice} from "@lodestar/fork-choice"; import {ForkSeq, SLOTS_PER_EPOCH} from "@lodestar/params"; import {computeEpochAtSlot, computeStartSlotAtEpoch} from "@lodestar/state-transition"; import {Epoch, RootHex, Slot} from "@lodestar/types"; -import {Logger, fromHex, prettyPrintIndices, toRootHex} from "@lodestar/utils"; +import {Logger, fromAsync, fromHex, prettyPrintIndices, toRootHex} from "@lodestar/utils"; import {IBeaconDb} from "../../../db/index.js"; import {BlockArchiveBatchPutBinaryItem} from "../../../db/repositories/index.js"; import {ensureDir, writeIfNotExist} from "../../../util/file.js"; @@ -175,10 +175,12 @@ export async function archiveBlocks( const dataColumnSidecarsMinEpoch = 
currentEpoch - dataColumnSidecarsArchiveWindow; if (dataColumnSidecarsMinEpoch >= config.FULU_FORK_EPOCH) { const prefixedKeys = await db.dataColumnSidecarArchive.keys({ - lt: db.dataColumnSidecarArchive.getMaxKeyRaw(computeStartSlotAtEpoch(dataColumnSidecarsMinEpoch)), + // The `id` value `0` refers to the column index. So we want to fetch all sidecars less than zero column of `dataColumnSidecarsMinEpoch` + lt: {prefix: computeStartSlotAtEpoch(dataColumnSidecarsMinEpoch), id: 0}, }); // for each slot there could be multiple dataColumnSidecar, so we need to deduplicate it const slotsToDelete = [...new Set(prefixedKeys.map(({prefix}) => prefix))].sort((a, b) => a - b); + if (slotsToDelete.length > 0) { await db.dataColumnSidecarArchive.deleteMany(slotsToDelete); `dataColumnSidecars prune slotRange=${prettyPrintIndices(slotsToDelete)}, numOfSlots=${slotsToDelete.length} totalNumOfSidecars=${prefixedKeys.length}`; @@ -331,7 +333,7 @@ async function migrateDataColumnSidecarsFromHotToColdDb( continue; } - const dataColumnSidecarBytes = await db.dataColumnSidecar.valuesBinary(block.root); + const dataColumnSidecarBytes = await fromAsync(db.dataColumnSidecar.valuesStreamBinary(block.root)); // there could be 0 dataColumnSidecarBytes if block has no blob logger.verbose("migrateDataColumnSidecarsFromHotToColdDb", { slot: block.slot, diff --git a/packages/beacon-node/src/db/repositories/dataColumnSidecar.ts b/packages/beacon-node/src/db/repositories/dataColumnSidecar.ts index d29c00e0fb..9a3175d20a 100644 --- a/packages/beacon-node/src/db/repositories/dataColumnSidecar.ts +++ b/packages/beacon-node/src/db/repositories/dataColumnSidecar.ts @@ -1,10 +1,12 @@ import {ChainForkConfig} from "@lodestar/config"; -import {Db, PrefixedRepository} from "@lodestar/db"; +import {Db, decodeNumberForDbKey, encodeNumberForDbKey, PrefixedRepository} from "@lodestar/db"; import {NUMBER_OF_COLUMNS} from "@lodestar/params"; import {ColumnIndex, Root, fulu, ssz} from "@lodestar/types"; -import {bytesToInt, intToBytes} from "@lodestar/utils"; import {Bucket, getBucketNameByValue} from "../buckets.js"; +const COLUMN_INDEX_BYTE_SIZE = 2; +const BLOCK_ROOT_BYTE_SIZE = 32; + type BlockRoot = Root; /** @@ -27,21 +29,21 @@ export class DataColumnSidecarRepository extends PrefixedRepository(items: T[]): AsyncIterable { + return { + async *[Symbol.asyncIterator]() { + for (const it of items) yield it; + }, + }; +} + describe("block archiver task", () => { const logger = testLogger(); @@ -102,9 +110,9 @@ describe("block archiver task", () => { const dataColumnBytes = ssz.fulu.DataColumnSidecar.serialize(dataColumn); vi.spyOn(dbStub.block, "getBinary").mockResolvedValue(blockBytes); - vi.spyOn(dbStub.dataColumnSidecar, "valuesBinary").mockResolvedValue([ - {id: dataColumn.index, prefix: block.message.stateRoot, value: dataColumnBytes}, - ]); + vi.spyOn(dbStub.dataColumnSidecar, "valuesStreamBinary").mockReturnValue( + toAsyncIterable([{id: dataColumn.index, prefix: block.message.stateRoot, value: dataColumnBytes}]) + ); // Create blocks after fulu fork const blocks = Array.from({length: 5}, (_, i) => @@ -158,9 +166,7 @@ describe("block archiver task", () => { ); expect(dbStub.dataColumnSidecarArchive.keys).toBeCalledWith({ - lt: dbStub.dataColumnSidecarArchive.getMaxKeyRaw( - computeStartSlotAtEpoch(currentEpoch - config.MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS) - ), + lt: {prefix: computeStartSlotAtEpoch(currentEpoch - config.MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS), id: 0}, }); }); }); diff --git 
a/packages/beacon-node/test/unit/db/api/repositories/dataColumn.test.ts b/packages/beacon-node/test/unit/db/api/repositories/dataColumn.test.ts index 46d0288b5d..98936b3912 100644 --- a/packages/beacon-node/test/unit/db/api/repositories/dataColumn.test.ts +++ b/packages/beacon-node/test/unit/db/api/repositories/dataColumn.test.ts @@ -1,7 +1,7 @@ import {createChainForkConfig} from "@lodestar/config"; import {LevelDbController} from "@lodestar/db"; import {Root, fulu, ssz} from "@lodestar/types"; -import {toHex} from "@lodestar/utils"; +import {fromAsync, toHex} from "@lodestar/utils"; import {rimraf} from "rimraf"; import {afterEach, beforeEach, describe, expect, it} from "vitest"; import {DataColumnSidecarRepository} from "../../../../../src/db/repositories/dataColumnSidecar.js"; @@ -9,6 +9,7 @@ import {getDataColumnSidecarsFromBlock} from "../../../../../src/util/dataColumn import {kzg} from "../../../../../src/util/kzg.js"; import {testLogger} from "../../../../utils/logger.js"; import {DataColumnSidecarArchiveRepository} from "../../../../../src/db/repositories/dataColumnSidecarArchive.js"; +import {NUMBER_OF_COLUMNS} from "@lodestar/params"; /* eslint-disable @typescript-eslint/naming-convention */ const config = createChainForkConfig({ @@ -17,7 +18,109 @@ const config = createChainForkConfig({ DENEB_FORK_EPOCH: 0, FULU_FORK_EPOCH: 0, }); -describe("block archive repository", () => { + +describe("dataColumnSidecar repository", () => { + const testDir = "./.tmp"; + let dataColumnRepo: DataColumnSidecarRepository; + let db: LevelDbController; + let allDataColumnSidecars: fulu.DataColumnSidecars; + const blobKzgCommitmentsLen = 3; + const dataColumn = ssz.fulu.DataColumnSidecar.defaultValue(); + const blockSlot = 11; + dataColumn.signedBlockHeader.message.slot = blockSlot; + + beforeEach(async () => { + db = await LevelDbController.create({name: testDir}, {logger: testLogger()}); + dataColumnRepo = new DataColumnSidecarRepository(config, db); + + const blob = ssz.deneb.Blob.defaultValue(); + const commitment = kzg.blobToKzgCommitment(blob); + const signedBlock = ssz.fulu.SignedBeaconBlock.defaultValue(); + const blobs = [blob, blob, blob]; + const commitments = Array.from({length: blobKzgCommitmentsLen}, () => commitment); + signedBlock.message.body.blobKzgCommitments = commitments; + const cellsAndProofs = blobs.map((b) => kzg.computeCellsAndKzgProofs(b)); + allDataColumnSidecars = getDataColumnSidecarsFromBlock(config, signedBlock, cellsAndProofs); + for (let j = 0; j < allDataColumnSidecars.length; j++) { + allDataColumnSidecars[j].index = j; + } + }); + + afterEach(async () => { + await db.close(); + rimraf.sync(testDir); + }); + + describe("encodeKeyRaw", () => { + const root = Buffer.from("0102030405060708010203040506070801020304050607080102030405060708", "hex"); + const columnIndex = 9; + + it("should correctly encode the key in right size", () => { + const bytes = dataColumnRepo.encodeKeyRaw(root, columnIndex); + + // 32 byte root and 2 byte column index + expect(bytes).toHaveLength(32 + 2); + }); + + it("should use correct byte size for slot", () => { + const bytes = dataColumnRepo.encodeKeyRaw(root, columnIndex); + + // encoded as `be` 8 bytes value + expect(bytes.slice(0, 32)).toEqual( + Buffer.from([1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8]) + ); + }); + + it("should use correct byte size for column index", () => { + const bytes = dataColumnRepo.encodeKeyRaw(root, columnIndex); + + // encoded as `be` 2 bytes value + 
expect(bytes.slice(32)).toEqual(Buffer.from([0, 9])); + }); + }); + + describe("decodeKeyRaw", () => { + const root = new Uint8Array([ + 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, + ]); + const columnIndex = 9; + const bytes = new Uint8Array([ + 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 0, 9, + ]); + + it("should correctly decode key", () => { + expect(dataColumnRepo.decodeKeyRaw(bytes)).toEqual({prefix: root, id: columnIndex}); + }); + }); + + describe("getMaxKeyRaw", () => { + it("should return inclusive max key", () => { + const root = new Uint8Array([ + 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, + ]); + + const bytes = dataColumnRepo.getMaxKeyRaw(root); + + // We subtract 1 from the total number of columns to make the range inclusive + expect(bytes.slice(32)).toEqual(Buffer.from([0, NUMBER_OF_COLUMNS - 1])); + }); + }); + + describe("getMinKeyRaw", () => { + it("should return inclusive min key", () => { + const root = new Uint8Array([ + 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, + ]); + + const bytes = dataColumnRepo.getMinKeyRaw(root); + + // Column indices start from 0 + expect(bytes.slice(32)).toEqual(Buffer.from([0, 0])); + }); + }); +}); + +describe("dataColumnSidecarArchive repository", () => { const testDir = "./.tmp"; let dataColumnRepo: DataColumnSidecarRepository; let dataColumnArchiveRepo: DataColumnSidecarArchiveRepository; @@ -53,12 +156,70 @@ describe("block archive repository", () => { rimraf.sync(testDir); }); + describe("encodeKeyRaw", () => { + const slot = 12; + const columnIndex = 3; + + it("should correctly encode the key in the right size", () => { + const bytes = dataColumnArchiveRepo.encodeKeyRaw(slot, columnIndex); + + // 8 byte slot and 2 byte column index + expect(bytes).toHaveLength(8 + 2); + }); + + it("should use correct byte size for slot", () => { + const bytes = dataColumnArchiveRepo.encodeKeyRaw(slot, columnIndex); + + // encoded as `be` 8 bytes value + expect(bytes.slice(0, 8)).toEqual(Buffer.from([0, 0, 0, 0, 0, 0, 0, 12])); + }); + + it("should use correct byte size for column index", () => { + const bytes = dataColumnArchiveRepo.encodeKeyRaw(slot, columnIndex); + + // encoded as `be` 2 bytes value + expect(bytes.slice(8)).toEqual(Buffer.from([0, 3])); + }); + }); + + describe("decodeKeyRaw", () => { + const slot = 12; + const columnIndex = 3; + const bytes = Buffer.from([0, 0, 0, 0, 0, 0, 0, 12, 0, 3]); + + it("should correctly decode key", () => { + expect(dataColumnArchiveRepo.decodeKeyRaw(bytes)).toEqual({prefix: slot, id: columnIndex}); + }); + }); + + describe("getMaxKeyRaw", () => { + it("should return inclusive max key", () => { + const slot = 9; + + const bytes = dataColumnArchiveRepo.getMaxKeyRaw(slot); + + // We subtract 1 from the total number of columns to make the range inclusive + expect(bytes.slice(8)).toEqual(Buffer.from([0, NUMBER_OF_COLUMNS - 1])); + }); + }); + + describe("getMinKeyRaw", () => { + it("should return inclusive min key", () => { + const slot = 9; + + const bytes = dataColumnArchiveRepo.getMinKeyRaw(slot); + + // Column indices start from 0 + expect(bytes.slice(8)).toEqual(Buffer.from([0, 0])); + }); + }); + it("should get data column sidecars by parent root", async () => { const dataColumnSidecars = allDataColumnSidecars.slice(0, 7); const dataColumnsLen = dataColumnSidecars.length; await dataColumnRepo.putMany(blockRoot,
dataColumnSidecars); - const retrievedDataColumnSidecars = await dataColumnRepo.values(blockRoot); + const retrievedDataColumnSidecars = await fromAsync(dataColumnRepo.valuesStream(blockRoot)); expect(retrievedDataColumnSidecars).toHaveLength(dataColumnsLen); expect(retrievedDataColumnSidecars.map((c) => c.index)).toEqual(dataColumnSidecars.map((c) => c.index)); @@ -74,7 +235,7 @@ describe("block archive repository", () => { const dataColumnsLen = dataColumnSidecars.length; await dataColumnRepo.putMany(blockRoot, dataColumnSidecars); - const retrievedDataColumnSidecarBytes = await dataColumnRepo.valuesBinary(blockRoot); + const retrievedDataColumnSidecarBytes = await fromAsync(dataColumnRepo.valuesStreamBinary(blockRoot)); expect(retrievedDataColumnSidecarBytes).toHaveLength(dataColumnsLen); @@ -90,7 +251,7 @@ describe("block archive repository", () => { // same api to writeBlockInputToDb await dataColumnRepo.putMany(blockRoot, allDataColumnSidecars); // same api to migrateDataColumnSidecarsFromHotToColDb - const dataColumnSidecarBytes = await dataColumnRepo.valuesBinary(blockRoot); + const dataColumnSidecarBytes = await fromAsync(dataColumnRepo.valuesStreamBinary(blockRoot)); await dataColumnArchiveRepo.putManyBinary( blockSlot, dataColumnSidecarBytes.map((p) => ({key: p.id, value: p.value})) diff --git a/packages/db/src/abstractPrefixedRepository.ts b/packages/db/src/abstractPrefixedRepository.ts index 4aa9e92837..d27e630d16 100644 --- a/packages/db/src/abstractPrefixedRepository.ts +++ b/packages/db/src/abstractPrefixedRepository.ts @@ -16,6 +16,10 @@ type Id = Uint8Array | string | number | bigint; */ export abstract class PrefixedRepository { private readonly dbReqOpts: DbReqOpts; + /** Inclusive range for the minimum key for the bucket */ + private readonly minKey: Uint8Array; + /** Exclusive range for the maximum key for the bucket */ + private readonly maxKey: Uint8Array; protected constructor( protected config: ChainForkConfig, @@ -25,6 +29,8 @@ export abstract class PrefixedRepository { private readonly bucketId: string ) { this.dbReqOpts = {bucketId: this.bucketId}; + this.minKey = encodeKey(bucket, Buffer.alloc(0)); + this.maxKey = encodeKey(bucket + 1, Buffer.alloc(0)); } abstract encodeKeyRaw(prefix: P, id: I): Uint8Array; @@ -154,17 +160,6 @@ export abstract class PrefixedRepository { } } - /** - * Non iterative version of `valuesStream`. - */ - async values(prefix: P | P[]): Promise { - const result: T[] = []; - for await (const value of this.valuesStream(prefix)) { - result.push(value); - } - return result; - } - async *valuesStreamBinary(prefix: P | P[]): AsyncIterable<{prefix: P; id: I; value: Uint8Array}> { for (const p of Array.isArray(prefix) ? prefix : [prefix]) { for await (const {key, value} of this.db.entriesStream({ @@ -183,17 +178,6 @@ export abstract class PrefixedRepository { } } - /** - * Non iterative version of `valuesStreamBinary`. - */ - async valuesBinary(prefix: P | P[]): Promise<{prefix: P; id: I; value: Uint8Array}[]> { - const result = []; - for await (const value of this.valuesStreamBinary(prefix)) { - result.push(value); - } - return result; - } - async *entriesStream(prefix: P | P[]): AsyncIterable<{prefix: P; id: I; value: T}> { for (const v of Array.isArray(prefix) ? prefix : [prefix]) { for await (const {key, value} of this.db.entriesStream({ @@ -216,7 +200,7 @@ export abstract class PrefixedRepository { for (const v of Array.isArray(prefix) ? 
prefix : [prefix]) { for await (const {key, value} of this.db.entriesStream({ gte: this.wrapKey(this.getMinKeyRaw(v)), - lt: this.wrapKey(this.getMaxKeyRaw(v)), + lte: this.wrapKey(this.getMaxKeyRaw(v)), bucketId: this.bucketId, })) { const {prefix, id} = this.decodeKeyRaw(this.unwrapKey(key)); @@ -230,23 +214,25 @@ export abstract class PrefixedRepository { } } - async keys(opts?: FilterOptions<Uint8Array>): Promise<{prefix: P; id: I}[]> { + async keys(opts?: FilterOptions<{prefix: P; id: I}>): Promise<{prefix: P; id: I}[]> { const optsBuff: FilterOptions<Uint8Array> = { bucketId: this.bucketId, }; - // Set at least one min key - if (opts?.lt !== undefined) { - optsBuff.lt = this.wrapKey(opts.lt); - } else if (opts?.lte !== undefined) { - optsBuff.lte = this.wrapKey(opts.lte); + if (opts?.gte !== undefined) { + optsBuff.gte = this.wrapKey(this.encodeKeyRaw(opts.gte.prefix, opts.gte.id)); + } else if (opts?.gt !== undefined) { + optsBuff.gt = this.wrapKey(this.encodeKeyRaw(opts.gt.prefix, opts.gt.id)); + } else { + optsBuff.gte = this.minKey; } - // Set at least on max key - if (opts?.gt !== undefined) { - optsBuff.gt = this.wrapKey(opts.gt); - } else if (opts?.gte !== undefined) { - optsBuff.gte = this.wrapKey(opts.gte); + if (opts?.lte !== undefined) { + optsBuff.lte = this.wrapKey(this.encodeKeyRaw(opts.lte.prefix, opts.lte.id)); + } else if (opts?.lt !== undefined) { + optsBuff.lt = this.wrapKey(this.encodeKeyRaw(opts.lt.prefix, opts.lt.id)); + } else { + optsBuff.lt = this.maxKey; } if (opts?.reverse !== undefined) optsBuff.reverse = opts.reverse; diff --git a/packages/db/src/util.ts b/packages/db/src/util.ts index 3b10085940..8012565bcf 100644 --- a/packages/db/src/util.ts +++ b/packages/db/src/util.ts @@ -1,10 +1,18 @@ -import {intToBytes} from "@lodestar/utils"; +import {bytesToInt, intToBytes} from "@lodestar/utils"; import {BUCKET_LENGTH} from "./const.js"; export const uintLen = 8; /** - * Prepend a bucket to a key + * Encode a key for a db write/read by prepending the bucket to the key + * + * Key encoding is a critical step: an incorrect encoding breaks indexing and range queries + * + * LevelDB has pluggable comparator support, so in principle you can decide how keys are + * compared. However, the Node.js binding only supports the default comparator, which + * compares the raw bytes of the keys lexicographically + * + * It is important to use the **helpers implemented here** to encode db keys so that key comparison works properly.
*/ export function encodeKey(bucket: number, key: Uint8Array | string | number | bigint): Uint8Array { let buf: Buffer; @@ -24,3 +32,19 @@ export function encodeKey(bucket: number, key: Uint8Array | string | number | bi buf.set(intToBytes(bucket, BUCKET_LENGTH, "le"), 0); return buf; } + +export function encodeNumberForDbKey(value: number, byteSize: number): Uint8Array { + return intToBytes(value, byteSize, "be"); +} + +export function decodeNumberForDbKey(value: Uint8Array, byteSize: number): number { + return bytesToInt(value.slice(0, byteSize), "be"); +} + +export function encodeStringForDbKey(value: string): Uint8Array { + return Buffer.from(value, "utf-8"); +} + +export function decodeStringForDbKey(value: Uint8Array): string { + return Buffer.from(value).toString("utf8"); +} diff --git a/packages/db/test/e2e/abstractPrefixedRepository.test.ts b/packages/db/test/e2e/abstractPrefixedRepository.test.ts index eadf53e665..1a1ef78734 100644 --- a/packages/db/test/e2e/abstractPrefixedRepository.test.ts +++ b/packages/db/test/e2e/abstractPrefixedRepository.test.ts @@ -1,38 +1,67 @@ /** biome-ignore-all lint/style/noNonNullAssertion: values all exist */ import {beforeAll, afterAll, beforeEach, describe, it, expect} from "vitest"; import {getEnvLogger} from "@lodestar/logger/env"; -import {PrefixedRepository, LevelDbController, type Db} from "../../src/index.js"; +import { + PrefixedRepository, + LevelDbController, + type Db, + encodeNumberForDbKey, + decodeNumberForDbKey, +} from "../../src/index.js"; +import {fromAsync} from "@lodestar/utils"; + +type Slot = number; +type Column = number; + +type TestPrefixedType = {column: Column; value: string}; // Fake SSZ-like Type for string values -const fakeType = { - serialize: (v: string): Uint8Array => Buffer.from(v, "utf8"), - deserialize: (d: Uint8Array): string => Buffer.from(d).toString("utf8"), +const testPrefixedType = { + serialize: (v: TestPrefixedType): Uint8Array => Buffer.from(JSON.stringify(v), "utf8"), + deserialize: (d: Uint8Array): TestPrefixedType => JSON.parse(Buffer.from(d).toString("utf8")) as TestPrefixedType, hashTreeRoot: (v: string): Uint8Array => Buffer.from("id:" + v, "utf8"), } as any; // P = number (single-byte prefix), I = Uint8Array id (raw bytes) -class TestPrefixedRepository extends PrefixedRepository { +class TestPrefixedRepository extends PrefixedRepository { constructor(db: Db, bucket: number, bucketId: string) { - super({} as any, db, bucket, fakeType, bucketId); + super({} as any, db, bucket, testPrefixedType, bucketId); } - encodeKeyRaw(prefix: number, id: Uint8Array): Uint8Array { - return Buffer.concat([Buffer.from([prefix]), Buffer.from(id)]); + encodeKeyRaw(prefix: number, id: number): Uint8Array { + return Buffer.concat([encodeNumberForDbKey(prefix, 2), encodeNumberForDbKey(id, 2)]); } - decodeKeyRaw(raw: Uint8Array): {prefix: number; id: Uint8Array} { - return {prefix: raw[0], id: raw.slice(1)}; + decodeKeyRaw(raw: Uint8Array): {prefix: number; id: number} { + return {prefix: decodeNumberForDbKey(raw, 2), id: decodeNumberForDbKey(raw.slice(2), 2)}; } getMaxKeyRaw(prefix: number): Uint8Array { - return Buffer.from([prefix, 0xff]); + return Buffer.concat([encodeNumberForDbKey(prefix, 2), encodeNumberForDbKey(0xffff, 2)]); } getMinKeyRaw(prefix: number): Uint8Array { - return Buffer.from([prefix, 0x00]); + return Buffer.concat([encodeNumberForDbKey(prefix, 2), encodeNumberForDbKey(0, 2)]); + } + + getId(value: TestPrefixedType): number { + return value.column; } } +const numberOfSlots = 50; + +// We need to 
store columns which are more than 1 byte to test the proper encoding +const numberOfColumns = 300; + +const generateColumnsData = (slot: Slot) => + Array.from({length: numberOfColumns}, (_, c) => ({column: c, value: `s:${slot}-c:${c}`})); + +// Generate fixtures to be used later with the db +const testData: Record = Array.from({length: numberOfSlots}, (_, s) => + generateColumnsData(s) +); + describe("abstractPrefixedRepository", () => { const bucket = 12; const bucketId = "prefixed-repo-e2e"; @@ -57,163 +86,388 @@ describe("abstractPrefixedRepository", () => { it("put/get/getBinary/delete per prefix", async () => { const p = 1; - const value = "hello"; - await repo.put(p, value); - const id = repo.getId(value); - expect(await repo.get(p, id)).toBe(value); + const id = 0; + const value = testData[p][id]; + + // Put + await expect(repo.put(p, value)).resolves.toBeUndefined(); + + // Get + expect(await repo.get(p, id)).toEqual(value); + + // Get Binary const bin = await repo.getBinary(p, id); - expect(Buffer.from(bin!).toString("utf8")).toBe(value); + expect(testPrefixedType.deserialize(bin)).toEqual(value); + + // Delete await repo.delete(p, id); expect(await repo.get(p, id)).toBeNull(); }); it("getMany and getManyBinary in order and with missing ids", async () => { const p = 3; - const v1 = "a"; - const v2 = "b"; - await repo.put(p, v1); - await repo.put(p, v2); - const id1 = repo.getId(v1); - const id2 = repo.getId(v2); - const idMissing = Buffer.from([0x99]); + const ids = [0, 10, 4, 20]; + const values = [testData[p][0], testData[p][10], testData[p][4], testData[p][20]]; + const valuesBinaries = values.map((v) => testPrefixedType.serialize(v)); - await expect(repo.getMany(p, [id1, id2, idMissing])).resolves.toEqual([v1, v2, undefined]); - const resBin = await repo.getManyBinary(p, [id1, id2, idMissing]); - expect(resBin.map((b) => (b ? 
Buffer.from(b).toString("utf8") : undefined))).toEqual([v1, v2, undefined]); + for (const v of values) { + await repo.put(p, v); + } + + const result = await repo.getMany(p, ids); + + expect(result).toHaveLength(ids.length); + expect(result).toEqual(values); + + const resultBinary = await repo.getManyBinary(p, ids); + + expect(resultBinary).toHaveLength(ids.length); + expect(resultBinary).toEqual(valuesBinaries); }); - it("putMany and putManyBinary store correctly", async () => { - const p = 4; - await repo.putMany(p, ["x", "y"]); - const idX = repo.getId("x"); - const idY = repo.getId("y"); - expect(await repo.get(p, idX)).toBe("x"); - expect(await repo.get(p, idY)).toBe("y"); + it("putMany store correctly", async () => { + const p = 3; + const ids = [0, 10, 4, 20]; + const values = [testData[p][0], testData[p][10], testData[p][4], testData[p][20]]; - const p2 = 5; - const idA = Buffer.from([0x2a]); - const idB = Buffer.from([0x2b]); - await repo.putManyBinary(p2, [ - {key: idA, value: Buffer.from("A")}, - {key: idB, value: Buffer.from("B")}, - ]); - expect(await repo.get(p2, idA)).toBe("A"); - expect(await repo.get(p2, idB)).toBe("B"); + await repo.putMany(p, values); + + for (const [index, id] of ids.entries()) { + await expect(repo.get(p, id)).resolves.toEqual(values[index]); + } }); - it("values and valuesStream for single and multiple prefixes", async () => { - const p1 = 7; - const p2 = 8; - await repo.put(p1, "p1-1"); - await repo.put(p1, "p1-2"); - await repo.put(p2, "p2-1"); + it("putManyBinary store correctly", async () => { + const p = 3; + const ids = [0, 10, 4, 20]; + const values = [testData[p][0], testData[p][10], testData[p][4], testData[p][20]]; + const valuesBinaries = values.map((v) => ({key: v.column, value: testPrefixedType.serialize(v)})); - // Single prefix - await expect(repo.values(p1)).resolves.toEqual(["p1-1", "p1-2"]); + await repo.putManyBinary(p, valuesBinaries); - // Multiple prefixes preserve provided order - await expect(repo.values([p2, p1])).resolves.toEqual(["p2-1", "p1-1", "p1-2"]); - - const fromStream: string[] = []; - for await (const v of repo.valuesStream([p2, p1])) fromStream.push(v); - expect(fromStream).toEqual(["p2-1", "p1-1", "p1-2"]); - }); - - it("valuesStreamBinary and valuesBinary decode prefix and id", async () => { - const p = 10; - const id1 = Buffer.from([0x01]); - const id2 = Buffer.from([0x02]); - await repo.putManyBinary(p, [ - {key: id1, value: Buffer.from("v1")}, - {key: id2, value: Buffer.from("v2")}, - ]); - - const list = await repo.valuesBinary(p); - expect(list.map((e) => ({p: e.prefix, id: Buffer.from(e.id), v: Buffer.from(e.value).toString("utf8")}))).toEqual([ - {p: p, id: id1, v: "v1"}, - {p: p, id: id2, v: "v2"}, - ]); - - const fromStream: {prefix: number; id: Uint8Array; value: Uint8Array}[] = []; - for await (const e of repo.valuesStreamBinary(p)) fromStream.push(e); - expect( - fromStream.map((e) => ({p: e.prefix, id: Buffer.from(e.id), v: Buffer.from(e.value).toString("utf8")})) - ).toEqual([ - {p: p, id: id1, v: "v1"}, - {p: p, id: id2, v: "v2"}, - ]); - }); - - it("entriesStream and entriesStreamBinary provide decoded data", async () => { - const p = 11; - await repo.put(p, "a"); - await repo.put(p, "b"); - const idA = repo.getId("a"); - const idB = repo.getId("b"); - - const entries: {prefix: number; id: Uint8Array; value: string}[] = []; - for await (const e of repo.entriesStream(p)) entries.push(e); - expect(entries).toEqual([ - {prefix: p, id: idA, value: "a"}, - {prefix: p, id: idB, value: "b"}, - ]); - - 
const entriesBin: {prefix: number; id: Uint8Array; value: Uint8Array}[] = []; - for await (const e of repo.entriesStreamBinary(p)) entriesBin.push(e); - expect( - entriesBin.map((e) => ({prefix: e.prefix, id: Buffer.from(e.id), value: Buffer.from(e.value).toString("utf8")})) - ).toEqual([ - {prefix: p, id: idA, value: "a"}, - {prefix: p, id: idB, value: "b"}, - ]); + for (const [index, id] of ids.entries()) { + await expect(repo.get(p, id)).resolves.toEqual(values[index]); + } }); it("deleteMany removes all for provided prefixes", async () => { const p1 = 20; const p2 = 21; - await repo.put(p1, "a"); - await repo.put(p1, "b"); - await repo.put(p2, "c"); + // Put two columns for slot 20 + await repo.put(p1, testData[p1][1]); + await repo.put(p1, testData[p1][2]); + + // Put two columns for slot 21 + await repo.put(p2, testData[p2][1]); + await repo.put(p2, testData[p2][2]); await repo.deleteMany(p1); - await expect(repo.values(p1)).resolves.toEqual([]); - await expect(repo.values(p2)).resolves.toEqual(["c"]); + await expect(fromAsync(repo.valuesStream(p1))).resolves.toEqual([]); + await expect(fromAsync(repo.valuesStream(p2))).resolves.toEqual([testData[p2][1], testData[p2][2]]); // Re-fill and delete both - await repo.put(p1, "a"); - await repo.put(p1, "b"); - await repo.put(p2, "c"); + await repo.put(p1, testData[p1][1]); + await repo.put(p1, testData[p1][2]); + await repo.put(p2, testData[p2][1]); + await repo.put(p2, testData[p2][2]); + await repo.deleteMany([p1, p2]); - await expect(repo.values([p1, p2])).resolves.toEqual([]); + await expect(fromAsync(repo.valuesStream(p1))).resolves.toEqual([]); + await expect(fromAsync(repo.valuesStream(p2))).resolves.toEqual([]); }); - it("keys returns decoded prefix+id with filters and options", async () => { - const p1 = 30; - const p2 = 31; - const id1 = Buffer.from([0x01]); - const id2 = Buffer.from([0x02]); - const id3 = Buffer.from([0x03]); + describe("valuesStream,valuesStreamBinary,entriesStream,entriesStreamBinary", () => { + it("valuesStream should fetch for single and multiple prefixes", async () => { + const p1 = 7; + const p2 = 8; - await repo.putManyBinary(p1, [ - {key: id1, value: Buffer.from("v1")}, - {key: id2, value: Buffer.from("v2")}, - {key: id3, value: Buffer.from("v3")}, - ]); - await repo.putManyBinary(p2, [{key: Buffer.from([0x01]), value: Buffer.from("w1")}]); + await repo.putMany(p1, testData[p1]); + await repo.putMany(p2, testData[p2]); - const gte = repo.encodeKeyRaw(p1, Buffer.from([0x00])); - const lte = repo.encodeKeyRaw(p1, Buffer.from([0xff])); - const keys = await repo.keys({gte, lte}); - expect(keys).toEqual([ - {prefix: p1, id: id1}, - {prefix: p1, id: id2}, - {prefix: p1, id: id3}, - ]); + // Single prefix + const result1 = await fromAsync(repo.valuesStream(p1)); - const revLimit = await repo.keys({gte, lte, reverse: true, limit: 2}); - expect(revLimit).toEqual([ - {prefix: p1, id: id3}, - {prefix: p1, id: id2}, - ]); + expect(result1).toHaveLength(numberOfColumns); + // For this test we don't emphasis on the order + expect(result1.sort((r1, r2) => r1.column - r2.column)).toEqual(testData[p1]); + + // Multiple prefix + const result2 = await fromAsync(repo.valuesStream([p1, p2])); + + // For this test we don't emphasis on the order + expect(result2).toHaveLength(numberOfColumns * 2); + expect(result2.sort((r1, r2) => r1.column - r2.column)).toEqual( + [...testData[p1], ...testData[p2]].sort((r1, r2) => r1.column - r2.column) + ); + }); + + it("valuesStreamBinary should fetch for single and multiple prefixes", 
async () => { + const p1 = 7; + const dataBinaryP1 = testData[p1].map((c) => ({id: c.column, prefix: p1, value: testPrefixedType.serialize(c)})); + const p2 = 8; + const dataBinaryP2 = testData[p2].map((c) => ({id: c.column, prefix: p2, value: testPrefixedType.serialize(c)})); + + await repo.putMany(p1, testData[p1]); + await repo.putMany(p2, testData[p2]); + + // Single prefix + const result1 = await fromAsync(repo.valuesStreamBinary(p1)); + + expect(result1).toHaveLength(numberOfColumns); + // For this test we don't emphasis on the order + expect(result1.sort((r1, r2) => r1.id - r2.id)).toEqual(dataBinaryP1); + + // Multiple prefix + const result2 = await fromAsync(repo.valuesStreamBinary([p1, p2])); + + // For this test we don't emphasis on the order + expect(result2).toHaveLength(numberOfColumns * 2); + expect(result2.sort((r1, r2) => r1.id - r2.id)).toEqual( + [...dataBinaryP1, ...dataBinaryP2].sort((r1, r2) => r1.id - r2.id) + ); + }); + + it("entriesStream should fetch for single and multiple prefixes", async () => { + const p1 = 7; + const entriesDataP1 = testData[p1].map((c) => ({id: c.column, prefix: p1, value: c})); + const p2 = 8; + const entriesDataP2 = testData[p2].map((c) => ({id: c.column, prefix: p2, value: c})); + + await repo.putMany(p1, testData[p1]); + await repo.putMany(p2, testData[p2]); + + // Single prefix + const result1 = await fromAsync(repo.entriesStream(p1)); + + expect(result1).toHaveLength(numberOfColumns); + // For this test we don't emphasis on the order + expect(result1.sort((r1, r2) => r1.id - r2.id)).toEqual(entriesDataP1); + + // Multiple prefix + const result2 = await fromAsync(repo.entriesStream([p1, p2])); + + // For this test we don't emphasis on the order + expect(result2).toHaveLength(numberOfColumns * 2); + expect(result2.sort((r1, r2) => r1.id - r2.id)).toEqual( + [...entriesDataP1, ...entriesDataP2].sort((r1, r2) => r1.id - r2.id) + ); + }); + + it("entriesStreamBinary should fetch for single and multiple prefixes", async () => { + const p1 = 7; + const entriesDataP1 = testData[p1].map((c) => ({id: c.column, prefix: p1, value: testPrefixedType.serialize(c)})); + const p2 = 8; + const entriesDataP2 = testData[p2].map((c) => ({id: c.column, prefix: p2, value: testPrefixedType.serialize(c)})); + + await repo.putMany(p1, testData[p1]); + await repo.putMany(p2, testData[p2]); + + // Single prefix + const result1 = await fromAsync(repo.entriesStreamBinary(p1)); + + expect(result1).toHaveLength(numberOfColumns); + // For this test we don't emphasis on the order + expect(result1.sort((r1, r2) => r1.id - r2.id)).toEqual(entriesDataP1); + + // Multiple prefix + const result2 = await fromAsync(repo.entriesStreamBinary([p1, p2])); + + // For this test we don't emphasis on the order + expect(result2).toHaveLength(numberOfColumns * 2); + expect(result2.sort((r1, r2) => r1.id - r2.id)).toEqual( + [...entriesDataP1, ...entriesDataP2].sort((r1, r2) => r1.id - r2.id) + ); + }); + + it("values should return in correct order of id for single prefix", async () => { + const p1 = 7; + const valuesP1 = [testData[p1][10], testData[p1][11], testData[p1][12]]; + await repo.putMany(p1, valuesP1); + + const result1 = await fromAsync(repo.valuesStream(p1)); + + expect(result1.map((v) => v.column)).toEqual([10, 11, 12]); + }); + + it("values should return in correct order of id for multiple prefixes", async () => { + const p1 = 7; + const valuesP1 = [testData[p1][10], testData[p1][11], testData[p1][12]]; + const p2 = 10; + const valuesP2 = [testData[p2][9], 
testData[p2][19], testData[p2][21]]; + + await repo.putMany(p1, valuesP1); + await repo.putMany(p2, valuesP2); + + const result1 = await fromAsync(repo.valuesStream([p1, p2])); + + expect(result1.map((v) => v.column)).toEqual([10, 11, 12, 9, 19, 21]); + }); + }); + + describe("keys", () => { + const getRangeDataInclusive = (slot: number, start: number, end: number) => + Array.from({length: end - start + 1}, (_, index) => ({id: start + index, prefix: slot})); + + it("keys returns decoded prefix+id with filters and options", async () => { + const slot1 = 30; + const slot2 = 31; + const column1 = 15; + const column2 = 258; + const column3 = 289; + + await repo.putMany(slot1, [testData[slot1][column1], testData[slot1][column2], testData[slot1][column3]]); + await repo.putMany(slot2, [testData[slot2][column1], testData[slot2][column2], testData[slot2][column3]]); + + const gte = {prefix: slot1, id: 0}; + const lte = {prefix: slot1, id: 400}; + const keys = await repo.keys({gte, lte}); + + expect(keys).toEqual([ + {prefix: slot1, id: column1}, + {prefix: slot1, id: column2}, + {prefix: slot1, id: column3}, + ]); + + const revLimit = await repo.keys({gte, lte, reverse: true, limit: 2}); + expect(revLimit).toEqual([ + {prefix: slot1, id: column3}, + {prefix: slot1, id: column2}, + ]); + }); + + it("should fetch correct range across single prefix", async () => { + const slot1 = 30; + const slot2 = 48; + const getRangeDataInclusive = (slot: number, start: number, end: number) => + Array.from({length: end - start + 1}, (_, index) => ({id: start + index, prefix: slot})); + + await repo.putMany(slot1, testData[slot1]); + await repo.putMany(slot2, testData[slot2]); + + // Across single byte + const result1 = await repo.keys({gt: {prefix: slot1, id: 5}, lt: {prefix: slot1, id: 17}}); + expect(result1).toEqual(getRangeDataInclusive(slot1, 6, 16)); + + // Across higher byte + const result2 = await repo.keys({gt: {prefix: slot1, id: 257}, lt: {prefix: slot1, id: 266}}); + expect(result2).toEqual(getRangeDataInclusive(slot1, 258, 265)); + + // Across multiple byte + const result3 = await repo.keys({gt: {prefix: slot1, id: 17}, lt: {prefix: slot1, id: 275}}); + expect(result3).toEqual(getRangeDataInclusive(slot1, 18, 274)); + }); + + it("should fetch correct range across multiple prefix", async () => { + const slot1 = 30; + const slot2 = 31; + const slot3 = 32; + + await repo.putMany(slot1, testData[slot1]); + await repo.putMany(slot2, testData[slot2]); + await repo.putMany(slot3, testData[slot3]); + + const query = {gt: {prefix: slot1, id: 5}, lt: {prefix: slot3, id: 17}}; + const result = [ + ...getRangeDataInclusive(slot1, 6, 299), + ...getRangeDataInclusive(slot2, 0, 299), + ...getRangeDataInclusive(slot3, 0, 16), + ].sort((a, b) => a.id - b.id); + + const result1 = await repo.keys(query); + expect(result1.sort((r1, r2) => r1.id - r2.id)).toEqual(result); + }); + + it("should fetch keys in correct order", async () => { + const slot = 30; + + await repo.putMany(slot, testData[slot]); + + const gte = {prefix: slot, id: 19}; + const lte = {prefix: slot, id: 23}; + const keys = await repo.keys({gte, lte}); + + expect(keys).toEqual([ + {prefix: slot, id: 19}, + {prefix: slot, id: 20}, + {prefix: slot, id: 21}, + {prefix: slot, id: 22}, + {prefix: slot, id: 23}, + ]); + }); + + it("should fetch keys in correct order across multiple prefixes", async () => { + const slot1 = 30; + const slot2 = 31; + await repo.putMany(slot1, testData[slot1]); + await repo.putMany(slot2, testData[slot2]); + + const query = {gt: 
{prefix: slot1, id: 295}, lt: {prefix: slot2, id: 4}}; + + const keys = await repo.keys(query); + + expect(keys).toEqual([ + {prefix: slot1, id: 296}, + {prefix: slot1, id: 297}, + {prefix: slot1, id: 298}, + {prefix: slot1, id: 299}, + {prefix: slot2, id: 0}, + {prefix: slot2, id: 1}, + {prefix: slot2, id: 2}, + {prefix: slot2, id: 3}, + ]); + }); + + it("should not cross the bucket boundary towards lower bucket", async () => { + const repo2 = new TestPrefixedRepository(db, bucket - 1, bucketId); + const slot = 30; + await repo.putMany(slot, testData[slot]); + await repo2.putMany(slot, testData[slot]); + + const query = {lt: {prefix: slot, id: 4}}; + + const keys = await repo.keys(query); + + expect(keys).toEqual([ + {prefix: slot, id: 0}, + {prefix: slot, id: 1}, + {prefix: slot, id: 2}, + {prefix: slot, id: 3}, + ]); + }); + + it("should not cross the bucket boundary towards higher bucket", async () => { + const repo2 = new TestPrefixedRepository(db, bucket + 1, bucketId); + const slot = 30; + await repo.putMany(slot, testData[slot]); + await repo2.putMany(slot, testData[slot]); + + const query = {gt: {prefix: slot, id: 295}}; + + const keys = await repo.keys(query); + + expect(keys).toEqual([ + {prefix: slot, id: 296}, + {prefix: slot, id: 297}, + {prefix: slot, id: 298}, + {prefix: slot, id: 299}, + ]); + }); + + it("should not cross the bucket boundary with multiple prefixes", async () => { + const repo2 = new TestPrefixedRepository(db, bucket - 1, bucketId); + const slot1 = 30; + const slot2 = 31; + await repo.putMany(slot1, testData[slot1]); + await repo2.putMany(slot1, testData[slot1]); + + await repo.putMany(slot2, testData[slot2]); + await repo2.putMany(slot2, testData[slot2]); + + const query = {lt: {prefix: slot2, id: 4}}; + + const keys = await repo.keys(query); + + expect(keys).toEqual([...getRangeDataInclusive(slot1, 0, 299), ...getRangeDataInclusive(slot2, 0, 3)]); + }); }); }); diff --git a/packages/db/test/unit/util.test.ts b/packages/db/test/unit/util.test.ts new file mode 100644 index 0000000000..e55e6486f0 --- /dev/null +++ b/packages/db/test/unit/util.test.ts @@ -0,0 +1,54 @@ +import {describe, it, expect} from "vitest"; +import { + encodeNumberForDbKey, + decodeNumberForDbKey, + encodeStringForDbKey, + decodeStringForDbKey, +} from "../../src/index.js"; + +describe("encode/decode number for DB key", () => { + it("roundtrips with fixed byte size (2 bytes)", () => { + const value = 0xffee; + const size = 2; + + const encoded = encodeNumberForDbKey(value, size); + expect(encoded).toEqual(Buffer.from([0xff, 0xee])); + + const decoded = decodeNumberForDbKey(encoded, size); + expect(decoded).toBe(value); + }); + + it("roundtrips with fixed byte size (4 bytes)", () => { + const value = 0xdeadbeef >>> 0; + const size = 4; + + const encoded = encodeNumberForDbKey(value, size); + expect(encoded).toEqual(Buffer.from([0xde, 0xad, 0xbe, 0xef])); + + const decoded = decodeNumberForDbKey(encoded, size); + expect(decoded).toBe(value); + }); + + it("decodes only the first N bytes (ignores trailing)", () => { + const size = 2; + const base = encodeNumberForDbKey(1, size); + const withTrailing = Buffer.concat([base, Buffer.from([0x99, 0x99])]); + const decoded = decodeNumberForDbKey(withTrailing, size); + expect(decoded).toBe(1); + }); +}); + +describe("encode/decode string for DB key", () => { + it("encodes UTF-8 string", () => { + const value = "hello"; + const encoded = encodeStringForDbKey(value); + expect(encoded).toEqual(Buffer.from(value, "utf-8")); + }); + + it("roundtrips 
Unicode strings", () => { + const value = "héłłø 🌟"; + const encoded = encodeStringForDbKey(value); + const decoded = decodeStringForDbKey(encoded); + expect(decoded).toBe(value); + }); +}); diff --git a/packages/utils/src/index.ts b/packages/utils/src/index.ts index d617e5a906..e81d8c3c6d 100644 --- a/packages/utils/src/index.ts +++ b/packages/utils/src/index.ts @@ -8,6 +8,7 @@ export * from "./diff.js"; export * from "./err.js"; export * from "./errors.js"; export * from "./format.js"; +export * from "./iterator.js"; export * from "./logger.js"; export * from "./map.js"; export * from "./math.js"; diff --git a/packages/utils/src/iterator.ts b/packages/utils/src/iterator.ts new file mode 100644 index 0000000000..9e09fd4ac1 --- /dev/null +++ b/packages/utils/src/iterator.ts @@ -0,0 +1,10 @@ +// ES2024 onward we have Array.fromAsync which does exactly this +// This function is here as wrapper to be deleted later when we upgrade +// minimum nodejs requirement to > 22 +export async function fromAsync(iter: AsyncIterable): Promise { + const arr: T[] = []; + for await (const v of iter) { + arr.push(v); + } + return arr; +}