From a4484bb0a0c02461f4795d6df495a730d324f1f9 Mon Sep 17 00:00:00 2001
From: noorall <863485501@qq.com>
Date: Fri, 27 Feb 2026 14:25:15 +0800
Subject: [PATCH] [FLINK-39150][runtime] Fix join operator crashes jobs when
using custom types or custom type serializers
---
.../flink-end-to-end-tests-table-api/pom.xml | 26 ++++
.../test/join/JoinWithCustomTypeExample.java | 76 ++++++++++
.../flink/table/test/join/TestClass.java | 22 +++
flink-end-to-end-tests/run-nightly-tests.sh | 2 +
.../test-scripts/test_table_api.sh | 37 +++++
.../table/planner/loader/PlannerModule.java | 3 +-
.../AdaptiveJoinOperatorGenerator.java | 79 ++---------
.../exec/batch/BatchExecAdaptiveJoin.java | 43 ++++--
.../AdaptiveJoinOperatorGeneratorTest.java | 20 +--
.../operators/join/adaptive/AdaptiveJoin.java | 4 +-
.../join/adaptive/AdaptiveJoinGenerator.java | 50 +++++++
.../adaptive/AdaptiveJoinOperatorFactory.java | 130 ++++++++++++------
12 files changed, 360 insertions(+), 132 deletions(-)
create mode 100644 flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/JoinWithCustomTypeExample.java
create mode 100644 flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/TestClass.java
create mode 100755 flink-end-to-end-tests/test-scripts/test_table_api.sh
create mode 100644 flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinGenerator.java
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/pom.xml
index bec0aba2aba2e..45346e8e3e572 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/pom.xml
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/pom.xml
@@ -104,4 +104,30 @@ under the License.
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+ JoinWithCustomTypeExample
+ package
+
+ shade
+
+
+ JoinWithCustomTypeExample
+
+
+ org.apache.flink.table.test.join.JoinWithCustomTypeExample
+
+
+
+
+
+
+
+
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/JoinWithCustomTypeExample.java b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/JoinWithCustomTypeExample.java
new file mode 100644
index 0000000000000..a598a72f91533
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/JoinWithCustomTypeExample.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.join;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Example application that tests a join involving a custom POJO type via the Table API. */
+public class JoinWithCustomTypeExample {
+
+ private static final TypeInformation INT = Types.INT;
+
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ runTest(env, tEnv);
+ }
+
+ private static void runTest(StreamExecutionEnvironment env, StreamTableEnvironment tEnv)
+ throws Exception {
+ System.out.println("Running join with custom type test...");
+
+ TestClass value = new TestClass();
+ TypeInformation valueTypeInfo =
+ new PojoTypeInfo<>(TestClass.class, new ArrayList<>());
+
+ Table table1 =
+ tEnv.fromDataStream(
+ env.fromData(Row.of(1)).returns(Types.ROW_NAMED(new String[] {"id"}, INT)));
+
+ Table table2 =
+ tEnv.fromDataStream(
+ env.fromData(Row.of(1, value))
+ .returns(
+ Types.ROW_NAMED(
+ new String[] {"id2", "value"},
+ INT,
+ valueTypeInfo)));
+
+ tEnv.toDataStream(table1.leftOuterJoin(table2, $("id").isEqual($("id2"))))
+ .sinkTo(new DiscardingSink<>());
+
+ env.execute("joinWithCustomType");
+ System.out.println("Job joinWithCustomType completed successfully!");
+ }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/TestClass.java b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/TestClass.java
new file mode 100644
index 0000000000000..dad5872c0fd24
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/TestClass.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.join;
+
+/** Custom class for testing join with custom type. */
+public class TestClass {}
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 92304ef616a0a..4ed08570951a2 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -223,6 +223,8 @@ function run_group_2 {
# run_test "PyFlink YARN application on Docker test" "$END_TO_END_DIR/test-scripts/test_pyflink_yarn.sh" "skip_check_exceptions"
# fi
+ run_test "Flink Table API end-to-end test" "$END_TO_END_DIR/test-scripts/test_table_api.sh"
+
################################################################################
# Sticky Scheduling
################################################################################
diff --git a/flink-end-to-end-tests/test-scripts/test_table_api.sh b/flink-end-to-end-tests/test-scripts/test_table_api.sh
new file mode 100755
index 0000000000000..450eaa17e301f
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_table_api.sh
@@ -0,0 +1,37 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# End-to-end test for a join with a custom type. It only verifies that the job can be submitted
+# and runs to completion successfully.
+#
+# Usage:
+# FLINK_DIR= flink-end-to-end-tests/test-scripts/test_table_api.sh
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-end-to-end-tests-table-api/target/JoinWithCustomTypeExample.jar
+
+$FLINK_DIR/bin/flink run -p 1 $TEST_PROGRAM_JAR
+EXIT_CODE=$?
+
+stop_cluster
+
+exit $EXIT_CODE
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
index 289a3f23fa60f..1a8375b5039ef 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
@@ -34,6 +34,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
+import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -135,7 +136,7 @@ private PlannerModule() {
}
}
- public ClassLoader getSubmoduleClassLoader() {
+ public URLClassLoader getSubmoduleClassLoader() {
return this.submoduleClassLoader;
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGenerator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGenerator.java
index 78981a5e8ce98..b69220399d452 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGenerator.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGenerator.java
@@ -20,30 +20,21 @@
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.table.planner.plan.utils.HashJoinOperatorUtil;
-import org.apache.flink.table.planner.plan.utils.OperatorType;
import org.apache.flink.table.planner.plan.utils.SorMergeJoinOperatorUtil;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
-import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinGenerator;
import org.apache.flink.table.types.logical.RowType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.flink.util.Preconditions.checkState;
-
/**
- * Implementation class for {@link AdaptiveJoin}. It can selectively generate broadcast hash join,
- * shuffle hash join or shuffle merge join operator based on actual conditions.
+ * Implementation class for {@link AdaptiveJoinGenerator}. It can selectively generate broadcast
+ * hash join, shuffle hash join or shuffle merge join operator based on actual conditions.
*/
-public class AdaptiveJoinOperatorGenerator implements AdaptiveJoin {
- private static final Logger LOG = LoggerFactory.getLogger(AdaptiveJoinOperatorGenerator.class);
+public class AdaptiveJoinOperatorGenerator implements AdaptiveJoinGenerator {
private final int[] leftKeys;
private final int[] rightKeys;
- private final FlinkJoinType joinType;
-
private final boolean[] filterNulls;
private final RowType leftType;
@@ -64,18 +55,9 @@ public class AdaptiveJoinOperatorGenerator implements AdaptiveJoin {
private final long managedMemory;
- private final OperatorType originalJoin;
-
- private boolean leftIsBuild;
-
- private boolean originalLeftIsBuild;
-
- private boolean isBroadcastJoin;
-
public AdaptiveJoinOperatorGenerator(
int[] leftKeys,
int[] rightKeys,
- FlinkJoinType joinType,
boolean[] filterNulls,
RowType leftType,
RowType rightType,
@@ -85,12 +67,9 @@ public AdaptiveJoinOperatorGenerator(
long leftRowCount,
long rightRowCount,
boolean tryDistinctBuildRow,
- long managedMemory,
- boolean leftIsBuild,
- OperatorType originalJoin) {
+ long managedMemory) {
this.leftKeys = leftKeys;
this.rightKeys = rightKeys;
- this.joinType = joinType;
this.filterNulls = filterNulls;
this.leftType = leftType;
this.rightType = rightType;
@@ -101,23 +80,17 @@ public AdaptiveJoinOperatorGenerator(
this.rightRowCount = rightRowCount;
this.tryDistinctBuildRow = tryDistinctBuildRow;
this.managedMemory = managedMemory;
- checkState(
- originalJoin == OperatorType.ShuffleHashJoin
- || originalJoin == OperatorType.SortMergeJoin,
- String.format(
- "Adaptive join "
- + "currently only supports adaptive optimization for ShuffleHashJoin and "
- + "SortMergeJoin, not including %s.",
- originalJoin.toString()));
- this.leftIsBuild = leftIsBuild;
- this.originalLeftIsBuild = leftIsBuild;
- this.originalJoin = originalJoin;
}
@Override
public StreamOperatorFactory> genOperatorFactory(
- ClassLoader classLoader, ReadableConfig config) {
- if (isBroadcastJoin || originalJoin == OperatorType.ShuffleHashJoin) {
+ ClassLoader classLoader,
+ ReadableConfig config,
+ FlinkJoinType joinType,
+ boolean originIsSortMergeJoin,
+ boolean isBroadcastJoin,
+ boolean leftIsBuild) {
+ if (isBroadcastJoin || !originIsSortMergeJoin) {
return HashJoinOperatorUtil.generateOperatorFactory(
leftKeys,
rightKeys,
@@ -150,32 +123,4 @@ public StreamOperatorFactory> genOperatorFactory(
classLoader);
}
}
-
- @Override
- public FlinkJoinType getJoinType() {
- return joinType;
- }
-
- @Override
- public void markAsBroadcastJoin(boolean canBroadcast, boolean leftIsBuild) {
- this.isBroadcastJoin = canBroadcast;
- this.leftIsBuild = leftIsBuild;
- }
-
- @Override
- public boolean shouldReorderInputs() {
- // Sort merge join requires the left side to be read first if the broadcast threshold is not
- // met.
- if (!isBroadcastJoin && originalJoin == OperatorType.SortMergeJoin) {
- return false;
- }
-
- if (leftIsBuild != originalLeftIsBuild) {
- LOG.info(
- "The build side of the adaptive join has been updated. Compile phase build side: {}, Runtime build side: {}.",
- originalLeftIsBuild ? "left" : "right",
- leftIsBuild ? "left" : "right");
- }
- return !leftIsBuild;
- }
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecAdaptiveJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecAdaptiveJoin.java
index bf1122bde8696..a103a8e349bb9 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecAdaptiveJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecAdaptiveJoin.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.batch;
import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.table.api.TableException;
@@ -36,7 +37,7 @@
import org.apache.flink.table.planner.plan.utils.JoinUtil;
import org.apache.flink.table.planner.plan.utils.OperatorType;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
-import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinOperatorFactory;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
@@ -45,6 +46,8 @@
import java.io.IOException;
import java.util.List;
+import static org.apache.flink.util.Preconditions.checkState;
+
/** {@link BatchExecNode} for adaptive join. */
public class BatchExecAdaptiveJoin extends ExecNodeBase
implements BatchExecNode, SingleTransformationTranslator {
@@ -87,6 +90,14 @@ public BatchExecAdaptiveJoin(
this.leftIsBuild = leftIsBuild;
this.tryDistinctBuildRow = tryDistinctBuildRow;
this.description = description;
+ checkState(
+ originalJoin == OperatorType.ShuffleHashJoin
+ || originalJoin == OperatorType.SortMergeJoin,
+ String.format(
+ "Adaptive join "
+ + "currently only supports adaptive optimization for ShuffleHashJoin and "
+ + "SortMergeJoin, not including %s.",
+ originalJoin.toString()));
this.originalJoin = originalJoin;
}
@@ -113,11 +124,10 @@ protected Transformation translateToPlanInternal(
leftType,
rightType);
- AdaptiveJoinOperatorGenerator adaptiveJoin =
+ AdaptiveJoinOperatorGenerator adaptiveJoinGenerator =
new AdaptiveJoinOperatorGenerator(
joinSpec.getLeftKeys(),
joinSpec.getRightKeys(),
- joinSpec.getJoinType(),
joinSpec.getFilterNulls(),
leftType,
rightType,
@@ -127,16 +137,19 @@ protected Transformation translateToPlanInternal(
estimatedLeftRowCount,
estimatedRightRowCount,
tryDistinctBuildRow,
- managedMemory,
- leftIsBuild,
- originalJoin);
+ managedMemory);
return ExecNodeUtil.createTwoInputTransformation(
leftInputTransform,
rightInputTransform,
createTransformationName(config),
createTransformationDescription(config),
- getAdaptiveJoinOperatorFactory(adaptiveJoin),
+ getAdaptiveJoinOperatorFactory(
+ adaptiveJoinGenerator,
+ config.get(CoreOptions.CHECK_LEAKED_CLASSLOADER),
+ joinSpec.getJoinType(),
+ originalJoin,
+ leftIsBuild),
InternalTypeInfo.of(getOutputType()),
// Given that the probe side might be decided at runtime, we choose the larger
// parallelism here.
@@ -146,10 +159,20 @@ protected Transformation translateToPlanInternal(
}
private StreamOperatorFactory getAdaptiveJoinOperatorFactory(
- AdaptiveJoin adaptiveJoin) {
+ AdaptiveJoinOperatorGenerator adaptiveJoinGenerator,
+ boolean checkClassLoaderLeak,
+ FlinkJoinType joinType,
+ OperatorType originalJoin,
+ boolean leftIsBuild) {
try {
- byte[] adaptiveJoinSerialized = InstantiationUtil.serializeObject(adaptiveJoin);
- return new AdaptiveJoinOperatorFactory<>(adaptiveJoinSerialized);
+ byte[] adaptiveJoinGeneratorSerialized =
+ InstantiationUtil.serializeObject(adaptiveJoinGenerator);
+ return new AdaptiveJoinOperatorFactory<>(
+ adaptiveJoinGeneratorSerialized,
+ joinType,
+ originalJoin == OperatorType.SortMergeJoin,
+ leftIsBuild,
+ checkClassLoaderLeak);
} catch (IOException e) {
throw new TableException("The adaptive join operator failed to serialize.", e);
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGeneratorTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGeneratorTest.java
index 3baa5030cbed1..d554465993b7c 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGeneratorTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGeneratorTest.java
@@ -28,7 +28,7 @@
import org.apache.flink.table.runtime.operators.join.HashJoinOperator;
import org.apache.flink.table.runtime.operators.join.Int2HashJoinOperatorTestBase;
import org.apache.flink.table.runtime.operators.join.SortMergeJoinOperator;
-import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinGenerator;
import org.apache.flink.table.runtime.util.UniformBinaryRowGenerator;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
@@ -268,10 +268,15 @@ public Object newOperator(
boolean buildLeft,
boolean isBroadcast,
OperatorType operatorType) {
- AdaptiveJoin adaptiveJoin = genAdaptiveJoin(flinkJoinType, operatorType);
- adaptiveJoin.markAsBroadcastJoin(isBroadcast, buildLeft);
+ AdaptiveJoinGenerator adaptiveJoinGenerator = genAdaptiveJoinGenerator();
- return adaptiveJoin.genOperatorFactory(getClass().getClassLoader(), new Configuration());
+ return adaptiveJoinGenerator.genOperatorFactory(
+ getClass().getClassLoader(),
+ new Configuration(),
+ flinkJoinType,
+ operatorType == SortMergeJoin,
+ isBroadcast,
+ buildLeft);
}
public void assertOperatorType(Object operator, OperatorType expectedOperatorType) {
@@ -301,7 +306,7 @@ public void assertOperatorType(Object operator, OperatorType expectedOperatorTyp
}
}
- public AdaptiveJoin genAdaptiveJoin(FlinkJoinType flinkJoinType, OperatorType operatorType) {
+ public AdaptiveJoinGenerator genAdaptiveJoinGenerator() {
GeneratedJoinCondition condFuncCode =
new GeneratedJoinCondition(
Int2HashJoinOperatorTestBase.MyJoinCondition.class.getCanonicalName(),
@@ -316,7 +321,6 @@ public JoinCondition newInstance(ClassLoader classLoader) {
return new AdaptiveJoinOperatorGenerator(
new int[] {0},
new int[] {0},
- flinkJoinType,
new boolean[] {true},
RowType.of(new IntType(), new IntType()),
RowType.of(new IntType(), new IntType()),
@@ -326,8 +330,6 @@ public JoinCondition newInstance(ClassLoader classLoader) {
20,
10000,
false,
- TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY.defaultValue().getBytes(),
- true,
- operatorType);
+ TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY.defaultValue().getBytes());
}
}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoin.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoin.java
index 0d289ae9639b3..9ca9759038933 100755
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoin.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoin.java
@@ -32,11 +32,11 @@ public interface AdaptiveJoin extends Serializable {
* Generates a StreamOperatorFactory for this join operator using the provided ClassLoader and
* config.
*
- * @param classLoader the ClassLoader to be used for loading classes.
+ * @param userClassLoader the user ClassLoader to be used for loading classes.
* @param config the configuration to be applied for creating the operator factory.
* @return a StreamOperatorFactory instance.
*/
- StreamOperatorFactory> genOperatorFactory(ClassLoader classLoader, ReadableConfig config);
+ StreamOperatorFactory> genOperatorFactory(ClassLoader userClassLoader, ReadableConfig config);
/**
* Get the join type of the join operator.
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinGenerator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinGenerator.java
new file mode 100644
index 0000000000000..02496ebb7b24c
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinGenerator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.adaptive;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+
+import java.io.Serializable;
+
+/** Interface for implementing an adaptive join operator generator. */
+@Internal
+public interface AdaptiveJoinGenerator extends Serializable {
+ /**
+ * Generates a StreamOperatorFactory for this join operator using the provided ClassLoader and
+ * parameters.
+ *
+ * @param classLoader the ClassLoader to be used for loading classes.
+ * @param config the configuration to be applied for creating the operator factory.
+ * @param joinType the join type.
+ * @param originIsSortMergeJoin whether the original join operator is a SortMergeJoin.
+ * @param isBroadcastJoin whether the join operator should be generated as a broadcast hash join.
+ * @param leftIsBuild whether the left input side is the build side.
+ * @return a StreamOperatorFactory instance.
+ */
+ StreamOperatorFactory> genOperatorFactory(
+ ClassLoader classLoader,
+ ReadableConfig config,
+ FlinkJoinType joinType,
+ boolean originIsSortMergeJoin,
+ boolean isBroadcastJoin,
+ boolean leftIsBuild);
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinOperatorFactory.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinOperatorFactory.java
index 3efac4e3b1b7e..7246f3718c07f 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinOperatorFactory.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinOperatorFactory.java
@@ -25,12 +25,17 @@
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.table.planner.loader.PlannerModule;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.InstantiationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
import java.io.IOException;
+import static org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -46,49 +51,112 @@
@Internal
public class AdaptiveJoinOperatorFactory extends AbstractStreamOperatorFactory
implements AdaptiveJoin {
- private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(AdaptiveJoinOperatorFactory.class);
- private final byte[] adaptiveJoinSerialized;
+ private static final long serialVersionUID = 1L;
- @Nullable private transient AdaptiveJoin adaptiveJoin;
+ private final byte[] adaptiveJoinGeneratorSerialized;
@Nullable private StreamOperatorFactory finalFactory;
- public AdaptiveJoinOperatorFactory(byte[] adaptiveJoinSerialized) {
- this.adaptiveJoinSerialized = checkNotNull(adaptiveJoinSerialized);
+ private final boolean checkClassLoaderLeak;
+
+ private final FlinkJoinType joinType;
+
+ private final boolean originIsSortMergeJoin;
+
+ private final boolean originalLeftIsBuild;
+
+ private boolean leftIsBuild;
+
+ private boolean isBroadcastJoin;
+
+ public AdaptiveJoinOperatorFactory(
+ byte[] adaptiveJoinGeneratorSerialized,
+ FlinkJoinType joinType,
+ boolean originIsSortMergeJoin,
+ boolean leftIsBuild,
+ boolean checkClassLoaderLeak) {
+ this.adaptiveJoinGeneratorSerialized = checkNotNull(adaptiveJoinGeneratorSerialized);
+ this.joinType = joinType;
+ this.originIsSortMergeJoin = originIsSortMergeJoin;
+ this.leftIsBuild = leftIsBuild;
+ this.originalLeftIsBuild = leftIsBuild;
+ this.checkClassLoaderLeak = checkClassLoaderLeak;
}
@Override
public StreamOperatorFactory> genOperatorFactory(
- ClassLoader classLoader, ReadableConfig config) {
- checkAndLazyInitialize();
+ ClassLoader userClassLoader, ReadableConfig config) {
+ // In some IT/E2E tests the PlannerModule may be unavailable; in that case fall back to
+ // the user class loader instead of failing, to avoid breaking these tests.
+ PlannerModule plannerModule = null;
+ try {
+ plannerModule = PlannerModule.getInstance();
+ } catch (Throwable throwable) {
+ LOG.warn(
+ "Failed to get PlannerModule instance, may cause adaptive join deserialization failure.",
+ throwable);
+ }
+
+ ClassLoader classLoader =
+ plannerModule == null
+ ? userClassLoader
+ : FlinkUserCodeClassLoaders.parentFirst(
+ plannerModule.getSubmoduleClassLoader().getURLs(),
+ userClassLoader,
+ NOOP_EXCEPTION_HANDLER,
+ checkClassLoaderLeak);
+
+ AdaptiveJoinGenerator adaptiveJoinGenerator;
+ try {
+ adaptiveJoinGenerator =
+ InstantiationUtil.deserializeObject(
+ adaptiveJoinGeneratorSerialized, classLoader);
+ } catch (ClassNotFoundException | IOException e) {
+ throw new RuntimeException(
+ "Failed to deserialize AdaptiveJoinGenerator instance. "
+ + "Please check whether the flink-table-planner-loader.jar is in the classpath.",
+ e);
+ }
this.finalFactory =
- (StreamOperatorFactory) adaptiveJoin.genOperatorFactory(classLoader, config);
+ (StreamOperatorFactory)
+ adaptiveJoinGenerator.genOperatorFactory(
+ classLoader,
+ config,
+ joinType,
+ originIsSortMergeJoin,
+ isBroadcastJoin,
+ leftIsBuild);
return this.finalFactory;
}
@Override
public FlinkJoinType getJoinType() {
- checkAndLazyInitialize();
- return adaptiveJoin.getJoinType();
+ return joinType;
}
@Override
- public void markAsBroadcastJoin(boolean canBeBroadcast, boolean leftIsBuild) {
- checkAndLazyInitialize();
- adaptiveJoin.markAsBroadcastJoin(canBeBroadcast, leftIsBuild);
+ public void markAsBroadcastJoin(boolean canBroadcast, boolean leftIsBuild) {
+ this.isBroadcastJoin = canBroadcast;
+ this.leftIsBuild = leftIsBuild;
}
@Override
public boolean shouldReorderInputs() {
- checkAndLazyInitialize();
- return adaptiveJoin.shouldReorderInputs();
- }
+ // Sort merge join requires the left side to be read first if the broadcast threshold is not
+ // met.
+ if (!isBroadcastJoin && originIsSortMergeJoin) {
+ return false;
+ }
- private void checkAndLazyInitialize() {
- if (this.adaptiveJoin == null) {
- lazyInitialize();
+ if (leftIsBuild != originalLeftIsBuild) {
+ LOG.info(
+ "The build side of the adaptive join has been updated. Compile phase build side: {}, Runtime build side: {}.",
+ originalLeftIsBuild ? "left" : "right",
+ leftIsBuild ? "left" : "right");
}
+ return !leftIsBuild;
}
@Override
@@ -113,28 +181,4 @@ public Class extends StreamOperator> getStreamOperatorClass(ClassLoader classL
"The method should not be invoked in the "
+ "adaptive join operator for batch jobs.");
}
-
- private void lazyInitialize() {
- if (!tryInitializeAdaptiveJoin(Thread.currentThread().getContextClassLoader())) {
- boolean isSuccess =
- tryInitializeAdaptiveJoin(
- PlannerModule.getInstance().getSubmoduleClassLoader());
- if (!isSuccess) {
- throw new RuntimeException(
- "Failed to deserialize AdaptiveJoin instance. "
- + "Please check whether the flink-table-planner-loader.jar is in the classpath.");
- }
- }
- }
-
- private boolean tryInitializeAdaptiveJoin(ClassLoader classLoader) {
- try {
- this.adaptiveJoin =
- InstantiationUtil.deserializeObject(adaptiveJoinSerialized, classLoader);
- } catch (ClassNotFoundException | IOException e) {
- return false;
- }
-
- return true;
- }
}