1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

36 changes: 36 additions & 0 deletions graph/src/components/store/traits.rs
@@ -10,6 +10,7 @@ use crate::blockchain::{BlockTime, ChainIdentifier, ExtendedBlockPtr};
use crate::components::metrics::stopwatch::StopwatchMetrics;
use crate::components::network_provider::ChainName;
use crate::components::server::index_node::VersionInfo;
use crate::components::subgraph::ProofOfIndexingVersion;
use crate::components::subgraph::SubgraphVersionSwitchingMode;
use crate::components::transaction_receipt;
use crate::components::versions::ApiVersion;
@@ -741,6 +742,28 @@ pub trait QueryStore: Send + Sync {
fn deployment_id(&self) -> DeploymentId;
}

/// A single POI digest entry from the `poi2$` table, representing the
/// accumulated digest for a causality region over a block range.
#[derive(Clone, Debug)]
pub struct PoiDigestEntry {
/// The causality region identifier (the entity id in `poi2$`)
pub id: Id,
/// The accumulated digest bytes
pub digest: Vec<u8>,
/// Start of the block range (inclusive)
pub start_block: BlockNumber,
/// End of the block range (exclusive, i32::MAX if open-ended)
pub end_block: BlockNumber,
}

/// The full POI digest history for a deployment, containing all digest
/// entries and the POI version needed to compute proofs.
#[derive(Clone, Debug)]
pub struct PoiDigestHistory {
pub entries: Vec<PoiDigestEntry>,
pub poi_version: ProofOfIndexingVersion,
}

/// A view of the store that can provide information about the indexing status
/// of any subgraph and any deployment
#[async_trait]
@@ -790,6 +813,19 @@ pub trait StatusStore: Send + Sync + 'static {
block_number: BlockNumber,
fetch_block_ptr: &dyn BlockPtrForNumber,
) -> Result<Option<(PartialBlockPtr, [u8; 32])>, StoreError>;

/// Retrieve the full POI digest history for a deployment within a block
/// range. Returns all `poi2$` entries whose block ranges overlap the
/// given range, along with the deployment's `ProofOfIndexingVersion`.
/// Returns `None` if the deployment doesn't exist or has no POI data.
async fn get_poi_digest_history(
&self,
subgraph_id: &DeploymentHash,
block_range: std::ops::Range<BlockNumber>,
) -> Result<Option<PoiDigestHistory>, StoreError>;

/// Get the network for a deployment
async fn network_for_deployment(&self, id: &DeploymentHash) -> Result<String, StoreError>;
}

#[async_trait]
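For context on how the new trait method composes with `ProofOfIndexingFinisher`, here is a minimal sketch (not part of the diff) that recomputes the POI for a single block from the digest history. The `poi_at_block` helper and its import paths are illustrative assumptions; the calls it makes (`get_poi_digest_history`, `add_causality_region`, `finish`) are the ones this PR introduces or uses in the resolver below.

```rust
use graph::components::store::{StatusStore, StoreError};
use graph::components::subgraph::ProofOfIndexingFinisher;
use graph::prelude::{alloy::primitives::Address, BlockPtr, DeploymentHash};

// Illustrative helper: recompute the POI at `block_ptr` from stored digests.
async fn poi_at_block(
    store: &dyn StatusStore,
    deployment: &DeploymentHash,
    indexer: &Option<Address>,
    block_ptr: BlockPtr,
) -> Result<Option<[u8; 32]>, StoreError> {
    let number = block_ptr.number;
    // Ask for every digest entry whose block range overlaps this one block.
    let history = match store
        .get_poi_digest_history(deployment, number..number + 1)
        .await?
    {
        Some(h) => h,
        None => return Ok(None), // unknown deployment or no POI data
    };

    let mut finisher =
        ProofOfIndexingFinisher::new(&block_ptr, deployment, indexer, history.poi_version);
    for entry in &history.entries {
        // An entry applies when its [start_block, end_block) range contains the block.
        if number >= entry.start_block && number < entry.end_block {
            finisher.add_causality_region(&entry.id, &entry.digest);
        }
    }
    Ok(Some(finisher.finish()))
}
```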
1 change: 1 addition & 0 deletions server/index-node/Cargo.toml
@@ -11,3 +11,4 @@ graph-graphql = { path = "../../graphql" }
graph-chain-ethereum = { path = "../../chain/ethereum" }
graph-chain-near = { path = "../../chain/near" }
git-testament = "0.2.6"
rayon = "1"
244 changes: 240 additions & 4 deletions server/index-node/src/resolver.rs
@@ -1,6 +1,7 @@
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};

use async_trait::async_trait;
use graph::components::subgraph::ProofOfIndexingFinisher;
use graph::data::query::Trace;
use graph::data::store::Id;
use graph::prelude::alloy::primitives::Address;
@@ -352,7 +353,10 @@ where
))
}

fn resolve_proof_of_indexing(&self, field: &a::Field) -> Result<r::Value, QueryExecutionError> {
async fn resolve_proof_of_indexing(
&self,
field: &a::Field,
) -> Result<r::Value, QueryExecutionError> {
let deployment_id = field
.get_required::<DeploymentHash>("subgraph")
.expect("Valid subgraphId required");
@@ -381,7 +385,7 @@ where
let poi_fut = self
.store
.get_proof_of_indexing(&deployment_id, &indexer, block.clone());
let poi = match graph::futures03::executor::block_on(poi_fut) {
let poi = match poi_fut.await {
Ok(Some(poi)) => r::Value::String(format!("0x{}", hex::encode(poi))),
Ok(None) => r::Value::Null,
Err(e) => {
@@ -451,6 +455,237 @@ where
Ok(r::Value::List(public_poi_results))
}

async fn resolve_block_for_poi(
&self,
field: &a::Field,
) -> Result<r::Value, QueryExecutionError> {
const CHUNK_SIZE: i32 = 1_000_000;

let deployment_id = field
.get_required::<DeploymentHash>("subgraph")
.expect("Valid subgraph required");
let target_poi_hash = field
.get_required::<BlockHash>("targetPoi")
.expect("Valid targetPoi required");
let start_block = field
.get_required::<BlockNumber>("startBlock")
.expect("Valid startBlock required");
let end_block = field
.get_required::<BlockNumber>("endBlock")
.expect("Valid endBlock required");

let indexer = Some(
field
.get_required::<Address>("indexer")
.expect("Valid indexer required"),
);

if end_block <= start_block {
return Ok(r::Value::Null);
}

let target_bytes: [u8; 32] = match target_poi_hash.as_slice().try_into() {
Ok(bytes) => bytes,
Err(_) => {
error!(
self.logger,
"Invalid targetPoi: expected 32 bytes";
"got_bytes" => target_poi_hash.as_slice().len()
);
return Ok(r::Value::Null);
}
};

// Resolve the network for this deployment
let network = match self.store.network_for_deployment(&deployment_id).await {
Ok(n) => n,
Err(e) => {
error!(
self.logger,
"Failed to resolve network for deployment";
"subgraph" => &deployment_id,
"error" => format!("{:?}", e)
);
return Ok(r::Value::Null);
}
};

// Fetch the full digest history for the block range
let history = match self
.store
.get_poi_digest_history(&deployment_id, start_block..end_block)
.await
{
Ok(Some(h)) => h,
Ok(None) => return Ok(r::Value::Null),
Err(e) => {
error!(
self.logger,
"Failed to fetch POI digest history";
"subgraph" => &deployment_id,
"error" => format!("{:?}", e)
);
return Ok(r::Value::Null);
}
};

let poi_version = history.poi_version;

// Build a lookup structure: for each causality region id, a sorted
// vec of (start_block, end_block, digest) for binary search.
let mut region_entries: HashMap<Id, Vec<(BlockNumber, BlockNumber, Vec<u8>)>> =
HashMap::new();
for entry in history.entries {
region_entries.entry(entry.id).or_default().push((
entry.start_block,
entry.end_block,
entry.digest,
));
}
for entries in region_entries.values_mut() {
entries.sort_by_key(|(start, _, _)| *start);
}

// Share across rayon threads
let region_entries = Arc::new(region_entries);

let chain_store = match self.store.block_store().chain_store(&network).await {
Some(cs) => cs,
None => {
error!(
self.logger,
"Chain store not found for network";
"network" => &network
);
return Ok(r::Value::Null);
}
};

// Search backwards from end_block (the match is likely near the top).
// Pipeline: fetch the next chunk while computing POIs for the current one.
let mut chunk_end = end_block;
let chunk_start = std::cmp::max(chunk_end - CHUNK_SIZE, start_block);

// Fetch first chunk
let block_numbers: Vec<BlockNumber> = (chunk_start..chunk_end).collect();
let mut current_ptrs = match chain_store
.cheap_clone()
.block_ptrs_by_numbers(block_numbers)
.await
{
Ok(ptrs) => ptrs,
Err(e) => {
error!(
self.logger,
"Failed to fetch block hashes";
"range" => format!("{}..{}", chunk_start, chunk_end),
"error" => format!("{:?}", e)
);
return Ok(r::Value::Null);
}
};
chunk_end = chunk_start;

loop {
// Start prefetching the next chunk while we process the current one
let next_chunk_end = chunk_end;
let next_chunk_start = std::cmp::max(next_chunk_end - CHUNK_SIZE, start_block);
let prefetch = if next_chunk_start < next_chunk_end {
let cs = chain_store.cheap_clone();
let numbers: Vec<BlockNumber> = (next_chunk_start..next_chunk_end).collect();
Some(tokio::spawn(async move {
cs.block_ptrs_by_numbers(numbers).await
}))
} else {
None
};

// Collect blocks with unambiguous hashes for parallel search
let blocks_to_check: Vec<(BlockNumber, BlockHash)> = current_ptrs
.iter()
.filter_map(|(num, ptrs)| {
if ptrs.len() == 1 {
Some((*num, ptrs[0].hash.clone()))
} else {
None
}
})
.collect();

// Parallel POI computation across all cores via rayon
let re = region_entries.clone();
let did = deployment_id.clone();
let result = graph::spawn_blocking_allow_panic(move || {
use rayon::prelude::*;
blocks_to_check
.par_iter()
.find_map_any(|(block_num, block_hash)| {
let block_ptr = BlockPtr::new(block_hash.clone(), *block_num);
let mut finisher =
ProofOfIndexingFinisher::new(&block_ptr, &did, &indexer, poi_version);

for (region_id, entries) in re.as_ref() {
let idx = entries.partition_point(|(start, _, _)| *start <= *block_num);
if idx == 0 {
continue;
}
let (start, end, ref digest) = entries[idx - 1];
if *block_num >= start && *block_num < end {
finisher.add_causality_region(region_id, digest);
}
}

let computed = finisher.finish();
if computed == target_bytes {
Some((*block_num, block_hash.clone(), computed))
} else {
None
}
})
})
.await
.map_err(|e| QueryExecutionError::Panic(e.to_string()))?;

if let Some((block_num, block_hash, computed_poi)) = result {
// Found it - abort any in-flight prefetch
if let Some(handle) = prefetch {
handle.abort();
}
return Ok(object! {
__typename: "PoiSearchResult",
deployment: deployment_id.to_string(),
block: object! {
hash: block_hash.hash_hex(),
number: block_num,
},
proofOfIndexing: format!("0x{}", hex::encode(computed_poi)),
});
}

// Move to the next chunk
match prefetch {
Some(handle) => {
current_ptrs = handle
.await
.map_err(|e| QueryExecutionError::Panic(e.to_string()))?
.map_err(|e| {
error!(
self.logger,
"Failed to fetch block hashes";
"range" => format!("{}..{}", next_chunk_start, next_chunk_end),
"error" => format!("{:?}", e)
);
QueryExecutionError::StoreError(e.into())
})?;
chunk_end = next_chunk_start;
}
None => break,
}
}

Ok(r::Value::Null)
}

async fn resolve_indexing_status_for_version(
&self,
field: &a::Field,
@@ -791,7 +1026,7 @@ where
field.name.as_str(),
scalar_type.name.as_str(),
) {
("Query", "proofOfIndexing", "Bytes") => self.resolve_proof_of_indexing(field),
("Query", "proofOfIndexing", "Bytes") => self.resolve_proof_of_indexing(field).await,
("Query", "blockData", "JSONObject") => self.resolve_block_data(field).await,
("Query", "blockHashFromNumber", "Bytes") => {
self.resolve_block_hash_from_number(field).await
@@ -855,6 +1090,7 @@ where
// The top-level `subgraphVersions` field
(None, "apiVersions") => self.resolve_api_versions(field),
(None, "version") => self.version(),
(None, "blockForPoi") => self.resolve_block_for_poi(field).await,

// Resolve fields of `Object` values (e.g. the `latestBlock` field of `EthereumBlock`)
(value, _) => Ok(value.unwrap_or(r::Value::Null)),
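The chunked search above interleaves I/O and CPU work: while rayon scans the current chunk on the blocking pool, the next chunk of block pointers is already being fetched. A distilled, self-contained sketch of that pipeline, with `fetch_chunk` and `find_match` as stand-ins for `block_ptrs_by_numbers` and the rayon POI search:

```rust
use std::ops::Range;

// Stand-in for chain_store.block_ptrs_by_numbers(...).
async fn fetch_chunk(range: Range<i32>) -> Vec<i32> {
    range.collect()
}

// Stand-in for the rayon find_map_any POI comparison.
fn find_match(chunk: &[i32]) -> Option<i32> {
    chunk.iter().copied().find(|n| *n == 42)
}

async fn pipelined_search(start: i32, end: i32, chunk_size: i32) -> Option<i32> {
    let mut hi = end;
    let mut lo = (hi - chunk_size).max(start);
    // Only the first chunk is fetched eagerly; every later fetch overlaps
    // with the search of the chunk before it.
    let mut current = fetch_chunk(lo..hi).await;
    hi = lo;
    loop {
        lo = (hi - chunk_size).max(start);
        // Kick off the next fetch before searching the current chunk.
        let prefetch = (lo < hi).then(|| tokio::spawn(fetch_chunk(lo..hi)));
        if let Some(found) = find_match(&current) {
            if let Some(handle) = prefetch {
                handle.abort(); // match found; drop the in-flight fetch
            }
            return Some(found);
        }
        match prefetch {
            Some(handle) => {
                current = handle.await.ok()?;
                hi = lo;
            }
            None => return None, // range exhausted without a match
        }
    }
}

#[tokio::main]
async fn main() {
    assert_eq!(pipelined_search(0, 1_000_000, 10_000).await, Some(42));
}
```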
18 changes: 18 additions & 0 deletions server/index-node/src/schema.graphql
@@ -46,6 +46,18 @@ type Query {
blockHash: Bytes!
): [CachedEthereumCall!]
apiVersions(subgraphId: String!): [ApiVersion!]!
"""
Find the block number that produced a given proof of indexing.
Used for dispute investigation to verify which block an indexer
actually synced to when they submitted a POI.
"""
blockForPoi(
subgraph: String!
targetPoi: Bytes!
startBlock: Int!
endBlock: Int!
indexer: Bytes!
): PoiSearchResult
}

type Version {
@@ -203,6 +215,12 @@ type ProofOfIndexingResult {
proofOfIndexing: Bytes
}

type PoiSearchResult {
block: Block!
deployment: String!
proofOfIndexing: Bytes!
}

type ApiVersion {
"""
Version number in SemVer format
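A query exercising the new field might look like the following; the deployment hash, target POI, and indexer address are placeholders:

```graphql
{
  blockForPoi(
    subgraph: "QmDeploymentHashGoesHere"
    targetPoi: "0x<32 bytes of POI hex>"
    startBlock: 0
    endBlock: 15000000
    indexer: "0x<20-byte indexer address>"
  ) {
    deployment
    block {
      hash
      number
    }
    proofOfIndexing
  }
}
```

Since the resolver scans backwards from `endBlock`, a POI produced near the top of the range is found quickly; the worst case walks the whole range in 1,000,000-block chunks.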