Compare commits

...

2 Commits

Author SHA1 Message Date
Dan Cline
f57b37442f wip: add dm-delay simulation 2026-04-22 15:03:17 -04:00
Dan Cline
a7a8ed0ae1 wip: add latency simulation 2026-04-22 13:40:53 -04:00
7 changed files with 342 additions and 54 deletions

View File

@@ -28,6 +28,127 @@ mkdir -p "$OUTPUT_DIR"
LOG="${OUTPUT_DIR}/node.log"
RETH_SCOPE="${RETH_SCOPE:-reth-bench.scope}"
DELAY_DM_NAME=""
# Mount a filesystem via `sudo mount`.
#
# Arguments:
#   $1 - source device (or other mount source)
#   $2 - target mount point
#   $3 - filesystem type, passed to `mount -t`
#   $4 - comma-separated mount options; when empty no `-o` flag is passed
#
# Returns the exit status of `mount`.
mount_fs() {
    local src="$1"
    local dest="$2"
    local fs_kind="$3"
    local opts="$4"

    if [ -z "$opts" ]; then
        sudo mount -t "$fs_kind" "$src" "$dest"
    else
        sudo mount -t "$fs_kind" -o "$opts" "$src" "$dest"
    fi
}
# Poll until a block device node exists.
#
# Arguments:
#   $1 - path of the device node to wait for
#   $2 - maximum time to wait in whole seconds (default: 5)
#
# Returns 0 as soon as the path is a block device, 1 if it still is not one
# once the timeout has elapsed.
wait_for_block_device() {
    local device="$1"
    local timeout_s="${2:-5}"

    for _ in $(seq 1 "$timeout_s"); do
        if [ -b "$device" ]; then
            return 0
        fi
        sleep 1
    done

    # Check once more after the final sleep. Without this the last second of
    # waiting is wasted, and a timeout of 0 would never look at the device.
    if [ -b "$device" ]; then
        return 0
    fi
    return 1
}
# Print the /dev/block/<major>:<minor> path of a device-mapper device.
#
# Arguments:
#   $1 - device-mapper device name as known to `dmsetup`
#
# Prints the path on stdout and returns 0. Returns 1 without output when
# `dmsetup info` produced no major:minor pair (its stderr is discarded).
resolve_dm_block_device() {
    local dm_name="$1"
    local major_minor

    # `tr` strips the padding/newline dmsetup emits around the column values.
    major_minor="$(sudo dmsetup info -C --noheadings --separator : -o major,minor "$dm_name" 2>/dev/null | tr -d '[:space:]')"

    if [ -n "$major_minor" ]; then
        printf '/dev/block/%s\n' "$major_minor"
    else
        return 1
    fi
}
# Roll back a partially applied dm-delay setup, report the failure and exit.
#
# Arguments:
#   $1 - error message (emitted as a GitHub Actions `::error::` annotation)
#   $2 - original mount source
#   $3 - mount point
#   $4 - filesystem type
#   $5 - mount options
#
# Removes the dm target named by DELAY_DM_NAME (if any), clears that global so
# later cleanup does not try to remove it again, and remounts the original
# source. The rollback is best-effort so the error message is always printed.
_abort_delay_setup() {
    local message="$1"
    local source="$2"
    local target="$3"
    local fstype="$4"
    local options="$5"

    if [ -n "$DELAY_DM_NAME" ]; then
        sudo dmsetup remove "$DELAY_DM_NAME" || true
        DELAY_DM_NAME=""
    fi
    mount_fs "$source" "$target" "$fstype" "$options" ||
        echo "::warning::Failed to restore original mount of ${source} on ${target}"
    echo "::error::${message}"
    exit 1
}

# Interpose a dm-delay target between the device backing $SCHELK_MOUNT and the
# filesystem mounted there.
#
# Reads:
#   BENCH_DISK_READ_DELAY_MS  - read delay in milliseconds (default 0)
#   BENCH_DISK_WRITE_DELAY_MS - write delay in milliseconds (default 0)
#   SCHELK_MOUNT, LABEL
# Writes:
#   DELAY_DM_NAME - set to the created target's name on success, left empty
#                   otherwise
#
# Does nothing when both delays are zero. On any failure the original mount is
# restored where possible and the script exits with status 1.
setup_delay_target() {
    local read_delay_ms="${BENCH_DISK_READ_DELAY_MS:-0}"
    local write_delay_ms="${BENCH_DISK_WRITE_DELAY_MS:-0}"

    if ! [[ "$read_delay_ms" =~ ^[0-9]+$ && "$write_delay_ms" =~ ^[0-9]+$ ]]; then
        echo "::error::BENCH_DISK_READ_DELAY_MS and BENCH_DISK_WRITE_DELAY_MS must be non-negative integers"
        exit 1
    fi

    # Treat any all-zero spelling ("0", "00", ...) as "no delay"; a plain
    # string comparison against "0" would set up a pointless dm target.
    if [[ "$read_delay_ms" =~ ^0+$ && "$write_delay_ms" =~ ^0+$ ]]; then
        return 0
    fi

    # `findmnt --target` falls back to the filesystem *containing* the path,
    # so make sure the volume is really mounted before inspecting it.
    if ! mountpoint -q "$SCHELK_MOUNT"; then
        echo "::error::${SCHELK_MOUNT} is not a mount point; cannot apply dm-delay"
        exit 1
    fi

    local mount_source mount_fstype mount_options sectors
    mount_source="$(findmnt -no SOURCE --target "$SCHELK_MOUNT")" || mount_source=""
    mount_fstype="$(findmnt -no FSTYPE --target "$SCHELK_MOUNT")" || mount_fstype=""
    mount_options="$(findmnt -no OPTIONS --target "$SCHELK_MOUNT")" || mount_options=""
    if [ -z "$mount_source" ] || [ -z "$mount_fstype" ]; then
        echo "::error::Failed to determine the mount source of ${SCHELK_MOUNT}"
        exit 1
    fi

    # Size of the backing device in 512-byte sectors, as the dm table expects.
    sectors="$(sudo blockdev --getsz "$mount_source")" || sectors=""
    if ! [[ "$sectors" =~ ^[1-9][0-9]*$ ]]; then
        echo "::error::Failed to determine the size of ${mount_source}"
        exit 1
    fi

    local dm_name="reth-bench-delay-${LABEL}-$$"
    # dm-delay table: <start> <length> delay <rdev> <roffset> <rdelay> <wdev> <woffset> <wdelay>
    local table="0 ${sectors} delay ${mount_source} 0 ${read_delay_ms} ${mount_source} 0 ${write_delay_ms}"

    echo "Applying dm-delay to ${mount_source}: read=${read_delay_ms}ms write=${write_delay_ms}ms"

    if ! sudo umount "$SCHELK_MOUNT"; then
        echo "::error::Failed to unmount ${SCHELK_MOUNT} before applying dm-delay"
        exit 1
    fi

    if ! sudo dmsetup create "$dm_name" --addnodeoncreate --table "$table"; then
        _abort_delay_setup "Failed to create dm-delay target" \
            "$mount_source" "$SCHELK_MOUNT" "$mount_fstype" "$mount_options"
    fi
    # Only publish the name once the target exists, so cleanup never tries to
    # remove a target that was not created.
    DELAY_DM_NAME="$dm_name"

    local delayed_device
    delayed_device="$(resolve_dm_block_device "$DELAY_DM_NAME" || true)"
    if [ -z "$delayed_device" ]; then
        _abort_delay_setup "Failed to resolve delayed device node for ${dm_name}" \
            "$mount_source" "$SCHELK_MOUNT" "$mount_fstype" "$mount_options"
    fi

    if ! wait_for_block_device "$delayed_device"; then
        # Resolve again and give the node a second chance to appear.
        local expected_device="$delayed_device"
        delayed_device="$(resolve_dm_block_device "$DELAY_DM_NAME" || true)"
        if [ -z "$delayed_device" ] || ! wait_for_block_device "$delayed_device"; then
            _abort_delay_setup "Delayed device node did not appear: ${delayed_device:-$expected_device}" \
                "$mount_source" "$SCHELK_MOUNT" "$mount_fstype" "$mount_options"
        fi
    fi

    if ! mount_fs "$delayed_device" "$SCHELK_MOUNT" "$mount_fstype" "$mount_options"; then
        _abort_delay_setup "Failed to remount delayed device" \
            "$mount_source" "$SCHELK_MOUNT" "$mount_fstype" "$mount_options"
    fi
}
# Remove the dm-delay target recorded in DELAY_DM_NAME, if there is one.
#
# Unmounts $SCHELK_MOUNT when it is currently a mount point, removes the
# device-mapper target (retrying with `-f` if the plain removal fails) and
# clears DELAY_DM_NAME. Every step is best-effort; failures are ignored.
teardown_delay_target() {
    if [ -n "$DELAY_DM_NAME" ]; then
        echo "Removing dm-delay target ${DELAY_DM_NAME}"

        if mountpoint -q "$SCHELK_MOUNT"; then
            sudo umount "$SCHELK_MOUNT" || true
        fi

        if ! sudo dmsetup remove "$DELAY_DM_NAME"; then
            sudo dmsetup remove -f "$DELAY_DM_NAME" || true
        fi

        DELAY_DM_NAME=""
    fi
}
cleanup() {
kill "$TAIL_PID" 2>/dev/null || true
@@ -76,6 +197,7 @@ cleanup() {
sudo systemctl reset-failed "$RETH_SCOPE" 2>/dev/null || true
# Fix ownership of reth-created files (reth runs as root)
sudo chown -R "$(id -un):$(id -gn)" "$OUTPUT_DIR" 2>/dev/null || true
teardown_delay_target
# Let schelk recover the mounted volume in place so dm-era can restore only
# the changed blocks and clean up its own state.
sudo schelk recover -y --kill || true
@@ -92,6 +214,7 @@ sudo schelk recover -y --kill || sudo schelk full-recover -y || true
# Mount
sudo schelk mount -y || true
setup_delay_target
if [ ! -d "$DATADIR/db" ] || [ ! -d "$DATADIR/static_files" ]; then
echo "::error::Failed to mount benchmark datadir at ${DATADIR}"
ls -la "$SCHELK_MOUNT" || true

View File

@@ -51,6 +51,16 @@ on:
required: false
default: ""
type: string
disk_read_delay_ms:
description: "Apply dm-delay read latency to the schelk volume (milliseconds)"
required: false
default: ""
type: string
disk_write_delay_ms:
description: "Apply dm-delay write latency to the schelk volume (milliseconds)"
required: false
default: ""
type: string
baseline_args:
description: "Extra CLI args for the baseline reth node"
required: false
@@ -125,6 +135,8 @@ jobs:
big-blocks: ${{ steps.args.outputs.big-blocks }}
bal: ${{ steps.args.outputs.bal }}
wait-time: ${{ steps.args.outputs.wait-time }}
disk-read-delay-ms: ${{ steps.args.outputs.disk-read-delay-ms }}
disk-write-delay-ms: ${{ steps.args.outputs.disk-write-delay-ms }}
baseline-args: ${{ steps.args.outputs.baseline-args }}
feature-args: ${{ steps.args.outputs.feature-args }}
abba: ${{ steps.args.outputs.abba }}
@@ -158,7 +170,7 @@ jobs:
script: |
const validBalModes = new Set(['false', 'true', 'feature', 'baseline']);
const validSlackModes = new Set(['always', 'on-win', 'on-error', 'never']);
const usage = '`@decofe bench [blocks=N] [big-blocks[=true|false]] [bal=true|false|feature|baseline] [warmup=N] [baseline=REF] [feature=REF] [samply] [slack=always|on-win|on-error|never] [cores=N] [abba=true|false] [otlp=true|false] [wait-time=DURATION] [baseline-args="..."] [feature-args="..."]`';
const usage = '`@decofe bench [blocks=N] [big-blocks[=true|false]] [bal=true|false|feature|baseline] [warmup=N] [baseline=REF] [feature=REF] [samply] [slack=always|on-win|on-error|never] [cores=N] [abba=true|false] [otlp=true|false] [wait-time=DURATION] [disk-read-delay-ms=N] [disk-write-delay-ms=N] [baseline-args="..."] [feature-args="..."]`';
let pr, actor, blocks, warmup, baseline, feature, samply, cores, bigBlocks, bal;
let explicitWarmup = false;
@@ -177,6 +189,8 @@ jobs:
var abba = '${{ github.event.inputs.abba }}' !== 'false' ? 'true' : 'false';
var otlp = '${{ github.event.inputs.otlp }}' !== 'false' ? 'true' : 'false';
var waitTime = '${{ github.event.inputs.wait_time }}' || '';
var diskReadDelayMs = '${{ github.event.inputs.disk_read_delay_ms }}' || '';
var diskWriteDelayMs = '${{ github.event.inputs.disk_write_delay_ms }}' || '';
var baselineNodeArgs = '${{ github.event.inputs.baseline_args }}' || '';
var featureNodeArgs = '${{ github.event.inputs.feature_args }}' || '';
@@ -204,8 +218,9 @@ jobs:
const boolDefaultTrue = new Set(['abba', 'otlp']);
const enumArgs = new Map([['bal', validBalModes], ['slack', validSlackModes]]);
const durationArgs = new Set(['wait-time']);
const optionalIntArgs = new Set(['disk-read-delay-ms', 'disk-write-delay-ms']);
const stringArgs = new Set(['baseline-args', 'feature-args']);
const defaults = { blocks: '500', warmup: '200', baseline: '', feature: '', samply: 'false', slack: 'always', 'big-blocks': 'false', bal: 'false', cores: '0', abba: 'true', otlp: 'true', 'wait-time': '', 'baseline-args': '', 'feature-args': '' };
const defaults = { blocks: '500', warmup: '200', baseline: '', feature: '', samply: 'false', slack: 'always', 'big-blocks': 'false', bal: 'false', cores: '0', abba: 'true', otlp: 'true', 'wait-time': '', 'disk-read-delay-ms': '', 'disk-write-delay-ms': '', 'baseline-args': '', 'feature-args': '' };
const unknown = [];
const invalid = [];
const args = body.replace(/^(?:@decofe|derek) bench\s*/, '');
@@ -244,6 +259,12 @@ jobs:
} else {
invalid.push(`\`${key}=${value}\` (must be a duration like 500ms, 1s, 2m)`);
}
} else if (optionalIntArgs.has(key)) {
if (!/^\d+$/.test(value)) {
invalid.push(`\`${key}=${value}\` (must be a non-negative integer in milliseconds)`);
} else {
defaults[key] = value;
}
} else if (enumArgs.has(key)) {
if (enumArgs.get(key).has(value)) {
defaults[key] = value;
@@ -295,6 +316,8 @@ jobs:
var abba = defaults.abba;
var otlp = defaults.otlp;
var waitTime = defaults['wait-time'];
var diskReadDelayMs = defaults['disk-read-delay-ms'];
var diskWriteDelayMs = defaults['disk-write-delay-ms'];
var baselineNodeArgs = defaults['baseline-args'];
var featureNodeArgs = defaults['feature-args'];
}
@@ -352,6 +375,8 @@ jobs:
core.setOutput('big-blocks', bigBlocks);
core.setOutput('bal', bal);
core.setOutput('wait-time', waitTime);
core.setOutput('disk-read-delay-ms', diskReadDelayMs);
core.setOutput('disk-write-delay-ms', diskWriteDelayMs);
core.setOutput('baseline-args', baselineNodeArgs);
core.setOutput('feature-args', featureNodeArgs);
core.setOutput('abba', abba);
@@ -427,12 +452,16 @@ jobs:
const otlpNote = !otlpEnabled ? ', otlp: `disabled`' : '';
const waitTimeVal = '${{ steps.args.outputs.wait-time }}';
const waitTimeNote = waitTimeVal ? `, wait-time: \`${waitTimeVal}\`` : '';
const diskReadDelayVal = '${{ steps.args.outputs.disk-read-delay-ms }}';
const diskReadDelayNote = diskReadDelayVal && diskReadDelayVal !== '0' ? `, disk-read-delay: \`${diskReadDelayVal}ms\`` : '';
const diskWriteDelayVal = '${{ steps.args.outputs.disk-write-delay-ms }}';
const diskWriteDelayNote = diskWriteDelayVal && diskWriteDelayVal !== '0' ? `, disk-write-delay: \`${diskWriteDelayVal}ms\`` : '';
const baselineArgsVal = '${{ steps.args.outputs.baseline-args }}';
const baselineArgsNote = baselineArgsVal ? `, baseline-args: \`${baselineArgsVal}\`` : '';
const featureArgsVal = '${{ steps.args.outputs.feature-args }}';
const featureArgsNote = featureArgsVal ? `, feature-args: \`${featureArgsVal}\`` : '';
const blocksDesc = bigBlocks ? 'blocks: `big`' : `${blocks} blocks, ${warmup} warmup blocks`;
const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${slackNote}${balNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`;
const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${slackNote}${balNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${diskReadDelayNote}${diskWriteDelayNote}${baselineArgsNote}${featureArgsNote}`;
const { data: comment } = await github.rest.issues.createComment({
owner: context.repo.owner,
@@ -471,12 +500,16 @@ jobs:
const otlpNote = !otlpEnabled ? ', otlp: `disabled`' : '';
const waitTimeVal = '${{ steps.args.outputs.wait-time }}';
const waitTimeNote = waitTimeVal ? `, wait-time: \`${waitTimeVal}\`` : '';
const diskReadDelayVal = '${{ steps.args.outputs.disk-read-delay-ms }}';
const diskReadDelayNote = diskReadDelayVal && diskReadDelayVal !== '0' ? `, disk-read-delay: \`${diskReadDelayVal}ms\`` : '';
const diskWriteDelayVal = '${{ steps.args.outputs.disk-write-delay-ms }}';
const diskWriteDelayNote = diskWriteDelayVal && diskWriteDelayVal !== '0' ? `, disk-write-delay: \`${diskWriteDelayVal}ms\`` : '';
const baselineArgsVal = '${{ steps.args.outputs.baseline-args }}';
const baselineArgsNote = baselineArgsVal ? `, baseline-args: \`${baselineArgsVal}\`` : '';
const featureArgsVal = '${{ steps.args.outputs.feature-args }}';
const featureArgsNote = featureArgsVal ? `, feature-args: \`${featureArgsVal}\`` : '';
const blocksDesc = bigBlocks ? 'blocks: `big`' : `${blocks} blocks, ${warmup} warmup blocks`;
const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${slackNote}${balNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`;
const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${slackNote}${balNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${diskReadDelayNote}${diskWriteDelayNote}${baselineArgsNote}${featureArgsNote}`;
const runUrl = `${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId}`;
const numRunners = parseInt(process.env.BENCH_RUNNERS) || 1;
@@ -544,6 +577,8 @@ jobs:
BENCH_BIG_BLOCKS: ${{ needs.reth-bench-ack.outputs.big-blocks }}
BENCH_BAL: ${{ needs.reth-bench-ack.outputs.bal }}
BENCH_WAIT_TIME: ${{ needs.reth-bench-ack.outputs.wait-time }}
BENCH_DISK_READ_DELAY_MS: ${{ needs.reth-bench-ack.outputs.disk-read-delay-ms }}
BENCH_DISK_WRITE_DELAY_MS: ${{ needs.reth-bench-ack.outputs.disk-write-delay-ms }}
BENCH_BASELINE_ARGS: ${{ needs.reth-bench-ack.outputs.baseline-args }}
BENCH_FEATURE_ARGS: ${{ needs.reth-bench-ack.outputs.feature-args }}
BENCH_ABBA: ${{ needs.reth-bench-ack.outputs.abba }}
@@ -618,12 +653,16 @@ jobs:
const otlpNote = !otlpEnabled ? ', otlp: `disabled`' : '';
const waitTimeVal = process.env.BENCH_WAIT_TIME || '';
const waitTimeNote = waitTimeVal ? `, wait-time: \`${waitTimeVal}\`` : '';
const diskReadDelayVal = process.env.BENCH_DISK_READ_DELAY_MS || '';
const diskReadDelayNote = diskReadDelayVal && diskReadDelayVal !== '0' ? `, disk-read-delay: \`${diskReadDelayVal}ms\`` : '';
const diskWriteDelayVal = process.env.BENCH_DISK_WRITE_DELAY_MS || '';
const diskWriteDelayNote = diskWriteDelayVal && diskWriteDelayVal !== '0' ? `, disk-write-delay: \`${diskWriteDelayVal}ms\`` : '';
const baselineArgsVal = process.env.BENCH_BASELINE_ARGS || '';
const baselineArgsNote = baselineArgsVal ? `, baseline-args: \`${baselineArgsVal}\`` : '';
const featureArgsVal = process.env.BENCH_FEATURE_ARGS || '';
const featureArgsNote = featureArgsVal ? `, feature-args: \`${featureArgsVal}\`` : '';
const blocksDesc = bigBlocks ? 'blocks: `big`' : `${blocks} blocks, ${warmup} warmup blocks`;
core.exportVariable('BENCH_CONFIG', `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${slackNote}${balNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`);
core.exportVariable('BENCH_CONFIG', `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${slackNote}${balNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${diskReadDelayNote}${diskWriteDelayNote}${baselineArgsNote}${featureArgsNote}`);
const { buildBody } = require('./.github/scripts/bench-update-status.js');
await github.rest.issues.updateComment({
@@ -684,7 +723,7 @@ jobs:
echo "$HOME/.local/bin" >> "$GITHUB_PATH"
echo "$HOME/.cargo/bin" >> "$GITHUB_PATH"
missing=()
for cmd in schelk cpupower taskset stdbuf python3 curl make uv jq; do
for cmd in schelk cpupower taskset stdbuf python3 curl make uv jq dmsetup; do
command -v "$cmd" &>/dev/null || missing+=("$cmd")
done
if [ ${#missing[@]} -gt 0 ]; then

View File

@@ -60,6 +60,12 @@ pub struct DatabaseArgs {
value_parser = value_parser!(SyncMode),
)]
pub sync_mode: Option<SyncMode>,
/// Hidden benchmark/debug knob that adds an artificial delay to each MDBX read operation.
///
/// This is primarily intended for storage-latency simulation when kernel/device-level
/// shaping cannot represent sub-millisecond delays.
#[arg(long = "db.read-delay", value_parser = humantime::parse_duration, hide = true)]
pub read_delay: Option<Duration>,
}
impl DatabaseArgs {
@@ -89,6 +95,7 @@ impl DatabaseArgs {
.with_growth_step(self.growth_step)
.with_max_readers(self.max_readers)
.with_sync_mode(self.sync_mode)
.with_read_delay(self.read_delay)
}
}
@@ -435,4 +442,12 @@ mod tests {
CommandParser::<DatabaseArgs>::try_parse_from(["reth", "--db.sync-mode", "ultra-fast"]);
assert!(result.is_err());
}
/// `--db.read-delay` accepts a humantime duration and stores it in `read_delay`.
#[test]
fn test_command_parser_with_read_delay() {
    let argv = ["reth", "--db.read-delay", "500us"];
    let parsed = CommandParser::<DatabaseArgs>::try_parse_from(argv).unwrap();

    let expected = Duration::from_micros(500);
    assert_eq!(parsed.args.read_delay, Some(expected));
}
}

View File

@@ -31,6 +31,8 @@ pub struct Cursor<K: TransactionKind, T: Table> {
buf: Vec<u8>,
/// Reference to metric handles in the DB environment. If `None`, metrics are not recorded.
metrics: Option<Arc<DatabaseEnvMetrics>>,
/// Optional artificial delay applied to MDBX read operations.
read_delay: Option<std::time::Duration>,
/// Phantom data to enforce encoding/decoding.
_dbi: PhantomData<T>,
}
@@ -39,8 +41,9 @@ impl<K: TransactionKind, T: Table> Cursor<K, T> {
pub(crate) const fn new_with_metrics(
inner: reth_libmdbx::Cursor<K>,
metrics: Option<Arc<DatabaseEnvMetrics>>,
read_delay: Option<std::time::Duration>,
) -> Self {
Self { inner, buf: Vec::new(), metrics, _dbi: PhantomData }
Self { inner, buf: Vec::new(), metrics, read_delay, _dbi: PhantomData }
}
/// If `self.metrics` is `Some(...)`, record a metric with the provided operation and value
@@ -59,6 +62,15 @@ impl<K: TransactionKind, T: Table> Cursor<K, T> {
f(self)
}
}
/// Runs `f` against the inner libmdbx cursor, then applies the configured artificial read
/// delay before handing back whatever `f` returned.
///
/// The delay is applied after the operation regardless of its outcome.
fn execute_with_read_delay<R>(
    &mut self,
    f: impl FnOnce(&mut reth_libmdbx::Cursor<K>) -> R,
) -> R {
    let delay = self.read_delay;
    let output = f(&mut self.inner);
    apply_read_delay(delay);
    output
}
}
/// Decodes a `(key, value)` pair from the database.
@@ -90,39 +102,43 @@ macro_rules! compress_to_buf_or_ref {
impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
fn first(&mut self) -> PairResult<T> {
decode::<T>(self.inner.first())
decode::<T>(self.execute_with_read_delay(|cursor| cursor.first()))
}
fn seek_exact(&mut self, key: <T as Table>::Key) -> PairResult<T> {
decode::<T>(self.inner.set_key(key.encode().as_ref()))
decode::<T>(self.execute_with_read_delay(|cursor| cursor.set_key(key.encode().as_ref())))
}
fn seek(&mut self, key: <T as Table>::Key) -> PairResult<T> {
decode::<T>(self.inner.set_range(key.encode().as_ref()))
decode::<T>(self.execute_with_read_delay(|cursor| cursor.set_range(key.encode().as_ref())))
}
fn next(&mut self) -> PairResult<T> {
decode::<T>(self.inner.next())
decode::<T>(self.execute_with_read_delay(|cursor| cursor.next()))
}
fn prev(&mut self) -> PairResult<T> {
decode::<T>(self.inner.prev())
decode::<T>(self.execute_with_read_delay(|cursor| cursor.prev()))
}
fn last(&mut self) -> PairResult<T> {
decode::<T>(self.inner.last())
decode::<T>(self.execute_with_read_delay(|cursor| cursor.last()))
}
fn current(&mut self) -> PairResult<T> {
decode::<T>(self.inner.get_current())
decode::<T>(self.execute_with_read_delay(|cursor| cursor.get_current()))
}
fn walk(&mut self, start_key: Option<T::Key>) -> Result<Walker<'_, T, Self>, DatabaseError> {
let start = if let Some(start_key) = start_key {
decode::<T>(self.inner.set_range(start_key.encode().as_ref())).transpose()
} else {
self.first().transpose()
};
let start =
if let Some(start_key) = start_key {
decode::<T>(self.execute_with_read_delay(|cursor| {
cursor.set_range(start_key.encode().as_ref())
}))
.transpose()
} else {
self.first().transpose()
};
Ok(Walker::new(self, start))
}
@@ -132,11 +148,13 @@ impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
range: impl RangeBounds<T::Key>,
) -> Result<RangeWalker<'_, T, Self>, DatabaseError> {
let start = match range.start_bound().cloned() {
Bound::Included(key) => self.inner.set_range(key.encode().as_ref()),
Bound::Included(key) => {
self.execute_with_read_delay(|cursor| cursor.set_range(key.encode().as_ref()))
}
Bound::Excluded(_key) => {
unreachable!("Rust doesn't allow for Bound::Excluded in starting bounds");
}
Bound::Unbounded => self.inner.first(),
Bound::Unbounded => self.execute_with_read_delay(|cursor| cursor.first()),
};
let start = decode::<T>(start).transpose();
Ok(RangeWalker::new(self, start, range.end_bound().cloned()))
@@ -146,12 +164,15 @@ impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
&mut self,
start_key: Option<T::Key>,
) -> Result<ReverseWalker<'_, T, Self>, DatabaseError> {
let start = if let Some(start_key) = start_key {
decode::<T>(self.inner.set_range(start_key.encode().as_ref()))
} else {
self.last()
}
.transpose();
let start =
if let Some(start_key) = start_key {
decode::<T>(self.execute_with_read_delay(|cursor| {
cursor.set_range(start_key.encode().as_ref())
}))
} else {
self.last()
}
.transpose();
Ok(ReverseWalker::new(self, start))
}
@@ -160,18 +181,17 @@ impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
/// Returns the previous `(key, value)` pair of a DUPSORT table.
fn prev_dup(&mut self) -> PairResult<T> {
decode::<T>(self.inner.prev_dup())
decode::<T>(self.execute_with_read_delay(|cursor| cursor.prev_dup()))
}
/// Returns the next `(key, value)` pair of a DUPSORT table.
fn next_dup(&mut self) -> PairResult<T> {
decode::<T>(self.inner.next_dup())
decode::<T>(self.execute_with_read_delay(|cursor| cursor.next_dup()))
}
/// Returns the last `value` of the current duplicate `key`.
fn last_dup(&mut self) -> ValueOnlyResult<T> {
self.inner
.last_dup()
self.execute_with_read_delay(|cursor| cursor.last_dup())
.map_err(|e| DatabaseError::Read(e.into()))?
.map(decode_one::<T>)
.transpose()
@@ -179,13 +199,12 @@ impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
/// Returns the next `(key, value)` pair skipping the duplicates.
fn next_no_dup(&mut self) -> PairResult<T> {
decode::<T>(self.inner.next_nodup())
decode::<T>(self.execute_with_read_delay(|cursor| cursor.next_nodup()))
}
/// Returns the next `value` of a duplicate `key`.
fn next_dup_val(&mut self) -> ValueOnlyResult<T> {
self.inner
.next_dup()
self.execute_with_read_delay(|cursor| cursor.next_dup())
.map_err(|e| DatabaseError::Read(e.into()))?
.map(decode_value::<T>)
.transpose()
@@ -196,11 +215,12 @@ impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
key: <T as Table>::Key,
subkey: <T as DupSort>::SubKey,
) -> ValueOnlyResult<T> {
self.inner
.get_both_range(key.encode().as_ref(), subkey.encode().as_ref())
.map_err(|e| DatabaseError::Read(e.into()))?
.map(decode_one::<T>)
.transpose()
self.execute_with_read_delay(|cursor| {
cursor.get_both_range(key.encode().as_ref(), subkey.encode().as_ref())
})
.map_err(|e| DatabaseError::Read(e.into()))?
.map(decode_one::<T>)
.transpose()
}
/// Depending on its arguments, returns an iterator starting at:
@@ -216,25 +236,26 @@ impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
let start = match (key, subkey) {
(Some(key), Some(subkey)) => {
let encoded_key = key.encode();
self.inner
.get_both_range(encoded_key.as_ref(), subkey.encode().as_ref())
.map_err(|e| DatabaseError::Read(e.into()))?
.map(|val| decoder::<T>((Cow::Borrowed(encoded_key.as_ref()), val)))
self.execute_with_read_delay(|cursor| {
cursor.get_both_range(encoded_key.as_ref(), subkey.encode().as_ref())
})
.map_err(|e| DatabaseError::Read(e.into()))?
.map(|val| decoder::<T>((Cow::Borrowed(encoded_key.as_ref()), val)))
}
(Some(key), None) => {
let encoded_key = key.encode();
self.inner
.set(encoded_key.as_ref())
self.execute_with_read_delay(|cursor| cursor.set(encoded_key.as_ref()))
.map_err(|e| DatabaseError::Read(e.into()))?
.map(|val| decoder::<T>((Cow::Borrowed(encoded_key.as_ref()), val)))
}
(None, Some(subkey)) => {
if let Some((key, _)) = self.first()? {
let encoded_key = key.encode();
self.inner
.get_both_range(encoded_key.as_ref(), subkey.encode().as_ref())
.map_err(|e| DatabaseError::Read(e.into()))?
.map(|val| decoder::<T>((Cow::Borrowed(encoded_key.as_ref()), val)))
self.execute_with_read_delay(|cursor| {
cursor.get_both_range(encoded_key.as_ref(), subkey.encode().as_ref())
})
.map_err(|e| DatabaseError::Read(e.into()))?
.map(|val| decoder::<T>((Cow::Borrowed(encoded_key.as_ref()), val)))
} else {
Some(Err(DatabaseError::Read(MDBXError::NotFound.into())))
}
@@ -363,7 +384,7 @@ impl<T: DupSort> DbDupCursorRW<T> for Cursor<RW, T> {
mod tests {
use crate::{
mdbx::{DatabaseArguments, DatabaseEnv, DatabaseEnvKind},
tables::StorageChangeSets,
tables::{self, StorageChangeSets},
Database,
};
use alloy_primitives::{address, Address, B256, U256};
@@ -374,6 +395,7 @@ mod tests {
transaction::{DbTx, DbTxMut},
};
use reth_primitives_traits::StorageEntry;
use std::time::{Duration, Instant};
use tempfile::TempDir;
fn create_test_db() -> DatabaseEnv {
@@ -463,4 +485,27 @@ mod tests {
assert_eq!(copied_value, expected_value);
}
}
/// A cursor read on an environment opened with a read delay takes at least that long.
#[test]
fn read_delay_applies_to_cursor_reads() {
    const DELAY: Duration = Duration::from_millis(2);

    let dir = TempDir::new().unwrap();
    let args = DatabaseArguments::new(ClientVersion::default()).with_read_delay(Some(DELAY));
    let mut db = DatabaseEnv::open(dir.path(), DatabaseEnvKind::RW, args).unwrap();
    db.create_tables().unwrap();

    // Seed a single row so the cursor has something to position on.
    let write_tx = db.tx_mut().unwrap();
    write_tx.put::<tables::HeaderNumbers>(B256::ZERO, 1).unwrap();
    write_tx.commit().unwrap();

    let read_tx = db.tx().unwrap();
    let mut cursor = read_tx.cursor_read::<tables::HeaderNumbers>().unwrap();

    let started = Instant::now();
    let _ = cursor.first().unwrap();
    assert!(started.elapsed() >= DELAY);
}
}

View File

@@ -119,6 +119,8 @@ pub struct DatabaseArguments {
/// environments). Choose `SafeNoSync` if performance is more important and occasional data
/// loss is acceptable (e.g., testing or ephemeral data).
sync_mode: SyncMode,
/// Optional artificial delay applied to every MDBX read operation.
read_delay: Option<std::time::Duration>,
}
impl Default for DatabaseArguments {
@@ -143,6 +145,7 @@ impl DatabaseArguments {
exclusive: None,
max_readers: None,
sync_mode: SyncMode::Durable,
read_delay: None,
}
}
@@ -190,6 +193,12 @@ impl DatabaseArguments {
self
}
/// Configures the artificial delay applied to each MDBX read operation.
///
/// Passing `None` stores no delay.
pub const fn with_read_delay(self, read_delay: Option<std::time::Duration>) -> Self {
    let mut args = self;
    args.read_delay = read_delay;
    args
}
/// Configures the database growth step in bytes.
pub const fn with_growth_step(mut self, growth_step: Option<usize>) -> Self {
if let Some(growth_step) = growth_step {
@@ -254,6 +263,8 @@ pub struct DatabaseEnv {
dbis: Arc<HashMap<&'static str, ffi::MDBX_dbi>>,
/// Cache for metric handles. If `None`, metrics are not recorded.
metrics: Option<Arc<DatabaseEnvMetrics>>,
/// Optional artificial delay applied to every MDBX read operation.
read_delay: Option<std::time::Duration>,
/// Write lock for when dealing with a read-write environment.
_lock_file: Option<StorageLock>,
}
@@ -267,6 +278,7 @@ impl Database for DatabaseEnv {
self.inner.begin_ro_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
self.dbis.clone(),
self.metrics.clone(),
self.read_delay,
)
.map_err(|e| DatabaseError::InitTx(e.into()))
}
@@ -276,6 +288,7 @@ impl Database for DatabaseEnv {
self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
self.dbis.clone(),
self.metrics.clone(),
self.read_delay,
)
.map_err(|e| DatabaseError::InitTx(e.into()))
}
@@ -537,6 +550,7 @@ impl DatabaseEnv {
path: path.to_path_buf(),
dbis: Arc::default(),
metrics: None,
read_delay: args.read_delay,
_lock_file,
};

View File

@@ -40,6 +40,8 @@ pub struct Tx<K: TransactionKind> {
///
/// If [Some], then metrics are reported.
metrics_handler: Option<MetricsHandler<K>>,
/// Optional artificial delay applied to MDBX read operations.
read_delay: Option<Duration>,
}
impl<K: TransactionKind> Tx<K> {
@@ -50,6 +52,7 @@ impl<K: TransactionKind> Tx<K> {
inner: Transaction<K>,
dbis: Arc<HashMap<&'static str, MDBX_dbi>>,
env_metrics: Option<Arc<DatabaseEnvMetrics>>,
read_delay: Option<Duration>,
) -> reth_libmdbx::Result<Self> {
let metrics_handler = env_metrics
.map(|env_metrics| {
@@ -59,7 +62,7 @@ impl<K: TransactionKind> Tx<K> {
Ok(handler)
})
.transpose()?;
Ok(Self { inner, dbis, metrics_handler })
Ok(Self { inner, dbis, metrics_handler, read_delay })
}
/// Returns a reference to the inner libmdbx transaction.
@@ -101,6 +104,7 @@ impl<K: TransactionKind> Tx<K> {
Ok(Cursor::new_with_metrics(
inner,
self.metrics_handler.as_ref().map(|h| h.env_metrics.clone()),
self.read_delay,
))
}
@@ -170,6 +174,19 @@ impl<K: TransactionKind> Tx<K> {
f(&self.inner)
}
}
/// Executes `f` through [`Self::execute_with_operation_metric`] and applies the configured
/// artificial read delay once `f` has returned.
///
/// The delay runs inside the closure handed to the metric wrapper, so it is part of whatever
/// that wrapper measures around the closure.
fn execute_with_read_delay<T: Table, R>(
    &self,
    operation: Operation,
    value_size: Option<usize>,
    f: impl FnOnce(&Transaction<K>) -> R,
) -> R {
    let delay = self.read_delay;
    self.execute_with_operation_metric::<T, _>(operation, value_size, |tx| {
        let output = f(tx);
        apply_read_delay(delay);
        output
    })
}
}
#[derive(Debug)]
@@ -294,7 +311,7 @@ impl<K: TransactionKind> DbTx for Tx<K> {
&self,
key: &<T::Key as Encode>::Encoded,
) -> Result<Option<T::Value>, DatabaseError> {
self.execute_with_operation_metric::<T, _>(Operation::Get, None, |tx| {
self.execute_with_read_delay::<T, _>(Operation::Get, None, |tx| {
tx.get(self.get_dbi::<T>()?, key.as_ref())
.map_err(|e| DatabaseError::Read(e.into()))?
.map(decode_one::<T>)
@@ -447,7 +464,11 @@ mod tests {
use reth_db_api::{database::Database, models::ClientVersion, transaction::DbTx};
use reth_libmdbx::MaxReadTransactionDuration;
use reth_storage_errors::db::DatabaseError;
use std::{sync::atomic::Ordering, thread::sleep, time::Duration};
use std::{
sync::atomic::Ordering,
thread::sleep,
time::{Duration, Instant},
};
use tempfile::tempdir;
#[test]
@@ -498,4 +519,18 @@ mod tests {
// Backtrace is recorded.
assert!(tx.metrics_handler.unwrap().backtrace_recorded.load(Ordering::Relaxed));
}
/// A point lookup on an environment opened with a read delay takes at least that long.
#[test]
fn read_delay_applies_to_get() {
    let delay = Duration::from_millis(2);
    let dir = tempdir().unwrap();
    let mut db = DatabaseEnv::open(
        dir.path(),
        DatabaseEnvKind::RW,
        DatabaseArguments::new(ClientVersion::default()).with_read_delay(Some(delay)),
    )
    .unwrap();
    db.create_tables().unwrap();

    let tx = db.tx().unwrap();
    let started = Instant::now();
    let _ = tx.get::<tables::Transactions>(0).unwrap();
    assert!(started.elapsed() >= delay);
}
}

View File

@@ -4,7 +4,10 @@ use crate::{
table::{Decode, Decompress, Table, TableRow},
DatabaseError,
};
use std::borrow::Cow;
use std::{
borrow::Cow,
time::{Duration, Instant},
};
/// Helper function to decode a `(key, value)` pair.
pub(crate) fn decoder<'a, T>(
@@ -50,3 +53,17 @@ where
Cow::Owned(v) => Decompress::decompress_owned(v)?,
})
}
/// Applies an artificial read delay by blocking the calling thread.
///
/// `None` and zero-length delays return immediately. Otherwise the function does not return
/// until at least `read_delay` has elapsed since it was called.
///
/// Delays of up to one millisecond are busy-waited in full so hidden benchmark knobs like
/// `--db.read-delay=500us` can approximate sub-millisecond latency without depending on OS sleep
/// granularity. For longer delays only the final millisecond is busy-waited; the rest is spent in
/// `thread::sleep` so a large delay does not pin a CPU core for its whole duration.
pub(crate) fn apply_read_delay(read_delay: Option<Duration>) {
    /// Tail of every delay that is always busy-waited, absorbing `thread::sleep` overshoot.
    const SPIN_MARGIN: Duration = Duration::from_millis(1);

    let Some(read_delay) = read_delay.filter(|delay| !delay.is_zero()) else { return };
    let start = Instant::now();

    // Sleep through the coarse part of the delay, if there is one.
    if let Some(coarse) = read_delay.checked_sub(SPIN_MARGIN).filter(|rest| !rest.is_zero()) {
        std::thread::sleep(coarse);
    }

    // Spin until the deadline so the requested delay is always a lower bound.
    while start.elapsed() < read_delay {
        std::hint::spin_loop();
    }
}