Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ src/main/idls/*
.project
.settings
.vscode/
*/bin
*/bin
.serena
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,28 @@ public DynamicUpdateHandler getHandler() {

void await(String reason, Supplier<Boolean> unblockCondition);

/**
* Asynchronously wait until unblockCondition evaluates to true.
*
* @param unblockCondition condition that should return true to indicate completion
* @return Promise that completes when the condition becomes true, or completes exceptionally with
* CanceledFailure if the enclosing CancellationScope is canceled
*/
Promise<Void> awaitAsync(Supplier<Boolean> unblockCondition);

/**
* Asynchronously wait until unblockCondition evaluates to true or timeout expires.
*
* @param timeout maximum time to wait for the condition
* @param timerSummary summary for the timer created by this await, used in workflow history
* @param unblockCondition condition that should return true to indicate completion
* @return Promise that completes with true if the condition was satisfied, false if the timeout
* expired, or exceptionally with CanceledFailure if the enclosing CancellationScope is
* canceled
*/
Promise<Boolean> awaitAsync(
Duration timeout, String timerSummary, Supplier<Boolean> unblockCondition);

Promise<Void> newTimer(Duration duration);

Promise<Void> newTimer(Duration duration, TimerOptions options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,17 @@ public void await(String reason, Supplier<Boolean> unblockCondition) {
next.await(reason, unblockCondition);
}

@Override
public Promise<Void> awaitAsync(Supplier<Boolean> unblockCondition) {
return next.awaitAsync(unblockCondition);
}

@Override
public Promise<Boolean> awaitAsync(
Duration timeout, String timerSummary, Supplier<Boolean> unblockCondition) {
return next.awaitAsync(timeout, timerSummary, unblockCondition);
}

@Override
public Promise<Void> newTimer(Duration duration) {
return next.newTimer(duration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.temporal.internal.worker.WorkflowExecutorCache;
import io.temporal.workflow.CancellationScope;
import java.util.Optional;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -92,6 +93,24 @@ static DeterministicRunner newRunner(
@Nonnull
WorkflowThread newCallbackThread(Runnable runnable, @Nullable String name);

/**
* Creates a new repeatable workflow thread that re-evaluates its condition on each
* runUntilBlocked() call. The thread completes when the condition returns true or when
* cancelled/destroyed.
*
* <p>This is used for async await where the condition can contain blocking operations like
* Workflow.await(), activity calls, etc. Unlike simple condition watchers, these conditions run
* in their own workflow thread context with full workflow capabilities.
*
* @param condition The condition to evaluate repeatedly. May contain blocking operations.
* @param detached Whether the thread is detached from parent cancellation scope
* @param name Optional name for the thread
* @return A new WorkflowThread that repeatedly evaluates the condition
*/
@Nonnull
WorkflowThread newRepeatableThread(
Supplier<Boolean> condition, boolean detached, @Nullable String name);

/**
* Retrieve data from runner locals. Returns 1. not found (an empty Optional) 2. found but null
* (an Optional of an empty Optional) 3. found and non-null (an Optional of an Optional of a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,9 @@ public void runUntilAllBlocked(long deadlockDetectionTimeout) {
appendCallbackThreadsLocked();
}
toExecuteInWorkflowThread.clear();

progress = false;

Iterator<WorkflowThread> ci = threads.iterator();
while (ci.hasNext()) {
WorkflowThread c = ci.next();
Expand Down Expand Up @@ -504,6 +506,53 @@ public WorkflowThread newCallbackThread(Runnable runnable, @Nullable String name
return result;
}

/**
* Creates a new repeatable workflow thread that re-evaluates its condition on each
* runUntilBlocked() call until the condition returns true.
*
* <p>IMPORTANT: The condition must be read-only (it observes state but must not modify it). This
* is because the condition may be evaluated multiple times per workflow task, and modifying state
* would cause non-determinism.
*
* <p>The thread reports progress only when the condition becomes true, ensuring the event loop
* doesn't spin indefinitely when conditions remain false.
*
* @param condition The read-only condition to evaluate repeatedly
* @param detached Whether the thread is detached from parent cancellation scope
* @param name Optional name for the thread
* @return A new WorkflowThread that repeatedly evaluates the condition
*/
@Override
@Nonnull
public WorkflowThread newRepeatableThread(
Supplier<Boolean> condition, boolean detached, @Nullable String name) {
if (name == null) {
name =
"repeatable[" + workflowContext.getReplayContext().getWorkflowId() + "]-" + addedThreads;
}
if (rootWorkflowThread == null) {
throw new IllegalStateException(
"newRepeatableThread can be called only with existing root workflow thread");
}
checkWorkflowThreadOnly();
checkNotClosed();
WorkflowThread result =
new RepeatableWorkflowThread(
workflowThreadExecutor,
workflowContext,
this,
name,
WORKFLOW_THREAD_PRIORITY + (addedThreads++),
detached,
CancellationScopeImpl.current(),
condition,
cache,
getContextPropagators(),
getPropagatedContexts());
workflowThreadsToAdd.add(result);
return result;
}

/**
* Executes before any other threads next time runUntilBlockedCalled. Must never be called from
* any workflow threads.
Expand Down
Loading
Loading