From a4484bb0a0c02461f4795d6df495a730d324f1f9 Mon Sep 17 00:00:00 2001 From: noorall <863485501@qq.com> Date: Fri, 27 Feb 2026 14:25:15 +0800 Subject: [PATCH] [FLINK-39150][runtime] Fix join operator crashes jobs when using custom types or custom type serializers --- .../flink-end-to-end-tests-table-api/pom.xml | 26 ++++ .../test/join/JoinWithCustomTypeExample.java | 76 ++++++++++ .../flink/table/test/join/TestClass.java | 22 +++ flink-end-to-end-tests/run-nightly-tests.sh | 2 + .../test-scripts/test_table_api.sh | 37 +++++ .../table/planner/loader/PlannerModule.java | 3 +- .../AdaptiveJoinOperatorGenerator.java | 79 ++--------- .../exec/batch/BatchExecAdaptiveJoin.java | 43 ++++-- .../AdaptiveJoinOperatorGeneratorTest.java | 20 +-- .../operators/join/adaptive/AdaptiveJoin.java | 4 +- .../join/adaptive/AdaptiveJoinGenerator.java | 50 +++++++ .../adaptive/AdaptiveJoinOperatorFactory.java | 130 ++++++++++++------ 12 files changed, 360 insertions(+), 132 deletions(-) create mode 100644 flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/JoinWithCustomTypeExample.java create mode 100644 flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/TestClass.java create mode 100755 flink-end-to-end-tests/test-scripts/test_table_api.sh create mode 100644 flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinGenerator.java diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/pom.xml index bec0aba2aba2e..45346e8e3e572 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/pom.xml +++ b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/pom.xml @@ -104,4 +104,30 @@ under the License. 
+ + + + + org.apache.maven.plugins + maven-shade-plugin + + + JoinWithCustomTypeExample + package + + shade + + + JoinWithCustomTypeExample + + + org.apache.flink.table.test.join.JoinWithCustomTypeExample + + + + + + + + diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/JoinWithCustomTypeExample.java b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/JoinWithCustomTypeExample.java new file mode 100644 index 0000000000000..a598a72f91533 --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/JoinWithCustomTypeExample.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.test.join; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; + +import java.util.ArrayList; + +import static org.apache.flink.table.api.Expressions.$; + +/** Example application that tests JoinWithCustomType with Table API. */ +public class JoinWithCustomTypeExample { + + private static final TypeInformation INT = Types.INT; + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + runTest(env, tEnv); + } + + private static void runTest(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) + throws Exception { + System.out.println("Running join with custom type test..."); + + TestClass value = new TestClass(); + TypeInformation valueTypeInfo = + new PojoTypeInfo<>(TestClass.class, new ArrayList<>()); + + Table table1 = + tEnv.fromDataStream( + env.fromData(Row.of(1)).returns(Types.ROW_NAMED(new String[] {"id"}, INT))); + + Table table2 = + tEnv.fromDataStream( + env.fromData(Row.of(1, value)) + .returns( + Types.ROW_NAMED( + new String[] {"id2", "value"}, + INT, + valueTypeInfo))); + + tEnv.toDataStream(table1.leftOuterJoin(table2, $("id").isEqual($("id2")))) + .sinkTo(new DiscardingSink<>()); + + env.execute("joinWithCustomType"); + System.out.println("Job joinWithCustomType completed successfully!"); + } +} diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/TestClass.java b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/TestClass.java new file mode 100644 index 0000000000000..dad5872c0fd24 --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/TestClass.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.test.join; + +/** Custom class for testing join with custom type. 
*/ +public class TestClass {} diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index 92304ef616a0a..4ed08570951a2 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -223,6 +223,8 @@ function run_group_2 { # run_test "PyFlink YARN application on Docker test" "$END_TO_END_DIR/test-scripts/test_pyflink_yarn.sh" "skip_check_exceptions" # fi + run_test "Flink Table API end-to-end test" "$END_TO_END_DIR/test-scripts/test_table_api.sh" + ################################################################################ # Sticky Scheduling ################################################################################ diff --git a/flink-end-to-end-tests/test-scripts/test_table_api.sh b/flink-end-to-end-tests/test-scripts/test_table_api.sh new file mode 100755 index 0000000000000..450eaa17e301f --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test_table_api.sh @@ -0,0 +1,37 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +# End to end test for join with custom type examples. It only verifies that the job can be submitted +# and run correctly. +# +# Usage: +# FLINK_DIR=<flink dir> flink-end-to-end-tests/test-scripts/test_table_api.sh + +source "$(dirname "$0")"/common.sh + +start_cluster + +TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-end-to-end-tests-table-api/target/JoinWithCustomTypeExample.jar + +$FLINK_DIR/bin/flink run -p 1 $TEST_PROGRAM_JAR +EXIT_CODE=$? + +stop_cluster + +exit $EXIT_CODE diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java index 289a3f23fa60f..1a8375b5039ef 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java @@ -34,6 +34,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.URL; +import java.net.URLClassLoader; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -135,7 +136,7 @@ private PlannerModule() { } } - public ClassLoader getSubmoduleClassLoader() { + public URLClassLoader getSubmoduleClassLoader() { return this.submoduleClassLoader; } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGenerator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGenerator.java index 78981a5e8ce98..b69220399d452 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGenerator.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGenerator.java @@ -20,30 
+20,21 @@ import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; import org.apache.flink.table.planner.plan.utils.HashJoinOperatorUtil; -import org.apache.flink.table.planner.plan.utils.OperatorType; import org.apache.flink.table.planner.plan.utils.SorMergeJoinOperatorUtil; import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; import org.apache.flink.table.runtime.operators.join.FlinkJoinType; -import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin; +import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinGenerator; import org.apache.flink.table.types.logical.RowType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.flink.util.Preconditions.checkState; - /** - * Implementation class for {@link AdaptiveJoin}. It can selectively generate broadcast hash join, - * shuffle hash join or shuffle merge join operator based on actual conditions. + * Implementation class for {@link AdaptiveJoinGenerator}. It can selectively generate broadcast + * hash join, shuffle hash join or shuffle merge join operator based on actual conditions. 
*/ -public class AdaptiveJoinOperatorGenerator implements AdaptiveJoin { - private static final Logger LOG = LoggerFactory.getLogger(AdaptiveJoinOperatorGenerator.class); +public class AdaptiveJoinOperatorGenerator implements AdaptiveJoinGenerator { private final int[] leftKeys; private final int[] rightKeys; - private final FlinkJoinType joinType; - private final boolean[] filterNulls; private final RowType leftType; @@ -64,18 +55,9 @@ public class AdaptiveJoinOperatorGenerator implements AdaptiveJoin { private final long managedMemory; - private final OperatorType originalJoin; - - private boolean leftIsBuild; - - private boolean originalLeftIsBuild; - - private boolean isBroadcastJoin; - public AdaptiveJoinOperatorGenerator( int[] leftKeys, int[] rightKeys, - FlinkJoinType joinType, boolean[] filterNulls, RowType leftType, RowType rightType, @@ -85,12 +67,9 @@ public AdaptiveJoinOperatorGenerator( long leftRowCount, long rightRowCount, boolean tryDistinctBuildRow, - long managedMemory, - boolean leftIsBuild, - OperatorType originalJoin) { + long managedMemory) { this.leftKeys = leftKeys; this.rightKeys = rightKeys; - this.joinType = joinType; this.filterNulls = filterNulls; this.leftType = leftType; this.rightType = rightType; @@ -101,23 +80,17 @@ public AdaptiveJoinOperatorGenerator( this.rightRowCount = rightRowCount; this.tryDistinctBuildRow = tryDistinctBuildRow; this.managedMemory = managedMemory; - checkState( - originalJoin == OperatorType.ShuffleHashJoin - || originalJoin == OperatorType.SortMergeJoin, - String.format( - "Adaptive join " - + "currently only supports adaptive optimization for ShuffleHashJoin and " - + "SortMergeJoin, not including %s.", - originalJoin.toString())); - this.leftIsBuild = leftIsBuild; - this.originalLeftIsBuild = leftIsBuild; - this.originalJoin = originalJoin; } @Override public StreamOperatorFactory genOperatorFactory( - ClassLoader classLoader, ReadableConfig config) { - if (isBroadcastJoin || originalJoin == 
OperatorType.ShuffleHashJoin) { + ClassLoader classLoader, + ReadableConfig config, + FlinkJoinType joinType, + boolean originIsSortMergeJoin, + boolean isBroadcastJoin, + boolean leftIsBuild) { + if (isBroadcastJoin || !originIsSortMergeJoin) { return HashJoinOperatorUtil.generateOperatorFactory( leftKeys, rightKeys, @@ -150,32 +123,4 @@ public StreamOperatorFactory genOperatorFactory( classLoader); } } - - @Override - public FlinkJoinType getJoinType() { - return joinType; - } - - @Override - public void markAsBroadcastJoin(boolean canBroadcast, boolean leftIsBuild) { - this.isBroadcastJoin = canBroadcast; - this.leftIsBuild = leftIsBuild; - } - - @Override - public boolean shouldReorderInputs() { - // Sort merge join requires the left side to be read first if the broadcast threshold is not - // met. - if (!isBroadcastJoin && originalJoin == OperatorType.SortMergeJoin) { - return false; - } - - if (leftIsBuild != originalLeftIsBuild) { - LOG.info( - "The build side of the adaptive join has been updated. Compile phase build side: {}, Runtime build side: {}.", - originalLeftIsBuild ? "left" : "right", - leftIsBuild ? 
"left" : "right"); - } - return !leftIsBuild; - } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecAdaptiveJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecAdaptiveJoin.java index bf1122bde8696..a103a8e349bb9 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecAdaptiveJoin.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecAdaptiveJoin.java @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.batch; import org.apache.flink.api.dag.Transformation; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; import org.apache.flink.table.api.TableException; @@ -36,7 +37,7 @@ import org.apache.flink.table.planner.plan.utils.JoinUtil; import org.apache.flink.table.planner.plan.utils.OperatorType; import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; -import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin; +import org.apache.flink.table.runtime.operators.join.FlinkJoinType; import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinOperatorFactory; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.logical.RowType; @@ -45,6 +46,8 @@ import java.io.IOException; import java.util.List; +import static org.apache.flink.util.Preconditions.checkState; + /** {@link BatchExecNode} for adaptive join. 
*/ public class BatchExecAdaptiveJoin extends ExecNodeBase implements BatchExecNode, SingleTransformationTranslator { @@ -87,6 +90,14 @@ public BatchExecAdaptiveJoin( this.leftIsBuild = leftIsBuild; this.tryDistinctBuildRow = tryDistinctBuildRow; this.description = description; + checkState( + originalJoin == OperatorType.ShuffleHashJoin + || originalJoin == OperatorType.SortMergeJoin, + String.format( + "Adaptive join " + + "currently only supports adaptive optimization for ShuffleHashJoin and " + + "SortMergeJoin, not including %s.", + originalJoin.toString())); this.originalJoin = originalJoin; } @@ -113,11 +124,10 @@ protected Transformation translateToPlanInternal( leftType, rightType); - AdaptiveJoinOperatorGenerator adaptiveJoin = + AdaptiveJoinOperatorGenerator adaptiveJoinGenerator = new AdaptiveJoinOperatorGenerator( joinSpec.getLeftKeys(), joinSpec.getRightKeys(), - joinSpec.getJoinType(), joinSpec.getFilterNulls(), leftType, rightType, @@ -127,16 +137,19 @@ protected Transformation translateToPlanInternal( estimatedLeftRowCount, estimatedRightRowCount, tryDistinctBuildRow, - managedMemory, - leftIsBuild, - originalJoin); + managedMemory); return ExecNodeUtil.createTwoInputTransformation( leftInputTransform, rightInputTransform, createTransformationName(config), createTransformationDescription(config), - getAdaptiveJoinOperatorFactory(adaptiveJoin), + getAdaptiveJoinOperatorFactory( + adaptiveJoinGenerator, + config.get(CoreOptions.CHECK_LEAKED_CLASSLOADER), + joinSpec.getJoinType(), + originalJoin, + leftIsBuild), InternalTypeInfo.of(getOutputType()), // Given that the probe side might be decided at runtime, we choose the larger // parallelism here. 
@@ -146,10 +159,20 @@ protected Transformation translateToPlanInternal( } private StreamOperatorFactory getAdaptiveJoinOperatorFactory( - AdaptiveJoin adaptiveJoin) { + AdaptiveJoinOperatorGenerator adaptiveJoinGenerator, + boolean checkClassLoaderLeak, + FlinkJoinType joinType, + OperatorType originalJoin, + boolean leftIsBuild) { try { - byte[] adaptiveJoinSerialized = InstantiationUtil.serializeObject(adaptiveJoin); - return new AdaptiveJoinOperatorFactory<>(adaptiveJoinSerialized); + byte[] adaptiveJoinGeneratorSerialized = + InstantiationUtil.serializeObject(adaptiveJoinGenerator); + return new AdaptiveJoinOperatorFactory<>( + adaptiveJoinGeneratorSerialized, + joinType, + originalJoin == OperatorType.SortMergeJoin, + leftIsBuild, + checkClassLoaderLeak); } catch (IOException e) { throw new TableException("The adaptive join operator failed to serialize.", e); } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGeneratorTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGeneratorTest.java index 3baa5030cbed1..d554465993b7c 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGeneratorTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGeneratorTest.java @@ -28,7 +28,7 @@ import org.apache.flink.table.runtime.operators.join.HashJoinOperator; import org.apache.flink.table.runtime.operators.join.Int2HashJoinOperatorTestBase; import org.apache.flink.table.runtime.operators.join.SortMergeJoinOperator; -import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin; +import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinGenerator; import org.apache.flink.table.runtime.util.UniformBinaryRowGenerator; import org.apache.flink.table.types.logical.IntType; import 
org.apache.flink.table.types.logical.RowType; @@ -268,10 +268,15 @@ public Object newOperator( boolean buildLeft, boolean isBroadcast, OperatorType operatorType) { - AdaptiveJoin adaptiveJoin = genAdaptiveJoin(flinkJoinType, operatorType); - adaptiveJoin.markAsBroadcastJoin(isBroadcast, buildLeft); + AdaptiveJoinGenerator adaptiveJoinGenerator = genAdaptiveJoinGenerator(); - return adaptiveJoin.genOperatorFactory(getClass().getClassLoader(), new Configuration()); + return adaptiveJoinGenerator.genOperatorFactory( + getClass().getClassLoader(), + new Configuration(), + flinkJoinType, + operatorType == SortMergeJoin, + isBroadcast, + buildLeft); } public void assertOperatorType(Object operator, OperatorType expectedOperatorType) { @@ -301,7 +306,7 @@ public void assertOperatorType(Object operator, OperatorType expectedOperatorTyp } } - public AdaptiveJoin genAdaptiveJoin(FlinkJoinType flinkJoinType, OperatorType operatorType) { + public AdaptiveJoinGenerator genAdaptiveJoinGenerator() { GeneratedJoinCondition condFuncCode = new GeneratedJoinCondition( Int2HashJoinOperatorTestBase.MyJoinCondition.class.getCanonicalName(), @@ -316,7 +321,6 @@ public JoinCondition newInstance(ClassLoader classLoader) { return new AdaptiveJoinOperatorGenerator( new int[] {0}, new int[] {0}, - flinkJoinType, new boolean[] {true}, RowType.of(new IntType(), new IntType()), RowType.of(new IntType(), new IntType()), @@ -326,8 +330,6 @@ public JoinCondition newInstance(ClassLoader classLoader) { 20, 10000, false, - TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY.defaultValue().getBytes(), - true, - operatorType); + TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY.defaultValue().getBytes()); } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoin.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoin.java index 0d289ae9639b3..9ca9759038933 100755 --- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoin.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoin.java @@ -32,11 +32,11 @@ public interface AdaptiveJoin extends Serializable { * Generates a StreamOperatorFactory for this join operator using the provided ClassLoader and * config. * - * @param classLoader the ClassLoader to be used for loading classes. + * @param userClassLoader the user ClassLoader to be used for loading classes. * @param config the configuration to be applied for creating the operator factory. * @return a StreamOperatorFactory instance. */ - StreamOperatorFactory genOperatorFactory(ClassLoader classLoader, ReadableConfig config); + StreamOperatorFactory genOperatorFactory(ClassLoader userClassLoader, ReadableConfig config); /** * Get the join type of the join operator. diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinGenerator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinGenerator.java new file mode 100644 index 0000000000000..02496ebb7b24c --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinGenerator.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.adaptive; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.table.runtime.operators.join.FlinkJoinType; + +import java.io.Serializable; + +/** Interface for implementing an adaptive join operator generator. */ +@Internal +public interface AdaptiveJoinGenerator extends Serializable { + /** + * Generates a StreamOperatorFactory for this join operator using the provided ClassLoader and + * parameters. + * + * @param classLoader the ClassLoader to be used for loading classes. + * @param config the configuration to be applied for creating the operator factory. + * @param joinType the join type. + * @param originIsSortMergeJoin whether the join operator is a SortMergeJoin. + * @param isBroadcastJoin whether the join operator can be optimized to broadcast hash join. + * @param leftIsBuild whether the left input side is the build side. + * @return a StreamOperatorFactory instance. 
+ */ + StreamOperatorFactory genOperatorFactory( + ClassLoader classLoader, + ReadableConfig config, + FlinkJoinType joinType, + boolean originIsSortMergeJoin, + boolean isBroadcastJoin, + boolean leftIsBuild); +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinOperatorFactory.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinOperatorFactory.java index 3efac4e3b1b7e..7246f3718c07f 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinOperatorFactory.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinOperatorFactory.java @@ -25,12 +25,17 @@ import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.table.planner.loader.PlannerModule; import org.apache.flink.table.runtime.operators.join.FlinkJoinType; +import org.apache.flink.util.FlinkUserCodeClassLoaders; import org.apache.flink.util.InstantiationUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import java.io.IOException; +import static org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -46,49 +51,112 @@ @Internal public class AdaptiveJoinOperatorFactory extends AbstractStreamOperatorFactory implements AdaptiveJoin { - private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(AdaptiveJoinOperatorFactory.class); - private final byte[] adaptiveJoinSerialized; + private static final long serialVersionUID = 1L; - @Nullable private transient AdaptiveJoin adaptiveJoin; + private final byte[] adaptiveJoinGeneratorSerialized; @Nullable private StreamOperatorFactory finalFactory; - public 
AdaptiveJoinOperatorFactory(byte[] adaptiveJoinSerialized) { - this.adaptiveJoinSerialized = checkNotNull(adaptiveJoinSerialized); + private final boolean checkClassLoaderLeak; + + private final FlinkJoinType joinType; + + private final boolean originIsSortMergeJoin; + + private final boolean originalLeftIsBuild; + + private boolean leftIsBuild; + + private boolean isBroadcastJoin; + + public AdaptiveJoinOperatorFactory( + byte[] adaptiveJoinGeneratorSerialized, + FlinkJoinType joinType, + boolean originIsSortMergeJoin, + boolean leftIsBuild, + boolean checkClassLoaderLeak) { + this.adaptiveJoinGeneratorSerialized = checkNotNull(adaptiveJoinGeneratorSerialized); + this.joinType = joinType; + this.originIsSortMergeJoin = originIsSortMergeJoin; + this.leftIsBuild = leftIsBuild; + this.originalLeftIsBuild = leftIsBuild; + this.checkClassLoaderLeak = checkClassLoaderLeak; } @Override public StreamOperatorFactory genOperatorFactory( - ClassLoader classLoader, ReadableConfig config) { - checkAndLazyInitialize(); + ClassLoader userClassLoader, ReadableConfig config) { + // In some IT/E2E tests, plannerModule may be null, so we handle it specially to avoid + // breaking these tests. + PlannerModule plannerModule = null; + try { + plannerModule = PlannerModule.getInstance(); + } catch (Throwable throwable) { + LOG.warn( + "Failed to get PlannerModule instance, may cause adaptive join deserialization failure.", + throwable); + } + + ClassLoader classLoader = + plannerModule == null + ? userClassLoader + : FlinkUserCodeClassLoaders.parentFirst( + plannerModule.getSubmoduleClassLoader().getURLs(), + userClassLoader, + NOOP_EXCEPTION_HANDLER, + checkClassLoaderLeak); + + AdaptiveJoinGenerator adaptiveJoinGenerator; + try { + adaptiveJoinGenerator = + InstantiationUtil.deserializeObject( + adaptiveJoinGeneratorSerialized, classLoader); + } catch (ClassNotFoundException | IOException e) { + throw new RuntimeException( + "Failed to deserialize AdaptiveJoinGenerator instance. 
" + + "Please check whether the flink-table-planner-loader.jar is in the classpath.", + e); + } this.finalFactory = - (StreamOperatorFactory) adaptiveJoin.genOperatorFactory(classLoader, config); + (StreamOperatorFactory) + adaptiveJoinGenerator.genOperatorFactory( + classLoader, + config, + joinType, + originIsSortMergeJoin, + isBroadcastJoin, + leftIsBuild); return this.finalFactory; } @Override public FlinkJoinType getJoinType() { - checkAndLazyInitialize(); - return adaptiveJoin.getJoinType(); + return joinType; } @Override - public void markAsBroadcastJoin(boolean canBeBroadcast, boolean leftIsBuild) { - checkAndLazyInitialize(); - adaptiveJoin.markAsBroadcastJoin(canBeBroadcast, leftIsBuild); + public void markAsBroadcastJoin(boolean canBroadcast, boolean leftIsBuild) { + this.isBroadcastJoin = canBroadcast; + this.leftIsBuild = leftIsBuild; } @Override public boolean shouldReorderInputs() { - checkAndLazyInitialize(); - return adaptiveJoin.shouldReorderInputs(); - } + // Sort merge join requires the left side to be read first if the broadcast threshold is not + // met. + if (!isBroadcastJoin && originIsSortMergeJoin) { + return false; + } - private void checkAndLazyInitialize() { - if (this.adaptiveJoin == null) { - lazyInitialize(); + if (leftIsBuild != originalLeftIsBuild) { + LOG.info( + "The build side of the adaptive join has been updated. Compile phase build side: {}, Runtime build side: {}.", + originalLeftIsBuild ? "left" : "right", + leftIsBuild ? 
"left" : "right"); } + return !leftIsBuild; } @Override @@ -113,28 +181,4 @@ public Class getStreamOperatorClass(ClassLoader classL "The method should not be invoked in the " + "adaptive join operator for batch jobs."); } - - private void lazyInitialize() { - if (!tryInitializeAdaptiveJoin(Thread.currentThread().getContextClassLoader())) { - boolean isSuccess = - tryInitializeAdaptiveJoin( - PlannerModule.getInstance().getSubmoduleClassLoader()); - if (!isSuccess) { - throw new RuntimeException( - "Failed to deserialize AdaptiveJoin instance. " - + "Please check whether the flink-table-planner-loader.jar is in the classpath."); - } - } - } - - private boolean tryInitializeAdaptiveJoin(ClassLoader classLoader) { - try { - this.adaptiveJoin = - InstantiationUtil.deserializeObject(adaptiveJoinSerialized, classLoader); - } catch (ClassNotFoundException | IOException e) { - return false; - } - - return true; - } }