Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/bedrock_agentcore/memory/integrations/strands/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ class AgentCoreMemoryConfig(BaseModel):
session_id: Required unique ID for the session
actor_id: Required unique ID for the agent instance/user
retrieval_config: Optional dictionary mapping namespaces to retrieval configurations
batch_size: Number of messages to batch before sending to AgentCore Memory.
Default of 1 means immediate sending (no batching). Max 100.
"""

memory_id: str = Field(min_length=1)
session_id: str = Field(min_length=1)
actor_id: str = Field(min_length=1)
retrieval_config: Optional[Dict[str, RetrievalConfig]] = None
batch_size: int = Field(default=1, ge=1, le=100)
163 changes: 155 additions & 8 deletions src/bedrock_agentcore/memory/integrations/strands/session_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ def __init__(
session = boto_session or boto3.Session(region_name=region_name)
self.has_existing_agent = False

# Batching support - stores pre-processed messages: (session_id, messages, is_blob, timestamp)
self._message_buffer: list[tuple[str, list[tuple[str, str]], bool, datetime]] = []
self._buffer_lock = threading.Lock()

# Add strands-agents to the request user agent
if boto_client_config:
existing_user_agent = getattr(boto_client_config, "user_agent_extra", None)
Expand Down Expand Up @@ -380,6 +384,9 @@ def create_message(
) -> Optional[dict[str, Any]]:
"""Create a new message in AgentCore Memory.

If batch_size > 1, the message is buffered and sent when the buffer reaches batch_size.
Use _flush_messages() or close() to send any remaining buffered messages.

Args:
session_id (str): The session ID to create the message in.
agent_id (str): The agent ID associated with the message (only here for the interface.
Expand All @@ -389,6 +396,7 @@ def create_message(

Returns:
Optional[dict[str, Any]]: The created event data from AgentCore Memory.
Returns empty dict if message is buffered (batch_size > 1).

Raises:
SessionException: If session ID doesn't match configuration or message creation fails.
Expand All @@ -409,16 +417,33 @@ def create_message(
if session_id != self.config.session_id:
raise SessionException(f"Session ID mismatch: expected {self.config.session_id}, got {session_id}")

try:
messages = AgentCoreMemoryConverter.message_to_payload(session_message)
if not messages:
return
# Convert and check size ONCE (not again at flush)
messages = AgentCoreMemoryConverter.message_to_payload(session_message)
if not messages:
return None

is_blob = AgentCoreMemoryConverter.exceeds_conversational_limit(messages[0])

# Parse the original timestamp and use it as desired timestamp
original_timestamp = datetime.fromisoformat(session_message.created_at.replace("Z", "+00:00"))
monotonic_timestamp = self._get_monotonic_timestamp(original_timestamp)

if self.config.batch_size > 1:
# Buffer the pre-processed message
should_flush = False
with self._buffer_lock:
self._message_buffer.append((session_id, messages, is_blob, monotonic_timestamp))
should_flush = len(self._message_buffer) >= self.config.batch_size

# Flush outside the lock to prevent deadlock
if should_flush:
self._flush_messages()

# Parse the original timestamp and use it as desired timestamp
original_timestamp = datetime.fromisoformat(session_message.created_at.replace("Z", "+00:00"))
monotonic_timestamp = self._get_monotonic_timestamp(original_timestamp)
return {} # No eventId yet

if not AgentCoreMemoryConverter.exceeds_conversational_limit(messages[0]):
# Immediate send (batch_size == 1)
try:
if not is_blob:
event = self.memory_client.create_event(
memory_id=self.config.memory_id,
actor_id=self.config.actor_id,
Expand Down Expand Up @@ -645,3 +670,125 @@ def initialize(self, agent: "Agent", **kwargs: Any) -> None:
RepositorySessionManager.initialize(self, agent, **kwargs)

# endregion RepositorySessionManager overrides

# region Batching support

def _flush_messages(self) -> list[dict[str, Any]]:
    """Flush all buffered messages to AgentCore Memory.

    Call this method to send any remaining buffered messages when batch_size > 1.
    This is automatically called when the buffer reaches batch_size, but should
    also be called explicitly when the session is complete (via close() or context manager).

    Messages are batched by session_id - all conversational messages for the same
    session are combined into a single create_event() call to reduce API calls.
    Blob messages (>9KB) are sent individually as they require a different API path.

    Returns:
        list[dict[str, Any]]: List of created event responses from AgentCore Memory.

    Raises:
        SessionException: If any message creation fails. On failure, all messages
            remain in the buffer to prevent data loss.
    """
    # Snapshot under the lock; sending happens outside it so create_message()
    # can keep buffering concurrently without blocking on network I/O.
    with self._buffer_lock:
        messages_to_send = list(self._message_buffer)

    if not messages_to_send:
        return []

    # Group conversational messages by session_id, preserving buffer (arrival)
    # order so each combined event lists messages earliest-to-latest.
    # Structure: {session_id: {"messages": [...], "timestamp": latest_timestamp}}
    session_groups: dict[str, dict[str, Any]] = {}
    blob_messages: list[tuple[str, list[tuple[str, str]], datetime]] = []

    for session_id, messages, is_blob, monotonic_timestamp in messages_to_send:
        if is_blob:
            # Blobs cannot be combined - collect them separately
            blob_messages.append((session_id, messages, monotonic_timestamp))
        else:
            group = session_groups.setdefault(session_id, {"messages": [], "timestamp": monotonic_timestamp})
            # Extend messages list to preserve order (earlier messages first)
            group["messages"].extend(messages)
            # Use the latest timestamp for the combined event
            if monotonic_timestamp > group["timestamp"]:
                group["timestamp"] = monotonic_timestamp

    results = []
    try:
        # Send one create_event per session_id with combined messages
        for session_id, group in session_groups.items():
            event = self.memory_client.create_event(
                memory_id=self.config.memory_id,
                actor_id=self.config.actor_id,
                session_id=session_id,
                messages=group["messages"],
                event_timestamp=group["timestamp"],
            )
            results.append(event)
            logger.debug("Flushed batched event for session %s: %s", session_id, event.get("eventId"))

        # Send blob messages individually (they use a different API path)
        for session_id, messages, monotonic_timestamp in blob_messages:
            event = self.memory_client.gmdp_client.create_event(
                memoryId=self.config.memory_id,
                actorId=self.config.actor_id,
                sessionId=session_id,
                payload=[
                    {"blob": json.dumps(messages[0])},
                ],
                eventTimestamp=monotonic_timestamp,
            )
            results.append(event)
            logger.debug("Flushed blob event for session %s: %s", session_id, event.get("eventId"))

        # BUGFIX: remove only the snapshotted prefix. clear() would also drop
        # messages appended concurrently between the snapshot above and this
        # point, silently losing them without ever sending.
        with self._buffer_lock:
            del self._message_buffer[: len(messages_to_send)]

    except Exception as e:
        logger.error("Failed to flush messages to AgentCore Memory for session: %s", e)
        raise SessionException(f"Failed to flush messages: {e}") from e

    logger.info("Flushed %d events to AgentCore Memory", len(results))
    return results

def pending_message_count(self) -> int:
    """Return how many messages are currently waiting in the batch buffer.

    Returns:
        int: Count of buffered messages that have not yet been flushed.
    """
    # Read the length under the lock so a concurrent append/flush cannot
    # interleave with the size check.
    with self._buffer_lock:
        buffered = len(self._message_buffer)
    return buffered

def close(self) -> None:
    """Send any buffered messages and finish using this session manager.

    Ensures no batched messages are left unsent when batch_size > 1. The
    context-manager protocol (``with`` statement) performs the same flush
    automatically on exit.
    """
    self._flush_messages()

def __enter__(self) -> "AgentCoreMemorySessionManager":
    """Begin a ``with`` block.

    Returns:
        AgentCoreMemorySessionManager: The same manager instance, so it can
        be bound via ``with ... as manager``.
    """
    return self

def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Leave a ``with`` block, flushing any pending buffered messages.

    The flush runs even when the block raised, so buffered messages are not
    silently dropped on error paths.

    Args:
        exc_type: Type of the in-flight exception, if any.
        exc_val: The exception instance, if any.
        exc_tb: Traceback of the exception, if any.
    """
    self._flush_messages()

# endregion Batching support
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a comment?

Loading