Changes from all commits (23 commits)
ee1ccf8
Continue #3295, experimental DPP support.
mbutrovich Jan 30, 2026
70ad4f5
Remove unnecessary steps in convert(), hoist reflection calls out of …
mbutrovich Jan 30, 2026
e820616
scalastyle
mbutrovich Jan 30, 2026
b08abe0
Fix scenario with multiple DPP expressions (i.e., join on two partiti…
mbutrovich Jan 30, 2026
9dc84f9
Docs.
mbutrovich Jan 30, 2026
b3c7c79
Throw an exception on reflection error in setInSubqueryResult, strong…
mbutrovich Jan 30, 2026
a90f06d
Comments cleanup. Throw exception if column not found in subquery.
mbutrovich Jan 30, 2026
09066fb
Avoid capturing perPartitionByLocation in closure when:
mbutrovich Jan 30, 2026
4ea7c78
Remove IcebergFilePartition from proto and clean up native code now t…
mbutrovich Jan 30, 2026
9eb95d7
Use sab.index and sab.buildKeys with exprId matching (handles renamed…
mbutrovich Jan 31, 2026
0fd297e
Simplify matching logic for SubqueryAdaptiveBroadcastExec expressions…
mbutrovich Jan 31, 2026
8f7b29d
add shim for Spark 4.0 SAB API change (indices instead of index), add…
mbutrovich Jan 31, 2026
da27f5f
add Spark 3.4 shim, whoops
mbutrovich Jan 31, 2026
4af89b2
Merge branch 'main' into iceberg-split-serialization-dpp
mbutrovich Jan 31, 2026
97f3693
Merge branch 'main' into iceberg-split-serialization-dpp
mbutrovich Feb 1, 2026
43a56e5
Comment.
mbutrovich Feb 1, 2026
bac6d66
Refactor down to just one CometExecRDD. Let's see how CI goes.
mbutrovich Feb 1, 2026
e73cca0
Fix spotless.
mbutrovich Feb 1, 2026
95c4e6d
Fix broadcast with DPP?
mbutrovich Feb 1, 2026
67c8bdb
Minor refactor for variable names, comments.
mbutrovich Feb 1, 2026
aa048a7
Fix scalastyle.
mbutrovich Feb 1, 2026
02a52a3
cache parsed commonData.
mbutrovich Feb 1, 2026
9cc541a
Address PR feedback.
mbutrovich Feb 2, 2026
40 changes: 15 additions & 25 deletions native/core/src/execution/operators/iceberg_scan.rs
@@ -44,6 +44,7 @@ use crate::parquet::parquet_support::SparkParquetOptions;
use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
use datafusion::datasource::schema_adapter::{SchemaAdapterFactory, SchemaMapper};
use datafusion_comet_spark_expr::EvalMode;
use iceberg::scan::FileScanTask;

/// Iceberg table scan operator that uses iceberg-rust to read Iceberg tables.
///
@@ -58,8 +59,8 @@ pub struct IcebergScanExec {
plan_properties: PlanProperties,
/// Catalog-specific configuration for FileIO
catalog_properties: HashMap<String, String>,
/// Pre-planned file scan tasks, grouped by partition
file_task_groups: Vec<Vec<iceberg::scan::FileScanTask>>,
/// Pre-planned file scan tasks
tasks: Vec<FileScanTask>,
/// Metrics
metrics: ExecutionPlanMetricsSet,
}
@@ -69,11 +70,10 @@ impl IcebergScanExec {
metadata_location: String,
schema: SchemaRef,
catalog_properties: HashMap<String, String>,
file_task_groups: Vec<Vec<iceberg::scan::FileScanTask>>,
tasks: Vec<FileScanTask>,
) -> Result<Self, ExecutionError> {
let output_schema = schema;
let num_partitions = file_task_groups.len();
let plan_properties = Self::compute_properties(Arc::clone(&output_schema), num_partitions);
let plan_properties = Self::compute_properties(Arc::clone(&output_schema), 1);

let metrics = ExecutionPlanMetricsSet::new();

@@ -82,7 +82,7 @@
output_schema,
plan_properties,
catalog_properties,
file_task_groups,
tasks,
metrics,
})
}
@@ -127,19 +127,10 @@ impl ExecutionPlan for IcebergScanExec {

fn execute(
&self,
partition: usize,
_partition: usize,
context: Arc<TaskContext>,
) -> DFResult<SendableRecordBatchStream> {
if partition < self.file_task_groups.len() {
let tasks = &self.file_task_groups[partition];
self.execute_with_tasks(tasks.clone(), partition, context)
} else {
Err(DataFusionError::Execution(format!(
"IcebergScanExec: Partition index {} out of range (only {} task groups available)",
partition,
self.file_task_groups.len()
)))
}
self.execute_with_tasks(self.tasks.clone(), context)
}

fn metrics(&self) -> Option<MetricsSet> {
@@ -152,15 +143,14 @@ impl IcebergScanExec {
/// deletes via iceberg-rust's ArrowReader.
fn execute_with_tasks(
&self,
tasks: Vec<iceberg::scan::FileScanTask>,
partition: usize,
tasks: Vec<FileScanTask>,
context: Arc<TaskContext>,
) -> DFResult<SendableRecordBatchStream> {
let output_schema = Arc::clone(&self.output_schema);
let file_io = Self::load_file_io(&self.catalog_properties, &self.metadata_location)?;
let batch_size = context.session_config().batch_size();

let metrics = IcebergScanMetrics::new(&self.metrics, partition);
let metrics = IcebergScanMetrics::new(&self.metrics);
let num_tasks = tasks.len();
metrics.num_splits.add(num_tasks);

@@ -221,10 +211,10 @@ struct IcebergScanMetrics {
}

impl IcebergScanMetrics {
fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
fn new(metrics: &ExecutionPlanMetricsSet) -> Self {
Self {
baseline: BaselineMetrics::new(metrics, partition),
num_splits: MetricBuilder::new(metrics).counter("num_splits", partition),
baseline: BaselineMetrics::new(metrics, 0),
num_splits: MetricBuilder::new(metrics).counter("num_splits", 0),
}
}
}
@@ -311,11 +301,11 @@ where

impl DisplayAs for IcebergScanExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
let num_tasks: usize = self.file_task_groups.iter().map(|g| g.len()).sum();
write!(
f,
"IcebergScanExec: metadata_location={}, num_tasks={}",
self.metadata_location, num_tasks
self.metadata_location,
self.tasks.len()
)
}
}
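
The net effect of the iceberg_scan.rs change: file_task_groups: Vec<Vec<FileScanTask>> becomes a flat tasks: Vec<FileScanTask>, because per-partition task selection now happens on the Scala side before serialization, so each native plan only ever sees one Spark partition (which is also why metrics are now registered against partition 0). A minimal, dependency-free sketch of the data-shape change, illustrative only and not code from the PR:

// Illustrative stand-in; the real type is iceberg::scan::FileScanTask.
#[derive(Clone)]
pub struct FileScanTask {
    pub data_file_path: String,
}

// Before: the native plan carried every Spark partition's task groups and had to
// index into them at execute() time, with an out-of-range error path.
pub fn tasks_before(
    groups: &[Vec<FileScanTask>],
    partition: usize,
) -> Result<&[FileScanTask], String> {
    groups
        .get(partition)
        .map(|g| g.as_slice())
        .ok_or_else(|| format!("partition {} out of range ({} groups)", partition, groups.len()))
}

// After: Scala injects only the current partition's tasks, so the plan already
// holds exactly what it should scan and execute() can ignore the partition index.
pub fn tasks_after(tasks: &[FileScanTask]) -> &[FileScanTask] {
    tasks
}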
65 changes: 28 additions & 37 deletions native/core/src/execution/planner.rs
@@ -1132,33 +1132,28 @@ impl PhysicalPlanner {
))
}
OpStruct::IcebergScan(scan) => {
let required_schema: SchemaRef =
convert_spark_types_to_arrow_schema(scan.required_schema.as_slice());
// Extract common data and single partition's file tasks
// Per-partition injection happens in Scala before sending to native
let common = scan
.common
.as_ref()
.ok_or_else(|| GeneralError("IcebergScan missing common data".into()))?;

let catalog_properties: HashMap<String, String> = scan
let required_schema =
convert_spark_types_to_arrow_schema(common.required_schema.as_slice());
let catalog_properties: HashMap<String, String> = common
.catalog_properties
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();

let metadata_location = scan.metadata_location.clone();

debug_assert!(
!scan.file_partitions.is_empty(),
"IcebergScan must have at least one file partition. This indicates a bug in Scala serialization."
);

let tasks = parse_file_scan_tasks(
scan,
&scan.file_partitions[self.partition as usize].file_scan_tasks,
)?;
let file_task_groups = vec![tasks];
let metadata_location = common.metadata_location.clone();
let tasks = parse_file_scan_tasks_from_common(common, &scan.file_scan_tasks)?;

let iceberg_scan = IcebergScanExec::new(
metadata_location,
required_schema,
catalog_properties,
file_task_groups,
tasks,
)?;

Ok((
@@ -2743,15 +2738,14 @@ fn partition_data_to_struct(
/// Each task contains a residual predicate that is used for row-group level filtering
/// during Parquet scanning.
///
/// This function uses deduplication pools from the IcebergScan to avoid redundant parsing
/// of schemas, partition specs, partition types, name mappings, and other repeated data.
fn parse_file_scan_tasks(
proto_scan: &spark_operator::IcebergScan,
/// This function uses deduplication pools from the IcebergScanCommon to avoid redundant
/// parsing of schemas, partition specs, partition types, name mappings, and other repeated data.
fn parse_file_scan_tasks_from_common(
proto_common: &spark_operator::IcebergScanCommon,
proto_tasks: &[spark_operator::IcebergFileScanTask],
) -> Result<Vec<iceberg::scan::FileScanTask>, ExecutionError> {
// Build caches upfront: for 10K tasks with 1 schema, this parses the schema
// once instead of 10K times, eliminating redundant JSON deserialization
let schema_cache: Vec<Arc<iceberg::spec::Schema>> = proto_scan
// Parse each unique schema once, not once per task
let schema_cache: Vec<Arc<iceberg::spec::Schema>> = proto_common
.schema_pool
.iter()
.map(|json| {
@@ -2764,7 +2758,7 @@
})
.collect::<Result<Vec<_>, _>>()?;

let partition_spec_cache: Vec<Option<Arc<iceberg::spec::PartitionSpec>>> = proto_scan
let partition_spec_cache: Vec<Option<Arc<iceberg::spec::PartitionSpec>>> = proto_common
.partition_spec_pool
.iter()
.map(|json| {
@@ -2774,7 +2768,7 @@
})
.collect();

let name_mapping_cache: Vec<Option<Arc<iceberg::spec::NameMapping>>> = proto_scan
let name_mapping_cache: Vec<Option<Arc<iceberg::spec::NameMapping>>> = proto_common
.name_mapping_pool
.iter()
.map(|json| {
@@ -2784,7 +2778,7 @@
})
.collect();

let delete_files_cache: Vec<Vec<iceberg::scan::FileScanTaskDeleteFile>> = proto_scan
let delete_files_cache: Vec<Vec<iceberg::scan::FileScanTaskDeleteFile>> = proto_common
.delete_files_pool
.iter()
.map(|list| {
@@ -2796,7 +2790,7 @@
"EQUALITY_DELETES" => iceberg::spec::DataContentType::EqualityDeletes,
other => {
return Err(GeneralError(format!(
"Invalid delete content type '{}'. This indicates a bug in Scala serialization.",
"Invalid delete content type '{}'",
other
)))
}
@@ -2817,7 +2811,6 @@
})
.collect::<Result<Vec<_>, _>>()?;

// Partition data pool is in protobuf messages
let results: Result<Vec<_>, _> = proto_tasks
.iter()
.map(|proto_task| {
@@ -2851,7 +2844,7 @@
};

let bound_predicate = if let Some(idx) = proto_task.residual_idx {
proto_scan
proto_common
.residual_pool
.get(idx as usize)
.and_then(convert_spark_expr_to_predicate)
@@ -2871,24 +2864,22 @@
};

let partition = if let Some(partition_data_idx) = proto_task.partition_data_idx {
// Get partition data from protobuf pool
let partition_data_proto = proto_scan
let partition_data_proto = proto_common
.partition_data_pool
.get(partition_data_idx as usize)
.ok_or_else(|| {
ExecutionError::GeneralError(format!(
"Invalid partition_data_idx: {} (pool size: {})",
partition_data_idx,
proto_scan.partition_data_pool.len()
proto_common.partition_data_pool.len()
))
})?;

// Convert protobuf PartitionData to iceberg Struct
match partition_data_to_struct(partition_data_proto) {
Ok(s) => Some(s),
Err(e) => {
return Err(ExecutionError::GeneralError(format!(
"Failed to deserialize partition data from protobuf: {}",
"Failed to deserialize partition data: {}",
e
)))
}
@@ -2907,14 +2898,14 @@
.and_then(|idx| name_mapping_cache.get(idx as usize))
.and_then(|opt| opt.clone());

let project_field_ids = proto_scan
let project_field_ids = proto_common
.project_field_ids_pool
.get(proto_task.project_field_ids_idx as usize)
.ok_or_else(|| {
ExecutionError::GeneralError(format!(
"Invalid project_field_ids_idx: {} (pool size: {})",
proto_task.project_field_ids_idx,
proto_scan.project_field_ids_pool.len()
proto_common.project_field_ids_pool.len()
))
})?
.field_ids
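
The doc comment on parse_file_scan_tasks_from_common describes the core trick: every task stores small integer indices into IcebergScanCommon's pools, and each pool entry is parsed exactly once up front. A self-contained sketch of that pattern (Task and parse_schema are hypothetical stand-ins, not the PR's types):

use std::sync::Arc;

// Stand-ins: the pool holds JSON strings, each task holds an index into it.
pub struct Task {
    pub schema_idx: usize,
}

fn parse_schema(json: &str) -> Result<String, String> {
    // Real code would be serde_json::from_str::<iceberg::spec::Schema>(json).
    Ok(json.to_string())
}

pub fn resolve_schemas(schema_pool: &[String], tasks: &[Task]) -> Result<Vec<Arc<String>>, String> {
    // Parse each unique pool entry exactly once...
    let cache: Vec<Arc<String>> = schema_pool
        .iter()
        .map(|json| parse_schema(json).map(Arc::new))
        .collect::<Result<_, _>>()?;

    // ...then resolve per task by index: 10K tasks sharing one schema cost one
    // parse plus 10K cheap Arc clones, and a bad index is a hard error instead
    // of a silently wrong schema.
    tasks
        .iter()
        .map(|t| {
            cache.get(t.schema_idx).cloned().ok_or_else(|| {
                format!("invalid schema_idx {} (pool size {})", t.schema_idx, cache.len())
            })
        })
        .collect()
}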
1 change: 1 addition & 0 deletions native/proto/src/lib.rs
@@ -34,6 +34,7 @@ pub mod spark_partitioning {

// Include generated modules from .proto files.
#[allow(missing_docs)]
#[allow(clippy::large_enum_variant)]
Review comment (Member): we should probably have a follow up issue to revisit the choice to ignore this warning

pub mod spark_operator {
include!(concat!("generated", "/spark.spark_operator.rs"));
}
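
On the review note about #[allow(clippy::large_enum_variant)]: the usual fix is to box the oversized variant so the enum's size is driven by its small variants. A hedged sketch on hypothetical types (not the generated spark_operator code; for prost-generated oneofs the boxing would instead be configured in build.rs, e.g. via prost_build::Config::boxed, if the prost-build version in use supports it):

// Hypothetical payload type standing in for a large generated message.
#[allow(dead_code)]
pub struct IcebergScanPayload {
    pub metadata_location: String,
    pub schema_pool: Vec<String>,
    // ...many more fields, making this variant far larger than its siblings...
}

#[allow(dead_code)]
pub enum OpStruct {
    Projection(u32),
    // Boxing moves the large payload to the heap, which is exactly what
    // clippy::large_enum_variant asks for.
    IcebergScan(Box<IcebergScanPayload>),
}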
47 changes: 24 additions & 23 deletions native/proto/src/proto/operator.proto
@@ -156,28 +156,34 @@ message PartitionData {
repeated PartitionValue values = 1;
}

message IcebergScan {
// Schema to read
repeated SparkStructField required_schema = 1;

// Common data shared by all partitions in split mode (sent once, captured in closure)
message IcebergScanCommon {
// Catalog-specific configuration for FileIO (credentials, S3/GCS config, etc.)
map<string, string> catalog_properties = 2;

// Pre-planned file scan tasks grouped by Spark partition
repeated IcebergFilePartition file_partitions = 3;
map<string, string> catalog_properties = 1;

// Table metadata file path for FileIO initialization
string metadata_location = 4;
string metadata_location = 2;

// Schema to read
repeated SparkStructField required_schema = 3;

// Deduplication pools - shared data referenced by index from tasks
repeated string schema_pool = 5;
repeated string partition_type_pool = 6;
repeated string partition_spec_pool = 7;
repeated string name_mapping_pool = 8;
repeated ProjectFieldIdList project_field_ids_pool = 9;
repeated PartitionData partition_data_pool = 10;
repeated DeleteFileList delete_files_pool = 11;
repeated spark.spark_expression.Expr residual_pool = 12;
// Deduplication pools (must contain all entries for cross-partition deduplication)
repeated string schema_pool = 4;
repeated string partition_type_pool = 5;
repeated string partition_spec_pool = 6;
repeated string name_mapping_pool = 7;
repeated ProjectFieldIdList project_field_ids_pool = 8;
repeated PartitionData partition_data_pool = 9;
repeated DeleteFileList delete_files_pool = 10;
repeated spark.spark_expression.Expr residual_pool = 11;
}

message IcebergScan {
// Common data shared across partitions (pools, metadata, catalog props)
IcebergScanCommon common = 1;

// Single partition's file scan tasks
repeated IcebergFileScanTask file_scan_tasks = 2;
}

// Helper message for deduplicating field ID lists
@@ -190,11 +196,6 @@ message DeleteFileList {
repeated IcebergDeleteFile delete_files = 1;
}

// Groups FileScanTasks for a single Spark partition
message IcebergFilePartition {
repeated IcebergFileScanTask file_scan_tasks = 1;
}

// Iceberg FileScanTask containing data file, delete files, and residual filter
message IcebergFileScanTask {
// Data file path (e.g., s3://bucket/warehouse/db/table/data/00000-0-abc.parquet)
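
The intent of the IcebergScanCommon / IcebergScan split: the pools, metadata location, and catalog properties are built once and shared across partitions, while each Spark partition's plan carries only its own file_scan_tasks. A rough Rust-side sketch of how the prost-generated messages fit together (the crate path is an assumption, and in this PR the actual message construction happens on the Scala side):

// Assumed crate path for the generated protobuf types.
use datafusion_comet_proto::spark_operator::{IcebergFileScanTask, IcebergScan, IcebergScanCommon};

pub fn scan_for_partition(
    common: &IcebergScanCommon,                // pools + metadata, identical for every partition
    partition_tasks: Vec<IcebergFileScanTask>, // only this partition's tasks
) -> IcebergScan {
    IcebergScan {
        // Singular message fields come out of prost as Option<T>, hence Some(...).
        common: Some(common.clone()),
        file_scan_tasks: partition_tasks,
    }
}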
Original file line number Diff line number Diff line change
@@ -734,7 +734,7 @@ case class CometIcebergNativeScanMetadata(
table: Any,
metadataLocation: String,
nameMapping: Option[String],
tasks: java.util.List[_],
@transient tasks: java.util.List[_],
scanSchema: Any,
tableSchema: Any,
globalFieldIdMapping: Map[String, Int],