Add completion and cancellation docs for AI Transport #3204
Merged
src/pages/docs/ai-transport/messaging/completion-and-cancellation.mdx
---
title: "Completion and cancellation"
meta_description: "Signal when AI responses are complete and support user-initiated cancellation of in-progress responses."
meta_keywords: "completion signalling, cancellation, abort, streaming lifecycle, metadata, AI transport, realtime"
---

AI responses streamed using the [message-per-response](/docs/ai-transport/token-streaming/message-per-response) or [message-per-token](/docs/ai-transport/token-streaming/message-per-token) pattern do not require explicit completion signals to function. Subscribers receive tokens as they arrive and can render them progressively. However, some applications benefit from explicitly signalling when a response is complete, or allowing users to cancel an in-progress response.

## Benefits of completion and cancellation signals <a id="benefits"/>

Explicit completion and cancellation signals are useful when your application needs to:

- Detect whether a response is still in progress after reconnection, so clients can distinguish between a completed response and one that is still streaming
- Finalize UI state when a response ends, such as removing typing indicators or enabling input controls
- Allow users to abort a response mid-stream, stopping generation and saving compute resources
- Coordinate multiple content parts within a single response, where downstream logic depends on knowing when each part is finished

## Signal completion <a id="completion"/>

Use [operation metadata](/docs/messages/updates-deletes#append-operation-metadata) to signal that a content part or response is complete. Operation metadata is a set of key-value pairs carried on each append or update operation. Subscribers can inspect this metadata to determine the current phase of a message.
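
As a sketch of how a subscriber might interpret that metadata, the helper below maps the `phase` value on each incoming operation to a rendering state. Both the `phase` key and the `renderStateFor` helper are illustrative application-level conventions, not part of the Ably SDK:

```javascript
// Sketch: map operation metadata to a client-side rendering state.
// The 'phase' key is an application-level convention, not an SDK field.
function renderStateFor(metadata) {
  switch (metadata?.phase) {
    case 'done':      return 'final';   // no more content will arrive
    case 'streaming': return 'partial'; // more appends expected
    default:          return 'unknown'; // no marker: treat as possibly live
  }
}

console.log(renderStateFor({ phase: 'streaming' })); // 'partial'
console.log(renderStateFor({ phase: 'done' }));      // 'final'
console.log(renderStateFor(undefined));              // 'unknown'
```

Treating the absence of a marker as "possibly live" is the safe default: a client that reconnects mid-stream sees only the operations delivered so far.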

### Content-part completion <a id="content-part-completion"/>

When streaming content using the [message-per-response](/docs/ai-transport/token-streaming/message-per-response) pattern, signal that a content part is complete by appending an empty string with a metadata marker. The empty append does not change the message's data, but the metadata signals to subscribers that no more content follows for this message.

This keeps the entire content lifecycle (create, stream, done) within a single Ably message:

<Code>
```javascript
const channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}');

// Publish initial message
const { serials: [msgSerial] } = await channel.publish({ name: 'response', data: '' });

// Stream tokens
for await (const event of stream) {
  if (event.type === 'token') {
    channel.appendMessage({
      serial: msgSerial,
      data: event.text
    }, {
      metadata: { phase: 'streaming' }
    });
  }
}

// Signal content-part completion with an empty append
channel.appendMessage({
  serial: msgSerial,
  data: ''
}, {
  metadata: { phase: 'done' }
});
```
```python
channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}')

# Publish initial message
message = Message(name='response', data='')
result = await channel.publish(message)
msg_serial = result.serials[0]

# Stream tokens
async for event in stream:
    if event['type'] == 'token':
        channel.append_message(
            serial=msg_serial,
            data=event['text'],
            metadata={'phase': 'streaming'}
        )

# Signal content-part completion with an empty append
channel.append_message(
    serial=msg_serial,
    data='',
    metadata={'phase': 'done'}
)
```
```java
Channel channel = realtime.channels.get("ai:{{RANDOM_CHANNEL_NAME}}");

// Publish initial message
CompletableFuture<PublishResult> publishFuture = new CompletableFuture<>();
channel.publish("response", "", new Callback<PublishResult>() {
  @Override
  public void onSuccess(PublishResult result) {
    publishFuture.complete(result);
  }

  @Override
  public void onError(ErrorInfo reason) {
    publishFuture.completeExceptionally(AblyException.fromErrorInfo(reason));
  }
});
String msgSerial = publishFuture.get().serials[0];

// Stream tokens
for (Event event : stream) {
  if (event.getType().equals("token")) {
    MessageMetadata metadata = new MessageMetadata();
    metadata.put("phase", "streaming");
    channel.appendMessage(msgSerial, event.getText(), metadata);
  }
}

// Signal content-part completion with an empty append
MessageMetadata metadata = new MessageMetadata();
metadata.put("phase", "done");
channel.appendMessage(msgSerial, "", metadata);
```
</Code>

<Aside data-type="note">
The completion append does not need to be awaited. Appends to the same message serial are delivered in order, so the completion marker is guaranteed to arrive after all previous tokens for that content part.
</Aside>

### Response-level completion <a id="response-completion"/>

A single AI response may span multiple content parts, each represented as a separate Ably message with its own stream of appends. To signal that the entire response is complete, publish a discrete message after all content parts are finished. Subscribers can use this as a cue to finalize the response in the UI.

<Code>
```javascript
// After all content parts are done, signal response-level completion
await channel.publish({
  name: 'response-end',
  data: '',
  extras: {
    headers: {
      responseId: 'resp_abc123'
    }
  }
});
```
```python
# After all content parts are done, signal response-level completion
await channel.publish(Message(
    name='response-end',
    data='',
    extras={
        'headers': {
            'responseId': 'resp_abc123'
        }
    }
))
```
```java
// After all content parts are done, signal response-level completion
JsonObject extras = new JsonObject();
JsonObject headers = new JsonObject();
headers.addProperty("responseId", "resp_abc123");
extras.add("headers", headers);

channel.publish(new Message("response-end", "", new MessageExtras(extras)));
```
</Code>

<Aside data-type="note">
Await all pending append operations before publishing the response-level completion message. Appends to the same message serial are always delivered in order regardless of whether you await the acknowledgment, but create and append operations issued without awaiting the acknowledgment can race to be accepted by Ably. Without awaiting, the completion message could arrive before the last content token.
</Aside>
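
One way to satisfy this is to collect each append's acknowledgment promise and await them all before publishing `response-end`. The self-contained sketch below simulates that pattern; `appendToken`, `publishResponseEnd`, and `delivered` are illustrative stand-ins, not SDK APIs:

```javascript
// Sketch: track every append's acknowledgment promise, then flush them
// all before the response-level completion is published.
const pendingAppends = [];
const delivered = [];

function appendToken(text) {
  // Simulate an append whose acknowledgment arrives asynchronously
  const ack = new Promise((resolve) =>
    setTimeout(() => { delivered.push(text); resolve(); }, Math.random() * 10)
  );
  pendingAppends.push(ack);
}

async function publishResponseEnd() {
  // Await all outstanding appends so 'response-end' cannot overtake a token
  await Promise.all(pendingAppends);
  delivered.push('response-end');
}

appendToken('Hello');
appendToken(' world');
await publishResponseEnd();
console.log(delivered[delivered.length - 1]); // 'response-end'
```

In a real publisher, each `channel.appendMessage(...)` call's returned promise would be pushed onto `pendingAppends` instead of the simulated timer.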

### Detect completion from history <a id="detect-completion"/>

When [hydrating client state](/docs/ai-transport/token-streaming/message-per-response#hydration) from history, inspect `version.metadata` on each message to determine whether a content part was fully completed or is still in progress. If the most recent operation's metadata carries your completion marker, the content part is done. If it carries a streaming marker or no marker, the stream may still be active.

<Code>
```javascript
const channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}');

await channel.subscribe((message) => {
  // ...handle message actions as normal...
});

let page = await channel.history({ untilAttach: true });

while (page) {
  for (const message of page.items) {
    const phase = message.version?.metadata?.phase;

    if (phase === 'done') {
      // Content part is complete, render as final
    } else {
      // Content part may still be streaming, listen for live appends
    }
  }
  page = page.hasNext() ? await page.next() : null;
}
```
```python
channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}')

await channel.subscribe(on_message)

page = await channel.history(until_attach=True)

while page:
    for message in page.items:
        # Guard against a missing or empty version.metadata
        metadata = getattr(message.version, 'metadata', None) or {}
        phase = metadata.get('phase')

        if phase == 'done':
            # Content part is complete, render as final
            pass
        else:
            # Content part may still be streaming, listen for live appends
            pass

    page = await page.next() if page.has_next() else None
```
```java
Channel channel = realtime.channels.get("ai:{{RANDOM_CHANNEL_NAME}}");

channel.subscribe(message -> { /* handle message actions as normal */ });

PaginatedResult<Message> page = channel.history(new Param[] { new Param("untilAttach", "true") });

while (page != null) {
  for (Message message : page.items()) {
    String phase = message.version != null && message.version.metadata != null
      ? message.version.metadata.get("phase")
      : null;

    if ("done".equals(phase)) {
      // Content part is complete, render as final
    } else {
      // Content part may still be streaming, listen for live appends
    }
  }
  page = page.hasNext() ? page.next() : null;
}
```
</Code>

<Aside data-type="important">
Operation metadata is last-write-wins. Each operation's `version.metadata` overwrites all previous values on the message rather than merging with them. The metadata visible on a message in history reflects only the most recent operation. This is what makes the empty-append-with-completion-marker pattern work: the final append sets `phase: 'done'`, and that is what persists on the message.
</Aside>
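
To make the overwrite behaviour concrete, the sketch below simulates it with a plain object. The `applyOperation` helper and the `model` key are purely illustrative; only the replace-not-merge semantics mirror the behaviour described above:

```javascript
// Sketch: last-write-wins operation metadata. Each append concatenates
// data but replaces version.metadata entirely rather than merging into it.
function applyOperation(message, data, metadata) {
  return {
    ...message,
    data: message.data + data,            // appends concatenate data...
    version: { metadata: metadata ?? {} } // ...but metadata is replaced wholesale
  };
}

let message = { data: '', version: { metadata: {} } };
message = applyOperation(message, 'Hello', { phase: 'streaming', model: 'gpt-4o' });
message = applyOperation(message, ' world', { phase: 'streaming' });
message = applyOperation(message, '', { phase: 'done' });

console.log(message.data);                   // 'Hello world'
console.log(message.version.metadata.phase); // 'done' — the 'model' key did not persist
```

If a value must survive every operation (such as a model identifier), repeat it in the metadata of each append rather than relying on an earlier operation.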

## Cancel a response <a id="cancellation"/>

Cancellation allows users to stop an in-progress response. The subscriber publishes a cancellation message on the channel, and the publisher stops generating and flushes any pending operations.

### How it works <a id="cancel-how"/>

1. The subscriber publishes a cancellation message on the channel with a response ID identifying the response to cancel.
2. The publisher receives the cancellation message, stops generating, and flushes any pending append operations.
3. The publisher optionally publishes a confirmation message to signal clean shutdown to other subscribers.

### Publish a cancellation request <a id="cancel-request"/>

The subscriber sends a cancellation message with a `responseId` in the message [extras](/docs/messages#properties) to identify which response to cancel:

<Code>
```javascript
const channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}');

// Send cancellation request for a specific response
await channel.publish({
  name: 'cancel',
  data: '',
  extras: {
    headers: {
      responseId: 'resp_abc123'
    }
  }
});
```
```python
channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}')

# Send cancellation request for a specific response
await channel.publish(Message(
    name='cancel',
    data='',
    extras={
        'headers': {
            'responseId': 'resp_abc123'
        }
    }
))
```
```java
Channel channel = realtime.channels.get("ai:{{RANDOM_CHANNEL_NAME}}");

// Send cancellation request for a specific response
JsonObject extras = new JsonObject();
JsonObject headers = new JsonObject();
headers.addProperty("responseId", "resp_abc123");
extras.add("headers", headers);

channel.publish(new Message("cancel", "", new MessageExtras(extras)));
```
</Code>

### Handle cancellation <a id="cancel-handle"/>

The publisher subscribes for cancellation messages and stops generation when one arrives. After stopping, flush any pending append operations before optionally publishing a confirmation message:

<Code>
```javascript
const channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}');

// Track pending appends for flushing
const pendingAppends = [];

// Listen for cancellation requests
await channel.subscribe('cancel', async (message) => {
  const responseId = message.extras?.headers?.responseId;

  // Stop generation for the matching response
  stopGeneration(responseId);

  // Flush any pending appends before confirming
  await Promise.all(pendingAppends);

  // Optionally confirm cancellation to all subscribers
  await channel.publish({
    name: 'cancelled',
    data: '',
    extras: {
      headers: {
        responseId
      }
    }
  });
});
```
```python
channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}')

# Track pending appends for flushing
pending_appends = []

# Listen for cancellation requests
async def on_cancel(message):
    response_id = message.extras.get('headers', {}).get('responseId')

    # Stop generation for the matching response
    stop_generation(response_id)

    # Flush any pending appends before confirming
    await asyncio.gather(*pending_appends)

    # Optionally confirm cancellation to all subscribers
    await channel.publish(Message(
        name='cancelled',
        data='',
        extras={
            'headers': {
                'responseId': response_id
            }
        }
    ))

await channel.subscribe('cancel', on_cancel)
```
```java
Channel channel = realtime.channels.get("ai:{{RANDOM_CHANNEL_NAME}}");

// Listen for cancellation requests
channel.subscribe("cancel", message -> {
  // Defensively extract the responseId; extras or headers may be absent
  JsonObject extras = message.extras != null ? message.extras.asJsonObject() : null;
  JsonObject headers = extras != null && extras.has("headers")
    ? extras.getAsJsonObject("headers")
    : null;
  String responseId = headers != null && headers.has("responseId")
    ? headers.get("responseId").getAsString()
    : null;

  // Stop generation for the matching response
  stopGeneration(responseId);

  // Flush any pending appends before confirming
  flushPendingAppends();

  // Optionally confirm cancellation to all subscribers
  JsonObject confirmExtras = new JsonObject();
  JsonObject confirmHeaders = new JsonObject();
  confirmHeaders.addProperty("responseId", responseId);
  confirmExtras.add("headers", confirmHeaders);

  channel.publish(new Message("cancelled", "", new MessageExtras(confirmExtras)));
});
```
</Code>

<Aside data-type="note">
Cancellation is fire-and-forget. No acknowledgment round-trip is needed between the subscriber and publisher. The optional confirmation message is a convenience for other subscribers to update their UI, not a required handshake.
</Aside>
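
Although no handshake is required, subscribers that do listen for the confirmation can use it to finalize local state. The sketch below uses a tiny in-memory bus in place of a real channel; `channel`, `ui`, and the response entry are all illustrative:

```javascript
// Sketch: a minimal in-memory stand-in for a channel, used to show how a
// subscriber might react to the optional 'cancelled' confirmation.
const listeners = new Map();
const channel = {
  subscribe(name, fn) { listeners.set(name, fn); },
  publish(msg) { listeners.get(msg.name)?.(msg); }
};

// Hypothetical client-side UI state keyed by response ID
const ui = { responses: { resp_abc123: { status: 'streaming' } } };

channel.subscribe('cancelled', (message) => {
  const responseId = message.extras?.headers?.responseId;
  if (responseId && ui.responses[responseId]) {
    // Finalize the UI: mark the response as cancelled, re-enable input, etc.
    ui.responses[responseId].status = 'cancelled';
  }
});

// Simulate the publisher confirming a cancellation
channel.publish({ name: 'cancelled', data: '', extras: { headers: { responseId: 'resp_abc123' } } });
console.log(ui.responses.resp_abc123.status); // 'cancelled'
```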

<Aside data-type="further-reading">
- Learn about the [message-per-response](/docs/ai-transport/token-streaming/message-per-response) and [message-per-token](/docs/ai-transport/token-streaming/message-per-token) token streaming patterns
- Use [online status](/docs/ai-transport/sessions-identity/online-status) to detect agent availability via presence
- Use [correlation metadata](/docs/ai-transport/token-streaming/message-per-response#publishing-with-metadata) to match responses to their requests
</Aside>