Skip to content

Commit 03cb0d3

Browse files
committed
Fix mv dependencies involving unrelated files
1 parent ec7e88a commit 03cb0d3

File tree

1 file changed

+79
-5
lines changed

1 file changed

+79
-5
lines changed

src/materialized/dependencies.rs

Lines changed: 79 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,13 @@ fn pushdown_projection_inexact(plan: LogicalPlan, indices: &HashSet<usize>) -> R
639639
_ => unreachable!(),
640640
};
641641

642+
if new_projection.is_empty() {
643+
return Ok(LogicalPlan::EmptyRelation(EmptyRelation {
644+
produce_one_row: true,
645+
schema: Arc::new(DFSchema::empty()),
646+
}));
647+
}
648+
642649
TableScan::try_new(
643650
scan.table_name,
644651
scan.source,
@@ -1798,11 +1805,11 @@ mod test {
17981805
query_to_analyze: "SELECT column1 AS output FROM t3",
17991806
projection: &[],
18001807
expected_plan: vec![
1801-
"+--------------+-----------------------------+",
1802-
"| plan_type | plan |",
1803-
"+--------------+-----------------------------+",
1804-
"| logical_plan | TableScan: t3 projection=[] |",
1805-
"+--------------+-----------------------------+",
1808+
"+--------------+-----------------------+",
1809+
"| plan_type | plan |",
1810+
"+--------------+-----------------------+",
1811+
"| logical_plan | EmptyRelation: rows=1 |",
1812+
"+--------------+-----------------------+",
18061813
],
18071814
expected_output: vec![
18081815
"++",
@@ -2051,6 +2058,73 @@ mod test {
20512058
Ok(())
20522059
}
20532060

2061+
/// Regression test: a table that is cross-joined but contributes no selected
/// columns (t3) must not show up in the dependencies of the materialized view.
#[tokio::test]
2062+
async fn test_cross_join_unrelated_files() -> Result<()> {
2063+
let context = setup().await?;
2064+
2065+
// Scenario: cross join where only columns from the left table (t1) are selected.
2066+
// The cross join with t3 affects cardinality, but no t3 columns are selected.
2067+
// Expected: only files from t1 appear in the dependencies, none from t3.
2068+
// Regression: t3 files used to be incorrectly included in the dependencies.
2069+
let query = "SELECT t1.column1, t1.column2 FROM t1 CROSS JOIN t3";
2070+
2071+
let plan = context.sql(query).await?.into_optimized_plan()?;
2072+
2073+
println!("Original plan:\n{}", plan.display_indent());
2074+
2075+
// The partition column is column1, which comes only from t1.
2076+
let partition_col_indices: HashSet<usize> = [0].into_iter().collect(); // column1 is at index 0
2077+
2078+
// Run the pushdown directly so the rewritten plan is printed for debugging;
// the assertion below goes through `mv_dependencies` instead.
let analyzed = pushdown_projection_inexact(plan.clone(), &partition_col_indices)?;
2079+
println!("After pushdown:\n{}", analyzed.display_indent());
2080+
2081+
// Register a mock materialized view defined by the cross-join plan.
2082+
context.register_table(
2083+
"mv_cross_join",
2084+
Arc::new(MockMaterializedView {
2085+
table_path: ListingTableUrl::parse("s3://mv_cross_join/").unwrap(),
2086+
partition_columns: vec!["column1".to_string()],
2087+
static_partition_columns: None,
2088+
query: plan,
2089+
file_ext: ".parquet",
2090+
}),
2091+
)?;
2092+
2093+
// Add file metadata for the MV: one file per column1 partition (2021, 2022, 2023).
2094+
context.sql(
2095+
"INSERT INTO file_metadata VALUES
2096+
('datafusion', 'test', 'mv_cross_join', 's3://mv_cross_join/column1=2021/data.01.parquet', '2023-07-12T16:00:00Z', 0),
2097+
('datafusion', 'test', 'mv_cross_join', 's3://mv_cross_join/column1=2022/data.01.parquet', '2023-07-12T16:00:00Z', 0),
2098+
('datafusion', 'test', 'mv_cross_join', 's3://mv_cross_join/column1=2023/data.01.parquet', '2023-07-12T16:00:00Z', 0)"
2099+
).await?.collect().await?;
2100+
2101+
// Query the dependencies of the materialized view.
2102+
let df = context
2103+
.sql("SELECT * FROM mv_dependencies('mv_cross_join', 'v2')")
2104+
.await?;
2105+
let batches = df.collect().await?;
2106+
2107+
// Print the actual dependencies for debugging.
2108+
println!("Actual dependencies:");
2109+
println!("{}", pretty_format_batches(&batches)?);
2110+
2111+
// Expected: only t1 files are listed as dependencies, NOT t3 files.
2112+
// Guards against the regression where t3 files were incorrectly included.
2113+
let expected = [
2114+
"+----------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
2115+
"| target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified |",
2116+
"+----------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
2117+
"| s3://mv_cross_join/column1=2021/ | datafusion | test | t1 | s3://t1/column1=2021/data.01.parquet | 2023-07-11T16:29:26 |",
2118+
"| s3://mv_cross_join/column1=2022/ | datafusion | test | t1 | s3://t1/column1=2022/data.01.parquet | 2023-07-11T16:45:22 |",
2119+
"| s3://mv_cross_join/column1=2023/ | datafusion | test | t1 | s3://t1/column1=2023/data.01.parquet | 2023-07-11T16:45:44 |",
2120+
"+----------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
2121+
];
2122+
2123+
assert_batches_sorted_eq!(expected, &batches);
2124+
2125+
Ok(())
2126+
}
2127+
20542128
#[test]
20552129
fn test_pushdown_unnest_guard_partition_date_only() -> Result<()> {
20562130
// This test simulates a simplified MV scenario:

0 commit comments

Comments
 (0)