diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index 65bd670bafb..3a59a844cc7 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -15,8 +15,7 @@ env: name: bench jobs: codspeed: - runs-on: - group: Reth + runs-on: ubuntu-latest steps: - uses: actions/checkout@v6 with: diff --git a/.github/workflows/compact.yml b/.github/workflows/compact.yml index 1ccf37106b2..b4fbadce2c7 100644 --- a/.github/workflows/compact.yml +++ b/.github/workflows/compact.yml @@ -17,8 +17,7 @@ env: name: compact-codec jobs: compact-codec: - runs-on: - group: Reth + runs-on: ubuntu-latest strategy: matrix: bin: diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index b0ee51e3492..2cfed4a229d 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -19,8 +19,7 @@ concurrency: jobs: test: name: e2e-testsuite - runs-on: - group: Reth + runs-on: ubuntu-latest env: RUST_BACKTRACE: 1 timeout-minutes: 90 @@ -43,4 +42,3 @@ jobs: --exclude 'op-reth' \ --exclude 'reth' \ -E 'binary(e2e_testsuite)' - diff --git a/.github/workflows/hive.yml b/.github/workflows/hive.yml index d02ab561c6d..d729a8b3814 100644 --- a/.github/workflows/hive.yml +++ b/.github/workflows/hive.yml @@ -24,8 +24,7 @@ jobs: prepare-hive: if: github.repository == 'paradigmxyz/reth' timeout-minutes: 45 - runs-on: - group: Reth + runs-on: ubuntu-latest steps: - uses: actions/checkout@v6 - name: Checkout hive tests @@ -179,8 +178,7 @@ jobs: - prepare-reth - prepare-hive name: run ${{ matrix.scenario.sim }}${{ matrix.scenario.limit && format(' - {0}', matrix.scenario.limit) }} - runs-on: - group: Reth + runs-on: ubuntu-latest permissions: issues: write steps: @@ -247,8 +245,7 @@ jobs: notify-on-error: needs: test if: failure() - runs-on: - group: Reth + runs-on: ubuntu-latest steps: - name: Slack Webhook Action uses: rtCamp/action-slack-notify@v2 diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 027811dfb1e..3720fd573ef 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -23,8 +23,7 @@ jobs: test: name: test / ${{ matrix.network }} if: github.event_name != 'schedule' - runs-on: - group: Reth + runs-on: ubuntu-latest env: RUST_BACKTRACE: 1 strategy: diff --git a/.github/workflows/kurtosis-op.yml b/.github/workflows/kurtosis-op.yml index 1c04e4c00c7..1ac8d687a1f 100644 --- a/.github/workflows/kurtosis-op.yml +++ b/.github/workflows/kurtosis-op.yml @@ -9,7 +9,7 @@ on: push: tags: - - '*' + - "*" env: CARGO_TERM_COLOR: always @@ -32,8 +32,7 @@ jobs: strategy: fail-fast: false name: run kurtosis - runs-on: - group: Reth + runs-on: ubuntu-latest needs: - prepare-reth steps: @@ -83,12 +82,10 @@ jobs: kurtosis service logs -a op-devnet op-cl-2151908-2-op-node-op-reth-op-kurtosis exit 1 - notify-on-error: needs: test if: failure() - runs-on: - group: Reth + runs-on: ubuntu-latest steps: - name: Slack Webhook Action uses: rtCamp/action-slack-notify@v2 diff --git a/.github/workflows/kurtosis.yml b/.github/workflows/kurtosis.yml index 7b03d44c327..6eb85bdc20d 100644 --- a/.github/workflows/kurtosis.yml +++ b/.github/workflows/kurtosis.yml @@ -9,7 +9,7 @@ on: push: tags: - - '*' + - "*" env: CARGO_TERM_COLOR: always @@ -30,8 +30,7 @@ jobs: strategy: fail-fast: false name: run kurtosis - runs-on: - group: Reth + runs-on: ubuntu-latest needs: - prepare-reth steps: @@ -54,13 +53,12 @@ jobs: - name: Run kurtosis uses: ethpandaops/kurtosis-assertoor-github-action@v1 with: - ethereum_package_args: 
'.github/assets/kurtosis_network_params.yaml' + ethereum_package_args: ".github/assets/kurtosis_network_params.yaml" notify-on-error: needs: test if: failure() - runs-on: - group: Reth + runs-on: ubuntu-latest steps: - name: Slack Webhook Action uses: rtCamp/action-slack-notify@v2 diff --git a/.github/workflows/prepare-reth.yml b/.github/workflows/prepare-reth.yml index 6c426f6cde8..136919cd903 100644 --- a/.github/workflows/prepare-reth.yml +++ b/.github/workflows/prepare-reth.yml @@ -26,8 +26,7 @@ jobs: prepare-reth: if: github.repository == 'paradigmxyz/reth' timeout-minutes: 45 - runs-on: - group: Reth + runs-on: ubuntu-latest steps: - uses: actions/checkout@v6 - run: mkdir artifacts diff --git a/.github/workflows/stage.yml b/.github/workflows/stage.yml index 4825da8e793..76d7391a1df 100644 --- a/.github/workflows/stage.yml +++ b/.github/workflows/stage.yml @@ -22,8 +22,7 @@ jobs: name: stage-run-test # Only run stage commands test in merge groups if: github.event_name == 'merge_group' - runs-on: - group: Reth + runs-on: ubuntu-latest env: RUST_LOG: info,sync=error RUST_BACKTRACE: 1 diff --git a/.github/workflows/sync-era.yml b/.github/workflows/sync-era.yml index ee4c3e92675..471a42371b0 100644 --- a/.github/workflows/sync-era.yml +++ b/.github/workflows/sync-era.yml @@ -17,8 +17,7 @@ concurrency: jobs: sync: name: sync (${{ matrix.chain.bin }}) - runs-on: - group: Reth + runs-on: ubuntu-latest env: RUST_LOG: info,sync=error RUST_BACKTRACE: 1 @@ -64,4 +63,4 @@ jobs: ${{ matrix.chain.bin }} stage unwind num-blocks 100 --chain ${{ matrix.chain.chain }} - name: Run stage unwind to block hash run: | - ${{ matrix.chain.bin }} stage unwind to-block ${{ matrix.chain.unwind-target }} --chain ${{ matrix.chain.chain }} + ${{ matrix.chain.bin }} stage unwind to-block ${{ matrix.chain.unwind-target }} --chain ${{ matrix.chain.chain }} diff --git a/.github/workflows/sync.yml b/.github/workflows/sync.yml index 3e135ea4289..2891d6a5373 100644 --- a/.github/workflows/sync.yml +++ b/.github/workflows/sync.yml @@ -17,8 +17,7 @@ concurrency: jobs: sync: name: sync (${{ matrix.chain.bin }}) - runs-on: - group: Reth + runs-on: ubuntu-latest env: RUST_LOG: info,sync=error RUST_BACKTRACE: 1 @@ -63,4 +62,4 @@ jobs: ${{ matrix.chain.bin }} stage unwind num-blocks 100 --chain ${{ matrix.chain.chain }} - name: Run stage unwind to block hash run: | - ${{ matrix.chain.bin }} stage unwind to-block ${{ matrix.chain.unwind-target }} --chain ${{ matrix.chain.chain }} + ${{ matrix.chain.bin }} stage unwind to-block ${{ matrix.chain.unwind-target }} --chain ${{ matrix.chain.chain }} diff --git a/.github/workflows/unit.yml b/.github/workflows/unit.yml index a5dcd09be49..cd07b3413bf 100644 --- a/.github/workflows/unit.yml +++ b/.github/workflows/unit.yml @@ -19,8 +19,7 @@ concurrency: jobs: test: name: test / ${{ matrix.type }} (${{ matrix.partition }}/${{ matrix.total_partitions }}) - runs-on: - group: Reth + runs-on: ubuntu-latest env: RUST_BACKTRACE: 1 strategy: @@ -65,8 +64,7 @@ jobs: state: name: Ethereum state tests - runs-on: - group: Reth + runs-on: ubuntu-latest env: RUST_LOG: info,sync=error RUST_BACKTRACE: 1 @@ -100,8 +98,7 @@ jobs: doc: name: doc tests - runs-on: - group: Reth + runs-on: ubuntu-latest env: RUST_BACKTRACE: 1 timeout-minutes: 30 diff --git a/Cargo.lock b/Cargo.lock index 7460cfc0513..8427f237ffc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -97,9 +97,9 @@ checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" [[package]] name = "alloy-chains" -version = "0.2.20" +version = 
"0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bc32535569185cbcb6ad5fa64d989a47bccb9a08e27284b1f2a3ccf16e6d010" +checksum = "1b9ebac8ff9c2f07667e1803dc777304337e160ce5153335beb45e8ec0751808" dependencies = [ "alloy-primitives", "alloy-rlp", @@ -112,9 +112,9 @@ dependencies = [ [[package]] name = "alloy-consensus" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b6440213a22df93a87ed512d2f668e7dc1d62a05642d107f82d61edc9e12370" +checksum = "2e318e25fb719e747a7e8db1654170fc185024f3ed5b10f86c08d448a912f6e2" dependencies = [ "alloy-eips", "alloy-primitives", @@ -140,9 +140,9 @@ dependencies = [ [[package]] name = "alloy-consensus-any" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15d0bea09287942405c4f9d2a4f22d1e07611c2dbd9d5bf94b75366340f9e6e0" +checksum = "364380a845193a317bcb7a5398fc86cdb66c47ebe010771dde05f6869bf9e64a" dependencies = [ "alloy-consensus", "alloy-eips", @@ -155,9 +155,9 @@ dependencies = [ [[package]] name = "alloy-contract" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d69af404f1d00ddb42f2419788fa87746a4cd13bab271916d7726fda6c792d94" +checksum = "08d39c80ffc806f27a76ed42f3351a455f3dc4f81d6ff92c8aad2cf36b7d3a34" dependencies = [ "alloy-consensus", "alloy-dyn-abi", @@ -240,9 +240,9 @@ dependencies = [ [[package]] name = "alloy-eips" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bd2c7ae05abcab4483ce821f12f285e01c0b33804e6883dd9ca1569a87ee2be" +checksum = "a4c4d7c5839d9f3a467900c625416b24328450c65702eb3d8caff8813e4d1d33" dependencies = [ "alloy-eip2124", "alloy-eip2930", @@ -288,9 +288,9 @@ dependencies = [ [[package]] name = "alloy-genesis" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc47eaae86488b07ea8e20236184944072a78784a1f4993f8ec17b3aa5d08c21" +checksum = "1ba4b1be0988c11f0095a2380aa596e35533276b8fa6c9e06961bbfe0aebcac5" dependencies = [ "alloy-eips", "alloy-primitives", @@ -329,9 +329,9 @@ dependencies = [ [[package]] name = "alloy-json-rpc" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "003f46c54f22854a32b9cc7972660a476968008ad505427eabab49225309ec40" +checksum = "f72cf87cda808e593381fb9f005ffa4d2475552b7a6c5ac33d087bf77d82abd0" dependencies = [ "alloy-primitives", "alloy-sol-types", @@ -344,9 +344,9 @@ dependencies = [ [[package]] name = "alloy-network" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f4029954d9406a40979f3a3b46950928a0fdcfe3ea8a9b0c17490d57e8aa0e3" +checksum = "12aeb37b6f2e61b93b1c3d34d01ee720207c76fe447e2a2c217e433ac75b17f5" dependencies = [ "alloy-consensus", "alloy-consensus-any", @@ -370,9 +370,9 @@ dependencies = [ [[package]] name = "alloy-network-primitives" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7805124ad69e57bbae7731c9c344571700b2a18d351bda9e0eba521c991d1bcb" +checksum = "abd29ace62872083e30929cd9b282d82723196d196db589f3ceda67edcc05552" dependencies = [ "alloy-consensus", "alloy-eips", @@ -401,9 +401,9 @@ dependencies = [ [[package]] name = "alloy-op-hardforks" -version = "0.4.4" +version = "0.4.5" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "95ac97adaba4c26e17192d81f49186ac20c1e844e35a00e169c8d3d58bc84e6b" +checksum = "f96fb2fce4024ada5b2c11d4076acf778a0d3e4f011c6dfd2ffce6d0fcf84ee9" dependencies = [ "alloy-chains", "alloy-hardforks", @@ -444,9 +444,9 @@ dependencies = [ [[package]] name = "alloy-provider" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d369e12c92870d069e0c9dc5350377067af8a056e29e3badf8446099d7e00889" +checksum = "9b710636d7126e08003b8217e24c09f0cca0b46d62f650a841736891b1ed1fc1" dependencies = [ "alloy-chains", "alloy-consensus", @@ -489,9 +489,9 @@ dependencies = [ [[package]] name = "alloy-pubsub" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f77d20cdbb68a614c7a86b3ffef607b37d087bb47a03c58f4c3f8f99bc3ace3b" +checksum = "cdd4c64eb250a18101d22ae622357c6b505e158e9165d4c7974d59082a600c5e" dependencies = [ "alloy-json-rpc", "alloy-primitives", @@ -533,9 +533,9 @@ dependencies = [ [[package]] name = "alloy-rpc-client" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31c89883fe6b7381744cbe80fef638ac488ead4f1956a4278956a1362c71cd2e" +checksum = "d0882e72d2c1c0c79dcf4ab60a67472d3f009a949f774d4c17d0bdb669cfde05" dependencies = [ "alloy-json-rpc", "alloy-primitives", @@ -559,9 +559,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64e279e6d40ee40fe8f76753b678d8d5d260cb276dc6c8a8026099b16d2b43f4" +checksum = "39cf1398cb33aacb139a960fa3d8cf8b1202079f320e77e952a0b95967bf7a9f" dependencies = [ "alloy-primitives", "alloy-rpc-types-engine", @@ -572,9 +572,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types-admin" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bcf50ccb65d29b8599f8f5e23dcac685f1d79459654c830cba381345760e901" +checksum = "65a583d2029b171301f5dcf122aa2ef443a65a373778ec76540d999691ae867d" dependencies = [ "alloy-genesis", "alloy-primitives", @@ -584,9 +584,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types-anvil" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e176c26fdd87893b6afeb5d92099d8f7e7a1fe11d6f4fe0883d6e33ac5f31ba" +checksum = "c3ce4c24e416bd0f17fceeb2f26cd8668df08fe19e1dc02f9d41c3b8ed1e93e0" dependencies = [ "alloy-primitives", "alloy-rpc-types-eth", @@ -596,9 +596,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types-any" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b43c1622aac2508d528743fd4cfdac1dea92d5a8fa894038488ff7edd0af0b32" +checksum = "6a63fb40ed24e4c92505f488f9dd256e2afaed17faa1b7a221086ebba74f4122" dependencies = [ "alloy-consensus-any", "alloy-rpc-types-eth", @@ -607,9 +607,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types-beacon" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1786681640d4c60f22b6b8376b0f3fa200360bf1c3c2cb913e6c97f51928eb1b" +checksum = "16633087e23d8d75161c3a59aa183203637b817a5a8d2f662f612ccb6d129af0" dependencies = [ "alloy-eips", "alloy-primitives", @@ -627,9 +627,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types-debug" -version = "1.1.2" +version = "1.1.3" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b2ca3a434a6d49910a7e8e51797eb25db42ef8a5578c52d877fcb26d0afe7bc" +checksum = "4936f579d9d10eae01772b2ab3497f9d568684f05f26f8175e12f9a1a2babc33" dependencies = [ "alloy-primitives", "derive_more", @@ -639,9 +639,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types-engine" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9c4c53a8b0905d931e7921774a1830609713bd3e8222347963172b03a3ecc68" +checksum = "4c60bdce3be295924122732b7ecd0b2495ce4790bedc5370ca7019c08ad3f26e" dependencies = [ "alloy-consensus", "alloy-eips", @@ -660,9 +660,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types-eth" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed5fafb741c19b3cca4cdd04fa215c89413491f9695a3e928dee2ae5657f607e" +checksum = "9eae0c7c40da20684548cbc8577b6b7447f7bf4ddbac363df95e3da220e41e72" dependencies = [ "alloy-consensus", "alloy-consensus-any", @@ -682,9 +682,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types-mev" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49a97bfc6d9b411c85bb08e1174ddd3e5d61b10d3bd13f529d6609f733cb2f6f" +checksum = "81c0dd81c24944cfbf45b5df7cd149d9cd3e354db81ccf08aa47e0e05be8ab97" dependencies = [ "alloy-consensus", "alloy-eips", @@ -697,9 +697,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types-trace" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c55324323aa634b01bdecb2d47462a8dce05f5505b14a6e5db361eef16eda476" +checksum = "ef206a4b8d436fbb7cf2e6a61c692d11df78f9382becc3c9a283bd58e64f0583" dependencies = [ "alloy-primitives", "alloy-rpc-types-eth", @@ -711,9 +711,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types-txpool" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96b1aa28effb6854be356ce92ed64cea3b323acd04c3f8bfb5126e2839698043" +checksum = "ecb5a795264a02222f9534435b8f40dcbd88de8e9d586647884aae24f389ebf2" dependencies = [ "alloy-primitives", "alloy-rpc-types-eth", @@ -723,9 +723,9 @@ dependencies = [ [[package]] name = "alloy-serde" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6f180c399ca7c1e2fe17ea58343910cad0090878a696ff5a50241aee12fc529" +checksum = "c0df1987ed0ff2d0159d76b52e7ddfc4e4fbddacc54d2fbee765e0d14d7c01b5" dependencies = [ "alloy-primitives", "arbitrary", @@ -735,9 +735,9 @@ dependencies = [ [[package]] name = "alloy-signer" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ecc39ad2c0a3d2da8891f4081565780703a593f090f768f884049aa3aa929cbc" +checksum = "6ff69deedee7232d7ce5330259025b868c5e6a52fa8dffda2c861fb3a5889b24" dependencies = [ "alloy-primitives", "async-trait", @@ -750,9 +750,9 @@ dependencies = [ [[package]] name = "alloy-signer-local" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "930e17cb1e46446a193a593a3bfff8d0ecee4e510b802575ebe300ae2e43ef75" +checksum = "72cfe0be3ec5a8c1a46b2e5a7047ed41121d360d97f4405bb7c1c784880c86cb" dependencies = [ "alloy-consensus", "alloy-network", @@ -839,9 +839,9 @@ dependencies = [ [[package]] name = "alloy-transport" -version = "1.1.2" +version = "1.1.3" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "cae82426d98f8bc18f53c5223862907cac30ab8fc5e4cd2bb50808e6d3ab43d8" +checksum = "be98b07210d24acf5b793c99b759e9a696e4a2e67593aec0487ae3b3e1a2478c" dependencies = [ "alloy-json-rpc", "auto_impl", @@ -862,9 +862,9 @@ dependencies = [ [[package]] name = "alloy-transport-http" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90aa6825760905898c106aba9c804b131816a15041523e80b6d4fe7af6380ada" +checksum = "4198a1ee82e562cab85e7f3d5921aab725d9bd154b6ad5017f82df1695877c97" dependencies = [ "alloy-json-rpc", "alloy-transport", @@ -877,9 +877,9 @@ dependencies = [ [[package]] name = "alloy-transport-ipc" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ace83a4a6bb896e5894c3479042e6ba78aa5271dde599aa8c36a021d49cc8cc" +checksum = "d8db249779ebc20dc265920c7e706ed0d31dbde8627818d1cbde60919b875bb0" dependencies = [ "alloy-json-rpc", "alloy-pubsub", @@ -897,9 +897,9 @@ dependencies = [ [[package]] name = "alloy-transport-ws" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86c9ab4c199e3a8f3520b60ba81aa67bb21fed9ed0d8304e0569094d0758a56f" +checksum = "5ad2344a12398d7105e3722c9b7a7044ea837128e11d453604dec6e3731a86e2" dependencies = [ "alloy-pubsub", "alloy-transport", @@ -935,9 +935,9 @@ dependencies = [ [[package]] name = "alloy-tx-macros" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae109e33814b49fc0a62f2528993aa8a2dd346c26959b151f05441dc0b9da292" +checksum = "333544408503f42d7d3792bfc0f7218b643d968a03d2c0ed383ae558fb4a76d0" dependencies = [ "darling 0.21.3", "proc-macro2", @@ -1627,15 +1627,15 @@ checksum = "5e764a1d40d510daf35e07be9eb06e75770908c27d411ee6c92109c9840eaaf7" [[package]] name = "bitcoin-io" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b47c4ab7a93edb0c7198c5535ed9b52b63095f4e9b45279c6736cec4b856baf" +checksum = "2dee39a0ee5b4095224a0cfc6bf4cc1baf0f9624b96b367e53b66d974e51d953" [[package]] name = "bitcoin_hashes" -version = "0.14.0" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb18c03d0db0247e147a21a6faafd5a7eb851c743db062de72018b6b7e8e4d16" +checksum = "26ec84b80c482df901772e931a9a681e26a1b9ee2302edeff23cb30328745c8b" dependencies = [ "bitcoin-io", "hex-conservative", @@ -2471,9 +2471,9 @@ dependencies = [ [[package]] name = "convert_case" -version = "0.7.1" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb402b8d4c85569410425650ce3eddc7d698ed96d39a73f941b08fb63082f1e7" +checksum = "633458d4ef8c78b72454de2d54fd6ab2e60f9e02be22f3c6104cdc8a4e0fceb9" dependencies = [ "unicode-segmentation", ] @@ -2978,22 +2978,23 @@ dependencies = [ [[package]] name = "derive_more" -version = "2.0.1" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "093242cf7570c207c83073cf82f79706fe7b8317e98620a47d5be7c3d8497678" +checksum = "10b768e943bed7bf2cab53df09f4bc34bfd217cdb57d971e769874c9a6710618" dependencies = [ "derive_more-impl", ] [[package]] name = "derive_more-impl" -version = "2.0.1" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3" +checksum = 
"6d286bfdaf75e988b4a78e013ecd79c581e06399ab53fbacd2d916c2f904f30b" dependencies = [ "convert_case", "proc-macro2", "quote", + "rustc_version 0.4.1", "syn 2.0.111", "unicode-xid", ] @@ -3981,9 +3982,9 @@ checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" [[package]] name = "flate2" -version = "1.1.5" +version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfe33edd8e85a12a67454e37f8c75e730830d83e313556ab9ebf9ee7fbeb3bfb" +checksum = "a2152dbcb980c05735e2a651d96011320a949eb31a0c8b38b72645ce97dec676" dependencies = [ "crc32fast", "miniz_oxide", @@ -4277,9 +4278,9 @@ dependencies = [ [[package]] name = "git2" -version = "0.20.2" +version = "0.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2deb07a133b1520dc1a5690e9bd08950108873d7ed5de38dcc74d3b5ebffa110" +checksum = "3e2b37e2f62729cdada11f0e6b3b6fe383c69c29fc619e391223e12856af308c" dependencies = [ "bitflags 2.10.0", "libc", @@ -4704,9 +4705,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.18" +version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52e9a2a24dc5c6821e71a7030e1e14b7b632acac55c40e9d2e082c621261bb56" +checksum = "727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f" dependencies = [ "base64 0.22.1", "bytes", @@ -5416,15 +5417,15 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.177" +version = "0.2.178" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976" +checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091" [[package]] name = "libgit2-sys" -version = "0.18.2+1.9.1" +version = "0.18.3+1.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c42fe03df2bd3c53a3a9c7317ad91d80c81cd1fb0caec8d7cc4cd2bfa10c222" +checksum = "c9b3acc4b91781bb0b3386669d325163746af5f6e4f73e6d2d630e09a35f3487" dependencies = [ "cc", "libc", @@ -5569,9 +5570,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.28" +version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" +checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" [[package]] name = "loom" @@ -5839,9 +5840,9 @@ dependencies = [ [[package]] name = "mio" -version = "1.1.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69d83b0086dc8ecf3ce9ae2874b2d1290252e2a30720bea58a5c6639b0092873" +checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc" dependencies = [ "libc", "log", @@ -6792,7 +6793,7 @@ version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "219cb19e96be00ab2e37d6e299658a0cfa83e52429179969b0f0121b4ac46983" dependencies = [ - "toml_edit 0.23.7", + "toml_edit 0.23.9", ] [[package]] @@ -12787,9 +12788,9 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.23.7" +version = "0.23.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6485ef6d0d9b5d0ec17244ff7eb05310113c3f316f2d14200d4de56b3cb98f8d" +checksum = "5d7cbc3b4b49633d57a0509303158ca50de80ae32c265093b24c414705807832" dependencies = [ "indexmap 2.12.1", "toml_datetime 0.7.3", @@ -13303,9 +13304,9 @@ checksum = 
"06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.18.1" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2" +checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a" dependencies = [ "getrandom 0.3.4", "js-sys", @@ -14282,18 +14283,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.30" +version = "0.8.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ea879c944afe8a2b25fef16bb4ba234f47c694565e97383b36f3a878219065c" +checksum = "fd74ec98b9250adb3ca554bdde269adf631549f51d8a8f8f0a10b50f1cb298c3" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.30" +version = "0.8.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf955aa904d6040f70dc8e9384444cb1030aed272ba3cb09bbc4ab9e7c1f34f5" +checksum = "d8a8d209fdf45cf5138cbb5a506f6b52522a25afccc534d1475dad8e31105c6a" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 0294a059f8e..8533bbf7f37 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -495,33 +495,33 @@ alloy-trie = { version = "0.9.1", default-features = false } alloy-hardforks = "0.4.5" -alloy-consensus = { version = "1.1.2", default-features = false } -alloy-contract = { version = "1.1.2", default-features = false } -alloy-eips = { version = "1.1.2", default-features = false } -alloy-genesis = { version = "1.1.2", default-features = false } -alloy-json-rpc = { version = "1.1.2", default-features = false } -alloy-network = { version = "1.1.2", default-features = false } -alloy-network-primitives = { version = "1.1.2", default-features = false } -alloy-provider = { version = "1.1.2", features = ["reqwest", "debug-api"], default-features = false } -alloy-pubsub = { version = "1.1.2", default-features = false } -alloy-rpc-client = { version = "1.1.2", default-features = false } -alloy-rpc-types = { version = "1.1.2", features = ["eth"], default-features = false } -alloy-rpc-types-admin = { version = "1.1.2", default-features = false } -alloy-rpc-types-anvil = { version = "1.1.2", default-features = false } -alloy-rpc-types-beacon = { version = "1.1.2", default-features = false } -alloy-rpc-types-debug = { version = "1.1.2", default-features = false } -alloy-rpc-types-engine = { version = "1.1.2", default-features = false } -alloy-rpc-types-eth = { version = "1.1.2", default-features = false } -alloy-rpc-types-mev = { version = "1.1.2", default-features = false } -alloy-rpc-types-trace = { version = "1.1.2", default-features = false } -alloy-rpc-types-txpool = { version = "1.1.2", default-features = false } -alloy-serde = { version = "1.1.2", default-features = false } -alloy-signer = { version = "1.1.2", default-features = false } -alloy-signer-local = { version = "1.1.2", default-features = false } -alloy-transport = { version = "1.1.2" } -alloy-transport-http = { version = "1.1.2", features = ["reqwest-rustls-tls"], default-features = false } -alloy-transport-ipc = { version = "1.1.2", default-features = false } -alloy-transport-ws = { version = "1.1.2", default-features = false } +alloy-consensus = { version = "1.1.3", default-features = false } +alloy-contract = { version = "1.1.3", default-features = false } +alloy-eips = { version = "1.1.3", default-features = false } +alloy-genesis = { version = "1.1.3", default-features = false } +alloy-json-rpc = { version 
= "1.1.3", default-features = false } +alloy-network = { version = "1.1.3", default-features = false } +alloy-network-primitives = { version = "1.1.3", default-features = false } +alloy-provider = { version = "1.1.3", features = ["reqwest", "debug-api"], default-features = false } +alloy-pubsub = { version = "1.1.3", default-features = false } +alloy-rpc-client = { version = "1.1.3", default-features = false } +alloy-rpc-types = { version = "1.1.3", features = ["eth"], default-features = false } +alloy-rpc-types-admin = { version = "1.1.3", default-features = false } +alloy-rpc-types-anvil = { version = "1.1.3", default-features = false } +alloy-rpc-types-beacon = { version = "1.1.3", default-features = false } +alloy-rpc-types-debug = { version = "1.1.3", default-features = false } +alloy-rpc-types-engine = { version = "1.1.3", default-features = false } +alloy-rpc-types-eth = { version = "1.1.3", default-features = false } +alloy-rpc-types-mev = { version = "1.1.3", default-features = false } +alloy-rpc-types-trace = { version = "1.1.3", default-features = false } +alloy-rpc-types-txpool = { version = "1.1.3", default-features = false } +alloy-serde = { version = "1.1.3", default-features = false } +alloy-signer = { version = "1.1.3", default-features = false } +alloy-signer-local = { version = "1.1.3", default-features = false } +alloy-transport = { version = "1.1.3" } +alloy-transport-http = { version = "1.1.3", features = ["reqwest-rustls-tls"], default-features = false } +alloy-transport-ipc = { version = "1.1.3", default-features = false } +alloy-transport-ws = { version = "1.1.3", default-features = false } # op alloy-op-evm = { version = "0.24.1", default-features = false } diff --git a/bin/reth-bench-compare/src/comparison.rs b/bin/reth-bench-compare/src/comparison.rs index 544d17dd848..892456c8ca9 100644 --- a/bin/reth-bench-compare/src/comparison.rs +++ b/bin/reth-bench-compare/src/comparison.rs @@ -57,6 +57,7 @@ pub(crate) struct TotalGasRow { /// - `mean_new_payload_latency_ms`: arithmetic mean latency across blocks. /// - `median_new_payload_latency_ms`: p50 latency across blocks. /// - `p90_new_payload_latency_ms` / `p99_new_payload_latency_ms`: tail latencies across blocks. +/// - `std_dev_new_payload_latency_ms`: standard deviation of latency across blocks. #[derive(Debug, Clone, Serialize)] pub(crate) struct BenchmarkSummary { pub total_blocks: u64, @@ -66,6 +67,7 @@ pub(crate) struct BenchmarkSummary { pub median_new_payload_latency_ms: f64, pub p90_new_payload_latency_ms: f64, pub p99_new_payload_latency_ms: f64, + pub std_dev_new_payload_latency_ms: f64, pub gas_per_second: f64, pub blocks_per_second: f64, pub min_block_number: u64, @@ -96,6 +98,7 @@ pub(crate) struct RefInfo { /// Percent deltas are `(feature - baseline) / baseline * 100`: /// - `new_payload_latency_p50_change_percent` / p90 / p99: percent changes of the respective /// per-block percentiles. +/// - `std_dev_change_percent`: percent change in standard deviation of newPayload latency. /// - `per_block_latency_change_mean_percent` / `per_block_latency_change_median_percent` are the /// mean and median of per-block percent deltas (feature vs baseline), capturing block-level /// drift. 
@@ -114,6 +117,7 @@ pub(crate) struct ComparisonSummary { pub new_payload_latency_p50_change_percent: f64, pub new_payload_latency_p90_change_percent: f64, pub new_payload_latency_p99_change_percent: f64, + pub std_dev_change_percent: f64, pub gas_per_second_change_percent: f64, pub blocks_per_second_change_percent: f64, } @@ -335,6 +339,9 @@ impl ComparisonGenerator { let mean_new_payload_latency_ms: f64 = latencies_ms.iter().sum::() / total_blocks as f64; + let std_dev_new_payload_latency_ms = + calculate_std_dev(&latencies_ms, mean_new_payload_latency_ms); + let mut sorted_latencies_ms = latencies_ms; sorted_latencies_ms.sort_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal)); let median_new_payload_latency_ms = percentile(&sorted_latencies_ms, 0.5); @@ -365,6 +372,7 @@ impl ComparisonGenerator { median_new_payload_latency_ms, p90_new_payload_latency_ms, p99_new_payload_latency_ms, + std_dev_new_payload_latency_ms, gas_per_second, blocks_per_second, min_block_number, @@ -432,6 +440,10 @@ impl ComparisonGenerator { baseline.p99_new_payload_latency_ms, feature.p99_new_payload_latency_ms, ), + std_dev_change_percent: calc_percent_change( + baseline.std_dev_new_payload_latency_ms, + feature.std_dev_new_payload_latency_ms, + ), gas_per_second_change_percent: calc_percent_change( baseline.gas_per_second, feature.gas_per_second, @@ -562,6 +574,7 @@ impl ComparisonGenerator { " NewPayload Latency p99: {:+.2}%", summary.new_payload_latency_p99_change_percent ); + println!(" NewPayload Latency std dev: {:+.2}%", summary.std_dev_change_percent); println!( " Gas/Second: {:+.2}%", summary.gas_per_second_change_percent @@ -584,11 +597,12 @@ impl ComparisonGenerator { ); println!(" NewPayload latency (ms):"); println!( - " mean: {:.2}, p50: {:.2}, p90: {:.2}, p99: {:.2}", + " mean: {:.2}, p50: {:.2}, p90: {:.2}, p99: {:.2}, std dev: {:.2}", baseline.mean_new_payload_latency_ms, baseline.median_new_payload_latency_ms, baseline.p90_new_payload_latency_ms, - baseline.p99_new_payload_latency_ms + baseline.p99_new_payload_latency_ms, + baseline.std_dev_new_payload_latency_ms ); if let (Some(start), Some(end)) = (&report.baseline.start_timestamp, &report.baseline.end_timestamp) @@ -613,11 +627,12 @@ impl ComparisonGenerator { ); println!(" NewPayload latency (ms):"); println!( - " mean: {:.2}, p50: {:.2}, p90: {:.2}, p99: {:.2}", + " mean: {:.2}, p50: {:.2}, p90: {:.2}, p99: {:.2}, std dev: {:.2}", feature.mean_new_payload_latency_ms, feature.median_new_payload_latency_ms, feature.p90_new_payload_latency_ms, - feature.p99_new_payload_latency_ms + feature.p99_new_payload_latency_ms, + feature.std_dev_new_payload_latency_ms ); if let (Some(start), Some(end)) = (&report.feature.start_timestamp, &report.feature.end_timestamp) diff --git a/crates/cli/commands/src/db/settings.rs b/crates/cli/commands/src/db/settings.rs index b4d718d8030..072312cfb41 100644 --- a/crates/cli/commands/src/db/settings.rs +++ b/crates/cli/commands/src/db/settings.rs @@ -91,6 +91,7 @@ impl Command { let mut settings @ StorageSettings { receipts_in_static_files: _, transaction_senders_in_static_files: _, + storages_history_in_rocksdb: _, } = settings.unwrap_or_else(StorageSettings::legacy); // Update the setting based on the key diff --git a/crates/engine/tree/benches/state_root_task.rs b/crates/engine/tree/benches/state_root_task.rs index b6306678b5b..a99562923b8 100644 --- a/crates/engine/tree/benches/state_root_task.rs +++ b/crates/engine/tree/benches/state_root_task.rs @@ -229,9 +229,15 @@ fn bench_state_root(c: &mut Criterion) 
{ black_box({ let mut handle = payload_processor.spawn( Default::default(), - core::iter::empty::< - Result, core::convert::Infallible>, - >(), + ( + core::iter::empty::< + Result< + Recovered, + core::convert::Infallible, + >, + >(), + std::convert::identity, + ), StateProviderBuilder::new(provider.clone(), genesis_hash, None), OverlayStateProviderFactory::new(provider), &TreeConfig::default(), diff --git a/crates/engine/tree/src/tree/cached_state.rs b/crates/engine/tree/src/tree/cached_state.rs index fd9999b9eba..b4b1f755ab3 100644 --- a/crates/engine/tree/src/tree/cached_state.rs +++ b/crates/engine/tree/src/tree/cached_state.rs @@ -146,17 +146,26 @@ impl StateProvider for CachedStateProvider { storage_key: StorageKey, ) -> ProviderResult> { match self.caches.get_storage(&account, &storage_key) { - SlotStatus::NotCached => { - self.metrics.storage_cache_misses.increment(1); + (SlotStatus::NotCached, maybe_cache) => { let final_res = self.state_provider.storage(account, storage_key)?; - self.caches.insert_storage(account, storage_key, final_res); + let account_cache = maybe_cache.unwrap_or_default(); + account_cache.insert_storage(storage_key, final_res); + // we always need to insert the value to update the weights. + // Note: there exists a race when the storage cache did not exist yet and two + // consumers look up a storage value for this account for the first time; + // however, we can assume that this will only happen for the very first (most likely + // the same) value, and we don't expect that this will accidentally + // replace an account storage cache with additional values. + self.caches.insert_storage_cache(account, account_cache); + + self.metrics.storage_cache_misses.increment(1); Ok(final_res) } - SlotStatus::Empty => { + (SlotStatus::Empty, _) => { self.metrics.storage_cache_hits.increment(1); Ok(None) } - SlotStatus::Value(value) => { + (SlotStatus::Value(value), _) => { self.metrics.storage_cache_hits.increment(1); Ok(Some(value)) } @@ -311,18 +320,28 @@ pub(crate) struct ExecutionCache { impl ExecutionCache { /// Get storage value from hierarchical cache. /// - /// Returns a `SlotStatus` indicating whether: - /// - `NotCached`: The account's storage cache doesn't exist - /// - `Empty`: The slot exists in the account's cache but is empty - /// - `Value`: The slot exists and has a specific value - pub(crate) fn get_storage(&self, address: &Address, key: &StorageKey) -> SlotStatus { + /// Returns a tuple of: + /// - `SlotStatus` indicating whether: + /// - `NotCached`: The account's storage cache doesn't exist + /// - `Empty`: The slot exists in the account's cache but is empty + /// - `Value`: The slot exists and has a specific value + /// - `Option>`: The account's storage cache if it exists + pub(crate) fn get_storage( + &self, + address: &Address, + key: &StorageKey, + ) -> (SlotStatus, Option>) { match self.storage_cache.get(address) { - None => SlotStatus::NotCached, - Some(account_cache) => { + None => (SlotStatus::NotCached, None), + Some(account_cache) => { + let status = account_cache.get_storage(key); + (status, Some(account_cache)) + } } } /// Insert storage value into hierarchical cache + #[cfg(test)] pub(crate) fn insert_storage( &self, address: Address, @@ -351,6 +370,15 @@ impl ExecutionCache { self.storage_cache.insert(address, account_cache); } + /// Inserts the [`AccountStorageCache`]. 
+ pub(crate) fn insert_storage_cache( + &self, + address: Address, + storage_cache: Arc, + ) { + self.storage_cache.insert(address, storage_cache); + } + /// Invalidate storage for specific account pub(crate) fn invalidate_account_storage(&self, address: &Address) { self.storage_cache.invalidate(address); @@ -800,7 +828,7 @@ mod tests { caches.insert_storage(address, storage_key, Some(storage_value)); // check that the storage returns the cached value - let slot_status = caches.get_storage(&address, &storage_key); + let (slot_status, _) = caches.get_storage(&address, &storage_key); assert_eq!(slot_status, SlotStatus::Value(storage_value)); } @@ -814,7 +842,7 @@ mod tests { let caches = ExecutionCacheBuilder::default().build_caches(1000); // check that the storage is not cached - let slot_status = caches.get_storage(&address, &storage_key); + let (slot_status, _) = caches.get_storage(&address, &storage_key); assert_eq!(slot_status, SlotStatus::NotCached); } @@ -830,7 +858,7 @@ mod tests { caches.insert_storage(address, storage_key, None); // check that the storage is empty - let slot_status = caches.get_storage(&address, &storage_key); + let (slot_status, _) = caches.get_storage(&address, &storage_key); assert_eq!(slot_status, SlotStatus::Empty); } diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 2e39ecd919f..7bc9811495d 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -1901,6 +1901,16 @@ where false } + /// Returns true if the given hash is part of the last received sync target fork choice update. + /// + /// See [`ForkchoiceStateTracker::sync_target_state`] + fn is_any_sync_target(&self, block_hash: B256) -> bool { + if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() { + return target.contains(block_hash) + } + false + } + /// Checks if the given `check` hash points to an invalid header, inserting the given `head` /// block into the invalid header cache if the `check` hash has a known invalid ancestor. /// @@ -2040,9 +2050,12 @@ where match self.insert_block(child) { Ok(res) => { debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block"); - if self.is_sync_target_head(child_num_hash.hash) && + if self.is_any_sync_target(child_num_hash.hash) && matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid)) { + debug!(target: "engine::tree", child =?child_num_hash, "connected sync target block"); + // we just inserted a block that we know is part of the canonical chain, so + // we can make it canonical self.make_canonical(child_num_hash.hash)?; } } @@ -2348,11 +2361,15 @@ where // try to append the block match self.insert_block(block) { Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => { - if self.is_sync_target_head(block_num_hash.hash) { - trace!(target: "engine::tree", "appended downloaded sync target block"); + // check if we just inserted a block that's part of sync targets, + // i.e. 
head, safe, or finalized + if let Some(sync_target) = self.state.forkchoice_state_tracker.sync_target_state() && + sync_target.contains(block_num_hash.hash) + { + debug!(target: "engine::tree", ?sync_target, "appended downloaded sync target block"); - // we just inserted the current sync target block, we can try to make it - // canonical + // we just inserted a block that we know is part of the canonical chain, so we + // can make it canonical return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical { sync_target_head: block_num_hash.hash, }))) diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index ed951a81a54..98ba7e6f553 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -21,6 +21,7 @@ use executor::WorkloadExecutor; use multiproof::{SparseTrieUpdate, *}; use parking_lot::RwLock; use prewarm::PrewarmMetrics; +use rayon::iter::{ParallelBridge, ParallelIterator}; use reth_engine_primitives::ExecutableTxIterator; use reth_evm::{ execute::{ExecutableTxFor, WithTxEnv}, @@ -40,6 +41,7 @@ use reth_trie_sparse::{ }; use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds}; use std::{ + collections::BTreeMap, sync::{ atomic::AtomicBool, mpsc::{self, channel}, @@ -312,21 +314,50 @@ where mpsc::Receiver, I::Tx>, I::Error>>, usize, ) { + let (transactions, convert) = transactions.into(); + let transactions = transactions.into_iter(); // Get the transaction count for prewarming task // Use upper bound if available (more accurate), otherwise use lower bound let (lower, upper) = transactions.size_hint(); let transaction_count_hint = upper.unwrap_or(lower); + // Spawn a task that iterates through all transactions in parallel and sends them to the + // main task. + let (tx, rx) = mpsc::channel(); + self.executor.spawn_blocking(move || { + transactions.enumerate().par_bridge().for_each_with(tx, |sender, (idx, tx)| { + let tx = convert(tx); + let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) }); + let _ = sender.send((idx, tx)); + }); + }); + + // Spawn a task that processes out-of-order transactions from the task above and sends them + // to prewarming and execution tasks. 
let (prewarm_tx, prewarm_rx) = mpsc::channel(); let (execute_tx, execute_rx) = mpsc::channel(); self.executor.spawn_blocking(move || { - for tx in transactions { - let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) }); + let mut next_for_execution = 0; + let mut queue = BTreeMap::new(); + while let Ok((idx, tx)) = rx.recv() { // only send Ok(_) variants to prewarming task if let Ok(tx) = &tx { let _ = prewarm_tx.send(tx.clone()); } - let _ = execute_tx.send(tx); + + if next_for_execution == idx { + let _ = execute_tx.send(tx); + next_for_execution += 1; + + while let Some(entry) = queue.first_entry() && + *entry.key() == next_for_execution + { + let _ = execute_tx.send(entry.remove()); + next_for_execution += 1; + } + } else { + queue.insert(idx, tx); + } } }); @@ -1017,13 +1048,19 @@ mod tests { let provider_factory = BlockchainProvider::new(factory).unwrap(); - let mut handle = payload_processor.spawn( - Default::default(), - core::iter::empty::, core::convert::Infallible>>(), - StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None), - OverlayStateProviderFactory::new(provider_factory), - &TreeConfig::default(), - ); + let mut handle = + payload_processor.spawn( + Default::default(), + ( + core::iter::empty::< + Result, core::convert::Infallible>, + >(), + std::convert::identity, + ), + StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None), + OverlayStateProviderFactory::new(provider_factory), + &TreeConfig::default(), + ); let mut state_hook = handle.state_hook(); diff --git a/crates/engine/tree/src/tree/payload_validator.rs b/crates/engine/tree/src/tree/payload_validator.rs index af368b47b2d..f5cd58dcb8c 100644 --- a/crates/engine/tree/src/tree/payload_validator.rs +++ b/crates/engine/tree/src/tree/payload_validator.rs @@ -213,16 +213,31 @@ where Evm: ConfigureEngineEvm, { match input { - BlockOrPayload::Payload(payload) => Ok(Either::Left( - self.evm_config + BlockOrPayload::Payload(payload) => { + let (iter, convert) = self + .evm_config .tx_iterator_for_payload(payload) .map_err(NewPayloadError::other)? - .map(|res| res.map(Either::Left).map_err(NewPayloadError::other)), - )), + .into(); + + let iter = Either::Left(iter.into_iter().map(Either::Left)); + let convert = move |tx| { + let Either::Left(tx) = tx else { unreachable!() }; + convert(tx).map(Either::Left).map_err(Either::Left) + }; + + // Box the closure to satisfy the `Fn` bound both here and in the branch below + Ok((iter, Box::new(convert) as Box _ + Send + Sync + 'static>)) + } BlockOrPayload::Block(block) => { - Ok(Either::Right(block.body().clone_transactions().into_iter().map(|tx| { - Ok(Either::Right(tx.try_into_recovered().map_err(NewPayloadError::other)?)) - }))) + let iter = + Either::Right(block.body().clone_transactions().into_iter().map(Either::Right)); + let convert = move |tx: Either<_, N::SignedTx>| { + let Either::Right(tx) = tx else { unreachable!() }; + tx.try_into_recovered().map(Either::Right).map_err(Either::Right) + }; + + Ok((iter, Box::new(convert))) } } } diff --git a/crates/era/tests/it/era/dd.rs b/crates/era/tests/it/era/dd.rs deleted file mode 100644 index 1537b912252..00000000000 --- a/crates/era/tests/it/era/dd.rs +++ /dev/null @@ -1,190 +0,0 @@ -//! Simple decoding and decompressing tests -//! 
for mainnet era files - -use reth_era::{ - common::file_ops::{StreamReader, StreamWriter}, - era::file::{EraReader, EraWriter}, -}; -use std::io::Cursor; - -use crate::{EraTestDownloader, HOODI}; - -// Helper function to test decompression and decoding for a specific era file -async fn test_era_file_decompression_and_decoding( - downloader: &EraTestDownloader, - filename: &str, - network: &str, -) -> eyre::Result<()> { - println!("\nTesting file: {filename}"); - let file = downloader.open_era_file(filename, network).await?; - - // Handle genesis era separately - if file.group.is_genesis() { - // Genesis has no blocks - assert_eq!(file.group.blocks.len(), 0, "Genesis should have no blocks"); - assert!(file.group.slot_index.is_none(), "Genesis should not have block slot index"); - - // Test genesis state decompression - let state_data = file.group.era_state.decompress()?; - assert!(!state_data.is_empty(), "Genesis state should decompress to non-empty data"); - - // Verify state slot index - assert_eq!( - file.group.state_slot_index.slot_count(), - 1, - "Genesis state index should have count of 1" - ); - - let mut buffer = Vec::new(); - { - let mut writer = EraWriter::new(&mut buffer); - writer.write_file(&file)?; - } - - let reader = EraReader::new(Cursor::new(&buffer)); - let read_back_file = reader.read(file.id.network_name.clone())?; - - assert_eq!( - file.group.era_state.decompress()?, - read_back_file.group.era_state.decompress()?, - "Genesis state data should be identical" - ); - - println!("Genesis era verified successfully"); - return Ok(()); - } - - // Non-genesis era - test beacon blocks - println!( - " Non-genesis era with {} beacon blocks, starting at slot {}", - file.group.blocks.len(), - file.group.starting_slot() - ); - - // Test beacon block decompression across different positions - let test_block_indices = [ - 0, // First block - file.group.blocks.len() / 2, // Middle block - file.group.blocks.len() - 1, // Last block - ]; - - for &block_idx in &test_block_indices { - let block = &file.group.blocks[block_idx]; - let slot = file.group.starting_slot() + block_idx as u64; - - println!( - "\n Testing beacon block at slot {}, compressed size: {} bytes", - slot, - block.data.len() - ); - - // Test beacon block decompression - let block_data = block.decompress()?; - assert!( - !block_data.is_empty(), - "Beacon block at slot {slot} decompression should produce non-empty data" - ); - } - - // Test era state decompression - let state_data = file.group.era_state.decompress()?; - assert!(!state_data.is_empty(), "Era state decompression should produce non-empty data"); - println!(" Era state decompressed: {} bytes", state_data.len()); - - // Verify slot indices - if let Some(ref block_slot_index) = file.group.slot_index { - println!( - " Block slot index: starting_slot={}, count={}", - block_slot_index.starting_slot, - block_slot_index.slot_count() - ); - - // Check for empty slots - let empty_slots: Vec = (0..block_slot_index.slot_count()) - .filter(|&i| !block_slot_index.has_data_at_slot(i)) - .collect(); - - if !empty_slots.is_empty() { - println!( - " Found {} empty slots (first few): {:?}", - empty_slots.len(), - &empty_slots[..empty_slots.len().min(5)] - ); - } - } - - // Test round-trip serialization - let mut buffer = Vec::new(); - { - let mut writer = EraWriter::new(&mut buffer); - writer.write_file(&file)?; - } - - // Read back from buffer - let reader = EraReader::new(Cursor::new(&buffer)); - let read_back_file = reader.read(file.id.network_name.clone())?; - - // Verify basic 
properties are preserved - assert_eq!(file.id.network_name, read_back_file.id.network_name); - assert_eq!(file.id.start_slot, read_back_file.id.start_slot); - assert_eq!(file.id.slot_count, read_back_file.id.slot_count); - assert_eq!(file.group.blocks.len(), read_back_file.group.blocks.len()); - - // Test data preservation for beacon blocks - for &idx in &test_block_indices { - let original_block = &file.group.blocks[idx]; - let read_back_block = &read_back_file.group.blocks[idx]; - let slot = file.group.starting_slot() + idx as u64; - - // Test that decompressed data is identical - assert_eq!( - original_block.decompress()?, - read_back_block.decompress()?, - "Beacon block data should be identical for slot {slot}" - ); - } - - // Test state data preservation - assert_eq!( - file.group.era_state.decompress()?, - read_back_file.group.era_state.decompress()?, - "Era state data should be identical" - ); - - // Test slot indices preservation - if let (Some(original_index), Some(read_index)) = - (&file.group.slot_index, &read_back_file.group.slot_index) - { - assert_eq!( - original_index.starting_slot, read_index.starting_slot, - "Block slot index starting slot should match" - ); - assert_eq!( - original_index.offsets, read_index.offsets, - "Block slot index offsets should match" - ); - } - - assert_eq!( - file.group.state_slot_index.starting_slot, - read_back_file.group.state_slot_index.starting_slot, - "State slot index starting slot should match" - ); - assert_eq!( - file.group.state_slot_index.offsets, read_back_file.group.state_slot_index.offsets, - "State slot index offsets should match" - ); - - Ok(()) -} - -#[test_case::test_case("hoodi-00000-212f13fc.era"; "era_dd_hoodi_0")] -#[test_case::test_case("hoodi-00021-857e418b.era"; "era_dd_hoodi_21")] -#[test_case::test_case("hoodi-00175-202aaa6d.era"; "era_dd_hoodi_175")] -#[test_case::test_case("hoodi-00201-0d521fc8.era"; "era_dd_hoodi_201")] -#[tokio::test(flavor = "multi_thread")] -#[ignore = "download intensive"] -async fn test_hoodi_era1_file_decompression_and_decoding(filename: &str) -> eyre::Result<()> { - let downloader = EraTestDownloader::new().await?; - test_era_file_decompression_and_decoding(&downloader, filename, HOODI).await -} diff --git a/crates/era/tests/it/era/mod.rs b/crates/era/tests/it/era/mod.rs index 66ae11afc49..a5bb431720a 100644 --- a/crates/era/tests/it/era/mod.rs +++ b/crates/era/tests/it/era/mod.rs @@ -1,2 +1,2 @@ -mod dd; mod genesis; +mod roundtrip; diff --git a/crates/era/tests/it/era/roundtrip.rs b/crates/era/tests/it/era/roundtrip.rs new file mode 100644 index 00000000000..7234b43f714 --- /dev/null +++ b/crates/era/tests/it/era/roundtrip.rs @@ -0,0 +1,228 @@ +//! Roundtrip tests for `.era` files. +//! +//! These tests verify the full lifecycle of era files by: +//! - Reading files from their original source +//! - Decompressing their contents +//! - Re-compressing the data +//! - Writing the data back to a new file +//! - Confirming that all original data is preserved throughout the process +//! +//! +//! Only a couple of era files are downloaded from `https://mainnet.era.nimbus.team/` for mainnet +//! and `https://hoodi.era.nimbus.team/` for hoodi to keep the tests efficient. 
+ +use reth_era::{ + common::file_ops::{EraFileFormat, StreamReader, StreamWriter}, + era::{ + file::{EraFile, EraReader, EraWriter}, + types::{ + consensus::{CompressedBeaconState, CompressedSignedBeaconBlock}, + group::{EraGroup, EraId}, + }, + }, +}; +use std::io::Cursor; + +use crate::{EraTestDownloader, HOODI, MAINNET}; + +// Helper function to test roundtrip compression/encoding for a specific file +async fn test_era_file_roundtrip( + downloader: &EraTestDownloader, + filename: &str, + network: &str, +) -> eyre::Result<()> { + println!("\nTesting roundtrip for file: {filename}"); + + let original_file = downloader.open_era_file(filename, network).await?; + + if original_file.group.is_genesis() { + println!("Genesis era detected, using special handling"); + assert_eq!(original_file.group.blocks.len(), 0, "Genesis should have no blocks"); + assert!( + original_file.group.slot_index.is_none(), + "Genesis should not have block slot index" + ); + + let state_data = original_file.group.era_state.decompress()?; + println!(" Genesis state decompressed: {} bytes", state_data.len()); + + // File roundtrip test + let mut buffer = Vec::new(); + { + let mut writer = EraWriter::new(&mut buffer); + writer.write_file(&original_file)?; + } + + let reader = EraReader::new(Cursor::new(&buffer)); + let roundtrip_file = reader.read(network.to_string())?; + + assert_eq!( + original_file.group.era_state.decompress()?, + roundtrip_file.group.era_state.decompress()?, + "Genesis state data should be identical after roundtrip" + ); + + println!("Genesis era verified successfully"); + return Ok(()); + } + + // non genesis start + let original_state_data = original_file.group.era_state.decompress()?; + + let mut buffer = Vec::new(); + { + let mut writer = EraWriter::new(&mut buffer); + writer.write_file(&original_file)?; + } + + // Read back from buffer + let reader = EraReader::new(Cursor::new(&buffer)); + let roundtrip_file = reader.read(network.to_string())?; + + assert_eq!( + original_file.id.network_name, roundtrip_file.id.network_name, + "Network name should match after roundtrip" + ); + assert_eq!( + original_file.id.start_slot, roundtrip_file.id.start_slot, + "Start slot should match after roundtrip" + ); + assert_eq!( + original_file.group.blocks.len(), + roundtrip_file.group.blocks.len(), + "Block count should match after roundtrip" + ); + + // Select a few blocks to test + let test_block_indices = [ + 0, // First block + original_file.group.blocks.len() / 2, // Middle block + original_file.group.blocks.len() - 1, // Last block + ]; + + // Test individual beacon blocks + for &block_idx in &test_block_indices { + let original_block = &original_file.group.blocks[block_idx]; + let roundtrip_block = &roundtrip_file.group.blocks[block_idx]; + + let original_block_data = original_block.decompress()?; + let roundtrip_block_data = roundtrip_block.decompress()?; + + // Verify file roundtrip preserves data + assert_eq!( + original_block_data, roundtrip_block_data, + "Block {block_idx} data should be identical after file roundtrip" + ); + + // Verify compression roundtrip + let recompressed_block = CompressedSignedBeaconBlock::from_ssz(&original_block_data)?; + let recompressed_block_data = recompressed_block.decompress()?; + + assert_eq!( + original_block_data, recompressed_block_data, + "Block {block_idx} should be identical after re-compression cycle" + ); + } + + let roundtrip_state_data = roundtrip_file.group.era_state.decompress()?; + + assert_eq!( + original_state_data, roundtrip_state_data, + "Era state 
data should be identical after roundtrip" + ); + + let recompressed_state = CompressedBeaconState::from_ssz(&roundtrip_state_data)?; + let recompressed_state_data = recompressed_state.decompress()?; + + assert_eq!( + original_state_data, recompressed_state_data, + "Era state data should be identical after re-compression cycle" + ); + + let recompressed_blocks: Vec = roundtrip_file + .group + .blocks + .iter() + .map(|block| { + let data = block.decompress()?; + CompressedSignedBeaconBlock::from_ssz(&data) + }) + .collect::, _>>()?; + + let new_group = if let Some(ref block_index) = roundtrip_file.group.slot_index { + EraGroup::with_block_index( + recompressed_blocks, + recompressed_state, + block_index.clone(), + roundtrip_file.group.state_slot_index.clone(), + ) + } else { + EraGroup::new( + recompressed_blocks, + recompressed_state, + roundtrip_file.group.state_slot_index, + ) + }; + + let (start_slot, slot_count) = new_group.slot_range(); + let new_file = EraFile::new(new_group, EraId::new(network, start_slot, slot_count)); + + let mut reconstructed_buffer = Vec::new(); + { + let mut writer = EraWriter::new(&mut reconstructed_buffer); + writer.write_file(&new_file)?; + } + + let reader = EraReader::new(Cursor::new(&reconstructed_buffer)); + let reconstructed_file = reader.read(network.to_string())?; + + assert_eq!( + original_file.group.blocks.len(), + reconstructed_file.group.blocks.len(), + "Block count should match after full reconstruction" + ); + + // Verify all reconstructed blocks match + for (idx, (orig, recon)) in + original_file.group.blocks.iter().zip(reconstructed_file.group.blocks.iter()).enumerate() + { + assert_eq!( + orig.decompress()?, + recon.decompress()?, + "Block {idx} should match after full reconstruction" + ); + } + + // Verify reconstructed state matches + assert_eq!( + original_state_data, + reconstructed_file.group.era_state.decompress()?, + "State should match after full reconstruction" + ); + + println!("File {filename} roundtrip successful"); + Ok(()) +} + +#[test_case::test_case("mainnet-00000-4b363db9.era"; "era_roundtrip_mainnet_0")] +#[test_case::test_case("mainnet-00178-0d0a5290.era"; "era_roundtrip_mainnet_178")] +#[test_case::test_case("mainnet-01070-7616e3e2.era"; "era_roundtrip_mainnet_1070")] +#[test_case::test_case("mainnet-01267-e3ddc749.era"; "era_roundtrip_mainnet_1267")] +#[test_case::test_case("mainnet-01592-d4dc8b98.era"; "era_roundtrip_mainnet_1592")] +#[tokio::test(flavor = "multi_thread")] +#[ignore = "download intensive"] +async fn test_roundtrip_compression_encoding_mainnet(filename: &str) -> eyre::Result<()> { + let downloader = EraTestDownloader::new().await?; + test_era_file_roundtrip(&downloader, filename, MAINNET).await +} + +#[test_case::test_case("hoodi-00000-212f13fc.era"; "era_roundtrip_hoodi_0")] +#[test_case::test_case("hoodi-00021-857e418b.era"; "era_roundtrip_hoodi_21")] +#[test_case::test_case("hoodi-00175-202aaa6d.era"; "era_roundtrip_hoodi_175")] +#[test_case::test_case("hoodi-00201-0d521fc8.era"; "era_roundtrip_hoodi_201")] +#[tokio::test(flavor = "multi_thread")] +#[ignore = "download intensive"] +async fn test_roundtrip_compression_encoding_hoodi(filename: &str) -> eyre::Result<()> { + let downloader = EraTestDownloader::new().await?; + test_era_file_roundtrip(&downloader, filename, HOODI).await +} diff --git a/crates/era/tests/it/era1/dd.rs b/crates/era/tests/it/era1/dd.rs deleted file mode 100644 index 6b2d2bd42d1..00000000000 --- a/crates/era/tests/it/era1/dd.rs +++ /dev/null @@ -1,159 +0,0 @@ -//! 
Simple decoding and decompressing tests -//! for mainnet era1 files - -use alloy_consensus::{BlockBody, Header}; -use alloy_primitives::U256; -use reth_era::{ - common::file_ops::{StreamReader, StreamWriter}, - e2s::types::IndexEntry, - era1::{ - file::{Era1Reader, Era1Writer}, - types::execution::CompressedBody, - }, -}; -use reth_ethereum_primitives::TransactionSigned; -use std::io::Cursor; - -use crate::{EraTestDownloader, MAINNET}; - -// Helper function to test decompression and decoding for a specific era1 file -async fn test_file_decompression( - downloader: &EraTestDownloader, - filename: &str, -) -> eyre::Result<()> { - println!("\nTesting file: {filename}"); - let file = downloader.open_era1_file(filename, MAINNET).await?; - - // Test block decompression across different positions in the file - let test_block_indices = [ - 0, // First block - file.group.blocks.len() / 2, // Middle block - file.group.blocks.len() - 1, // Last block - ]; - - for &block_idx in &test_block_indices { - let block = &file.group.blocks[block_idx]; - let block_number = file.group.block_index.starting_number() + block_idx as u64; - - println!( - "\n Testing block {}, compressed body size: {} bytes", - block_number, - block.body.data.len() - ); - - // Test header decompression and decoding - let header_data = block.header.decompress()?; - assert!( - !header_data.is_empty(), - "Block {block_number} header decompression should produce non-empty data" - ); - - let header = block.header.decode_header()?; - assert_eq!(header.number, block_number, "Decoded header should have correct block number"); - println!("Header decompression and decoding successful"); - - // Test body decompression - let body_data = block.body.decompress()?; - assert!( - !body_data.is_empty(), - "Block {block_number} body decompression should produce non-empty data" - ); - println!("Body decompression successful ({} bytes)", body_data.len()); - - let decoded_body: BlockBody = - CompressedBody::decode_body_from_decompressed::(&body_data) - .expect("Failed to decode body"); - - println!( - "Body decoding successful: {} transactions, {} ommers, withdrawals: {}", - decoded_body.transactions.len(), - decoded_body.ommers.len(), - decoded_body.withdrawals.is_some() - ); - - // Test receipts decompression - let receipts_data = block.receipts.decompress()?; - assert!( - !receipts_data.is_empty(), - "Block {block_number} receipts decompression should produce non-empty data" - ); - println!("Receipts decompression successful ({} bytes)", receipts_data.len()); - - assert!( - block.total_difficulty.value > U256::ZERO, - "Block {block_number} should have non-zero difficulty" - ); - println!("Total difficulty verified: {}", block.total_difficulty.value); - } - - // Test round-trip serialization - println!("\n Testing data preservation roundtrip..."); - let mut buffer = Vec::new(); - { - let mut writer = Era1Writer::new(&mut buffer); - writer.write_file(&file)?; - } - - // Read back from buffer - let reader = Era1Reader::new(Cursor::new(&buffer)); - let read_back_file = reader.read(file.id.network_name.clone())?; - - // Verify basic properties are preserved - assert_eq!(file.id.network_name, read_back_file.id.network_name); - assert_eq!(file.id.start_block, read_back_file.id.start_block); - assert_eq!(file.group.blocks.len(), read_back_file.group.blocks.len()); - assert_eq!(file.group.accumulator.root, read_back_file.group.accumulator.root); - - // Test data preservation for some blocks - for &idx in &test_block_indices { - let original_block = 
&file.group.blocks[idx]; - let read_back_block = &read_back_file.group.blocks[idx]; - let block_number = file.group.block_index.starting_number() + idx as u64; - - println!("Block {block_number} details:"); - println!(" Header size: {} bytes", original_block.header.data.len()); - println!(" Body size: {} bytes", original_block.body.data.len()); - println!(" Receipts size: {} bytes", original_block.receipts.data.len()); - - // Test that decompressed data is identical - assert_eq!( - original_block.header.decompress()?, - read_back_block.header.decompress()?, - "Header data should be identical for block {block_number}" - ); - - assert_eq!( - original_block.body.decompress()?, - read_back_block.body.decompress()?, - "Body data should be identical for block {block_number}" - ); - - assert_eq!( - original_block.receipts.decompress()?, - read_back_block.receipts.decompress()?, - "Receipts data should be identical for block {block_number}" - ); - - assert_eq!( - original_block.total_difficulty.value, read_back_block.total_difficulty.value, - "Total difficulty should be identical for block {block_number}" - ); - } - - Ok(()) -} - -#[test_case::test_case("mainnet-00000-5ec1ffb8.era1"; "era_dd_mainnet_0")] -#[test_case::test_case("mainnet-00003-d8b8a40b.era1"; "era_dd_mainnet_3")] -#[test_case::test_case("mainnet-00151-e322efe1.era1"; "era_dd_mainnet_151")] -#[test_case::test_case("mainnet-00293-0d6c5812.era1"; "era_dd_mainnet_293")] -#[test_case::test_case("mainnet-00443-ea71b6f9.era1"; "era_dd_mainnet_443")] -#[test_case::test_case("mainnet-01367-d7efc68f.era1"; "era_dd_mainnet_1367")] -#[test_case::test_case("mainnet-01610-99fdde4b.era1"; "era_dd_mainnet_1610")] -#[test_case::test_case("mainnet-01895-3f81607c.era1"; "era_dd_mainnet_1895")] -#[tokio::test(flavor = "multi_thread")] -#[ignore = "download intensive"] -async fn test_mainnet_era1_file_decompression_and_decoding(filename: &str) -> eyre::Result<()> { - let downloader = EraTestDownloader::new().await?; - test_file_decompression(&downloader, filename).await -} diff --git a/crates/era/tests/it/era1/mod.rs b/crates/era/tests/it/era1/mod.rs index fb1e16f4e0e..a5bb431720a 100644 --- a/crates/era/tests/it/era1/mod.rs +++ b/crates/era/tests/it/era1/mod.rs @@ -1,3 +1,2 @@ -mod dd; mod genesis; mod roundtrip; diff --git a/crates/era/tests/it/era1/roundtrip.rs b/crates/era/tests/it/era1/roundtrip.rs index 84250c440e7..e86189e2083 100644 --- a/crates/era/tests/it/era1/roundtrip.rs +++ b/crates/era/tests/it/era1/roundtrip.rs @@ -6,6 +6,9 @@ //! - Re-encoding and recompressing the data //! - Writing the data back to a new file //! - Confirming that all original data is preserved throughout the process +//! +//! Only a couple of era1 files are downloaded from for mainnet +//! and for sepolia to keep the tests efficient. 
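The `.era1` roundtrip helper builds on the same in-memory write/read cycle that the deleted `dd.rs` tests exercised. Below is a condensed sketch of that core step, assuming an already-opened `Era1File` from the test downloader; the `Era1File` import path is an assumption, while `Era1Writer`, `Era1Reader`, and the `StreamReader`/`StreamWriter` traits are the ones these tests already import:

```rust
use reth_era::{
    common::file_ops::{StreamReader, StreamWriter},
    era1::file::{Era1File, Era1Reader, Era1Writer}, // `Era1File` path assumed
};
use std::io::Cursor;

/// Sketch only: serialize an era1 file into a buffer and read it back,
/// mirroring the "write the data back to a new file" step described above.
fn buffer_roundtrip(file: &Era1File) -> eyre::Result<Era1File> {
    let mut buffer = Vec::new();
    {
        let mut writer = Era1Writer::new(&mut buffer);
        writer.write_file(file)?;
    }
    let reader = Era1Reader::new(Cursor::new(&buffer));
    Ok(reader.read(file.id.network_name.clone())?)
}
```

The roundtrip test then compares the decompressed contents of the returned file against the original, confirming that no data is lost in the cycle.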
use alloy_consensus::{BlockBody, BlockHeader, Header, ReceiptEnvelope}; use reth_era::{ @@ -27,7 +30,7 @@ use std::io::Cursor; use crate::{EraTestDownloader, MAINNET, SEPOLIA}; // Helper function to test roundtrip compression/encoding for a specific file -async fn test_file_roundtrip( +async fn test_era1_file_roundtrip( downloader: &EraTestDownloader, filename: &str, network: &str, @@ -252,27 +255,27 @@ async fn test_file_roundtrip( Ok(()) } -#[test_case::test_case("mainnet-00000-5ec1ffb8.era1"; "era_mainnet_0")] -#[test_case::test_case("mainnet-00151-e322efe1.era1"; "era_mainnet_151")] -#[test_case::test_case("mainnet-01367-d7efc68f.era1"; "era_mainnet_1367")] -#[test_case::test_case("mainnet-01895-3f81607c.era1"; "era_mainnet_1895")] +#[test_case::test_case("mainnet-00000-5ec1ffb8.era1"; "era1_roundtrip_mainnet_0")] +#[test_case::test_case("mainnet-00151-e322efe1.era1"; "era1_roundtrip_mainnet_151")] +#[test_case::test_case("mainnet-01367-d7efc68f.era1"; "era1_roundtrip_mainnet_1367")] +#[test_case::test_case("mainnet-01895-3f81607c.era1"; "era1_roundtrip_mainnet_1895")] #[tokio::test(flavor = "multi_thread")] #[ignore = "download intensive"] async fn test_roundtrip_compression_encoding_mainnet(filename: &str) -> eyre::Result<()> { let downloader = EraTestDownloader::new().await?; - test_file_roundtrip(&downloader, filename, MAINNET).await + test_era1_file_roundtrip(&downloader, filename, MAINNET).await } -#[test_case::test_case("sepolia-00000-643a00f7.era1"; "era_sepolia_0")] -#[test_case::test_case("sepolia-00074-0e81003c.era1"; "era_sepolia_74")] -#[test_case::test_case("sepolia-00173-b6924da5.era1"; "era_sepolia_173")] -#[test_case::test_case("sepolia-00182-a4f0a8a1.era1"; "era_sepolia_182")] +#[test_case::test_case("sepolia-00000-643a00f7.era1"; "era1_roundtrip_sepolia_0")] +#[test_case::test_case("sepolia-00074-0e81003c.era1"; "era1_roundtrip_sepolia_74")] +#[test_case::test_case("sepolia-00173-b6924da5.era1"; "era1_roundtrip_sepolia_173")] +#[test_case::test_case("sepolia-00182-a4f0a8a1.era1"; "era1_roundtrip_sepolia_182")] #[tokio::test(flavor = "multi_thread")] #[ignore = "download intensive"] async fn test_roundtrip_compression_encoding_sepolia(filename: &str) -> eyre::Result<()> { let downloader = EraTestDownloader::new().await?; - test_file_roundtrip(&downloader, filename, SEPOLIA).await?; + test_era1_file_roundtrip(&downloader, filename, SEPOLIA).await?; Ok(()) } diff --git a/crates/era/tests/it/main.rs b/crates/era/tests/it/main.rs index d9587e8fceb..3954ef860b7 100644 --- a/crates/era/tests/it/main.rs +++ b/crates/era/tests/it/main.rs @@ -91,16 +91,19 @@ const ERA_MAINNET_URL: &str = "https://mainnet.era.nimbus.team/"; /// Succinct list of mainnet files we want to download /// from //TODO: to replace with internal era files hosting url /// for testing purposes -const ERA_MAINNET_FILES_NAMES: [&str; 4] = [ +const ERA_MAINNET_FILES_NAMES: [&str; 8] = [ "mainnet-00000-4b363db9.era", + "mainnet-00178-0d0a5290.era", "mainnet-00518-4e267a3a.era", - "mainnet-01140-f70d4869.era", + "mainnet-00780-bb546fec.era", + "mainnet-01070-7616e3e2.era", + "mainnet-01267-e3ddc749.era", "mainnet-01581-82073d28.era", + "mainnet-01592-d4dc8b98.era", ]; -/// Utility for downloading `.era1` files for tests -/// in a temporary directory -/// and caching them in memory +/// Utility for downloading `.era` and `.era1` files for tests +/// in a temporary directory and caching them in memory #[derive(Debug)] struct EraTestDownloader { /// Temporary directory for storing downloaded files @@ -180,7 +183,7 
@@ impl EraTestDownloader { Ok(()) } - /// Get network configuration, URL and supported files, based on network and file type + /// Get network configuration, URL and supported files, based on network and file type fn get_network_config( &self, filename: &str, @@ -202,14 +205,13 @@ impl EraTestDownloader { } } - /// open .era1 file, downloading it if necessary + /// Open `.era1` file, downloading it if necessary async fn open_era1_file(&self, filename: &str, network: &str) -> Result { let path = self.download_file(filename, network).await?; Era1Reader::open(&path, network).map_err(|e| eyre!("Failed to open Era1 file: {e}")) } - /// open .era file, downloading it if necessary - #[allow(dead_code)] + /// Open `.era` file, downloading it if necessary async fn open_era_file(&self, filename: &str, network: &str) -> Result { let path = self.download_file(filename, network).await?; EraReader::open(&path, network).map_err(|e| eyre!("Failed to open Era1 file: {e}")) diff --git a/crates/ethereum/evm/src/lib.rs b/crates/ethereum/evm/src/lib.rs index c0f8adc9c54..3a7473d776a 100644 --- a/crates/ethereum/evm/src/lib.rs +++ b/crates/ethereum/evm/src/lib.rs @@ -289,12 +289,15 @@ where &self, payload: &ExecutionData, ) -> Result, Self::Error> { - Ok(payload.payload.transactions().clone().into_iter().map(|tx| { + let txs = payload.payload.transactions().clone().into_iter(); + let convert = |tx: Bytes| { let tx = TxTy::::decode_2718_exact(tx.as_ref()).map_err(AnyError::new)?; let signer = tx.try_recover().map_err(AnyError::new)?; Ok::<_, AnyError>(tx.with_signer(signer)) - })) + }; + + Ok((txs, convert)) } } diff --git a/crates/evm/evm/src/engine.rs b/crates/evm/evm/src/engine.rs index e8316426079..48fa55162bd 100644 --- a/crates/evm/evm/src/engine.rs +++ b/crates/evm/evm/src/engine.rs @@ -18,22 +18,51 @@ pub trait ConfigureEngineEvm: ConfigureEvm { ) -> Result, Self::Error>; } -/// Iterator over executable transactions. -pub trait ExecutableTxIterator: - Iterator> + Send + 'static -{ +/// A helper trait representing a pair of a "raw" transactions iterator and a closure that can be +/// used to convert them to an executable transaction. This tuple is used in the engine to +/// parallelize heavy work like decoding or recovery. +pub trait ExecutableTxTuple: Into<(Self::Iter, Self::Convert)> + Send + 'static { + /// Raw transaction that can be converted to an [`ExecutableTxTuple::Tx`] + /// + /// This can be any type that can be converted to an [`ExecutableTxTuple::Tx`]. For example, + /// an unrecovered transaction or just the transaction bytes. + type RawTx: Send + Sync + 'static; /// The executable transaction type iterator yields. - type Tx: ExecutableTxFor + Clone + Send + Sync + 'static; + type Tx: Clone + Send + Sync + 'static; /// Errors that may occur while recovering or decoding transactions. type Error: core::error::Error + Send + Sync + 'static; + + /// Iterator over [`ExecutableTxTuple::Tx`] + type Iter: Iterator + Send + 'static; + /// Closure that can be used to convert a [`ExecutableTxTuple::RawTx`] to a + /// [`ExecutableTxTuple::Tx`]. This might involve heavy work like decoding or recovery + /// and will be parallelized in the engine. 
+ type Convert: Fn(Self::RawTx) -> Result + Send + Sync + 'static; } -impl ExecutableTxIterator for T +impl ExecutableTxTuple for (I, F) where - Tx: ExecutableTxFor + Clone + Send + Sync + 'static, + RawTx: Send + Sync + 'static, + Tx: Clone + Send + Sync + 'static, Err: core::error::Error + Send + Sync + 'static, - T: Iterator> + Send + 'static, + I: Iterator + Send + 'static, + F: Fn(RawTx) -> Result + Send + Sync + 'static, { + type RawTx = RawTx; type Tx = Tx; type Error = Err; + + type Iter = I; + type Convert = F; +} + +/// Iterator over executable transactions. +pub trait ExecutableTxIterator: + ExecutableTxTuple> +{ +} + +impl ExecutableTxIterator for T where + T: ExecutableTxTuple> +{ } diff --git a/crates/optimism/evm/src/lib.rs b/crates/optimism/evm/src/lib.rs index e5df16ee2e7..7509edf1f81 100644 --- a/crates/optimism/evm/src/lib.rs +++ b/crates/optimism/evm/src/lib.rs @@ -16,7 +16,7 @@ use alloy_consensus::{BlockHeader, Header}; use alloy_eips::Decodable2718; use alloy_evm::{EvmFactory, FromRecoveredTx, FromTxWithEncoded}; use alloy_op_evm::block::{receipt_builder::OpReceiptBuilder, OpTxEnv}; -use alloy_primitives::U256; +use alloy_primitives::{Bytes, U256}; use core::fmt::Debug; use op_alloy_consensus::EIP1559ParamError; use op_alloy_rpc_types_engine::OpExecutionData; @@ -265,12 +265,15 @@ where &self, payload: &OpExecutionData, ) -> Result, Self::Error> { - Ok(payload.payload.transactions().clone().into_iter().map(|encoded| { + let transactions = payload.payload.transactions().clone().into_iter(); + let convert = |encoded: Bytes| { let tx = TxTy::::decode_2718_exact(encoded.as_ref()) .map_err(AnyError::new)?; let signer = tx.try_recover().map_err(AnyError::new)?; Ok::<_, AnyError>(WithEncoded::new(encoded, tx.with_signer(signer))) - })) + }; + + Ok((transactions, convert)) } } diff --git a/crates/primitives-traits/src/block/body.rs b/crates/primitives-traits/src/block/body.rs index 4dc9a67e887..11007236730 100644 --- a/crates/primitives-traits/src/block/body.rs +++ b/crates/primitives-traits/src/block/body.rs @@ -198,6 +198,26 @@ pub trait BlockBody: .collect() }) } + + /// Returns an iterator over `Recovered<&Transaction>` for all transactions in the block body. + /// + /// This method recovers signers and returns an iterator without cloning transactions, + /// making it more efficient than [`BlockBody::recover_transactions`] when owned values are not + /// required. + /// + /// # Errors + /// + /// Returns an error if any transaction's signature is invalid. + fn recover_transactions_ref( + &self, + ) -> Result> + '_, RecoveryError> { + let signers = self.recover_signers()?; + Ok(self + .transactions() + .iter() + .zip(signers) + .map(|(tx, signer)| Recovered::new_unchecked(tx, signer))) + } } impl BlockBody for alloy_consensus::BlockBody diff --git a/crates/prune/types/src/target.rs b/crates/prune/types/src/target.rs index 404051a6fe5..df723479880 100644 --- a/crates/prune/types/src/target.rs +++ b/crates/prune/types/src/target.rs @@ -38,7 +38,7 @@ pub enum HistoryType { /// Default number of blocks to retain for merkle changesets. /// This is used by both the `MerkleChangeSets` stage and the pruner segment. 
-pub const MERKLE_CHANGESETS_RETENTION_BLOCKS: u64 = 64; +pub const MERKLE_CHANGESETS_RETENTION_BLOCKS: u64 = 128; /// Default pruning mode for merkle changesets const fn default_merkle_changesets_mode() -> PruneMode { @@ -95,13 +95,7 @@ pub struct PruneModes { pub bodies_history: Option, /// Merkle Changesets pruning configuration for `AccountsTrieChangeSets` and /// `StoragesTrieChangeSets`. - #[cfg_attr( - any(test, feature = "serde"), - serde( - default = "default_merkle_changesets_mode", - deserialize_with = "deserialize_prune_mode_with_min_blocks::" - ) - )] + #[cfg_attr(any(test, feature = "serde"), serde(default = "default_merkle_changesets_mode"))] pub merkle_changesets: PruneMode, /// Receipts pruning configuration by retaining only those receipts that contain logs emitted /// by the specified addresses, discarding others. This setting is overridden by `receipts`. @@ -155,14 +149,15 @@ impl PruneModes { /// Returns `true` if any migration was performed. /// /// Currently migrates: - /// - `merkle_changesets`: `Distance(10064)` -> `Distance(64)` - pub fn migrate(&mut self) -> bool { - if self.merkle_changesets == PruneMode::Distance(MINIMUM_PRUNING_DISTANCE) { + /// - `merkle_changesets`: `Distance(n)` where `n < 128` or `n == 10064` -> `Distance(128)` + pub const fn migrate(&mut self) -> bool { + if let PruneMode::Distance(d) = self.merkle_changesets && + (d < MERKLE_CHANGESETS_RETENTION_BLOCKS || d == MINIMUM_PRUNING_DISTANCE) + { self.merkle_changesets = PruneMode::Distance(MERKLE_CHANGESETS_RETENTION_BLOCKS); - true - } else { - false + return true; } + false } /// Returns an error if we can't unwind to the targeted block because the target block is @@ -214,28 +209,6 @@ impl PruneModes { } } -/// Deserializes [`PruneMode`] and validates that the value is not less than the const -/// generic parameter `MIN_BLOCKS`. This parameter represents the number of blocks that needs to be -/// left in database after the pruning. -/// -/// 1. For [`PruneMode::Full`], it fails if `MIN_BLOCKS > 0`. -/// 2. For [`PruneMode::Distance`], it fails if `distance < MIN_BLOCKS + 1`. `+ 1` is needed because -/// `PruneMode::Distance(0)` means that we leave zero blocks from the latest, meaning we have one -/// block in the database. -#[cfg(any(test, feature = "serde"))] -fn deserialize_prune_mode_with_min_blocks< - 'de, - const MIN_BLOCKS: u64, - D: serde::Deserializer<'de>, ->( - deserializer: D, -) -> Result { - use serde::Deserialize; - let prune_mode = PruneMode::deserialize(deserializer)?; - serde_deserialize_validate::(&prune_mode)?; - Ok(prune_mode) -} - /// Deserializes [`Option`] and validates that the value is not less than the const /// generic parameter `MIN_BLOCKS`. This parameter represents the number of blocks that needs to be /// left in database after the pruning. diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index 56ea13e92ee..76b0a3c3097 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -546,6 +546,13 @@ where .transpose()? .flatten(); + // Return error if toBlock exceeds current head + if let Some(t) = to && + t > info.best_number + { + return Err(EthFilterError::BlockRangeExceedsHead); + } + if let Some(f) = from && f > info.best_number { @@ -894,6 +901,9 @@ pub enum EthFilterError { /// Invalid block range. #[error("invalid block range params")] InvalidBlockRangeParams, + /// Block range extends beyond current head. 
+ #[error("block range extends beyond current head block")] + BlockRangeExceedsHead, /// Query scope is too broad. #[error("query exceeds max block range {0}")] QueryExceedsMaxBlocks(u64), @@ -928,7 +938,8 @@ impl From for jsonrpsee::types::error::ErrorObject<'static> { EthFilterError::EthAPIError(err) => err.into(), err @ (EthFilterError::InvalidBlockRangeParams | EthFilterError::QueryExceedsMaxBlocks(_) | - EthFilterError::QueryExceedsMaxResults { .. }) => { + EthFilterError::QueryExceedsMaxResults { .. } | + EthFilterError::BlockRangeExceedsHead) => { rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string()) } } diff --git a/crates/rpc/rpc/src/trace.rs b/crates/rpc/rpc/src/trace.rs index 5eb9a0f73c7..9732ac4269b 100644 --- a/crates/rpc/rpc/src/trace.rs +++ b/crates/rpc/rpc/src/trace.rs @@ -382,7 +382,7 @@ where let mut all_traces = Vec::new(); let mut block_traces = Vec::with_capacity(self.inner.eth_config.max_tracing_requests); - for chunk_start in (start..end).step_by(self.inner.eth_config.max_tracing_requests) { + for chunk_start in (start..=end).step_by(self.inner.eth_config.max_tracing_requests) { let chunk_end = std::cmp::min(chunk_start + self.inner.eth_config.max_tracing_requests as u64, end); diff --git a/crates/storage/db-api/src/models/metadata.rs b/crates/storage/db-api/src/models/metadata.rs index e562ede03b6..de68998ce6e 100644 --- a/crates/storage/db-api/src/models/metadata.rs +++ b/crates/storage/db-api/src/models/metadata.rs @@ -19,6 +19,9 @@ pub struct StorageSettings { /// Whether this node always writes transaction senders to static files. #[serde(default)] pub transaction_senders_in_static_files: bool, + /// Whether `StoragesHistory` is stored in `RocksDB`. + #[serde(default)] + pub storages_history_in_rocksdb: bool, } impl StorageSettings { @@ -28,7 +31,11 @@ impl StorageSettings { /// `false`, ensuring older nodes continue writing receipts and transaction senders to the /// database when receipt pruning is enabled. pub const fn legacy() -> Self { - Self { receipts_in_static_files: false, transaction_senders_in_static_files: false } + Self { + receipts_in_static_files: false, + transaction_senders_in_static_files: false, + storages_history_in_rocksdb: false, + } } /// Sets the `receipts_in_static_files` flag to the provided value. @@ -42,4 +49,10 @@ impl StorageSettings { self.transaction_senders_in_static_files = value; self } + + /// Sets the `storages_history_in_rocksdb` flag to the provided value. + pub const fn with_storages_history_in_rocksdb(mut self, value: bool) -> Self { + self.storages_history_in_rocksdb = value; + self + } } diff --git a/crates/transaction-pool/src/validate/eth.rs b/crates/transaction-pool/src/validate/eth.rs index 42e6a968ad9..a6f9f9c8e39 100644 --- a/crates/transaction-pool/src/validate/eth.rs +++ b/crates/transaction-pool/src/validate/eth.rs @@ -247,6 +247,20 @@ where } } + /// Validates a single transaction with the provided state provider. + pub fn validate_one_with_state_provider( + &self, + origin: TransactionOrigin, + transaction: Tx, + state: impl AccountInfoReader, + ) -> TransactionValidationOutcome { + let tx = match self.validate_one_no_state(origin, transaction) { + Ok(tx) => tx, + Err(invalid_outcome) => return invalid_outcome, + }; + self.validate_one_against_state(origin, tx, state) + } + /// Performs stateless validation on single transaction. 
Returns unaltered input transaction /// if all checks pass, so transaction can continue through to stateful validation as argument /// to [`validate_one_against_state`](Self::validate_one_against_state). diff --git a/crates/trie/parallel/src/proof.rs b/crates/trie/parallel/src/proof.rs index 433c13fb08f..12a4d3485c3 100644 --- a/crates/trie/parallel/src/proof.rs +++ b/crates/trie/parallel/src/proof.rs @@ -126,7 +126,8 @@ impl ParallelProof { ))) })?; - // Extract storage proof directly from the result + // Extract storage proof directly from the result. + // The proof is Arc-wrapped for efficient batch sharing, so we unwrap it here. let storage_proof = match proof_msg.result? { crate::proof_task::ProofResult::StorageProof { hashed_address: addr, proof } => { debug_assert_eq!( @@ -134,7 +135,8 @@ impl ParallelProof { hashed_address, "storage worker must return same address: expected {hashed_address}, got {addr}" ); - proof + // Efficiently unwrap Arc: returns inner value if sole owner, clones otherwise. + Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone()) } crate::proof_task::ProofResult::AccountMultiproof { .. } => { unreachable!("storage worker only sends StorageProof variant") @@ -223,8 +225,12 @@ impl ParallelProof { ) })?; + // The proof is Arc-wrapped for efficient batch sharing, so we unwrap it here. let (multiproof, stats) = match proof_result_msg.result? { - crate::proof_task::ProofResult::AccountMultiproof { proof, stats } => (proof, stats), + crate::proof_task::ProofResult::AccountMultiproof { proof, stats } => { + // Efficiently unwrap Arc: returns inner value if sole owner, clones otherwise. + (Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone()), stats) + } crate::proof_task::ProofResult::StorageProof { .. } => { unreachable!("account worker only sends AccountMultiproof variant") } diff --git a/crates/trie/parallel/src/proof_task.rs b/crates/trie/parallel/src/proof_task.rs index 58dc99fc371..87feff9ad6e 100644 --- a/crates/trie/parallel/src/proof_task.rs +++ b/crates/trie/parallel/src/proof_task.rs @@ -41,6 +41,7 @@ use alloy_primitives::{ use alloy_rlp::{BufMut, Encodable}; use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender}; use dashmap::DashMap; +use metrics::Histogram; use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind}; use reth_provider::{DatabaseProviderROFactory, ProviderError, ProviderResult}; use reth_storage_errors::db::DatabaseError; @@ -79,6 +80,275 @@ use crate::proof_task_metrics::{ type StorageProofResult = Result; type TrieNodeProviderResult = Result, SparseTrieError>; +/// Maximum number of storage proof jobs to batch together per account. +const STORAGE_PROOF_BATCH_LIMIT: usize = 32; + +/// Maximum number of blinded node requests to defer during storage proof batching. +/// When this limit is reached, batching stops early to process deferred nodes, +/// preventing starvation of blinded node requests under high proof load. +const MAX_DEFERRED_BLINDED_NODES: usize = 16; + +/// Holds batched storage proof jobs for the same account. +/// +/// When multiple storage proof requests arrive for the same account, they can be merged +/// into a single proof computation with combined prefix sets and target slots. +#[derive(Debug)] +struct BatchedStorageProof { + /// The merged prefix set from all batched jobs. + prefix_set: PrefixSetMut, + /// The merged target slots from all batched jobs. + target_slots: B256Set, + /// Whether any job requested branch node masks. 
+ with_branch_node_masks: bool, + /// The `multi_added_removed_keys` from the first job (they should all share the same `Arc`). + multi_added_removed_keys: Option>, + /// All senders that need to receive the result. + senders: Vec, +} + +impl BatchedStorageProof { + /// Creates a new batch from the first storage proof input. + fn new(input: StorageProofInput, sender: ProofResultContext) -> Self { + // Convert frozen PrefixSet to mutable PrefixSetMut by collecting its keys. + let prefix_set = PrefixSetMut::from(input.prefix_set.iter().copied()); + Self { + prefix_set, + target_slots: input.target_slots, + with_branch_node_masks: input.with_branch_node_masks, + multi_added_removed_keys: input.multi_added_removed_keys, + senders: vec![sender], + } + } + + /// Merges another storage proof job into this batch. + /// + /// # Panics + /// Panics if `input.multi_added_removed_keys` does not point to the same Arc as the batch's. + /// This is a critical invariant for proof correctness. + fn merge(&mut self, input: StorageProofInput, sender: ProofResultContext) { + // Validate that all batched jobs share the same multi_added_removed_keys Arc. + // This is a critical invariant: if jobs have different keys, the merged proof + // would be computed with only the first job's keys, producing incorrect results. + // Using assert! (not debug_assert!) because incorrect proofs could cause consensus + // failures. + assert!( + match (&self.multi_added_removed_keys, &input.multi_added_removed_keys) { + (Some(a), Some(b)) => Arc::ptr_eq(a, b), + (None, None) => true, + _ => false, + }, + "All batched storage proof jobs must share the same multi_added_removed_keys Arc" + ); + + self.prefix_set.extend_keys(input.prefix_set.iter().copied()); + self.target_slots.extend(input.target_slots); + self.with_branch_node_masks |= input.with_branch_node_masks; + self.senders.push(sender); + } + + /// Converts this batch into a single `StorageProofInput` for computation. + fn into_input(self, hashed_address: B256) -> (StorageProofInput, Vec) { + let input = StorageProofInput { + hashed_address, + prefix_set: self.prefix_set.freeze(), + target_slots: self.target_slots, + with_branch_node_masks: self.with_branch_node_masks, + multi_added_removed_keys: self.multi_added_removed_keys, + }; + (input, self.senders) + } +} + +/// Metrics for storage worker batching. +#[derive(Clone, Default)] +struct StorageWorkerBatchMetrics { + /// Histogram of batch sizes (number of jobs merged per computation). + #[cfg(feature = "metrics")] + batch_size_histogram: Option, +} + +impl StorageWorkerBatchMetrics { + #[cfg(feature = "metrics")] + fn new() -> Self { + Self { + batch_size_histogram: Some(metrics::histogram!( + "trie.proof_task.storage_worker_batch_size" + )), + } + } + + #[cfg(not(feature = "metrics"))] + fn new() -> Self { + Self {} + } + + fn record_batch_size(&self, _size: usize) { + #[cfg(feature = "metrics")] + if let Some(h) = &self.batch_size_histogram { + h.record(_size as f64); + } + } +} + +/// Maximum number of account multiproof jobs to batch together. +const ACCOUNT_PROOF_BATCH_LIMIT: usize = 32; + +/// Holds batched account multiproof jobs. +/// +/// When multiple account multiproof requests arrive, they can be merged +/// into a single proof computation with combined targets and prefix sets. +#[derive(Debug)] +struct BatchedAccountProof { + /// The merged targets from all batched jobs. + targets: MultiProofTargets, + /// The merged account prefix set from all batched jobs. 
+ account_prefix_set: PrefixSetMut, + /// The merged storage prefix sets from all batched jobs. + storage_prefix_sets: B256Map, + /// The merged destroyed accounts from all batched jobs. + destroyed_accounts: B256Set, + /// Whether any job requested branch node masks. + collect_branch_node_masks: bool, + /// The `multi_added_removed_keys` from the first job (they should all share the same `Arc`). + multi_added_removed_keys: Option>, + /// The shared `missed_leaves_storage_roots` cache from the first job. + missed_leaves_storage_roots: Arc>, + /// All senders that need to receive the result. + senders: Vec, +} + +impl BatchedAccountProof { + /// Creates a new batch from the first account multiproof input. + fn new(input: AccountMultiproofInput) -> Self { + // Convert frozen prefix sets to mutable versions. + let account_prefix_set = + PrefixSetMut::from(input.prefix_sets.account_prefix_set.iter().copied()); + let storage_prefix_sets = input + .prefix_sets + .storage_prefix_sets + .into_iter() + .map(|(addr, ps)| (addr, PrefixSetMut::from(ps.iter().copied()))) + .collect(); + let destroyed_accounts = input.prefix_sets.destroyed_accounts; + + Self { + targets: input.targets, + account_prefix_set, + storage_prefix_sets, + destroyed_accounts, + collect_branch_node_masks: input.collect_branch_node_masks, + multi_added_removed_keys: input.multi_added_removed_keys, + missed_leaves_storage_roots: input.missed_leaves_storage_roots, + senders: vec![input.proof_result_sender], + } + } + + /// Attempts to merge another account multiproof job into this batch. + /// + /// Returns the job back if caches are incompatible so the caller can process it separately. + fn try_merge(&mut self, input: AccountMultiproofInput) -> Result<(), AccountMultiproofInput> { + // Require all jobs to share the same caches; otherwise merging would produce + // incorrect proofs by reusing the wrong retained keys or missed-leaf storage roots. + let multi_added_removed_keys_mismatch = + !match (&self.multi_added_removed_keys, &input.multi_added_removed_keys) { + (Some(a), Some(b)) => Arc::ptr_eq(a, b), + (None, None) => true, + _ => false, + }; + + if multi_added_removed_keys_mismatch || + !Arc::ptr_eq(&self.missed_leaves_storage_roots, &input.missed_leaves_storage_roots) + { + return Err(input); + } + + // Merge targets. + self.targets.extend(input.targets); + + // Merge account prefix set. + self.account_prefix_set.extend_keys(input.prefix_sets.account_prefix_set.iter().copied()); + + // Merge storage prefix sets. + for (addr, ps) in input.prefix_sets.storage_prefix_sets { + match self.storage_prefix_sets.entry(addr) { + alloy_primitives::map::Entry::Occupied(mut entry) => { + entry.get_mut().extend_keys(ps.iter().copied()); + } + alloy_primitives::map::Entry::Vacant(entry) => { + entry.insert(PrefixSetMut::from(ps.iter().copied())); + } + } + } + + // Merge destroyed accounts. + self.destroyed_accounts.extend(input.prefix_sets.destroyed_accounts); + + // OR the branch node masks flag. + self.collect_branch_node_masks |= input.collect_branch_node_masks; + + // Collect the sender. + self.senders.push(input.proof_result_sender); + + Ok(()) + } + + /// Converts this batch into a single `AccountMultiproofInput` for computation. + fn into_input(self) -> (AccountMultiproofInput, Vec) { + // Freeze the mutable prefix sets. 
+ let storage_prefix_sets: B256Map = + self.storage_prefix_sets.into_iter().map(|(addr, ps)| (addr, ps.freeze())).collect(); + + let prefix_sets = TriePrefixSets { + account_prefix_set: self.account_prefix_set.freeze(), + storage_prefix_sets, + destroyed_accounts: self.destroyed_accounts, + }; + + // Use a dummy sender for the input since we'll handle all senders separately. + let dummy_sender = self.senders.first().expect("batch always has at least one sender"); + let input = AccountMultiproofInput { + targets: self.targets, + prefix_sets, + collect_branch_node_masks: self.collect_branch_node_masks, + multi_added_removed_keys: self.multi_added_removed_keys, + missed_leaves_storage_roots: self.missed_leaves_storage_roots, + proof_result_sender: dummy_sender.clone(), + }; + (input, self.senders) + } +} + +/// Metrics for account worker batching. +#[derive(Clone, Default)] +struct AccountWorkerBatchMetrics { + /// Histogram of batch sizes (number of jobs merged per computation). + #[cfg(feature = "metrics")] + batch_size_histogram: Option, +} + +impl AccountWorkerBatchMetrics { + #[cfg(feature = "metrics")] + fn new() -> Self { + Self { + batch_size_histogram: Some(metrics::histogram!( + "trie.proof_task.account_worker_batch_size" + )), + } + } + + #[cfg(not(feature = "metrics"))] + fn new() -> Self { + Self {} + } + + fn record_batch_size(&self, _size: usize) { + #[cfg(feature = "metrics")] + if let Some(h) = &self.batch_size_histogram { + h.record(_size as f64); + } + } +} + /// A handle that provides type-safe access to proof worker pools. /// /// The handle stores direct senders to both storage and account worker pools, @@ -552,12 +822,16 @@ impl TrieNodeProvider for ProofTaskTrieNodeProvider { } } /// Result of a proof calculation, which can be either an account multiproof or a storage proof. -#[derive(Debug)] +/// +/// The proof data is wrapped in `Arc` to enable efficient sharing when batching multiple +/// proof requests. This avoids expensive cloning of the underlying proof structures +/// when sending results to multiple receivers. +#[derive(Debug, Clone)] pub enum ProofResult { /// Account multiproof with statistics AccountMultiproof { - /// The account multiproof - proof: DecodedMultiProof, + /// The account multiproof (Arc-wrapped for efficient sharing in batches) + proof: Arc, /// Statistics collected during proof computation stats: ParallelTrieStats, }, @@ -565,8 +839,8 @@ pub enum ProofResult { StorageProof { /// The hashed address this storage proof belongs to hashed_address: B256, - /// The storage multiproof - proof: DecodedStorageMultiProof, + /// The storage multiproof (Arc-wrapped for efficient sharing in batches) + proof: Arc, }, } @@ -575,11 +849,17 @@ impl ProofResult { /// /// For account multiproofs, returns the multiproof directly (discarding stats). /// For storage proofs, wraps the storage proof into a minimal multiproof. + /// + /// Note: This method clones the inner proof data. If you need to avoid the clone + /// when you're the sole owner, consider using `Arc::try_unwrap` first. 
pub fn into_multiproof(self) -> DecodedMultiProof { match self { - Self::AccountMultiproof { proof, stats: _ } => proof, + Self::AccountMultiproof { proof, stats: _ } => { + Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone()) + } Self::StorageProof { hashed_address, proof } => { - DecodedMultiProof::from_storage_proof(hashed_address, proof) + let storage_proof = Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone()); + DecodedMultiProof::from_storage_proof(hashed_address, storage_proof) } } } @@ -708,11 +988,18 @@ where /// 2. Advertises availability /// 3. Processes jobs in a loop: /// - Receives job from channel + /// - Drains additional same-account storage proof jobs (batching) /// - Marks worker as busy - /// - Processes the job + /// - Processes the batched jobs as a single proof computation /// - Marks worker as available /// 4. Shuts down when channel closes /// + /// # Batching Strategy + /// + /// When multiple storage proof requests arrive for the same account, they are merged + /// into a single proof computation. This reduces redundant trie traversals when state + /// updates arrive faster than proof computation can process them. + /// /// # Panic Safety /// /// If this function panics, the worker thread terminates but other workers @@ -732,6 +1019,7 @@ where // Create provider from factory let provider = task_ctx.factory.database_provider_ro()?; let proof_tx = ProofTaskTx::new(provider, worker_id); + let batch_metrics = StorageWorkerBatchMetrics::new(); trace!( target: "trie::proof_task", @@ -746,20 +1034,104 @@ where // Initially mark this worker as available. available_workers.fetch_add(1, Ordering::Relaxed); + // Deferred blinded node jobs to process after batched storage proofs. + // Pre-allocate with capacity to avoid reallocations during batching. + let mut deferred_blinded_nodes: Vec<(B256, Nibbles, Sender)> = + Vec::with_capacity(MAX_DEFERRED_BLINDED_NODES); + while let Ok(job) = work_rx.recv() { // Mark worker as busy. available_workers.fetch_sub(1, Ordering::Relaxed); match job { StorageWorkerJob::StorageProof { input, proof_result_sender } => { - Self::process_storage_proof( - worker_id, - &proof_tx, - input, - proof_result_sender, - &mut storage_proofs_processed, - &mut cursor_metrics_cache, + // Start batching: group storage proofs by account. + let mut batches: B256Map = B256Map::default(); + batches.insert( + input.hashed_address, + BatchedStorageProof::new(input, proof_result_sender), ); + let mut total_jobs = 1usize; + + // Drain additional jobs from the queue. + while total_jobs < STORAGE_PROOF_BATCH_LIMIT { + match work_rx.try_recv() { + Ok(StorageWorkerJob::StorageProof { + input: next_input, + proof_result_sender: next_sender, + }) => { + total_jobs += 1; + let addr = next_input.hashed_address; + match batches.entry(addr) { + alloy_primitives::map::Entry::Occupied(mut entry) => { + entry.get_mut().merge(next_input, next_sender); + } + alloy_primitives::map::Entry::Vacant(entry) => { + entry.insert(BatchedStorageProof::new( + next_input, + next_sender, + )); + } + } + } + Ok(StorageWorkerJob::BlindedStorageNode { + account, + path, + result_sender, + }) => { + // Defer blinded node jobs to process after batched proofs. + deferred_blinded_nodes.push((account, path, result_sender)); + // Stop batching if too many blinded nodes are deferred to prevent + // starvation. + if deferred_blinded_nodes.len() >= MAX_DEFERRED_BLINDED_NODES { + break; + } + } + Err(_) => break, + } + } + + // Process all batched storage proofs. 
+ for (hashed_address, batch) in batches { + let batch_size = batch.senders.len(); + batch_metrics.record_batch_size(batch_size); + + let (merged_input, senders) = batch.into_input(hashed_address); + + trace!( + target: "trie::proof_task", + worker_id, + ?hashed_address, + batch_size, + prefix_set_len = merged_input.prefix_set.len(), + target_slots_len = merged_input.target_slots.len(), + "Processing batched storage proof" + ); + + Self::process_batched_storage_proof( + worker_id, + &proof_tx, + hashed_address, + merged_input, + senders, + &mut storage_proofs_processed, + &mut cursor_metrics_cache, + ); + } + + // Process any deferred blinded node jobs. + for (account, path, result_sender) in + std::mem::take(&mut deferred_blinded_nodes) + { + Self::process_blinded_node( + worker_id, + &proof_tx, + account, + path, + result_sender, + &mut storage_nodes_processed, + ); + } } StorageWorkerJob::BlindedStorageNode { account, path, result_sender } => { @@ -795,82 +1167,103 @@ where Ok(()) } - /// Processes a storage proof request. - fn process_storage_proof( + /// Processes a batched storage proof request and sends results to all waiting receivers. + /// + /// This computes a single storage proof with merged targets and sends the same result + /// to all original requestors, reducing redundant trie traversals. + fn process_batched_storage_proof( worker_id: usize, proof_tx: &ProofTaskTx, + hashed_address: B256, input: StorageProofInput, - proof_result_sender: ProofResultContext, + senders: Vec, storage_proofs_processed: &mut u64, cursor_metrics_cache: &mut ProofTaskCursorMetricsCache, ) where Provider: TrieCursorFactory + HashedCursorFactory, { - let hashed_address = input.hashed_address; - let ProofResultContext { sender, sequence_number: seq, state, start_time } = - proof_result_sender; - let mut trie_cursor_metrics = TrieCursorMetricsCache::default(); let mut hashed_cursor_metrics = HashedCursorMetricsCache::default(); - trace!( - target: "trie::proof_task", - worker_id, - hashed_address = ?hashed_address, - prefix_set_len = input.prefix_set.len(), - target_slots_len = input.target_slots.len(), - "Processing storage proof" - ); - let proof_start = Instant::now(); let result = proof_tx.compute_storage_proof( input, &mut trie_cursor_metrics, &mut hashed_cursor_metrics, ); - let proof_elapsed = proof_start.elapsed(); - *storage_proofs_processed += 1; - let result_msg = result.map(|storage_proof| ProofResult::StorageProof { - hashed_address, - proof: storage_proof, - }); - - if sender - .send(ProofResultMessage { - sequence_number: seq, - result: result_msg, - elapsed: start_time.elapsed(), - state, - }) - .is_err() - { - trace!( - target: "trie::proof_task", - worker_id, - hashed_address = ?hashed_address, - storage_proofs_processed, - "Proof result receiver dropped, discarding result" - ); + // Send the result to all waiting receivers. + let num_senders = senders.len(); + match result { + Ok(storage_proof) => { + // Success case: wrap proof in Arc for efficient sharing across all senders. 
+ let proof_result = + ProofResult::StorageProof { hashed_address, proof: Arc::new(storage_proof) }; + + for ProofResultContext { sender, sequence_number, state, start_time } in senders { + *storage_proofs_processed += 1; + + if sender + .send(ProofResultMessage { + sequence_number, + result: Ok(proof_result.clone()), + elapsed: start_time.elapsed(), + state, + }) + .is_err() + { + trace!( + target: "trie::proof_task", + worker_id, + ?hashed_address, + sequence_number, + "Proof result receiver dropped, discarding result" + ); + } + } + } + Err(error) => { + // Error case: convert to string for cloning, then send to all receivers. + let error_msg = error.to_string(); + + for ProofResultContext { sender, sequence_number, state, start_time } in senders { + *storage_proofs_processed += 1; + + if sender + .send(ProofResultMessage { + sequence_number, + result: Err(ParallelStateRootError::Other(error_msg.clone())), + elapsed: start_time.elapsed(), + state, + }) + .is_err() + { + trace!( + target: "trie::proof_task", + worker_id, + ?hashed_address, + sequence_number, + "Proof result receiver dropped, discarding result" + ); + } + } + } } trace!( target: "trie::proof_task", worker_id, - hashed_address = ?hashed_address, + ?hashed_address, proof_time_us = proof_elapsed.as_micros(), - total_processed = storage_proofs_processed, + num_senders, trie_cursor_duration_us = trie_cursor_metrics.total_duration.as_micros(), hashed_cursor_duration_us = hashed_cursor_metrics.total_duration.as_micros(), - ?trie_cursor_metrics, - ?hashed_cursor_metrics, - "Storage proof completed" + "Batched storage proof completed" ); #[cfg(feature = "metrics")] { - // Accumulate per-proof metrics into the worker's cache let per_proof_cache = ProofTaskCursorMetricsCache { account_trie_cursor: TrieCursorMetricsCache::default(), account_hashed_cursor: HashedCursorMetricsCache::default(), @@ -987,11 +1380,18 @@ where /// 2. Advertises availability /// 3. Processes jobs in a loop: /// - Receives job from channel + /// - Drains additional account multiproof jobs (batching) /// - Marks worker as busy - /// - Processes the job + /// - Processes the batched jobs as a single proof computation /// - Marks worker as available /// 4. Shuts down when channel closes /// + /// # Batching Strategy + /// + /// When multiple account multiproof requests arrive, they are merged into + /// a single proof computation. This reduces redundant trie traversals when + /// state updates arrive faster than proof computation can process them. + /// /// # Panic Safety /// /// If this function panics, the worker thread terminates but other workers @@ -1012,6 +1412,7 @@ where // Create provider from factory let provider = task_ctx.factory.database_provider_ro()?; let proof_tx = ProofTaskTx::new(provider, worker_id); + let batch_metrics = AccountWorkerBatchMetrics::new(); trace!( target: "trie::proof_task", @@ -1026,20 +1427,98 @@ where // Count this worker as available only after successful initialization. available_workers.fetch_add(1, Ordering::Relaxed); + // Deferred blinded node jobs to process after batched account proofs. + // Pre-allocate with capacity to avoid reallocations during batching. + let mut deferred_blinded_nodes: Vec<(Nibbles, Sender)> = + Vec::with_capacity(MAX_DEFERRED_BLINDED_NODES); + while let Ok(job) = work_rx.recv() { // Mark worker as busy. 
available_workers.fetch_sub(1, Ordering::Relaxed); match job { AccountWorkerJob::AccountMultiproof { input } => { - Self::process_account_multiproof( - worker_id, - &proof_tx, - storage_work_tx.clone(), - *input, - &mut account_proofs_processed, - &mut cursor_metrics_cache, - ); + // Start batching: accumulate account multiproof jobs. If we encounter an + // incompatible job (different caches), process it as a separate batch. + let mut next_account_job: Option> = Some(input); + + while let Some(account_job) = next_account_job.take() { + let mut batch = BatchedAccountProof::new(*account_job); + let mut pending_incompatible: Option> = None; + + // Drain additional jobs from the queue. + while batch.senders.len() < ACCOUNT_PROOF_BATCH_LIMIT { + match work_rx.try_recv() { + Ok(AccountWorkerJob::AccountMultiproof { input: next_input }) => { + match batch.try_merge(*next_input) { + Ok(()) => {} + Err(incompatible) => { + trace!( + target: "trie::proof_task", + worker_id, + "Account multiproof batch split due to incompatible caches" + ); + pending_incompatible = Some(Box::new(incompatible)); + break; + } + } + } + Ok(AccountWorkerJob::BlindedAccountNode { + path, + result_sender, + }) => { + // Defer blinded node jobs to process after batched proofs. + deferred_blinded_nodes.push((path, result_sender)); + // Stop batching if too many blinded nodes are deferred to + // prevent starvation. + if deferred_blinded_nodes.len() >= MAX_DEFERRED_BLINDED_NODES { + break; + } + } + Err(_) => break, + } + } + + let batch_size = batch.senders.len(); + batch_metrics.record_batch_size(batch_size); + + let (merged_input, senders) = batch.into_input(); + + trace!( + target: "trie::proof_task", + worker_id, + batch_size, + targets_len = merged_input.targets.len(), + "Processing batched account multiproof" + ); + + Self::process_batched_account_multiproof( + worker_id, + &proof_tx, + &storage_work_tx, + merged_input, + senders, + &mut account_proofs_processed, + &mut cursor_metrics_cache, + ); + + // If we encountered an incompatible job, process it as its own batch + // before handling any deferred blinded node requests. + if let Some(incompatible_job) = pending_incompatible { + next_account_job = Some(incompatible_job); + } + } + + // Process any deferred blinded node jobs. + for (path, result_sender) in std::mem::take(&mut deferred_blinded_nodes) { + Self::process_blinded_node( + worker_id, + &proof_tx, + path, + result_sender, + &mut account_nodes_processed, + ); + } } AccountWorkerJob::BlindedAccountNode { path, result_sender } => { @@ -1074,12 +1553,16 @@ where Ok(()) } - /// Processes an account multiproof request. - fn process_account_multiproof( + /// Processes a batched account multiproof request and sends results to all waiting receivers. + /// + /// This computes a single account multiproof with merged targets and sends the same result + /// to all original requestors, reducing redundant trie traversals. 
+ fn process_batched_account_multiproof( worker_id: usize, proof_tx: &ProofTaskTx, - storage_work_tx: CrossbeamSender, + storage_work_tx: &CrossbeamSender, input: AccountMultiproofInput, + senders: Vec, account_proofs_processed: &mut u64, cursor_metrics_cache: &mut ProofTaskCursorMetricsCache, ) where @@ -1091,21 +1574,21 @@ where collect_branch_node_masks, multi_added_removed_keys, missed_leaves_storage_roots, - proof_result_sender: - ProofResultContext { sender: result_tx, sequence_number: seq, state, start_time: start }, + proof_result_sender: _, // We use the senders vec instead } = input; let span = debug_span!( target: "trie::proof_task", - "Account multiproof calculation", + "Batched account multiproof calculation", targets = targets.len(), + batch_size = senders.len(), worker_id, ); let _span_guard = span.enter(); trace!( target: "trie::proof_task", - "Processing account multiproof" + "Processing batched account multiproof" ); let proof_start = Instant::now(); @@ -1120,7 +1603,7 @@ where tracker.set_precomputed_storage_roots(storage_root_targets_len as u64); let storage_proof_receivers = match dispatch_storage_proofs( - &storage_work_tx, + storage_work_tx, &targets, &mut storage_prefix_sets, collect_branch_node_masks, @@ -1128,14 +1611,17 @@ where ) { Ok(receivers) => receivers, Err(error) => { - // Send error through result channel - error!(target: "trie::proof_task", "Failed to dispatch storage proofs: {error}"); - let _ = result_tx.send(ProofResultMessage { - sequence_number: seq, - result: Err(error), - elapsed: start.elapsed(), - state, - }); + // Send error to all receivers + let error_msg = error.to_string(); + for ProofResultContext { sender, sequence_number, state, start_time } in senders { + *account_proofs_processed += 1; + let _ = sender.send(ProofResultMessage { + sequence_number, + result: Err(ParallelStateRootError::Other(error_msg.clone())), + elapsed: start_time.elapsed(), + state, + }); + } return; } }; @@ -1156,46 +1642,75 @@ where build_account_multiproof_with_storage_roots(&proof_tx.provider, ctx, &mut tracker); let proof_elapsed = proof_start.elapsed(); - let total_elapsed = start.elapsed(); let proof_cursor_metrics = tracker.cursor_metrics; proof_cursor_metrics.record_spans(); let stats = tracker.finish(); - let result = result.map(|proof| ProofResult::AccountMultiproof { proof, stats }); - *account_proofs_processed += 1; - - // Send result to MultiProofTask - if result_tx - .send(ProofResultMessage { - sequence_number: seq, - result, - elapsed: total_elapsed, - state, - }) - .is_err() - { - trace!( - target: "trie::proof_task", - worker_id, - account_proofs_processed, - "Account multiproof receiver dropped, discarding result" - ); + + // Send the result to all waiting receivers. + let num_senders = senders.len(); + match result { + Ok(proof) => { + // Success case: wrap proof in Arc for efficient sharing across all senders. + let proof_result = ProofResult::AccountMultiproof { proof: Arc::new(proof), stats }; + + for ProofResultContext { sender, sequence_number, state, start_time } in senders { + *account_proofs_processed += 1; + + if sender + .send(ProofResultMessage { + sequence_number, + result: Ok(proof_result.clone()), + elapsed: start_time.elapsed(), + state, + }) + .is_err() + { + trace!( + target: "trie::proof_task", + worker_id, + sequence_number, + "Account multiproof receiver dropped, discarding result" + ); + } + } + } + Err(error) => { + // Error case: convert to string for cloning, then send to all receivers. 
+ let error_msg = error.to_string(); + + for ProofResultContext { sender, sequence_number, state, start_time } in senders { + *account_proofs_processed += 1; + + if sender + .send(ProofResultMessage { + sequence_number, + result: Err(ParallelStateRootError::Other(error_msg.clone())), + elapsed: start_time.elapsed(), + state, + }) + .is_err() + { + trace!( + target: "trie::proof_task", + worker_id, + sequence_number, + "Account multiproof receiver dropped, discarding result" + ); + } + } + } } trace!( target: "trie::proof_task", proof_time_us = proof_elapsed.as_micros(), - total_elapsed_us = total_elapsed.as_micros(), - total_processed = account_proofs_processed, + num_senders, account_trie_cursor_duration_us = proof_cursor_metrics.account_trie_cursor.total_duration.as_micros(), account_hashed_cursor_duration_us = proof_cursor_metrics.account_hashed_cursor.total_duration.as_micros(), storage_trie_cursor_duration_us = proof_cursor_metrics.storage_trie_cursor.total_duration.as_micros(), storage_hashed_cursor_duration_us = proof_cursor_metrics.storage_hashed_cursor.total_duration.as_micros(), - account_trie_cursor_metrics = ?proof_cursor_metrics.account_trie_cursor, - account_hashed_cursor_metrics = ?proof_cursor_metrics.account_hashed_cursor, - storage_trie_cursor_metrics = ?proof_cursor_metrics.storage_trie_cursor, - storage_hashed_cursor_metrics = ?proof_cursor_metrics.storage_hashed_cursor, - "Account multiproof completed" + "Batched account multiproof completed" ); #[cfg(feature = "metrics")] @@ -1338,7 +1853,9 @@ where drop(_guard); - // Extract storage proof from the result + // Extract storage proof from the result. + // The proof is Arc-wrapped for efficient batch sharing, so we unwrap it + // here. let proof = match proof_msg.result? { ProofResult::StorageProof { hashed_address: addr, proof } => { debug_assert_eq!( @@ -1346,7 +1863,9 @@ where hashed_address, "storage worker must return same address: expected {hashed_address}, got {addr}" ); - proof + // Efficiently unwrap Arc: returns inner value if sole owner, clones + // otherwise. + Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone()) } ProofResult::AccountMultiproof { .. } => { unreachable!("storage worker only sends StorageProof variant") @@ -1409,8 +1928,11 @@ where // Consume remaining storage proof receivers for accounts not encountered during trie walk. for (hashed_address, receiver) in storage_proof_receivers { if let Ok(proof_msg) = receiver.recv() { - // Extract storage proof from the result + // Extract storage proof from the result. + // The proof is Arc-wrapped for efficient batch sharing, so we unwrap it here. if let Ok(ProofResult::StorageProof { proof, .. }) = proof_msg.result { + // Efficiently unwrap Arc: returns inner value if sole owner, clones otherwise. 
+ let proof = Arc::try_unwrap(proof).unwrap_or_else(|arc| (*arc).clone()); collected_decoded_storages.insert(hashed_address, proof); } } diff --git a/examples/custom-node/src/evm/config.rs b/examples/custom-node/src/evm/config.rs index f2bd3326893..92810439a83 100644 --- a/examples/custom-node/src/evm/config.rs +++ b/examples/custom-node/src/evm/config.rs @@ -25,6 +25,7 @@ use reth_op::{ primitives::SignedTransaction, }; use reth_rpc_api::eth::helpers::pending_block::BuildPendingEnv; +use revm_primitives::Bytes; use std::sync::Arc; #[derive(Debug, Clone)] @@ -126,13 +127,15 @@ impl ConfigureEngineEvm for CustomEvmConfig { &self, payload: &CustomExecutionData, ) -> Result, Self::Error> { - Ok(payload.inner.payload.transactions().clone().into_iter().map(|encoded| { + let transactions = payload.inner.payload.transactions().clone().into_iter(); + let convert = |encoded: Bytes| { let tx = CustomTransaction::decode_2718_exact(encoded.as_ref()) .map_err(Into::into) .map_err(PayloadError::Decode)?; let signer = tx.try_recover().map_err(NewPayloadError::other)?; Ok::<_, NewPayloadError>(WithEncoded::new(encoded, tx.with_signer(signer))) - })) + }; + Ok((transactions, convert)) } }
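
The `ConfigureEngineEvm` implementations touched above (ethereum, optimism, and the custom-node example) now return a pair of a raw-transaction iterator and a conversion closure rather than an iterator of already-converted transactions; this split is what allows the engine to parallelize decoding and signer recovery through the new `ExecutableTxTuple` trait. A minimal sketch of a generic consumer of such a pair follows; the import path and the `Iter: Iterator<Item = RawTx>` / `Convert: Fn(RawTx) -> Result<Tx, Error>` bounds are assumptions read off the trait definition, and the sequential loop merely stands in for the engine's parallel fan-out:

```rust
// Illustrative sketch, not engine code. Trait path assumed from the crate layout in this diff.
use reth_evm::engine::ExecutableTxTuple;

/// Drains the (iterator, closure) pair sequentially; the engine can instead apply
/// `convert` across worker threads, since the closure is `Fn + Send + Sync`.
fn convert_all<T: ExecutableTxTuple>(pair: T) -> Result<Vec<T::Tx>, T::Error> {
    // Split the pair into the raw-transaction iterator and the decode/recover closure.
    let (raw_txs, convert): (T::Iter, T::Convert) = pair.into();
    // Each call decodes and recovers one transaction; `collect` short-circuits on error.
    raw_txs.map(|raw| convert(raw)).collect()
}
```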