Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/data/nav/aitransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ export default {
name: 'Citations',
link: '/docs/ai-transport/messaging/citations',
},
{
name: 'Completion and cancellation',
link: '/docs/ai-transport/messaging/completion-and-cancellation',
},
],
},
{
Expand Down
387 changes: 387 additions & 0 deletions src/pages/docs/ai-transport/messaging/completion-and-cancellation.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,387 @@
---
title: "Completion and cancellation"
meta_description: "Signal when AI responses are complete and support user-initiated cancellation of in-progress responses."
meta_keywords: "completion signalling, cancellation, abort, streaming lifecycle, metadata, AI transport, realtime"
---

AI responses streamed using the [message-per-response](/docs/ai-transport/token-streaming/message-per-response) or [message-per-token](/docs/ai-transport/token-streaming/message-per-token) pattern do not require explicit completion signals to function. Subscribers receive tokens as they arrive and can render them progressively. However, some applications benefit from explicitly signalling when a response is complete, or allowing users to cancel an in-progress response.

## Benefits of completion and cancellation signals <a id="benefits"/>

Explicit completion and cancellation signals are useful when your application needs to:

- Detect whether a response is still in progress after reconnection, so clients can distinguish between a completed response and one that is still streaming
- Finalize UI state when a response ends, such as removing typing indicators or enabling input controls
- Allow users to abort a response mid-stream, stopping generation and saving compute resources
- Coordinate multiple content parts within a single response, where downstream logic depends on knowing when each part is finished

## Signal completion <a id="completion"/>

Use [operation metadata](/docs/messages/updates-deletes#append-operation-metadata) to signal that a content part or response is complete. Operation metadata is a set of key-value pairs carried on each append or update operation. Subscribers can inspect this metadata to determine the current phase of a message.

### Content-part completion <a id="content-part-completion"/>

When streaming content using the [message-per-response](/docs/ai-transport/token-streaming/message-per-response) pattern, signal that a content part is complete by appending an empty string with a metadata marker. The empty append does not change the message's data, but the metadata signals to subscribers that no more content follows for this message.

This keeps the entire content lifecycle (create, stream, done) within a single Ably message:

<Code>
```javascript
const channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}');

// Publish initial message
const { serials: [msgSerial] } = await channel.publish({ name: 'response', data: '' });

// Stream tokens
for await (const event of stream) {
if (event.type === 'token') {
channel.appendMessage({
serial: msgSerial,
data: event.text
}, {
metadata: { phase: 'streaming' }
});
}
}

// Signal content-part completion with an empty append
channel.appendMessage({
serial: msgSerial,
data: ''
}, {
metadata: { phase: 'done' }
});
```
```python
channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}')

# Publish initial message
message = Message(name='response', data='')
result = await channel.publish(message)
msg_serial = result.serials[0]

# Stream tokens
async for event in stream:
if event['type'] == 'token':
channel.append_message(
serial=msg_serial,
data=event['text'],
metadata={'phase': 'streaming'}
)

# Signal content-part completion with an empty append
channel.append_message(
serial=msg_serial,
data='',
metadata={'phase': 'done'}
)
```
```java
Channel channel = realtime.channels.get("ai:{{RANDOM_CHANNEL_NAME}}");

// Publish initial message
CompletableFuture<PublishResult> publishFuture = new CompletableFuture<>();
channel.publish("response", "", new Callback<PublishResult>() {
@Override
public void onSuccess(PublishResult result) {
publishFuture.complete(result);
}

@Override
public void onError(ErrorInfo reason) {
publishFuture.completeExceptionally(AblyException.fromErrorInfo(reason));
}
});
String msgSerial = publishFuture.get().serials[0];

// Stream tokens
for (Event event : stream) {
if (event.getType().equals("token")) {
MessageMetadata metadata = new MessageMetadata();
metadata.put("phase", "streaming");
channel.appendMessage(msgSerial, event.getText(), metadata);
}
}

// Signal content-part completion with an empty append
MessageMetadata metadata = new MessageMetadata();
metadata.put("phase", "done");
channel.appendMessage(msgSerial, "", metadata);
```
</Code>

<Aside data-type="note">
The completion append does not need to be awaited. Appends to the same message serial are delivered in order, so the completion marker is guaranteed to arrive after all previous tokens for that content part.
</Aside>

### Response-level completion <a id="response-completion"/>

A single AI response may span multiple content parts, each represented as a separate Ably message with its own stream of appends. To signal that the entire response is complete, publish a discrete message after all content parts are finished. Subscribers can use this as a cue to finalize the response in the UI.

<Code>
```javascript
// After all content parts are done, signal response-level completion
await channel.publish({
name: 'response-end',
data: '',
extras: {
headers: {
responseId: 'resp_abc123'
}
}
});
```
```python
# After all content parts are done, signal response-level completion
await channel.publish(Message(
name='response-end',
data='',
extras={
'headers': {
'responseId': 'resp_abc123'
}
}
))
```
```java
// After all content parts are done, signal response-level completion
JsonObject extras = new JsonObject();
JsonObject headers = new JsonObject();
headers.addProperty("responseId", "resp_abc123");
extras.add("headers", headers);

channel.publish(new Message("response-end", "", new MessageExtras(extras)));
```
</Code>

<Aside data-type="note">
Await all pending append operations before publishing the response-level completion message. Appends to the same message serial are always delivered in order regardless of whether you await the acknowledgment, but create and append operations issued without awaiting the acknowledgment can race to be accepted by Ably. Without awaiting, the completion message could arrive before the last content token.
</Aside>

### Detect completion from history <a id="detect-completion"/>

When [hydrating client state](/docs/ai-transport/token-streaming/message-per-response#hydration) from history, inspect `version.metadata` on each message to determine whether a content part was fully completed or is still in progress. If the most recent operation's metadata carries your completion marker, the content part is done. If it carries a streaming marker or no marker, the stream may still be active.

<Code>
```javascript
const channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}');

await channel.subscribe((message) => {
// ...handle message actions as normal...
});

let page = await channel.history({ untilAttach: true });

while (page) {
for (const message of page.items) {
const phase = message.version?.metadata?.phase;

if (phase === 'done') {
// Content part is complete, render as final
} else {
// Content part may still be streaming, listen for live appends
}
}
page = page.hasNext() ? await page.next() : null;
}
```
```python
channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}')

await channel.subscribe(on_message)

page = await channel.history(until_attach=True)

while page:
for message in page.items:
phase = getattr(message.version, 'metadata', {}).get('phase')

if phase == 'done':
# Content part is complete, render as final
pass
else:
# Content part may still be streaming, listen for live appends
pass

page = await page.next() if page.has_next() else None
```
```java
Channel channel = realtime.channels.get("ai:{{RANDOM_CHANNEL_NAME}}");

channel.subscribe(message -> { /* handle message actions as normal */ });

PaginatedResult<Message> page = channel.history(new Param("untilAttach", "true"));

while (page != null) {
for (Message message : page.items()) {
String phase = message.version != null && message.version.metadata != null
? message.version.metadata.get("phase")
: null;

if ("done".equals(phase)) {
// Content part is complete, render as final
} else {
// Content part may still be streaming, listen for live appends
}
}
page = page.hasNext() ? page.next() : null;
}
```
</Code>

<Aside data-type="important">
Operation metadata is last-write-wins. Each operation's `version.metadata` overwrites all previous values on the message rather than merging with them. The metadata visible on a message in history reflects only the most recent operation. This is what makes the empty-append-with-completion-marker pattern work: the final append sets `phase: 'done'`, and that is what persists on the message.
</Aside>

## Cancel a response <a id="cancellation"/>

Cancellation allows users to stop an in-progress response. The subscriber publishes a cancellation message on the channel, and the publisher stops generating and flushes any pending operations.

### How it works <a id="cancel-how"/>

1. The subscriber publishes a cancellation message on the channel with a response ID identifying the response to cancel.
2. The publisher receives the cancellation message, stops generating, and flushes any pending append operations.
3. The publisher optionally publishes a confirmation message to signal clean shutdown to other subscribers.

### Publish a cancellation request <a id="cancel-request"/>

The subscriber sends a cancellation message with a `responseId` in the message [extras](/docs/messages#properties) to identify which response to cancel:

<Code>
```javascript
const channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}');

// Send cancellation request for a specific response
await channel.publish({
name: 'cancel',
data: '',
extras: {
headers: {
responseId: 'resp_abc123'
}
}
});
```
```python
channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}')

# Send cancellation request for a specific response
await channel.publish(Message(
name='cancel',
data='',
extras={
'headers': {
'responseId': 'resp_abc123'
}
}
))
```
```java
Channel channel = realtime.channels.get("ai:{{RANDOM_CHANNEL_NAME}}");

// Send cancellation request for a specific response
JsonObject extras = new JsonObject();
JsonObject headers = new JsonObject();
headers.addProperty("responseId", "resp_abc123");
extras.add("headers", headers);

channel.publish(new Message("cancel", "", new MessageExtras(extras)));
```
</Code>

### Handle cancellation <a id="cancel-handle"/>

The publisher subscribes for cancellation messages and stops generation when one arrives. After stopping, flush any pending append operations before optionally publishing a confirmation message:

<Code>
```javascript
const channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}');

// Track pending appends for flushing
const pendingAppends = [];

// Listen for cancellation requests
await channel.subscribe('cancel', async (message) => {
const responseId = message.extras?.headers?.responseId;

// Stop generation for the matching response
stopGeneration(responseId);

// Flush any pending appends before confirming
await Promise.all(pendingAppends);

// Optionally confirm cancellation to all subscribers
await channel.publish({
name: 'cancelled',
data: '',
extras: {
headers: {
responseId
}
}
});
});
```
```python
channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}')

# Track pending appends for flushing
pending_appends = []

# Listen for cancellation requests
async def on_cancel(message):
response_id = message.extras.get('headers', {}).get('responseId')

# Stop generation for the matching response
stop_generation(response_id)

# Flush any pending appends before confirming
await asyncio.gather(*pending_appends)

# Optionally confirm cancellation to all subscribers
await channel.publish(Message(
name='cancelled',
data='',
extras={
'headers': {
'responseId': response_id
}
}
))

await channel.subscribe('cancel', on_cancel)
```
```java
Channel channel = realtime.channels.get("ai:{{RANDOM_CHANNEL_NAME}}");

// Listen for cancellation requests
channel.subscribe("cancel", message -> {
JsonObject headers = message.extras.asJsonObject().get("headers").getAsJsonObject();
String responseId = headers != null ? headers.get("responseId").getAsString() : null;

// Stop generation for the matching response
stopGeneration(responseId);

// Flush any pending appends before confirming
flushPendingAppends();

// Optionally confirm cancellation to all subscribers
JsonObject confirmExtras = new JsonObject();
JsonObject confirmHeaders = new JsonObject();
confirmHeaders.addProperty("responseId", responseId);
confirmExtras.add("headers", confirmHeaders);

channel.publish(new Message("cancelled", "", new MessageExtras(confirmExtras)));
});
```
</Code>

<Aside data-type="note">
Cancellation is fire-and-forget. No acknowledgment round-trip is needed between the subscriber and publisher. The optional confirmation message is a convenience for other subscribers to update their UI, not a required handshake.
</Aside>

<Aside data-type="further-reading">
- Learn about the [message-per-response](/docs/ai-transport/token-streaming/message-per-response) and [message-per-token](/docs/ai-transport/token-streaming/message-per-token) token streaming patterns
- Use [online status](/docs/ai-transport/sessions-identity/online-status) to detect agent availability via presence
- Use [correlation metadata](/docs/ai-transport/token-streaming/message-per-response#publishing-with-metadata) to match responses to their requests
</Aside>
Loading