mirror of
https://github.com/ChainSafe/lodestar.git
synced 2026-01-09 15:48:08 -05:00
fix: min key for the archive data columns (#8345)
**Motivation** Fix bucket boundary for the min the key for data column sidecars. **Description** - Fix the bucket boundary issue for data column side cars. **Steps to test or reproduce** Run all tests **Note** Level db have a builtin feature to define boundary for buckets called `sublevel`, I didn't realize earlier we are not using that, instead we have our own higher level logic for min/max key for the buckets. So didn't provided that second condition to limit to bucket boundary. --------- Co-authored-by: twoeths <10568965+twoeths@users.noreply.github.com> Co-authored-by: Nico Flaig <nflaig@protonmail.com>
This commit is contained in:
@@ -27,7 +27,7 @@ import {
|
||||
isDenebBlockContents,
|
||||
sszTypesFor,
|
||||
} from "@lodestar/types";
|
||||
import {fromHex, sleep, toHex, toRootHex} from "@lodestar/utils";
|
||||
import {fromAsync, fromHex, sleep, toHex, toRootHex} from "@lodestar/utils";
|
||||
import {
|
||||
BlobsSource,
|
||||
BlockInput,
|
||||
@@ -656,9 +656,9 @@ export function getBeaconBlockApi({
|
||||
);
|
||||
}
|
||||
|
||||
let dataColumnSidecars = await db.dataColumnSidecar.values(blockRoot);
|
||||
let dataColumnSidecars = await fromAsync(db.dataColumnSidecar.valuesStream(blockRoot));
|
||||
if (dataColumnSidecars.length === 0) {
|
||||
dataColumnSidecars = await db.dataColumnSidecarArchive.values(block.message.slot);
|
||||
dataColumnSidecars = await fromAsync(db.dataColumnSidecarArchive.valuesStream(block.message.slot));
|
||||
}
|
||||
|
||||
if (dataColumnSidecars.length === 0) {
|
||||
|
||||
@@ -3,7 +3,7 @@ import {ApplicationMethods} from "@lodestar/api/server";
|
||||
import {ExecutionStatus} from "@lodestar/fork-choice";
|
||||
import {ZERO_HASH_HEX} from "@lodestar/params";
|
||||
import {BeaconState} from "@lodestar/types";
|
||||
import {toRootHex} from "@lodestar/utils";
|
||||
import {fromAsync, toRootHex} from "@lodestar/utils";
|
||||
import {isOptimisticBlock} from "../../../util/forkChoice.js";
|
||||
import {getStateSlotFromBytes} from "../../../util/multifork.js";
|
||||
import {getBlockResponse} from "../beacon/blocks/utils.js";
|
||||
@@ -96,10 +96,10 @@ export function getDebugApi({
|
||||
const {block, executionOptimistic, finalized} = await getBlockResponse(chain, blockId);
|
||||
const blockRoot = config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message);
|
||||
|
||||
let dataColumnSidecars = await db.dataColumnSidecar.values(blockRoot);
|
||||
let dataColumnSidecars = await fromAsync(db.dataColumnSidecar.valuesStream(blockRoot));
|
||||
|
||||
if (dataColumnSidecars.length === 0) {
|
||||
dataColumnSidecars = await db.dataColumnSidecarArchive.values(block.message.slot);
|
||||
dataColumnSidecars = await fromAsync(db.dataColumnSidecarArchive.valuesStream(block.message.slot));
|
||||
}
|
||||
|
||||
if (dataColumnSidecars.length === 0) {
|
||||
|
||||
@@ -5,7 +5,7 @@ import {IForkChoice} from "@lodestar/fork-choice";
|
||||
import {ForkSeq, SLOTS_PER_EPOCH} from "@lodestar/params";
|
||||
import {computeEpochAtSlot, computeStartSlotAtEpoch} from "@lodestar/state-transition";
|
||||
import {Epoch, RootHex, Slot} from "@lodestar/types";
|
||||
import {Logger, fromHex, prettyPrintIndices, toRootHex} from "@lodestar/utils";
|
||||
import {Logger, fromAsync, fromHex, prettyPrintIndices, toRootHex} from "@lodestar/utils";
|
||||
import {IBeaconDb} from "../../../db/index.js";
|
||||
import {BlockArchiveBatchPutBinaryItem} from "../../../db/repositories/index.js";
|
||||
import {ensureDir, writeIfNotExist} from "../../../util/file.js";
|
||||
@@ -175,10 +175,12 @@ export async function archiveBlocks(
|
||||
const dataColumnSidecarsMinEpoch = currentEpoch - dataColumnSidecarsArchiveWindow;
|
||||
if (dataColumnSidecarsMinEpoch >= config.FULU_FORK_EPOCH) {
|
||||
const prefixedKeys = await db.dataColumnSidecarArchive.keys({
|
||||
lt: db.dataColumnSidecarArchive.getMaxKeyRaw(computeStartSlotAtEpoch(dataColumnSidecarsMinEpoch)),
|
||||
// The `id` value `0` refers to the column index. So we want to fetch all sidecars less than zero column of `dataColumnSidecarsMinEpoch`
|
||||
lt: {prefix: computeStartSlotAtEpoch(dataColumnSidecarsMinEpoch), id: 0},
|
||||
});
|
||||
// for each slot there could be multiple dataColumnSidecar, so we need to deduplicate it
|
||||
const slotsToDelete = [...new Set(prefixedKeys.map(({prefix}) => prefix))].sort((a, b) => a - b);
|
||||
|
||||
if (slotsToDelete.length > 0) {
|
||||
await db.dataColumnSidecarArchive.deleteMany(slotsToDelete);
|
||||
`dataColumnSidecars prune slotRange=${prettyPrintIndices(slotsToDelete)}, numOfSlots=${slotsToDelete.length} totalNumOfSidecars=${prefixedKeys.length}`;
|
||||
@@ -331,7 +333,7 @@ async function migrateDataColumnSidecarsFromHotToColdDb(
|
||||
continue;
|
||||
}
|
||||
|
||||
const dataColumnSidecarBytes = await db.dataColumnSidecar.valuesBinary(block.root);
|
||||
const dataColumnSidecarBytes = await fromAsync(db.dataColumnSidecar.valuesStreamBinary(block.root));
|
||||
// there could be 0 dataColumnSidecarBytes if block has no blob
|
||||
logger.verbose("migrateDataColumnSidecarsFromHotToColdDb", {
|
||||
slot: block.slot,
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
import {ChainForkConfig} from "@lodestar/config";
|
||||
import {Db, PrefixedRepository} from "@lodestar/db";
|
||||
import {Db, decodeNumberForDbKey, encodeNumberForDbKey, PrefixedRepository} from "@lodestar/db";
|
||||
import {NUMBER_OF_COLUMNS} from "@lodestar/params";
|
||||
import {ColumnIndex, Root, fulu, ssz} from "@lodestar/types";
|
||||
import {bytesToInt, intToBytes} from "@lodestar/utils";
|
||||
import {Bucket, getBucketNameByValue} from "../buckets.js";
|
||||
|
||||
const COLUMN_INDEX_BYTE_SIZE = 2;
|
||||
const BLOCK_ROOT_BYTE_SIZE = 32;
|
||||
|
||||
type BlockRoot = Root;
|
||||
|
||||
/**
|
||||
@@ -27,21 +29,21 @@ export class DataColumnSidecarRepository extends PrefixedRepository<BlockRoot, C
|
||||
}
|
||||
|
||||
encodeKeyRaw(prefix: BlockRoot, id: ColumnIndex): Uint8Array {
|
||||
return Buffer.concat([prefix, intToBytes(id, 4)]);
|
||||
return Buffer.concat([prefix, encodeNumberForDbKey(id, COLUMN_INDEX_BYTE_SIZE)]);
|
||||
}
|
||||
|
||||
decodeKeyRaw(raw: Uint8Array): {prefix: BlockRoot; id: ColumnIndex} {
|
||||
return {
|
||||
prefix: raw.slice(0, 32) as BlockRoot,
|
||||
id: bytesToInt(raw.slice(32, 36)) as ColumnIndex,
|
||||
prefix: raw.slice(0, BLOCK_ROOT_BYTE_SIZE) as BlockRoot,
|
||||
id: decodeNumberForDbKey(raw.slice(BLOCK_ROOT_BYTE_SIZE), COLUMN_INDEX_BYTE_SIZE) as ColumnIndex,
|
||||
};
|
||||
}
|
||||
|
||||
getMaxKeyRaw(prefix: BlockRoot): Uint8Array {
|
||||
return Buffer.concat([prefix, intToBytes(NUMBER_OF_COLUMNS, 4)]);
|
||||
return Buffer.concat([prefix, encodeNumberForDbKey(NUMBER_OF_COLUMNS - 1, COLUMN_INDEX_BYTE_SIZE)]);
|
||||
}
|
||||
|
||||
getMinKeyRaw(prefix: BlockRoot): Uint8Array {
|
||||
return Buffer.concat([prefix, intToBytes(0, 4)]);
|
||||
return Buffer.concat([prefix, encodeNumberForDbKey(0, COLUMN_INDEX_BYTE_SIZE)]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
import {ChainForkConfig} from "@lodestar/config";
|
||||
import {Db, PrefixedRepository} from "@lodestar/db";
|
||||
import {Db, decodeNumberForDbKey, encodeNumberForDbKey, PrefixedRepository} from "@lodestar/db";
|
||||
import {NUMBER_OF_COLUMNS} from "@lodestar/params";
|
||||
import {ColumnIndex, Slot, fulu, ssz} from "@lodestar/types";
|
||||
import {bytesToInt, intToBytes} from "@lodestar/utils";
|
||||
import {Bucket, getBucketNameByValue} from "../buckets.js";
|
||||
|
||||
const COLUMN_INDEX_BYTE_SIZE = 2;
|
||||
const SLOT_BYTE_SIZE = 8;
|
||||
|
||||
/**
|
||||
* DataColumnSidecarsRepository
|
||||
* Used to store `finalized` DataColumnSidecars
|
||||
@@ -25,21 +27,30 @@ export class DataColumnSidecarArchiveRepository extends PrefixedRepository<Slot,
|
||||
}
|
||||
|
||||
encodeKeyRaw(prefix: Slot, id: ColumnIndex): Uint8Array {
|
||||
return Buffer.concat([intToBytes(prefix, 4), intToBytes(id, 4)]);
|
||||
return Buffer.concat([
|
||||
encodeNumberForDbKey(prefix, SLOT_BYTE_SIZE),
|
||||
encodeNumberForDbKey(id, COLUMN_INDEX_BYTE_SIZE),
|
||||
]);
|
||||
}
|
||||
|
||||
decodeKeyRaw(raw: Uint8Array): {prefix: Slot; id: ColumnIndex} {
|
||||
return {
|
||||
prefix: bytesToInt(raw.slice(0, 4)) as Slot,
|
||||
id: bytesToInt(raw.slice(4, 8)) as ColumnIndex,
|
||||
prefix: decodeNumberForDbKey(raw, SLOT_BYTE_SIZE) as Slot,
|
||||
id: decodeNumberForDbKey(raw.slice(SLOT_BYTE_SIZE), COLUMN_INDEX_BYTE_SIZE) as ColumnIndex,
|
||||
};
|
||||
}
|
||||
|
||||
getMaxKeyRaw(prefix: Slot): Uint8Array {
|
||||
return Buffer.concat([intToBytes(prefix, 4), intToBytes(NUMBER_OF_COLUMNS, 4)]);
|
||||
return Buffer.concat([
|
||||
encodeNumberForDbKey(prefix, SLOT_BYTE_SIZE),
|
||||
encodeNumberForDbKey(NUMBER_OF_COLUMNS - 1, COLUMN_INDEX_BYTE_SIZE),
|
||||
]);
|
||||
}
|
||||
|
||||
getMinKeyRaw(prefix: Slot): Uint8Array {
|
||||
return Buffer.concat([intToBytes(prefix, 4), intToBytes(0, 4)]);
|
||||
return Buffer.concat([
|
||||
encodeNumberForDbKey(prefix, SLOT_BYTE_SIZE),
|
||||
encodeNumberForDbKey(0, COLUMN_INDEX_BYTE_SIZE),
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,14 @@ import {MockedBeaconDb, getMockedBeaconDb} from "../../../mocks/mockedBeaconDb.j
|
||||
import {testLogger} from "../../../utils/logger.js";
|
||||
import {generateProtoBlock} from "../../../utils/typeGenerator.js";
|
||||
|
||||
function toAsyncIterable<T>(items: T[]): AsyncIterable<T> {
|
||||
return {
|
||||
async *[Symbol.asyncIterator]() {
|
||||
for (const it of items) yield it;
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
describe("block archiver task", () => {
|
||||
const logger = testLogger();
|
||||
|
||||
@@ -102,9 +110,9 @@ describe("block archiver task", () => {
|
||||
const dataColumnBytes = ssz.fulu.DataColumnSidecar.serialize(dataColumn);
|
||||
|
||||
vi.spyOn(dbStub.block, "getBinary").mockResolvedValue(blockBytes);
|
||||
vi.spyOn(dbStub.dataColumnSidecar, "valuesBinary").mockResolvedValue([
|
||||
{id: dataColumn.index, prefix: block.message.stateRoot, value: dataColumnBytes},
|
||||
]);
|
||||
vi.spyOn(dbStub.dataColumnSidecar, "valuesStreamBinary").mockReturnValue(
|
||||
toAsyncIterable([{id: dataColumn.index, prefix: block.message.stateRoot, value: dataColumnBytes}])
|
||||
);
|
||||
|
||||
// Create blocks after fulu fork
|
||||
const blocks = Array.from({length: 5}, (_, i) =>
|
||||
@@ -158,9 +166,7 @@ describe("block archiver task", () => {
|
||||
);
|
||||
|
||||
expect(dbStub.dataColumnSidecarArchive.keys).toBeCalledWith({
|
||||
lt: dbStub.dataColumnSidecarArchive.getMaxKeyRaw(
|
||||
computeStartSlotAtEpoch(currentEpoch - config.MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS)
|
||||
),
|
||||
lt: {prefix: computeStartSlotAtEpoch(currentEpoch - config.MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS), id: 0},
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import {createChainForkConfig} from "@lodestar/config";
|
||||
import {LevelDbController} from "@lodestar/db";
|
||||
import {Root, fulu, ssz} from "@lodestar/types";
|
||||
import {toHex} from "@lodestar/utils";
|
||||
import {fromAsync, toHex} from "@lodestar/utils";
|
||||
import {rimraf} from "rimraf";
|
||||
import {afterEach, beforeEach, describe, expect, it} from "vitest";
|
||||
import {DataColumnSidecarRepository} from "../../../../../src/db/repositories/dataColumnSidecar.js";
|
||||
@@ -9,6 +9,7 @@ import {getDataColumnSidecarsFromBlock} from "../../../../../src/util/dataColumn
|
||||
import {kzg} from "../../../../../src/util/kzg.js";
|
||||
import {testLogger} from "../../../../utils/logger.js";
|
||||
import {DataColumnSidecarArchiveRepository} from "../../../../../src/db/repositories/dataColumnSidecarArchive.js";
|
||||
import {NUMBER_OF_COLUMNS} from "@lodestar/params";
|
||||
|
||||
/* eslint-disable @typescript-eslint/naming-convention */
|
||||
const config = createChainForkConfig({
|
||||
@@ -17,7 +18,109 @@ const config = createChainForkConfig({
|
||||
DENEB_FORK_EPOCH: 0,
|
||||
FULU_FORK_EPOCH: 0,
|
||||
});
|
||||
describe("block archive repository", () => {
|
||||
|
||||
describe("dataColumnSidecar repository", () => {
|
||||
const testDir = "./.tmp";
|
||||
let dataColumnRepo: DataColumnSidecarRepository;
|
||||
let db: LevelDbController;
|
||||
let allDataColumnSidecars: fulu.DataColumnSidecars;
|
||||
const blobKzgCommitmentsLen = 3;
|
||||
const dataColumn = ssz.fulu.DataColumnSidecar.defaultValue();
|
||||
const blockSlot = 11;
|
||||
dataColumn.signedBlockHeader.message.slot = blockSlot;
|
||||
|
||||
beforeEach(async () => {
|
||||
db = await LevelDbController.create({name: testDir}, {logger: testLogger()});
|
||||
dataColumnRepo = new DataColumnSidecarRepository(config, db);
|
||||
|
||||
const blob = ssz.deneb.Blob.defaultValue();
|
||||
const commitment = kzg.blobToKzgCommitment(blob);
|
||||
const signedBlock = ssz.fulu.SignedBeaconBlock.defaultValue();
|
||||
const blobs = [blob, blob, blob];
|
||||
const commitments = Array.from({length: blobKzgCommitmentsLen}, () => commitment);
|
||||
signedBlock.message.body.blobKzgCommitments = commitments;
|
||||
const cellsAndProofs = blobs.map((b) => kzg.computeCellsAndKzgProofs(b));
|
||||
allDataColumnSidecars = getDataColumnSidecarsFromBlock(config, signedBlock, cellsAndProofs);
|
||||
for (let j = 0; j < allDataColumnSidecars.length; j++) {
|
||||
allDataColumnSidecars[j].index = j;
|
||||
}
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await db.close();
|
||||
rimraf.sync(testDir);
|
||||
});
|
||||
|
||||
describe("encodeKeyRaw", () => {
|
||||
const root = Buffer.from("0102030405060708010203040506070801020304050607080102030405060708", "hex");
|
||||
const columnIndex = 9;
|
||||
|
||||
it("should correctly encode the key in right size", () => {
|
||||
const bytes = dataColumnRepo.encodeKeyRaw(root, columnIndex);
|
||||
|
||||
// 32 byte root and 2 byte column index
|
||||
expect(bytes).toHaveLength(32 + 2);
|
||||
});
|
||||
|
||||
it("should use correct byte size for slot", () => {
|
||||
const bytes = dataColumnRepo.encodeKeyRaw(root, columnIndex);
|
||||
|
||||
// encoded as `be` 8 bytes value
|
||||
expect(bytes.slice(0, 32)).toEqual(
|
||||
Buffer.from([1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8])
|
||||
);
|
||||
});
|
||||
|
||||
it("should use correct byte size for column index", () => {
|
||||
const bytes = dataColumnRepo.encodeKeyRaw(root, columnIndex);
|
||||
|
||||
// encoded as `be` 2 bytes value
|
||||
expect(bytes.slice(32)).toEqual(Buffer.from([0, 9]));
|
||||
});
|
||||
});
|
||||
|
||||
describe("decodeKeyRaw", () => {
|
||||
const root = new Uint8Array([
|
||||
1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8,
|
||||
]);
|
||||
const columnIndex = 9;
|
||||
const bytes = new Uint8Array([
|
||||
1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 0, 9,
|
||||
]);
|
||||
|
||||
it("should correctly decode key", () => {
|
||||
expect(dataColumnRepo.decodeKeyRaw(bytes)).toEqual({prefix: root, id: columnIndex});
|
||||
});
|
||||
});
|
||||
|
||||
describe("getMaxKeyRaw", () => {
|
||||
it("should return inclusive max key", () => {
|
||||
const root = new Uint8Array([
|
||||
1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8,
|
||||
]);
|
||||
|
||||
const bytes = dataColumnRepo.getMaxKeyRaw(root);
|
||||
|
||||
// We subtract 1 from total number of columns to have inclusive range
|
||||
expect(bytes.slice(32)).toEqual(Buffer.from([0, NUMBER_OF_COLUMNS - 1]));
|
||||
});
|
||||
});
|
||||
|
||||
describe("getMinKeyRaw", () => {
|
||||
it("should return inclusive max key", () => {
|
||||
const root = new Uint8Array([
|
||||
1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8,
|
||||
]);
|
||||
|
||||
const bytes = dataColumnRepo.getMinKeyRaw(root);
|
||||
|
||||
// Columns starts from 0
|
||||
expect(bytes.slice(32)).toEqual(Buffer.from([0, 0]));
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("dataColumnSidecarArchive repository", () => {
|
||||
const testDir = "./.tmp";
|
||||
let dataColumnRepo: DataColumnSidecarRepository;
|
||||
let dataColumnArchiveRepo: DataColumnSidecarArchiveRepository;
|
||||
@@ -53,12 +156,70 @@ describe("block archive repository", () => {
|
||||
rimraf.sync(testDir);
|
||||
});
|
||||
|
||||
describe("encodeKeyRaw", () => {
|
||||
const slot = 12;
|
||||
const columnIndex = 3;
|
||||
|
||||
it("should correctly encode the key in right size", () => {
|
||||
const bytes = dataColumnArchiveRepo.encodeKeyRaw(slot, columnIndex);
|
||||
|
||||
// 8 byte slot and 2 byte column index
|
||||
expect(bytes).toHaveLength(8 + 2);
|
||||
});
|
||||
|
||||
it("should use correct byte size for slot", () => {
|
||||
const bytes = dataColumnArchiveRepo.encodeKeyRaw(slot, columnIndex);
|
||||
|
||||
// encoded as `be` 8 bytes value
|
||||
expect(bytes.slice(0, 8)).toEqual(Buffer.from([0, 0, 0, 0, 0, 0, 0, 12]));
|
||||
});
|
||||
|
||||
it("should use correct byte size for column index", () => {
|
||||
const bytes = dataColumnArchiveRepo.encodeKeyRaw(slot, columnIndex);
|
||||
|
||||
// encoded as `be` 2 bytes value
|
||||
expect(bytes.slice(8)).toEqual(Buffer.from([0, 3]));
|
||||
});
|
||||
});
|
||||
|
||||
describe("decodeKeyRaw", () => {
|
||||
const slot = 12;
|
||||
const columnIndex = 3;
|
||||
const bytes = Buffer.from([0, 0, 0, 0, 0, 0, 0, 12, 0, 3]);
|
||||
|
||||
it("should correctly decode key", () => {
|
||||
expect(dataColumnArchiveRepo.decodeKeyRaw(bytes)).toEqual({prefix: slot, id: columnIndex});
|
||||
});
|
||||
});
|
||||
|
||||
describe("getMaxKeyRaw", () => {
|
||||
it("should return inclusive max key", () => {
|
||||
const slot = 9;
|
||||
|
||||
const bytes = dataColumnArchiveRepo.getMaxKeyRaw(slot);
|
||||
|
||||
// We subtract 1 from total number of columns to have inclusive range
|
||||
expect(bytes.slice(8)).toEqual(Buffer.from([0, NUMBER_OF_COLUMNS - 1]));
|
||||
});
|
||||
});
|
||||
|
||||
describe("getMinKeyRaw", () => {
|
||||
it("should return inclusive max key", () => {
|
||||
const slot = 9;
|
||||
|
||||
const bytes = dataColumnArchiveRepo.getMinKeyRaw(slot);
|
||||
|
||||
// Columns starts from 0
|
||||
expect(bytes.slice(8)).toEqual(Buffer.from([0, 0]));
|
||||
});
|
||||
});
|
||||
|
||||
it("should get data column sidecars by parent root", async () => {
|
||||
const dataColumnSidecars = allDataColumnSidecars.slice(0, 7);
|
||||
const dataColumnsLen = dataColumnSidecars.length;
|
||||
|
||||
await dataColumnRepo.putMany(blockRoot, dataColumnSidecars);
|
||||
const retrievedDataColumnSidecars = await dataColumnRepo.values(blockRoot);
|
||||
const retrievedDataColumnSidecars = await fromAsync(dataColumnRepo.valuesStream(blockRoot));
|
||||
|
||||
expect(retrievedDataColumnSidecars).toHaveLength(dataColumnsLen);
|
||||
expect(retrievedDataColumnSidecars.map((c) => c.index)).toEqual(dataColumnSidecars.map((c) => c.index));
|
||||
@@ -74,7 +235,7 @@ describe("block archive repository", () => {
|
||||
const dataColumnsLen = dataColumnSidecars.length;
|
||||
|
||||
await dataColumnRepo.putMany(blockRoot, dataColumnSidecars);
|
||||
const retrievedDataColumnSidecarBytes = await dataColumnRepo.valuesBinary(blockRoot);
|
||||
const retrievedDataColumnSidecarBytes = await fromAsync(dataColumnRepo.valuesStreamBinary(blockRoot));
|
||||
|
||||
expect(retrievedDataColumnSidecarBytes).toHaveLength(dataColumnsLen);
|
||||
|
||||
@@ -90,7 +251,7 @@ describe("block archive repository", () => {
|
||||
// same api to writeBlockInputToDb
|
||||
await dataColumnRepo.putMany(blockRoot, allDataColumnSidecars);
|
||||
// same api to migrateDataColumnSidecarsFromHotToColDb
|
||||
const dataColumnSidecarBytes = await dataColumnRepo.valuesBinary(blockRoot);
|
||||
const dataColumnSidecarBytes = await fromAsync(dataColumnRepo.valuesStreamBinary(blockRoot));
|
||||
await dataColumnArchiveRepo.putManyBinary(
|
||||
blockSlot,
|
||||
dataColumnSidecarBytes.map((p) => ({key: p.id, value: p.value}))
|
||||
|
||||
@@ -16,6 +16,10 @@ type Id = Uint8Array | string | number | bigint;
|
||||
*/
|
||||
export abstract class PrefixedRepository<P, I extends Id, T> {
|
||||
private readonly dbReqOpts: DbReqOpts;
|
||||
/** Inclusive range for the minimum key for the bucket */
|
||||
private readonly minKey: Uint8Array;
|
||||
/** Exclusive range for the maximum key for the bucket */
|
||||
private readonly maxKey: Uint8Array;
|
||||
|
||||
protected constructor(
|
||||
protected config: ChainForkConfig,
|
||||
@@ -25,6 +29,8 @@ export abstract class PrefixedRepository<P, I extends Id, T> {
|
||||
private readonly bucketId: string
|
||||
) {
|
||||
this.dbReqOpts = {bucketId: this.bucketId};
|
||||
this.minKey = encodeKey(bucket, Buffer.alloc(0));
|
||||
this.maxKey = encodeKey(bucket + 1, Buffer.alloc(0));
|
||||
}
|
||||
|
||||
abstract encodeKeyRaw(prefix: P, id: I): Uint8Array;
|
||||
@@ -154,17 +160,6 @@ export abstract class PrefixedRepository<P, I extends Id, T> {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Non iterative version of `valuesStream`.
|
||||
*/
|
||||
async values(prefix: P | P[]): Promise<T[]> {
|
||||
const result: T[] = [];
|
||||
for await (const value of this.valuesStream(prefix)) {
|
||||
result.push(value);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
async *valuesStreamBinary(prefix: P | P[]): AsyncIterable<{prefix: P; id: I; value: Uint8Array}> {
|
||||
for (const p of Array.isArray(prefix) ? prefix : [prefix]) {
|
||||
for await (const {key, value} of this.db.entriesStream({
|
||||
@@ -183,17 +178,6 @@ export abstract class PrefixedRepository<P, I extends Id, T> {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Non iterative version of `valuesStreamBinary`.
|
||||
*/
|
||||
async valuesBinary(prefix: P | P[]): Promise<{prefix: P; id: I; value: Uint8Array}[]> {
|
||||
const result = [];
|
||||
for await (const value of this.valuesStreamBinary(prefix)) {
|
||||
result.push(value);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
async *entriesStream(prefix: P | P[]): AsyncIterable<{prefix: P; id: I; value: T}> {
|
||||
for (const v of Array.isArray(prefix) ? prefix : [prefix]) {
|
||||
for await (const {key, value} of this.db.entriesStream({
|
||||
@@ -216,7 +200,7 @@ export abstract class PrefixedRepository<P, I extends Id, T> {
|
||||
for (const v of Array.isArray(prefix) ? prefix : [prefix]) {
|
||||
for await (const {key, value} of this.db.entriesStream({
|
||||
gte: this.wrapKey(this.getMinKeyRaw(v)),
|
||||
lt: this.wrapKey(this.getMaxKeyRaw(v)),
|
||||
lte: this.wrapKey(this.getMaxKeyRaw(v)),
|
||||
bucketId: this.bucketId,
|
||||
})) {
|
||||
const {prefix, id} = this.decodeKeyRaw(this.unwrapKey(key));
|
||||
@@ -230,23 +214,25 @@ export abstract class PrefixedRepository<P, I extends Id, T> {
|
||||
}
|
||||
}
|
||||
|
||||
async keys(opts?: FilterOptions<Uint8Array>): Promise<{prefix: P; id: I}[]> {
|
||||
async keys(opts?: FilterOptions<{prefix: P; id: I}>): Promise<{prefix: P; id: I}[]> {
|
||||
const optsBuff: FilterOptions<Uint8Array> = {
|
||||
bucketId: this.bucketId,
|
||||
};
|
||||
|
||||
// Set at least one min key
|
||||
if (opts?.lt !== undefined) {
|
||||
optsBuff.lt = this.wrapKey(opts.lt);
|
||||
} else if (opts?.lte !== undefined) {
|
||||
optsBuff.lte = this.wrapKey(opts.lte);
|
||||
if (opts?.gte !== undefined) {
|
||||
optsBuff.gte = this.wrapKey(this.encodeKeyRaw(opts.gte.prefix, opts.gte.id));
|
||||
} else if (opts?.gt !== undefined) {
|
||||
optsBuff.gt = this.wrapKey(this.encodeKeyRaw(opts.gt.prefix, opts.gt.id));
|
||||
} else {
|
||||
optsBuff.gte = this.minKey;
|
||||
}
|
||||
|
||||
// Set at least on max key
|
||||
if (opts?.gt !== undefined) {
|
||||
optsBuff.gt = this.wrapKey(opts.gt);
|
||||
} else if (opts?.gte !== undefined) {
|
||||
optsBuff.gte = this.wrapKey(opts.gte);
|
||||
if (opts?.lte !== undefined) {
|
||||
optsBuff.lte = this.wrapKey(this.encodeKeyRaw(opts.lte.prefix, opts.lte.id));
|
||||
} else if (opts?.lt !== undefined) {
|
||||
optsBuff.lt = this.wrapKey(this.encodeKeyRaw(opts.lt.prefix, opts.lt.id));
|
||||
} else {
|
||||
optsBuff.lt = this.maxKey;
|
||||
}
|
||||
|
||||
if (opts?.reverse !== undefined) optsBuff.reverse = opts.reverse;
|
||||
|
||||
@@ -1,10 +1,18 @@
|
||||
import {intToBytes} from "@lodestar/utils";
|
||||
import {bytesToInt, intToBytes} from "@lodestar/utils";
|
||||
import {BUCKET_LENGTH} from "./const.js";
|
||||
|
||||
export const uintLen = 8;
|
||||
|
||||
/**
|
||||
* Prepend a bucket to a key
|
||||
* Encode a key for the db write/read, Prepend a bucket to a key
|
||||
*
|
||||
* The encoding of key is very important step that can cause failure of proper indexing and querying of data
|
||||
*
|
||||
* We are using LevelDB which have pluggable comparator support, so you can decide how to
|
||||
* compare keys. But for NodeJS binding only default comparison algorithm is supported which
|
||||
* uses lexicographical comparison of the raw bytes of the keys
|
||||
*
|
||||
* It is important to use **helpers implemented here** to encode db keys so that key comparison properly work.
|
||||
*/
|
||||
export function encodeKey(bucket: number, key: Uint8Array | string | number | bigint): Uint8Array {
|
||||
let buf: Buffer;
|
||||
@@ -24,3 +32,19 @@ export function encodeKey(bucket: number, key: Uint8Array | string | number | bi
|
||||
buf.set(intToBytes(bucket, BUCKET_LENGTH, "le"), 0);
|
||||
return buf;
|
||||
}
|
||||
|
||||
export function encodeNumberForDbKey(value: number, byteSize: number): Uint8Array {
|
||||
return intToBytes(value, byteSize, "be");
|
||||
}
|
||||
|
||||
export function decodeNumberForDbKey(value: Uint8Array, byteSize: number): number {
|
||||
return bytesToInt(value.slice(0, byteSize), "be");
|
||||
}
|
||||
|
||||
export function encodeStringForDbKey(value: string): Uint8Array {
|
||||
return Buffer.from(value, "utf-8");
|
||||
}
|
||||
|
||||
export function decodeStringForDbKey(value: Uint8Array): string {
|
||||
return Buffer.from(value).toString("utf8");
|
||||
}
|
||||
|
||||
@@ -1,38 +1,67 @@
|
||||
/** biome-ignore-all lint/style/noNonNullAssertion: values all exist */
|
||||
import {beforeAll, afterAll, beforeEach, describe, it, expect} from "vitest";
|
||||
import {getEnvLogger} from "@lodestar/logger/env";
|
||||
import {PrefixedRepository, LevelDbController, type Db} from "../../src/index.js";
|
||||
import {
|
||||
PrefixedRepository,
|
||||
LevelDbController,
|
||||
type Db,
|
||||
encodeNumberForDbKey,
|
||||
decodeNumberForDbKey,
|
||||
} from "../../src/index.js";
|
||||
import {fromAsync} from "@lodestar/utils";
|
||||
|
||||
type Slot = number;
|
||||
type Column = number;
|
||||
|
||||
type TestPrefixedType = {column: Column; value: string};
|
||||
|
||||
// Fake SSZ-like Type for string values
|
||||
const fakeType = {
|
||||
serialize: (v: string): Uint8Array => Buffer.from(v, "utf8"),
|
||||
deserialize: (d: Uint8Array): string => Buffer.from(d).toString("utf8"),
|
||||
const testPrefixedType = {
|
||||
serialize: (v: TestPrefixedType): Uint8Array => Buffer.from(JSON.stringify(v), "utf8"),
|
||||
deserialize: (d: Uint8Array): TestPrefixedType => JSON.parse(Buffer.from(d).toString("utf8")) as TestPrefixedType,
|
||||
hashTreeRoot: (v: string): Uint8Array => Buffer.from("id:" + v, "utf8"),
|
||||
} as any;
|
||||
|
||||
// P = number (single-byte prefix), I = Uint8Array id (raw bytes)
|
||||
class TestPrefixedRepository extends PrefixedRepository<number, Uint8Array, string> {
|
||||
class TestPrefixedRepository extends PrefixedRepository<Slot, Column, TestPrefixedType> {
|
||||
constructor(db: Db, bucket: number, bucketId: string) {
|
||||
super({} as any, db, bucket, fakeType, bucketId);
|
||||
super({} as any, db, bucket, testPrefixedType, bucketId);
|
||||
}
|
||||
|
||||
encodeKeyRaw(prefix: number, id: Uint8Array): Uint8Array {
|
||||
return Buffer.concat([Buffer.from([prefix]), Buffer.from(id)]);
|
||||
encodeKeyRaw(prefix: number, id: number): Uint8Array {
|
||||
return Buffer.concat([encodeNumberForDbKey(prefix, 2), encodeNumberForDbKey(id, 2)]);
|
||||
}
|
||||
|
||||
decodeKeyRaw(raw: Uint8Array): {prefix: number; id: Uint8Array} {
|
||||
return {prefix: raw[0], id: raw.slice(1)};
|
||||
decodeKeyRaw(raw: Uint8Array): {prefix: number; id: number} {
|
||||
return {prefix: decodeNumberForDbKey(raw, 2), id: decodeNumberForDbKey(raw.slice(2), 2)};
|
||||
}
|
||||
|
||||
getMaxKeyRaw(prefix: number): Uint8Array {
|
||||
return Buffer.from([prefix, 0xff]);
|
||||
return Buffer.concat([encodeNumberForDbKey(prefix, 2), encodeNumberForDbKey(0xffff, 2)]);
|
||||
}
|
||||
|
||||
getMinKeyRaw(prefix: number): Uint8Array {
|
||||
return Buffer.from([prefix, 0x00]);
|
||||
return Buffer.concat([encodeNumberForDbKey(prefix, 2), encodeNumberForDbKey(0, 2)]);
|
||||
}
|
||||
|
||||
getId(value: TestPrefixedType): number {
|
||||
return value.column;
|
||||
}
|
||||
}
|
||||
|
||||
const numberOfSlots = 50;
|
||||
|
||||
// We need to store columns which are more than 1 byte to test the proper encoding
|
||||
const numberOfColumns = 300;
|
||||
|
||||
const generateColumnsData = (slot: Slot) =>
|
||||
Array.from({length: numberOfColumns}, (_, c) => ({column: c, value: `s:${slot}-c:${c}`}));
|
||||
|
||||
// Generate fixtures to be used later with the db
|
||||
const testData: Record<Slot, TestPrefixedType[]> = Array.from({length: numberOfSlots}, (_, s) =>
|
||||
generateColumnsData(s)
|
||||
);
|
||||
|
||||
describe("abstractPrefixedRepository", () => {
|
||||
const bucket = 12;
|
||||
const bucketId = "prefixed-repo-e2e";
|
||||
@@ -57,163 +86,388 @@ describe("abstractPrefixedRepository", () => {
|
||||
|
||||
it("put/get/getBinary/delete per prefix", async () => {
|
||||
const p = 1;
|
||||
const value = "hello";
|
||||
await repo.put(p, value);
|
||||
const id = repo.getId(value);
|
||||
expect(await repo.get(p, id)).toBe(value);
|
||||
const id = 0;
|
||||
const value = testData[p][id];
|
||||
|
||||
// Put
|
||||
await expect(repo.put(p, value)).resolves.toBeUndefined();
|
||||
|
||||
// Get
|
||||
expect(await repo.get(p, id)).toEqual(value);
|
||||
|
||||
// Get Binary
|
||||
const bin = await repo.getBinary(p, id);
|
||||
expect(Buffer.from(bin!).toString("utf8")).toBe(value);
|
||||
expect(testPrefixedType.deserialize(bin)).toEqual(value);
|
||||
|
||||
// Delete
|
||||
await repo.delete(p, id);
|
||||
expect(await repo.get(p, id)).toBeNull();
|
||||
});
|
||||
|
||||
it("getMany and getManyBinary in order and with missing ids", async () => {
|
||||
const p = 3;
|
||||
const v1 = "a";
|
||||
const v2 = "b";
|
||||
await repo.put(p, v1);
|
||||
await repo.put(p, v2);
|
||||
const id1 = repo.getId(v1);
|
||||
const id2 = repo.getId(v2);
|
||||
const idMissing = Buffer.from([0x99]);
|
||||
const ids = [0, 10, 4, 20];
|
||||
const values = [testData[p][0], testData[p][10], testData[p][4], testData[p][20]];
|
||||
const valuesBinaries = values.map((v) => testPrefixedType.serialize(v));
|
||||
|
||||
await expect(repo.getMany(p, [id1, id2, idMissing])).resolves.toEqual([v1, v2, undefined]);
|
||||
const resBin = await repo.getManyBinary(p, [id1, id2, idMissing]);
|
||||
expect(resBin.map((b) => (b ? Buffer.from(b).toString("utf8") : undefined))).toEqual([v1, v2, undefined]);
|
||||
for (const v of values) {
|
||||
await repo.put(p, v);
|
||||
}
|
||||
|
||||
const result = await repo.getMany(p, ids);
|
||||
|
||||
expect(result).toHaveLength(ids.length);
|
||||
expect(result).toEqual(values);
|
||||
|
||||
const resultBinary = await repo.getManyBinary(p, ids);
|
||||
|
||||
expect(resultBinary).toHaveLength(ids.length);
|
||||
expect(resultBinary).toEqual(valuesBinaries);
|
||||
});
|
||||
|
||||
it("putMany and putManyBinary store correctly", async () => {
|
||||
const p = 4;
|
||||
await repo.putMany(p, ["x", "y"]);
|
||||
const idX = repo.getId("x");
|
||||
const idY = repo.getId("y");
|
||||
expect(await repo.get(p, idX)).toBe("x");
|
||||
expect(await repo.get(p, idY)).toBe("y");
|
||||
it("putMany store correctly", async () => {
|
||||
const p = 3;
|
||||
const ids = [0, 10, 4, 20];
|
||||
const values = [testData[p][0], testData[p][10], testData[p][4], testData[p][20]];
|
||||
|
||||
const p2 = 5;
|
||||
const idA = Buffer.from([0x2a]);
|
||||
const idB = Buffer.from([0x2b]);
|
||||
await repo.putManyBinary(p2, [
|
||||
{key: idA, value: Buffer.from("A")},
|
||||
{key: idB, value: Buffer.from("B")},
|
||||
]);
|
||||
expect(await repo.get(p2, idA)).toBe("A");
|
||||
expect(await repo.get(p2, idB)).toBe("B");
|
||||
await repo.putMany(p, values);
|
||||
|
||||
for (const [index, id] of ids.entries()) {
|
||||
await expect(repo.get(p, id)).resolves.toEqual(values[index]);
|
||||
}
|
||||
});
|
||||
|
||||
it("values and valuesStream for single and multiple prefixes", async () => {
|
||||
const p1 = 7;
|
||||
const p2 = 8;
|
||||
await repo.put(p1, "p1-1");
|
||||
await repo.put(p1, "p1-2");
|
||||
await repo.put(p2, "p2-1");
|
||||
it("putManyBinary store correctly", async () => {
|
||||
const p = 3;
|
||||
const ids = [0, 10, 4, 20];
|
||||
const values = [testData[p][0], testData[p][10], testData[p][4], testData[p][20]];
|
||||
const valuesBinaries = values.map((v) => ({key: v.column, value: testPrefixedType.serialize(v)}));
|
||||
|
||||
// Single prefix
|
||||
await expect(repo.values(p1)).resolves.toEqual(["p1-1", "p1-2"]);
|
||||
await repo.putManyBinary(p, valuesBinaries);
|
||||
|
||||
// Multiple prefixes preserve provided order
|
||||
await expect(repo.values([p2, p1])).resolves.toEqual(["p2-1", "p1-1", "p1-2"]);
|
||||
|
||||
const fromStream: string[] = [];
|
||||
for await (const v of repo.valuesStream([p2, p1])) fromStream.push(v);
|
||||
expect(fromStream).toEqual(["p2-1", "p1-1", "p1-2"]);
|
||||
});
|
||||
|
||||
it("valuesStreamBinary and valuesBinary decode prefix and id", async () => {
|
||||
const p = 10;
|
||||
const id1 = Buffer.from([0x01]);
|
||||
const id2 = Buffer.from([0x02]);
|
||||
await repo.putManyBinary(p, [
|
||||
{key: id1, value: Buffer.from("v1")},
|
||||
{key: id2, value: Buffer.from("v2")},
|
||||
]);
|
||||
|
||||
const list = await repo.valuesBinary(p);
|
||||
expect(list.map((e) => ({p: e.prefix, id: Buffer.from(e.id), v: Buffer.from(e.value).toString("utf8")}))).toEqual([
|
||||
{p: p, id: id1, v: "v1"},
|
||||
{p: p, id: id2, v: "v2"},
|
||||
]);
|
||||
|
||||
const fromStream: {prefix: number; id: Uint8Array; value: Uint8Array}[] = [];
|
||||
for await (const e of repo.valuesStreamBinary(p)) fromStream.push(e);
|
||||
expect(
|
||||
fromStream.map((e) => ({p: e.prefix, id: Buffer.from(e.id), v: Buffer.from(e.value).toString("utf8")}))
|
||||
).toEqual([
|
||||
{p: p, id: id1, v: "v1"},
|
||||
{p: p, id: id2, v: "v2"},
|
||||
]);
|
||||
});
|
||||
|
||||
it("entriesStream and entriesStreamBinary provide decoded data", async () => {
|
||||
const p = 11;
|
||||
await repo.put(p, "a");
|
||||
await repo.put(p, "b");
|
||||
const idA = repo.getId("a");
|
||||
const idB = repo.getId("b");
|
||||
|
||||
const entries: {prefix: number; id: Uint8Array; value: string}[] = [];
|
||||
for await (const e of repo.entriesStream(p)) entries.push(e);
|
||||
expect(entries).toEqual([
|
||||
{prefix: p, id: idA, value: "a"},
|
||||
{prefix: p, id: idB, value: "b"},
|
||||
]);
|
||||
|
||||
const entriesBin: {prefix: number; id: Uint8Array; value: Uint8Array}[] = [];
|
||||
for await (const e of repo.entriesStreamBinary(p)) entriesBin.push(e);
|
||||
expect(
|
||||
entriesBin.map((e) => ({prefix: e.prefix, id: Buffer.from(e.id), value: Buffer.from(e.value).toString("utf8")}))
|
||||
).toEqual([
|
||||
{prefix: p, id: idA, value: "a"},
|
||||
{prefix: p, id: idB, value: "b"},
|
||||
]);
|
||||
for (const [index, id] of ids.entries()) {
|
||||
await expect(repo.get(p, id)).resolves.toEqual(values[index]);
|
||||
}
|
||||
});
|
||||
|
||||
it("deleteMany removes all for provided prefixes", async () => {
|
||||
const p1 = 20;
|
||||
const p2 = 21;
|
||||
await repo.put(p1, "a");
|
||||
await repo.put(p1, "b");
|
||||
await repo.put(p2, "c");
|
||||
// Put two columns for slot 20
|
||||
await repo.put(p1, testData[p1][1]);
|
||||
await repo.put(p1, testData[p1][2]);
|
||||
|
||||
// Put two columns for slot 21
|
||||
await repo.put(p2, testData[p2][1]);
|
||||
await repo.put(p2, testData[p2][2]);
|
||||
|
||||
await repo.deleteMany(p1);
|
||||
await expect(repo.values(p1)).resolves.toEqual([]);
|
||||
await expect(repo.values(p2)).resolves.toEqual(["c"]);
|
||||
await expect(fromAsync(repo.valuesStream(p1))).resolves.toEqual([]);
|
||||
await expect(fromAsync(repo.valuesStream(p2))).resolves.toEqual([testData[p2][1], testData[p2][2]]);
|
||||
|
||||
// Re-fill and delete both
|
||||
await repo.put(p1, "a");
|
||||
await repo.put(p1, "b");
|
||||
await repo.put(p2, "c");
|
||||
await repo.put(p1, testData[p1][1]);
|
||||
await repo.put(p1, testData[p1][2]);
|
||||
await repo.put(p2, testData[p2][1]);
|
||||
await repo.put(p2, testData[p2][2]);
|
||||
|
||||
await repo.deleteMany([p1, p2]);
|
||||
await expect(repo.values([p1, p2])).resolves.toEqual([]);
|
||||
await expect(fromAsync(repo.valuesStream(p1))).resolves.toEqual([]);
|
||||
await expect(fromAsync(repo.valuesStream(p2))).resolves.toEqual([]);
|
||||
});
|
||||
|
||||
it("keys returns decoded prefix+id with filters and options", async () => {
|
||||
const p1 = 30;
|
||||
const p2 = 31;
|
||||
const id1 = Buffer.from([0x01]);
|
||||
const id2 = Buffer.from([0x02]);
|
||||
const id3 = Buffer.from([0x03]);
|
||||
describe("valuesStream,valuesStreamBinary,entriesStream,entriesStreamBinary", () => {
|
||||
it("valuesStream should fetch for single and multiple prefixes", async () => {
|
||||
const p1 = 7;
|
||||
const p2 = 8;
|
||||
|
||||
await repo.putManyBinary(p1, [
|
||||
{key: id1, value: Buffer.from("v1")},
|
||||
{key: id2, value: Buffer.from("v2")},
|
||||
{key: id3, value: Buffer.from("v3")},
|
||||
]);
|
||||
await repo.putManyBinary(p2, [{key: Buffer.from([0x01]), value: Buffer.from("w1")}]);
|
||||
await repo.putMany(p1, testData[p1]);
|
||||
await repo.putMany(p2, testData[p2]);
|
||||
|
||||
const gte = repo.encodeKeyRaw(p1, Buffer.from([0x00]));
|
||||
const lte = repo.encodeKeyRaw(p1, Buffer.from([0xff]));
|
||||
const keys = await repo.keys({gte, lte});
|
||||
expect(keys).toEqual([
|
||||
{prefix: p1, id: id1},
|
||||
{prefix: p1, id: id2},
|
||||
{prefix: p1, id: id3},
|
||||
]);
|
||||
// Single prefix
|
||||
const result1 = await fromAsync(repo.valuesStream(p1));
|
||||
|
||||
const revLimit = await repo.keys({gte, lte, reverse: true, limit: 2});
|
||||
expect(revLimit).toEqual([
|
||||
{prefix: p1, id: id3},
|
||||
{prefix: p1, id: id2},
|
||||
]);
|
||||
expect(result1).toHaveLength(numberOfColumns);
|
||||
// For this test we don't emphasis on the order
|
||||
expect(result1.sort((r1, r2) => r1.column - r2.column)).toEqual(testData[p1]);
|
||||
|
||||
// Multiple prefix
|
||||
const result2 = await fromAsync(repo.valuesStream([p1, p2]));
|
||||
|
||||
// For this test we don't emphasis on the order
|
||||
expect(result2).toHaveLength(numberOfColumns * 2);
|
||||
expect(result2.sort((r1, r2) => r1.column - r2.column)).toEqual(
|
||||
[...testData[p1], ...testData[p2]].sort((r1, r2) => r1.column - r2.column)
|
||||
);
|
||||
});
|
||||
|
||||
it("valuesStreamBinary should fetch for single and multiple prefixes", async () => {
|
||||
const p1 = 7;
|
||||
const dataBinaryP1 = testData[p1].map((c) => ({id: c.column, prefix: p1, value: testPrefixedType.serialize(c)}));
|
||||
const p2 = 8;
|
||||
const dataBinaryP2 = testData[p2].map((c) => ({id: c.column, prefix: p2, value: testPrefixedType.serialize(c)}));
|
||||
|
||||
await repo.putMany(p1, testData[p1]);
|
||||
await repo.putMany(p2, testData[p2]);
|
||||
|
||||
// Single prefix
|
||||
const result1 = await fromAsync(repo.valuesStreamBinary(p1));
|
||||
|
||||
expect(result1).toHaveLength(numberOfColumns);
|
||||
// For this test we don't emphasis on the order
|
||||
expect(result1.sort((r1, r2) => r1.id - r2.id)).toEqual(dataBinaryP1);
|
||||
|
||||
// Multiple prefix
|
||||
const result2 = await fromAsync(repo.valuesStreamBinary([p1, p2]));
|
||||
|
||||
// For this test we don't emphasis on the order
|
||||
expect(result2).toHaveLength(numberOfColumns * 2);
|
||||
expect(result2.sort((r1, r2) => r1.id - r2.id)).toEqual(
|
||||
[...dataBinaryP1, ...dataBinaryP2].sort((r1, r2) => r1.id - r2.id)
|
||||
);
|
||||
});
|
||||
|
||||
it("entriesStream should fetch for single and multiple prefixes", async () => {
|
||||
const p1 = 7;
|
||||
const entriesDataP1 = testData[p1].map((c) => ({id: c.column, prefix: p1, value: c}));
|
||||
const p2 = 8;
|
||||
const entriesDataP2 = testData[p2].map((c) => ({id: c.column, prefix: p2, value: c}));
|
||||
|
||||
await repo.putMany(p1, testData[p1]);
|
||||
await repo.putMany(p2, testData[p2]);
|
||||
|
||||
// Single prefix
|
||||
const result1 = await fromAsync(repo.entriesStream(p1));
|
||||
|
||||
expect(result1).toHaveLength(numberOfColumns);
|
||||
// For this test we don't emphasis on the order
|
||||
expect(result1.sort((r1, r2) => r1.id - r2.id)).toEqual(entriesDataP1);
|
||||
|
||||
// Multiple prefix
|
||||
const result2 = await fromAsync(repo.entriesStream([p1, p2]));
|
||||
|
||||
// For this test we don't emphasis on the order
|
||||
expect(result2).toHaveLength(numberOfColumns * 2);
|
||||
expect(result2.sort((r1, r2) => r1.id - r2.id)).toEqual(
|
||||
[...entriesDataP1, ...entriesDataP2].sort((r1, r2) => r1.id - r2.id)
|
||||
);
|
||||
});
|
||||
|
||||
it("entriesStreamBinary should fetch for single and multiple prefixes", async () => {
|
||||
const p1 = 7;
|
||||
const entriesDataP1 = testData[p1].map((c) => ({id: c.column, prefix: p1, value: testPrefixedType.serialize(c)}));
|
||||
const p2 = 8;
|
||||
const entriesDataP2 = testData[p2].map((c) => ({id: c.column, prefix: p2, value: testPrefixedType.serialize(c)}));
|
||||
|
||||
await repo.putMany(p1, testData[p1]);
|
||||
await repo.putMany(p2, testData[p2]);
|
||||
|
||||
// Single prefix
|
||||
const result1 = await fromAsync(repo.entriesStreamBinary(p1));
|
||||
|
||||
expect(result1).toHaveLength(numberOfColumns);
|
||||
// For this test we don't emphasis on the order
|
||||
expect(result1.sort((r1, r2) => r1.id - r2.id)).toEqual(entriesDataP1);
|
||||
|
||||
// Multiple prefix
|
||||
const result2 = await fromAsync(repo.entriesStreamBinary([p1, p2]));
|
||||
|
||||
// For this test we don't emphasis on the order
|
||||
expect(result2).toHaveLength(numberOfColumns * 2);
|
||||
expect(result2.sort((r1, r2) => r1.id - r2.id)).toEqual(
|
||||
[...entriesDataP1, ...entriesDataP2].sort((r1, r2) => r1.id - r2.id)
|
||||
);
|
||||
});
|
||||
|
||||
it("values should return in correct order of id for single prefix", async () => {
|
||||
const p1 = 7;
|
||||
const valuesP1 = [testData[p1][10], testData[p1][11], testData[p1][12]];
|
||||
await repo.putMany(p1, valuesP1);
|
||||
|
||||
const result1 = await fromAsync(repo.valuesStream(p1));
|
||||
|
||||
expect(result1.map((v) => v.column)).toEqual([10, 11, 12]);
|
||||
});
|
||||
|
||||
it("values should return in correct order of id for multiple prefixes", async () => {
|
||||
const p1 = 7;
|
||||
const valuesP1 = [testData[p1][10], testData[p1][11], testData[p1][12]];
|
||||
const p2 = 10;
|
||||
const valuesP2 = [testData[p2][9], testData[p2][19], testData[p2][21]];
|
||||
|
||||
await repo.putMany(p1, valuesP1);
|
||||
await repo.putMany(p2, valuesP2);
|
||||
|
||||
const result1 = await fromAsync(repo.valuesStream([p1, p2]));
|
||||
|
||||
expect(result1.map((v) => v.column)).toEqual([10, 11, 12, 9, 19, 21]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("keys", () => {
|
||||
const getRangeDataInclusive = (slot: number, start: number, end: number) =>
|
||||
Array.from({length: end - start + 1}, (_, index) => ({id: start + index, prefix: slot}));
|
||||
|
||||
it("keys returns decoded prefix+id with filters and options", async () => {
|
||||
const slot1 = 30;
|
||||
const slot2 = 31;
|
||||
const column1 = 15;
|
||||
const column2 = 258;
|
||||
const column3 = 289;
|
||||
|
||||
await repo.putMany(slot1, [testData[slot1][column1], testData[slot1][column2], testData[slot1][column3]]);
|
||||
await repo.putMany(slot2, [testData[slot2][column1], testData[slot2][column2], testData[slot2][column3]]);
|
||||
|
||||
const gte = {prefix: slot1, id: 0};
|
||||
const lte = {prefix: slot1, id: 400};
|
||||
const keys = await repo.keys({gte, lte});
|
||||
|
||||
expect(keys).toEqual([
|
||||
{prefix: slot1, id: column1},
|
||||
{prefix: slot1, id: column2},
|
||||
{prefix: slot1, id: column3},
|
||||
]);
|
||||
|
||||
const revLimit = await repo.keys({gte, lte, reverse: true, limit: 2});
|
||||
expect(revLimit).toEqual([
|
||||
{prefix: slot1, id: column3},
|
||||
{prefix: slot1, id: column2},
|
||||
]);
|
||||
});
|
||||
|
||||
it("should fetch correct range across single prefix", async () => {
|
||||
const slot1 = 30;
|
||||
const slot2 = 48;
|
||||
const getRangeDataInclusive = (slot: number, start: number, end: number) =>
|
||||
Array.from({length: end - start + 1}, (_, index) => ({id: start + index, prefix: slot}));
|
||||
|
||||
await repo.putMany(slot1, testData[slot1]);
|
||||
await repo.putMany(slot2, testData[slot2]);
|
||||
|
||||
// Across single byte
|
||||
const result1 = await repo.keys({gt: {prefix: slot1, id: 5}, lt: {prefix: slot1, id: 17}});
|
||||
expect(result1).toEqual(getRangeDataInclusive(slot1, 6, 16));
|
||||
|
||||
// Across higher byte
|
||||
const result2 = await repo.keys({gt: {prefix: slot1, id: 257}, lt: {prefix: slot1, id: 266}});
|
||||
expect(result2).toEqual(getRangeDataInclusive(slot1, 258, 265));
|
||||
|
||||
// Across multiple byte
|
||||
const result3 = await repo.keys({gt: {prefix: slot1, id: 17}, lt: {prefix: slot1, id: 275}});
|
||||
expect(result3).toEqual(getRangeDataInclusive(slot1, 18, 274));
|
||||
});
|
||||
|
||||
it("should fetch correct range across multiple prefix", async () => {
|
||||
const slot1 = 30;
|
||||
const slot2 = 31;
|
||||
const slot3 = 32;
|
||||
|
||||
await repo.putMany(slot1, testData[slot1]);
|
||||
await repo.putMany(slot2, testData[slot2]);
|
||||
await repo.putMany(slot3, testData[slot3]);
|
||||
|
||||
const query = {gt: {prefix: slot1, id: 5}, lt: {prefix: slot3, id: 17}};
|
||||
const result = [
|
||||
...getRangeDataInclusive(slot1, 6, 299),
|
||||
...getRangeDataInclusive(slot2, 0, 299),
|
||||
...getRangeDataInclusive(slot3, 0, 16),
|
||||
].sort((a, b) => a.id - b.id);
|
||||
|
||||
const result1 = await repo.keys(query);
|
||||
expect(result1.sort((r1, r2) => r1.id - r2.id)).toEqual(result);
|
||||
});
|
||||
|
||||
it("should fetch keys in correct order", async () => {
|
||||
const slot = 30;
|
||||
|
||||
await repo.putMany(slot, testData[slot]);
|
||||
|
||||
const gte = {prefix: slot, id: 19};
|
||||
const lte = {prefix: slot, id: 23};
|
||||
const keys = await repo.keys({gte, lte});
|
||||
|
||||
expect(keys).toEqual([
|
||||
{prefix: slot, id: 19},
|
||||
{prefix: slot, id: 20},
|
||||
{prefix: slot, id: 21},
|
||||
{prefix: slot, id: 22},
|
||||
{prefix: slot, id: 23},
|
||||
]);
|
||||
});
|
||||
|
||||
it("should fetch keys in correct order across multiple prefixes", async () => {
|
||||
const slot1 = 30;
|
||||
const slot2 = 31;
|
||||
await repo.putMany(slot1, testData[slot1]);
|
||||
await repo.putMany(slot2, testData[slot2]);
|
||||
|
||||
const query = {gt: {prefix: slot1, id: 295}, lt: {prefix: slot2, id: 4}};
|
||||
|
||||
const keys = await repo.keys(query);
|
||||
|
||||
expect(keys).toEqual([
|
||||
{prefix: slot1, id: 296},
|
||||
{prefix: slot1, id: 297},
|
||||
{prefix: slot1, id: 298},
|
||||
{prefix: slot1, id: 299},
|
||||
{prefix: slot2, id: 0},
|
||||
{prefix: slot2, id: 1},
|
||||
{prefix: slot2, id: 2},
|
||||
{prefix: slot2, id: 3},
|
||||
]);
|
||||
});
|
||||
|
||||
it("should not cross the bucket boundary towards lower bucket", async () => {
|
||||
const repo2 = new TestPrefixedRepository(db, bucket - 1, bucketId);
|
||||
const slot = 30;
|
||||
await repo.putMany(slot, testData[slot]);
|
||||
await repo2.putMany(slot, testData[slot]);
|
||||
|
||||
const query = {lt: {prefix: slot, id: 4}};
|
||||
|
||||
const keys = await repo.keys(query);
|
||||
|
||||
expect(keys).toEqual([
|
||||
{prefix: slot, id: 0},
|
||||
{prefix: slot, id: 1},
|
||||
{prefix: slot, id: 2},
|
||||
{prefix: slot, id: 3},
|
||||
]);
|
||||
});
|
||||
|
||||
it("should not cross the bucket boundary towards higher bucket", async () => {
|
||||
const repo2 = new TestPrefixedRepository(db, bucket + 1, bucketId);
|
||||
const slot = 30;
|
||||
await repo.putMany(slot, testData[slot]);
|
||||
await repo2.putMany(slot, testData[slot]);
|
||||
|
||||
const query = {gt: {prefix: slot, id: 295}};
|
||||
|
||||
const keys = await repo.keys(query);
|
||||
|
||||
expect(keys).toEqual([
|
||||
{prefix: slot, id: 296},
|
||||
{prefix: slot, id: 297},
|
||||
{prefix: slot, id: 298},
|
||||
{prefix: slot, id: 299},
|
||||
]);
|
||||
});
|
||||
|
||||
it("should not cross the bucket boundary with multiple prefixes", async () => {
|
||||
const repo2 = new TestPrefixedRepository(db, bucket - 1, bucketId);
|
||||
const slot1 = 30;
|
||||
const slot2 = 31;
|
||||
await repo.putMany(slot1, testData[slot1]);
|
||||
await repo2.putMany(slot1, testData[slot1]);
|
||||
|
||||
await repo.putMany(slot2, testData[slot2]);
|
||||
await repo2.putMany(slot2, testData[slot2]);
|
||||
|
||||
const query = {lt: {prefix: slot2, id: 4}};
|
||||
|
||||
const keys = await repo.keys(query);
|
||||
|
||||
expect(keys).toEqual([...getRangeDataInclusive(slot1, 0, 299), ...getRangeDataInclusive(slot2, 0, 3)]);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
54
packages/db/test/unit/util.test.ts
Normal file
54
packages/db/test/unit/util.test.ts
Normal file
@@ -0,0 +1,54 @@
|
||||
import {describe, it, expect} from "vitest";
|
||||
import {
|
||||
encodeNumberForDbKey,
|
||||
decodeNumberForDbKey,
|
||||
encodeStringForDbKey,
|
||||
decodeStringForDbKey,
|
||||
} from "../../src/index.js";
|
||||
|
||||
describe("encode/decode number for DB key", () => {
|
||||
it("roundtrips with fixed byte size (2 bytes)", () => {
|
||||
const value = 0xffee;
|
||||
const size = 2;
|
||||
|
||||
const encoded = encodeNumberForDbKey(value, size);
|
||||
expect(encoded).toEqual(Buffer.from([0xff, 0xee]));
|
||||
|
||||
const decoded = decodeNumberForDbKey(encoded, size);
|
||||
expect(decoded).toBe(value);
|
||||
});
|
||||
|
||||
it("roundtrips with fixed byte size (4 bytes)", () => {
|
||||
const value = 0xdeadbeef >>> 0;
|
||||
const size = 4;
|
||||
|
||||
const encoded = encodeNumberForDbKey(value, size);
|
||||
expect(encoded).toEqual(Buffer.from([0xde, 0xad, 0xbe, 0xef]));
|
||||
|
||||
const decoded = decodeNumberForDbKey(encoded, size);
|
||||
expect(decoded).toBe(value);
|
||||
});
|
||||
|
||||
it("decodes only the first N bytes (ignores trailing)", () => {
|
||||
const size = 2;
|
||||
const base = encodeNumberForDbKey(1, size);
|
||||
const withTrailing = Buffer.concat([base, Buffer.from([0x99, 0x99])]);
|
||||
const decoded = decodeNumberForDbKey(withTrailing, size);
|
||||
expect(decoded).toBe(1);
|
||||
});
|
||||
});
|
||||
|
||||
describe("encode/decode string for DB key", () => {
|
||||
it("encodes UTF-8 string", () => {
|
||||
const value = "hello";
|
||||
const encoded = encodeStringForDbKey(value);
|
||||
expect(encoded).toEqual(Buffer.from(value, "utf-8"));
|
||||
});
|
||||
|
||||
it("roundtrips Unicode strings", () => {
|
||||
const value = "héłłø 🌟";
|
||||
const encoded = encodeStringForDbKey(value);
|
||||
const decoded = decodeStringForDbKey(encoded);
|
||||
expect(decoded).toBe(value);
|
||||
});
|
||||
});
|
||||
@@ -8,6 +8,7 @@ export * from "./diff.js";
|
||||
export * from "./err.js";
|
||||
export * from "./errors.js";
|
||||
export * from "./format.js";
|
||||
export * from "./iterator.js";
|
||||
export * from "./logger.js";
|
||||
export * from "./map.js";
|
||||
export * from "./math.js";
|
||||
|
||||
10
packages/utils/src/iterator.ts
Normal file
10
packages/utils/src/iterator.ts
Normal file
@@ -0,0 +1,10 @@
|
||||
// ES2024 onward we have Array.fromAsync which does exactly this
|
||||
// This function is here as wrapper to be deleted later when we upgrade
|
||||
// minimum nodejs requirement to > 22
|
||||
export async function fromAsync<T>(iter: AsyncIterable<T>): Promise<T[]> {
|
||||
const arr: T[] = [];
|
||||
for await (const v of iter) {
|
||||
arr.push(v);
|
||||
}
|
||||
return arr;
|
||||
}
|
||||
Reference in New Issue
Block a user