diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs index 060e1e0..50f806c 100644 --- a/src/materialized/dependencies.rs +++ b/src/materialized/dependencies.rs @@ -639,6 +639,13 @@ fn pushdown_projection_inexact(plan: LogicalPlan, indices: &HashSet) -> R _ => unreachable!(), }; + if new_projection.is_empty() { + return Ok(LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: true, + schema: Arc::new(DFSchema::empty()), + })); + } + TableScan::try_new( scan.table_name, scan.source, @@ -1798,11 +1805,11 @@ mod test { query_to_analyze: "SELECT column1 AS output FROM t3", projection: &[], expected_plan: vec![ - "+--------------+-----------------------------+", - "| plan_type | plan |", - "+--------------+-----------------------------+", - "| logical_plan | TableScan: t3 projection=[] |", - "+--------------+-----------------------------+", + "+--------------+-----------------------+", + "| plan_type | plan |", + "+--------------+-----------------------+", + "| logical_plan | EmptyRelation: rows=1 |", + "+--------------+-----------------------+", ], expected_output: vec![ "++", @@ -2051,6 +2058,73 @@ mod test { Ok(()) } + #[tokio::test] + async fn test_cross_join_unrelated_files() -> Result<()> { + let context = setup().await?; + + // Test case: Cross join where only columns from left table (t1) are selected + // The cross join with t3 affects cardinality but we don't select any t3 columns + // Expected: Only files from t1 should be in dependencies, not from t3 + // BUG: Currently t3 files are incorrectly included in dependencies + let query = "SELECT t1.column1, t1.column2 FROM t1 CROSS JOIN t3"; + + let plan = context.sql(query).await?.into_optimized_plan()?; + + println!("Original plan:\n{}", plan.display_indent()); + + // We're partitioning on column1 which only comes from t1 + let partition_col_indices: HashSet = [0].into_iter().collect(); // column1 is at index 0 + + let analyzed = pushdown_projection_inexact(plan.clone(), &partition_col_indices)?; + println!("After pushdown:\n{}", analyzed.display_indent()); + + // Register materialized view + context.register_table( + "mv_cross_join", + Arc::new(MockMaterializedView { + table_path: ListingTableUrl::parse("s3://mv_cross_join/").unwrap(), + partition_columns: vec!["column1".to_string()], + static_partition_columns: None, + query: plan, + file_ext: ".parquet", + }), + )?; + + // Add file metadata for the MV + context.sql( + "INSERT INTO file_metadata VALUES + ('datafusion', 'test', 'mv_cross_join', 's3://mv_cross_join/column1=2021/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'mv_cross_join', 's3://mv_cross_join/column1=2022/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'mv_cross_join', 's3://mv_cross_join/column1=2023/data.01.parquet', '2023-07-12T16:00:00Z', 0)" + ).await?.collect().await?; + + // Get dependencies + let df = context + .sql("SELECT * FROM mv_dependencies('mv_cross_join', 'v2')") + .await?; + let batches = df.collect().await?; + + // Print the actual dependencies for debugging + println!("Actual dependencies:"); + println!("{}", pretty_format_batches(&batches)?); + + // Expected: Only t1 files should be in dependencies, NOT t3 files + // This test currently FAILS because t3 files are incorrectly included + let expected = [ + "+----------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+", + "| target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified |", + "+----------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+", + "| s3://mv_cross_join/column1=2021/ | datafusion | test | t1 | s3://t1/column1=2021/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://mv_cross_join/column1=2022/ | datafusion | test | t1 | s3://t1/column1=2022/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://mv_cross_join/column1=2023/ | datafusion | test | t1 | s3://t1/column1=2023/data.01.parquet | 2023-07-11T16:45:44 |", + "+----------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+", + ]; + + assert_batches_sorted_eq!(expected, &batches); + + Ok(()) + } + #[test] fn test_pushdown_unnest_guard_partition_date_only() -> Result<()> { // This test simulates a simplified MV scenario: