mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
chore(eth-wire): trace handshake messages (#910)
This commit is contained in:
@ -51,7 +51,10 @@ where
|
||||
status: Status,
|
||||
fork_filter: ForkFilter,
|
||||
) -> Result<(EthStream<S>, Status), EthStreamError> {
|
||||
tracing::trace!("sending eth status ...");
|
||||
tracing::trace!(
|
||||
%status,
|
||||
"sending eth status to peer"
|
||||
);
|
||||
|
||||
// we need to encode and decode here on our own because we don't have an `EthStream` yet
|
||||
// The max length for a status with TTD is: <msg id = 1 byte> + <rlp(status) = 88 byte>
|
||||
@ -60,7 +63,7 @@ where
|
||||
let our_status_bytes = our_status_bytes.freeze();
|
||||
self.inner.send(our_status_bytes).await?;
|
||||
|
||||
tracing::trace!("waiting for eth status from peer ...");
|
||||
tracing::trace!("waiting for eth status from peer");
|
||||
let their_msg = self
|
||||
.inner
|
||||
.next()
|
||||
@ -83,6 +86,10 @@ where
|
||||
// https://github.com/ethereum/go-ethereum/blob/9244d5cd61f3ea5a7645fdf2a1a96d53421e412f/eth/protocols/eth/handshake.go#L87-L89
|
||||
match msg.message {
|
||||
EthMessage::Status(resp) => {
|
||||
tracing::trace!(
|
||||
status=%resp,
|
||||
"validating incoming eth status from peer"
|
||||
);
|
||||
if status.genesis != resp.genesis {
|
||||
return Err(EthHandshakeError::MismatchedGenesis {
|
||||
expected: status.genesis,
|
||||
|
||||
@ -77,14 +77,14 @@ where
|
||||
mut self,
|
||||
hello: HelloMessage,
|
||||
) -> Result<(P2PStream<S>, HelloMessage), P2PStreamError> {
|
||||
tracing::trace!("sending p2p hello ...");
|
||||
tracing::trace!(?hello, "sending p2p hello to peer");
|
||||
|
||||
// send our hello message with the Sink
|
||||
let mut raw_hello_bytes = BytesMut::new();
|
||||
P2PMessage::Hello(hello.clone()).encode(&mut raw_hello_bytes);
|
||||
self.inner.send(raw_hello_bytes.into()).await?;
|
||||
|
||||
tracing::trace!("waiting for p2p hello from peer ...");
|
||||
tracing::trace!("waiting for p2p hello from peer");
|
||||
|
||||
let first_message_bytes = tokio::time::timeout(HANDSHAKE_TIMEOUT, self.inner.next())
|
||||
.await
|
||||
@ -123,6 +123,11 @@ where
|
||||
}
|
||||
}?;
|
||||
|
||||
tracing::trace!(
|
||||
hello=?their_hello,
|
||||
"validating incoming p2p hello from peer"
|
||||
);
|
||||
|
||||
// TODO: explicitly document that we only support v5.
|
||||
if their_hello.protocol_version != ProtocolVersion::V5 {
|
||||
// TODO: do we want to send a `Disconnect` message here?
|
||||
@ -219,14 +224,16 @@ impl<S> P2PStream<S> {
|
||||
let mut buf = BytesMut::with_capacity(disconnect.length());
|
||||
disconnect.encode(&mut buf);
|
||||
|
||||
tracing::trace!(
|
||||
fromlen=%buf.len(),
|
||||
msg=%hex::encode(&buf),
|
||||
"Compressing disconnect message",
|
||||
);
|
||||
|
||||
let mut compressed = BytesMut::zeroed(1 + snap::raw::max_compress_len(buf.len() - 1));
|
||||
let compressed_size = self.encoder.compress(&buf[1..], &mut compressed[1..])?;
|
||||
let compressed_size =
|
||||
self.encoder.compress(&buf[1..], &mut compressed[1..]).map_err(|err| {
|
||||
tracing::debug!(
|
||||
?err,
|
||||
msg=%hex::encode(&buf[1..]),
|
||||
"error compressing disconnect"
|
||||
);
|
||||
err
|
||||
})?;
|
||||
|
||||
// truncate the compressed buffer to the actual compressed size (plus one for the message
|
||||
// id)
|
||||
@ -236,12 +243,6 @@ impl<S> P2PStream<S> {
|
||||
// message
|
||||
compressed[0] = buf[0];
|
||||
|
||||
tracing::trace!(
|
||||
tolen=%compressed.len(),
|
||||
compressed=%hex::encode(&compressed),
|
||||
"Compressed disconnect message",
|
||||
);
|
||||
|
||||
self.outgoing_messages.push_back(compressed.freeze());
|
||||
self.disconnecting = true;
|
||||
Ok(())
|
||||
@ -326,16 +327,16 @@ where
|
||||
// the message ID byte, which is the first byte in this buffer
|
||||
let mut decompress_buf = BytesMut::zeroed(decompressed_len + 1);
|
||||
|
||||
tracing::trace!(
|
||||
fromlen=%bytes.len(),
|
||||
tolen=%decompress_buf.len(),
|
||||
msg=%hex::encode(&bytes),
|
||||
"Decompressing message",
|
||||
);
|
||||
|
||||
// each message following a successful handshake is compressed with snappy, so we need
|
||||
// to decompress the message before we can decode it.
|
||||
this.decoder.decompress(&bytes[1..], &mut decompress_buf[1..])?;
|
||||
this.decoder.decompress(&bytes[1..], &mut decompress_buf[1..]).map_err(|err| {
|
||||
tracing::debug!(
|
||||
?err,
|
||||
msg=%hex::encode(&bytes[1..]),
|
||||
"error decompressing p2p message"
|
||||
);
|
||||
err
|
||||
})?;
|
||||
|
||||
let id = *bytes.first().ok_or(P2PStreamError::EmptyProtocolMessage)?;
|
||||
match id {
|
||||
@ -442,14 +443,16 @@ where
|
||||
return Err(P2PStreamError::SendBufferFull)
|
||||
}
|
||||
|
||||
tracing::trace!(
|
||||
fromlen=%item.len(),
|
||||
msg=%hex::encode(&item),
|
||||
"Compressing message",
|
||||
);
|
||||
|
||||
let mut compressed = BytesMut::zeroed(1 + snap::raw::max_compress_len(item.len() - 1));
|
||||
let compressed_size = this.encoder.compress(&item[1..], &mut compressed[1..])?;
|
||||
let compressed_size =
|
||||
this.encoder.compress(&item[1..], &mut compressed[1..]).map_err(|err| {
|
||||
tracing::debug!(
|
||||
?err,
|
||||
msg=%hex::encode(&item[1..]),
|
||||
"error compressing p2p message"
|
||||
);
|
||||
err
|
||||
})?;
|
||||
|
||||
// truncate the compressed buffer to the actual compressed size (plus one for the message
|
||||
// id)
|
||||
|
||||
Reference in New Issue
Block a user