Optimize ConvertBodies stage

This commit is contained in:
Artem Vorotnikov
2022-01-11 15:57:01 +03:00
parent ec08ef56bd
commit 6bb3622be2

View File

@@ -225,6 +225,7 @@ where
Source: KV,
{
db: Arc<Source>,
commit_after: Duration,
}
#[async_trait]
@@ -248,7 +249,7 @@ where
let mut highest_block = original_highest_block;
const MAX_TXS_PER_BATCH: usize = 500_000;
const BUFFERING_FACTOR: usize = 500_000;
const BUFFERING_FACTOR: usize = 100_000;
let erigon_tx = self.db.begin().await?;
if erigon_tx
@@ -285,52 +286,76 @@ where
let mut starting_index = prev_body.base_tx_id + prev_body.tx_amount as u64;
let canonical_header_walker = walk(&mut canonical_header_cur, Some(highest_block + 1));
pin!(canonical_header_walker);
let erigon_body_walker = walk(&mut erigon_body_cur, Some(highest_block + 1));
pin!(erigon_body_walker);
let mut batch = Vec::with_capacity(BUFFERING_FACTOR);
let mut converted = Vec::with_capacity(BUFFERING_FACTOR);
let mut converted = Vec::new();
let mut extracted_blocks_num = 0;
let mut extracted_txs_num = 0;
let started_at = Instant::now();
let mut last_check = started_at;
let done = loop {
let mut no_more_bodies = true;
let mut accum_txs = 0;
while let Some((block_num, block_hash)) = canonical_header_walker.try_next().await? {
if let Some((_, body)) = erigon_body_cur.seek_exact((block_num, block_hash)).await?
{
let base_tx_id = body.base_tx_id;
'l: while let Some((block_num, block_hash)) = canonical_header_walker.try_next().await?
{
loop {
if let Some(((body_block_num, body_block_hash), body)) =
erigon_body_walker.try_next().await?
{
if body_block_num > block_num {
break 'l;
}
let txs = walk(&mut erigon_tx_cur, Some(base_tx_id.encode().to_vec()))
.map(|res| res.map(|(_, tx)| tx))
.take(body.tx_amount)
.collect::<anyhow::Result<Vec<_>>>()
.await?;
if body_block_hash != block_hash {
continue;
}
if txs.len() != body.tx_amount {
bail!(
"Invalid tx amount in Erigon for block #{}/{}: {} != {}",
block_num,
block_hash,
body.tx_amount,
txs.len()
);
}
let base_tx_id = body.base_tx_id;
accum_txs += body.tx_amount;
batch.push((block_num, block_hash, body, txs));
let txs = walk(&mut erigon_tx_cur, Some(base_tx_id.encode().to_vec()))
.map(|res| res.map(|(_, tx)| tx))
.take(body.tx_amount)
.collect::<anyhow::Result<Vec<_>>>()
.await?;
if txs.len() != body.tx_amount {
bail!(
"Invalid tx amount in Erigon for block #{}/{}: {} != {}",
block_num,
block_hash,
body.tx_amount,
txs.len()
);
}
accum_txs += body.tx_amount;
batch.push((block_num, block_hash, body, txs));
if accum_txs > MAX_TXS_PER_BATCH {
no_more_bodies = false;
break;
} else {
break 'l;
}
} else {
}
if accum_txs > MAX_TXS_PER_BATCH {
no_more_bodies = false;
break;
}
}
debug!(
"Read a batch of {} blocks with {} transactions",
batch.len(),
accum_txs
);
extracted_blocks_num += batch.len();
extracted_txs_num += accum_txs;
converted.reserve(batch.len());
batch
.par_drain(..)
.map(move |(block_number, block_hash, body, txs)| {
@@ -377,7 +402,7 @@ where
}
let now = Instant::now();
let elapsed = now - started_at;
let elapsed = now - last_check;
if elapsed > Duration::from_secs(30) {
info!(
"Highest block {}, batch size: {} blocks with {} transactions, {} tx/sec",
@@ -388,7 +413,13 @@ where
/ (elapsed.as_secs() as f64 + (elapsed.subsec_millis() as f64 / 1000_f64))
);
break no_more_bodies;
if now - started_at > self.commit_after {
break false;
}
extracted_blocks_num = 0;
extracted_txs_num = 0;
last_check = Instant::now();
}
};
@@ -594,7 +625,10 @@ async fn main() -> anyhow::Result<()> {
temp_dir: etl_temp_dir.clone(),
});
if let Some(erigon_db) = erigon_db {
staged_sync.push(ConvertBodies { db: erigon_db });
staged_sync.push(ConvertBodies {
db: erigon_db,
commit_after: Duration::from_secs(120),
});
} else {
// also add body download stage here
}