Module slack_sdk.web.async_chat_stream
+Classes
+-
+
+class AsyncChatStream +(client: AsyncWebClient,
*,
channel: str,
logger: logging.Logger,
thread_ts: str,
buffer_size: int,
recipient_team_id: str | None = None,
recipient_user_id: str | None = None,
**kwargs) +
+-
+++
+Expand source code +
+
+class AsyncChatStream: + """A helper class for streaming markdown text into a conversation using the chat streaming APIs. + + This class provides a convenient interface for the chat.startStream, chat.appendStream, and chat.stopStream API + methods, with automatic buffering and state management. + """ + + def __init__( + self, + client: "AsyncWebClient", + *, + channel: str, + logger: logging.Logger, + thread_ts: str, + buffer_size: int, + recipient_team_id: Optional[str] = None, + recipient_user_id: Optional[str] = None, + **kwargs, + ): + """Initialize a new ChatStream instance. + + The __init__ method creates a unique ChatStream instance that keeps track of one chat stream. + + Args: + client: The WebClient instance to use for API calls. + channel: An encoded ID that represents a channel, private group, or DM. + logger: A logging channel for outputs. + thread_ts: Provide another message's ts value to reply to. Streamed messages should always be replies to a user + request. + recipient_team_id: The encoded ID of the team the user receiving the streaming text belongs to. Required when + streaming to channels. + recipient_user_id: The encoded ID of the user to receive the streaming text. Required when streaming to channels. + buffer_size: The length of markdown_text to buffer in-memory before calling a method. Increasing this value + decreases the number of method calls made for the same amount of text, which is useful to avoid rate limits. + **kwargs: Additional arguments passed to the underlying API calls. + """ + self._client = client + self._logger = logger + self._token: Optional[str] = kwargs.pop("token", None) + self._stream_args = { + "channel": channel, + "thread_ts": thread_ts, + "recipient_team_id": recipient_team_id, + "recipient_user_id": recipient_user_id, + **kwargs, + } + self._buffer = "" + self._state = "starting" + self._stream_ts: Optional[str] = None + self._buffer_size = buffer_size + + async def append( + self, + *, + markdown_text: str, + **kwargs, + ) -> Optional[AsyncSlackResponse]: + """Append to the stream. + + The "append" method appends to the chat stream being used. This method can be called multiple times. After the stream + is stopped this method cannot be called. + + Args: + markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is + what will be appended to the message received so far. + **kwargs: Additional arguments passed to the underlying API calls. + + Returns: + AsyncSlackResponse if the buffer was flushed, None if buffering. + + Raises: + SlackRequestError: If the stream is already completed. + + Example: + ```python + streamer = client.chat_stream( + channel="C0123456789", + thread_ts="1700000001.123456", + recipient_team_id="T0123456789", + recipient_user_id="U0123456789", + ) + streamer.append(markdown_text="**hello wo") + streamer.append(markdown_text="rld!**") + streamer.stop() + ``` + """ + if self._state == "completed": + raise e.SlackRequestError(f"Cannot append to stream: stream state is {self._state}") + if kwargs.get("token"): + self._token = kwargs.pop("token") + self._buffer += markdown_text + if len(self._buffer) >= self._buffer_size: + return await self._flush_buffer(**kwargs) + details = { + "buffer_length": len(self._buffer), + "buffer_size": self._buffer_size, + "channel": self._stream_args.get("channel"), + "recipient_team_id": self._stream_args.get("recipient_team_id"), + "recipient_user_id": self._stream_args.get("recipient_user_id"), + "thread_ts": self._stream_args.get("thread_ts"), + } + self._logger.debug(f"ChatStream appended to buffer: {json.dumps(details)}") + return None + + async def stop( + self, + *, + markdown_text: Optional[str] = None, + blocks: Optional[Union[str, Sequence[Union[Dict, Block]]]] = None, + metadata: Optional[Union[Dict, Metadata]] = None, + **kwargs, + ) -> AsyncSlackResponse: + """Stop the stream and finalize the message. + + Args: + blocks: A list of blocks that will be rendered at the bottom of the finalized message. + markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is + what will be appended to the message received so far. + metadata: JSON object with event_type and event_payload fields, presented as a URL-encoded string. Metadata you + post to Slack is accessible to any app or user who is a member of that workspace. + **kwargs: Additional arguments passed to the underlying API calls. + + Returns: + AsyncSlackResponse from the chat.stopStream API call. + + Raises: + SlackRequestError: If the stream is already completed. + + Example: + ```python + streamer = client.chat_stream( + channel="C0123456789", + thread_ts="1700000001.123456", + recipient_team_id="T0123456789", + recipient_user_id="U0123456789", + ) + streamer.append(markdown_text="**hello wo") + streamer.append(markdown_text="rld!**") + streamer.stop() + ``` + """ + if self._state == "completed": + raise e.SlackRequestError(f"Cannot stop stream: stream state is {self._state}") + if kwargs.get("token"): + self._token = kwargs.pop("token") + if markdown_text: + self._buffer += markdown_text + if not self._stream_ts: + response = await self._client.chat_startStream( + **self._stream_args, + token=self._token, + ) + if not response.get("ts"): + raise e.SlackRequestError("Failed to stop stream: stream not started") + self._stream_ts = str(response["ts"]) + self._state = "in_progress" + response = await self._client.chat_stopStream( + token=self._token, + channel=self._stream_args["channel"], + ts=self._stream_ts, + blocks=blocks, + markdown_text=self._buffer, + metadata=metadata, + **kwargs, + ) + self._state = "completed" + return response + + async def _flush_buffer(self, **kwargs) -> AsyncSlackResponse: + """Flush the internal buffer by making appropriate API calls.""" + if not self._stream_ts: + response = await self._client.chat_startStream( + **self._stream_args, + token=self._token, + **kwargs, + markdown_text=self._buffer, + ) + self._stream_ts = response.get("ts") + self._state = "in_progress" + else: + response = await self._client.chat_appendStream( + token=self._token, + channel=self._stream_args["channel"], + ts=self._stream_ts, + **kwargs, + markdown_text=self._buffer, + ) + self._buffer = "" + return response+A helper class for streaming markdown text into a conversation using the chat streaming APIs.
+This class provides a convenient interface for the chat.startStream, chat.appendStream, and chat.stopStream API +methods, with automatic buffering and state management.
+Initialize a new ChatStream instance.
+The init method creates a unique ChatStream instance that keeps track of one chat stream.
+Args
+-
+
client
+- The WebClient instance to use for API calls. +
channel
+- An encoded ID that represents a channel, private group, or DM. +
logger
+- A logging channel for outputs. +
thread_ts
+- Provide another message's ts value to reply to. Streamed messages should always be replies to a user +request. +
recipient_team_id
+- The encoded ID of the team the user receiving the streaming text belongs to. Required when +streaming to channels. +
recipient_user_id
+- The encoded ID of the user to receive the streaming text. Required when streaming to channels. +
buffer_size
+- The length of markdown_text to buffer in-memory before calling a method. Increasing this value +decreases the number of method calls made for the same amount of text, which is useful to avoid rate limits. +
**kwargs
+- Additional arguments passed to the underlying API calls. +
Methods
+-
+
+async def append(self, *, markdown_text: str, **kwargs) ‑> AsyncSlackResponse | None +
+-
+++
+Expand source code +
+
+async def append( + self, + *, + markdown_text: str, + **kwargs, +) -> Optional[AsyncSlackResponse]: + """Append to the stream. + + The "append" method appends to the chat stream being used. This method can be called multiple times. After the stream + is stopped this method cannot be called. + + Args: + markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is + what will be appended to the message received so far. + **kwargs: Additional arguments passed to the underlying API calls. + + Returns: + AsyncSlackResponse if the buffer was flushed, None if buffering. + + Raises: + SlackRequestError: If the stream is already completed. + + Example: + ```python + streamer = client.chat_stream( + channel="C0123456789", + thread_ts="1700000001.123456", + recipient_team_id="T0123456789", + recipient_user_id="U0123456789", + ) + streamer.append(markdown_text="**hello wo") + streamer.append(markdown_text="rld!**") + streamer.stop() + ``` + """ + if self._state == "completed": + raise e.SlackRequestError(f"Cannot append to stream: stream state is {self._state}") + if kwargs.get("token"): + self._token = kwargs.pop("token") + self._buffer += markdown_text + if len(self._buffer) >= self._buffer_size: + return await self._flush_buffer(**kwargs) + details = { + "buffer_length": len(self._buffer), + "buffer_size": self._buffer_size, + "channel": self._stream_args.get("channel"), + "recipient_team_id": self._stream_args.get("recipient_team_id"), + "recipient_user_id": self._stream_args.get("recipient_user_id"), + "thread_ts": self._stream_args.get("thread_ts"), + } + self._logger.debug(f"ChatStream appended to buffer: {json.dumps(details)}") + return None+Append to the stream.
+The "append" method appends to the chat stream being used. This method can be called multiple times. After the stream +is stopped this method cannot be called.
+Args
+-
+
markdown_text
+- Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is +what will be appended to the message received so far. +
**kwargs
+- Additional arguments passed to the underlying API calls. +
Returns
+AsyncSlackResponse if the buffer was flushed, None if buffering.
+Raises
+-
+
SlackRequestError
+- If the stream is already completed. +
Example
+streamer = client.chat_stream( + channel="C0123456789", + thread_ts="1700000001.123456", + recipient_team_id="T0123456789", + recipient_user_id="U0123456789", +) +streamer.append(markdown_text="**hello wo") +streamer.append(markdown_text="rld!**") +streamer.stop() +
+ +async def stop(self,
*,
markdown_text: str | None = None,
blocks: str | Sequence[Dict | Block] | None = None,
metadata: Dict | Metadata | None = None,
**kwargs) ‑> AsyncSlackResponse +
+-
+++
+Expand source code +
+
+async def stop( + self, + *, + markdown_text: Optional[str] = None, + blocks: Optional[Union[str, Sequence[Union[Dict, Block]]]] = None, + metadata: Optional[Union[Dict, Metadata]] = None, + **kwargs, +) -> AsyncSlackResponse: + """Stop the stream and finalize the message. + + Args: + blocks: A list of blocks that will be rendered at the bottom of the finalized message. + markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is + what will be appended to the message received so far. + metadata: JSON object with event_type and event_payload fields, presented as a URL-encoded string. Metadata you + post to Slack is accessible to any app or user who is a member of that workspace. + **kwargs: Additional arguments passed to the underlying API calls. + + Returns: + AsyncSlackResponse from the chat.stopStream API call. + + Raises: + SlackRequestError: If the stream is already completed. + + Example: + ```python + streamer = client.chat_stream( + channel="C0123456789", + thread_ts="1700000001.123456", + recipient_team_id="T0123456789", + recipient_user_id="U0123456789", + ) + streamer.append(markdown_text="**hello wo") + streamer.append(markdown_text="rld!**") + streamer.stop() + ``` + """ + if self._state == "completed": + raise e.SlackRequestError(f"Cannot stop stream: stream state is {self._state}") + if kwargs.get("token"): + self._token = kwargs.pop("token") + if markdown_text: + self._buffer += markdown_text + if not self._stream_ts: + response = await self._client.chat_startStream( + **self._stream_args, + token=self._token, + ) + if not response.get("ts"): + raise e.SlackRequestError("Failed to stop stream: stream not started") + self._stream_ts = str(response["ts"]) + self._state = "in_progress" + response = await self._client.chat_stopStream( + token=self._token, + channel=self._stream_args["channel"], + ts=self._stream_ts, + blocks=blocks, + markdown_text=self._buffer, + metadata=metadata, + **kwargs, + ) + self._state = "completed" + return response+Stop the stream and finalize the message.
+Args
+-
+
blocks
+- A list of blocks that will be rendered at the bottom of the finalized message. +
markdown_text
+- Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is +what will be appended to the message received so far. +
metadata
+- JSON object with event_type and event_payload fields, presented as a URL-encoded string. Metadata you +post to Slack is accessible to any app or user who is a member of that workspace. +
**kwargs
+- Additional arguments passed to the underlying API calls. +
Returns
+AsyncSlackResponse from the chat.stopStream API call.
+Raises
+-
+
SlackRequestError
+- If the stream is already completed. +
Example
+streamer = client.chat_stream( + channel="C0123456789", + thread_ts="1700000001.123456", + recipient_team_id="T0123456789", + recipient_user_id="U0123456789", +) +streamer.append(markdown_text="**hello wo") +streamer.append(markdown_text="rld!**") +streamer.stop() +
+
+