Skip to content

Commit 9bd4b62

Browse files
committed
fix: strengthen batch invariants and prevent blinded node starvation
- Change debug_assert to assert for the multi_added_removed_keys Arc equality check in
  BatchedStorageProof::merge, ensuring incorrect proofs are caught in release builds,
  not just debug builds
- Change BatchedAccountProof::merge to try_merge returning Result, properly handling
  incompatible caches by processing them as separate batches instead of panicking
- Add MAX_DEFERRED_BLINDED_NODES (16) limit to prevent starvation of blinded node
  requests under high proof load - stops batching early when the limit is reached
- Pre-allocate deferred_blinded_nodes vectors with capacity
- Remove unnecessary clone of storage_work_tx by taking a reference
1 parent a14e221 commit 9bd4b62

File tree

1 file changed

+101
-61
lines changed

1 file changed

+101
-61
lines changed

crates/trie/parallel/src/proof_task.rs

Lines changed: 101 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;
8383
/// Maximum number of storage proof jobs to batch together per account.
8484
const STORAGE_PROOF_BATCH_LIMIT: usize = 32;
8585

86+
/// Maximum number of blinded node requests to defer during storage proof batching.
87+
/// When this limit is reached, batching stops early to process deferred nodes,
88+
/// preventing starvation of blinded node requests under high proof load.
89+
const MAX_DEFERRED_BLINDED_NODES: usize = 16;
90+
8691
/// Holds batched storage proof jobs for the same account.
8792
///
8893
/// When multiple storage proof requests arrive for the same account, they can be merged
@@ -117,13 +122,16 @@ impl BatchedStorageProof {
117122

118123
/// Merges another storage proof job into this batch.
119124
///
120-
/// # Panics (debug builds only)
125+
/// # Panics
121126
/// Panics if `input.multi_added_removed_keys` does not point to the same Arc as the batch's.
127+
/// This is a critical invariant for proof correctness.
122128
fn merge(&mut self, input: StorageProofInput, sender: ProofResultContext) {
123129
// Validate that all batched jobs share the same multi_added_removed_keys Arc.
124130
// This is a critical invariant: if jobs have different keys, the merged proof
125131
// would be computed with only the first job's keys, producing incorrect results.
126-
debug_assert!(
132+
// Using assert! (not debug_assert!) because incorrect proofs could cause consensus
133+
// failures.
134+
assert!(
127135
match (&self.multi_added_removed_keys, &input.multi_added_removed_keys) {
128136
(Some(a), Some(b)) => Arc::ptr_eq(a, b),
129137
(None, None) => true,
@@ -235,31 +243,24 @@ impl BatchedAccountProof {
235243
}
236244
}
237245

238-
/// Merges another account multiproof job into this batch.
246+
/// Attempts to merge another account multiproof job into this batch.
239247
///
240-
/// # Panics (debug builds only)
241-
/// Panics if `input.multi_added_removed_keys` or `input.missed_leaves_storage_roots`
242-
/// do not point to the same Arc as the batch's.
243-
fn merge(&mut self, input: AccountMultiproofInput) {
244-
// Validate that all batched jobs share the same multi_added_removed_keys Arc.
245-
// This is a critical invariant: if jobs have different keys, the merged proof
246-
// would be computed with only the first job's keys, producing incorrect results.
247-
debug_assert!(
248-
match (&self.multi_added_removed_keys, &input.multi_added_removed_keys) {
248+
/// Returns the job back if caches are incompatible so the caller can process it separately.
249+
fn try_merge(&mut self, input: AccountMultiproofInput) -> Result<(), AccountMultiproofInput> {
250+
// Require all jobs to share the same caches; otherwise merging would produce
251+
// incorrect proofs by reusing the wrong retained keys or missed-leaf storage roots.
252+
let multi_added_removed_keys_mismatch =
253+
!match (&self.multi_added_removed_keys, &input.multi_added_removed_keys) {
249254
(Some(a), Some(b)) => Arc::ptr_eq(a, b),
250255
(None, None) => true,
251256
_ => false,
252-
},
253-
"All batched account proof jobs must share the same multi_added_removed_keys Arc"
254-
);
257+
};
255258

256-
// Validate that all batched jobs share the same missed_leaves_storage_roots cache.
257-
// This is critical because workers may add entries to this cache during proof computation,
258-
// and all receivers expect to see those entries in their shared cache.
259-
debug_assert!(
260-
Arc::ptr_eq(&self.missed_leaves_storage_roots, &input.missed_leaves_storage_roots),
261-
"All batched account proof jobs must share the same missed_leaves_storage_roots Arc"
262-
);
259+
if multi_added_removed_keys_mismatch ||
260+
!Arc::ptr_eq(&self.missed_leaves_storage_roots, &input.missed_leaves_storage_roots)
261+
{
262+
return Err(input);
263+
}
263264

264265
// Merge targets.
265266
self.targets.extend(input.targets);
@@ -287,6 +288,8 @@ impl BatchedAccountProof {
287288

288289
// Collect the sender.
289290
self.senders.push(input.proof_result_sender);
291+
292+
Ok(())
290293
}
291294

292295
/// Converts this batch into a single `AccountMultiproofInput` for computation.
@@ -1032,8 +1035,9 @@ where
10321035
available_workers.fetch_add(1, Ordering::Relaxed);
10331036

10341037
// Deferred blinded node jobs to process after batched storage proofs.
1038+
// Pre-allocate with capacity to avoid reallocations during batching.
10351039
let mut deferred_blinded_nodes: Vec<(B256, Nibbles, Sender<TrieNodeProviderResult>)> =
1036-
Vec::new();
1040+
Vec::with_capacity(MAX_DEFERRED_BLINDED_NODES);
10371041

10381042
while let Ok(job) = work_rx.recv() {
10391043
// Mark worker as busy.
@@ -1077,6 +1081,11 @@ where
10771081
}) => {
10781082
// Defer blinded node jobs to process after batched proofs.
10791083
deferred_blinded_nodes.push((account, path, result_sender));
1084+
// Stop batching if too many blinded nodes are deferred to prevent
1085+
// starvation.
1086+
if deferred_blinded_nodes.len() >= MAX_DEFERRED_BLINDED_NODES {
1087+
break;
1088+
}
10801089
}
10811090
Err(_) => break,
10821091
}
@@ -1419,55 +1428,86 @@ where
14191428
available_workers.fetch_add(1, Ordering::Relaxed);
14201429

14211430
// Deferred blinded node jobs to process after batched account proofs.
1422-
let mut deferred_blinded_nodes: Vec<(Nibbles, Sender<TrieNodeProviderResult>)> = Vec::new();
1431+
// Pre-allocate with capacity to avoid reallocations during batching.
1432+
let mut deferred_blinded_nodes: Vec<(Nibbles, Sender<TrieNodeProviderResult>)> =
1433+
Vec::with_capacity(MAX_DEFERRED_BLINDED_NODES);
14231434

14241435
while let Ok(job) = work_rx.recv() {
14251436
// Mark worker as busy.
14261437
available_workers.fetch_sub(1, Ordering::Relaxed);
14271438

14281439
match job {
14291440
AccountWorkerJob::AccountMultiproof { input } => {
1430-
// Start batching: accumulate account multiproof jobs.
1431-
let mut batch = BatchedAccountProof::new(*input);
1432-
let mut total_jobs = 1usize;
1433-
1434-
// Drain additional jobs from the queue.
1435-
while total_jobs < ACCOUNT_PROOF_BATCH_LIMIT {
1436-
match work_rx.try_recv() {
1437-
Ok(AccountWorkerJob::AccountMultiproof { input: next_input }) => {
1438-
total_jobs += 1;
1439-
batch.merge(*next_input);
1440-
}
1441-
Ok(AccountWorkerJob::BlindedAccountNode { path, result_sender }) => {
1442-
// Defer blinded node jobs to process after batched proofs.
1443-
deferred_blinded_nodes.push((path, result_sender));
1441+
// Start batching: accumulate account multiproof jobs. If we encounter an
1442+
// incompatible job (different caches), process it as a separate batch.
1443+
let mut next_account_job: Option<Box<AccountMultiproofInput>> = Some(input);
1444+
1445+
while let Some(account_job) = next_account_job.take() {
1446+
let mut batch = BatchedAccountProof::new(*account_job);
1447+
let mut pending_incompatible: Option<Box<AccountMultiproofInput>> = None;
1448+
1449+
// Drain additional jobs from the queue.
1450+
while batch.senders.len() < ACCOUNT_PROOF_BATCH_LIMIT {
1451+
match work_rx.try_recv() {
1452+
Ok(AccountWorkerJob::AccountMultiproof { input: next_input }) => {
1453+
match batch.try_merge(*next_input) {
1454+
Ok(()) => {}
1455+
Err(incompatible) => {
1456+
trace!(
1457+
target: "trie::proof_task",
1458+
worker_id,
1459+
"Account multiproof batch split due to incompatible caches"
1460+
);
1461+
pending_incompatible = Some(Box::new(incompatible));
1462+
break;
1463+
}
1464+
}
1465+
}
1466+
Ok(AccountWorkerJob::BlindedAccountNode {
1467+
path,
1468+
result_sender,
1469+
}) => {
1470+
// Defer blinded node jobs to process after batched proofs.
1471+
deferred_blinded_nodes.push((path, result_sender));
1472+
// Stop batching if too many blinded nodes are deferred to
1473+
// prevent starvation.
1474+
if deferred_blinded_nodes.len() >= MAX_DEFERRED_BLINDED_NODES {
1475+
break;
1476+
}
1477+
}
1478+
Err(_) => break,
14441479
}
1445-
Err(_) => break,
14461480
}
1447-
}
14481481

1449-
let batch_size = batch.senders.len();
1450-
batch_metrics.record_batch_size(batch_size);
1482+
let batch_size = batch.senders.len();
1483+
batch_metrics.record_batch_size(batch_size);
14511484

1452-
let (merged_input, senders) = batch.into_input();
1485+
let (merged_input, senders) = batch.into_input();
14531486

1454-
trace!(
1455-
target: "trie::proof_task",
1456-
worker_id,
1457-
batch_size,
1458-
targets_len = merged_input.targets.len(),
1459-
"Processing batched account multiproof"
1460-
);
1487+
trace!(
1488+
target: "trie::proof_task",
1489+
worker_id,
1490+
batch_size,
1491+
targets_len = merged_input.targets.len(),
1492+
"Processing batched account multiproof"
1493+
);
14611494

1462-
Self::process_batched_account_multiproof(
1463-
worker_id,
1464-
&proof_tx,
1465-
storage_work_tx.clone(),
1466-
merged_input,
1467-
senders,
1468-
&mut account_proofs_processed,
1469-
&mut cursor_metrics_cache,
1470-
);
1495+
Self::process_batched_account_multiproof(
1496+
worker_id,
1497+
&proof_tx,
1498+
&storage_work_tx,
1499+
merged_input,
1500+
senders,
1501+
&mut account_proofs_processed,
1502+
&mut cursor_metrics_cache,
1503+
);
1504+
1505+
// If we encountered an incompatible job, process it as its own batch
1506+
// before handling any deferred blinded node requests.
1507+
if let Some(incompatible_job) = pending_incompatible {
1508+
next_account_job = Some(incompatible_job);
1509+
}
1510+
}
14711511

14721512
// Process any deferred blinded node jobs.
14731513
for (path, result_sender) in std::mem::take(&mut deferred_blinded_nodes) {
@@ -1520,7 +1560,7 @@ where
15201560
fn process_batched_account_multiproof<Provider>(
15211561
worker_id: usize,
15221562
proof_tx: &ProofTaskTx<Provider>,
1523-
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
1563+
storage_work_tx: &CrossbeamSender<StorageWorkerJob>,
15241564
input: AccountMultiproofInput,
15251565
senders: Vec<ProofResultContext>,
15261566
account_proofs_processed: &mut u64,
@@ -1563,7 +1603,7 @@ where
15631603
tracker.set_precomputed_storage_roots(storage_root_targets_len as u64);
15641604

15651605
let storage_proof_receivers = match dispatch_storage_proofs(
1566-
&storage_work_tx,
1606+
storage_work_tx,
15671607
&targets,
15681608
&mut storage_prefix_sets,
15691609
collect_branch_node_masks,

0 commit comments

Comments (0)