
feat(memory): implement true message batching for flush_messages() #256

Open
jariy17 wants to merge 4 commits into feat/event-metadata-state-identification from feat/memory-session-manager-optimization

Conversation

@jariy17 (Contributor) commented Feb 6, 2026

Summary

  • Add batch_size configuration option for message buffering (1-100, default 1)
  • Implement true message batching in flush_messages() - groups conversational messages by session_id into single API calls
  • Preserve message order within each session's payload (earliest first)
  • Use latest timestamp from grouped messages for combined events
  • Send blob messages (>9KB) individually via different API path
  • Clear buffer only after ALL API calls succeed to prevent data loss
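The grouping rules above can be sketched as a pure function (names like `is_blob` and the message-dict shape are illustrative, not taken from the PR's actual code):

```python
from collections import defaultdict
from typing import Any

BLOB_THRESHOLD = 9 * 1024  # messages over ~9KB take the blob path


def group_for_flush(buffered: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Group buffered conversational messages by session_id into one
    combined call per session; blob messages stay individual."""
    grouped: dict[str, list[dict[str, Any]]] = defaultdict(list)
    calls: list[dict[str, Any]] = []
    for msg in buffered:
        if msg.get("is_blob"):
            # Blob messages (>9KB) are sent one-by-one via a separate path.
            calls.append({"session_id": msg["session_id"],
                          "payload": [msg["payload"]],
                          "timestamp": msg["timestamp"]})
        else:
            grouped[msg["session_id"]].append(msg)
    for session_id, msgs in grouped.items():
        # Preserve earliest-first order within the session and stamp the
        # combined event with the latest timestamp in the group.
        msgs.sort(key=lambda m: m["timestamp"])
        calls.append({"session_id": session_id,
                      "payload": [m["payload"] for m in msgs],
                      "timestamp": msgs[-1]["timestamp"]})
    return calls
```

With this shape, N conversational messages across K sessions collapse into K API calls plus one call per blob message.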

Test plan

  • Run batching tests: pytest tests/bedrock_agentcore/memory/integrations/strands/test_agentcore_memory_session_manager.py -k "Batching" -v
  • Verify multi-session grouping works correctly
  • Verify timestamp selection uses latest timestamp
  • Verify partial failure preserves entire buffer
  • Verify blob messages sent individually
  • All 87 session manager tests pass

Add optional batch_size parameter to AgentCoreMemoryConfig that allows
customers to buffer messages before sending to AgentCore Memory.

Changes:
- Add batch_size parameter (default=1, max=100) to AgentCoreMemoryConfig
- Add message buffering with thread-safe _message_buffer and _buffer_lock
- Modify create_message() to buffer when batch_size > 1
- Add flush_messages() to send all buffered messages
- Add pending_message_count() to check buffer size
- Add close() and context manager for cleanup
- Add 24 unit tests covering batching functionality

Default batch_size=1 preserves backward compatibility (immediate send).
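A minimal self-contained sketch of the buffering lifecycle described by this commit, with a stand-in client instead of the real AgentCore Memory API (the class and method names follow the commit message; the internals are assumptions):

```python
import threading
from typing import Any


class BufferingSessionManager:
    """Illustrative sketch only; the real class lives in the PR."""

    def __init__(self, client: Any, batch_size: int = 1):
        if not 1 <= batch_size <= 100:
            raise ValueError("batch_size must be between 1 and 100")
        self._client = client
        self._batch_size = batch_size
        self._message_buffer: list[dict[str, Any]] = []
        self._buffer_lock = threading.Lock()

    def create_message(self, session_id: str, text: str) -> None:
        msg = {"session_id": session_id, "text": text}
        if self._batch_size == 1:
            self._client.send([msg])  # default: immediate send, no buffering
            return
        with self._buffer_lock:
            self._message_buffer.append(msg)
            full = len(self._message_buffer) >= self._batch_size
        if full:
            self.flush_messages()

    def flush_messages(self) -> None:
        with self._buffer_lock:
            pending = list(self._message_buffer)
        if pending:
            self._client.send(pending)  # may raise; buffer kept on failure
            with self._buffer_lock:
                # Clear only what we sent, only after the send succeeded.
                del self._message_buffer[: len(pending)]

    def pending_message_count(self) -> int:
        with self._buffer_lock:
            return len(self._message_buffer)

    def close(self) -> None:
        self.flush_messages()

    def __enter__(self):
        return self

    def __exit__(self, *exc) -> None:
        self.close()
```

Using the manager as a context manager guarantees a final flush on exit, so no buffered messages are dropped.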
Previously, batch_size buffered messages but still made N separate API
calls for N messages. Now flush_messages() groups conversational messages
by session_id and combines them into a single create_event() call per
session, significantly reducing API calls.

Key changes:
- Group conversational messages by session_id into combined payloads
- Preserve message order within each session's payload (earliest first)
- Use the latest timestamp from grouped messages for the combined event
- Send blob messages (>9KB) individually (different API path)
- Clear buffer only after ALL API calls succeed to prevent data loss
- Improve error messages to include session context
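The "clear buffer only after ALL API calls succeed" rule and the session-aware error messages can be illustrated like this (the helper and its signature are hypothetical, not the PR's code):

```python
from typing import Any, Callable


def send_batches(calls: list[dict[str, Any]],
                 send_one: Callable[[dict[str, Any]], None],
                 buffer: list[Any]) -> None:
    """Send every grouped call; clear the buffer only after ALL succeed,
    so a partial failure preserves the entire buffer for retry."""
    for call in calls:
        try:
            send_one(call)
        except Exception as exc:
            # Include session context so failures are easy to trace.
            raise RuntimeError(
                f"Failed to flush messages for session "
                f"{call['session_id']!r}: {exc}"
            ) from exc
    buffer.clear()  # reached only if every API call succeeded
```

Note the trade-off this semantics implies: if the first of two sessions succeeds and the second fails, a retry resends the first session's messages, which is the behavior the test plan pins down ("partial failure preserves entire buffer").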

Tests added for critical scenarios:
- Multiple sessions grouped into separate API calls
- Latest timestamp used for combined events
- Partial failure with multiple sessions preserves entire buffer
- Multiple blob messages sent individually (not batched)
- Mixed sessions with blobs and conversational messages

Also fixes pre-existing test issues:
- Fix test_read_agent_legacy_migration mock setup to match actual impl
- Fix test_load_long_term_memories_with_validation_failure for strands API

# region Batching support

def flush_messages(self) -> list[dict[str, Any]]:
jariy17 (Contributor Author): This should be a private method.

"""
self.flush_messages()

# endregion Batching support
jariy17 (Contributor Author): Why is this a comment?

Returns:
int: Number of buffered messages waiting to be sent.
"""
with self._buffer_lock:
with self._buffer_lock:

jariy17 (Contributor Author): There is probably an atomic or thread-safe wrapper that could encapsulate the lock, so every operation on _message_buffer acquires it automatically.
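One possible shape for that suggestion, a tiny wrapper that owns the lock so callers cannot touch the list without it (hypothetical; not part of the PR):

```python
import threading
from typing import Any


class LockedBuffer:
    """Encapsulates the lock: every operation on the underlying list
    goes through a method that acquires it."""

    def __init__(self) -> None:
        self._items: list[Any] = []
        self._lock = threading.Lock()

    def append(self, item: Any) -> None:
        with self._lock:
            self._items.append(item)

    def drain(self) -> list[Any]:
        """Atomically take everything out of the buffer."""
        with self._lock:
            items, self._items = self._items, []
            return items

    def __len__(self) -> int:
        with self._lock:
            return len(self._items)
```

This removes the risk of a call site forgetting `with self._buffer_lock:` before reading or mutating the buffer.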

return []

results = []
for session_id, messages, is_blob, monotonic_timestamp in messages_to_send:
jariy17 (Contributor Author): This is not actually batching right now; each buffered message still results in its own API call.

jariy17 (Contributor Author): You can take all of messages_to_send and, when they are not blobs, put them under a single create_event call: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-agentcore/client/create_event.html. Make sure the messages are ordered earliest to latest when storing them in one create_event.
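A sketch of what building that single ordered call could look like; the payload field names follow the linked create_event documentation and should be treated as an assumption, not the PR's actual code:

```python
from datetime import datetime, timezone
from typing import Any


def build_conversational_payload(
    messages: list[dict[str, Any]],
) -> tuple[list[dict[str, Any]], datetime]:
    """Order non-blob messages earliest-to-latest and build one combined
    payload, returning the latest timestamp as the event time."""
    ordered = sorted(messages, key=lambda m: m["timestamp"])
    payload = [
        {"conversational": {"content": {"text": m["text"]},
                            "role": m["role"]}}
        for m in ordered
    ]
    return payload, ordered[-1]["timestamp"]


# One grouped call per session would then look roughly like:
# client.create_event(
#     memoryId=memory_id, actorId=actor_id, sessionId=session_id,
#     payload=payload, eventTimestamp=event_time,
# )
```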

Extract _create_session_manager helper to eliminate triple-nested
context managers and move batching_config/batching_session_manager
to module-level fixtures shared across all test classes.