diff --git a/src/data/nav/aitransport.ts b/src/data/nav/aitransport.ts
index 9e1567392f..bf68251537 100644
--- a/src/data/nav/aitransport.ts
+++ b/src/data/nav/aitransport.ts
@@ -79,6 +79,10 @@ export default {
name: 'Citations',
link: '/docs/ai-transport/messaging/citations',
},
+ {
+ name: 'Completion and cancellation',
+ link: '/docs/ai-transport/messaging/completion-and-cancellation',
+ },
],
},
{
diff --git a/src/pages/docs/ai-transport/messaging/completion-and-cancellation.mdx b/src/pages/docs/ai-transport/messaging/completion-and-cancellation.mdx
new file mode 100644
index 0000000000..0e5228f148
--- /dev/null
+++ b/src/pages/docs/ai-transport/messaging/completion-and-cancellation.mdx
@@ -0,0 +1,387 @@
+---
+title: "Completion and cancellation"
+meta_description: "Signal when AI responses are complete and support user-initiated cancellation of in-progress responses."
+meta_keywords: "completion signalling, cancellation, abort, streaming lifecycle, metadata, AI transport, realtime"
+---
+
+AI responses streamed using the [message-per-response](/docs/ai-transport/token-streaming/message-per-response) or [message-per-token](/docs/ai-transport/token-streaming/message-per-token) pattern do not require explicit completion signals to function. Subscribers receive tokens as they arrive and can render them progressively. However, some applications benefit from explicitly signalling when a response is complete, or allowing users to cancel an in-progress response.
+
+## Benefits of completion and cancellation signals
+
+Explicit completion and cancellation signals are useful when your application needs to:
+
+- Detect whether a response is still in progress after reconnection, so clients can distinguish between a completed response and one that is still streaming
+- Finalize UI state when a response ends, such as removing typing indicators or enabling input controls
+- Allow users to abort a response mid-stream, stopping generation and saving compute resources
+- Coordinate multiple content parts within a single response, where downstream logic depends on knowing when each part is finished
+
+## Signal completion
+
+Use [operation metadata](/docs/messages/updates-deletes#append-operation-metadata) to signal that a content part or response is complete. Operation metadata is a set of key-value pairs carried on each append or update operation. Subscribers can inspect this metadata to determine the current phase of a message.
+
+### Content-part completion
+
+When streaming content using the [message-per-response](/docs/ai-transport/token-streaming/message-per-response) pattern, signal that a content part is complete by appending an empty string with a metadata marker. The empty append does not change the message's data, but the metadata signals to subscribers that no more content follows for this message.
+
+This keeps the entire content lifecycle (create, stream, done) within a single Ably message:
+
+
+```javascript
+const channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}');
+
+// Publish initial message
+const { serials: [msgSerial] } = await channel.publish({ name: 'response', data: '' });
+
+// Stream tokens
+for await (const event of stream) {
+ if (event.type === 'token') {
+ channel.appendMessage({
+ serial: msgSerial,
+ data: event.text
+ }, {
+ metadata: { phase: 'streaming' }
+ });
+ }
+}
+
+// Signal content-part completion with an empty append
+channel.appendMessage({
+ serial: msgSerial,
+ data: ''
+}, {
+ metadata: { phase: 'done' }
+});
+```
+```python
+channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}')
+
+# Publish initial message
+message = Message(name='response', data='')
+result = await channel.publish(message)
+msg_serial = result.serials[0]
+
+# Stream tokens
+async for event in stream:
+ if event['type'] == 'token':
+ channel.append_message(
+ serial=msg_serial,
+ data=event['text'],
+ metadata={'phase': 'streaming'}
+ )
+
+# Signal content-part completion with an empty append
+channel.append_message(
+ serial=msg_serial,
+ data='',
+ metadata={'phase': 'done'}
+)
+```
+```java
+Channel channel = realtime.channels.get("ai:{{RANDOM_CHANNEL_NAME}}");
+
+// Publish initial message
+CompletableFuture publishFuture = new CompletableFuture<>();
+channel.publish("response", "", new Callback() {
+ @Override
+ public void onSuccess(PublishResult result) {
+ publishFuture.complete(result);
+ }
+
+ @Override
+ public void onError(ErrorInfo reason) {
+ publishFuture.completeExceptionally(AblyException.fromErrorInfo(reason));
+ }
+});
+String msgSerial = publishFuture.get().serials[0];
+
+// Stream tokens
+for (Event event : stream) {
+ if (event.getType().equals("token")) {
+ MessageMetadata metadata = new MessageMetadata();
+ metadata.put("phase", "streaming");
+ channel.appendMessage(msgSerial, event.getText(), metadata);
+ }
+}
+
+// Signal content-part completion with an empty append
+MessageMetadata metadata = new MessageMetadata();
+metadata.put("phase", "done");
+channel.appendMessage(msgSerial, "", metadata);
+```
+
+
+
+
+### Response-level completion
+
+A single AI response may span multiple content parts, each represented as a separate Ably message with its own stream of appends. To signal that the entire response is complete, publish a discrete message after all content parts are finished. Subscribers can use this as a cue to finalize the response in the UI.
+
+
+```javascript
+// After all content parts are done, signal response-level completion
+await channel.publish({
+ name: 'response-end',
+ data: '',
+ extras: {
+ headers: {
+ responseId: 'resp_abc123'
+ }
+ }
+});
+```
+```python
+# After all content parts are done, signal response-level completion
+await channel.publish(Message(
+ name='response-end',
+ data='',
+ extras={
+ 'headers': {
+ 'responseId': 'resp_abc123'
+ }
+ }
+))
+```
+```java
+// After all content parts are done, signal response-level completion
+JsonObject extras = new JsonObject();
+JsonObject headers = new JsonObject();
+headers.addProperty("responseId", "resp_abc123");
+extras.add("headers", headers);
+
+channel.publish(new Message("response-end", "", new MessageExtras(extras)));
+```
+
+
+
+
+### Detect completion from history
+
+When [hydrating client state](/docs/ai-transport/token-streaming/message-per-response#hydration) from history, inspect `version.metadata` on each message to determine whether a content part was fully completed or is still in progress. If the most recent operation's metadata carries your completion marker, the content part is done. If it carries a streaming marker or no marker, the stream may still be active.
+
+
+```javascript
+const channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}');
+
+await channel.subscribe((message) => {
+ // ...handle message actions as normal...
+});
+
+let page = await channel.history({ untilAttach: true });
+
+while (page) {
+ for (const message of page.items) {
+ const phase = message.version?.metadata?.phase;
+
+ if (phase === 'done') {
+ // Content part is complete, render as final
+ } else {
+ // Content part may still be streaming, listen for live appends
+ }
+ }
+ page = page.hasNext() ? await page.next() : null;
+}
+```
+```python
+channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}')
+
+await channel.subscribe(on_message)
+
+page = await channel.history(until_attach=True)
+
+while page:
+ for message in page.items:
+ phase = getattr(message.version, 'metadata', {}).get('phase')
+
+ if phase == 'done':
+ # Content part is complete, render as final
+ pass
+ else:
+ # Content part may still be streaming, listen for live appends
+ pass
+
+ page = await page.next() if page.has_next() else None
+```
+```java
+Channel channel = realtime.channels.get("ai:{{RANDOM_CHANNEL_NAME}}");
+
+channel.subscribe(message -> { /* handle message actions as normal */ });
+
+PaginatedResult page = channel.history(new Param("untilAttach", "true"));
+
+while (page != null) {
+ for (Message message : page.items()) {
+ String phase = message.version != null && message.version.metadata != null
+ ? message.version.metadata.get("phase")
+ : null;
+
+ if ("done".equals(phase)) {
+ // Content part is complete, render as final
+ } else {
+ // Content part may still be streaming, listen for live appends
+ }
+ }
+ page = page.hasNext() ? page.next() : null;
+}
+```
+
+
+
+
+## Cancel a response
+
+Cancellation allows users to stop an in-progress response. The subscriber publishes a cancellation message on the channel, and the publisher stops generating and flushes any pending operations.
+
+### How it works
+
+1. The subscriber publishes a cancellation message on the channel with a response ID identifying the response to cancel.
+2. The publisher receives the cancellation message, stops generating, and flushes any pending append operations.
+3. The publisher optionally publishes a confirmation message to signal clean shutdown to other subscribers.
+
+### Publish a cancellation request
+
+The subscriber sends a cancellation message with a `responseId` in the message [extras](/docs/messages#properties) to identify which response to cancel:
+
+
+```javascript
+const channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}');
+
+// Send cancellation request for a specific response
+await channel.publish({
+ name: 'cancel',
+ data: '',
+ extras: {
+ headers: {
+ responseId: 'resp_abc123'
+ }
+ }
+});
+```
+```python
+channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}')
+
+# Send cancellation request for a specific response
+await channel.publish(Message(
+ name='cancel',
+ data='',
+ extras={
+ 'headers': {
+ 'responseId': 'resp_abc123'
+ }
+ }
+))
+```
+```java
+Channel channel = realtime.channels.get("ai:{{RANDOM_CHANNEL_NAME}}");
+
+// Send cancellation request for a specific response
+JsonObject extras = new JsonObject();
+JsonObject headers = new JsonObject();
+headers.addProperty("responseId", "resp_abc123");
+extras.add("headers", headers);
+
+channel.publish(new Message("cancel", "", new MessageExtras(extras)));
+```
+
+
+### Handle cancellation
+
+The publisher subscribes for cancellation messages and stops generation when one arrives. After stopping, flush any pending append operations before optionally publishing a confirmation message:
+
+
+```javascript
+const channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}');
+
+// Track pending appends for flushing
+const pendingAppends = [];
+
+// Listen for cancellation requests
+await channel.subscribe('cancel', async (message) => {
+ const responseId = message.extras?.headers?.responseId;
+
+ // Stop generation for the matching response
+ stopGeneration(responseId);
+
+ // Flush any pending appends before confirming
+ await Promise.all(pendingAppends);
+
+ // Optionally confirm cancellation to all subscribers
+ await channel.publish({
+ name: 'cancelled',
+ data: '',
+ extras: {
+ headers: {
+ responseId
+ }
+ }
+ });
+});
+```
+```python
+channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}')
+
+# Track pending appends for flushing
+pending_appends = []
+
+# Listen for cancellation requests
+async def on_cancel(message):
+ response_id = message.extras.get('headers', {}).get('responseId')
+
+ # Stop generation for the matching response
+ stop_generation(response_id)
+
+ # Flush any pending appends before confirming
+ await asyncio.gather(*pending_appends)
+
+ # Optionally confirm cancellation to all subscribers
+ await channel.publish(Message(
+ name='cancelled',
+ data='',
+ extras={
+ 'headers': {
+ 'responseId': response_id
+ }
+ }
+ ))
+
+await channel.subscribe('cancel', on_cancel)
+```
+```java
+Channel channel = realtime.channels.get("ai:{{RANDOM_CHANNEL_NAME}}");
+
+// Listen for cancellation requests
+channel.subscribe("cancel", message -> {
+ JsonObject headers = message.extras.asJsonObject().get("headers").getAsJsonObject();
+ String responseId = headers != null ? headers.get("responseId").getAsString() : null;
+
+ // Stop generation for the matching response
+ stopGeneration(responseId);
+
+ // Flush any pending appends before confirming
+ flushPendingAppends();
+
+ // Optionally confirm cancellation to all subscribers
+ JsonObject confirmExtras = new JsonObject();
+ JsonObject confirmHeaders = new JsonObject();
+ confirmHeaders.addProperty("responseId", responseId);
+ confirmExtras.add("headers", confirmHeaders);
+
+ channel.publish(new Message("cancelled", "", new MessageExtras(confirmExtras)));
+});
+```
+
+
+
+
+
diff --git a/src/pages/docs/ai-transport/token-streaming/message-per-response.mdx b/src/pages/docs/ai-transport/token-streaming/message-per-response.mdx
index 441c3c9337..0d6c8831d9 100644
--- a/src/pages/docs/ai-transport/token-streaming/message-per-response.mdx
+++ b/src/pages/docs/ai-transport/token-streaming/message-per-response.mdx
@@ -41,6 +41,19 @@ To enable the channel rule:
The examples on this page use the `ai:` namespace prefix, which assumes you have configured the rule for `ai`.
+Your token or API key needs the following [capabilities](/docs/auth/capabilities) on the channel:
+
+| Capability | Purpose |
+| --- | --- |
+| `subscribe` | Receive messages |
+| `history` | Retrieve historical messages for [client hydration](#hydration) |
+| `publish` | Create new messages |
+| `message-update-own` | Append to your own messages |
+
+
+
## Publishing tokens
Publish tokens from a [Realtime](/docs/api/realtime-sdk) client, which maintains a persistent connection to the Ably service. This allows you to publish at very high message rates with the lowest possible latencies, while preserving guarantees around message delivery order. For more information, see [Realtime and REST](/docs/basics#realtime-and-rest).
@@ -116,6 +129,10 @@ for (Event event : stream) {
When publishing tokens, don't await the `channel.appendMessage()` call. Ably rolls up acknowledgments and debounces them for efficiency, which means awaiting each append would unnecessarily slow down your token stream. Messages are still published in the order that `appendMessage()` is called, so delivery order is not affected.
+
+
```javascript
// ✅ Do this - append without await for maximum throughput
@@ -173,7 +190,7 @@ for (Event event : stream) {