mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
2 Commits
main
...
dan/latenc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f57b37442f | ||
|
|
a7a8ed0ae1 |
123
.github/scripts/bench-reth-run.sh
vendored
123
.github/scripts/bench-reth-run.sh
vendored
@@ -28,6 +28,127 @@ mkdir -p "$OUTPUT_DIR"
|
||||
LOG="${OUTPUT_DIR}/node.log"
|
||||
|
||||
RETH_SCOPE="${RETH_SCOPE:-reth-bench.scope}"
|
||||
DELAY_DM_NAME=""
|
||||
|
||||
mount_fs() {
|
||||
local source="$1"
|
||||
local target="$2"
|
||||
local fstype="$3"
|
||||
local options="$4"
|
||||
|
||||
local -a mount_cmd=(sudo mount -t "$fstype")
|
||||
if [ -n "$options" ]; then
|
||||
mount_cmd+=(-o "$options")
|
||||
fi
|
||||
mount_cmd+=("$source" "$target")
|
||||
"${mount_cmd[@]}"
|
||||
}
|
||||
|
||||
wait_for_block_device() {
|
||||
local device="$1"
|
||||
local timeout_s="${2:-5}"
|
||||
|
||||
for _ in $(seq 1 "$timeout_s"); do
|
||||
if [ -b "$device" ]; then
|
||||
return 0
|
||||
fi
|
||||
sleep 1
|
||||
done
|
||||
|
||||
return 1
|
||||
}
|
||||
|
||||
resolve_dm_block_device() {
|
||||
local device_name="$1"
|
||||
local devno
|
||||
|
||||
devno="$(sudo dmsetup info -C --noheadings --separator : -o major,minor "$device_name" 2>/dev/null | tr -d '[:space:]')"
|
||||
if [ -z "$devno" ]; then
|
||||
return 1
|
||||
fi
|
||||
|
||||
printf '/dev/block/%s\n' "$devno"
|
||||
}
|
||||
|
||||
setup_delay_target() {
|
||||
local read_delay_ms="${BENCH_DISK_READ_DELAY_MS:-0}"
|
||||
local write_delay_ms="${BENCH_DISK_WRITE_DELAY_MS:-0}"
|
||||
read_delay_ms="${read_delay_ms:-0}"
|
||||
write_delay_ms="${write_delay_ms:-0}"
|
||||
|
||||
if ! [[ "$read_delay_ms" =~ ^[0-9]+$ && "$write_delay_ms" =~ ^[0-9]+$ ]]; then
|
||||
echo "::error::BENCH_DISK_READ_DELAY_MS and BENCH_DISK_WRITE_DELAY_MS must be non-negative integers"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if [ "$read_delay_ms" = "0" ] && [ "$write_delay_ms" = "0" ]; then
|
||||
return
|
||||
fi
|
||||
|
||||
local mount_source
|
||||
local mount_fstype
|
||||
local mount_options
|
||||
mount_source="$(findmnt -no SOURCE --target "$SCHELK_MOUNT")"
|
||||
mount_fstype="$(findmnt -no FSTYPE --target "$SCHELK_MOUNT")"
|
||||
mount_options="$(findmnt -no OPTIONS --target "$SCHELK_MOUNT")"
|
||||
local sectors
|
||||
sectors="$(sudo blockdev --getsz "$mount_source")"
|
||||
|
||||
DELAY_DM_NAME="reth-bench-delay-${LABEL}-$$"
|
||||
local delayed_device=""
|
||||
local table="0 ${sectors} delay ${mount_source} 0 ${read_delay_ms} ${mount_source} 0 ${write_delay_ms}"
|
||||
|
||||
echo "Applying dm-delay to ${mount_source}: read=${read_delay_ms}ms write=${write_delay_ms}ms"
|
||||
sudo umount "$SCHELK_MOUNT"
|
||||
if ! sudo dmsetup create "$DELAY_DM_NAME" --addnodeoncreate --table "$table"; then
|
||||
DELAY_DM_NAME=""
|
||||
mount_fs "$mount_source" "$SCHELK_MOUNT" "$mount_fstype" "$mount_options"
|
||||
echo "::error::Failed to create dm-delay target"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
delayed_device="$(resolve_dm_block_device "$DELAY_DM_NAME" || true)"
|
||||
if [ -z "$delayed_device" ]; then
|
||||
local failed_device_name="$DELAY_DM_NAME"
|
||||
sudo dmsetup remove "$DELAY_DM_NAME" || true
|
||||
DELAY_DM_NAME=""
|
||||
mount_fs "$mount_source" "$SCHELK_MOUNT" "$mount_fstype" "$mount_options"
|
||||
echo "::error::Failed to resolve delayed device node for ${failed_device_name}"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if ! wait_for_block_device "$delayed_device"; then
|
||||
delayed_device="$(resolve_dm_block_device "$DELAY_DM_NAME" || true)"
|
||||
if ! wait_for_block_device "$delayed_device"; then
|
||||
sudo dmsetup remove "$DELAY_DM_NAME" || true
|
||||
DELAY_DM_NAME=""
|
||||
mount_fs "$mount_source" "$SCHELK_MOUNT" "$mount_fstype" "$mount_options"
|
||||
echo "::error::Delayed device node did not appear: ${delayed_device}"
|
||||
exit 1
|
||||
fi
|
||||
fi
|
||||
|
||||
if ! mount_fs "$delayed_device" "$SCHELK_MOUNT" "$mount_fstype" "$mount_options"; then
|
||||
sudo dmsetup remove "$DELAY_DM_NAME" || true
|
||||
DELAY_DM_NAME=""
|
||||
mount_fs "$mount_source" "$SCHELK_MOUNT" "$mount_fstype" "$mount_options"
|
||||
echo "::error::Failed to remount delayed device"
|
||||
exit 1
|
||||
fi
|
||||
}
|
||||
|
||||
teardown_delay_target() {
|
||||
if [ -z "$DELAY_DM_NAME" ]; then
|
||||
return
|
||||
fi
|
||||
|
||||
echo "Removing dm-delay target ${DELAY_DM_NAME}"
|
||||
if mountpoint -q "$SCHELK_MOUNT"; then
|
||||
sudo umount "$SCHELK_MOUNT" || true
|
||||
fi
|
||||
sudo dmsetup remove "$DELAY_DM_NAME" || sudo dmsetup remove -f "$DELAY_DM_NAME" || true
|
||||
DELAY_DM_NAME=""
|
||||
}
|
||||
|
||||
cleanup() {
|
||||
kill "$TAIL_PID" 2>/dev/null || true
|
||||
@@ -76,6 +197,7 @@ cleanup() {
|
||||
sudo systemctl reset-failed "$RETH_SCOPE" 2>/dev/null || true
|
||||
# Fix ownership of reth-created files (reth runs as root)
|
||||
sudo chown -R "$(id -un):$(id -gn)" "$OUTPUT_DIR" 2>/dev/null || true
|
||||
teardown_delay_target
|
||||
# Let schelk recover the mounted volume in place so dm-era can restore only
|
||||
# the changed blocks and clean up its own state.
|
||||
sudo schelk recover -y --kill || true
|
||||
@@ -92,6 +214,7 @@ sudo schelk recover -y --kill || sudo schelk full-recover -y || true
|
||||
|
||||
# Mount
|
||||
sudo schelk mount -y || true
|
||||
setup_delay_target
|
||||
if [ ! -d "$DATADIR/db" ] || [ ! -d "$DATADIR/static_files" ]; then
|
||||
echo "::error::Failed to mount benchmark datadir at ${DATADIR}"
|
||||
ls -la "$SCHELK_MOUNT" || true
|
||||
|
||||
51
.github/workflows/bench.yml
vendored
51
.github/workflows/bench.yml
vendored
@@ -51,6 +51,16 @@ on:
|
||||
required: false
|
||||
default: ""
|
||||
type: string
|
||||
disk_read_delay_ms:
|
||||
description: "Apply dm-delay read latency to the schelk volume (milliseconds)"
|
||||
required: false
|
||||
default: ""
|
||||
type: string
|
||||
disk_write_delay_ms:
|
||||
description: "Apply dm-delay write latency to the schelk volume (milliseconds)"
|
||||
required: false
|
||||
default: ""
|
||||
type: string
|
||||
baseline_args:
|
||||
description: "Extra CLI args for the baseline reth node"
|
||||
required: false
|
||||
@@ -125,6 +135,8 @@ jobs:
|
||||
big-blocks: ${{ steps.args.outputs.big-blocks }}
|
||||
bal: ${{ steps.args.outputs.bal }}
|
||||
wait-time: ${{ steps.args.outputs.wait-time }}
|
||||
disk-read-delay-ms: ${{ steps.args.outputs.disk-read-delay-ms }}
|
||||
disk-write-delay-ms: ${{ steps.args.outputs.disk-write-delay-ms }}
|
||||
baseline-args: ${{ steps.args.outputs.baseline-args }}
|
||||
feature-args: ${{ steps.args.outputs.feature-args }}
|
||||
abba: ${{ steps.args.outputs.abba }}
|
||||
@@ -158,7 +170,7 @@ jobs:
|
||||
script: |
|
||||
const validBalModes = new Set(['false', 'true', 'feature', 'baseline']);
|
||||
const validSlackModes = new Set(['always', 'on-win', 'on-error', 'never']);
|
||||
const usage = '`@decofe bench [blocks=N] [big-blocks[=true|false]] [bal=true|false|feature|baseline] [warmup=N] [baseline=REF] [feature=REF] [samply] [slack=always|on-win|on-error|never] [cores=N] [abba=true|false] [otlp=true|false] [wait-time=DURATION] [baseline-args="..."] [feature-args="..."]`';
|
||||
const usage = '`@decofe bench [blocks=N] [big-blocks[=true|false]] [bal=true|false|feature|baseline] [warmup=N] [baseline=REF] [feature=REF] [samply] [slack=always|on-win|on-error|never] [cores=N] [abba=true|false] [otlp=true|false] [wait-time=DURATION] [disk-read-delay-ms=N] [disk-write-delay-ms=N] [baseline-args="..."] [feature-args="..."]`';
|
||||
let pr, actor, blocks, warmup, baseline, feature, samply, cores, bigBlocks, bal;
|
||||
let explicitWarmup = false;
|
||||
|
||||
@@ -177,6 +189,8 @@ jobs:
|
||||
var abba = '${{ github.event.inputs.abba }}' !== 'false' ? 'true' : 'false';
|
||||
var otlp = '${{ github.event.inputs.otlp }}' !== 'false' ? 'true' : 'false';
|
||||
var waitTime = '${{ github.event.inputs.wait_time }}' || '';
|
||||
var diskReadDelayMs = '${{ github.event.inputs.disk_read_delay_ms }}' || '';
|
||||
var diskWriteDelayMs = '${{ github.event.inputs.disk_write_delay_ms }}' || '';
|
||||
var baselineNodeArgs = '${{ github.event.inputs.baseline_args }}' || '';
|
||||
var featureNodeArgs = '${{ github.event.inputs.feature_args }}' || '';
|
||||
|
||||
@@ -204,8 +218,9 @@ jobs:
|
||||
const boolDefaultTrue = new Set(['abba', 'otlp']);
|
||||
const enumArgs = new Map([['bal', validBalModes], ['slack', validSlackModes]]);
|
||||
const durationArgs = new Set(['wait-time']);
|
||||
const optionalIntArgs = new Set(['disk-read-delay-ms', 'disk-write-delay-ms']);
|
||||
const stringArgs = new Set(['baseline-args', 'feature-args']);
|
||||
const defaults = { blocks: '500', warmup: '200', baseline: '', feature: '', samply: 'false', slack: 'always', 'big-blocks': 'false', bal: 'false', cores: '0', abba: 'true', otlp: 'true', 'wait-time': '', 'baseline-args': '', 'feature-args': '' };
|
||||
const defaults = { blocks: '500', warmup: '200', baseline: '', feature: '', samply: 'false', slack: 'always', 'big-blocks': 'false', bal: 'false', cores: '0', abba: 'true', otlp: 'true', 'wait-time': '', 'disk-read-delay-ms': '', 'disk-write-delay-ms': '', 'baseline-args': '', 'feature-args': '' };
|
||||
const unknown = [];
|
||||
const invalid = [];
|
||||
const args = body.replace(/^(?:@decofe|derek) bench\s*/, '');
|
||||
@@ -244,6 +259,12 @@ jobs:
|
||||
} else {
|
||||
invalid.push(`\`${key}=${value}\` (must be a duration like 500ms, 1s, 2m)`);
|
||||
}
|
||||
} else if (optionalIntArgs.has(key)) {
|
||||
if (!/^\d+$/.test(value)) {
|
||||
invalid.push(`\`${key}=${value}\` (must be a non-negative integer in milliseconds)`);
|
||||
} else {
|
||||
defaults[key] = value;
|
||||
}
|
||||
} else if (enumArgs.has(key)) {
|
||||
if (enumArgs.get(key).has(value)) {
|
||||
defaults[key] = value;
|
||||
@@ -295,6 +316,8 @@ jobs:
|
||||
var abba = defaults.abba;
|
||||
var otlp = defaults.otlp;
|
||||
var waitTime = defaults['wait-time'];
|
||||
var diskReadDelayMs = defaults['disk-read-delay-ms'];
|
||||
var diskWriteDelayMs = defaults['disk-write-delay-ms'];
|
||||
var baselineNodeArgs = defaults['baseline-args'];
|
||||
var featureNodeArgs = defaults['feature-args'];
|
||||
}
|
||||
@@ -352,6 +375,8 @@ jobs:
|
||||
core.setOutput('big-blocks', bigBlocks);
|
||||
core.setOutput('bal', bal);
|
||||
core.setOutput('wait-time', waitTime);
|
||||
core.setOutput('disk-read-delay-ms', diskReadDelayMs);
|
||||
core.setOutput('disk-write-delay-ms', diskWriteDelayMs);
|
||||
core.setOutput('baseline-args', baselineNodeArgs);
|
||||
core.setOutput('feature-args', featureNodeArgs);
|
||||
core.setOutput('abba', abba);
|
||||
@@ -427,12 +452,16 @@ jobs:
|
||||
const otlpNote = !otlpEnabled ? ', otlp: `disabled`' : '';
|
||||
const waitTimeVal = '${{ steps.args.outputs.wait-time }}';
|
||||
const waitTimeNote = waitTimeVal ? `, wait-time: \`${waitTimeVal}\`` : '';
|
||||
const diskReadDelayVal = '${{ steps.args.outputs.disk-read-delay-ms }}';
|
||||
const diskReadDelayNote = diskReadDelayVal && diskReadDelayVal !== '0' ? `, disk-read-delay: \`${diskReadDelayVal}ms\`` : '';
|
||||
const diskWriteDelayVal = '${{ steps.args.outputs.disk-write-delay-ms }}';
|
||||
const diskWriteDelayNote = diskWriteDelayVal && diskWriteDelayVal !== '0' ? `, disk-write-delay: \`${diskWriteDelayVal}ms\`` : '';
|
||||
const baselineArgsVal = '${{ steps.args.outputs.baseline-args }}';
|
||||
const baselineArgsNote = baselineArgsVal ? `, baseline-args: \`${baselineArgsVal}\`` : '';
|
||||
const featureArgsVal = '${{ steps.args.outputs.feature-args }}';
|
||||
const featureArgsNote = featureArgsVal ? `, feature-args: \`${featureArgsVal}\`` : '';
|
||||
const blocksDesc = bigBlocks ? 'blocks: `big`' : `${blocks} blocks, ${warmup} warmup blocks`;
|
||||
const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${slackNote}${balNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`;
|
||||
const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${slackNote}${balNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${diskReadDelayNote}${diskWriteDelayNote}${baselineArgsNote}${featureArgsNote}`;
|
||||
|
||||
const { data: comment } = await github.rest.issues.createComment({
|
||||
owner: context.repo.owner,
|
||||
@@ -471,12 +500,16 @@ jobs:
|
||||
const otlpNote = !otlpEnabled ? ', otlp: `disabled`' : '';
|
||||
const waitTimeVal = '${{ steps.args.outputs.wait-time }}';
|
||||
const waitTimeNote = waitTimeVal ? `, wait-time: \`${waitTimeVal}\`` : '';
|
||||
const diskReadDelayVal = '${{ steps.args.outputs.disk-read-delay-ms }}';
|
||||
const diskReadDelayNote = diskReadDelayVal && diskReadDelayVal !== '0' ? `, disk-read-delay: \`${diskReadDelayVal}ms\`` : '';
|
||||
const diskWriteDelayVal = '${{ steps.args.outputs.disk-write-delay-ms }}';
|
||||
const diskWriteDelayNote = diskWriteDelayVal && diskWriteDelayVal !== '0' ? `, disk-write-delay: \`${diskWriteDelayVal}ms\`` : '';
|
||||
const baselineArgsVal = '${{ steps.args.outputs.baseline-args }}';
|
||||
const baselineArgsNote = baselineArgsVal ? `, baseline-args: \`${baselineArgsVal}\`` : '';
|
||||
const featureArgsVal = '${{ steps.args.outputs.feature-args }}';
|
||||
const featureArgsNote = featureArgsVal ? `, feature-args: \`${featureArgsVal}\`` : '';
|
||||
const blocksDesc = bigBlocks ? 'blocks: `big`' : `${blocks} blocks, ${warmup} warmup blocks`;
|
||||
const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${slackNote}${balNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`;
|
||||
const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${slackNote}${balNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${diskReadDelayNote}${diskWriteDelayNote}${baselineArgsNote}${featureArgsNote}`;
|
||||
const runUrl = `${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId}`;
|
||||
|
||||
const numRunners = parseInt(process.env.BENCH_RUNNERS) || 1;
|
||||
@@ -544,6 +577,8 @@ jobs:
|
||||
BENCH_BIG_BLOCKS: ${{ needs.reth-bench-ack.outputs.big-blocks }}
|
||||
BENCH_BAL: ${{ needs.reth-bench-ack.outputs.bal }}
|
||||
BENCH_WAIT_TIME: ${{ needs.reth-bench-ack.outputs.wait-time }}
|
||||
BENCH_DISK_READ_DELAY_MS: ${{ needs.reth-bench-ack.outputs.disk-read-delay-ms }}
|
||||
BENCH_DISK_WRITE_DELAY_MS: ${{ needs.reth-bench-ack.outputs.disk-write-delay-ms }}
|
||||
BENCH_BASELINE_ARGS: ${{ needs.reth-bench-ack.outputs.baseline-args }}
|
||||
BENCH_FEATURE_ARGS: ${{ needs.reth-bench-ack.outputs.feature-args }}
|
||||
BENCH_ABBA: ${{ needs.reth-bench-ack.outputs.abba }}
|
||||
@@ -618,12 +653,16 @@ jobs:
|
||||
const otlpNote = !otlpEnabled ? ', otlp: `disabled`' : '';
|
||||
const waitTimeVal = process.env.BENCH_WAIT_TIME || '';
|
||||
const waitTimeNote = waitTimeVal ? `, wait-time: \`${waitTimeVal}\`` : '';
|
||||
const diskReadDelayVal = process.env.BENCH_DISK_READ_DELAY_MS || '';
|
||||
const diskReadDelayNote = diskReadDelayVal && diskReadDelayVal !== '0' ? `, disk-read-delay: \`${diskReadDelayVal}ms\`` : '';
|
||||
const diskWriteDelayVal = process.env.BENCH_DISK_WRITE_DELAY_MS || '';
|
||||
const diskWriteDelayNote = diskWriteDelayVal && diskWriteDelayVal !== '0' ? `, disk-write-delay: \`${diskWriteDelayVal}ms\`` : '';
|
||||
const baselineArgsVal = process.env.BENCH_BASELINE_ARGS || '';
|
||||
const baselineArgsNote = baselineArgsVal ? `, baseline-args: \`${baselineArgsVal}\`` : '';
|
||||
const featureArgsVal = process.env.BENCH_FEATURE_ARGS || '';
|
||||
const featureArgsNote = featureArgsVal ? `, feature-args: \`${featureArgsVal}\`` : '';
|
||||
const blocksDesc = bigBlocks ? 'blocks: `big`' : `${blocks} blocks, ${warmup} warmup blocks`;
|
||||
core.exportVariable('BENCH_CONFIG', `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${slackNote}${balNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`);
|
||||
core.exportVariable('BENCH_CONFIG', `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${slackNote}${balNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${diskReadDelayNote}${diskWriteDelayNote}${baselineArgsNote}${featureArgsNote}`);
|
||||
|
||||
const { buildBody } = require('./.github/scripts/bench-update-status.js');
|
||||
await github.rest.issues.updateComment({
|
||||
@@ -684,7 +723,7 @@ jobs:
|
||||
echo "$HOME/.local/bin" >> "$GITHUB_PATH"
|
||||
echo "$HOME/.cargo/bin" >> "$GITHUB_PATH"
|
||||
missing=()
|
||||
for cmd in schelk cpupower taskset stdbuf python3 curl make uv jq; do
|
||||
for cmd in schelk cpupower taskset stdbuf python3 curl make uv jq dmsetup; do
|
||||
command -v "$cmd" &>/dev/null || missing+=("$cmd")
|
||||
done
|
||||
if [ ${#missing[@]} -gt 0 ]; then
|
||||
|
||||
@@ -60,6 +60,12 @@ pub struct DatabaseArgs {
|
||||
value_parser = value_parser!(SyncMode),
|
||||
)]
|
||||
pub sync_mode: Option<SyncMode>,
|
||||
/// Hidden benchmark/debug knob that adds an artificial delay to each MDBX read operation.
|
||||
///
|
||||
/// This is primarily intended for storage-latency simulation when kernel/device-level
|
||||
/// shaping cannot represent sub-millisecond delays.
|
||||
#[arg(long = "db.read-delay", value_parser = humantime::parse_duration, hide = true)]
|
||||
pub read_delay: Option<Duration>,
|
||||
}
|
||||
|
||||
impl DatabaseArgs {
|
||||
@@ -89,6 +95,7 @@ impl DatabaseArgs {
|
||||
.with_growth_step(self.growth_step)
|
||||
.with_max_readers(self.max_readers)
|
||||
.with_sync_mode(self.sync_mode)
|
||||
.with_read_delay(self.read_delay)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -435,4 +442,12 @@ mod tests {
|
||||
CommandParser::<DatabaseArgs>::try_parse_from(["reth", "--db.sync-mode", "ultra-fast"]);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_command_parser_with_read_delay() {
|
||||
let cmd =
|
||||
CommandParser::<DatabaseArgs>::try_parse_from(["reth", "--db.read-delay", "500us"])
|
||||
.unwrap();
|
||||
assert_eq!(cmd.args.read_delay, Some(Duration::from_micros(500)));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,6 +31,8 @@ pub struct Cursor<K: TransactionKind, T: Table> {
|
||||
buf: Vec<u8>,
|
||||
/// Reference to metric handles in the DB environment. If `None`, metrics are not recorded.
|
||||
metrics: Option<Arc<DatabaseEnvMetrics>>,
|
||||
/// Optional artificial delay applied to MDBX read operations.
|
||||
read_delay: Option<std::time::Duration>,
|
||||
/// Phantom data to enforce encoding/decoding.
|
||||
_dbi: PhantomData<T>,
|
||||
}
|
||||
@@ -39,8 +41,9 @@ impl<K: TransactionKind, T: Table> Cursor<K, T> {
|
||||
pub(crate) const fn new_with_metrics(
|
||||
inner: reth_libmdbx::Cursor<K>,
|
||||
metrics: Option<Arc<DatabaseEnvMetrics>>,
|
||||
read_delay: Option<std::time::Duration>,
|
||||
) -> Self {
|
||||
Self { inner, buf: Vec::new(), metrics, _dbi: PhantomData }
|
||||
Self { inner, buf: Vec::new(), metrics, read_delay, _dbi: PhantomData }
|
||||
}
|
||||
|
||||
/// If `self.metrics` is `Some(...)`, record a metric with the provided operation and value
|
||||
@@ -59,6 +62,15 @@ impl<K: TransactionKind, T: Table> Cursor<K, T> {
|
||||
f(self)
|
||||
}
|
||||
}
|
||||
|
||||
fn execute_with_read_delay<R>(
|
||||
&mut self,
|
||||
f: impl FnOnce(&mut reth_libmdbx::Cursor<K>) -> R,
|
||||
) -> R {
|
||||
let result = f(&mut self.inner);
|
||||
apply_read_delay(self.read_delay);
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
/// Decodes a `(key, value)` pair from the database.
|
||||
@@ -90,39 +102,43 @@ macro_rules! compress_to_buf_or_ref {
|
||||
|
||||
impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
|
||||
fn first(&mut self) -> PairResult<T> {
|
||||
decode::<T>(self.inner.first())
|
||||
decode::<T>(self.execute_with_read_delay(|cursor| cursor.first()))
|
||||
}
|
||||
|
||||
fn seek_exact(&mut self, key: <T as Table>::Key) -> PairResult<T> {
|
||||
decode::<T>(self.inner.set_key(key.encode().as_ref()))
|
||||
decode::<T>(self.execute_with_read_delay(|cursor| cursor.set_key(key.encode().as_ref())))
|
||||
}
|
||||
|
||||
fn seek(&mut self, key: <T as Table>::Key) -> PairResult<T> {
|
||||
decode::<T>(self.inner.set_range(key.encode().as_ref()))
|
||||
decode::<T>(self.execute_with_read_delay(|cursor| cursor.set_range(key.encode().as_ref())))
|
||||
}
|
||||
|
||||
fn next(&mut self) -> PairResult<T> {
|
||||
decode::<T>(self.inner.next())
|
||||
decode::<T>(self.execute_with_read_delay(|cursor| cursor.next()))
|
||||
}
|
||||
|
||||
fn prev(&mut self) -> PairResult<T> {
|
||||
decode::<T>(self.inner.prev())
|
||||
decode::<T>(self.execute_with_read_delay(|cursor| cursor.prev()))
|
||||
}
|
||||
|
||||
fn last(&mut self) -> PairResult<T> {
|
||||
decode::<T>(self.inner.last())
|
||||
decode::<T>(self.execute_with_read_delay(|cursor| cursor.last()))
|
||||
}
|
||||
|
||||
fn current(&mut self) -> PairResult<T> {
|
||||
decode::<T>(self.inner.get_current())
|
||||
decode::<T>(self.execute_with_read_delay(|cursor| cursor.get_current()))
|
||||
}
|
||||
|
||||
fn walk(&mut self, start_key: Option<T::Key>) -> Result<Walker<'_, T, Self>, DatabaseError> {
|
||||
let start = if let Some(start_key) = start_key {
|
||||
decode::<T>(self.inner.set_range(start_key.encode().as_ref())).transpose()
|
||||
} else {
|
||||
self.first().transpose()
|
||||
};
|
||||
let start =
|
||||
if let Some(start_key) = start_key {
|
||||
decode::<T>(self.execute_with_read_delay(|cursor| {
|
||||
cursor.set_range(start_key.encode().as_ref())
|
||||
}))
|
||||
.transpose()
|
||||
} else {
|
||||
self.first().transpose()
|
||||
};
|
||||
|
||||
Ok(Walker::new(self, start))
|
||||
}
|
||||
@@ -132,11 +148,13 @@ impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
|
||||
range: impl RangeBounds<T::Key>,
|
||||
) -> Result<RangeWalker<'_, T, Self>, DatabaseError> {
|
||||
let start = match range.start_bound().cloned() {
|
||||
Bound::Included(key) => self.inner.set_range(key.encode().as_ref()),
|
||||
Bound::Included(key) => {
|
||||
self.execute_with_read_delay(|cursor| cursor.set_range(key.encode().as_ref()))
|
||||
}
|
||||
Bound::Excluded(_key) => {
|
||||
unreachable!("Rust doesn't allow for Bound::Excluded in starting bounds");
|
||||
}
|
||||
Bound::Unbounded => self.inner.first(),
|
||||
Bound::Unbounded => self.execute_with_read_delay(|cursor| cursor.first()),
|
||||
};
|
||||
let start = decode::<T>(start).transpose();
|
||||
Ok(RangeWalker::new(self, start, range.end_bound().cloned()))
|
||||
@@ -146,12 +164,15 @@ impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
|
||||
&mut self,
|
||||
start_key: Option<T::Key>,
|
||||
) -> Result<ReverseWalker<'_, T, Self>, DatabaseError> {
|
||||
let start = if let Some(start_key) = start_key {
|
||||
decode::<T>(self.inner.set_range(start_key.encode().as_ref()))
|
||||
} else {
|
||||
self.last()
|
||||
}
|
||||
.transpose();
|
||||
let start =
|
||||
if let Some(start_key) = start_key {
|
||||
decode::<T>(self.execute_with_read_delay(|cursor| {
|
||||
cursor.set_range(start_key.encode().as_ref())
|
||||
}))
|
||||
} else {
|
||||
self.last()
|
||||
}
|
||||
.transpose();
|
||||
|
||||
Ok(ReverseWalker::new(self, start))
|
||||
}
|
||||
@@ -160,18 +181,17 @@ impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
|
||||
impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
|
||||
/// Returns the previous `(key, value)` pair of a DUPSORT table.
|
||||
fn prev_dup(&mut self) -> PairResult<T> {
|
||||
decode::<T>(self.inner.prev_dup())
|
||||
decode::<T>(self.execute_with_read_delay(|cursor| cursor.prev_dup()))
|
||||
}
|
||||
|
||||
/// Returns the next `(key, value)` pair of a DUPSORT table.
|
||||
fn next_dup(&mut self) -> PairResult<T> {
|
||||
decode::<T>(self.inner.next_dup())
|
||||
decode::<T>(self.execute_with_read_delay(|cursor| cursor.next_dup()))
|
||||
}
|
||||
|
||||
/// Returns the last `value` of the current duplicate `key`.
|
||||
fn last_dup(&mut self) -> ValueOnlyResult<T> {
|
||||
self.inner
|
||||
.last_dup()
|
||||
self.execute_with_read_delay(|cursor| cursor.last_dup())
|
||||
.map_err(|e| DatabaseError::Read(e.into()))?
|
||||
.map(decode_one::<T>)
|
||||
.transpose()
|
||||
@@ -179,13 +199,12 @@ impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
|
||||
|
||||
/// Returns the next `(key, value)` pair skipping the duplicates.
|
||||
fn next_no_dup(&mut self) -> PairResult<T> {
|
||||
decode::<T>(self.inner.next_nodup())
|
||||
decode::<T>(self.execute_with_read_delay(|cursor| cursor.next_nodup()))
|
||||
}
|
||||
|
||||
/// Returns the next `value` of a duplicate `key`.
|
||||
fn next_dup_val(&mut self) -> ValueOnlyResult<T> {
|
||||
self.inner
|
||||
.next_dup()
|
||||
self.execute_with_read_delay(|cursor| cursor.next_dup())
|
||||
.map_err(|e| DatabaseError::Read(e.into()))?
|
||||
.map(decode_value::<T>)
|
||||
.transpose()
|
||||
@@ -196,11 +215,12 @@ impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
|
||||
key: <T as Table>::Key,
|
||||
subkey: <T as DupSort>::SubKey,
|
||||
) -> ValueOnlyResult<T> {
|
||||
self.inner
|
||||
.get_both_range(key.encode().as_ref(), subkey.encode().as_ref())
|
||||
.map_err(|e| DatabaseError::Read(e.into()))?
|
||||
.map(decode_one::<T>)
|
||||
.transpose()
|
||||
self.execute_with_read_delay(|cursor| {
|
||||
cursor.get_both_range(key.encode().as_ref(), subkey.encode().as_ref())
|
||||
})
|
||||
.map_err(|e| DatabaseError::Read(e.into()))?
|
||||
.map(decode_one::<T>)
|
||||
.transpose()
|
||||
}
|
||||
|
||||
/// Depending on its arguments, returns an iterator starting at:
|
||||
@@ -216,25 +236,26 @@ impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
|
||||
let start = match (key, subkey) {
|
||||
(Some(key), Some(subkey)) => {
|
||||
let encoded_key = key.encode();
|
||||
self.inner
|
||||
.get_both_range(encoded_key.as_ref(), subkey.encode().as_ref())
|
||||
.map_err(|e| DatabaseError::Read(e.into()))?
|
||||
.map(|val| decoder::<T>((Cow::Borrowed(encoded_key.as_ref()), val)))
|
||||
self.execute_with_read_delay(|cursor| {
|
||||
cursor.get_both_range(encoded_key.as_ref(), subkey.encode().as_ref())
|
||||
})
|
||||
.map_err(|e| DatabaseError::Read(e.into()))?
|
||||
.map(|val| decoder::<T>((Cow::Borrowed(encoded_key.as_ref()), val)))
|
||||
}
|
||||
(Some(key), None) => {
|
||||
let encoded_key = key.encode();
|
||||
self.inner
|
||||
.set(encoded_key.as_ref())
|
||||
self.execute_with_read_delay(|cursor| cursor.set(encoded_key.as_ref()))
|
||||
.map_err(|e| DatabaseError::Read(e.into()))?
|
||||
.map(|val| decoder::<T>((Cow::Borrowed(encoded_key.as_ref()), val)))
|
||||
}
|
||||
(None, Some(subkey)) => {
|
||||
if let Some((key, _)) = self.first()? {
|
||||
let encoded_key = key.encode();
|
||||
self.inner
|
||||
.get_both_range(encoded_key.as_ref(), subkey.encode().as_ref())
|
||||
.map_err(|e| DatabaseError::Read(e.into()))?
|
||||
.map(|val| decoder::<T>((Cow::Borrowed(encoded_key.as_ref()), val)))
|
||||
self.execute_with_read_delay(|cursor| {
|
||||
cursor.get_both_range(encoded_key.as_ref(), subkey.encode().as_ref())
|
||||
})
|
||||
.map_err(|e| DatabaseError::Read(e.into()))?
|
||||
.map(|val| decoder::<T>((Cow::Borrowed(encoded_key.as_ref()), val)))
|
||||
} else {
|
||||
Some(Err(DatabaseError::Read(MDBXError::NotFound.into())))
|
||||
}
|
||||
@@ -363,7 +384,7 @@ impl<T: DupSort> DbDupCursorRW<T> for Cursor<RW, T> {
|
||||
mod tests {
|
||||
use crate::{
|
||||
mdbx::{DatabaseArguments, DatabaseEnv, DatabaseEnvKind},
|
||||
tables::StorageChangeSets,
|
||||
tables::{self, StorageChangeSets},
|
||||
Database,
|
||||
};
|
||||
use alloy_primitives::{address, Address, B256, U256};
|
||||
@@ -374,6 +395,7 @@ mod tests {
|
||||
transaction::{DbTx, DbTxMut},
|
||||
};
|
||||
use reth_primitives_traits::StorageEntry;
|
||||
use std::time::{Duration, Instant};
|
||||
use tempfile::TempDir;
|
||||
|
||||
fn create_test_db() -> DatabaseEnv {
|
||||
@@ -463,4 +485,27 @@ mod tests {
|
||||
assert_eq!(copied_value, expected_value);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn read_delay_applies_to_cursor_reads() {
|
||||
let path = TempDir::new().unwrap();
|
||||
let mut db = DatabaseEnv::open(
|
||||
path.path(),
|
||||
DatabaseEnvKind::RW,
|
||||
DatabaseArguments::new(ClientVersion::default())
|
||||
.with_read_delay(Some(Duration::from_millis(2))),
|
||||
)
|
||||
.unwrap();
|
||||
db.create_tables().unwrap();
|
||||
|
||||
let tx = db.tx_mut().unwrap();
|
||||
tx.put::<tables::HeaderNumbers>(B256::ZERO, 1).unwrap();
|
||||
tx.commit().unwrap();
|
||||
|
||||
let tx = db.tx().unwrap();
|
||||
let mut cursor = tx.cursor_read::<tables::HeaderNumbers>().unwrap();
|
||||
let start = Instant::now();
|
||||
let _ = cursor.first().unwrap();
|
||||
assert!(start.elapsed() >= Duration::from_millis(2));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -119,6 +119,8 @@ pub struct DatabaseArguments {
|
||||
/// environments). Choose `SafeNoSync` if performance is more important and occasional data
|
||||
/// loss is acceptable (e.g., testing or ephemeral data).
|
||||
sync_mode: SyncMode,
|
||||
/// Optional artificial delay applied to every MDBX read operation.
|
||||
read_delay: Option<std::time::Duration>,
|
||||
}
|
||||
|
||||
impl Default for DatabaseArguments {
|
||||
@@ -143,6 +145,7 @@ impl DatabaseArguments {
|
||||
exclusive: None,
|
||||
max_readers: None,
|
||||
sync_mode: SyncMode::Durable,
|
||||
read_delay: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -190,6 +193,12 @@ impl DatabaseArguments {
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets an artificial delay applied to each MDBX read operation.
|
||||
pub const fn with_read_delay(mut self, read_delay: Option<std::time::Duration>) -> Self {
|
||||
self.read_delay = read_delay;
|
||||
self
|
||||
}
|
||||
|
||||
/// Configures the database growth step in bytes.
|
||||
pub const fn with_growth_step(mut self, growth_step: Option<usize>) -> Self {
|
||||
if let Some(growth_step) = growth_step {
|
||||
@@ -254,6 +263,8 @@ pub struct DatabaseEnv {
|
||||
dbis: Arc<HashMap<&'static str, ffi::MDBX_dbi>>,
|
||||
/// Cache for metric handles. If `None`, metrics are not recorded.
|
||||
metrics: Option<Arc<DatabaseEnvMetrics>>,
|
||||
/// Optional artificial delay applied to every MDBX read operation.
|
||||
read_delay: Option<std::time::Duration>,
|
||||
/// Write lock for when dealing with a read-write environment.
|
||||
_lock_file: Option<StorageLock>,
|
||||
}
|
||||
@@ -267,6 +278,7 @@ impl Database for DatabaseEnv {
|
||||
self.inner.begin_ro_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
|
||||
self.dbis.clone(),
|
||||
self.metrics.clone(),
|
||||
self.read_delay,
|
||||
)
|
||||
.map_err(|e| DatabaseError::InitTx(e.into()))
|
||||
}
|
||||
@@ -276,6 +288,7 @@ impl Database for DatabaseEnv {
|
||||
self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
|
||||
self.dbis.clone(),
|
||||
self.metrics.clone(),
|
||||
self.read_delay,
|
||||
)
|
||||
.map_err(|e| DatabaseError::InitTx(e.into()))
|
||||
}
|
||||
@@ -537,6 +550,7 @@ impl DatabaseEnv {
|
||||
path: path.to_path_buf(),
|
||||
dbis: Arc::default(),
|
||||
metrics: None,
|
||||
read_delay: args.read_delay,
|
||||
_lock_file,
|
||||
};
|
||||
|
||||
|
||||
@@ -40,6 +40,8 @@ pub struct Tx<K: TransactionKind> {
|
||||
///
|
||||
/// If [Some], then metrics are reported.
|
||||
metrics_handler: Option<MetricsHandler<K>>,
|
||||
/// Optional artificial delay applied to MDBX read operations.
|
||||
read_delay: Option<Duration>,
|
||||
}
|
||||
|
||||
impl<K: TransactionKind> Tx<K> {
|
||||
@@ -50,6 +52,7 @@ impl<K: TransactionKind> Tx<K> {
|
||||
inner: Transaction<K>,
|
||||
dbis: Arc<HashMap<&'static str, MDBX_dbi>>,
|
||||
env_metrics: Option<Arc<DatabaseEnvMetrics>>,
|
||||
read_delay: Option<Duration>,
|
||||
) -> reth_libmdbx::Result<Self> {
|
||||
let metrics_handler = env_metrics
|
||||
.map(|env_metrics| {
|
||||
@@ -59,7 +62,7 @@ impl<K: TransactionKind> Tx<K> {
|
||||
Ok(handler)
|
||||
})
|
||||
.transpose()?;
|
||||
Ok(Self { inner, dbis, metrics_handler })
|
||||
Ok(Self { inner, dbis, metrics_handler, read_delay })
|
||||
}
|
||||
|
||||
/// Returns a reference to the inner libmdbx transaction.
|
||||
@@ -101,6 +104,7 @@ impl<K: TransactionKind> Tx<K> {
|
||||
Ok(Cursor::new_with_metrics(
|
||||
inner,
|
||||
self.metrics_handler.as_ref().map(|h| h.env_metrics.clone()),
|
||||
self.read_delay,
|
||||
))
|
||||
}
|
||||
|
||||
@@ -170,6 +174,19 @@ impl<K: TransactionKind> Tx<K> {
|
||||
f(&self.inner)
|
||||
}
|
||||
}
|
||||
|
||||
fn execute_with_read_delay<T: Table, R>(
|
||||
&self,
|
||||
operation: Operation,
|
||||
value_size: Option<usize>,
|
||||
f: impl FnOnce(&Transaction<K>) -> R,
|
||||
) -> R {
|
||||
self.execute_with_operation_metric::<T, _>(operation, value_size, |tx| {
|
||||
let result = f(tx);
|
||||
apply_read_delay(self.read_delay);
|
||||
result
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -294,7 +311,7 @@ impl<K: TransactionKind> DbTx for Tx<K> {
|
||||
&self,
|
||||
key: &<T::Key as Encode>::Encoded,
|
||||
) -> Result<Option<T::Value>, DatabaseError> {
|
||||
self.execute_with_operation_metric::<T, _>(Operation::Get, None, |tx| {
|
||||
self.execute_with_read_delay::<T, _>(Operation::Get, None, |tx| {
|
||||
tx.get(self.get_dbi::<T>()?, key.as_ref())
|
||||
.map_err(|e| DatabaseError::Read(e.into()))?
|
||||
.map(decode_one::<T>)
|
||||
@@ -447,7 +464,11 @@ mod tests {
|
||||
use reth_db_api::{database::Database, models::ClientVersion, transaction::DbTx};
|
||||
use reth_libmdbx::MaxReadTransactionDuration;
|
||||
use reth_storage_errors::db::DatabaseError;
|
||||
use std::{sync::atomic::Ordering, thread::sleep, time::Duration};
|
||||
use std::{
|
||||
sync::atomic::Ordering,
|
||||
thread::sleep,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tempfile::tempdir;
|
||||
|
||||
#[test]
|
||||
@@ -498,4 +519,18 @@ mod tests {
|
||||
// Backtrace is recorded.
|
||||
assert!(tx.metrics_handler.unwrap().backtrace_recorded.load(Ordering::Relaxed));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn read_delay_applies_to_get() {
|
||||
let dir = tempdir().unwrap();
|
||||
let args = DatabaseArguments::new(ClientVersion::default())
|
||||
.with_read_delay(Some(Duration::from_millis(2)));
|
||||
let mut db = DatabaseEnv::open(dir.path(), DatabaseEnvKind::RW, args).unwrap();
|
||||
db.create_tables().unwrap();
|
||||
|
||||
let tx = db.tx().unwrap();
|
||||
let start = Instant::now();
|
||||
let _ = tx.get::<tables::Transactions>(0).unwrap();
|
||||
assert!(start.elapsed() >= Duration::from_millis(2));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,10 @@ use crate::{
|
||||
table::{Decode, Decompress, Table, TableRow},
|
||||
DatabaseError,
|
||||
};
|
||||
use std::borrow::Cow;
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
/// Helper function to decode a `(key, value)` pair.
|
||||
pub(crate) fn decoder<'a, T>(
|
||||
@@ -50,3 +53,17 @@ where
|
||||
Cow::Owned(v) => Decompress::decompress_owned(v)?,
|
||||
})
|
||||
}
|
||||
|
||||
/// Applies an artificial read delay.
|
||||
///
|
||||
/// This uses a short busy-wait instead of `thread::sleep` so hidden benchmark knobs like
|
||||
/// `--db.read-delay=500us` can approximate sub-millisecond latency without depending on OS sleep
|
||||
/// granularity.
|
||||
pub(crate) fn apply_read_delay(read_delay: Option<Duration>) {
|
||||
let Some(read_delay) = read_delay.filter(|delay| !delay.is_zero()) else { return };
|
||||
|
||||
let start = Instant::now();
|
||||
while start.elapsed() < read_delay {
|
||||
std::hint::spin_loop();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user