Require tip option for stages

This commit is contained in:
Artem Vorotnikov
2022-04-27 02:46:21 +03:00
parent e5af0ab9ce
commit eb4caac2bd
17 changed files with 177 additions and 87 deletions

View File

@@ -141,12 +141,10 @@ async fn download_headers(
txn.commit()?;
let mut staged_sync = stagedsync::StagedSync::new();
staged_sync.push(HeaderDownload::new(
conn,
consensus,
chain_config,
env.begin()?,
)?);
staged_sync.push(
HeaderDownload::new(conn, consensus, chain_config, env.begin()?)?,
false,
);
staged_sync.run(&env).await?;
Ok(())
@@ -168,9 +166,12 @@ async fn blockhashes(data_dir: AkulaDataDir) -> anyhow::Result<()> {
)?;
let mut staged_sync = stagedsync::StagedSync::new();
staged_sync.push(BlockHashes {
temp_dir: etl_temp_dir.clone(),
});
staged_sync.push(
BlockHashes {
temp_dir: etl_temp_dir.clone(),
},
false,
);
staged_sync.run(&env).await?;
Ok(())
}

View File

@@ -245,45 +245,66 @@ fn main() -> anyhow::Result<()> {
db.begin()?,
)?;
let peer = header_download.peer.clone();
staged_sync.push(header_download);
staged_sync.push(TotalGasIndex);
staged_sync.push(BlockHashes {
temp_dir: etl_temp_dir.clone(),
});
staged_sync.push(BodyDownload::new(consensus, peer)?);
staged_sync.push(TotalTxIndex);
staged_sync.push(SenderRecovery {
batch_size: opt.sender_recovery_batch_size.try_into().unwrap(),
});
staged_sync.push(Execution {
batch_size: opt.execution_batch_size.saturating_mul(1_000_000_000_u64),
history_batch_size: opt
.execution_history_batch_size
.saturating_mul(1_000_000_000_u64),
exit_after_batch: opt.execution_exit_after_batch,
batch_until: None,
commit_every: None,
});
staged_sync.push(header_download, false);
staged_sync.push(TotalGasIndex, false);
staged_sync.push(
BlockHashes {
temp_dir: etl_temp_dir.clone(),
},
false,
);
staged_sync.push(BodyDownload::new(consensus, peer)?, false);
staged_sync.push(TotalTxIndex, false);
staged_sync.push(
SenderRecovery {
batch_size: opt.sender_recovery_batch_size.try_into().unwrap(),
},
false,
);
staged_sync.push(
Execution {
batch_size: opt.execution_batch_size.saturating_mul(1_000_000_000_u64),
history_batch_size: opt
.execution_history_batch_size
.saturating_mul(1_000_000_000_u64),
exit_after_batch: opt.execution_exit_after_batch,
batch_until: None,
commit_every: None,
},
false,
);
if !opt.skip_commitment {
staged_sync.push(HashState::new(etl_temp_dir.clone(), None));
staged_sync.push(Interhashes::new(etl_temp_dir.clone(), None));
staged_sync.push(HashState::new(etl_temp_dir.clone(), None), !opt.prune);
staged_sync.push(Interhashes::new(etl_temp_dir.clone(), None), !opt.prune);
}
staged_sync.push(AccountHistoryIndex {
temp_dir: etl_temp_dir.clone(),
flush_interval: 50_000,
});
staged_sync.push(StorageHistoryIndex {
temp_dir: etl_temp_dir.clone(),
flush_interval: 50_000,
});
staged_sync.push(TxLookup {
temp_dir: etl_temp_dir.clone(),
});
staged_sync.push(CallTraceIndex {
temp_dir: etl_temp_dir.clone(),
flush_interval: 50_000,
});
staged_sync.push(Finish);
staged_sync.push(
AccountHistoryIndex {
temp_dir: etl_temp_dir.clone(),
flush_interval: 50_000,
},
!opt.prune,
);
staged_sync.push(
StorageHistoryIndex {
temp_dir: etl_temp_dir.clone(),
flush_interval: 50_000,
},
!opt.prune,
);
staged_sync.push(
TxLookup {
temp_dir: etl_temp_dir.clone(),
},
!opt.prune,
);
staged_sync.push(
CallTraceIndex {
temp_dir: etl_temp_dir.clone(),
flush_interval: 50_000,
},
!opt.prune,
);
staged_sync.push(Finish, !opt.prune);
info!("Running staged sync");
staged_sync.run(&db).await?;

View File

@@ -7,6 +7,14 @@ use crate::{kv::mdbx::*, models::*, stagedsync::stage::*};
use std::time::{Duration, Instant};
use tracing::*;
struct QueuedStage<'db, E>
where
E: EnvironmentKind,
{
stage: Box<dyn Stage<'db, E>>,
require_tip: bool,
}
/// Staged synchronization framework
///
/// As the name suggests, the gist of this framework is splitting sync into logical _stages_ that are consecutively executed one after another.
@@ -23,7 +31,7 @@ pub struct StagedSync<'db, E>
where
E: EnvironmentKind,
{
stages: Vec<Box<dyn Stage<'db, E>>>,
stages: Vec<QueuedStage<'db, E>>,
min_progress_to_commit_after_stage: u64,
pruning_interval: u64,
max_block: Option<BlockNumber>,
@@ -55,11 +63,14 @@ where
}
}
pub fn push<S>(&mut self, stage: S)
pub fn push<S>(&mut self, stage: S, require_tip: bool)
where
S: Stage<'db, E> + 'static,
{
self.stages.push(Box::new(stage))
self.stages.push(QueuedStage {
stage: Box::new(stage),
require_tip,
})
}
pub fn set_pruning_interval(&mut self, v: u64) -> &mut Self {
@@ -94,6 +105,8 @@ where
pub async fn run(&mut self, db: &'db MdbxEnvironment<E>) -> anyhow::Result<()> {
let num_stages = self.stages.len();
let mut minimum_progress = None;
let mut maximum_progress = None;
let mut unwind_to = None;
'run_loop: loop {
let mut tx = db.begin_mutable()?;
@@ -101,7 +114,9 @@ where
// Start with unwinding if it's been requested.
if let Some(to) = unwind_to.take() {
// Unwind stages in reverse order.
for (stage_index, stage) in self.stages.iter_mut().enumerate().rev() {
for (stage_index, QueuedStage { stage, .. }) in
self.stages.iter_mut().enumerate().rev()
{
let stage_id = stage.id();
// Unwind magic happens here.
@@ -159,17 +174,18 @@ where
let mut previous_stage = None;
let mut timings = vec![];
let mut minimum_progress = None;
let mut reached_tip_flag = true;
// Execute each stage in direct order.
for (stage_index, stage) in self.stages.iter_mut().enumerate() {
for (stage_index, QueuedStage { stage, require_tip }) in
self.stages.iter_mut().enumerate()
{
let mut restarted = false;
let stage_id = stage.id();
let start_time = Instant::now();
let start_progress = stage_id.get_progress(&tx)?;
// Re-invoke the stage until it reports `StageOutput::done`.
let done_progress = loop {
let prev_progress = stage_id.get_progress(&tx)?;
@@ -194,17 +210,36 @@ where
}
let invocation_start_time = Instant::now();
let output = stage
.execute(
&mut tx,
StageInput {
restarted,
first_started_at: (start_time, start_progress),
previous_stage,
stage_progress: prev_progress,
},
)
.await?;
let output = if !reached_tip_flag
&& *require_tip
&& maximum_progress
.map(|maximum_progress| {
maximum_progress
< self.max_block.unwrap_or(BlockNumber(u64::MAX))
})
.unwrap_or(true)
{
info!("Tip not reached, skipping stage");
ExecOutput::Progress {
stage_progress: prev_progress.unwrap_or_default(),
done: true,
reached_tip: false,
}
} else {
stage
.execute(
&mut tx,
StageInput {
restarted,
first_started_at: (start_time, start_progress),
previous_stage,
stage_progress: prev_progress,
},
)
.await?
};
// Nothing here, pass along.
match &output {
@@ -260,13 +295,25 @@ where
stage::ExecOutput::Progress {
stage_progress,
done,
reached_tip,
} => {
stage_id.save_progress(&tx, stage_progress)?;
if let Some(m) = &mut minimum_progress {
*m = std::cmp::min(*m, stage_progress);
} else {
minimum_progress = Some(stage_progress);
macro_rules! record_outliers {
($f:expr, $v:expr) => {
if let Some(m) = $v {
*m = $f(*m, stage_progress);
} else {
*$v = Some(stage_progress);
}
};
}
record_outliers!(std::cmp::min, &mut minimum_progress);
record_outliers!(std::cmp::max, &mut maximum_progress);
if !reached_tip {
reached_tip_flag = false;
}
// Check if we should commit now.
@@ -318,7 +365,9 @@ where
let prune_to = BlockNumber(prune_to);
// Prune all stages
for (stage_index, stage) in self.stages.iter_mut().enumerate().rev() {
for (stage_index, QueuedStage { stage, .. }) in
self.stages.iter_mut().enumerate().rev()
{
let stage_id = stage.id();
let span = span!(

View File

@@ -32,6 +32,7 @@ pub enum ExecOutput {
Progress {
stage_progress: BlockNumber,
done: bool,
reached_tip: bool,
},
}

View File

@@ -57,6 +57,7 @@ where
Ok(ExecOutput::Progress {
stage_progress: highest_block,
done: true,
reached_tip: true,
})
}

View File

@@ -59,6 +59,7 @@ where
Ok(ExecOutput::Progress {
stage_progress: target,
done: true,
reached_tip: true,
})
}

View File

@@ -117,6 +117,7 @@ where
Ok(ExecOutput::Progress {
stage_progress: max_block,
done: true,
reached_tip: true,
})
}
@@ -377,6 +378,7 @@ mod tests {
ExecOutput::Progress {
stage_progress: BlockNumber(20),
done: true,
reached_tip: true,
}
);
@@ -416,6 +418,7 @@ mod tests {
ExecOutput::Progress {
stage_progress: BlockNumber(30),
done: true,
reached_tip: true,
}
);

View File

@@ -222,11 +222,13 @@ where
ExecOutput::Progress {
stage_progress: executed_to,
done,
reached_tip: true,
}
} else {
ExecOutput::Progress {
stage_progress: prev_progress,
done: true,
reached_tip: true,
}
})
}

View File

@@ -33,6 +33,7 @@ where
Ok(ExecOutput::Progress {
stage_progress: prev_stage,
done: true,
reached_tip: true,
})
}
async fn unwind<'tx>(

View File

@@ -203,6 +203,7 @@ where
Ok(ExecOutput::Progress {
stage_progress: max_block,
done: true,
reached_tip: true,
})
}
@@ -418,6 +419,7 @@ mod tests {
ExecOutput::Progress {
stage_progress: BlockNumber(3),
done: true,
reached_tip: true,
}
);

View File

@@ -81,25 +81,23 @@ where
};
let mut starting_block = prev_progress;
let mut target = txn
.cursor(tables::LastHeader)?
.last()?
.map(|(_, (v, _))| v)
.unwrap();
let do_lookup = target - starting_block <= BlockNumber(BATCH_SIZE as u64);
match do_lookup {
true => {
let mut stream = self.peer.recv().await?;
target = match self.evaluate_chain_tip(&mut stream).await? {
(value, _) if value - starting_block <= BlockNumber(BATCH_SIZE as u64) => value,
last_header => {
txn.set(tables::LastHeader, Default::default(), last_header)?;
starting_block + BlockNumber(BATCH_SIZE as u64)
}
}
let (target, reached_tip) = if *txn.get(tables::LastHeader, ())?.unwrap().0
- *starting_block
<= BATCH_SIZE as u64
{
let mut stream = self.peer.recv().await?;
let (tip_block_number, tip_hash) = self.evaluate_chain_tip(&mut stream).await?;
if *tip_block_number - *starting_block <= BATCH_SIZE as u64 {
(tip_block_number, true)
} else {
txn.set(tables::LastHeader, (), (tip_block_number, tip_hash))?;
(BlockNumber(*starting_block + BATCH_SIZE as u64), false)
}
false => target = starting_block + BlockNumber(BATCH_SIZE as u64),
}
} else {
(starting_block + BlockNumber(BATCH_SIZE as u64), false)
};
let mut stream = self.peer.recv_headers().await?;
@@ -122,6 +120,7 @@ where
Ok(ExecOutput::Progress {
stage_progress: target - 1u8,
done: true,
reached_tip,
})
}

View File

@@ -247,6 +247,7 @@ where
Ok(ExecOutput::Progress {
stage_progress: max_block,
done: true,
reached_tip: true,
})
}

View File

@@ -110,6 +110,7 @@ where
Ok(ExecOutput::Progress {
stage_progress: cmp::max(max_block, past_progress),
done: true,
reached_tip: true,
})
}

View File

@@ -159,6 +159,7 @@ where
Ok(ExecOutput::Progress {
stage_progress: highest_block,
done,
reached_tip: true,
})
}
@@ -370,6 +371,7 @@ mod tests {
ExecOutput::Progress {
stage_progress: 3.into(),
done: true,
reached_tip: true,
}
);

View File

@@ -63,6 +63,7 @@ where
Ok(ExecOutput::Progress {
stage_progress: max_block,
done: true,
reached_tip: true,
})
}

View File

@@ -69,6 +69,7 @@ where
Ok(ExecOutput::Progress {
stage_progress: max_block,
done: true,
reached_tip: true,
})
}

View File

@@ -69,6 +69,7 @@ where
Ok(ExecOutput::Progress {
stage_progress: highest_block,
done: true,
reached_tip: true,
})
}
@@ -353,6 +354,7 @@ mod tests {
ExecOutput::Progress {
stage_progress: 3.into(),
done: true,
reached_tip: true,
}
);
@@ -392,6 +394,7 @@ mod tests {
ExecOutput::Progress {
stage_progress: 0.into(),
done: true,
reached_tip: true,
}
);
}