Skip to content

[FLINK-39182][runtime/test] Fix the flaky test class ExecutionGraphRestartTest#27719

Closed
RocMarshal wants to merge 1 commit intoapache:masterfrom
RocMarshal:FLINK-39182
Closed

[FLINK-39182][runtime/test] Fix the flaky test class ExecutionGraphRestartTest#27719
RocMarshal wants to merge 1 commit intoapache:masterfrom
RocMarshal:FLINK-39182

Conversation

@RocMarshal
Copy link
Contributor

What is the purpose of the change

[FLINK-39182][runtime/test] Fix the flaky test class ExecutionGraphRestartTest

Motivation: When the Use '@RepeatedTest' to trigger the test cases defined in ExecutionGraphRestartTest, the result would be always failed.
Root cause: some operation lines is not scoped in sync logic(we need to update the related lines to fix the flaky test cases.)

Brief change log

Root cause: some operation lines is not scoped in sync logic(we need to update the related lines to fix the flaky test cases.)

Verifying this change

N.A

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Copy link
Collaborator

flinkbot commented Mar 1, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@BeforeEach
void setUp() {
taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
mainThreadExecutor =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be cleaner to directly use the main-thread executor abstraction instead of wrapping JM_MAIN_THREAD_EXECUTOR_EXTENSION.getExecutor() via forSingleThreadExecutor.
Please confirm that this preserves strict main-thread semantics and does not introduce an extra execution layer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, you could create a wrapper for running in the main thread similar to what we do in https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java#L368

Or even better to create AbstractClass that instantiates these extensions and defines helper for cleaner re-use by ExecutionGraph tests like "Suspend/Restart"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @Izeren @featzhang .
If we fix it according to the implementation approach described above, the current test still fails.
Perhaps there is an issue with the way I reproduced it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Root cause: some operation lines is not scoped in sync logic(we need to update the related lines to fix the flaky test cases.)

I am not sure I fully understand where race condition in this test coming from. @RocMarshal, could you please explain where the flow breaks? I will see if I have time to test it locally tomorrow


startScheduling(scheduler);
offerSlots(slotPool, NUM_TASKS);
mainThreadExecutor.execute(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrapping the entire test logic in mainThreadExecutor.execute(...) improves thread confinement, but we do not explicitly wait for completion.
If the executor is asynchronous, this may still lead to timing issues.
Would it be safer to block on a submitted future (e.g. submit(...).get()) to ensure deterministic execution?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In ExecutionGraphSuspendTest, I have used:

    private static void runInMainThread(final Runnable runnable) {
        CompletableFuture.runAsync(runnable, JM_MAIN_THREAD_EXECUTOR_RESOURCE.getExecutor()).join();
    }

maybe it is worth creating Abstract test class for Restart/Suspend etc and create these executor extensions at higher level 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RocMarshal, I ran it locally, and it looks like these lambda invocations may not even happen before the test exits, so they are likely passing in a misleading way (not really testing underlying logic)

Copy link
Contributor

@Izeren Izeren Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RocMarshal, I have tried to run this and it has passed @RepeatedTest(300)

This is a dirty patch, ofc. I would move slot related stuff to slot utils + I would use BeforeEach@ AfterEach@ to do slot pool creation/cleanup, WDYT?

Index: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java	(revision b1d5bda2e386387f19d32ca71f8902f692650323)
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java	(date 1772631541535)
@@ -20,29 +20,34 @@
 
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridgeBuilder;
 import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.streaming.util.RestartStrategyUtils;
 import org.apache.flink.testutils.TestingUtils;
@@ -54,10 +59,13 @@
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.io.IOException;
-import java.util.Collections;
+import java.util.Collection;
 import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -101,8 +109,8 @@
 
     @Test
     void testCancelAllPendingRequestWhileCanceling() throws Exception {
-        try (DeclarativeSlotPoolBridge slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
-
+        final DeclarativeSlotPoolBridge slotPool = createSlotPool();
+        try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
             final int numTasksExceedSlotPool = 50;
             // create a graph with task count larger than slot pool
             JobVertex sender =
@@ -116,7 +124,7 @@
                                     createExecutionSlotAllocatorFactory(slotPool))
                             .build();
 
-            mainThreadExecutor.execute(
+            runInMainThread(
                     () -> {
                         ExecutionGraph executionGraph = scheduler.getExecutionGraph();
                         startScheduling(scheduler);
@@ -134,8 +142,8 @@
 
     @Test
     void testCancelAllPendingRequestWhileFailing() throws Exception {
-        try (DeclarativeSlotPoolBridge slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
-
+        final DeclarativeSlotPoolBridge slotPool = createSlotPool();
+        try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
             final int numTasksExceedSlotPool = 50;
             // create a graph with task count larger than slot pool
             JobVertex sender =
@@ -148,7 +156,7 @@
                             .setExecutionSlotAllocatorFactory(
                                     createExecutionSlotAllocatorFactory(slotPool))
                             .build();
-            mainThreadExecutor.execute(
+            runInMainThread(
                     () -> {
                         ExecutionGraph executionGraph = scheduler.getExecutionGraph();
 
@@ -168,7 +176,8 @@
     @Test
     void testCancelWhileRestarting() throws Exception {
         // We want to manually control the restart and delay
-        try (SlotPool slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
+        final DeclarativeSlotPoolBridge slotPool = createSlotPool();
+        try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
             SchedulerBase scheduler =
                     new DefaultSchedulerBuilder(
                                     createJobGraph(),
@@ -181,7 +190,7 @@
                             .setDelayExecutor(taskRestartExecutor)
                             .build();
 
-            mainThreadExecutor.execute(
+            runInMainThread(
                     () -> {
                         ExecutionGraph executionGraph = scheduler.getExecutionGraph();
 
@@ -209,14 +218,43 @@
         }
     }
 
-    private ResourceID offerSlots(SlotPool slotPool, int numSlots) {
-        return SlotPoolUtils.offerSlots(
-                slotPool, mainThreadExecutor, Collections.nCopies(numSlots, ResourceProfile.ANY));
+    /**
+     * Offers slots directly on the slot pool. This method must be called from the main thread
+     * (inside {@link #runInMainThread(Runnable)}), so it calls slot pool methods directly instead
+     * of going through {@code SlotPoolUtils.offerSlots} which would deadlock due to re-entrant
+     * {@code CompletableFuture.runAsync(..., mainThreadExecutor).join()}.
+     */
+    private static ResourceID offerSlots(SlotPool slotPool, int numSlots) {
+        final TaskManagerLocation location = new LocalTaskManagerLocation();
+        slotPool.registerTaskManager(location.getResourceID());
+        final Collection<SlotOffer> offers =
+                IntStream.range(0, numSlots)
+                        .mapToObj(i -> new SlotOffer(new AllocationID(), i, ResourceProfile.ANY))
+                        .collect(Collectors.toList());
+        final Collection<SlotOffer> accepted =
+                slotPool.offerSlots(location, new SimpleAckingTaskManagerGateway(), offers);
+        assertThat(accepted).hasSameSizeAs(offers);
+        return location.getResourceID();
+    }
+
+    private static void runInMainThread(final Runnable runnable) {
+        CompletableFuture.runAsync(runnable, JM_MAIN_THREAD_EXECUTOR_EXTENSION.getExecutor())
+                .join();
+    }
+
+    /**
+     * Returns an {@link AutoCloseable} that closes the given slot pool on the main thread. This
+     * allows using try-with-resources while respecting the slot pool's main-thread assertion in
+     * {@link DeclarativeSlotPoolBridge#close()}.
+     */
+    private static AutoCloseable closeOnMainThread(DeclarativeSlotPoolBridge slotPool) {
+        return () -> runInMainThread(slotPool::close);
     }
 
     @Test
     void testCancelWhileFailing() throws Exception {
-        try (SlotPool slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
+        final DeclarativeSlotPoolBridge slotPool = createSlotPool();
+        try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
             SchedulerBase scheduler =
                     new DefaultSchedulerBuilder(
                                     createJobGraph(),
@@ -227,7 +265,7 @@
                             .setRestartBackoffTimeStrategy(
                                     new TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
                             .build();
-            mainThreadExecutor.execute(
+            runInMainThread(
                     () -> {
                         ExecutionGraph graph = scheduler.getExecutionGraph();
 
@@ -257,7 +295,8 @@
 
     @Test
     void testFailWhileCanceling() throws Exception {
-        try (SlotPool slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
+        final DeclarativeSlotPoolBridge slotPool = createSlotPool();
+        try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
             SchedulerBase scheduler =
                     new DefaultSchedulerBuilder(
                                     createJobGraph(),
@@ -268,7 +307,7 @@
                             .setRestartBackoffTimeStrategy(
                                     new TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
                             .build();
-            mainThreadExecutor.execute(
+            runInMainThread(
                     () -> {
                         ExecutionGraph graph = scheduler.getExecutionGraph();
 
@@ -311,7 +350,8 @@
                 ExecutionGraphTestUtils.createJobVertex("Task2", 1, NoOpInvokable.class);
         JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sender, receiver);
 
-        try (SlotPool slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
+        final DeclarativeSlotPoolBridge slotPool = createSlotPool();
+        try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
             SchedulerBase scheduler =
                     new DefaultSchedulerBuilder(
                                     jobGraph, mainThreadExecutor, EXECUTOR_EXTENSION.getExecutor())
@@ -322,10 +362,16 @@
                             .setDelayExecutor(taskRestartExecutor)
                             .build();
 
-            mainThreadExecutor.execute(
+            final ExecutionGraph eg = scheduler.getExecutionGraph();
+            // Hold the original finished execution reference across runInMainThread calls
+            final Execution[] savedExecution = new Execution[1];
+
+            // Phase 1: Start, deploy, fail one task, and trigger restart.
+            // The restart callback (restartTasks) is queued on mainThreadExecutor via
+            // cancelFuture.thenRunAsync(..., mainThreadExecutor) and will execute after this
+            // runInMainThread call returns.
+            runInMainThread(
                     () -> {
-                        ExecutionGraph eg = scheduler.getExecutionGraph();
-
                         startScheduling(scheduler);
 
                         offerSlots(slotPool, 2);
@@ -343,7 +389,16 @@
                         failedExecution.fail(new Exception("Test Exception"));
                         failedExecution.completeCancelling();
 
+                        savedExecution[0] = finishedExecution;
+
                         taskRestartExecutor.triggerScheduledTasks();
+                    });
+
+            // Phase 2: The restart callback has now executed (it was queued ahead of this
+            // lambda). Verify the graph restarted and the old finished execution is unaffected.
+            runInMainThread(
+                    () -> {
+                        Execution finishedExecution = savedExecution[0];
 
                         assertThat(eg.getState()).isEqualTo(JobStatus.RUNNING);
 
@@ -361,8 +416,8 @@
                             vertex.getCurrentExecutionAttempt().markFinished();
                         }
 
-                        // the state of the finished execution should have not changed since it is
-                        // terminal
+                        // the state of the finished execution should have not changed since it
+                        // is terminal
                         assertThat(finishedExecution.getState()).isEqualTo(ExecutionState.FINISHED);
 
                         assertThat(eg.getState()).isEqualTo(JobStatus.FINISHED);
@@ -377,7 +432,8 @@
      */
     @Test
     void testFailExecutionAfterCancel() throws Exception {
-        try (SlotPool slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
+        final DeclarativeSlotPoolBridge slotPool = createSlotPool();
+        try (AutoCloseable ignored = closeOnMainThread(slotPool)) {
             SchedulerBase scheduler =
                     new DefaultSchedulerBuilder(
                                     createJobGraphToCancel(),
@@ -389,7 +445,7 @@
                                     new TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
                             .setDelayExecutor(taskRestartExecutor)
                             .build();
-            mainThreadExecutor.execute(
+            runInMainThread(
                     () -> {
                         ExecutionGraph eg = scheduler.getExecutionGraph();
 
@@ -440,6 +496,12 @@
                 physicalSlotProvider);
     }
 
+    private DeclarativeSlotPoolBridge createSlotPool() {
+        return new DeclarativeSlotPoolBridgeBuilder()
+                .setMainThreadExecutor(mainThreadExecutor)
+                .build();
+    }
+
     private static void setupSlotPool(SlotPool slotPool) throws Exception {
         final String jobManagerAddress = "foobar";
         final ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RocMarshal, I ran it locally, and it looks like these lambda invocations may not even happen before the test exits, so they are likely passing in a misleading way (not really testing underlying logic)

Hi, @Izeren Thank you very much for your effort on the patch.
I have double-checked the scenarios you pointed out locally based on your comments and suggestions, and you are absolutely right.

Thank you very much for the reminder and the fix.

I don’t have enough time to follow up on this issue recently. Would you be willing to take it over?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RocMarshal, I have re-opened PR here: #27740, will wait for the tests to pass

}

private static ResourceID offerSlots(SlotPool slotPool, int numSlots) {
private ResourceID offerSlots(SlotPool slotPool, int numSlots) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method offerSlots is now non-static due to the mainThreadExecutor instance usage.
Please confirm that this change does not impact other test utility patterns or assumptions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for reminders. The change is healthy with checked.

// Release the TaskManager and wait for the job to restart
slotPool.releaseTaskManager(
taskManagerResourceId, new Exception("Test Exception"));
assertThat(executionGraph.getState()).isEqualTo(JobStatus.RESTARTING);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertThatFuture(executionGraph.getTerminationFuture())

Some state transitions (e.g. RESTARTING) may be triggered asynchronously.
It may be safer to assert the state via a future-based or eventually-style assertion to avoid timing sensitivity.

.isEqualTo(JobStatus.CANCELED);
FlinkAssertions.assertThatFuture(eg.getTerminationFuture())
.eventuallySucceeds()
.isEqualTo(JobStatus.CANCELED);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a structural improvement, it might be cleaner to enforce main-thread confinement within the scheduler setup itself rather than wrapping every test case body in mainThreadExecutor.execute.
This would reduce boilerplate and improve readability.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, scheduler does enforce confinement for the calls like startScheduling based on mainThreadExecutor provided in constructor. Could you please give an example of your suggested change?

Copy link
Member

@featzhang featzhang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a structural improvement, it might be cleaner to enforce main-thread confinement within the scheduler setup itself rather than wrapping every test case body in mainThreadExecutor.execute.
This would reduce boilerplate and improve readability.

@github-actions github-actions bot added the community-reviewed PR has been reviewed by the community. label Mar 2, 2026
@RocMarshal
Copy link
Contributor Author

Hi, @rionmonster @Izeren Could you help take look if you had the free time ?
Thanks !

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants