From 6bb3622be24f264f402d277f8449ef48098e85ba Mon Sep 17 00:00:00 2001 From: Artem Vorotnikov Date: Tue, 11 Jan 2022 15:57:01 +0300 Subject: [PATCH] Optimize ConvertBodies stage --- bin/akula.rs | 92 +++++++++++++++++++++++++++++++++++----------------- 1 file changed, 63 insertions(+), 29 deletions(-) diff --git a/bin/akula.rs b/bin/akula.rs index 35a3cbb..e3f174c 100644 --- a/bin/akula.rs +++ b/bin/akula.rs @@ -225,6 +225,7 @@ where Source: KV, { db: Arc, + commit_after: Duration, } #[async_trait] @@ -248,7 +249,7 @@ where let mut highest_block = original_highest_block; const MAX_TXS_PER_BATCH: usize = 500_000; - const BUFFERING_FACTOR: usize = 500_000; + const BUFFERING_FACTOR: usize = 100_000; let erigon_tx = self.db.begin().await?; if erigon_tx @@ -285,52 +286,76 @@ where let mut starting_index = prev_body.base_tx_id + prev_body.tx_amount as u64; let canonical_header_walker = walk(&mut canonical_header_cur, Some(highest_block + 1)); pin!(canonical_header_walker); + let erigon_body_walker = walk(&mut erigon_body_cur, Some(highest_block + 1)); + pin!(erigon_body_walker); let mut batch = Vec::with_capacity(BUFFERING_FACTOR); - let mut converted = Vec::with_capacity(BUFFERING_FACTOR); + let mut converted = Vec::new(); let mut extracted_blocks_num = 0; let mut extracted_txs_num = 0; - let started_at = Instant::now(); + let mut last_check = started_at; + let done = loop { let mut no_more_bodies = true; let mut accum_txs = 0; - while let Some((block_num, block_hash)) = canonical_header_walker.try_next().await? { - if let Some((_, body)) = erigon_body_cur.seek_exact((block_num, block_hash)).await? - { - let base_tx_id = body.base_tx_id; + 'l: while let Some((block_num, block_hash)) = canonical_header_walker.try_next().await? + { + loop { + if let Some(((body_block_num, body_block_hash), body)) = + erigon_body_walker.try_next().await? + { + if body_block_num > block_num { + break 'l; + } - let txs = walk(&mut erigon_tx_cur, Some(base_tx_id.encode().to_vec())) - .map(|res| res.map(|(_, tx)| tx)) - .take(body.tx_amount) - .collect::>>() - .await?; + if body_block_hash != block_hash { + continue; + } - if txs.len() != body.tx_amount { - bail!( - "Invalid tx amount in Erigon for block #{}/{}: {} != {}", - block_num, - block_hash, - body.tx_amount, - txs.len() - ); - } + let base_tx_id = body.base_tx_id; - accum_txs += body.tx_amount; - batch.push((block_num, block_hash, body, txs)); + let txs = walk(&mut erigon_tx_cur, Some(base_tx_id.encode().to_vec())) + .map(|res| res.map(|(_, tx)| tx)) + .take(body.tx_amount) + .collect::>>() + .await?; + + if txs.len() != body.tx_amount { + bail!( + "Invalid tx amount in Erigon for block #{}/{}: {} != {}", + block_num, + block_hash, + body.tx_amount, + txs.len() + ); + } + + accum_txs += body.tx_amount; + batch.push((block_num, block_hash, body, txs)); - if accum_txs > MAX_TXS_PER_BATCH { - no_more_bodies = false; break; + } else { + break 'l; } - } else { + } + + if accum_txs > MAX_TXS_PER_BATCH { + no_more_bodies = false; break; } } + debug!( + "Read a batch of {} blocks with {} transactions", + batch.len(), + accum_txs + ); + extracted_blocks_num += batch.len(); extracted_txs_num += accum_txs; + converted.reserve(batch.len()); batch .par_drain(..) .map(move |(block_number, block_hash, body, txs)| { @@ -377,7 +402,7 @@ where } let now = Instant::now(); - let elapsed = now - started_at; + let elapsed = now - last_check; if elapsed > Duration::from_secs(30) { info!( "Highest block {}, batch size: {} blocks with {} transactions, {} tx/sec", @@ -388,7 +413,13 @@ where / (elapsed.as_secs() as f64 + (elapsed.subsec_millis() as f64 / 1000_f64)) ); - break no_more_bodies; + if now - started_at > self.commit_after { + break false; + } + + extracted_blocks_num = 0; + extracted_txs_num = 0; + last_check = Instant::now(); } }; @@ -594,7 +625,10 @@ async fn main() -> anyhow::Result<()> { temp_dir: etl_temp_dir.clone(), }); if let Some(erigon_db) = erigon_db { - staged_sync.push(ConvertBodies { db: erigon_db }); + staged_sync.push(ConvertBodies { + db: erigon_db, + commit_after: Duration::from_secs(120), + }); } else { // also add body download stage here }