Feature/clone session #4191
Conversation
Summary of Changes: Hello @lwangverizon, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed: this pull request enhances the ADK session services by adding a clone_session feature.
Code Review
This pull request introduces a valuable clone_session feature across all session service backends, complete with a comprehensive set of unit tests. The implementation is well-structured and addresses a key use case for transitioning user states. My review highlights several opportunities for improvement, primarily focusing on performance and correctness. I've identified a recurring N+1 query pattern that could be optimized, as well as redundant data fetching that can be avoided. Additionally, there are a couple of potential correctness issues in the VertexAiSessionService related to missing deep copies that could lead to unexpected state mutations. Addressing these points will enhance the robustness and efficiency of the new feature.
# Merge states from all source sessions
merged_state = {}
for session in source_sessions:
    merged_state.update(session.state)
The session state is being merged using a shallow copy (.update()). This is inconsistent with the other SessionService implementations, which use copy.deepcopy(). If session.state contains mutable objects (e.g., lists, dicts), changes in the cloned session could unintentionally alter the state of the source sessions. To prevent these side effects and ensure correctness, you should use a deep copy.
Note: You will need to add import copy at the top of the file.
Suggested change:
- merged_state.update(session.state)
+ merged_state.update(copy.deepcopy(session.state))
for event in all_events:
    await self.append_event(new_session, event)
The event object from a source session is passed directly to self.append_event(). The base implementation of append_event can mutate this object (e.g., by calling _trim_temp_delta_state). This creates an unintended side effect, modifying the original event from the source session. To prevent this, you should pass a deep copy of the event.
Note: This also requires import copy at the top of the file.
Suggested change:
- for event in all_events:
-     await self.append_event(new_session, event)
+ for event in all_events:
+     await self.append_event(new_session, copy.deepcopy(event))
return await self.get_session(
    app_name=new_session.app_name,
    user_id=new_session.user_id,
    session_id=new_session.id,
)
This final get_session call is redundant and inefficient as it performs an extra database query to fetch data you already have. The new_session object returned by create_session contains the correct state, and you have the deduplicated all_events list. You can construct the final Session object in memory.
# Return the new session with events
new_session.events = all_events
return new_session

for sess in list_response.sessions:
    full_session = self._get_session_impl(
        app_name=app_name,
        user_id=src_user_id,
        session_id=sess.id,
    )
    if full_session:
        source_sessions.append(full_session)
This loop exhibits an N+1-like pattern. You call _list_sessions_impl and then loop to call _get_session_impl for each session. While this is an in-memory operation, it's less efficient than necessary and follows a pattern that is problematic in database-backed services. You could optimize this by directly accessing the sessions for the user from self.sessions and collecting the events without repeated get_session calls.
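For illustration, a minimal sketch of that direct access; it assumes self.sessions is a nested dict keyed app_name → user_id → session_id, which may not match the service's exact internals:

import copy

# Sketch only (not the PR's code): read the user's sessions straight from
# the in-memory store instead of calling _get_session_impl per session.
source_sessions = []
for stored_session in self.sessions.get(app_name, {}).get(src_user_id, {}).values():
    # Deep-copy so later merging can't mutate the stored sessions.
    source_sessions.append(copy.deepcopy(stored_session))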
return self._get_session_impl(
    app_name=new_session.app_name,
    user_id=new_session.user_id,
    session_id=new_session.id,
)
This final call to _get_session_impl is unnecessary. You have already created the new_session object and populated the storage_session with the correct events and latest_update_time. Instead of re-fetching, you can update the new_session object you already have in memory and return it directly.
new_session.events = all_events
new_session.last_update_time = latest_update_time
return new_session

for sess in list_response.sessions:
    full_session = await self.get_session(
        app_name=app_name,
        user_id=src_user_id,
        session_id=sess.id,
    )
    if full_session:
        source_sessions.append(full_session)
This loop introduces an N+1 query problem. You call list_sessions once, and then get_session is called inside a loop for every session found. This can lead to a large number of database queries and impact performance, especially for users with many sessions.
You could optimize this by:
- Getting all session objects from list_sessions.
- Collecting all session_ids.
- Fetching all events for those session IDs in a single query using WHERE session_id IN (...).
- Mapping the events back to their respective sessions in memory (see the sketch below).
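For illustration, a rough shape of that batched fetch using SQLAlchemy's async API; the StorageSession/StorageEvent models and column names here are assumptions, not necessarily this PR's actual schema:

from sqlalchemy import select

async with self.database_session_factory() as sql_session:
    # One query for all of the source user's sessions.
    storage_sessions = (await sql_session.scalars(
        select(schema.StorageSession).where(
            schema.StorageSession.app_name == app_name,
            schema.StorageSession.user_id == src_user_id,
        )
    )).all()
    session_ids = [s.id for s in storage_sessions]

    # One query for all events across those sessions (WHERE session_id IN (...)).
    storage_events = (await sql_session.scalars(
        select(schema.StorageEvent).where(
            schema.StorageEvent.app_name == app_name,
            schema.StorageEvent.user_id == src_user_id,
            schema.StorageEvent.session_id.in_(session_ids),
        )
    )).all()

    # Map events back to their sessions in memory.
    events_by_session = {sid: [] for sid in session_ids}
    for storage_event in storage_events:
        events_by_session[storage_event.session_id].append(storage_event)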
return await self.get_session(
    app_name=new_session.app_name,
    user_id=new_session.user_id,
    session_id=new_session.id,
)
This get_session call to fetch the newly created session is redundant. You have all the necessary information to construct the Session object in memory and return it directly. The new_session object returned from create_session already contains the correct state, and you have the all_events list. You can simply assign the events to the session and return it.
# Return the new session with events
new_session.events = all_events
return new_session

for sess in list_response.sessions:
    full_session = await self.get_session(
        app_name=app_name,
        user_id=src_user_id,
        session_id=sess.id,
    )
    if full_session:
        source_sessions.append(full_session)
This loop creates an N+1 query problem. list_sessions is called once, and then get_session is called for each session found. This results in multiple API calls to Vertex AI, which can be inefficient for users with many sessions. If the API supports batch retrieval of sessions with their events, that would be a more performant approach.
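If no batch endpoint is available, running the per-session fetches concurrently at least removes the serial round trips. A sketch under that assumption, reusing the variables from the loop above:

import asyncio

# Issue all get_session calls concurrently instead of one at a time.
full_sessions = await asyncio.gather(*(
    self.get_session(
        app_name=app_name,
        user_id=src_user_id,
        session_id=sess.id,
    )
    for sess in list_response.sessions
))
source_sessions = [s for s in full_sessions if s is not None]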
return await self.get_session(
    app_name=new_session.app_name,
    user_id=new_session.user_id,
    session_id=new_session.id,
)
This final get_session call is redundant. You already have all the necessary information to construct the final Session object in memory. The new_session object from create_session has the correct state, and you have the all_events list. You can assign the events to the session and return it directly, avoiding an unnecessary API call.
# Return the new session with events
new_session.events = all_events
return new_session

/gemini review
Code Review
This pull request introduces a valuable clone_session feature across all session service implementations, enabling context transfer between sessions, such as for guest-to-authenticated user transitions. The implementation is well-structured and includes comprehensive unit tests. My review focuses on addressing some non-deterministic behaviors in the session merging logic and suggests performance improvements for database operations. Overall, this is a solid feature addition.
merged_state = {}
for session in source_sessions:
    merged_state.update(copy.deepcopy(session.state))
The merging of states from source sessions is non-deterministic. The source_sessions list is not ordered, so when merged_state.update() is called in a loop, the final value for any conflicting state keys will depend on the arbitrary order of sessions. According to the PR description, "later sessions [should overwrite] earlier values", which implies a deterministic order. Please sort the source_sessions before merging to ensure deterministic behavior.
# Sort sessions by update time for deterministic state merging
source_sessions.sort(key=lambda s: s.last_update_time)
# Merge states from all source sessions
merged_state = {}
for session in source_sessions:
    merged_state.update(copy.deepcopy(session.state))

all_events = []
seen_event_ids = set()
for session in source_sessions:
    for event in session.events:
        if event.id in seen_event_ids:
            continue
        seen_event_ids.add(event.id)
        all_events.append(event)
The event deduplication logic is non-deterministic when cloning all sessions. You are iterating over source_sessions which is not guaranteed to be chronologically ordered, so the "first occurrence" of an event with a duplicate ID is arbitrary. To ensure the chronologically first event is kept, you should collect all events from all source sessions, sort them by timestamp, and then perform deduplication.
# Collect all events, sort by timestamp, then deduplicate to ensure chronological "first occurrence wins".
all_source_events = []
for session in source_sessions:
    all_source_events.extend(session.events)
all_source_events.sort(key=lambda e: e.timestamp)
all_events = []
seen_event_ids = set()
for event in all_source_events:
    if event.id in seen_event_ids:
        continue
    seen_event_ids.add(event.id)
    all_events.append(event)

merged_state = {}
for session in source_sessions:
    merged_state.update(copy.deepcopy(session.state))
The merging of states from source sessions can be non-deterministic. When cloning all sessions, source_sessions is populated by iterating over a dictionary. While insertion order is preserved in recent Python versions, it's not guaranteed to be chronological if sessions are deleted and re-added. To ensure deterministic state merging where later sessions overwrite earlier ones, you should sort source_sessions by last_update_time.
# Sort sessions by update time for deterministic state merging
source_sessions.sort(key=lambda s: s.last_update_time)
# Merge states from all source sessions
merged_state = {}
for session in source_sessions:
    merged_state.update(copy.deepcopy(session.state))

all_events = []
seen_event_ids = set()
latest_update_time = 0.0
for session in source_sessions:
    for event in session.events:
        if event.id in seen_event_ids:
            continue
        seen_event_ids.add(event.id)
        all_events.append(copy.deepcopy(event))
    if session.last_update_time > latest_update_time:
        latest_update_time = session.last_update_time
The event deduplication logic is non-deterministic when cloning all sessions. You are iterating over source_sessions which is not guaranteed to be chronologically ordered, so the "first occurrence" of an event with a duplicate ID is arbitrary. To ensure the chronologically first event is kept, you should collect all events from all source sessions, sort them by timestamp, and then perform deduplication. The latest_update_time can then be derived from the sorted sessions.
# Collect all events, sort by timestamp, then deduplicate to ensure chronological "first occurrence wins".
all_source_events = []
for session in source_sessions:
    all_source_events.extend(session.events)
all_source_events.sort(key=lambda e: e.timestamp)
all_events = []
seen_event_ids = set()
for event in all_source_events:
    if event.id in seen_event_ids:
        continue
    seen_event_ids.add(event.id)
    all_events.append(copy.deepcopy(event))
latest_update_time = (
    source_sessions[-1].last_update_time if source_sessions else 0.0
)

merged_state = {}
for session in source_sessions:
    merged_state.update(copy.deepcopy(session.state))
The merging of states from source sessions is non-deterministic. The source_sessions list is not ordered, so when merged_state.update() is called in a loop, the final value for any conflicting state keys will depend on the arbitrary order of sessions. Please sort the source_sessions before merging to ensure deterministic behavior where later sessions overwrite earlier ones.
# Sort sessions by update time for deterministic state merging
source_sessions.sort(key=lambda s: s.last_update_time)
# Merge states from all source sessions
merged_state = {}
for session in source_sessions:
    merged_state.update(copy.deepcopy(session.state))

all_events = []
seen_event_ids = set()
for session in source_sessions:
    for event in session.events:
        if event.id in seen_event_ids:
            continue
        seen_event_ids.add(event.id)
        all_events.append(event)
The event deduplication logic is non-deterministic when cloning all sessions. You are iterating over source_sessions which is not guaranteed to be chronologically ordered, so the "first occurrence" of an event with a duplicate ID is arbitrary. To ensure the chronologically first event is kept, you should collect all events from all source sessions, sort them by timestamp, and then perform deduplication.
# Collect all events, sort by timestamp, then deduplicate to ensure chronological "first occurrence wins".
all_source_events = []
for session in source_sessions:
    all_source_events.extend(session.events)
all_source_events.sort(key=lambda e: e.timestamp)
all_events = []
seen_event_ids = set()
for event in all_source_events:
    if event.id in seen_event_ids:
        continue
    seen_event_ids.add(event.id)
    all_events.append(event)

merged_state = {}
for session in source_sessions:
    merged_state.update(copy.deepcopy(session.state))
The merging of states from source sessions is non-deterministic. The source_sessions list is populated from list_sessions which does not guarantee a chronological order. When merged_state.update() is called in a loop, the final value for any conflicting state keys will depend on the arbitrary order of sessions. Please sort the source_sessions by last_update_time before merging to ensure deterministic behavior.
# Sort sessions by update time for deterministic state merging
source_sessions.sort(key=lambda s: s.last_update_time)
# Merge states from all source sessions (deep copy to avoid mutations)
merged_state = {}
for session in source_sessions:
    merged_state.update(copy.deepcopy(session.state))

all_events = []
seen_event_ids = set()
for session in source_sessions:
    for event in session.events:
        if event.id in seen_event_ids:
            continue
        seen_event_ids.add(event.id)
        all_events.append(event)
The event deduplication logic is non-deterministic when cloning all sessions. You are iterating over source_sessions which is not guaranteed to be chronologically ordered, so the "first occurrence" of an event with a duplicate ID is arbitrary. To ensure the chronologically first event is kept, you should collect all events from all source sessions, sort them by timestamp, and then perform deduplication.
# Collect all events, sort by timestamp, then deduplicate to ensure chronological "first occurrence wins".
all_source_events = []
for session in source_sessions:
    all_source_events.extend(session.events)
all_source_events.sort(key=lambda e: e.timestamp)
all_events = []
seen_event_ids = set()
for event in all_source_events:
    if event.id in seen_event_ids:
        continue
    seen_event_ids.add(event.id)
    all_events.append(event)

async with self.database_session_factory() as sql_session:
    for event in all_events:
        cloned_event = copy.deepcopy(event)
        new_storage_event = schema.StorageEvent.from_event(
            new_session, cloned_event
        )
        sql_session.add(new_storage_event)

    await sql_session.commit()
For better performance, it's recommended to use a bulk insert operation to add all the cloned events to the database rather than adding them one by one in a loop. You can use sql_session.add_all() for this.
# Copy events to the new session
async with self.database_session_factory() as sql_session:
    new_storage_events = [
        schema.StorageEvent.from_event(new_session, copy.deepcopy(event))
        for event in all_events
    ]
    sql_session.add_all(new_storage_events)
    await sql_session.commit()

async with self._get_db_connection() as db:
    for event in all_events:
        cloned_event = copy.deepcopy(event)
        await db.execute(
            """
            INSERT INTO events (id, app_name, user_id, session_id, invocation_id, timestamp, event_data)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            (
                cloned_event.id,
                new_session.app_name,
                new_session.user_id,
                new_session.id,
                cloned_event.invocation_id,
                cloned_event.timestamp,
                cloned_event.model_dump_json(exclude_none=True),
            ),
        )
    await db.commit()
For better performance, it's recommended to use a bulk insert operation to add all the cloned events to the database rather than inserting them one by one in a loop. You can use db.executemany() to achieve this.
async with self._get_db_connection() as db:
    event_params = []
    for event in all_events:
        cloned_event = copy.deepcopy(event)
        event_params.append((
            cloned_event.id,
            new_session.app_name,
            new_session.user_id,
            new_session.id,
            cloned_event.invocation_id,
            cloned_event.timestamp,
            cloned_event.model_dump_json(exclude_none=True),
        ))
    await db.executemany(
        """
        INSERT INTO events (id, app_name, user_id, session_id, invocation_id, timestamp, event_data)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        """,
        event_params,
    )
    await db.commit()

/gemini review
Code Review
This pull request introduces a valuable clone_session feature, which is well-implemented across the different session service backends. The problem and solution are clearly articulated, and the test coverage for the new functionality is comprehensive.
I've identified a critical correctness issue in the state-merging logic within all clone_session implementations. The current approach incorrectly carries over app: and user: specific state from the source user to the new user, which could lead to unintended data leakage between user contexts. I've left specific comments with suggested fixes in each of the implementation files. I've also suggested an improvement to the tests to cover this scenario and prevent future regressions.
Additionally, there's a fair amount of duplicated logic across the clone_session implementations (e.g., collecting source sessions, sorting, deduplicating events). For future maintainability, you might consider refactoring this common logic into a shared helper or a method on the BaseSessionService.
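A later revision of this PR does add such a helper (_prepare_sessions_for_cloning, mentioned in a review below). As a rough sketch only, a shared method on BaseSessionService could look like the following, with the Session/Event types and field names assumed from the snippets above:

import copy
from typing import Any

def _prepare_sessions_for_cloning(
    self, source_sessions: list[Session]
) -> tuple[dict[str, Any], list[Event]]:
    """Merges state and deduplicates events across the source sessions."""
    # Deterministic order: later sessions overwrite earlier state values.
    source_sessions.sort(key=lambda s: s.last_update_time)

    merged_state: dict[str, Any] = {}
    for session in source_sessions:
        merged_state.update(copy.deepcopy(session.state))

    # Chronological "first occurrence wins" deduplication.
    all_source_events = []
    for session in source_sessions:
        all_source_events.extend(session.events)
    all_source_events.sort(key=lambda e: e.timestamp)

    all_events: list[Event] = []
    seen_event_ids: set[str] = set()
    for event in all_source_events:
        if event.id in seen_event_ids:
            continue
        seen_event_ids.add(event.id)
        all_events.append(copy.deepcopy(event))

    return merged_state, all_events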
/gemini review
Code Review
This pull request introduces a clone_session feature, which is a valuable addition for handling user state transitions like guest-to-authenticated. The implementation is well-structured, with a shared helper method in the base class ensuring consistent logic for state merging and event deduplication across different session service backends. The implementations for in-memory, database, and SQLite services are efficient, using optimizations like bulk inserts and avoiding N+1 query problems. The test coverage is excellent and thorough. I have a few suggestions to improve maintainability and performance, particularly regarding a fragile dependency in the in-memory implementation and a potential performance bottleneck in the Vertex AI implementation. Please see the detailed comments.
/gemini review
Code Review
This pull request introduces a valuable clone_session feature, which is well-motivated and consistently implemented across all session service backends. The code demonstrates thoughtful optimizations, such as avoiding N+1 queries in database services and using parallel fetches for the Vertex AI service. The shared helper method, _prepare_sessions_for_cloning, is a great way to ensure consistent logic for state merging and event deduplication. The unit tests are comprehensive and cover many important scenarios. I have a few suggestions to further improve code clarity and robustness in some edge cases.
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
/gemini review
Code Review
This pull request introduces a valuable clone_session feature across all session services, enabling seamless context transfer for use cases like guest-to-authenticated user transitions. The implementation is well-designed, utilizing a shared helper in the base class and providing optimized logic for each backend. The new unit tests are comprehensive and cover the feature thoroughly. My review focuses on a few key areas to ensure consistency and correctness, particularly around updating the last_update_time of cloned sessions to prevent returning stale timestamps. I've also included a suggestion to improve code clarity. Overall, this is a high-quality and well-executed feature addition.
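For illustration, one way to avoid returning a stale timestamp is to stamp the clone after its events are copied; the variable names follow the snippets above, and the PR's actual fix may differ:

# Stamp the clone with the newest copied event's time rather than
# whatever timestamp create_session initially set.
if all_events:
    new_session.last_update_time = max(event.timestamp for event in all_events)
new_session.events = all_events
return new_session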
Force-pushed from 2985126 to bc8839a
Please ensure you have read the contribution guide before creating a pull request.
Link to Issue or Description of Change
1. Link to an existing issue (if applicable):
2. Or, if no issue exists, describe the change:
Problem:
In e-commerce and customer service scenarios, agentic chatbots commonly need to handle both non-signed-in (guest) and signed-in (authenticated) user states. A typical user journey looks like:
1. The user starts chatting as a guest, identified by a temporary identifier (session_id, cookie_id, or device_id).
2. The user signs in and is identified by a stable identity (customer_id, account_number).
Without this capability, the agent loses all prior context when the user authenticates, forcing them to repeat their questions and preferences. This creates a poor user experience and reduces conversion rates.
Current limitation: ADK session services do not provide a way to clone or transfer session history from one user identity to another.
Solution:
Add a new clone_session method to BaseSessionService that enables applications to seamlessly transfer conversation context when users transition between identity states.

Primary use case - Guest to Authenticated transition:
The method is implemented across all session service backends:
- InMemorySessionService
- DatabaseSessionService (SQLAlchemy)
- SqliteSessionService (aiosqlite)
- VertexAiSessionService

Additional supported scenarios:
Key features:
- A new session ID is auto-generated when new_session_id is not provided
- Clear error handling (ValueError for missing sources, AlreadyExistsError for duplicate IDs)

API signature:
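The signature block itself did not survive extraction here. Based on the parameters referenced throughout this PR (app_name, src_user_id, src_session_id, new_session_id), it plausibly resembles the following sketch; the new_user_id name in particular is a guess, not confirmed by the source:

from typing import Optional

async def clone_session(
    self,
    *,
    app_name: str,
    src_user_id: str,
    new_user_id: str,
    src_session_id: Optional[str] = None,  # None clones all of the user's sessions
    new_session_id: Optional[str] = None,  # auto-generated when omitted
) -> Session:
    ...

# Hypothetical usage: carry a guest's context over to the signed-in identity.
authed_session = await session_service.clone_session(
    app_name='shop_assistant',
    src_user_id='guest-cookie-123',
    new_user_id='customer-456',
)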
Testing Plan
Unit Tests:
Added 10 new test cases that run across all 3 testable session services (InMemory, Database, SQLite) for a total of 30 new tests:
- test_clone_session_basic
- test_clone_session_with_different_user_id
- test_clone_session_with_custom_session_id
- test_clone_session_with_existing_id_raises_error
- test_clone_session_preserves_event_content
- test_clone_session_does_not_affect_source
- test_clone_all_user_sessions
- test_clone_session_no_source_raises_error
- test_clone_all_sessions_no_sessions_raises_error
- test_clone_session_deduplicates_events

pytest results:
Manual End-to-End (E2E) Tests:
Checklist
Additional context
Files changed:
- src/google/adk/sessions/base_session_service.py: clone_session method with comprehensive docstring
- src/google/adk/sessions/in_memory_session_service.py: clone_session
- src/google/adk/sessions/database_session_service.py: clone_session
- src/google/adk/sessions/sqlite_session_service.py: clone_session
- src/google/adk/sessions/vertex_ai_session_service.py: clone_session (note: new_session_id not supported due to Vertex AI API limitation)
- tests/unittests/sessions/test_session_service.py

Design decisions:
- The new_session_id parameter raises ValueError for VertexAiSessionService, since the backend auto-generates session IDs
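A minimal sketch of that guard (assumed shape, not necessarily the PR's exact code):

# Inside VertexAiSessionService.clone_session: reject unsupported IDs early.
if new_session_id is not None:
    raise ValueError(
        'new_session_id is not supported by VertexAiSessionService: '
        'the Vertex AI backend auto-generates session IDs.'
    )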