From 43a0a2b1606786f0a07c02bd2bdcbbfc1ffdb9a0 Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Fri, 4 Nov 2022 03:14:41 -0400 Subject: [PATCH] feat(eth-wire): return Hello on handshake (#161) * feat(eth-wire): return Hello on handshake * update P2PStream docs based on new auth design * cargo fmt * make clippy happy --- crates/net/eth-wire/src/ethstream.rs | 6 +++--- crates/net/eth-wire/src/p2pstream.rs | 29 ++++++++++++++++------------ 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/crates/net/eth-wire/src/ethstream.rs b/crates/net/eth-wire/src/ethstream.rs index 44ae90157..325db58ff 100644 --- a/crates/net/eth-wire/src/ethstream.rs +++ b/crates/net/eth-wire/src/ethstream.rs @@ -64,7 +64,7 @@ where let msg = self .next() .await - .ok_or_else(|| EthStreamError::HandshakeError(HandshakeError::NoResponse))??; + .ok_or(EthStreamError::HandshakeError(HandshakeError::NoResponse))??; // TODO: Add any missing checks // https://github.com/ethereum/go-ethereum/blob/9244d5cd61f3ea5a7645fdf2a1a96d53421e412f/eth/protocols/eth/handshake.go#L87-L89 @@ -348,7 +348,7 @@ mod tests { }; let unauthed_stream = UnauthedP2PStream::new(stream); - let p2p_stream = unauthed_stream.handshake(server_hello).await.unwrap(); + let (p2p_stream, _) = unauthed_stream.handshake(server_hello).await.unwrap(); let mut eth_stream = EthStream::new(p2p_stream); eth_stream.handshake(status_copy, fork_filter_clone).await.unwrap(); @@ -377,7 +377,7 @@ mod tests { }; let unauthed_stream = UnauthedP2PStream::new(sink); - let p2p_stream = unauthed_stream.handshake(client_hello).await.unwrap(); + let (p2p_stream, _) = unauthed_stream.handshake(client_hello).await.unwrap(); let mut client_stream = EthStream::new(p2p_stream); client_stream.handshake(status, fork_filter).await.unwrap(); diff --git a/crates/net/eth-wire/src/p2pstream.rs b/crates/net/eth-wire/src/p2pstream.rs index 82f7fed52..614b426c2 100644 --- a/crates/net/eth-wire/src/p2pstream.rs +++ b/crates/net/eth-wire/src/p2pstream.rs @@ -51,8 +51,8 @@ const GRACE_PERIOD: Duration = Duration::from_secs(2); /// from a peer. const MAX_FAILED_PINGS: u8 = 3; -/// An un-authenticated `P2PStream`. This is consumed and returns a [`P2PStream`] after the `Hello` -/// handshake is completed. +/// An un-authenticated [`P2PStream`]. This is consumed and returns a [`P2PStream`] after the +/// `Hello` handshake is completed. #[pin_project] pub struct UnauthedP2PStream { #[pin] @@ -71,8 +71,11 @@ where S: Stream> + Sink + Unpin, { /// Consumes the `UnauthedP2PStream` and returns a `P2PStream` after the `Hello` handshake is - /// completed. - pub async fn handshake(mut self, hello: HelloMessage) -> Result, P2PStreamError> { + /// completed successfully. This also returns the `Hello` message sent by the remote peer. + pub async fn handshake( + mut self, + hello: HelloMessage, + ) -> Result<(P2PStream, HelloMessage), P2PStreamError> { tracing::trace!("sending p2p hello ..."); // send our hello message with the Sink @@ -97,7 +100,7 @@ where } // get the message id - let id = *hello_bytes.first().ok_or_else(|| P2PStreamError::EmptyProtocolMessage)?; + let id = *hello_bytes.first().ok_or(P2PStreamError::EmptyProtocolMessage)?; // the first message sent MUST be the hello message if id != P2PMessageID::Hello as u8 { @@ -124,11 +127,12 @@ where } // determine shared capabilities (currently returns only one capability) - let capability = set_capability_offsets(hello.capabilities, their_hello.capabilities)?; + let capability = + set_capability_offsets(hello.capabilities, their_hello.capabilities.clone())?; let stream = P2PStream::new(self.inner, capability); - Ok(stream) + Ok((stream, their_hello)) } } @@ -153,8 +157,9 @@ pub struct P2PStream { } impl P2PStream { - /// Create a new unauthed [`P2PStream`] from the provided stream. You will need to manually - /// handshake with a peer. + /// Create a new [`P2PStream`] from the provided stream. + /// New [`P2PStream`]s are assumed to have completed the `p2p` handshake successfully and are + /// ready to send and receive subprotocol messages. pub fn new(inner: S, capability: SharedCapability) -> Self { Self { inner, @@ -388,7 +393,7 @@ pub fn set_capability_offsets( // capability with the lowest offset. Ok(shared_with_offsets .first() - .ok_or_else(|| P2PStreamError::HandshakeError(P2PHandshakeError::NoSharedCapabilities))? + .ok_or(P2PStreamError::HandshakeError(P2PHandshakeError::NoSharedCapabilities))? .clone()) } @@ -736,7 +741,7 @@ mod tests { }; let unauthed_stream = UnauthedP2PStream::new(stream); - let p2p_stream = unauthed_stream.handshake(server_hello).await.unwrap(); + let (p2p_stream, _) = unauthed_stream.handshake(server_hello).await.unwrap(); // ensure that the two share a single capability, eth67 assert_eq!( @@ -762,7 +767,7 @@ mod tests { }; let unauthed_stream = UnauthedP2PStream::new(sink); - let p2p_stream = unauthed_stream.handshake(client_hello).await.unwrap(); + let (p2p_stream, _) = unauthed_stream.handshake(client_hello).await.unwrap(); // ensure that the two share a single capability, eth67 assert_eq!(