diff --git a/.dprint-cache/locks/.4365269138195617005.lock b/.dprint-cache/locks/.4365269138195617005.lock new file mode 100644 index 00000000000..e69de29bb2d diff --git a/.dprint-cache/locks/.4365269138195617005.lock.poll b/.dprint-cache/locks/.4365269138195617005.lock.poll new file mode 100644 index 00000000000..56a6051ca2b --- /dev/null +++ b/.dprint-cache/locks/.4365269138195617005.lock.poll @@ -0,0 +1 @@ +1 \ No newline at end of file diff --git a/.dprint-cache/plugin-cache-manifest.json b/.dprint-cache/plugin-cache-manifest.json new file mode 100644 index 00000000000..a28cc8bc110 --- /dev/null +++ b/.dprint-cache/plugin-cache-manifest.json @@ -0,0 +1 @@ +{"schemaVersion":8,"wasmCacheVersion":"6.1.0-rc.3","plugins":{}} \ No newline at end of file diff --git a/crates/net/eth-wire-types/src/broadcast.rs b/crates/net/eth-wire-types/src/broadcast.rs index 1900cf004aa..9855f6f6cb8 100644 --- a/crates/net/eth-wire-types/src/broadcast.rs +++ b/crates/net/eth-wire-types/src/broadcast.rs @@ -169,7 +169,7 @@ impl NewPooledTransactionHashes { matches!(version, EthVersion::Eth67 | EthVersion::Eth66) } Self::Eth68(_) => { - matches!(version, EthVersion::Eth68 | EthVersion::Eth69) + matches!(version, EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70) } } } diff --git a/crates/net/eth-wire-types/src/capability.rs b/crates/net/eth-wire-types/src/capability.rs index f7cd00671f8..d35e4c17ee9 100644 --- a/crates/net/eth-wire-types/src/capability.rs +++ b/crates/net/eth-wire-types/src/capability.rs @@ -100,6 +100,16 @@ impl Capability { Self::eth(EthVersion::Eth68) } + /// Returns the [`EthVersion::Eth69`] capability. + pub const fn eth_69() -> Self { + Self::eth(EthVersion::Eth69) + } + + /// Returns the [`EthVersion::Eth70`] capability. + pub const fn eth_70() -> Self { + Self::eth(EthVersion::Eth70) + } + /// Whether this is eth v66 protocol. #[inline] pub fn is_eth_v66(&self) -> bool { @@ -118,10 +128,26 @@ impl Capability { self.name == "eth" && self.version == 68 } + /// Whether this is eth v69. + #[inline] + pub fn is_eth_v69(&self) -> bool { + self.name == "eth" && self.version == 69 + } + + /// Whether this is eth v70. + #[inline] + pub fn is_eth_v70(&self) -> bool { + self.name == "eth" && self.version == 70 + } + /// Whether this is any eth version. #[inline] pub fn is_eth(&self) -> bool { - self.is_eth_v66() || self.is_eth_v67() || self.is_eth_v68() + self.is_eth_v66() || + self.is_eth_v67() || + self.is_eth_v68() || + self.is_eth_v69() || + self.is_eth_v70() } } @@ -141,7 +167,7 @@ impl From for Capability { #[cfg(any(test, feature = "arbitrary"))] impl<'a> arbitrary::Arbitrary<'a> for Capability { fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result { - let version = u.int_in_range(66..=69)?; // Valid eth protocol versions are 66-69 + let version = u.int_in_range(66..=70)?; // Valid eth protocol versions are 66-70 // Only generate valid eth protocol name for now since it's the only supported protocol Ok(Self::new_static("eth", version)) } @@ -155,6 +181,8 @@ pub struct Capabilities { eth_66: bool, eth_67: bool, eth_68: bool, + eth_69: bool, + eth_70: bool, } impl Capabilities { @@ -164,6 +192,8 @@ impl Capabilities { eth_66: value.iter().any(Capability::is_eth_v66), eth_67: value.iter().any(Capability::is_eth_v67), eth_68: value.iter().any(Capability::is_eth_v68), + eth_69: value.iter().any(Capability::is_eth_v69), + eth_70: value.iter().any(Capability::is_eth_v70), inner: value, } } @@ -182,7 +212,7 @@ impl Capabilities { /// Whether the peer supports `eth` sub-protocol. #[inline] pub const fn supports_eth(&self) -> bool { - self.eth_68 || self.eth_67 || self.eth_66 + self.eth_70 || self.eth_69 || self.eth_68 || self.eth_67 || self.eth_66 } /// Whether this peer supports eth v66 protocol. @@ -202,6 +232,18 @@ impl Capabilities { pub const fn supports_eth_v68(&self) -> bool { self.eth_68 } + + /// Whether this peer supports eth v69 protocol. + #[inline] + pub const fn supports_eth_v69(&self) -> bool { + self.eth_69 + } + + /// Whether this peer supports eth v70 protocol. + #[inline] + pub const fn supports_eth_v70(&self) -> bool { + self.eth_70 + } } impl From> for Capabilities { @@ -224,6 +266,8 @@ impl Decodable for Capabilities { eth_66: inner.iter().any(Capability::is_eth_v66), eth_67: inner.iter().any(Capability::is_eth_v67), eth_68: inner.iter().any(Capability::is_eth_v68), + eth_69: inner.iter().any(Capability::is_eth_v69), + eth_70: inner.iter().any(Capability::is_eth_v70), inner, }) } diff --git a/crates/net/eth-wire-types/src/lib.rs b/crates/net/eth-wire-types/src/lib.rs index b7d27227846..d5189cae256 100644 --- a/crates/net/eth-wire-types/src/lib.rs +++ b/crates/net/eth-wire-types/src/lib.rs @@ -12,7 +12,7 @@ extern crate alloc; mod status; -pub use status::{Status, StatusBuilder, StatusEth69, StatusMessage, UnifiedStatus}; +pub use status::{Status, StatusBuilder, StatusEth69, StatusEth70, StatusMessage, UnifiedStatus}; pub mod version; pub use version::{EthVersion, ProtocolVersion}; diff --git a/crates/net/eth-wire-types/src/message.rs b/crates/net/eth-wire-types/src/message.rs index 5f36115204b..2bdb1d22c6a 100644 --- a/crates/net/eth-wire-types/src/message.rs +++ b/crates/net/eth-wire-types/src/message.rs @@ -1,4 +1,4 @@ -//! Implements Ethereum wire protocol for versions 66, 67, and 68. +//! Implements Ethereum wire protocol for versions 66 through 70. //! Defines structs/enums for messages, request-response pairs, and broadcasts. //! Handles compatibility with [`EthVersion`]. //! @@ -8,13 +8,13 @@ use super::{ broadcast::NewBlockHashes, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, - GetNodeData, GetPooledTransactions, GetReceipts, NewPooledTransactionHashes66, + GetNodeData, GetPooledTransactions, GetReceipts, GetReceipts70, NewPooledTransactionHashes66, NewPooledTransactionHashes68, NodeData, PooledTransactions, Receipts, Status, StatusEth69, - Transactions, + StatusEth70, Transactions, }; use crate::{ status::StatusMessage, BlockRangeUpdate, EthNetworkPrimitives, EthVersion, NetworkPrimitives, - RawCapabilityMessage, Receipts69, SharedTransactions, + RawCapabilityMessage, Receipts69, Receipts70, SharedTransactions, }; use alloc::{boxed::Box, string::String, sync::Arc}; use alloy_primitives::{ @@ -66,10 +66,12 @@ impl ProtocolMessage { // For EIP-7642 (https://github.com/ethereum/EIPs/blob/master/EIPS/eip-7642.md): // pre-merge (legacy) status messages include total difficulty, whereas eth/69 omits it. let message = match message_type { - EthMessageID::Status => EthMessage::Status(if version < EthVersion::Eth69 { - StatusMessage::Legacy(Status::decode(buf)?) - } else { + EthMessageID::Status => EthMessage::Status(if version >= EthVersion::Eth70 { + StatusMessage::Eth70(StatusEth70::decode(buf)?) + } else if version >= EthVersion::Eth69 { StatusMessage::Eth69(StatusEth69::decode(buf)?) + } else { + StatusMessage::Legacy(Status::decode(buf)?) }), EthMessageID::NewBlockHashes => { EthMessage::NewBlockHashes(NewBlockHashes::decode(buf)?) @@ -111,13 +113,24 @@ impl ProtocolMessage { } EthMessage::NodeData(RequestPair::decode(buf)?) } - EthMessageID::GetReceipts => EthMessage::GetReceipts(RequestPair::decode(buf)?), + EthMessageID::GetReceipts => { + if version >= EthVersion::Eth70 { + EthMessage::GetReceipts70(crate::receipts::GetReceipts70::decode(buf)?) + } else { + EthMessage::GetReceipts(RequestPair::decode(buf)?) + } + } EthMessageID::Receipts => { if version < EthVersion::Eth69 { EthMessage::Receipts(RequestPair::decode(buf)?) - } else { + } else if version < EthVersion::Eth70 { // with eth69, receipts no longer include the bloom EthMessage::Receipts69(RequestPair::decode(buf)?) + } else { + // eth/70 continues to omit bloom filters and adds the + // `lastBlockIncomplete` flag, encoded as + // `[request-id, lastBlockIncomplete, [[receipt₁, receipt₂], ...]]`. + EthMessage::Receipts70(crate::receipts::Receipts70::::decode(buf)?) } } EthMessageID::BlockRangeUpdate => { @@ -205,6 +218,9 @@ impl From> for ProtocolBroadcastMes /// /// The `eth/69` announces the historical block range served by the node. Removes total difficulty /// information. And removes the Bloom field from receipts transferred over the protocol. +/// +/// The `eth/70` (EIP-7975) keeps the eth/69 status format and introduces partial receipts +/// requests/responses. #[derive(Clone, Debug, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub enum EthMessage { @@ -259,6 +275,12 @@ pub enum EthMessage { NodeData(RequestPair), /// Represents a `GetReceipts` request-response pair. GetReceipts(RequestPair), + /// Represents a `GetReceipts` request for eth/70. + /// + /// Note: Unlike earlier protocol versions, the eth/70 encoding for + /// `GetReceipts` in EIP-7975 inlines the request id. The type still wraps + /// a [`RequestPair`], but with a custom inline encoding. + GetReceipts70(GetReceipts70), /// Represents a Receipts request-response pair. #[cfg_attr( feature = "serde", @@ -271,6 +293,16 @@ pub enum EthMessage { serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned") )] Receipts69(RequestPair>), + /// Represents a Receipts request-response pair for eth/70. + #[cfg_attr( + feature = "serde", + serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned") + )] + /// + /// Note: The eth/70 encoding for `Receipts` in EIP-7975 inlines the + /// request id. The type still wraps a [`RequestPair`], but with a custom + /// inline encoding. + Receipts70(Receipts70), /// Represents a `BlockRangeUpdate` message broadcast to the network. #[cfg_attr( feature = "serde", @@ -300,8 +332,8 @@ impl EthMessage { Self::PooledTransactions(_) => EthMessageID::PooledTransactions, Self::GetNodeData(_) => EthMessageID::GetNodeData, Self::NodeData(_) => EthMessageID::NodeData, - Self::GetReceipts(_) => EthMessageID::GetReceipts, - Self::Receipts(_) | Self::Receipts69(_) => EthMessageID::Receipts, + Self::GetReceipts(_) | Self::GetReceipts70(_) => EthMessageID::GetReceipts, + Self::Receipts(_) | Self::Receipts69(_) | Self::Receipts70(_) => EthMessageID::Receipts, Self::BlockRangeUpdate(_) => EthMessageID::BlockRangeUpdate, Self::Other(msg) => EthMessageID::Other(msg.id as u8), } @@ -314,6 +346,7 @@ impl EthMessage { Self::GetBlockBodies(_) | Self::GetBlockHeaders(_) | Self::GetReceipts(_) | + Self::GetReceipts70(_) | Self::GetPooledTransactions(_) | Self::GetNodeData(_) ) @@ -326,6 +359,7 @@ impl EthMessage { Self::PooledTransactions(_) | Self::Receipts(_) | Self::Receipts69(_) | + Self::Receipts70(_) | Self::BlockHeaders(_) | Self::BlockBodies(_) | Self::NodeData(_) @@ -351,8 +385,10 @@ impl Encodable for EthMessage { Self::GetNodeData(request) => request.encode(out), Self::NodeData(data) => data.encode(out), Self::GetReceipts(request) => request.encode(out), + Self::GetReceipts70(request) => request.encode(out), Self::Receipts(receipts) => receipts.encode(out), Self::Receipts69(receipt69) => receipt69.encode(out), + Self::Receipts70(receipt70) => receipt70.encode(out), Self::BlockRangeUpdate(block_range_update) => block_range_update.encode(out), Self::Other(unknown) => out.put_slice(&unknown.payload), } @@ -374,8 +410,10 @@ impl Encodable for EthMessage { Self::GetNodeData(request) => request.length(), Self::NodeData(data) => data.length(), Self::GetReceipts(request) => request.length(), + Self::GetReceipts70(request) => request.length(), Self::Receipts(receipts) => receipts.length(), Self::Receipts69(receipt69) => receipt69.length(), + Self::Receipts70(receipt70) => receipt70.length(), Self::BlockRangeUpdate(block_range_update) => block_range_update.length(), Self::Other(unknown) => unknown.length(), } @@ -597,6 +635,106 @@ impl RequestPair { } } +impl RequestPair { + /// Encodes the request id and payload in a flattened list: + /// `[request-id, firstBlockReceiptIndex, [blockhashes...]]`. + pub fn encode_inline(&self, out: &mut dyn alloy_rlp::BufMut) { + let payload_length = self.request_id.length() + + self.message.first_block_receipt_index.length() + + self.message.block_hashes.length(); + let header = Header { list: true, payload_length }; + header.encode(out); + self.request_id.encode(out); + self.message.first_block_receipt_index.encode(out); + self.message.block_hashes.encode(out); + } + + /// Returns the length of the flattened encoding. + pub fn length_inline(&self) -> usize { + let mut length = 0; + length += self.request_id.length(); + length += self.message.first_block_receipt_index.length(); + length += self.message.block_hashes.length(); + length += length_of_length(length); + length + } + + /// Decodes the flattened eth/70 `GetReceipts` payload. + pub fn decode_inline(buf: &mut &[u8]) -> alloy_rlp::Result { + let header = Header::decode(buf)?; + let initial_length = buf.len(); + let request_id = u64::decode(buf)?; + let first_block_receipt_index = u64::decode(buf)?; + let block_hashes = Vec::::decode(buf)?; + + let consumed_len = initial_length - buf.len(); + if consumed_len != header.payload_length { + return Err(alloy_rlp::Error::UnexpectedLength) + } + + Ok(Self { + request_id, + message: crate::receipts::GetReceipts70Payload { + first_block_receipt_index, + block_hashes, + }, + }) + } +} + +impl RequestPair> { + /// Encodes the request id and payload in a flattened list: + /// `[request-id, lastBlockIncomplete, [[receipts...], ...]]`. + pub fn encode_inline(&self, out: &mut dyn alloy_rlp::BufMut) + where + T: Encodable, + { + let payload_length = self.request_id.length() + + self.message.last_block_incomplete.length() + + self.message.receipts.length(); + let header = Header { list: true, payload_length }; + header.encode(out); + self.request_id.encode(out); + self.message.last_block_incomplete.encode(out); + self.message.receipts.encode(out); + } + + /// Returns the length of the flattened encoding. + pub fn length_inline(&self) -> usize + where + T: Encodable, + { + let mut length = 0; + length += self.request_id.length(); + length += self.message.last_block_incomplete.length(); + length += self.message.receipts.length(); + length += length_of_length(length); + length + } + + /// Decodes the flattened eth/70 Receipts payload. + pub fn decode_inline(buf: &mut &[u8]) -> alloy_rlp::Result + where + T: Decodable, + { + let header = Header::decode(buf)?; + let initial_length = buf.len(); + let request_id = u64::decode(buf)?; + let last_block_incomplete = bool::decode(buf)?; + let receipts = Vec::>::decode(buf)?; + + let consumed_len = initial_length - buf.len(); + if consumed_len != header.payload_length { + return Err(alloy_rlp::Error::UnexpectedLength) + } + + Ok(Self { + request_id, + message: crate::receipts::Receipts70Payload { last_block_incomplete, receipts }, + }) + } +} + /// Allows messages with request ids to be serialized into RLP bytes. impl Encodable for RequestPair where diff --git a/crates/net/eth-wire-types/src/receipts.rs b/crates/net/eth-wire-types/src/receipts.rs index 5efa31ede04..7ebc9e07073 100644 --- a/crates/net/eth-wire-types/src/receipts.rs +++ b/crates/net/eth-wire-types/src/receipts.rs @@ -17,6 +17,44 @@ pub struct GetReceipts( pub Vec, ); +/// Eth/70 `GetReceipts` request payload that supports partial receipt queries. +/// +/// When used with eth/70, the request id is carried by the surrounding +/// [`crate::message::RequestPair`], and the on-wire shape is the flattened list +/// `[request-id, firstBlockReceiptIndex, [blockhash₁, ...]]`. +#[derive(Clone, Debug, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))] +pub struct GetReceipts70Payload { + /// Index into the receipts of the first requested block hash. + pub first_block_receipt_index: u64, + /// The block hashes to request receipts for. + pub block_hashes: Vec, +} + +/// Helper type for the full eth/70 request carrying the request id alongside +/// the payload. +#[derive(Clone, Debug, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))] +pub struct GetReceipts70(pub crate::message::RequestPair); + +impl alloy_rlp::Encodable for GetReceipts70 { + fn encode(&self, out: &mut dyn alloy_rlp::BufMut) { + self.0.encode_inline(out); + } + + fn length(&self) -> usize { + self.0.length_inline() + } +} + +impl alloy_rlp::Decodable for GetReceipts70 { + fn decode(buf: &mut &[u8]) -> alloy_rlp::Result { + crate::message::RequestPair::::decode_inline(buf).map(Self) + } +} + /// The response to [`GetReceipts`], containing receipt lists that correspond to each block /// requested. #[derive(Clone, Debug, PartialEq, Eq, Default)] @@ -58,7 +96,13 @@ pub struct Receipts69(pub Vec>); impl Receipts69 { /// Encodes all receipts with the bloom filter. /// - /// Note: This is an expensive operation that recalculates the bloom for each receipt. + /// Eth/69 omits bloom filters on the wire, while some internal callers + /// (and legacy APIs) still operate on [`Receipts`] with + /// [`ReceiptWithBloom`]. This helper reconstructs the bloom locally from + /// each receipt's logs so the older API can be used on top of eth/69 data. + /// + /// Note: This is an expensive operation that recalculates the bloom for + /// every receipt. pub fn into_with_bloom(self) -> Receipts { Receipts( self.0 @@ -75,6 +119,72 @@ impl From> for Receipts { } } +/// Eth/70 `Receipts` response payload. +/// +/// This is used in conjunction with [`crate::message::RequestPair`] to encode the full wire +/// message `[request-id, lastBlockIncomplete, [[receipt₁, receipt₂], ...]]`. +#[derive(Clone, Debug, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))] +pub struct Receipts70Payload { + /// Whether the receipts list for the last block is incomplete. + pub last_block_incomplete: bool, + /// Receipts grouped by block. + pub receipts: Vec>, +} + +/// Helper type for the full eth/70 response carrying the request id alongside +/// the payload. +#[derive(Clone, Debug, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))] +pub struct Receipts70(pub crate::message::RequestPair>); + +impl alloy_rlp::Encodable for Receipts70 +where + T: alloy_rlp::Encodable, +{ + fn encode(&self, out: &mut dyn alloy_rlp::BufMut) { + self.0.encode_inline(out); + } + + fn length(&self) -> usize { + self.0.length_inline() + } +} + +impl alloy_rlp::Decodable for Receipts70 +where + T: alloy_rlp::Decodable, +{ + fn decode(buf: &mut &[u8]) -> alloy_rlp::Result { + crate::message::RequestPair::>::decode_inline(buf).map(Self) + } +} + +impl Receipts70Payload { + /// Encodes all receipts with the bloom filter. + /// + /// Just like eth/69, eth/70 does not transmit bloom filters over the wire. + /// When higher layers still expect the older bloom-bearing [`Receipts`] + /// type, this helper converts the eth/70 payload into that shape by + /// recomputing the bloom locally from the contained receipts. + /// + /// Note: This is an expensive operation that recalculates the bloom for + /// every receipt. + pub fn into_with_bloom(self) -> Receipts { + // Reuse the eth/69 helper, since both variants carry the same + // receipt list shape (only eth/70 adds request metadata). + Receipts69(self.receipts).into_with_bloom() + } +} + +impl From> for Receipts { + fn from(receipts: Receipts70) -> Self { + receipts.0.message.into_with_bloom() + } +} + #[cfg(test)] mod tests { use super::*; @@ -225,4 +335,72 @@ mod tests { let encoded = alloy_rlp::encode(&request); assert_eq!(encoded, data); } + + #[test] + fn encode_get_receipts70_inline_shape() { + let req = GetReceipts70(RequestPair { + request_id: 1111, + message: GetReceipts70Payload { + first_block_receipt_index: 0, + block_hashes: vec![ + hex!("00000000000000000000000000000000000000000000000000000000deadc0de").into(), + hex!("00000000000000000000000000000000000000000000000000000000feedbeef").into(), + ], + }, + }); + + let mut out = vec![]; + req.encode(&mut out); + + let mut buf = out.as_slice(); + let header = alloy_rlp::Header::decode(&mut buf).unwrap(); + let payload_start = buf.len(); + let request_id = u64::decode(&mut buf).unwrap(); + let first_block_receipt_index = u64::decode(&mut buf).unwrap(); + let block_hashes = Vec::::decode(&mut buf).unwrap(); + + assert!(buf.is_empty(), "buffer not fully consumed"); + assert_eq!(request_id, 1111); + assert_eq!(first_block_receipt_index, 0); + assert_eq!(block_hashes.len(), 2); + // ensure payload length matches header + assert_eq!(payload_start - buf.len(), header.payload_length); + + let mut buf = out.as_slice(); + let decoded = GetReceipts70::decode(&mut buf).unwrap(); + assert!(buf.is_empty(), "buffer not fully consumed on decode"); + assert_eq!(decoded, req); + } + + #[test] + fn encode_receipts70_inline_shape() { + let payload: Receipts70Payload = Receipts70Payload { + last_block_incomplete: true, + receipts: vec![vec![Receipt::default()]], + }; + + let resp = Receipts70(RequestPair { request_id: 7, message: payload }); + + let mut out = vec![]; + resp.encode(&mut out); + + let mut buf = out.as_slice(); + let header = alloy_rlp::Header::decode(&mut buf).unwrap(); + let payload_start = buf.len(); + let request_id = u64::decode(&mut buf).unwrap(); + let last_block_incomplete = bool::decode(&mut buf).unwrap(); + let receipts = Vec::>::decode(&mut buf).unwrap(); + + assert!(buf.is_empty(), "buffer not fully consumed"); + assert_eq!(payload_start - buf.len(), header.payload_length); + assert_eq!(request_id, 7); + assert!(last_block_incomplete); + assert_eq!(receipts.len(), 1); + assert_eq!(receipts[0].len(), 1); + + let mut buf = out.as_slice(); + let decoded = Receipts70::decode(&mut buf).unwrap(); + assert!(buf.is_empty(), "buffer not fully consumed on decode"); + assert_eq!(decoded, resp); + } } diff --git a/crates/net/eth-wire-types/src/status.rs b/crates/net/eth-wire-types/src/status.rs index 7c9bbe1af49..667d11fadc1 100644 --- a/crates/net/eth-wire-types/src/status.rs +++ b/crates/net/eth-wire-types/src/status.rs @@ -13,7 +13,7 @@ use reth_codecs_derive::add_arbitrary_tests; /// unsupported fields are stripped out. #[derive(Clone, Debug, PartialEq, Eq, Copy)] pub struct UnifiedStatus { - /// The eth protocol version (e.g. eth/66 to eth/69). + /// The eth protocol version (e.g. eth/66 to eth/70). pub version: EthVersion, /// The chain ID identifying the peer’s network. pub chain: Chain, @@ -109,6 +109,11 @@ impl UnifiedStatus { } } + /// Consume this `UnifiedStatus` and produce the [`StatusEth70`] message used by `eth/70`. + pub fn into_eth70(self) -> StatusEth70 { + self.into_eth69() + } + /// Convert this `UnifiedStatus` into the appropriate `StatusMessage` variant based on version. pub fn into_message(self) -> StatusMessage { if self.version >= EthVersion::Eth69 { @@ -131,7 +136,7 @@ impl UnifiedStatus { earliest_block: None, latest_block: None, }, - StatusMessage::Eth69(e) => Self { + StatusMessage::Eth69(e) | StatusMessage::Eth70(e) => Self { version: e.version, chain: e.chain, genesis: e.genesis, @@ -157,7 +162,7 @@ impl StatusBuilder { self.status } - /// Sets the eth protocol version (e.g., eth/66, eth/69). + /// Sets the eth protocol version (e.g., eth/66, eth/70). pub const fn version(mut self, version: EthVersion) -> Self { self.status.version = version; self @@ -378,8 +383,11 @@ impl Debug for StatusEth69 { } } -/// `StatusMessage` can store either the Legacy version (with TD) or the -/// eth/69 version (omits TD). +/// Share eth/69 status with eth/70 +pub type StatusEth70 = StatusEth69; + +/// `StatusMessage` can store either the Legacy version (with TD), or the eth/69+/eth/70 version +/// (omits TD, includes block range). #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum StatusMessage { @@ -387,6 +395,8 @@ pub enum StatusMessage { Legacy(Status), /// The new `eth/69` status with no `total_difficulty`. Eth69(StatusEth69), + /// The `eth/70` status, identical to `eth/69` . + Eth70(StatusEth70), } impl StatusMessage { @@ -395,6 +405,7 @@ impl StatusMessage { match self { Self::Legacy(legacy_status) => legacy_status.genesis, Self::Eth69(status_69) => status_69.genesis, + Self::Eth70(status_70) => status_70.genesis, } } @@ -403,6 +414,7 @@ impl StatusMessage { match self { Self::Legacy(legacy_status) => legacy_status.version, Self::Eth69(status_69) => status_69.version, + Self::Eth70(status_70) => status_70.version, } } @@ -411,6 +423,7 @@ impl StatusMessage { match self { Self::Legacy(legacy_status) => &legacy_status.chain, Self::Eth69(status_69) => &status_69.chain, + Self::Eth70(status_70) => &status_70.chain, } } @@ -419,6 +432,7 @@ impl StatusMessage { match self { Self::Legacy(legacy_status) => legacy_status.forkid, Self::Eth69(status_69) => status_69.forkid, + Self::Eth70(status_70) => status_70.forkid, } } @@ -427,6 +441,7 @@ impl StatusMessage { match self { Self::Legacy(legacy_status) => legacy_status.blockhash, Self::Eth69(status_69) => status_69.blockhash, + Self::Eth70(status_70) => status_70.blockhash, } } } @@ -435,14 +450,14 @@ impl Encodable for StatusMessage { fn encode(&self, out: &mut dyn BufMut) { match self { Self::Legacy(s) => s.encode(out), - Self::Eth69(s) => s.encode(out), + Self::Eth69(s) | Self::Eth70(s) => s.encode(out), } } fn length(&self) -> usize { match self { Self::Legacy(s) => s.length(), - Self::Eth69(s) => s.length(), + Self::Eth69(s) | Self::Eth70(s) => s.length(), } } } @@ -452,6 +467,7 @@ impl Display for StatusMessage { match self { Self::Legacy(s) => Display::fmt(s, f), Self::Eth69(s69) => Display::fmt(s69, f), + Self::Eth70(s70) => Display::fmt(s70, f), } } } @@ -546,6 +562,24 @@ mod tests { assert_eq!(unified_status, roundtripped_unified_status); } + #[test] + fn roundtrip_eth70() { + let unified_status = UnifiedStatus::builder() + .version(EthVersion::Eth70) + .chain(Chain::mainnet()) + .genesis(MAINNET_GENESIS_HASH) + .forkid(ForkId { hash: ForkHash([0xb7, 0x15, 0x07, 0x7d]), next: 0 }) + .blockhash(b256!("0xfeb27336ca7923f8fab3bd617fcb6e75841538f71c1bcfc267d7838489d9e13d")) + .total_difficulty(None) + .earliest_block(Some(1)) + .latest_block(Some(2)) + .build(); + + let status_message = unified_status.into_message(); + let roundtripped_unified_status = UnifiedStatus::from_message(status_message); + assert_eq!(unified_status, roundtripped_unified_status); + } + #[test] fn encode_eth69_status_message() { let expected = hex!("f8544501a0d4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3c684b715077d8083ed14f2840112a880a0feb27336ca7923f8fab3bd617fcb6e75841538f71c1bcfc267d7838489d9e13d"); diff --git a/crates/net/eth-wire-types/src/version.rs b/crates/net/eth-wire-types/src/version.rs index 8b2e3a424d9..6553bd2e41e 100644 --- a/crates/net/eth-wire-types/src/version.rs +++ b/crates/net/eth-wire-types/src/version.rs @@ -27,6 +27,8 @@ pub enum EthVersion { Eth68 = 68, /// The `eth` protocol version 69. Eth69 = 69, + /// The `eth` protocol version 70. + Eth70 = 70, } impl EthVersion { @@ -55,6 +57,11 @@ impl EthVersion { pub const fn is_eth69(&self) -> bool { matches!(self, Self::Eth69) } + + /// Returns true if the version is eth/70 + pub const fn is_eth70(&self) -> bool { + matches!(self, Self::Eth70) + } } /// RLP encodes `EthVersion` as a single byte (66-69). @@ -96,6 +103,7 @@ impl TryFrom<&str> for EthVersion { "67" => Ok(Self::Eth67), "68" => Ok(Self::Eth68), "69" => Ok(Self::Eth69), + "70" => Ok(Self::Eth70), _ => Err(ParseVersionError(s.to_string())), } } @@ -120,6 +128,7 @@ impl TryFrom for EthVersion { 67 => Ok(Self::Eth67), 68 => Ok(Self::Eth68), 69 => Ok(Self::Eth69), + 70 => Ok(Self::Eth70), _ => Err(ParseVersionError(u.to_string())), } } @@ -149,6 +158,7 @@ impl From for &'static str { EthVersion::Eth67 => "67", EthVersion::Eth68 => "68", EthVersion::Eth69 => "69", + EthVersion::Eth70 => "70", } } } @@ -195,7 +205,7 @@ impl Decodable for ProtocolVersion { #[cfg(test)] mod tests { - use super::{EthVersion, ParseVersionError}; + use super::EthVersion; use alloy_rlp::{Decodable, Encodable, Error as RlpError}; use bytes::BytesMut; @@ -205,7 +215,7 @@ mod tests { assert_eq!(EthVersion::Eth67, EthVersion::try_from("67").unwrap()); assert_eq!(EthVersion::Eth68, EthVersion::try_from("68").unwrap()); assert_eq!(EthVersion::Eth69, EthVersion::try_from("69").unwrap()); - assert_eq!(Err(ParseVersionError("70".to_string())), EthVersion::try_from("70")); + assert_eq!(EthVersion::Eth70, EthVersion::try_from("70").unwrap()); } #[test] @@ -214,12 +224,18 @@ mod tests { assert_eq!(EthVersion::Eth67, "67".parse().unwrap()); assert_eq!(EthVersion::Eth68, "68".parse().unwrap()); assert_eq!(EthVersion::Eth69, "69".parse().unwrap()); - assert_eq!(Err(ParseVersionError("70".to_string())), "70".parse::()); + assert_eq!(EthVersion::Eth70, "70".parse().unwrap()); } #[test] fn test_eth_version_rlp_encode() { - let versions = [EthVersion::Eth66, EthVersion::Eth67, EthVersion::Eth68, EthVersion::Eth69]; + let versions = [ + EthVersion::Eth66, + EthVersion::Eth67, + EthVersion::Eth68, + EthVersion::Eth69, + EthVersion::Eth70, + ]; for version in versions { let mut encoded = BytesMut::new(); @@ -236,7 +252,7 @@ mod tests { (67_u8, Ok(EthVersion::Eth67)), (68_u8, Ok(EthVersion::Eth68)), (69_u8, Ok(EthVersion::Eth69)), - (70_u8, Err(RlpError::Custom("invalid eth version"))), + (70_u8, Ok(EthVersion::Eth70)), (65_u8, Err(RlpError::Custom("invalid eth version"))), ]; diff --git a/crates/net/eth-wire/src/capability.rs b/crates/net/eth-wire/src/capability.rs index 9b706a02cf9..9691acb4399 100644 --- a/crates/net/eth-wire/src/capability.rs +++ b/crates/net/eth-wire/src/capability.rs @@ -418,6 +418,8 @@ mod tests { Capability::new_static("eth", 66), Capability::new_static("eth", 67), Capability::new_static("eth", 68), + Capability::new_static("eth", 69), + Capability::new_static("eth", 70), ] .into(); @@ -425,6 +427,8 @@ mod tests { assert!(capabilities.supports_eth_v66()); assert!(capabilities.supports_eth_v67()); assert!(capabilities.supports_eth_v68()); + assert!(capabilities.supports_eth_v69()); + assert!(capabilities.supports_eth_v70()); } #[test] diff --git a/crates/net/eth-wire/src/handshake.rs b/crates/net/eth-wire/src/handshake.rs index f604f1fca11..72188bf3224 100644 --- a/crates/net/eth-wire/src/handshake.rs +++ b/crates/net/eth-wire/src/handshake.rs @@ -205,7 +205,7 @@ where return Err(err.into()); } - if let StatusMessage::Eth69(s) = their_status_message { + if let StatusMessage::Eth69(s) | StatusMessage::Eth70(s) = their_status_message { if s.earliest > s.latest { return Err(EthHandshakeError::EarliestBlockGreaterThanLatestBlock { got: s.earliest, diff --git a/crates/net/eth-wire/src/hello.rs b/crates/net/eth-wire/src/hello.rs index 40deebb6310..9cad7223a01 100644 --- a/crates/net/eth-wire/src/hello.rs +++ b/crates/net/eth-wire/src/hello.rs @@ -260,10 +260,11 @@ mod tests { assert_eq!(hello_encoded.len(), hello.length()); } + //TODO: add test for eth70 here once we have fully support it #[test] - fn test_default_protocols_include_eth69() { - // ensure that the default protocol list includes Eth69 as the latest version + fn test_default_protocols_still_include_eth69() { + // ensure that older eth/69 remains advertised for compatibility let secret_key = SecretKey::new(&mut rand_08::thread_rng()); let id = pk2id(&secret_key.public_key(SECP256K1)); let hello = HelloMessageWithProtocols::builder(id).build(); diff --git a/crates/net/eth-wire/src/protocol.rs b/crates/net/eth-wire/src/protocol.rs index 16ec62b7cd7..c5c17e85039 100644 --- a/crates/net/eth-wire/src/protocol.rs +++ b/crates/net/eth-wire/src/protocol.rs @@ -84,5 +84,6 @@ mod tests { assert_eq!(Protocol::eth(EthVersion::Eth67).messages(), 17); assert_eq!(Protocol::eth(EthVersion::Eth68).messages(), 17); assert_eq!(Protocol::eth(EthVersion::Eth69).messages(), 18); + assert_eq!(Protocol::eth(EthVersion::Eth70).messages(), 17); } } diff --git a/crates/net/network-api/src/events.rs b/crates/net/network-api/src/events.rs index 8a5c7541490..7fa861c106e 100644 --- a/crates/net/network-api/src/events.rs +++ b/crates/net/network-api/src/events.rs @@ -3,8 +3,8 @@ use reth_eth_wire_types::{ message::RequestPair, BlockBodies, BlockHeaders, Capabilities, DisconnectReason, EthMessage, EthNetworkPrimitives, EthVersion, GetBlockBodies, GetBlockHeaders, GetNodeData, - GetPooledTransactions, GetReceipts, NetworkPrimitives, NodeData, PooledTransactions, Receipts, - Receipts69, UnifiedStatus, + GetPooledTransactions, GetReceipts, GetReceipts70, GetReceipts70Payload, NetworkPrimitives, + NodeData, PooledTransactions, Receipts, Receipts69, Receipts70Payload, UnifiedStatus, }; use reth_ethereum_forks::ForkId; use reth_network_p2p::error::{RequestError, RequestResult}; @@ -238,6 +238,15 @@ pub enum PeerRequest { /// The channel to send the response for receipts. response: oneshot::Sender>>, }, + /// Requests receipts from the peer using eth/70 (supports `firstBlockReceiptIndex`). + /// + /// The response should be sent through the channel. + GetReceipts70 { + /// The request for receipts. + request: GetReceipts70Payload, + /// The channel to send the response for receipts. + response: oneshot::Sender>>, + }, } // === impl PeerRequest === @@ -257,6 +266,7 @@ impl PeerRequest { Self::GetNodeData { response, .. } => response.send(Err(err)).ok(), Self::GetReceipts { response, .. } => response.send(Err(err)).ok(), Self::GetReceipts69 { response, .. } => response.send(Err(err)).ok(), + Self::GetReceipts70 { response, .. } => response.send(Err(err)).ok(), }; } @@ -281,6 +291,12 @@ impl PeerRequest { Self::GetReceipts { request, .. } | Self::GetReceipts69 { request, .. } => { EthMessage::GetReceipts(RequestPair { request_id, message: request.clone() }) } + Self::GetReceipts70 { request, .. } => { + EthMessage::GetReceipts70(GetReceipts70(RequestPair { + request_id, + message: request.clone(), + })) + } } } diff --git a/crates/net/network/src/eth_requests.rs b/crates/net/network/src/eth_requests.rs index 492bf8bd55e..041b00e30ad 100644 --- a/crates/net/network/src/eth_requests.rs +++ b/crates/net/network/src/eth_requests.rs @@ -10,7 +10,8 @@ use alloy_rlp::Encodable; use futures::StreamExt; use reth_eth_wire::{ BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetNodeData, - GetReceipts, HeadersDirection, NetworkPrimitives, NodeData, Receipts, Receipts69, + GetReceipts, GetReceipts70Payload, HeadersDirection, NetworkPrimitives, NodeData, Receipts, + Receipts69, Receipts70Payload, }; use reth_network_api::test_utils::PeersHandle; use reth_network_p2p::error::RequestResult; @@ -217,6 +218,66 @@ where let _ = response.send(Ok(Receipts69(receipts))); } + fn on_receipts70_request( + &self, + _peer_id: PeerId, + request: GetReceipts70Payload, + response: oneshot::Sender>>, + ) { + self.metrics.eth_receipts_requests_received_total.increment(1); + + let GetReceipts70Payload { first_block_receipt_index, block_hashes } = request; + + let mut receipts = Vec::new(); + let mut total_bytes = 0usize; + let mut last_block_incomplete = false; + + for (idx, hash) in block_hashes.into_iter().enumerate() { + if idx >= MAX_RECEIPTS_SERVE { + break + } + + let Some(mut block_receipts) = + self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default() + else { + break + }; + + if idx == 0 && first_block_receipt_index > 0 { + let skip = first_block_receipt_index as usize; + if skip >= block_receipts.len() { + block_receipts.clear(); + } else { + block_receipts.drain(0..skip); + } + } + + let block_size = block_receipts.length(); + + if total_bytes + block_size <= SOFT_RESPONSE_LIMIT { + total_bytes += block_size; + receipts.push(block_receipts); + continue; + } + + let mut partial_block = Vec::new(); + for receipt in block_receipts { + let receipt_size = receipt.length(); + if total_bytes + receipt_size > SOFT_RESPONSE_LIMIT { + break; + } + total_bytes += receipt_size; + partial_block.push(receipt); + } + + receipts.push(partial_block); + last_block_incomplete = true; + break; + } + + let _ = response.send(Ok(Receipts70Payload { last_block_incomplete, receipts })); + } + #[inline] fn get_receipts_response(&self, request: GetReceipts, transform_fn: F) -> Vec> where @@ -285,6 +346,9 @@ where IncomingEthRequest::GetReceipts69 { peer_id, request, response } => { this.on_receipts69_request(peer_id, request, response) } + IncomingEthRequest::GetReceipts70 { peer_id, request, response } => { + this.on_receipts70_request(peer_id, request, response) + } } }, ); @@ -359,4 +423,15 @@ pub enum IncomingEthRequest { /// The channel sender for the response containing Receipts69. response: oneshot::Sender>>, }, + /// Request Receipts from the peer using eth/70. + /// + /// The response should be sent through the channel. + GetReceipts70 { + /// The ID of the peer to request receipts from. + peer_id: PeerId, + /// The specific receipts requested including the `firstBlockReceiptIndex`. + request: GetReceipts70Payload, + /// The channel sender for the response containing Receipts70. + response: oneshot::Sender>>, + }, } diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index c0a2934df75..52d8757a50d 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -532,6 +532,13 @@ impl NetworkManager { response, }) } + PeerRequest::GetReceipts70 { request, response } => { + self.delegate_eth_request(IncomingEthRequest::GetReceipts70 { + peer_id, + request, + response, + }) + } PeerRequest::GetPooledTransactions { request, response } => { self.notify_tx_manager(NetworkTransactionEvent::GetPooledTransactions { peer_id, diff --git a/crates/net/network/src/message.rs b/crates/net/network/src/message.rs index 115939b1616..6d70c37125e 100644 --- a/crates/net/network/src/message.rs +++ b/crates/net/network/src/message.rs @@ -11,7 +11,7 @@ use reth_eth_wire::{ message::RequestPair, BlockBodies, BlockHeaders, BlockRangeUpdate, EthMessage, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives, NewBlock, NewBlockHashes, NewBlockPayload, NewPooledTransactionHashes, NodeData, PooledTransactions, - Receipts, SharedTransactions, Transactions, + Receipts, Receipts70, Receipts70Payload, SharedTransactions, Transactions, }; use reth_eth_wire_types::RawCapabilityMessage; use reth_network_api::PeerRequest; @@ -116,6 +116,11 @@ pub enum PeerResponse { /// The receiver channel for the response to a receipts request. response: oneshot::Receiver>>, }, + /// Represents a response to a request for receipts using eth/70. + Receipts70 { + /// The receiver channel for the response to a receipts request. + response: oneshot::Receiver>>, + }, } // === impl PeerResponse === @@ -151,6 +156,10 @@ impl PeerResponse { Self::Receipts69 { response } => { poll_request!(response, Receipts69, cx) } + Self::Receipts70 { response } => match ready!(response.poll_unpin(cx)) { + Ok(res) => PeerResponseResult::Receipts70(res), + Err(err) => PeerResponseResult::Receipts70(Err(err.into())), + }, }; Poll::Ready(res) } @@ -171,6 +180,8 @@ pub enum PeerResponseResult { Receipts(RequestResult>>>), /// Represents a result containing receipts or an error for eth/69. Receipts69(RequestResult>>), + /// Represents a result containing receipts or an error for eth/70. + Receipts70(RequestResult>), } // === impl PeerResponseResult === @@ -208,6 +219,13 @@ impl PeerResponseResult { Self::Receipts69(resp) => { to_message!(resp, Receipts69, id) } + Self::Receipts70(resp) => match resp { + Ok(res) => { + let request = RequestPair { request_id: id, message: res }; + Ok(EthMessage::Receipts70(Receipts70(request))) + } + Err(err) => Err(err), + }, } } @@ -220,6 +238,7 @@ impl PeerResponseResult { Self::NodeData(res) => res.as_ref().err(), Self::Receipts(res) => res.as_ref().err(), Self::Receipts69(res) => res.as_ref().err(), + Self::Receipts70(res) => res.as_ref().err(), } } diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index 98ccbad3b95..06d9a29f455 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -25,10 +25,10 @@ use futures::{stream::Fuse, SinkExt, StreamExt}; use metrics::Gauge; use reth_eth_wire::{ errors::{EthHandshakeError, EthStreamError}, - message::{EthBroadcastMessage, MessageError, RequestPair}, + message::{EthBroadcastMessage, MessageError}, Capabilities, DisconnectP2P, DisconnectReason, EthMessage, NetworkPrimitives, NewBlockPayload, }; -use reth_eth_wire_types::RawCapabilityMessage; +use reth_eth_wire_types::{message::RequestPair, RawCapabilityMessage}; use reth_metrics::common::mpsc::MeteredPollSender; use reth_network_api::PeerRequest; use reth_network_p2p::error::RequestError; @@ -270,12 +270,84 @@ impl ActiveSession { on_request!(req, Receipts, GetReceipts) } } + EthMessage::GetReceipts70(req70) => { + let RequestPair { + request_id, + message: + reth_eth_wire::GetReceipts70Payload { first_block_receipt_index, block_hashes }, + } = req70.0; + + let (tx, response) = oneshot::channel(); + let received = ReceivedRequest { + request_id, + rx: PeerResponse::Receipts70 { response }, + received: Instant::now(), + }; + self.received_requests_from_remote.push(received); + + let request = + reth_eth_wire::GetReceipts70Payload { first_block_receipt_index, block_hashes }; + self.try_emit_request(PeerMessage::EthRequest(PeerRequest::GetReceipts70 { + request, + response: tx, + })) + .into() + } EthMessage::Receipts(resp) => { on_response!(resp, GetReceipts) } EthMessage::Receipts69(resp) => { on_response!(resp, GetReceipts69) } + EthMessage::Receipts70(resp) => { + // Handle eth/70 receipts responses. Support `GetReceipts`, + // `GetReceipts69`, and `GetReceipts70` by converting the payload as needed. + let RequestPair { + request_id, + message: reth_eth_wire::Receipts70Payload { last_block_incomplete, receipts }, + } = resp.0; + + if let Some(req) = self.inflight_requests.remove(&request_id) { + match req.request { + RequestState::Waiting(PeerRequest::GetReceipts69 { response, .. }) => { + trace!(peer_id=?self.remote_peer_id, ?request_id, "received eth/70 Receipts for GetReceipts69"); + let message = reth_eth_wire::Receipts69(receipts); + let _ = response.send(Ok(message)); + self.update_request_timeout(req.timestamp, Instant::now()); + } + RequestState::Waiting(PeerRequest::GetReceipts70 { response, .. }) => { + trace!(peer_id=?self.remote_peer_id, ?request_id, "received eth/70 Receipts for GetReceipts70"); + let message = reth_eth_wire::Receipts70Payload { + last_block_incomplete, + receipts, + }; + let _ = response.send(Ok(message)); + self.update_request_timeout(req.timestamp, Instant::now()); + } + RequestState::Waiting(PeerRequest::GetReceipts { response, .. }) => { + trace!(peer_id=?self.remote_peer_id, ?request_id, "received eth/70 Receipts for GetReceipts"); + let receipts69 = reth_eth_wire::Receipts69(receipts); + let message: reth_eth_wire::Receipts = + receipts69.into_with_bloom(); + let _ = response.send(Ok(message)); + self.update_request_timeout(req.timestamp, Instant::now()); + } + RequestState::Waiting(request) => { + request.send_bad_response(); + } + RequestState::TimedOut => { + // request was already timed out internally + self.update_request_timeout(req.timestamp, Instant::now()); + } + } + } else { + trace!(peer_id=?self.remote_peer_id, ?request_id, "received response to unknown request"); + // we received a response to a request we never sent + self.on_bad_message(); + } + + OnIncomingMessageOutcome::Ok + } EthMessage::BlockRangeUpdate(msg) => { // Validate that earliest <= latest according to the spec if msg.earliest > msg.latest { @@ -313,7 +385,29 @@ impl ActiveSession { let request_id = self.next_id(); trace!(?request, peer_id=?self.remote_peer_id, ?request_id, "sending request to peer"); - let msg = request.create_request_message(request_id); + let mut msg = request.create_request_message(request_id); + + // For eth/70 peers we send `GetReceipts` using the new eth/70 + // encoding with `firstBlockReceiptIndex = 0`, while keeping the + // user-facing `PeerRequest` API unchanged. + if self.conn.version() >= EthVersion::Eth70 { + msg = match msg { + EthMessage::GetReceipts(pair) => { + let RequestPair { request_id, message } = pair; + let reth_eth_wire::GetReceipts(block_hashes) = message; + let get70 = reth_eth_wire::GetReceipts70(RequestPair { + request_id, + message: reth_eth_wire::GetReceipts70Payload { + first_block_receipt_index: 0, + block_hashes, + }, + }); + EthMessage::GetReceipts70(get70) + } + other => other, + }; + } + self.queued_outgoing.push_back(msg.into()); let req = InflightRequest { request: RequestState::Waiting(request), @@ -368,6 +462,7 @@ impl ActiveSession { fn handle_outgoing_response(&mut self, id: u64, resp: PeerResponseResult) { match resp.try_into_message(id) { Ok(msg) => { + let msg: EthMessage = msg; self.queued_outgoing.push_back(msg.into()); } Err(err) => { @@ -746,9 +841,8 @@ impl Future for ActiveSession { }; if should_send { - this.queued_outgoing.push_back( - EthMessage::BlockRangeUpdate(this.local_range_info.to_message()).into(), - ); + let msg = EthMessage::BlockRangeUpdate(this.local_range_info.to_message()); + this.queued_outgoing.push_back(msg.into()); this.last_sent_latest_block = Some(current_latest); } } diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index 17528e2fcfa..7feb240aeaa 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -538,9 +538,8 @@ impl SessionManager { // negotiated version let version = conn.version(); - // Configure the interval at which the range information is updated, starting with - // ETH69 - let range_update_interval = (conn.version() >= EthVersion::Eth69).then(|| { + // Configure the interval at which the range information is updated (eth/69 only) + let range_update_interval = (conn.version() == EthVersion::Eth69).then(|| { let mut interval = tokio::time::interval(RANGE_UPDATE_INTERVAL); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); interval diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 11b0e14dcaf..9f37f29e8d2 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -1921,7 +1921,9 @@ impl PooledTransactionsHashesBuilder { fn new(version: EthVersion) -> Self { match version { EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()), - EthVersion::Eth68 | EthVersion::Eth69 => Self::Eth68(Default::default()), + EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 => { + Self::Eth68(Default::default()) + } } } diff --git a/docs/crates/eth-wire.md b/docs/crates/eth-wire.md index cf62ab143e8..3915e9241cb 100644 --- a/docs/crates/eth-wire.md +++ b/docs/crates/eth-wire.md @@ -413,3 +413,6 @@ additional "satellite" protocols (e.g. `snap`) using negotiated `SharedCapabilit - Starting with ETH69: - `BlockRangeUpdate (0x11)` announces the historical block range served. - Receipts omit bloom: encoded as `Receipts69` instead of `Receipts`. +- Starting with ETH70 (EIP-7975): + - Status reuses the ETH69 format (no additional block range fields). + - Receipts continue to omit bloom; `GetReceipts`/`Receipts` add the eth/70 variants to support partial receipt ranges (`firstBlockReceiptIndex` and `lastBlockIncomplete`). diff --git a/examples/network-proxy/src/main.rs b/examples/network-proxy/src/main.rs index 51ba8e2b4a4..50ef9e4e727 100644 --- a/examples/network-proxy/src/main.rs +++ b/examples/network-proxy/src/main.rs @@ -82,6 +82,7 @@ async fn main() -> eyre::Result<()> { IncomingEthRequest::GetNodeData { .. } => {} IncomingEthRequest::GetReceipts { .. } => {} IncomingEthRequest::GetReceipts69 { .. } => {} + IncomingEthRequest::GetReceipts70 { .. } => {} } } transaction_message = transactions_rx.recv() => {