Conversation
📝 Walkthrough

This pull request introduces LLM chain job functionality. It adds a new job type, a database migration, an API endpoint to initiate chain jobs, supporting data models for requests/responses, and backend service logic for executing multi-block LLM chains with configuration support and asynchronous callback delivery.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant API as API Endpoint
    participant Database
    participant Celery as Celery Task
    participant Provider as LLM Provider
    participant Callback as Callback Handler

    Client->>API: POST /llm_chain (LLMChainRequest)
    API->>API: Validate project permission
    API->>API: Validate callback URL
    API->>Database: Create LLM_CHAIN job
    API->>Celery: Schedule execute_chain_job task
    API-->>Client: Return job started confirmation
    Celery->>Database: Fetch job and blocks
    loop For each block in chain
        Celery->>Celery: Resolve block config (stored/inline)
        Celery->>Celery: Interpolate template with input
        Celery->>Provider: Execute provider call
        Provider-->>Celery: Return response
        Celery->>Database: Create/update LlmCall record
        Celery->>Callback: Send intermediate callback (if enabled)
        Callback-->>Celery: Acknowledge
    end
    Celery->>Database: Update job status to SUCCESS
    Celery->>Callback: Send final chain response
    Callback-->>Celery: Acknowledge
    Celery->>Database: Update LlmCall with final result
```
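To make the loop in the diagram concrete, here is a minimal, hypothetical sketch of the per-block flow: interpolate each block's template with the previous block's output, call the provider, and optionally deliver an intermediate callback before the final response. The `Block` fields and the helpers (`call_provider`, `deliver_callback`) are illustrative stand-ins, not the actual implementation in `chain_executor.py`.

```python
# Hypothetical sketch of the chain loop described in the diagram above.
# Block fields and helper functions are illustrative stand-ins.
from dataclasses import dataclass


@dataclass
class Block:
    template: str                     # e.g. "Summarize: {input}"
    send_intermediate: bool = False   # emit a callback after this block?


def call_provider(prompt: str) -> str:
    """Stand-in for the real LLM provider call."""
    return f"<llm output for {prompt!r}>"


def deliver_callback(payload: dict) -> None:
    """Stand-in for POSTing a payload to the client's callback URL."""
    print("callback:", payload)


def run_chain(blocks: list[Block], initial_input: str) -> str:
    previous_output = initial_input
    for block_idx, block in enumerate(blocks):
        is_last = block_idx == len(blocks) - 1
        # Interpolate the block's template with the previous block's output.
        prompt = block.template.format(input=previous_output)
        previous_output = call_provider(prompt)
        # Intermediate callbacks are optional; the final response is always sent.
        if block.send_intermediate and not is_last:
            deliver_callback({"block": block_idx, "output": previous_output})
    deliver_callback({"status": "SUCCESS", "output": previous_output})
    return previous_output


result = run_chain(
    [Block("Summarize: {input}", send_intermediate=True),
     Block("Translate to French: {input}")],
    "The quick brown fox jumps over the lazy dog.",
)
```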
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks (✅ 2 | ❌ 1)

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 6
🤖 Fix all issues with AI agents
In `@backend/app/alembic/versions/045_add_llm_chain_job_type.py`:
- Around line 17-22: Add explicit return type hints to the migration functions
by changing the signatures of upgrade() and downgrade() to include "-> None"
(i.e., def upgrade() -> None: and def downgrade() -> None:), and apply the same
change across all migration revision files (001–045) so every migration function
follows the codebase guideline for type hints.
In `@backend/app/api/routes/llm_chain.py`:
- Around line 21-23: The function llm_chain is missing a return type annotation;
update its signature to include the appropriate return type matching the
decorator (i.e., APIResponse[Message]) so the function signature for llm_chain
explicitly returns APIResponse[Message]; locate the llm_chain definition and add
the return type hint to match the response_model.
In `@backend/app/models/llm/request.py`:
- Around line 275-295: The LlmCall model uses several names that aren’t
imported, causing NameError; add appropriate imports at the top of the file for
Index and text (from sqlalchemy), uuid4 (from uuid), sa (import sqlalchemy as
sa), JSONB (from sqlalchemy.dialects.postgresql import JSONB), and datetime/now
utilities (e.g., from datetime import datetime and/or from sqlalchemy import
func as now or use sa.func.now) so symbols referenced in LlmCall (Index, text,
uuid4, sa, JSONB, datetime, now) are defined and the module can import cleanly.
In `@backend/app/services/llm/chain_executor.py`:
- Around line 118-123: The current code logs the full block_config at info level
and uses a nonconforming prefix; update the logging in execute_chain_job to
avoid dumping sensitive data by replacing logger.info(f"[BLOCK CONFIG] ===>
{block_config}") with a debug-level, sanitized summary (e.g.,
logger.debug(f"[execute_chain_job] Block config summary:
{mask_string(block_config_summary)}")) where block_config_summary only contains
non-sensitive keys, and ensure the subsequent execution log uses the required
prefix (logger.info(f"[execute_chain_job] Executing block
{block_idx}/{len(request.blocks) - 1} ...")). Use mask_string(...) for any
fields that may contain secrets and only expose minimal summary info.
- Around line 87-94: The parameter task_instance in execute_chain_job needs a
type hint and must be marked as intentionally unused; update the function
signature to use a typed, unused parameter (e.g., _task_instance: Any) and add
the required import from typing (Any) at the top of the module if missing so
Ruff stops flagging it as unused. Ensure you only rename the parameter to a
leading-underscore form and add the Any type hint; do not change any other logic
or parameter order.
- Around line 105-110: The exception handler references block.request_metadata
but block is only defined inside the for-loop, so initialize block to None
before the try/update or change the handler to avoid using block when it's not
set; specifically, in the scope around the JobCrud update (Session(engine) /
JobCrud(...).update and surrounding try/except in chain_executor.py) either set
block = None before the try or update the except block to use fallback data
(e.g., job_id or a safe empty metadata) and guard access to block (check if
block is not None) before accessing block.request_metadata to prevent
UnboundLocalError masking the original DB update error.
🧹 Nitpick comments (2)
backend/app/models/job.py (1)
17-20: Update the job_type column comment to include LLM_CHAIN.

The enum now includes LLM_CHAIN, but the column comment still lists only RESPONSE/LLM_API, so the schema documentation will drift.
✏️ Suggested update
- "comment": "Type of job being executed (e.g., RESPONSE, LLM_API)" + "comment": "Type of job being executed (e.g., RESPONSE, LLM_API, LLM_CHAIN)"As per coding guidelines: Use sa_column_kwargs["comment"] to describe database columns, especially for non-obvious purposes, status/type fields, JSON/metadata columns, and foreign keys.
backend/app/models/llm/request.py (1)
7-7: Remove unused import.
Dict is imported but never used in this file. All dictionary type hints correctly use the lowercase dict syntax.

🧹 Proposed fix

```diff
-from typing import Dict
-
```
```python
def upgrade():
    op.execute("ALTER TYPE jobtype ADD VALUE IF NOT EXISTS 'LLM_CHAIN'")


def downgrade():
    pass
```
🧩 Analysis chain
🏁 Script executed:
```shell
# First, let's find and examine the migration file
find . -name "045_add_llm_chain_job_type.py" -type f
```

Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 135
🏁 Script executed:
```shell
# Check the file structure and content around lines 17-22
if [ -f "backend/app/alembic/versions/045_add_llm_chain_job_type.py" ]; then
  cat -n "backend/app/alembic/versions/045_add_llm_chain_job_type.py"
fi
```

Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 591
🏁 Script executed:
```shell
# Check other migration files to see the pattern used
fd "\.py$" backend/app/alembic/versions/ | head -5 | xargs -I {} sh -c 'echo "=== {} ===" && head -30 {}'
```

Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 4173
Migration files lack type hints across the codebase.
The upgrade() and downgrade() functions should include explicit return type hints (-> None) per coding guidelines. This is a codebase-wide issue affecting all migration files (001–045), not limited to this revision. Consider adding type hints to all migration functions for consistency.
🤖 Prompt for AI Agents
In `@backend/app/alembic/versions/045_add_llm_chain_job_type.py` around lines 17 -
22, Add explicit return type hints to the migration functions by changing the
signatures of upgrade() and downgrade() to include "-> None" (i.e., def
upgrade() -> None: and def downgrade() -> None:), and apply the same change
across all migration revision files (001–045) so every migration function
follows the codebase guideline for type hints.
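For reference, a minimal sketch of the suggested fix for this revision (the same annotation would be applied to the other migration files):

```python
from alembic import op


def upgrade() -> None:
    op.execute("ALTER TYPE jobtype ADD VALUE IF NOT EXISTS 'LLM_CHAIN'")


def downgrade() -> None:
    pass
```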
```python
def llm_chain(
    _current_user: AuthContextDep, _session: SessionDep, request: LLMChainRequest
):
```
🧩 Analysis chain
🏁 Script executed:
```shell
find . -name "llm_chain.py" -type f | grep -E "backend/app/api"
```

Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 112
🏁 Script executed:
```shell
head -50 backend/app/api/routes/llm_chain.py
```

Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 1231
🏁 Script executed:
```shell
wc -l backend/app/api/routes/llm_chain.py
```

Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 113
Add a return type hint for llm_chain.
The function is missing a return type annotation. Since the decorator specifies response_model=APIResponse[Message] and the function returns an APIResponse object, add the corresponding return type:
```diff
-def llm_chain(
-    _current_user: AuthContextDep, _session: SessionDep, request: LLMChainRequest
-):
+def llm_chain(
+    _current_user: AuthContextDep, _session: SessionDep, request: LLMChainRequest
+) -> APIResponse[Message]:
```

As per coding guidelines: Always add type hints to all function parameters and return values in Python code.
🤖 Prompt for AI Agents
In `@backend/app/api/routes/llm_chain.py` around lines 21 - 23, The function
llm_chain is missing a return type annotation; update its signature to include
the appropriate return type matching the decorator (i.e., APIResponse[Message])
so the function signature for llm_chain explicitly returns APIResponse[Message];
locate the llm_chain definition and add the return type hint to match the
response_model.
```python
class LlmCall(SQLModel, table=True):
    """
    Database model for tracking LLM API call requests and responses.

    Stores both request inputs and response outputs for traceability,
    supporting multimodal inputs (text, audio, image) and various completion types.
    """

    __tablename__ = "llm_call"
    __table_args__ = (
        Index(
            "idx_llm_call_job_id",
            "job_id",
            postgresql_where=text("deleted_at IS NULL"),
        ),
        Index(
            "idx_llm_call_conversation_id",
            "conversation_id",
            postgresql_where=text("conversation_id IS NOT NULL AND deleted_at IS NULL"),
        ),
    )
```
Critical: Missing imports cause NameError at runtime.
The LlmCall model references Index, text, uuid4, sa, JSONB, datetime, and now without importing them. This causes the pipeline failure and will prevent the module from loading.
🐛 Proposed fix: Add missing imports at the top of the file
```diff
 from typing import Annotated, Any, Literal, Union
 from uuid import UUID
+from uuid import uuid4
+from datetime import datetime
+
+import sqlalchemy as sa
+from sqlalchemy import Index, text
+from sqlalchemy.dialects.postgresql import JSONB
 from sqlmodel import Field, SQLModel
 from pydantic import Discriminator, model_validator, HttpUrl
-from typing import Dict
+from app.core.util import now
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
class LlmCall(SQLModel, table=True):
    """
    Database model for tracking LLM API call requests and responses.

    Stores both request inputs and response outputs for traceability,
    supporting multimodal inputs (text, audio, image) and various completion types.
    """

    __tablename__ = "llm_call"
    __table_args__ = (
        Index(
            "idx_llm_call_job_id",
            "job_id",
            postgresql_where=text("deleted_at IS NULL"),
        ),
        Index(
            "idx_llm_call_conversation_id",
            "conversation_id",
            postgresql_where=text("conversation_id IS NOT NULL AND deleted_at IS NULL"),
        ),
    )
```

with the imports at the top of the file:

```python
from typing import Annotated, Any, Literal, Union
from uuid import UUID
from uuid import uuid4
from datetime import datetime

import sqlalchemy as sa
from sqlalchemy import Index, text
from sqlalchemy.dialects.postgresql import JSONB
from sqlmodel import Field, SQLModel
from pydantic import Discriminator, model_validator, HttpUrl

from app.core.util import now
```
🧰 Tools

🪛 GitHub Actions: Kaapi CI

[error] 285-285: NameError: name 'Index' is not defined

🪛 Ruff (0.14.14)

[error] 285-285: Undefined name Index (F821)
[error] 288-288: Undefined name text (F821)
[error] 290-290: Undefined name Index (F821)
[error] 293-293: Undefined name text (F821)
🤖 Prompt for AI Agents
In `@backend/app/models/llm/request.py` around lines 275 - 295, The LlmCall model
uses several names that aren’t imported, causing NameError; add appropriate
imports at the top of the file for Index and text (from sqlalchemy), uuid4 (from
uuid), sa (import sqlalchemy as sa), JSONB (from sqlalchemy.dialects.postgresql
import JSONB), and datetime/now utilities (e.g., from datetime import datetime
and/or from sqlalchemy import func as now or use sa.func.now) so symbols
referenced in LlmCall (Index, text, uuid4, sa, JSONB, datetime, now) are defined
and the module can import cleanly.
```python
def execute_chain_job(
    request_data: dict,
    project_id: int,
    organization_id: int,
    job_id: str,
    task_id: str,
    task_instance,
) -> dict:
```
🧩 Analysis chain
🏁 Script executed:
```shell
# First, let's read the file to see the current state of the function
cat -n backend/app/services/llm/chain_executor.py | head -120
```

Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 4712
🏁 Script executed:
```shell
# Read more of the function to see if task_instance is used anywhere
sed -n '87,250p' backend/app/services/llm/chain_executor.py
```

Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 7024
🏁 Script executed:
```shell
# Search for any references to task_instance in the entire file
rg "task_instance" backend/app/services/llm/chain_executor.py
```

Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 93
Add type hint to task_instance and mark as intentionally unused.
The parameter task_instance lacks a type hint and is flagged as unused by Ruff. Per coding guidelines, all function parameters must have type hints.
✏️ Suggested update
```diff
 def execute_chain_job(
     request_data: dict,
     project_id: int,
     organization_id: int,
     job_id: str,
     task_id: str,
-    task_instance,
+    task_instance: object | None,
 ) -> dict:
     """Celery task to process an LLM chain request asynchronously."""
+    _ = task_instance
     request = LLMChainRequest(**request_data)
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
def execute_chain_job(
    request_data: dict,
    project_id: int,
    organization_id: int,
    job_id: str,
    task_id: str,
    task_instance: object | None,
) -> dict:
    """Celery task to process an LLM chain request asynchronously."""
    _ = task_instance
```
🧰 Tools

🪛 Ruff (0.14.14)

[warning] 93-93: Unused function argument: task_instance (ARG001)
🤖 Prompt for AI Agents
In `@backend/app/services/llm/chain_executor.py` around lines 87 - 94, The
parameter task_instance in execute_chain_job needs a type hint and must be
marked as intentionally unused; update the function signature to use a typed,
unused parameter (e.g., _task_instance: Any) and add the required import from
typing (Any) at the top of the module if missing so Ruff stops flagging it as
unused. Ensure you only rename the parameter to a leading-underscore form and
add the Any type hint; do not change any other logic or parameter order.
```python
try:
    with Session(engine) as session:
        JobCrud(session=session).update(
            job_id=job_id,
            job_update=JobUpdate(status=JobStatus.PROCESSING),
        )
```
🧩 Analysis chain
🏁 Script executed:
```shell
cat -n backend/app/services/llm/chain_executor.py | sed -n '95,130p'
```

Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 1649
🏁 Script executed:
```shell
cat -n backend/app/services/llm/chain_executor.py | sed -n '325,355p'
```

Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 938
🏁 Script executed:
```shell
wc -l backend/app/services/llm/chain_executor.py
```

Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 121
🏁 Script executed:
```shell
sed -n '335,344p' backend/app/services/llm/chain_executor.py | grep -n "block"
```

Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 121
🏁 Script executed:
```shell
sed -n '85,105p' backend/app/services/llm/chain_executor.py
```

Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 621
Guard against block being unbound in the exception handler.
The exception handler at line 342 references block.request_metadata, but block is only defined inside the for loop (line 115). If an error occurs during the DB update (lines 106-110) before the loop executes, attempting to access block raises UnboundLocalError, masking the real failure.
🛠️ Suggested fix
```diff
+    block_metadata: dict | None = None
     try:
         with Session(engine) as session:
             JobCrud(session=session).update(
                 job_id=job_id,
                 job_update=JobUpdate(status=JobStatus.PROCESSING),
             )
         previous_output: str | None = None
         last_response = None
         for block_idx, block in enumerate(request.blocks):
+            block_metadata = block.request_metadata
             is_last = block_idx == len(request.blocks) - 1
```

and in the exception handler:

```diff
         callback_response = APIResponse.failure_response(
             error="Unexpected error occurred during chain execution",
-            metadata=block.request_metadata,
+            metadata=block_metadata,
         )
```
)🤖 Prompt for AI Agents
In `@backend/app/services/llm/chain_executor.py` around lines 105 - 110, The
exception handler references block.request_metadata but block is only defined
inside the for-loop, so initialize block to None before the try/update or change
the handler to avoid using block when it's not set; specifically, in the scope
around the JobCrud update (Session(engine) / JobCrud(...).update and surrounding
try/except in chain_executor.py) either set block = None before the try or
update the except block to use fallback data (e.g., job_id or a safe empty
metadata) and guard access to block (check if block is not None) before
accessing block.request_metadata to prevent UnboundLocalError masking the
original DB update error.
```python
block_config = block.config

logger.info(f"[BLOCK CONFIG] ===> {block_config}")

logger.info(
    f"[execute_chain_job] Executing block {block_idx}/{len(request.blocks) - 1} "
```
Avoid logging full block_config and fix the log prefix.
[BLOCK CONFIG] violates the required prefix format and dumping the full config at info level can leak prompt/content or other sensitive fields. Prefer a sanitized summary at debug level.
🧹 Suggested update
```diff
-logger.info(f"[BLOCK CONFIG] ===> {block_config}")
+logger.debug(
+    f"[execute_chain_job] Block config loaded | block_index={block_idx}"
+)
```

As per coding guidelines: Prefix all log messages with the function name in square brackets: logger.info(f"[function_name] Message {mask_string(sensitive_value)}").
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
block_config = block.config

logger.debug(
    f"[execute_chain_job] Block config loaded | block_index={block_idx}"
)

logger.info(
    f"[execute_chain_job] Executing block {block_idx}/{len(request.blocks) - 1} "
```
🤖 Prompt for AI Agents
In `@backend/app/services/llm/chain_executor.py` around lines 118 - 123, The
current code logs the full block_config at info level and uses a nonconforming
prefix; update the logging in execute_chain_job to avoid dumping sensitive data
by replacing logger.info(f"[BLOCK CONFIG] ===> {block_config}") with a
debug-level, sanitized summary (e.g., logger.debug(f"[execute_chain_job] Block
config summary: {mask_string(block_config_summary)}")) where
block_config_summary only contains non-sensitive keys, and ensure the subsequent
execution log uses the required prefix (logger.info(f"[execute_chain_job]
Executing block {block_idx}/{len(request.blocks) - 1} ...")). Use
mask_string(...) for any fields that may contain secrets and only expose minimal
summary info.
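For illustration, here is a hypothetical sketch of the sanitized-summary approach the prompt describes, assuming the block config is dict-like. The allowlisted keys ("provider", "model") are assumptions about non-sensitive fields, and mask_string below is a stand-in for the project's masking utility referenced in the coding guidelines.

```python
# Hypothetical sketch of a sanitized block-config summary. The allowlisted
# keys ("provider", "model") are assumed non-sensitive, and mask_string is
# a stand-in for the project's real masking helper.
import logging

logger = logging.getLogger(__name__)

SAFE_KEYS = ("provider", "model")  # assumed non-sensitive config keys


def mask_string(value: str) -> str:
    """Stand-in: keep a short prefix and mask the rest."""
    return value[:4] + "***" if len(value) > 4 else "***"


def log_block_config(block_config: dict, block_idx: int) -> None:
    # Expose only allowlisted keys, masking their values before logging.
    summary = {
        key: mask_string(str(block_config[key]))
        for key in SAFE_KEYS
        if key in block_config
    }
    logger.debug(f"[execute_chain_job] Block {block_idx} config summary: {summary}")
```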
Summary
Target issue is #542
Explain the motivation for making this change. What existing problem does the pull request solve?
Enable chaining of multiple LLM calls
Use cases:
Checklist
Before submitting a pull request, please ensure that you mark these tasks.
Run `fastapi run --reload app/main.py` or `docker compose up` in the repository root and test.

Notes
Please add here if any other information is required for the reviewer.
Summary by CodeRabbit
Release Notes