[FLINK-39182][runtime/test] Fix the flaky test class ExecutionGraphRestartTest#27719
[FLINK-39182][runtime/test] Fix the flaky test class ExecutionGraphRestartTest#27719RocMarshal wants to merge 1 commit intoapache:masterfrom
Conversation
| @BeforeEach | ||
| void setUp() { | ||
| taskRestartExecutor = new ManuallyTriggeredScheduledExecutor(); | ||
| mainThreadExecutor = |
There was a problem hiding this comment.
It might be cleaner to directly use the main-thread executor abstraction instead of wrapping JM_MAIN_THREAD_EXECUTOR_EXTENSION.getExecutor() via forSingleThreadExecutor.
Please confirm that this preserves strict main-thread semantics and does not introduce an extra execution layer.
There was a problem hiding this comment.
+1, you could create a wrapper for running in the main thread similar to what we do in https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java#L368
Or even better to create AbstractClass that instantiates these extensions and defines helper for cleaner re-use by ExecutionGraph tests like "Suspend/Restart"
There was a problem hiding this comment.
Thanks @Izeren @featzhang .
If we fix it according to the implementation approach described above, the current test still fails.
Perhaps there is an issue with the way I reproduced it.
There was a problem hiding this comment.
Root cause: some operation lines is not scoped in sync logic(we need to update the related lines to fix the flaky test cases.)
I am not sure I fully understand where race condition in this test coming from. @RocMarshal, could you please explain where the flow breaks? I will see if I have time to test it locally tomorrow
|
|
||
| startScheduling(scheduler); | ||
| offerSlots(slotPool, NUM_TASKS); | ||
| mainThreadExecutor.execute( |
There was a problem hiding this comment.
Wrapping the entire test logic in mainThreadExecutor.execute(...) improves thread confinement, but we do not explicitly wait for completion.
If the executor is asynchronous, this may still lead to timing issues.
Would it be safer to block on a submitted future (e.g. submit(...).get()) to ensure deterministic execution?
There was a problem hiding this comment.
In ExecutionGraphSuspendTest, I have used:
private static void runInMainThread(final Runnable runnable) {
CompletableFuture.runAsync(runnable, JM_MAIN_THREAD_EXECUTOR_RESOURCE.getExecutor()).join();
}
maybe it is worth creating Abstract test class for Restart/Suspend etc and create these executor extensions at higher level 🤔
There was a problem hiding this comment.
@RocMarshal, I ran it locally, and it looks like these lambda invocations may not even happen before the test exits, so they are likely passing in a misleading way (not really testing underlying logic)
There was a problem hiding this comment.
@RocMarshal, I have tried to run this and it has passed @RepeatedTest(300)
This is a dirty patch, ofc. I would move slot related stuff to slot utils + I would use BeforeEach@ AfterEach@ to do slot pool creation/cleanup, WDYT?
Index: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java (revision b1d5bda2e386387f19d32ca71f8902f692650323)
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java (date 1772631541535)
@@ -20,29 +20,34 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridgeBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.testutils.TestingUtils;
@@ -54,10 +59,13 @@
import org.junit.jupiter.api.extension.RegisterExtension;
import java.io.IOException;
-import java.util.Collections;
+import java.util.Collection;
import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static org.assertj.core.api.Assertions.assertThat;
@@ -101,8 +109,8 @@
@Test
void testCancelAllPendingRequestWhileCanceling() throws Exception {
- try (DeclarativeSlotPoolBridge slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
-
+ final DeclarativeSlotPoolBridge slotPool = createSlotPool();
+ try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
final int numTasksExceedSlotPool = 50;
// create a graph with task count larger than slot pool
JobVertex sender =
@@ -116,7 +124,7 @@
createExecutionSlotAllocatorFactory(slotPool))
.build();
- mainThreadExecutor.execute(
+ runInMainThread(
() -> {
ExecutionGraph executionGraph = scheduler.getExecutionGraph();
startScheduling(scheduler);
@@ -134,8 +142,8 @@
@Test
void testCancelAllPendingRequestWhileFailing() throws Exception {
- try (DeclarativeSlotPoolBridge slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
-
+ final DeclarativeSlotPoolBridge slotPool = createSlotPool();
+ try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
final int numTasksExceedSlotPool = 50;
// create a graph with task count larger than slot pool
JobVertex sender =
@@ -148,7 +156,7 @@
.setExecutionSlotAllocatorFactory(
createExecutionSlotAllocatorFactory(slotPool))
.build();
- mainThreadExecutor.execute(
+ runInMainThread(
() -> {
ExecutionGraph executionGraph = scheduler.getExecutionGraph();
@@ -168,7 +176,8 @@
@Test
void testCancelWhileRestarting() throws Exception {
// We want to manually control the restart and delay
- try (SlotPool slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
+ final DeclarativeSlotPoolBridge slotPool = createSlotPool();
+ try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
SchedulerBase scheduler =
new DefaultSchedulerBuilder(
createJobGraph(),
@@ -181,7 +190,7 @@
.setDelayExecutor(taskRestartExecutor)
.build();
- mainThreadExecutor.execute(
+ runInMainThread(
() -> {
ExecutionGraph executionGraph = scheduler.getExecutionGraph();
@@ -209,14 +218,43 @@
}
}
- private ResourceID offerSlots(SlotPool slotPool, int numSlots) {
- return SlotPoolUtils.offerSlots(
- slotPool, mainThreadExecutor, Collections.nCopies(numSlots, ResourceProfile.ANY));
+ /**
+ * Offers slots directly on the slot pool. This method must be called from the main thread
+ * (inside {@link #runInMainThread(Runnable)}), so it calls slot pool methods directly instead
+ * of going through {@code SlotPoolUtils.offerSlots} which would deadlock due to re-entrant
+ * {@code CompletableFuture.runAsync(..., mainThreadExecutor).join()}.
+ */
+ private static ResourceID offerSlots(SlotPool slotPool, int numSlots) {
+ final TaskManagerLocation location = new LocalTaskManagerLocation();
+ slotPool.registerTaskManager(location.getResourceID());
+ final Collection<SlotOffer> offers =
+ IntStream.range(0, numSlots)
+ .mapToObj(i -> new SlotOffer(new AllocationID(), i, ResourceProfile.ANY))
+ .collect(Collectors.toList());
+ final Collection<SlotOffer> accepted =
+ slotPool.offerSlots(location, new SimpleAckingTaskManagerGateway(), offers);
+ assertThat(accepted).hasSameSizeAs(offers);
+ return location.getResourceID();
+ }
+
+ private static void runInMainThread(final Runnable runnable) {
+ CompletableFuture.runAsync(runnable, JM_MAIN_THREAD_EXECUTOR_EXTENSION.getExecutor())
+ .join();
+ }
+
+ /**
+ * Returns an {@link AutoCloseable} that closes the given slot pool on the main thread. This
+ * allows using try-with-resources while respecting the slot pool's main-thread assertion in
+ * {@link DeclarativeSlotPoolBridge#close()}.
+ */
+ private static AutoCloseable closeOnMainThread(DeclarativeSlotPoolBridge slotPool) {
+ return () -> runInMainThread(slotPool::close);
}
@Test
void testCancelWhileFailing() throws Exception {
- try (SlotPool slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
+ final DeclarativeSlotPoolBridge slotPool = createSlotPool();
+ try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
SchedulerBase scheduler =
new DefaultSchedulerBuilder(
createJobGraph(),
@@ -227,7 +265,7 @@
.setRestartBackoffTimeStrategy(
new TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
.build();
- mainThreadExecutor.execute(
+ runInMainThread(
() -> {
ExecutionGraph graph = scheduler.getExecutionGraph();
@@ -257,7 +295,8 @@
@Test
void testFailWhileCanceling() throws Exception {
- try (SlotPool slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
+ final DeclarativeSlotPoolBridge slotPool = createSlotPool();
+ try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
SchedulerBase scheduler =
new DefaultSchedulerBuilder(
createJobGraph(),
@@ -268,7 +307,7 @@
.setRestartBackoffTimeStrategy(
new TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
.build();
- mainThreadExecutor.execute(
+ runInMainThread(
() -> {
ExecutionGraph graph = scheduler.getExecutionGraph();
@@ -311,7 +350,8 @@
ExecutionGraphTestUtils.createJobVertex("Task2", 1, NoOpInvokable.class);
JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender, receiver);
- try (SlotPool slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
+ final DeclarativeSlotPoolBridge slotPool = createSlotPool();
+ try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
SchedulerBase scheduler =
new DefaultSchedulerBuilder(
jobGraph, mainThreadExecutor, EXECUTOR_EXTENSION.getExecutor())
@@ -322,10 +362,16 @@
.setDelayExecutor(taskRestartExecutor)
.build();
- mainThreadExecutor.execute(
+ final ExecutionGraph eg = scheduler.getExecutionGraph();
+ // Hold the original finished execution reference across runInMainThread calls
+ final Execution[] savedExecution = new Execution[1];
+
+ // Phase 1: Start, deploy, fail one task, and trigger restart.
+ // The restart callback (restartTasks) is queued on mainThreadExecutor via
+ // cancelFuture.thenRunAsync(..., mainThreadExecutor) and will execute after this
+ // runInMainThread call returns.
+ runInMainThread(
() -> {
- ExecutionGraph eg = scheduler.getExecutionGraph();
-
startScheduling(scheduler);
offerSlots(slotPool, 2);
@@ -343,7 +389,16 @@
failedExecution.fail(new Exception("Test Exception"));
failedExecution.completeCancelling();
+ savedExecution[0] = finishedExecution;
+
taskRestartExecutor.triggerScheduledTasks();
+ });
+
+ // Phase 2: The restart callback has now executed (it was queued ahead of this
+ // lambda). Verify the graph restarted and the old finished execution is unaffected.
+ runInMainThread(
+ () -> {
+ Execution finishedExecution = savedExecution[0];
assertThat(eg.getState()).isEqualTo(JobStatus.RUNNING);
@@ -361,8 +416,8 @@
vertex.getCurrentExecutionAttempt().markFinished();
}
- // the state of the finished execution should have not changed since it is
- // terminal
+ // the state of the finished execution should have not changed since it
+ // is terminal
assertThat(finishedExecution.getState()).isEqualTo(ExecutionState.FINISHED);
assertThat(eg.getState()).isEqualTo(JobStatus.FINISHED);
@@ -377,7 +432,8 @@
*/
@Test
void testFailExecutionAfterCancel() throws Exception {
- try (SlotPool slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
+ final DeclarativeSlotPoolBridge slotPool = createSlotPool();
+ try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
SchedulerBase scheduler =
new DefaultSchedulerBuilder(
createJobGraphToCancel(),
@@ -389,7 +445,7 @@
new TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
.setDelayExecutor(taskRestartExecutor)
.build();
- mainThreadExecutor.execute(
+ runInMainThread(
() -> {
ExecutionGraph eg = scheduler.getExecutionGraph();
@@ -440,6 +496,12 @@
physicalSlotProvider);
}
+ private DeclarativeSlotPoolBridge createSlotPool() {
+ return new DeclarativeSlotPoolBridgeBuilder()
+ .setMainThreadExecutor(mainThreadExecutor)
+ .build();
+ }
+
private static void setupSlotPool(SlotPool slotPool) throws Exception {
final String jobManagerAddress = "foobar";
final ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
There was a problem hiding this comment.
@RocMarshal, I ran it locally, and it looks like these lambda invocations may not even happen before the test exits, so they are likely passing in a misleading way (not really testing underlying logic)
Hi, @Izeren Thank you very much for your effort on the patch.
I have double-checked the scenarios you pointed out locally based on your comments and suggestions, and you are absolutely right.
Thank you very much for the reminder and the fix.
I don’t have enough time to follow up on this issue recently. Would you be willing to take it over?
There was a problem hiding this comment.
@RocMarshal, I have re-opened PR here: #27740, will wait for the tests to pass
| } | ||
|
|
||
| private static ResourceID offerSlots(SlotPool slotPool, int numSlots) { | ||
| private ResourceID offerSlots(SlotPool slotPool, int numSlots) { |
There was a problem hiding this comment.
The method offerSlots is now non-static due to the mainThreadExecutor instance usage.
Please confirm that this change does not impact other test utility patterns or assumptions.
There was a problem hiding this comment.
thanks for reminders. The change is healthy with checked.
| // Release the TaskManager and wait for the job to restart | ||
| slotPool.releaseTaskManager( | ||
| taskManagerResourceId, new Exception("Test Exception")); | ||
| assertThat(executionGraph.getState()).isEqualTo(JobStatus.RESTARTING); |
There was a problem hiding this comment.
assertThatFuture(executionGraph.getTerminationFuture())
Some state transitions (e.g. RESTARTING) may be triggered asynchronously.
It may be safer to assert the state via a future-based or eventually-style assertion to avoid timing sensitivity.
| .isEqualTo(JobStatus.CANCELED); | ||
| FlinkAssertions.assertThatFuture(eg.getTerminationFuture()) | ||
| .eventuallySucceeds() | ||
| .isEqualTo(JobStatus.CANCELED); |
There was a problem hiding this comment.
As a structural improvement, it might be cleaner to enforce main-thread confinement within the scheduler setup itself rather than wrapping every test case body in mainThreadExecutor.execute.
This would reduce boilerplate and improve readability.
There was a problem hiding this comment.
IIUC, scheduler does enforce confinement for the calls like startScheduling based on mainThreadExecutor provided in constructor. Could you please give an example of your suggested change?
featzhang
left a comment
There was a problem hiding this comment.
As a structural improvement, it might be cleaner to enforce main-thread confinement within the scheduler setup itself rather than wrapping every test case body in mainThreadExecutor.execute.
This would reduce boilerplate and improve readability.
|
Hi, @rionmonster @Izeren Could you help take look if you had the free time ? |
What is the purpose of the change
[FLINK-39182][runtime/test] Fix the flaky test class ExecutionGraphRestartTest
Motivation: When the Use '@RepeatedTest' to trigger the test cases defined in ExecutionGraphRestartTest, the result would be always failed.
Root cause: some operation lines is not scoped in sync logic(we need to update the related lines to fix the flaky test cases.)
Brief change log
Root cause: some operation lines is not scoped in sync logic(we need to update the related lines to fix the flaky test cases.)
Verifying this change
N.A
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)Documentation