fix(eth-wire): fix disconnect reason rlp decoding (#502)

* add trace in DisconnectReason Decodable impl

* add trace for decoding p2p hello

* add traces to p2p and eth stream

* refactor P2PMessage decoding

 * improve disconnect tracing

* s/Hello/first

* add geth disconnect test

* add disconnectreason test cases

* add known failing disconnect messages

* add trace when disconnect reason decoding fails

* cargo fmt

* add more examples

 * adding more as they appear in traces
 * will add the rest since they can be exhaustively enumerated

* add every other possible encoding

* fix disconnect decoding

 * the four possible formats for a disconnect message (rlp list (y/n) x
   snappy (y/n)):
   * encoded as a single rlp byte
     * with snappy
     * without snappy
   * encoded as a rlp list
     * with snappy
     * without snappy
 * fix the type for decoding in the test_decode_known_reasons test

* sort reasons by length in test

* remove printlns

* use one call to advance

* simplify decode impl to strip last byte

 * todo: comment explaining the different formats being parsed?

* explicitly remove geth as a peer

* style: traces

* add another disconnect code from geth

* fix: add check for DisconnectRequested

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Dan Cline
2022-12-19 11:19:08 -05:00
committed by GitHub
parent f8b719e04c
commit 5758c297b3
4 changed files with 176 additions and 47 deletions

View File

@@ -1,7 +1,7 @@
//! Disconnect
use bytes::Buf;
use reth_rlp::{Decodable, DecodeError, Encodable, EMPTY_LIST_CODE};
use reth_rlp::{Decodable, DecodeError, Encodable};
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use thiserror::Error;
@@ -124,30 +124,32 @@ impl Encodable for DisconnectReason {
/// input is snappy compressed.
impl Decodable for DisconnectReason {
fn decode(buf: &mut &[u8]) -> Result<Self, DecodeError> {
if buf.len() < 4 {
return Err(DecodeError::Custom("disconnect reason should have 4 bytes"))
if buf.is_empty() {
return Err(DecodeError::InputTooShort)
}
let first = buf[0];
if first != 0x02 {
return Err(DecodeError::Custom("invalid disconnect reason - invalid snappy header"))
}
// encoded as a single byte
let reason_byte = if buf.len() == 1 {
u8::decode(buf)?
} else if buf.len() <= 4 {
// in any disconnect encoding, headers precede and do not wrap the reason, so we should
// advance to the end of the buffer
buf.advance(buf.len() - 1);
let second = buf[1];
if second != 0x04 {
// TODO: make sure this error message is correct
return Err(DecodeError::Custom("invalid disconnect reason - invalid snappy header"))
}
// geth rlp encodes [`DisconnectReason::DisconnectRequested`] as 0x00 and not as empty
// string 0x80
if buf[0] == 0x00 {
DisconnectReason::DisconnectRequested as u8
} else {
// the reason is encoded at the end of the snappy encoded bytes
u8::decode(buf)?
}
} else {
return Err(DecodeError::Custom("invalid disconnect reason length"))
};
let third = buf[2];
if third != EMPTY_LIST_CODE + 1 {
return Err(DecodeError::Custom("invalid disconnect reason - invalid rlp header"))
}
let reason = u8::decode(&mut &buf[3..])?;
let reason = DisconnectReason::try_from(reason)
let reason = DisconnectReason::try_from(reason_byte)
.map_err(|_| DecodeError::Custom("unknown disconnect reason"))?;
buf.advance(4);
Ok(reason)
}
}
@@ -267,4 +269,89 @@ mod tests {
assert_eq!(decompressed, disconnect_raw);
}
#[test]
fn test_decode_known_reasons() {
let all_reasons = vec![
// non-snappy, encoding the disconnect reason as a single byte
"0180",
"0101",
"0102",
"0103",
"0104",
"0105",
"0106",
"0107",
"0108",
"0109",
"010a",
"010b",
"0110",
// non-snappy, encoding the disconnect reason in a list
"01c180",
"01c101",
"01c102",
"01c103",
"01c104",
"01c105",
"01c106",
"01c107",
"01c108",
"01c109",
"01c10a",
"01c10b",
"01c110",
// snappy, compressing a single byte
"010080",
"010001",
"010002",
"010003",
"010004",
"010005",
"010006",
"010007",
"010008",
"010009",
"01000a",
"01000b",
"010010",
// TODO: just saw this format once, not really sure what this format even is
"01010003",
"01010000",
// snappy, encoded the disconnect reason as a list
"010204c180",
"010204c101",
"010204c102",
"010204c103",
"010204c104",
"010204c105",
"010204c106",
"010204c107",
"010204c108",
"010204c109",
"010204c10a",
"010204c10b",
"010204c110",
];
for reason in all_reasons {
let reason = hex::decode(reason).unwrap();
let message = P2PMessage::decode(&mut &reason[..]).unwrap();
let P2PMessage::Disconnect(_) = message else {
panic!("expected a disconnect message");
};
}
}
#[test]
fn test_decode_disconnect_requested() {
let reason = "01010000";
let reason = hex::decode(reason).unwrap();
match P2PMessage::decode(&mut &reason[..]).unwrap() {
P2PMessage::Disconnect(DisconnectReason::DisconnectRequested) => {}
_ => {
unreachable!()
}
}
}
}

View File

@@ -120,6 +120,8 @@ pub enum P2PHandshakeError {
Timeout,
#[error("Disconnected by peer: {0}")]
Disconnected(DisconnectReason),
#[error("error decoding a message during handshake: {0}")]
DecodeError(#[from] reth_rlp::DecodeError),
}
/// An error that can occur when interacting with a [`Pinger`].

View File

@@ -99,42 +99,24 @@ where
})
}
// the u8::decode implementation handles the 0x80 case for P2PMessageID::Hello, and the
// TryFrom implementation ensures that the message id is known.
let message_id = u8::decode(&mut &first_message_bytes[..])?;
let id = P2PMessageID::try_from(message_id)?;
// The first message sent MUST be a hello OR disconnect message
//
// If the first message is a disconnect message, we should not decode using
// Decodable::decode, because the first message (either Disconnect or Hello) is not snappy
// compressed, and the Decodable implementation assumes that non-hello messages are snappy
// compressed.
match id {
P2PMessageID::Hello => {}
P2PMessageID::Disconnect => {
// the u8::decode implementation handles the 0x80 case for
// DisconnectReason::DisconnectRequested, and the TryFrom implementation ensures
// that the disconnect reason is known.
let disconnect_id = u8::decode(&mut &first_message_bytes[1..])?;
let reason = DisconnectReason::try_from(disconnect_id)?;
let their_hello = match P2PMessage::decode(&mut &first_message_bytes[..]) {
Ok(P2PMessage::Hello(hello)) => Ok(hello),
Ok(P2PMessage::Disconnect(reason)) => {
tracing::error!("Disconnected by peer during handshake: {}", reason);
counter!("p2pstream.disconnected_errors", 1);
return Err(P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(reason)))
Err(P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(reason)))
}
id => {
tracing::error!("expected hello message but received: {:?}", id);
return Err(P2PStreamError::HandshakeError(
P2PHandshakeError::NonHelloMessageInHandshake,
))
Err(err) => {
tracing::warn!(?err, msg=%hex::encode(&first_message_bytes), "Failed to decode first message from peer");
Err(P2PStreamError::HandshakeError(err.into()))
}
}
let their_hello = match P2PMessage::decode(&mut &first_message_bytes[..])? {
P2PMessage::Hello(hello) => Ok(hello),
msg => {
// Note: this should never occur due to the id check
Ok(msg) => {
tracing::error!("expected hello message but received: {:?}", msg);
Err(P2PStreamError::HandshakeError(P2PHandshakeError::NonHelloMessageInHandshake))
}
@@ -308,7 +290,12 @@ where
this.outgoing_messages.push_back(pong_bytes.into());
}
_ if id == P2PMessageID::Disconnect as u8 => {
let reason = DisconnectReason::decode(&mut &bytes[1..])?;
let reason = DisconnectReason::decode(&mut &bytes[1..]).map_err(|err| {
tracing::warn!(
?err, msg=%hex::encode(&bytes[1..]), "Failed to decode disconnect message from peer"
);
err
})?;
return Poll::Ready(Some(Err(P2PStreamError::Disconnected(reason))))
}
_ if id == P2PMessageID::Hello as u8 => {

View File

@@ -255,3 +255,56 @@ async fn test_outgoing_connect_with_single_geth() {
let incoming_peer_id = event_stream.next_session_established().await.unwrap();
assert_eq!(incoming_peer_id, geth_peer_id);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_geth_disconnect() {
reth_tracing::init_tracing();
let secret_key = SecretKey::new(&mut rand::thread_rng());
let reth_p2p_socket = SocketAddr::new([127, 0, 0, 1].into(), 30309);
let reth_disc_socket = SocketAddr::new([127, 0, 0, 1].into(), 30310);
let config = NetworkConfig::builder(Arc::new(TestApi::default()), secret_key)
.listener_addr(reth_p2p_socket)
.discovery_addr(reth_disc_socket)
.build();
let network = NetworkManager::new(config).await.unwrap();
let handle = network.handle().clone();
tokio::task::spawn(network);
// instantiate geth and add ourselves as a peer
let temp_dir = tempfile::tempdir().unwrap().into_path();
let geth = Geth::new().disable_discovery().data_dir(temp_dir).spawn();
let geth_p2p_port = geth.p2p_port().unwrap();
let geth_socket = SocketAddr::new([127, 0, 0, 1].into(), geth_p2p_port);
let geth_endpoint = SocketAddr::new([127, 0, 0, 1].into(), geth.port()).to_string();
let provider = Provider::<Http>::try_from(format!("http://{geth_endpoint}")).unwrap();
// get the peer id we should be expecting
let geth_peer_id: PeerId =
provider.node_info().await.unwrap().enr.public_key().encode_uncompressed().into();
// add geth as a peer then wait for a `SessionEstablished` event
handle.add_peer(geth_peer_id, geth_socket);
// create networkeventstream to get the next session established event easily
let mut events = handle.event_listener();
if let Some(NetworkEvent::SessionEstablished { peer_id, .. }) = events.next().await {
assert_eq!(peer_id, geth_peer_id);
} else {
panic!("Expected a session established event");
}
// remove geth as a peer deliberately
handle.disconnect_peer(geth_peer_id);
// wait for a disconnect from geth
if let Some(NetworkEvent::SessionClosed { peer_id }) = events.next().await {
assert_eq!(peer_id, geth_peer_id);
} else {
panic!("Expected a session closed event");
}
}