feat(eth-wire): return Hello on handshake (#161)

* feat(eth-wire): return Hello on handshake

* update P2PStream docs based on new auth design

* cargo fmt

* make clippy happy
This commit is contained in:
Dan Cline
2022-11-04 03:14:41 -04:00
committed by GitHub
parent ccdf2ffa92
commit 43a0a2b160
2 changed files with 20 additions and 15 deletions

View File

@ -64,7 +64,7 @@ where
let msg = self let msg = self
.next() .next()
.await .await
.ok_or_else(|| EthStreamError::HandshakeError(HandshakeError::NoResponse))??; .ok_or(EthStreamError::HandshakeError(HandshakeError::NoResponse))??;
// TODO: Add any missing checks // TODO: Add any missing checks
// https://github.com/ethereum/go-ethereum/blob/9244d5cd61f3ea5a7645fdf2a1a96d53421e412f/eth/protocols/eth/handshake.go#L87-L89 // https://github.com/ethereum/go-ethereum/blob/9244d5cd61f3ea5a7645fdf2a1a96d53421e412f/eth/protocols/eth/handshake.go#L87-L89
@ -348,7 +348,7 @@ mod tests {
}; };
let unauthed_stream = UnauthedP2PStream::new(stream); let unauthed_stream = UnauthedP2PStream::new(stream);
let p2p_stream = unauthed_stream.handshake(server_hello).await.unwrap(); let (p2p_stream, _) = unauthed_stream.handshake(server_hello).await.unwrap();
let mut eth_stream = EthStream::new(p2p_stream); let mut eth_stream = EthStream::new(p2p_stream);
eth_stream.handshake(status_copy, fork_filter_clone).await.unwrap(); eth_stream.handshake(status_copy, fork_filter_clone).await.unwrap();
@ -377,7 +377,7 @@ mod tests {
}; };
let unauthed_stream = UnauthedP2PStream::new(sink); let unauthed_stream = UnauthedP2PStream::new(sink);
let p2p_stream = unauthed_stream.handshake(client_hello).await.unwrap(); let (p2p_stream, _) = unauthed_stream.handshake(client_hello).await.unwrap();
let mut client_stream = EthStream::new(p2p_stream); let mut client_stream = EthStream::new(p2p_stream);
client_stream.handshake(status, fork_filter).await.unwrap(); client_stream.handshake(status, fork_filter).await.unwrap();

View File

@ -51,8 +51,8 @@ const GRACE_PERIOD: Duration = Duration::from_secs(2);
/// from a peer. /// from a peer.
const MAX_FAILED_PINGS: u8 = 3; const MAX_FAILED_PINGS: u8 = 3;
/// An un-authenticated `P2PStream`. This is consumed and returns a [`P2PStream`] after the `Hello` /// An un-authenticated [`P2PStream`]. This is consumed and returns a [`P2PStream`] after the
/// handshake is completed. /// `Hello` handshake is completed.
#[pin_project] #[pin_project]
pub struct UnauthedP2PStream<S> { pub struct UnauthedP2PStream<S> {
#[pin] #[pin]
@ -71,8 +71,11 @@ where
S: Stream<Item = Result<BytesMut, io::Error>> + Sink<Bytes, Error = io::Error> + Unpin, S: Stream<Item = Result<BytesMut, io::Error>> + Sink<Bytes, Error = io::Error> + Unpin,
{ {
/// Consumes the `UnauthedP2PStream` and returns a `P2PStream` after the `Hello` handshake is /// Consumes the `UnauthedP2PStream` and returns a `P2PStream` after the `Hello` handshake is
/// completed. /// completed successfully. This also returns the `Hello` message sent by the remote peer.
pub async fn handshake(mut self, hello: HelloMessage) -> Result<P2PStream<S>, P2PStreamError> { pub async fn handshake(
mut self,
hello: HelloMessage,
) -> Result<(P2PStream<S>, HelloMessage), P2PStreamError> {
tracing::trace!("sending p2p hello ..."); tracing::trace!("sending p2p hello ...");
// send our hello message with the Sink // send our hello message with the Sink
@ -97,7 +100,7 @@ where
} }
// get the message id // get the message id
let id = *hello_bytes.first().ok_or_else(|| P2PStreamError::EmptyProtocolMessage)?; let id = *hello_bytes.first().ok_or(P2PStreamError::EmptyProtocolMessage)?;
// the first message sent MUST be the hello message // the first message sent MUST be the hello message
if id != P2PMessageID::Hello as u8 { if id != P2PMessageID::Hello as u8 {
@ -124,11 +127,12 @@ where
} }
// determine shared capabilities (currently returns only one capability) // determine shared capabilities (currently returns only one capability)
let capability = set_capability_offsets(hello.capabilities, their_hello.capabilities)?; let capability =
set_capability_offsets(hello.capabilities, their_hello.capabilities.clone())?;
let stream = P2PStream::new(self.inner, capability); let stream = P2PStream::new(self.inner, capability);
Ok(stream) Ok((stream, their_hello))
} }
} }
@ -153,8 +157,9 @@ pub struct P2PStream<S> {
} }
impl<S> P2PStream<S> { impl<S> P2PStream<S> {
/// Create a new unauthed [`P2PStream`] from the provided stream. You will need to manually /// Create a new [`P2PStream`] from the provided stream.
/// handshake with a peer. /// New [`P2PStream`]s are assumed to have completed the `p2p` handshake successfully and are
/// ready to send and receive subprotocol messages.
pub fn new(inner: S, capability: SharedCapability) -> Self { pub fn new(inner: S, capability: SharedCapability) -> Self {
Self { Self {
inner, inner,
@ -388,7 +393,7 @@ pub fn set_capability_offsets(
// capability with the lowest offset. // capability with the lowest offset.
Ok(shared_with_offsets Ok(shared_with_offsets
.first() .first()
.ok_or_else(|| P2PStreamError::HandshakeError(P2PHandshakeError::NoSharedCapabilities))? .ok_or(P2PStreamError::HandshakeError(P2PHandshakeError::NoSharedCapabilities))?
.clone()) .clone())
} }
@ -736,7 +741,7 @@ mod tests {
}; };
let unauthed_stream = UnauthedP2PStream::new(stream); let unauthed_stream = UnauthedP2PStream::new(stream);
let p2p_stream = unauthed_stream.handshake(server_hello).await.unwrap(); let (p2p_stream, _) = unauthed_stream.handshake(server_hello).await.unwrap();
// ensure that the two share a single capability, eth67 // ensure that the two share a single capability, eth67
assert_eq!( assert_eq!(
@ -762,7 +767,7 @@ mod tests {
}; };
let unauthed_stream = UnauthedP2PStream::new(sink); let unauthed_stream = UnauthedP2PStream::new(sink);
let p2p_stream = unauthed_stream.handshake(client_hello).await.unwrap(); let (p2p_stream, _) = unauthed_stream.handshake(client_hello).await.unwrap();
// ensure that the two share a single capability, eth67 // ensure that the two share a single capability, eth67
assert_eq!( assert_eq!(