diff --git a/Cargo.lock b/Cargo.lock
index 1011044fad5..e2010887d6e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3926,6 +3926,7 @@ dependencies = [
  "graph-chain-ethereum",
  "graph-chain-near",
  "graph-graphql",
+ "rayon",
 ]
 
 [[package]]
diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs
index 658baa8be3e..3b4f906cae7 100644
--- a/graph/src/components/store/traits.rs
+++ b/graph/src/components/store/traits.rs
@@ -10,6 +10,7 @@ use crate::blockchain::{BlockTime, ChainIdentifier, ExtendedBlockPtr};
 use crate::components::metrics::stopwatch::StopwatchMetrics;
 use crate::components::network_provider::ChainName;
 use crate::components::server::index_node::VersionInfo;
+use crate::components::subgraph::ProofOfIndexingVersion;
 use crate::components::subgraph::SubgraphVersionSwitchingMode;
 use crate::components::transaction_receipt;
 use crate::components::versions::ApiVersion;
@@ -741,6 +742,28 @@ pub trait QueryStore: Send + Sync {
     fn deployment_id(&self) -> DeploymentId;
 }
 
+/// A single POI digest entry from the `poi2$` table, representing the
+/// accumulated digest for a causality region over a block range.
+#[derive(Clone, Debug)]
+pub struct PoiDigestEntry {
+    /// The causality region identifier (the entity id in `poi2$`)
+    pub id: Id,
+    /// The accumulated digest bytes
+    pub digest: Vec<u8>,
+    /// Start of the block range (inclusive)
+    pub start_block: BlockNumber,
+    /// End of the block range (exclusive, `i32::MAX` if open-ended)
+    pub end_block: BlockNumber,
+}
+
+/// The full POI digest history for a deployment, containing all digest
+/// entries and the POI version needed to compute proofs.
+#[derive(Clone, Debug)]
+pub struct PoiDigestHistory {
+    pub entries: Vec<PoiDigestEntry>,
+    pub poi_version: ProofOfIndexingVersion,
+}
+
 /// A view of the store that can provide information about the indexing status
 /// of any subgraph and any deployment
 #[async_trait]
@@ -790,6 +813,19 @@ pub trait StatusStore: Send + Sync + 'static {
         block_number: BlockNumber,
         fetch_block_ptr: &dyn BlockPtrForNumber,
     ) -> Result<Option<(PartialBlockPtr, [u8; 32])>, StoreError>;
+
+    /// Retrieve the full POI digest history for a deployment within a block
+    /// range. Returns all `poi2$` entries whose block ranges overlap the
+    /// given range, along with the deployment's `ProofOfIndexingVersion`.
+    /// Returns `None` if the deployment doesn't exist or has no POI data.
+    async fn get_poi_digest_history(
+        &self,
+        subgraph_id: &DeploymentHash,
+        block_range: std::ops::Range<BlockNumber>,
+    ) -> Result<Option<PoiDigestHistory>, StoreError>;
+
+    /// Get the network for a deployment
+    async fn network_for_deployment(&self, id: &DeploymentHash) -> Result<String, StoreError>;
 }
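For context, the resolver added later in this diff consumes these two trait methods together: it resolves the network first (to pick the right chain store), then pulls the digest history for the search window. A hedged sketch of that call pattern against any `StatusStore`; the helper `load_poi_inputs` is ours, not part of the patch:

use std::sync::Arc;

use graph::components::store::{PoiDigestHistory, StatusStore};
use graph::prelude::{BlockNumber, DeploymentHash, StoreError};

// Fetch everything a POI search needs in two round trips: the network name
// and the digest history overlapping the half-open window [start, end).
async fn load_poi_inputs(
    store: Arc<dyn StatusStore>,
    deployment: &DeploymentHash,
    start: BlockNumber,
    end: BlockNumber,
) -> Result<Option<(String, PoiDigestHistory)>, StoreError> {
    let network = store.network_for_deployment(deployment).await?;
    let history = store.get_poi_digest_history(deployment, start..end).await?;
    Ok(history.map(|h| (network, h)))
}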
diff --git a/server/index-node/Cargo.toml b/server/index-node/Cargo.toml
index 9672f657e4a..e1f7e1d021c 100644
--- a/server/index-node/Cargo.toml
+++ b/server/index-node/Cargo.toml
@@ -11,3 +11,4 @@ graph-graphql = { path = "../../graphql" }
 graph-chain-ethereum = { path = "../../chain/ethereum" }
 graph-chain-near = { path = "../../chain/near" }
 git-testament = "0.2.6"
+rayon = "1"
diff --git a/server/index-node/src/resolver.rs b/server/index-node/src/resolver.rs
index b8385866d33..70973a9962a 100644
--- a/server/index-node/src/resolver.rs
+++ b/server/index-node/src/resolver.rs
@@ -1,6 +1,7 @@
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashMap};
 
 use async_trait::async_trait;
+use graph::components::subgraph::ProofOfIndexingFinisher;
 use graph::data::query::Trace;
 use graph::data::store::Id;
 use graph::prelude::alloy::primitives::Address;
@@ -352,7 +353,10 @@ where
         ))
     }
 
-    fn resolve_proof_of_indexing(&self, field: &a::Field) -> Result<r::Value, QueryExecutionError> {
+    async fn resolve_proof_of_indexing(
+        &self,
+        field: &a::Field,
+    ) -> Result<r::Value, QueryExecutionError> {
         let deployment_id = field
             .get_required::<DeploymentHash>("subgraph")
             .expect("Valid subgraphId required");
@@ -381,7 +385,7 @@
         let poi_fut = self
             .store
             .get_proof_of_indexing(&deployment_id, &indexer, block.clone());
-        let poi = match graph::futures03::executor::block_on(poi_fut) {
+        let poi = match poi_fut.await {
             Ok(Some(poi)) => r::Value::String(format!("0x{}", hex::encode(poi))),
             Ok(None) => r::Value::Null,
             Err(e) => {
@@ -451,6 +455,237 @@ where
         Ok(r::Value::List(public_poi_results))
     }
 
+    async fn resolve_block_for_poi(
+        &self,
+        field: &a::Field,
+    ) -> Result<r::Value, QueryExecutionError> {
+        const CHUNK_SIZE: i32 = 1_000_000;
+
+        let deployment_id = field
+            .get_required::<DeploymentHash>("subgraph")
+            .expect("Valid subgraph required");
+        let target_poi_hash = field
+            .get_required::<Bytes>("targetPoi")
+            .expect("Valid targetPoi required");
+        let start_block = field
+            .get_required::<BlockNumber>("startBlock")
+            .expect("Valid startBlock required");
+        let end_block = field
+            .get_required::<BlockNumber>("endBlock")
+            .expect("Valid endBlock required");
+
+        let indexer = Some(
+            field
+                .get_required::<Address>("indexer")
+                .expect("Valid indexer required"),
+        );
+
+        if end_block <= start_block {
+            return Ok(r::Value::Null);
+        }
+
+        let target_bytes: [u8; 32] = match target_poi_hash.as_slice().try_into() {
+            Ok(bytes) => bytes,
+            Err(_) => {
+                error!(
+                    self.logger,
+                    "Invalid targetPoi: expected 32 bytes";
+                    "got_bytes" => target_poi_hash.as_slice().len()
+                );
+                return Ok(r::Value::Null);
+            }
+        };
+
+        // Resolve the network for this deployment
+        let network = match self.store.network_for_deployment(&deployment_id).await {
+            Ok(n) => n,
+            Err(e) => {
+                error!(
+                    self.logger,
+                    "Failed to resolve network for deployment";
+                    "subgraph" => &deployment_id,
+                    "error" => format!("{:?}", e)
+                );
+                return Ok(r::Value::Null);
+            }
+        };
+
+        // Fetch the full digest history for the block range
+        let history = match self
+            .store
+            .get_poi_digest_history(&deployment_id, start_block..end_block)
+            .await
+        {
+            Ok(Some(h)) => h,
+            Ok(None) => return Ok(r::Value::Null),
+            Err(e) => {
+                error!(
+                    self.logger,
+                    "Failed to fetch POI digest history";
+                    "subgraph" => &deployment_id,
+                    "error" => format!("{:?}", e)
+                );
+                return Ok(r::Value::Null);
+            }
+        };
+
+        let poi_version = history.poi_version;
+
+        // Build a lookup structure: for each causality region id, a sorted
+        // vec of (start_block, end_block, digest) for binary search.
+        let mut region_entries: HashMap<Id, Vec<(BlockNumber, BlockNumber, Vec<u8>)>> =
+            HashMap::new();
+        for entry in history.entries {
+            region_entries.entry(entry.id).or_default().push((
+                entry.start_block,
+                entry.end_block,
+                entry.digest,
+            ));
+        }
+        for entries in region_entries.values_mut() {
+            entries.sort_by_key(|(start, _, _)| *start);
+        }
+
+        // Share across rayon threads
+        let region_entries = Arc::new(region_entries);
+
+        let chain_store = match self.store.block_store().chain_store(&network).await {
+            Some(cs) => cs,
+            None => {
+                error!(
+                    self.logger,
+                    "Chain store not found for network";
+                    "network" => &network
+                );
+                return Ok(r::Value::Null);
+            }
+        };
+
+        // Search backwards from end_block (the match is likely near the top).
+        // Pipeline: fetch the next chunk while computing POIs for the current one.
+        let mut chunk_end = end_block;
+        let chunk_start = std::cmp::max(chunk_end - CHUNK_SIZE, start_block);
+
+        // Fetch first chunk
+        let block_numbers: Vec<BlockNumber> = (chunk_start..chunk_end).collect();
+        let mut current_ptrs = match chain_store
+            .cheap_clone()
+            .block_ptrs_by_numbers(block_numbers)
+            .await
+        {
+            Ok(ptrs) => ptrs,
+            Err(e) => {
+                error!(
+                    self.logger,
+                    "Failed to fetch block hashes";
+                    "range" => format!("{}..{}", chunk_start, chunk_end),
+                    "error" => format!("{:?}", e)
+                );
+                return Ok(r::Value::Null);
+            }
+        };
+        chunk_end = chunk_start;
+
+        loop {
+            // Start prefetching the next chunk while we process the current one
+            let next_chunk_end = chunk_end;
+            let next_chunk_start = std::cmp::max(next_chunk_end - CHUNK_SIZE, start_block);
+            let prefetch = if next_chunk_start < next_chunk_end {
+                let cs = chain_store.cheap_clone();
+                let numbers: Vec<BlockNumber> = (next_chunk_start..next_chunk_end).collect();
+                Some(tokio::spawn(
+                    async move { cs.block_ptrs_by_numbers(numbers).await },
+                ))
+            } else {
+                None
+            };
+
+            // Collect blocks with unambiguous hashes for parallel search
+            let blocks_to_check: Vec<(BlockNumber, BlockHash)> = current_ptrs
+                .iter()
+                .filter_map(|(num, ptrs)| {
+                    if ptrs.len() == 1 {
+                        Some((*num, ptrs[0].hash.clone()))
+                    } else {
+                        None
+                    }
+                })
+                .collect();
+
+            // Parallel POI computation across all cores via rayon
+            let re = region_entries.clone();
+            let did = deployment_id.clone();
+            // Clone per iteration; the 'static closure takes ownership.
+            let indexer = indexer.clone();
+            let result = graph::spawn_blocking_allow_panic(move || {
+                use rayon::prelude::*;
+                blocks_to_check
+                    .par_iter()
+                    .find_map_any(|(block_num, block_hash)| {
+                        let block_ptr = BlockPtr::new(block_hash.clone(), *block_num);
+                        let mut finisher =
+                            ProofOfIndexingFinisher::new(&block_ptr, &did, &indexer, poi_version);
+
+                        for (region_id, entries) in re.as_ref() {
+                            let idx = entries.partition_point(|(start, _, _)| *start <= *block_num);
+                            if idx == 0 {
+                                continue;
+                            }
+                            let (start, end, ref digest) = entries[idx - 1];
+                            if *block_num >= start && *block_num < end {
+                                finisher.add_causality_region(region_id, digest);
+                            }
+                        }
+
+                        let computed = finisher.finish();
+                        if computed == target_bytes {
+                            Some((*block_num, block_hash.clone(), computed))
+                        } else {
+                            None
+                        }
+                    })
+            })
+            .await
+            .map_err(|e| QueryExecutionError::Panic(e.to_string()))?;
+
+            if let Some((block_num, block_hash, computed_poi)) = result {
+                // Found it - abort any in-flight prefetch
+                if let Some(handle) = prefetch {
+                    handle.abort();
+                }
+                return Ok(object! {
+                    __typename: "PoiSearchResult",
+                    deployment: deployment_id.to_string(),
+                    block: object! {
+                        hash: block_hash.hash_hex(),
+                        number: block_num,
+                    },
+                    proofOfIndexing: format!("0x{}", hex::encode(computed_poi)),
+                });
+            }
+
+            // Move to the next chunk
+            match prefetch {
+                Some(handle) => {
+                    current_ptrs = handle
+                        .await
+                        .map_err(|e| QueryExecutionError::Panic(e.to_string()))?
+                        .map_err(|e| {
+                            error!(
+                                self.logger,
+                                "Failed to fetch block hashes";
+                                "range" => format!("{}..{}", next_chunk_start, next_chunk_end),
+                                "error" => format!("{:?}", e)
+                            );
+                            QueryExecutionError::StoreError(e.into())
+                        })?;
+                    chunk_end = next_chunk_start;
+                }
+                None => break,
+            }
+        }
+
+        Ok(r::Value::Null)
+    }
+
     async fn resolve_indexing_status_for_version(
         &self,
         field: &a::Field,
@@ -791,7 +1026,7 @@ where
             field.name.as_str(),
             scalar_type.name.as_str(),
         ) {
-            ("Query", "proofOfIndexing", "Bytes") => self.resolve_proof_of_indexing(field),
+            ("Query", "proofOfIndexing", "Bytes") => self.resolve_proof_of_indexing(field).await,
             ("Query", "blockData", "JSONObject") => self.resolve_block_data(field).await,
             ("Query", "blockHashFromNumber", "Bytes") => {
                 self.resolve_block_hash_from_number(field).await
@@ -855,6 +1090,7 @@
             // The top-level `subgraphVersions` field
             (None, "apiVersions") => self.resolve_api_versions(field),
             (None, "version") => self.version(),
+            (None, "blockForPoi") => self.resolve_block_for_poi(field).await,
 
             // Resolve fields of `Object` values (e.g. the `latestBlock` field of `EthereumBlock`)
             (value, _) => Ok(value.unwrap_or(r::Value::Null)),
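The loop above is a two-stage pipeline: while rayon grinds through the current chunk, the next chunk's block hashes are already being fetched on a spawned task. The shape of that control flow, isolated from graph-node types; `fetch_chunk` and `search_chunk` are illustrative stand-ins, not functions from this patch:

use tokio::task::JoinHandle;

// Hypothetical stand-ins for the chain-store fetch and the rayon search.
async fn fetch_chunk(start: i32, end: i32) -> Vec<i32> {
    (start..end).collect()
}
fn search_chunk(chunk: &[i32], target: i32) -> Option<i32> {
    chunk.iter().copied().find(|n| *n == target)
}

// Walk [lo, hi) backwards in fixed-size chunks, overlapping the fetch of
// chunk N-1 with the search of chunk N, as the resolver does.
async fn pipelined_search(lo: i32, hi: i32, step: i32, target: i32) -> Option<i32> {
    let mut end = hi;
    let mut start = (end - step).max(lo);
    let mut current = fetch_chunk(start, end).await;
    end = start;
    loop {
        start = (end - step).max(lo);
        let prefetch: Option<JoinHandle<Vec<i32>>> = if start < end {
            Some(tokio::spawn(fetch_chunk(start, end)))
        } else {
            None
        };
        if let Some(hit) = search_chunk(&current, target) {
            if let Some(h) = prefetch {
                h.abort();
            }
            return Some(hit);
        }
        match prefetch {
            Some(h) => {
                current = h.await.ok()?;
                end = start;
            }
            None => return None,
        }
    }
}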
diff --git a/server/index-node/src/schema.graphql b/server/index-node/src/schema.graphql
index 10475d75a0d..2fbe0a5e5dd 100644
--- a/server/index-node/src/schema.graphql
+++ b/server/index-node/src/schema.graphql
@@ -46,6 +46,18 @@ type Query {
     blockHash: Bytes!
   ): [CachedEthereumCall!]
   apiVersions(subgraphId: String!): [ApiVersion!]!
+  """
+  Find the block number that produced a given proof of indexing.
+  Used for dispute investigation to verify which block an indexer
+  actually synced to when they submitted a POI.
+  """
+  blockForPoi(
+    subgraph: String!
+    targetPoi: Bytes!
+    startBlock: Int!
+    endBlock: Int!
+    indexer: Bytes!
+  ): PoiSearchResult
 }
 
 type Version {
@@ -203,6 +215,12 @@ type ProofOfIndexingResult {
   proofOfIndexing: Bytes
 }
 
+type PoiSearchResult {
+  block: Block!
+  deployment: String!
+  proofOfIndexing: Bytes!
+}
+
 type ApiVersion {
   """
   Version number in SemVer format
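Once deployed, the new field can be exercised with a plain GraphQL POST against the index-node server. A sketch using reqwest and serde_json (neither is a dependency added by this patch), assuming the default index-node port 8030 and placeholder values throughout:

use serde_json::json;

// Requires reqwest with the "json" feature. All values below are placeholders.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let query = r#"
        query ($subgraph: String!, $targetPoi: Bytes!, $indexer: Bytes!) {
            blockForPoi(
                subgraph: $subgraph
                targetPoi: $targetPoi
                startBlock: 0
                endBlock: 20000000
                indexer: $indexer
            ) {
                deployment
                proofOfIndexing
                block { hash number }
            }
        }"#;
    let body = json!({
        "query": query,
        "variables": {
            // Placeholder deployment hash, POI, and indexer address.
            "subgraph": "Qm...",
            "targetPoi": "0x0000000000000000000000000000000000000000000000000000000000000000",
            "indexer": "0x0000000000000000000000000000000000000000",
        }
    });
    let resp = reqwest::Client::new()
        .post("http://localhost:8030/graphql")
        .json(&body)
        .send()
        .await?
        .text()
        .await?;
    println!("{resp}");
    Ok(())
}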
diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs
index a9fcc833e99..df803c3bffa 100644
--- a/store/postgres/src/deployment_store.rs
+++ b/store/postgres/src/deployment_store.rs
@@ -8,6 +8,7 @@ use graph::anyhow::Context;
 use graph::blockchain::block_stream::{EntitySourceOperation, FirehoseCursor};
 use graph::blockchain::BlockTime;
 use graph::components::store::write::RowGroup;
+use graph::components::store::PoiDigestHistory;
 use graph::components::store::{
     Batch, DeploymentLocator, DerivedEntityQuery, PrunePhase, PruneReporter, PruneRequest,
     PruningStrategy, QueryPermit, StoredDynamicDataSource, VersionStats,
@@ -15,7 +16,7 @@
 use graph::components::versions::VERSIONS;
 use graph::data::graphql::IntoValue;
 use graph::data::query::Trace;
-use graph::data::store::{IdList, SqlQueryObject};
+use graph::data::store::{Id, IdList, SqlQueryObject};
 use graph::data::subgraph::{status, SPEC_VERSION_0_0_6};
 use graph::data_source::CausalityRegion;
 use graph::derive::CheapClone;
@@ -1018,6 +1019,69 @@ impl DeploymentStore {
         Ok(Some(finisher.finish()))
     }
 
+    /// Retrieve all POI digest entries from the `poi2$` table whose block
+    /// ranges overlap the given `block_range`, along with the deployment's
+    /// POI version. Used by the `blockForPoi` resolver to reconstruct POIs
+    /// without per-block entity queries.
+    pub(crate) async fn get_poi_digest_history(
+        &self,
+        site: Arc<Site>,
+        block_range: Range<BlockNumber>,
+    ) -> Result<Option<PoiDigestHistory>, StoreError> {
+        use diesel::sql_types::{Binary, Integer, Text};
+        use graph::components::store::PoiDigestEntry;
+
+        let info = self.subgraph_info(site.cheap_clone()).await?;
+
+        #[derive(QueryableByName)]
+        struct DigestRow {
+            #[diesel(sql_type = Text)]
+            id: String,
+            #[diesel(sql_type = Binary)]
+            digest: Vec<u8>,
+            #[diesel(sql_type = Integer)]
+            start_block: i32,
+            #[diesel(sql_type = Integer)]
+            end_block: i32,
+        }
+
+        let query = format!(
+            r#"SELECT id, digest, lower(block_range) as start_block,
+                      coalesce(upper(block_range), 2147483647) as end_block
+                 FROM "{}"."poi2$"
+                WHERE block_range && int4range($1, $2)
+                ORDER BY id, lower(block_range)"#,
+            site.namespace,
+        );
+
+        let mut conn = self.pool.get_permitted().await?;
+        let rows = diesel::sql_query(query)
+            .bind::<Integer, _>(block_range.start)
+            .bind::<Integer, _>(block_range.end)
+            .load::<DigestRow>(&mut conn)
+            .await
+            .map_err(StoreError::from)?;
+
+        if rows.is_empty() {
+            return Ok(None);
+        }
+
+        let entries = rows
+            .into_iter()
+            .map(|row| PoiDigestEntry {
+                id: Id::String(row.id.into()),
+                digest: row.digest,
+                start_block: row.start_block,
+                end_block: row.end_block,
+            })
+            .collect();
+
+        Ok(Some(PoiDigestHistory {
+            entries,
+            poi_version: info.poi_version,
+        }))
+    }
+
     /// Get the entity matching `key` from the deployment `site`. Only
     /// consider entities as of the given `block`
     pub(crate) async fn get(
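The SQL above leans on Postgres range semantics: `block_range && int4range($1, $2)` selects every row whose half-open range intersects [start, end), and `upper(block_range)` is NULL for the open-ended current revision, hence the coalesce to 2147483647 (`i32::MAX`). Pinning down which digest was live at a specific block then happens in memory via `partition_point`, as in the resolver. A self-contained sketch of that selection step with a small test; the type alias and function name are illustrative, not from this patch:

/// Entries for one causality region, sorted by start block:
/// (start_block inclusive, end_block exclusive, digest bytes).
type Entries = Vec<(i32, i32, Vec<u8>)>;

/// Find the digest live at `block`, if any: binary-search for the last
/// entry starting at or before `block`, then check its end bound.
fn digest_at(entries: &Entries, block: i32) -> Option<&[u8]> {
    let idx = entries.partition_point(|(start, _, _)| *start <= block);
    if idx == 0 {
        return None;
    }
    let (start, end, digest) = &entries[idx - 1];
    (*start <= block && block < *end).then(|| digest.as_slice())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn picks_the_covering_range() {
        // Two revisions: digest A live for [0, 10), digest B for [10, MAX).
        let entries: Entries = vec![(0, 10, vec![0xaa]), (10, i32::MAX, vec![0xbb])];
        assert_eq!(digest_at(&entries, 9), Some(&[0xaa][..]));
        assert_eq!(digest_at(&entries, 10), Some(&[0xbb][..]));
        assert_eq!(digest_at(&entries, -1), None);
    }
}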
diff --git a/store/postgres/src/store.rs b/store/postgres/src/store.rs
index 4adec80ab5b..0056d9de6aa 100644
--- a/store/postgres/src/store.rs
+++ b/store/postgres/src/store.rs
@@ -171,6 +171,20 @@ impl StatusStore for Store {
             .await
     }
 
+    async fn get_poi_digest_history(
+        &self,
+        subgraph_id: &DeploymentHash,
+        block_range: std::ops::Range<BlockNumber>,
+    ) -> Result<Option<PoiDigestHistory>, StoreError> {
+        self.subgraph_store
+            .get_poi_digest_history(subgraph_id, block_range)
+            .await
+    }
+
+    async fn network_for_deployment(&self, id: &DeploymentHash) -> Result<String, StoreError> {
+        self.subgraph_store.network_for_deployment(id).await
+    }
+
     async fn query_permit(&self) -> QueryPermit {
         // Status queries go to the primary shard.
         self.block_store.query_permit_primary().await
diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs
index e5b9ee0a529..b8c7a8b8e6d 100644
--- a/store/postgres/src/subgraph_store.rs
+++ b/store/postgres/src/subgraph_store.rs
@@ -19,7 +19,7 @@ use graph::{
         server::index_node::VersionInfo,
         store::{
             self, BlockPtrForNumber, BlockStore, DeploymentLocator, EnsLookup as EnsLookupTrait,
-            PruneReporter, PruneRequest, SubgraphFork,
+            PoiDigestHistory, PruneReporter, PruneRequest, SubgraphFork,
         },
     },
     data::{
@@ -249,6 +249,21 @@
         self.inner.get_proof_of_indexing(id, indexer, block).await
     }
 
+    pub(crate) async fn get_poi_digest_history(
+        &self,
+        id: &DeploymentHash,
+        block_range: std::ops::Range<BlockNumber>,
+    ) -> Result<Option<PoiDigestHistory>, StoreError> {
+        self.inner.get_poi_digest_history(id, block_range).await
+    }
+
+    pub(crate) async fn network_for_deployment(
+        &self,
+        id: &DeploymentHash,
+    ) -> Result<String, StoreError> {
+        self.inner.network_for_deployment(id).await
+    }
+
     pub(crate) async fn get_public_proof_of_indexing(
         &self,
         id: &DeploymentHash,
@@ -1137,6 +1152,23 @@ impl Inner {
         store.get_proof_of_indexing(site, indexer, block).await
     }
 
+    pub(crate) async fn get_poi_digest_history(
+        &self,
+        id: &DeploymentHash,
+        block_range: std::ops::Range<BlockNumber>,
+    ) -> Result<Option<PoiDigestHistory>, StoreError> {
+        let (store, site) = self.store(id).await?;
+        store.get_poi_digest_history(site, block_range).await
+    }
+
+    pub(crate) async fn network_for_deployment(
+        &self,
+        id: &DeploymentHash,
+    ) -> Result<String, StoreError> {
+        let site = self.site(id).await?;
+        Ok(site.network.clone())
+    }
+
     pub(crate) async fn get_public_proof_of_indexing(
         &self,
         id: &DeploymentHash,