11//! Transactions management for the p2p network.
22
3- use alloy_consensus:: transaction:: TxHashRef ;
3+ use alloy_consensus:: transaction:: { Recovered , TxHashRef } ;
44
55/// Aggregation on configurable parameters for [`TransactionsManager`].
66pub mod config;
@@ -15,6 +15,8 @@ pub use self::constants::{
1515 tx_fetcher:: DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ ,
1616 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE ,
1717} ;
18+ /// Default maximum recovered transaction cache size.
19+ pub const DEFAULT_RECOVERED_TX_CACHE_SIZE : u32 = 32_768 ;
1820use config:: AnnouncementAcceptance ;
1921pub use config:: {
2022 AnnouncementFilteringPolicy , TransactionFetcherConfig , TransactionIngressPolicy ,
@@ -30,7 +32,7 @@ use crate::{
3032 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS ,
3133 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS , DEFAULT_BUDGET_TRY_DRAIN_STREAM ,
3234 } ,
33- cache:: LruCache ,
35+ cache:: { LruCache , LruMap } ,
3436 duration_metered_exec, metered_poll_nested_stream_with_budget,
3537 metrics:: {
3638 AnnouncedTxTypesMetrics , TransactionsManagerMetrics , NETWORK_POOL_TRANSACTIONS_SCOPE ,
@@ -311,6 +313,8 @@ pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives
311313 pending_pool_imports_info : PendingPoolImportsInfo ,
312314 /// Bad imports.
313315 bad_imports : LruCache < TxHash > ,
316+ /// Cache recovered transactions to avoid repeated signature recovery for duplicates.
317+ recovered_txs : LruMap < TxHash , Recovered < N :: PooledTransaction > > ,
314318 /// All the connected peers.
315319 peers : HashMap < PeerId , PeerMetadata < N > > ,
316320 /// Send half for the command channel.
@@ -405,6 +409,7 @@ impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
405409 DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS ,
406410 ) ,
407411 bad_imports : LruCache :: new ( DEFAULT_MAX_COUNT_BAD_IMPORTS ) ,
412+ recovered_txs : LruMap :: new ( transactions_manager_config. recovered_tx_cache_size . max ( 1 ) ) ,
408413 peers : Default :: default ( ) ,
409414 command_tx,
410415 command_rx : UnboundedReceiverStream :: new ( command_rx) ,
@@ -1372,18 +1377,41 @@ where
13721377 // reallocations
13731378 let mut new_txs = Vec :: with_capacity ( transactions. len ( ) ) ;
13741379 for tx in transactions {
1375- // recover transaction
1376- let tx = match tx. try_into_recovered ( ) {
1377- Ok ( tx) => tx,
1378- Err ( badtx) => {
1379- trace ! ( target: "net::tx" ,
1380- peer_id=format!( "{peer_id:#}" ) ,
1381- hash=%badtx. tx_hash( ) ,
1382- client_version=%peer. client_version,
1383- "failed ecrecovery for transaction"
1384- ) ;
1385- has_bad_transactions = true ;
1386- continue
1380+ // recover transaction, prefer cached value if present when enabled
1381+ let tx_hash = * tx. tx_hash ( ) ;
1382+ let tx = if self . config . recovered_tx_cache_size == 0 {
1383+ match tx. try_into_recovered ( ) {
1384+ Ok ( tx) => tx,
1385+ Err ( badtx) => {
1386+ trace ! ( target: "net::tx" ,
1387+ peer_id=format!( "{peer_id:#}" ) ,
1388+ hash=%badtx. tx_hash( ) ,
1389+ client_version=%peer. client_version,
1390+ "failed ecrecovery for transaction"
1391+ ) ;
1392+ has_bad_transactions = true ;
1393+ continue
1394+ }
1395+ }
1396+ } else {
1397+ match self . recovered_txs . get ( & tx_hash) {
1398+ Some ( cached) => cached. clone ( ) ,
1399+ None => match tx. try_into_recovered ( ) {
1400+ Ok ( tx) => {
1401+ _ = self . recovered_txs . insert ( tx_hash, tx. clone ( ) ) ;
1402+ tx
1403+ }
1404+ Err ( badtx) => {
1405+ trace ! ( target: "net::tx" ,
1406+ peer_id=format!( "{peer_id:#}" ) ,
1407+ hash=%badtx. tx_hash( ) ,
1408+ client_version=%peer. client_version,
1409+ "failed ecrecovery for transaction"
1410+ ) ;
1411+ has_bad_transactions = true ;
1412+ continue
1413+ }
1414+ } ,
13871415 }
13881416 } ;
13891417
@@ -2128,6 +2156,7 @@ mod tests {
21282156 use alloy_rlp:: Decodable ;
21292157 use futures:: FutureExt ;
21302158 use reth_chainspec:: MIN_TRANSACTION_GAS ;
2159+ use reth_eth_wire:: { EthVersion , PooledTransactions } ;
21312160 use reth_ethereum_primitives:: { PooledTransactionVariant , Transaction , TransactionSigned } ;
21322161 use reth_network_api:: { NetworkInfo , PeerKind } ;
21332162 use reth_network_p2p:: {
@@ -2889,6 +2918,37 @@ mod tests {
28892918 assert ! ( propagated. 0 . is_empty( ) ) ;
28902919 }
28912920
2921+ #[ tokio:: test( flavor = "multi_thread" ) ]
2922+ async fn test_recovered_tx_cache_stores_recovered_once ( ) {
2923+ reth_tracing:: init_test_tracing ( ) ;
2924+
2925+ // Build a tx manager and shrink the recovered cache to a tiny size.
2926+ let ( mut tx_manager, _network) = new_tx_manager ( ) . await ;
2927+ tx_manager. config . recovered_tx_cache_size = 1 ;
2928+ tx_manager. recovered_txs = LruMap :: new ( 1 ) ;
2929+
2930+ // Register a peer so the import path can proceed.
2931+ let peer_id = PeerId :: random ( ) ;
2932+ let ( peer_meta, _rx) = new_mock_session ( peer_id, EthVersion :: Eth68 ) ;
2933+ tx_manager. peers . insert ( peer_id, peer_meta) ;
2934+
2935+ // A simple valid transaction (same as other tests).
2936+ let input = hex ! (
2937+ "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2938+ ) ;
2939+ let signed_tx = TransactionSigned :: decode ( & mut & input[ ..] ) . unwrap ( ) ;
2940+ let pooled: PooledTransactionVariant = signed_tx. try_into ( ) . unwrap ( ) ;
2941+
2942+ // Feed the same transaction twice; the cache should only keep one entry.
2943+ tx_manager. import_transactions (
2944+ peer_id,
2945+ PooledTransactions ( vec ! [ pooled. clone( ) , pooled] ) ,
2946+ TransactionSource :: Broadcast ,
2947+ ) ;
2948+
2949+ assert_eq ! ( tx_manager. recovered_txs. len( ) , 1 ) ;
2950+ }
2951+
28922952 #[ tokio:: test]
28932953 async fn test_relaxed_filter_ignores_unknown_tx_types ( ) {
28942954 reth_tracing:: init_test_tracing ( ) ;
0 commit comments