mirror of
https://github.com/akula-bft/akula.git
synced 2026-04-19 03:00:13 -04:00
Parse NewBlockHashes (#11)
This commit is contained in:
@@ -49,11 +49,18 @@ impl Downloader {
|
||||
|
||||
let mut stream = sentry.receive_messages().await?;
|
||||
while let Some(message_result) = stream.next().await {
|
||||
if let Err(error) = message_result {
|
||||
error!("receive message error {}", error);
|
||||
match message_result {
|
||||
Ok(message_from_peer) => self.handle_incoming_message(&message_from_peer.message),
|
||||
Err(error) => {
|
||||
error!("receive message error {}", error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_incoming_message(&self, message: &messages::Message) {
|
||||
tracing::info!("incoming message: {:?}", message.eth_id());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,6 +5,9 @@ pub fn decode_rlp_message(id: EthMessageId, message_bytes: &[u8]) -> anyhow::Res
|
||||
EthMessageId::GetBlockHeaders => {
|
||||
Message::GetBlockHeaders(rlp::decode::<GetBlockHeadersMessage>(message_bytes)?)
|
||||
}
|
||||
EthMessageId::NewBlockHashes => {
|
||||
Message::NewBlockHashes(rlp::decode::<NewBlockHashesMessage>(message_bytes)?)
|
||||
}
|
||||
_ => anyhow::bail!("decode_rlp_message: unsupported message {:?}", id),
|
||||
};
|
||||
Ok(message)
|
||||
@@ -14,6 +17,41 @@ impl rlp::Encodable for Message {
|
||||
fn rlp_append(&self, stream: &mut rlp::RlpStream) {
|
||||
match self {
|
||||
Message::GetBlockHeaders(message) => message.rlp_append(stream),
|
||||
Message::NewBlockHashes(message) => message.rlp_append(stream),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::downloader::{
|
||||
message_decoder::decode_rlp_message,
|
||||
messages::{EthMessageId, Message},
|
||||
};
|
||||
use ethereum_types::H256;
|
||||
use hex_literal::hex;
|
||||
|
||||
#[test]
|
||||
fn decode_new_block_hashes() {
|
||||
let expected_bytes =
|
||||
hex!("e6e5a07100614faba6650b53fe0913ed7267bcc968eb362e3df908645a50aa526c72ba83a13ead");
|
||||
let result = decode_rlp_message(EthMessageId::NewBlockHashes, &expected_bytes);
|
||||
let some_message = result.unwrap();
|
||||
|
||||
let bytes = rlp::encode(&some_message);
|
||||
assert_eq!(&*bytes, expected_bytes);
|
||||
|
||||
if let Message::NewBlockHashes(message) = some_message {
|
||||
assert_eq!(message.ids.len(), 1);
|
||||
assert_eq!(
|
||||
message.ids[0].0,
|
||||
H256(hex!(
|
||||
"7100614faba6650b53fe0913ed7267bcc968eb362e3df908645a50aa526c72ba"
|
||||
))
|
||||
);
|
||||
assert_eq!(message.ids[0].1, 10567341);
|
||||
} else {
|
||||
assert!(false, "unexpected message type");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::downloader::block_id::BlockId;
|
||||
use ethereum_types::H256;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum EthMessageId {
|
||||
@@ -28,15 +29,25 @@ pub struct GetBlockHeadersMessage {
|
||||
pub reverse: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub enum Message {
|
||||
GetBlockHeaders(GetBlockHeadersMessage),
|
||||
#[derive(rlp_derive::RlpEncodable, rlp_derive::RlpDecodable, Clone, Copy)]
|
||||
pub struct BlockHashAndNumber(pub H256, pub u64);
|
||||
|
||||
#[derive(rlp_derive::RlpEncodableWrapper, rlp_derive::RlpDecodableWrapper, Clone)]
|
||||
pub struct NewBlockHashesMessage {
|
||||
pub ids: Vec<BlockHashAndNumber>,
|
||||
}
|
||||
|
||||
impl From<Message> for EthMessageId {
|
||||
fn from(message: Message) -> Self {
|
||||
match message {
|
||||
#[derive(Clone)]
|
||||
pub enum Message {
|
||||
GetBlockHeaders(GetBlockHeadersMessage),
|
||||
NewBlockHashes(NewBlockHashesMessage),
|
||||
}
|
||||
|
||||
impl Message {
|
||||
pub fn eth_id(self: &Message) -> EthMessageId {
|
||||
match self {
|
||||
Message::GetBlockHeaders(_) => EthMessageId::GetBlockHeaders,
|
||||
Message::NewBlockHashes(_) => EthMessageId::NewBlockHashes,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -47,7 +47,7 @@ impl SentryClient for SentryClientImpl {
|
||||
message: Message,
|
||||
peer_filter: PeerFilter,
|
||||
) -> anyhow::Result<()> {
|
||||
let message_id = EthMessageId::from(message);
|
||||
let message_id = message.eth_id();
|
||||
let message_data = grpc_sentry::OutboundMessageData {
|
||||
id: grpc_sentry::MessageId::from(message_id) as i32,
|
||||
data: rlp::encode(&message).into(),
|
||||
@@ -91,7 +91,7 @@ impl SentryClient for SentryClientImpl {
|
||||
let sent_peers: grpc_sentry::SentPeers = response.into_inner();
|
||||
debug!(
|
||||
"SentryClient send_message sent {:?} to: {:?}",
|
||||
EthMessageId::from(message),
|
||||
message.eth_id(),
|
||||
sent_peers
|
||||
);
|
||||
return Ok(());
|
||||
@@ -122,7 +122,7 @@ impl SentryClient for SentryClientImpl {
|
||||
from_peer_id: peer_id,
|
||||
};
|
||||
debug!("SentryClient receive_messages received a message {:?} from {:?}",
|
||||
EthMessageId::from(message_from_peer.message),
|
||||
message_from_peer.message.eth_id(),
|
||||
message_from_peer.from_peer_id);
|
||||
Ok(message_from_peer)
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user