Skip to content

Commit ca67edc

Browse files
authored
[Proto]: Serialization support for AsyncFuncExec (#19118)
## Which issue does this PR close? Closes #19112 ## Rationale for this change Use async functions with Ballista ## What changes are included in this PR? - New `AsyncFuncExecNode` proto definition - to/from glue - A roundtrip test ## Are these changes tested? n/a ## Are there any user-facing changes? n/a
1 parent dc4e3ab commit ca67edc

File tree

8 files changed

+314
-4
lines changed

8 files changed

+314
-4
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/physical-plan/src/async_func.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,14 @@ impl AsyncFuncExec {
100100
input.boundedness(),
101101
))
102102
}
103+
104+
pub fn async_exprs(&self) -> &[Arc<AsyncFuncExpr>] {
105+
&self.async_exprs
106+
}
107+
108+
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
109+
&self.input
110+
}
103111
}
104112

105113
impl DisplayAs for AsyncFuncExec {

datafusion/proto/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ serde = { version = "1.0", optional = true }
7373
serde_json = { workspace = true, optional = true }
7474

7575
[dev-dependencies]
76+
async-trait = { workspace = true }
7677
datafusion = { workspace = true, default-features = false, features = [
7778
"sql",
7879
"datetime_expressions",

datafusion/proto/proto/datafusion.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -748,6 +748,7 @@ message PhysicalPlanNode {
748748
GenerateSeriesNode generate_series = 33;
749749
SortMergeJoinExecNode sort_merge_join = 34;
750750
MemoryScanExecNode memory_scan = 35;
751+
AsyncFuncExecNode async_func = 36;
751752
}
752753
}
753754

@@ -1393,3 +1394,9 @@ message SortMergeJoinExecNode {
13931394
repeated SortExprNode sort_options = 6;
13941395
datafusion_common.NullEquality null_equality = 7;
13951396
}
1397+
1398+
message AsyncFuncExecNode {
1399+
PhysicalPlanNode input = 1;
1400+
repeated PhysicalExprNode async_exprs = 2;
1401+
repeated string async_expr_names = 3;
1402+
}

datafusion/proto/src/generated/pbjson.rs

Lines changed: 141 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 12 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ use datafusion_physical_plan::unnest::{ListUnnest, UnnestExec};
100100
use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
101101
use datafusion_physical_plan::{ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr};
102102

103+
use datafusion_physical_expr::async_scalar_function::AsyncFuncExpr;
104+
use datafusion_physical_plan::async_func::AsyncFuncExec;
103105
use prost::bytes::BufMut;
104106
use prost::Message;
105107

@@ -251,6 +253,9 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
251253
PhysicalPlanType::SortMergeJoin(sort_join) => {
252254
self.try_into_sort_join(sort_join, ctx, extension_codec)
253255
}
256+
PhysicalPlanType::AsyncFunc(async_func) => {
257+
self.try_into_async_func_physical_plan(async_func, ctx, extension_codec)
258+
}
254259
}
255260
}
256261

@@ -462,6 +467,13 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
462467
}
463468
}
464469

470+
if let Some(exec) = plan.downcast_ref::<AsyncFuncExec>() {
471+
return protobuf::PhysicalPlanNode::try_from_async_func_exec(
472+
exec,
473+
extension_codec,
474+
);
475+
}
476+
465477
let mut buf: Vec<u8> = vec![];
466478
match extension_codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
467479
Ok(_) => {
@@ -1972,6 +1984,44 @@ impl protobuf::PhysicalPlanNode {
19721984
Ok(Arc::new(CooperativeExec::new(input)))
19731985
}
19741986

1987+
fn try_into_async_func_physical_plan(
1988+
&self,
1989+
async_func: &protobuf::AsyncFuncExecNode,
1990+
ctx: &TaskContext,
1991+
extension_codec: &dyn PhysicalExtensionCodec,
1992+
) -> Result<Arc<dyn ExecutionPlan>> {
1993+
let input: Arc<dyn ExecutionPlan> =
1994+
into_physical_plan(&async_func.input, ctx, extension_codec)?;
1995+
1996+
if async_func.async_exprs.len() != async_func.async_expr_names.len() {
1997+
return internal_err!(
1998+
"AsyncFuncExecNode async_exprs length does not match async_expr_names"
1999+
);
2000+
}
2001+
2002+
let async_exprs = async_func
2003+
.async_exprs
2004+
.iter()
2005+
.zip(async_func.async_expr_names.iter())
2006+
.map(|(expr, name)| {
2007+
let physical_expr = parse_physical_expr(
2008+
expr,
2009+
ctx,
2010+
input.schema().as_ref(),
2011+
extension_codec,
2012+
)?;
2013+
2014+
Ok(Arc::new(AsyncFuncExpr::try_new(
2015+
name.clone(),
2016+
physical_expr,
2017+
input.schema().as_ref(),
2018+
)?))
2019+
})
2020+
.collect::<Result<Vec<_>>>()?;
2021+
2022+
Ok(Arc::new(AsyncFuncExec::try_new(async_exprs, input)?))
2023+
}
2024+
19752025
fn try_from_explain_exec(
19762026
exec: &ExplainExec,
19772027
_extension_codec: &dyn PhysicalExtensionCodec,
@@ -3222,6 +3272,34 @@ impl protobuf::PhysicalPlanNode {
32223272

32233273
Ok(None)
32243274
}
3275+
3276+
fn try_from_async_func_exec(
3277+
exec: &AsyncFuncExec,
3278+
extension_codec: &dyn PhysicalExtensionCodec,
3279+
) -> Result<Self> {
3280+
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
3281+
Arc::clone(exec.input()),
3282+
extension_codec,
3283+
)?;
3284+
3285+
let mut async_exprs = vec![];
3286+
let mut async_expr_names = vec![];
3287+
3288+
for async_expr in exec.async_exprs() {
3289+
async_exprs.push(serialize_physical_expr(&async_expr.func, extension_codec)?);
3290+
async_expr_names.push(async_expr.name.clone())
3291+
}
3292+
3293+
Ok(protobuf::PhysicalPlanNode {
3294+
physical_plan_type: Some(PhysicalPlanType::AsyncFunc(Box::new(
3295+
protobuf::AsyncFuncExecNode {
3296+
input: Some(Box::new(input)),
3297+
async_exprs,
3298+
async_expr_names,
3299+
},
3300+
))),
3301+
})
3302+
}
32253303
}
32263304

32273305
pub trait AsExecutionPlan: Debug + Send + Sync + Clone {
@@ -3439,7 +3517,6 @@ impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec {
34393517
fn into_physical_plan(
34403518
node: &Option<Box<protobuf::PhysicalPlanNode>>,
34413519
ctx: &TaskContext,
3442-
34433520
extension_codec: &dyn PhysicalExtensionCodec,
34443521
) -> Result<Arc<dyn ExecutionPlan>> {
34453522
if let Some(field) = node {

0 commit comments

Comments
 (0)