11//! Transactions management for the p2p network.
22
3- use alloy_consensus:: transaction:: TxHashRef ;
3+ use alloy_consensus:: transaction:: { Recovered , TxHashRef } ;
44
55/// Aggregation on configurable parameters for [`TransactionsManager`].
66pub mod config;
@@ -15,6 +15,8 @@ pub use self::constants::{
1515 tx_fetcher:: DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ ,
1616 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE ,
1717} ;
18+ /// Default maximum recovered transaction cache size.
19+ pub const DEFAULT_RECOVERED_TX_CACHE_SIZE : u32 = 32_768 ;
1820use config:: AnnouncementAcceptance ;
1921pub use config:: {
2022 AnnouncementFilteringPolicy , TransactionFetcherConfig , TransactionIngressPolicy ,
@@ -30,7 +32,7 @@ use crate::{
3032 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS ,
3133 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS , DEFAULT_BUDGET_TRY_DRAIN_STREAM ,
3234 } ,
33- cache:: LruCache ,
35+ cache:: { LruCache , LruMap } ,
3436 duration_metered_exec, metered_poll_nested_stream_with_budget,
3537 metrics:: {
3638 AnnouncedTxTypesMetrics , TransactionsManagerMetrics , NETWORK_POOL_TRANSACTIONS_SCOPE ,
@@ -311,6 +313,8 @@ pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives
311313 pending_pool_imports_info : PendingPoolImportsInfo ,
312314 /// Bad imports.
313315 bad_imports : LruCache < TxHash > ,
316+ /// Cache recovered transactions to avoid repeated signature recovery for duplicates.
317+ recovered_txs : LruMap < TxHash , Arc < Recovered < N :: PooledTransaction > > > ,
314318 /// All the connected peers.
315319 peers : HashMap < PeerId , PeerMetadata < N > > ,
316320 /// Send half for the command channel.
@@ -405,6 +409,7 @@ impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
405409 DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS ,
406410 ) ,
407411 bad_imports : LruCache :: new ( DEFAULT_MAX_COUNT_BAD_IMPORTS ) ,
412+ recovered_txs : LruMap :: new ( transactions_manager_config. recovered_tx_cache_size ) ,
408413 peers : Default :: default ( ) ,
409414 command_tx,
410415 command_rx : UnboundedReceiverStream :: new ( command_rx) ,
@@ -1372,18 +1377,48 @@ where
13721377 // reallocations
13731378 let mut new_txs = Vec :: with_capacity ( transactions. len ( ) ) ;
13741379 for tx in transactions {
1375- // recover transaction
1376- let tx = match tx. try_into_recovered ( ) {
1377- Ok ( tx) => tx,
1378- Err ( badtx) => {
1379- trace ! ( target: "net::tx" ,
1380- peer_id=format!( "{peer_id:#}" ) ,
1381- hash=%badtx. tx_hash( ) ,
1382- client_version=%peer. client_version,
1383- "failed ecrecovery for transaction"
1384- ) ;
1385- has_bad_transactions = true ;
1386- continue
1380+ // recover transaction, prefer cached value if present when enabled
1381+ let tx_hash = * tx. tx_hash ( ) ;
1382+ let tx = if self . config . recovered_tx_cache_size == 0 {
1383+ #[ cfg( test) ]
1384+ RECOVER_INVOCATIONS . fetch_add ( 1 , Ordering :: Relaxed ) ;
1385+ match tx. try_into_recovered ( ) {
1386+ Ok ( tx) => Arc :: new ( tx) ,
1387+ Err ( badtx) => {
1388+ trace ! ( target: "net::tx" ,
1389+ peer_id=format!( "{peer_id:#}" ) ,
1390+ hash=%badtx. tx_hash( ) ,
1391+ client_version=%peer. client_version,
1392+ "failed ecrecovery for transaction"
1393+ ) ;
1394+ has_bad_transactions = true ;
1395+ continue
1396+ }
1397+ }
1398+ } else {
1399+ match self . recovered_txs . get ( & tx_hash) {
1400+ Some ( cached) => cached. clone ( ) ,
1401+ None => {
1402+ #[ cfg( test) ]
1403+ RECOVER_INVOCATIONS . fetch_add ( 1 , Ordering :: Relaxed ) ;
1404+ match tx. try_into_recovered ( ) {
1405+ Ok ( tx) => {
1406+ let tx = Arc :: new ( tx) ;
1407+ _ = self . recovered_txs . insert ( tx_hash, tx. clone ( ) ) ;
1408+ tx
1409+ }
1410+ Err ( badtx) => {
1411+ trace ! ( target: "net::tx" ,
1412+ peer_id=format!( "{peer_id:#}" ) ,
1413+ hash=%badtx. tx_hash( ) ,
1414+ client_version=%peer. client_version,
1415+ "failed ecrecovery for transaction"
1416+ ) ;
1417+ has_bad_transactions = true ;
1418+ continue
1419+ }
1420+ }
1421+ }
13871422 }
13881423 } ;
13891424
@@ -1404,7 +1439,7 @@ where
14041439 } else {
14051440 // this is a new transaction that should be imported into the pool
14061441
1407- let pool_transaction = Pool :: Transaction :: from_pooled ( tx ) ;
1442+ let pool_transaction = Pool :: Transaction :: from_pooled ( ( * tx ) . clone ( ) ) ;
14081443 new_txs. push ( pool_transaction) ;
14091444
14101445 entry. insert ( HashSet :: from ( [ peer_id] ) ) ;
@@ -2112,6 +2147,9 @@ struct TxManagerPollDurations {
21122147 acc_cmds : Duration ,
21132148}
21142149
2150+ #[ cfg( test) ]
2151+ static RECOVER_INVOCATIONS : AtomicUsize = AtomicUsize :: new ( 0 ) ;
2152+
21152153#[ cfg( test) ]
21162154mod tests {
21172155 use super :: * ;
@@ -2128,6 +2166,7 @@ mod tests {
21282166 use alloy_rlp:: Decodable ;
21292167 use futures:: FutureExt ;
21302168 use reth_chainspec:: MIN_TRANSACTION_GAS ;
2169+ use reth_eth_wire:: { EthVersion , PooledTransactions } ;
21312170 use reth_ethereum_primitives:: { PooledTransactionVariant , Transaction , TransactionSigned } ;
21322171 use reth_network_api:: { NetworkInfo , PeerKind } ;
21332172 use reth_network_p2p:: {
@@ -2889,6 +2928,38 @@ mod tests {
28892928 assert ! ( propagated. 0 . is_empty( ) ) ;
28902929 }
28912930
2931+ #[ tokio:: test( flavor = "multi_thread" ) ]
2932+ async fn test_recovered_tx_cache_stores_recovered_once ( ) {
2933+ reth_tracing:: init_test_tracing ( ) ;
2934+
2935+ // Build a tx manager and shrink the recovered cache to a tiny size.
2936+ let ( mut tx_manager, _network) = new_tx_manager ( ) . await ;
2937+ tx_manager. recovered_txs = LruMap :: new ( 1 ) ;
2938+ RECOVER_INVOCATIONS . store ( 0 , Ordering :: Relaxed ) ;
2939+
2940+ // Register a peer so the import path can proceed.
2941+ let peer_id = PeerId :: random ( ) ;
2942+ let ( peer_meta, _rx) = new_mock_session ( peer_id, EthVersion :: Eth68 ) ;
2943+ tx_manager. peers . insert ( peer_id, peer_meta) ;
2944+
2945+ // A simple valid transaction (same as other tests).
2946+ let input = hex ! (
2947+ "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2948+ ) ;
2949+ let signed_tx = TransactionSigned :: decode ( & mut & input[ ..] ) . unwrap ( ) ;
2950+ let pooled: PooledTransactionVariant = signed_tx. try_into ( ) . unwrap ( ) ;
2951+
2952+ // Feed the same transaction twice; the cache should only keep one entry.
2953+ tx_manager. import_transactions (
2954+ peer_id,
2955+ PooledTransactions ( vec ! [ pooled. clone( ) , pooled] ) ,
2956+ TransactionSource :: Broadcast ,
2957+ ) ;
2958+
2959+ assert_eq ! ( tx_manager. recovered_txs. len( ) , 1 ) ;
2960+ assert_eq ! ( RECOVER_INVOCATIONS . load( Ordering :: Relaxed ) , 1 ) ;
2961+ }
2962+
28922963 #[ tokio:: test]
28932964 async fn test_relaxed_filter_ignores_unknown_tx_types ( ) {
28942965 reth_tracing:: init_test_tracing ( ) ;
0 commit comments