darkfid: sync logic changed

Currently we used forward block syncing, while with this changes we do two step backwards header sync and then forward block sync using already known headers. This enables the node to know in advance the tip of the peers they will sync from
This commit is contained in:
skoupidi
2024-04-16 18:36:56 +03:00
parent 58db8fe588
commit 07e7252901
5 changed files with 450 additions and 77 deletions

View File

@@ -16,6 +16,8 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
// TODO: Protocal functions need to be protected so peers can't spam us.
/// Block proposal broadcast protocol
mod protocol_proposal;
pub use protocol_proposal::{ProposalMessage, ProtocolProposal};
@@ -23,8 +25,8 @@ pub use protocol_proposal::{ProposalMessage, ProtocolProposal};
/// Validator blockchain sync protocol
mod protocol_sync;
pub use protocol_sync::{
ForkSyncRequest, ForkSyncResponse, IsSyncedRequest, IsSyncedResponse, ProtocolSync,
SyncRequest, SyncResponse,
ForkSyncRequest, ForkSyncResponse, HeaderSyncRequest, HeaderSyncResponse, IsSyncedRequest,
IsSyncedResponse, ProtocolSync, SyncRequest, SyncResponse, TipRequest, TipResponse, BATCH,
};
/// Transaction broadcast protocol

View File

@@ -23,7 +23,7 @@ use log::{debug, error};
use smol::Executor;
use darkfi::{
blockchain::{BlockInfo, HeaderHash},
blockchain::{BlockInfo, Header, HeaderHash},
impl_p2p_message,
net::{
ChannelPtr, Message, MessageSubscription, ProtocolBase, ProtocolBasePtr,
@@ -35,15 +35,16 @@ use darkfi::{
use darkfi_serial::{SerialDecodable, SerialEncodable};
// Constant defining how many blocks we send during syncing.
const BATCH: usize = 10;
pub const BATCH: usize = 10;
/// Auxiliary structure used for blockchain syncing.
/// Structure represening a request to ask a node if they are synced.
#[derive(Debug, SerialEncodable, SerialDecodable)]
pub struct IsSyncedRequest {}
impl_p2p_message!(IsSyncedRequest, "issyncedrequest");
/// Auxiliary structure used for blockchain syncing.
/// Structure representing the response to `IsSyncedRequest`,
/// containing a boolean flag to indicate if we are synced.
#[derive(Debug, SerialEncodable, SerialDecodable)]
pub struct IsSyncedResponse {
/// Flag indicating the node is synced
@@ -52,16 +53,61 @@ pub struct IsSyncedResponse {
impl_p2p_message!(IsSyncedResponse, "issyncedresponse");
/// Auxiliary structure used for blockchain syncing.
/// Structure represening a request to ask a node for their current
/// canonical(finalized) tip block hash. We also include our own
/// tip, so they can verify we follow the same sequence.
#[derive(Debug, SerialEncodable, SerialDecodable)]
pub struct TipRequest {
/// Canonical(finalized) tip block hash
pub tip: HeaderHash,
}
impl_p2p_message!(TipRequest, "tiprequest");
/// Structure representing the response to `TipRequest`,
/// containing our canonical(finalized) tip block height and hash.
#[derive(Debug, SerialEncodable, SerialDecodable)]
pub struct TipResponse {
/// Canonical(finalized) tip block height
pub height: u32,
/// Canonical(finalized) tip block hash
pub hash: HeaderHash,
}
impl_p2p_message!(TipResponse, "tipresponse");
/// Structure represening a request to ask a node for up to `BATCH` headers before
/// the provided header height.
#[derive(Debug, SerialEncodable, SerialDecodable)]
pub struct HeaderSyncRequest {
/// Header height
pub height: u32,
}
impl_p2p_message!(HeaderSyncRequest, "headersyncrequest");
/// Structure representing the response to `HeaderSyncRequest`,
/// containing up to `BATCH` headers before the requested block height.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct HeaderSyncResponse {
/// Response headers
pub headers: Vec<Header>,
}
impl_p2p_message!(HeaderSyncResponse, "headersyncresponse");
/// Structure represening a request to ask a node for up to`BATCH` blocks
/// of provided headers.
#[derive(Debug, SerialEncodable, SerialDecodable)]
pub struct SyncRequest {
/// Block height
pub height: u32,
/// Header hashes
pub headers: Vec<HeaderHash>,
}
impl_p2p_message!(SyncRequest, "syncrequest");
/// Auxiliary structure used for blockchain syncing.
/// Structure representing the response to `SyncRequest`,
/// containing up to `BATCH` blocks after the requested block height.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct SyncResponse {
/// Response blocks
@@ -70,7 +116,11 @@ pub struct SyncResponse {
impl_p2p_message!(SyncResponse, "syncresponse");
/// Auxiliary structure used for fork chain syncing.
/// Structure represening a request to ask a node a fork sequence.
/// If we include a specific fork tip, they have to return its sequence,
/// otherwise they respond with their best fork sequence.
/// We also include our own canonical(finalized) tip, so they can verify
/// we follow the same sequence.
#[derive(Debug, SerialEncodable, SerialDecodable)]
pub struct ForkSyncRequest {
/// Canonical(finalized) tip block hash
@@ -81,7 +131,8 @@ pub struct ForkSyncRequest {
impl_p2p_message!(ForkSyncRequest, "forksyncrequest");
/// Auxiliary structure used for fork chain syncing.
/// Structure representing the response to `ForkSyncRequest`,
/// containing the requested fork sequence.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct ForkSyncResponse {
/// Response fork proposals
@@ -92,6 +143,8 @@ impl_p2p_message!(ForkSyncResponse, "forksyncresponse");
pub struct ProtocolSync {
is_synced_sub: MessageSubscription<IsSyncedRequest>,
tip_sub: MessageSubscription<TipRequest>,
header_sub: MessageSubscription<HeaderSyncRequest>,
request_sub: MessageSubscription<SyncRequest>,
fork_request_sub: MessageSubscription<ForkSyncRequest>,
jobsman: ProtocolJobsManagerPtr,
@@ -108,17 +161,25 @@ impl ProtocolSync {
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<IsSyncedRequest>().await;
msg_subsystem.add_dispatch::<IsSyncedResponse>().await;
msg_subsystem.add_dispatch::<TipRequest>().await;
msg_subsystem.add_dispatch::<TipResponse>().await;
msg_subsystem.add_dispatch::<HeaderSyncRequest>().await;
msg_subsystem.add_dispatch::<HeaderSyncResponse>().await;
msg_subsystem.add_dispatch::<SyncRequest>().await;
msg_subsystem.add_dispatch::<SyncResponse>().await;
msg_subsystem.add_dispatch::<ForkSyncRequest>().await;
msg_subsystem.add_dispatch::<ForkSyncResponse>().await;
let is_synced_sub = channel.subscribe_msg::<IsSyncedRequest>().await?;
let tip_sub = channel.subscribe_msg::<TipRequest>().await?;
let header_sub = channel.subscribe_msg::<HeaderSyncRequest>().await?;
let request_sub = channel.subscribe_msg::<SyncRequest>().await?;
let fork_request_sub = channel.subscribe_msg::<ForkSyncRequest>().await?;
Ok(Arc::new(Self {
is_synced_sub,
tip_sub,
header_sub,
request_sub,
fork_request_sub,
jobsman: ProtocolJobsManager::new("SyncProtocol", channel.clone()),
@@ -139,7 +200,6 @@ impl ProtocolSync {
continue
};
// TODO: This needs to be protected so peer can't spam us
// Check if node has finished syncing its blockchain and respond
let response = IsSyncedResponse { synced: *self.validator.synced.read().await };
if let Err(e) = self.channel.send(&response).await {
@@ -152,6 +212,123 @@ impl ProtocolSync {
}
}
async fn handle_receive_tip_request(self: Arc<Self>) -> Result<()> {
debug!(target: "darkfid::proto::protocol_sync::handle_receive_tip_request", "START");
loop {
let request = match self.tip_sub.receive().await {
Ok(v) => v,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
"recv fail: {}",
e
);
continue
}
};
// Check if node has finished syncing its blockchain
if !*self.validator.synced.read().await {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
"Node still syncing blockchain, skipping..."
);
continue
}
// Check we follow the same sequence
match self.validator.blockchain.blocks.contains(&request.tip) {
Ok(contains) => {
if !contains {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
"Node doesn't follow request sequence"
);
continue
}
}
Err(e) => {
error!(
target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
"block_store.contains fail: {}",
e
);
continue
}
}
// Grab our current tip and return it
let tip = match self.validator.blockchain.last() {
Ok(v) => v,
Err(e) => {
error!(
target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
"blockchain.last fail: {}",
e
);
continue
}
};
let response = TipResponse { height: tip.0, hash: tip.1 };
if let Err(e) = self.channel.send(&response).await {
error!(
target: "darkfid::proto::protocol_sync::handle_receive_tip_request",
"channel send fail: {}",
e
)
};
}
}
async fn handle_receive_header_request(self: Arc<Self>) -> Result<()> {
debug!(target: "darkfid::proto::protocol_sync::handle_receive_header_request", "START");
loop {
let request = match self.header_sub.receive().await {
Ok(v) => v,
Err(e) => {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_header_request",
"recv fail: {}",
e
);
continue
}
};
// Check if node has finished syncing its blockchain
if !*self.validator.synced.read().await {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_header_request",
"Node still syncing blockchain, skipping..."
);
continue
}
let headers = match self.validator.blockchain.get_headers_before(request.height, BATCH)
{
Ok(v) => v,
Err(e) => {
error!(
target: "darkfid::proto::protocol_sync::handle_receive_header_request",
"get_headers_before fail: {}",
e
);
continue
}
};
let response = HeaderSyncResponse { headers };
if let Err(e) = self.channel.send(&response).await {
error!(
target: "darkfid::proto::protocol_sync::handle_receive_header_request",
"channel send fail: {}",
e
)
};
}
}
async fn handle_receive_request(self: Arc<Self>) -> Result<()> {
debug!(target: "darkfid::proto::protocol_sync::handle_receive_request", "START");
loop {
@@ -176,7 +353,16 @@ impl ProtocolSync {
continue
}
let blocks = match self.validator.blockchain.get_blocks_after(request.height, BATCH) {
// Check if request exists the configured limit
if request.headers.len() > BATCH {
debug!(
target: "darkfid::proto::protocol_sync::handle_receive_request",
"Node requested more blocks than allowed."
);
continue
}
let blocks = match self.validator.blockchain.get_blocks_by_hash(&request.headers) {
Ok(v) => v,
Err(e) => {
error!(
@@ -267,6 +453,14 @@ impl ProtocolBase for ProtocolSync {
.clone()
.spawn(self.clone().handle_receive_is_synced_request(), executor.clone())
.await;
self.jobsman
.clone()
.spawn(self.clone().handle_receive_tip_request(), executor.clone())
.await;
self.jobsman
.clone()
.spawn(self.clone().handle_receive_header_request(), executor.clone())
.await;
self.jobsman.clone().spawn(self.clone().handle_receive_request(), executor.clone()).await;
self.jobsman
.clone()

View File

@@ -16,23 +16,75 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use darkfi::{system::sleep, util::encoding::base64, Result};
use std::collections::HashMap;
use darkfi::{
blockchain::{Header, HeaderHash},
net::ChannelPtr,
system::sleep,
util::encoding::base64,
Error, Result,
};
use darkfi_serial::serialize_async;
use log::{debug, info, warn};
use rand::{prelude::SliceRandom, rngs::OsRng};
use tinyjson::JsonValue;
use crate::{
proto::{
ForkSyncRequest, ForkSyncResponse, IsSyncedRequest, IsSyncedResponse, SyncRequest,
SyncResponse, COMMS_TIMEOUT,
ForkSyncRequest, ForkSyncResponse, HeaderSyncRequest, HeaderSyncResponse, IsSyncedRequest,
IsSyncedResponse, SyncRequest, SyncResponse, TipRequest, TipResponse, BATCH, COMMS_TIMEOUT,
},
Darkfid,
};
/// async task used for block syncing
// TODO: Parallelize independent requests.
// We can also make them be like torrents, where we retrieve chunks not in order.
/// async task used for block syncing.
pub async fn sync_task(node: &Darkfid) -> Result<()> {
info!(target: "darkfid::task::sync_task", "Starting blockchain sync...");
// Block until at least node is connected to at least one synced peer
// Grab synced peers
let peers = synced_peers(node).await?;
// TODO: Configure a checkpoint, filter peers that don't have that and start
// syncing the sequence until that
// Grab last known block/header
// TODO: grab last known header from the headers sync tree, once added
let mut last = node.validator.blockchain.last()?;
info!(target: "darkfid::task::sync_task", "Last known block: {} - {}", last.0, last.1);
loop {
// Grab the most common tip and the corresponding peers
let (common_tip_height, common_tip_peers) = most_common_tip(&peers, &last.1).await?;
// Retrieve all the headers backawards until our last known one and verify them.
// We use the next height, in order to also retrieve the peers tip header.
let headers = retrieve_headers(&common_tip_peers, last, common_tip_height + 1).await?;
// Retrieve all the blocks for those headers and apply them to canonical
retrieve_blocks(node, &peers, headers).await?;
let last_received = node.validator.blockchain.last()?;
info!(target: "darkfid::task::sync_task", "Last received block: {} - {}", last_received.0, last_received.1);
if last == last_received {
break
}
last = last_received;
}
// Sync best fork
sync_best_fork(node, &peers, &last.1).await?;
*node.validator.synced.write().await = true;
info!(target: "darkfid::task::sync_task", "Blockchain synced!");
Ok(())
}
/// Auxiliary function to lock until node is connected to at least one synced peer.
async fn synced_peers(node: &Darkfid) -> Result<Vec<ChannelPtr>> {
let mut peers = vec![];
loop {
// Grab channels
@@ -70,61 +122,187 @@ pub async fn sync_task(node: &Darkfid) -> Result<()> {
sleep(10).await;
}
// Getting a peer to ask for blocks
let channel = &peers[0];
Ok(peers)
}
// Communication setup
let block_response_sub = channel.subscribe_msg::<SyncResponse>().await?;
let proposals_response_sub = channel.subscribe_msg::<ForkSyncResponse>().await?;
let notif_sub = node.subscribers.get("blocks").unwrap();
let proposal_notif_sub = node.subscribers.get("proposals").unwrap();
/// Auxiliary function to ask all peers for their current tip and find the most common one.
async fn most_common_tip(
peers: &[ChannelPtr],
last_tip: &HeaderHash,
) -> Result<(u32, Vec<ChannelPtr>)> {
let mut tips: HashMap<(u32, [u8; 32]), Vec<ChannelPtr>> = HashMap::new();
for peer in peers {
// Node creates a `TipRequest` and sends it
let response_sub = peer.subscribe_msg::<TipResponse>().await?;
let request = TipRequest { tip: *last_tip };
peer.send(&request).await?;
// TODO: make this parallel and use a head selection method,
// for example use a manual known head and only connect to nodes
// that follow that. Also use a random peer on every block range
// we sync.
// Node sends the last known block hash of the canonical blockchain
// and loops until the response is the same block (used to utilize
// batch requests).
let mut last = node.validator.blockchain.last()?;
info!(target: "darkfid::task::sync_task", "Last known block: {} - {}", last.0, last.1);
loop {
// Node creates a `SyncRequest` and sends it
let request = SyncRequest { height: last.0 };
channel.send(&request).await?;
// TODO: add a timeout here to retry
// Node waits for response
let response = block_response_sub.receive().await?;
let Ok(response) = response_sub.receive_with_timeout(COMMS_TIMEOUT).await else { continue };
// Verify and store retrieved blocks
debug!(target: "darkfid::task::sync_task", "Processing received blocks");
node.validator.add_blocks(&response.blocks).await?;
// Notify subscriber
for block in &response.blocks {
let encoded_block = JsonValue::String(base64::encode(&serialize_async(block).await));
notif_sub.notify(vec![encoded_block].into()).await;
}
let last_received = node.validator.blockchain.last()?;
info!(target: "darkfid::task::sync_task", "Last received block: {} - {}", last_received.0, last_received.1);
if last == last_received {
break
}
last = last_received;
// Handle response
let tip = (response.height, *response.hash.inner());
let Some(tip_peers) = tips.get_mut(&tip) else {
tips.insert(tip, vec![peer.clone()]);
continue
};
tip_peers.push(peer.clone());
}
// Node syncs current best fork
let request = ForkSyncRequest { tip: last.1, fork_tip: None };
// Grab the most common tip peers
let mut common_tips = vec![];
let mut common_tip_peers = vec![];
for (tip, peers) in tips {
if peers.len() < common_tip_peers.len() {
continue;
}
if peers.len() == common_tip_peers.len() {
common_tips.push(tip);
continue;
}
common_tips = vec![tip];
common_tip_peers = peers;
}
if common_tips.len() > 1 {
info!(target: "darkfid::task::sync::most_common_tip", "Multiple common tips found: {:?}", common_tips);
return Err(Error::UnsupportedChain)
}
Ok((common_tips[0].0, common_tip_peers))
}
/// Auxiliary function to retrieve headers backwards until our last known one and verify them.
async fn retrieve_headers(
peers: &[ChannelPtr],
last_known: (u32, HeaderHash),
tip_height: u32,
) -> Result<Vec<Header>> {
// Communication setup
let mut peer_subs = vec![];
for peer in peers {
peer_subs.push(peer.subscribe_msg::<HeaderSyncResponse>().await?);
}
// TODO: store them in a sled tree
let mut headers = vec![];
let mut last_tip_height = tip_height;
'headers_loop: loop {
for (index, peer) in peers.iter().enumerate() {
// Node creates a `HeaderSyncRequest` and sends it
let request = HeaderSyncRequest { height: last_tip_height };
peer.send(&request).await?;
// Node waits for response
let Ok(response) = peer_subs[index].receive_with_timeout(COMMS_TIMEOUT).await else {
continue
};
// Retain only the headers after our last known
let mut response_headers = response.headers.to_vec();
response_headers.retain(|h| h.height > last_known.0);
if response_headers.is_empty() {
break 'headers_loop
}
response_headers.extend_from_slice(&headers);
headers = response_headers;
last_tip_height = headers[0].height;
}
}
// Check if we retrieved any new headers
if headers.is_empty() {
return Ok(headers);
}
// Verify headers sequence. Here we do a quick and dirty verification
// of just the hashes and heights sequence. We will formaly verify
// the blocks when we retrieve them.
if headers[0].previous != last_known.1 || headers[0].height != last_known.0 + 1 {
return Err(Error::BlockIsInvalid(headers[0].hash().as_string()))
}
for (index, header) in headers[1..].iter().enumerate() {
if header.previous != headers[index].hash() || header.height != headers[index].height + 1 {
return Err(Error::BlockIsInvalid(header.hash().as_string()))
}
}
Ok(headers)
}
/// Auxiliary function to retrieve blocks of provided headers and apply them to canonical.
async fn retrieve_blocks(
node: &Darkfid,
peers: &[ChannelPtr],
mut headers: Vec<Header>,
) -> Result<()> {
// Communication setup
let mut peer_subs = vec![];
for peer in peers {
peer_subs.push(peer.subscribe_msg::<SyncResponse>().await?);
}
let notif_sub = node.subscribers.get("blocks").unwrap();
'blocks_loop: loop {
for (index, peer) in peers.iter().enumerate() {
// Grab first `BATCH` headers
let mut headers_copy = headers.clone();
let mut request_headers = vec![];
while request_headers.len() < BATCH {
if headers_copy.is_empty() {
break;
}
request_headers.push(headers_copy.remove(0).hash());
}
// Node creates a `SyncRequest` and sends it
let request = SyncRequest { headers: request_headers };
peer.send(&request).await?;
// Node waits for response
let Ok(response) = peer_subs[index].receive_with_timeout(COMMS_TIMEOUT).await else {
continue
};
// Verify and store retrieved blocks
debug!(target: "darkfid::task::sync::retrieve_blocks", "Processing received blocks");
node.validator.add_blocks(&response.blocks).await?;
// Notify subscriber
for block in &response.blocks {
info!(target: "darkfid::task::sync::retrieve_blocks", "Appended block: {} - {}", block.header.height, block.hash());
let encoded_block =
JsonValue::String(base64::encode(&serialize_async(block).await));
notif_sub.notify(vec![encoded_block].into()).await;
}
headers = headers_copy;
if headers.is_empty() {
break 'blocks_loop
}
}
}
Ok(())
}
/// Auxiliary function to retrieve best fork state from a random peer.
async fn sync_best_fork(node: &Darkfid, peers: &[ChannelPtr], last_tip: &HeaderHash) -> Result<()> {
// Getting a random peer to ask for blocks
let channel = &peers.choose(&mut OsRng).unwrap();
// Communication setup
let response_sub = channel.subscribe_msg::<ForkSyncResponse>().await?;
let notif_sub = node.subscribers.get("proposals").unwrap();
// Node creates a `ForkSyncRequest` and sends it
let request = ForkSyncRequest { tip: *last_tip, fork_tip: None };
channel.send(&request).await?;
// TODO: add a timeout here to retry
// Node waits for response
let response = proposals_response_sub.receive().await?;
let response = response_sub.receive_with_timeout(COMMS_TIMEOUT).await?;
// Verify and store retrieved proposals
debug!(target: "darkfid::task::sync_task", "Processing received proposals");
@@ -132,10 +310,8 @@ pub async fn sync_task(node: &Darkfid) -> Result<()> {
node.validator.append_proposal(proposal).await?;
// Notify subscriber
let enc_prop = JsonValue::String(base64::encode(&serialize_async(proposal).await));
proposal_notif_sub.notify(vec![enc_prop].into()).await;
notif_sub.notify(vec![enc_prop].into()).await;
}
*node.validator.synced.write().await = true;
info!(target: "darkfid::task::sync_task", "Blockchain synced!");
Ok(())
}

View File

@@ -505,16 +505,16 @@ impl BlockStore {
Ok(block_difficulties)
}
/// Fetch n hashes after given height. In the iteration, if an order
/// Fetch n hashes before given height. In the iteration, if an order
/// height is not found, the iteration stops and the function returns what
/// it has found so far in the `BlockOrderStore`.
pub fn get_after(&self, height: u32, n: usize) -> Result<Vec<HeaderHash>> {
pub fn get_before(&self, height: u32, n: usize) -> Result<Vec<HeaderHash>> {
let mut ret = vec![];
let mut key = height;
let mut counter = 0;
while counter <= n {
if let Some(found) = self.order.get_gt(key.to_be_bytes())? {
if let Some(found) = self.order.get_lt(key.to_be_bytes())? {
let (height, hash) = parse_u32_key_record(found)?;
key = height;
ret.push(hash);
@@ -524,7 +524,7 @@ impl BlockStore {
break
}
Ok(ret)
Ok(ret.iter().rev().copied().collect())
}
/// Fetch the first block hash in the order tree, based on the `Ord`

View File

@@ -171,11 +171,12 @@ impl Blockchain {
self.get_blocks_by_hash(&hashes)
}
/// Retrieve n blocks after given start block height.
pub fn get_blocks_after(&self, height: u32, n: usize) -> Result<Vec<BlockInfo>> {
debug!(target: "blockchain", "get_blocks_after(): {} -> {}", height, n);
let hashes = self.blocks.get_after(height, n)?;
self.get_blocks_by_hash(&hashes)
/// Retrieve n headers before given block height.
pub fn get_headers_before(&self, height: u32, n: usize) -> Result<Vec<Header>> {
debug!(target: "blockchain", "get_headers_before(): {} -> {}", height, n);
let hashes = self.blocks.get_before(height, n)?;
let headers = self.headers.get(&hashes, true)?;
Ok(headers.iter().map(|h| h.clone().unwrap()).collect())
}
/// Retrieve stored blocks count