generated from amazon-archives/__template_Apache-2.0
-
Notifications
You must be signed in to change notification settings - Fork 80
feat(memory): implement true message batching for flush_messages() #256
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
jariy17
wants to merge
4
commits into
feat/event-metadata-state-identification
Choose a base branch
from
feat/memory-session-manager-optimization
base: feat/event-metadata-state-identification
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
a331c97
feat(memory): add batch_size configuration for message buffering
jariy17 4dd193d
feat(memory): implement true message batching in flush_messages()
jariy17 964a59e
refactor: rename flush_messages to _flush_messages (private method)
jariy17 00cb10d
refactor(tests): deduplicate batching test fixtures
jariy17 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -119,6 +119,10 @@ def __init__( | |
| session = boto_session or boto3.Session(region_name=region_name) | ||
| self.has_existing_agent = False | ||
|
|
||
| # Batching support - stores pre-processed messages: (session_id, messages, is_blob, timestamp) | ||
| self._message_buffer: list[tuple[str, list[tuple[str, str]], bool, datetime]] = [] | ||
| self._buffer_lock = threading.Lock() | ||
|
|
||
| # Add strands-agents to the request user agent | ||
| if boto_client_config: | ||
| existing_user_agent = getattr(boto_client_config, "user_agent_extra", None) | ||
|
|
@@ -380,6 +384,9 @@ def create_message( | |
| ) -> Optional[dict[str, Any]]: | ||
| """Create a new message in AgentCore Memory. | ||
|
|
||
| If batch_size > 1, the message is buffered and sent when the buffer reaches batch_size. | ||
| Use _flush_messages() or close() to send any remaining buffered messages. | ||
|
|
||
| Args: | ||
| session_id (str): The session ID to create the message in. | ||
| agent_id (str): The agent ID associated with the message (only here for the interface. | ||
|
|
@@ -389,6 +396,7 @@ def create_message( | |
|
|
||
| Returns: | ||
| Optional[dict[str, Any]]: The created event data from AgentCore Memory. | ||
| Returns empty dict if message is buffered (batch_size > 1). | ||
|
|
||
| Raises: | ||
| SessionException: If session ID doesn't match configuration or message creation fails. | ||
|
|
@@ -409,16 +417,33 @@ def create_message( | |
| if session_id != self.config.session_id: | ||
| raise SessionException(f"Session ID mismatch: expected {self.config.session_id}, got {session_id}") | ||
|
|
||
| try: | ||
| messages = AgentCoreMemoryConverter.message_to_payload(session_message) | ||
| if not messages: | ||
| return | ||
| # Convert and check size ONCE (not again at flush) | ||
| messages = AgentCoreMemoryConverter.message_to_payload(session_message) | ||
| if not messages: | ||
| return None | ||
|
|
||
| is_blob = AgentCoreMemoryConverter.exceeds_conversational_limit(messages[0]) | ||
|
|
||
| # Parse the original timestamp and use it as desired timestamp | ||
| original_timestamp = datetime.fromisoformat(session_message.created_at.replace("Z", "+00:00")) | ||
| monotonic_timestamp = self._get_monotonic_timestamp(original_timestamp) | ||
|
|
||
| if self.config.batch_size > 1: | ||
| # Buffer the pre-processed message | ||
| should_flush = False | ||
| with self._buffer_lock: | ||
| self._message_buffer.append((session_id, messages, is_blob, monotonic_timestamp)) | ||
| should_flush = len(self._message_buffer) >= self.config.batch_size | ||
|
|
||
| # Flush outside the lock to prevent deadlock | ||
| if should_flush: | ||
| self._flush_messages() | ||
|
|
||
| # Parse the original timestamp and use it as desired timestamp | ||
| original_timestamp = datetime.fromisoformat(session_message.created_at.replace("Z", "+00:00")) | ||
| monotonic_timestamp = self._get_monotonic_timestamp(original_timestamp) | ||
| return {} # No eventId yet | ||
|
|
||
| if not AgentCoreMemoryConverter.exceeds_conversational_limit(messages[0]): | ||
| # Immediate send (batch_size == 1) | ||
| try: | ||
| if not is_blob: | ||
| event = self.memory_client.create_event( | ||
| memory_id=self.config.memory_id, | ||
| actor_id=self.config.actor_id, | ||
|
|
@@ -645,3 +670,125 @@ def initialize(self, agent: "Agent", **kwargs: Any) -> None: | |
| RepositorySessionManager.initialize(self, agent, **kwargs) | ||
|
|
||
| # endregion RepositorySessionManager overrides | ||
|
|
||
| # region Batching support | ||
|
|
||
| def _flush_messages(self) -> list[dict[str, Any]]: | ||
| """Flush all buffered messages to AgentCore Memory. | ||
|
|
||
| Call this method to send any remaining buffered messages when batch_size > 1. | ||
| This is automatically called when the buffer reaches batch_size, but should | ||
| also be called explicitly when the session is complete (via close() or context manager). | ||
|
|
||
| Messages are batched by session_id - all conversational messages for the same | ||
| session are combined into a single create_event() call to reduce API calls. | ||
| Blob messages (>9KB) are sent individually as they require a different API path. | ||
|
|
||
| Returns: | ||
| list[dict[str, Any]]: List of created event responses from AgentCore Memory. | ||
|
|
||
| Raises: | ||
| SessionException: If any message creation fails. On failure, all messages | ||
| remain in the buffer to prevent data loss. | ||
| """ | ||
| with self._buffer_lock: | ||
| messages_to_send = list(self._message_buffer) | ||
|
|
||
| if not messages_to_send: | ||
| return [] | ||
|
|
||
| # Group conversational messages by session_id, preserve order | ||
| # Structure: {session_id: {"messages": [...], "timestamp": latest_timestamp}} | ||
| session_groups: dict[str, dict[str, Any]] = {} | ||
| blob_messages: list[tuple[str, list[tuple[str, str]], datetime]] = [] | ||
|
|
||
| for session_id, messages, is_blob, monotonic_timestamp in messages_to_send: | ||
| if is_blob: | ||
| # Blobs cannot be combined - collect them separately | ||
| blob_messages.append((session_id, messages, monotonic_timestamp)) | ||
| else: | ||
| # Group conversational messages by session_id | ||
| if session_id not in session_groups: | ||
| session_groups[session_id] = {"messages": [], "timestamp": monotonic_timestamp} | ||
| # Extend messages list to preserve order (earlier messages first) | ||
| session_groups[session_id]["messages"].extend(messages) | ||
| # Use the latest timestamp for the combined event | ||
| if monotonic_timestamp > session_groups[session_id]["timestamp"]: | ||
| session_groups[session_id]["timestamp"] = monotonic_timestamp | ||
|
|
||
| results = [] | ||
| try: | ||
| # Send one create_event per session_id with combined messages | ||
| for session_id, group in session_groups.items(): | ||
| event = self.memory_client.create_event( | ||
| memory_id=self.config.memory_id, | ||
| actor_id=self.config.actor_id, | ||
| session_id=session_id, | ||
| messages=group["messages"], | ||
| event_timestamp=group["timestamp"], | ||
| ) | ||
| results.append(event) | ||
| logger.debug("Flushed batched event for session %s: %s", session_id, event.get("eventId")) | ||
|
|
||
| # Send blob messages individually (they use a different API path) | ||
| for session_id, messages, monotonic_timestamp in blob_messages: | ||
| event = self.memory_client.gmdp_client.create_event( | ||
| memoryId=self.config.memory_id, | ||
| actorId=self.config.actor_id, | ||
| sessionId=session_id, | ||
| payload=[ | ||
| {"blob": json.dumps(messages[0])}, | ||
| ], | ||
| eventTimestamp=monotonic_timestamp, | ||
| ) | ||
| results.append(event) | ||
| logger.debug("Flushed blob event for session %s: %s", session_id, event.get("eventId")) | ||
|
|
||
| # Clear buffer only after ALL messages succeed | ||
| with self._buffer_lock: | ||
| self._message_buffer.clear() | ||
|
|
||
| except Exception as e: | ||
| logger.error("Failed to flush messages to AgentCore Memory for session: %s", e) | ||
| raise SessionException(f"Failed to flush messages: {e}") from e | ||
|
|
||
| logger.info("Flushed %d events to AgentCore Memory", len(results)) | ||
| return results | ||
|
|
||
| def pending_message_count(self) -> int: | ||
| """Return the number of messages pending in the buffer. | ||
|
|
||
| Returns: | ||
| int: Number of buffered messages waiting to be sent. | ||
| """ | ||
| with self._buffer_lock: | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There must be some atomic library that would incorporate the lock whenever you do operations in _message_buffer. |
||
| return len(self._message_buffer) | ||
|
|
||
| def close(self) -> None: | ||
| """Explicitly flush pending messages and close the session manager. | ||
|
|
||
| Call this method when the session is complete to ensure all buffered | ||
| messages are sent to AgentCore Memory. Alternatively, use the context | ||
| manager protocol (with statement) for automatic cleanup. | ||
| """ | ||
| self._flush_messages() | ||
|
|
||
| def __enter__(self) -> "AgentCoreMemorySessionManager": | ||
| """Enter the context manager. | ||
|
|
||
| Returns: | ||
| AgentCoreMemorySessionManager: This session manager instance. | ||
| """ | ||
| return self | ||
|
|
||
| def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: | ||
| """Exit the context manager and flush any pending messages. | ||
|
|
||
| Args: | ||
| exc_type: Exception type if an exception occurred. | ||
| exc_val: Exception value if an exception occurred. | ||
| exc_tb: Exception traceback if an exception occurred. | ||
| """ | ||
| self._flush_messages() | ||
|
|
||
| # endregion Batching support | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this a comment? |
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not actually buffering right now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can take all the messages_to_send and put it under messages, if it's not a blob. https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-agentcore/client/create_event.html. Make sure the messages are ordered from earliest to latest when storing in one create_event.