32 changes: 32 additions & 0 deletions backend/app/alembic/versions/043_add_score_trace_url_to_evaluation_run.py
@@ -0,0 +1,32 @@
"""Add score_trace_url to evaluation_run

Revision ID: 043
Revises: 042
Create Date: 2026-01-24 19:34:46.763908

"""
from alembic import op
import sqlalchemy as sa
import sqlmodel.sql.sqltypes


revision = "043"
down_revision = "042"
branch_labels = None
depends_on = None


def upgrade():
op.add_column(
"evaluation_run",
sa.Column(
"score_trace_url",
sqlmodel.sql.sqltypes.AutoString(),
nullable=True,
comment="S3 URL where per-trace evaluation scores are stored",
),
)


def downgrade():
op.drop_column("evaluation_run", "score_trace_url")
Comment on lines +19 to +32

⚠️ Potential issue | 🟡 Minor

Add return type hints to migration functions.

Both upgrade() and downgrade() should declare -> None to comply with the type-hinting rule.

✍️ Suggested update
-def upgrade():
+def upgrade() -> None:
     op.add_column(
         "evaluation_run",
         sa.Column(
             "score_trace_url",
             sqlmodel.sql.sqltypes.AutoString(),
             nullable=True,
             comment="S3 URL where per-trace evaluation scores are stored",
         ),
     )

-def downgrade():
+def downgrade() -> None:
     op.drop_column("evaluation_run", "score_trace_url")

As per coding guidelines, **/*.py: Always add type hints to all function parameters and return values in Python code.

🤖 Prompt for AI Agents
In `@backend/app/alembic/versions/043_add_score_trace_url_to_evaluation_run.py`
around lines 19-32: the migration functions upgrade and downgrade lack return
type hints; update their signatures to declare -> None so they read
upgrade() -> None: and downgrade() -> None: (keep the existing bodies unchanged)
to satisfy the type-hinting rule.

39 changes: 33 additions & 6 deletions backend/app/core/storage_utils.py
@@ -84,10 +84,7 @@ def __init__(self, content: bytes):


def upload_jsonl_to_object_store(
-    storage: CloudStorage,
-    results: list[dict],
-    filename: str,
-    subdirectory: str,
storage: CloudStorage, results: list[dict], filename: str, subdirectory: str
) -> str | None:
"""
Upload JSONL (JSON Lines) content to object store.
@@ -114,8 +111,7 @@ def upload_jsonl_to_object_store(
# Create file path
file_path = Path(subdirectory) / filename

-    # Convert results to JSONL
-    jsonl_content = "\n".join([json.dumps(result) for result in results])
jsonl_content = json.dumps(results, ensure_ascii=False)
content_bytes = jsonl_content.encode("utf-8")

# Create UploadFile-like object
@@ -152,6 +148,37 @@ def upload_jsonl_to_object_store(
return None


def load_json_from_object_store(storage: CloudStorage, url: str) -> list | None:
logger.info(f"[load_json_from_object_store] Loading JSON from '{url}")
Comment on lines +151 to +152

⚠️ Potential issue | 🟡 Minor

Fix log message formatting.

The opening quote isn’t closed, which makes the log line confusing.

✍️ Suggested update
-    logger.info(f"[load_json_from_object_store] Loading JSON from '{url}")
+    logger.info(f"[load_json_from_object_store] Loading JSON from '{url}'")
🤖 Prompt for AI Agents
In `@backend/app/core/storage_utils.py` around lines 151-152, the log statement
in load_json_from_object_store is missing the closing quote in its f-string;
update the logger.info call so the string is properly closed, e.g.
f"[load_json_from_object_store] Loading JSON from '{url}'", making the message
well-formed.

try:
body = storage.stream(url)
content = body.read()

data = json.loads(content.decode("utf-8"))

logger.info(
f"[load_json_from_object_store] Download successful | "
f"url='{url}', size={len(content)} bytes"
)
return data
except CloudStorageError as e:
logger.warning(
f"[load_json_from_object_store] failed to load JSON from '{url}': {e}",
)
return None
except json.JSONDecodeError as e:
logger.warning(
f"[load_json_from_object_store] JSON decode error loading JSON from '{url}': {e}",
)
return None
except Exception as e:
logger.warning(
f"[load_json_from_object_store] unexpected error loading JSON from '{url}': {e}",
exc_info=True,
)
return None


def generate_timestamped_filename(base_name: str, extension: str = "csv") -> str:
"""
Generate a filename with timestamp.
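Taken together, the changed upload helper and the new loader form a simple round trip: despite the "jsonl" name, the upload now serialises the whole results list as a single JSON document, and load_json_from_object_store parses it back (or returns None on storage or decode errors). A minimal sketch of that round trip — the CloudStorage import path is assumed, and the filename/subdirectory convention mirrors the one used by save_score below:

```python
from app.core.cloud.storage import CloudStorage  # import path assumed
from app.core.storage_utils import (
    load_json_from_object_store,
    upload_jsonl_to_object_store,
)


def round_trip_traces(
    storage: CloudStorage, traces: list[dict], eval_run_id: int
) -> list | None:
    """Illustrative only: upload per-trace scores, then read them back."""
    url = upload_jsonl_to_object_store(
        storage=storage,
        results=traces,
        filename=f"traces_{eval_run_id}.json",
        subdirectory=f"evaluations/score/{eval_run_id}",
    )
    if url is None:
        # Upload failed; callers such as save_score fall back to DB storage.
        return None
    # Returns the parsed list, or None on download / JSON-decode errors.
    return load_json_from_object_store(storage=storage, url=url)
```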
65 changes: 59 additions & 6 deletions backend/app/crud/evaluations/core.py
@@ -187,6 +187,7 @@ def update_evaluation_run(
status: str | None = None,
error_message: str | None = None,
object_store_url: str | None = None,
score_trace_url: str | None = None,
score: dict | None = None,
embedding_batch_job_id: int | None = None,
) -> EvaluationRun:
@@ -219,6 +220,8 @@
eval_run.score = score
if embedding_batch_job_id is not None:
eval_run.embedding_batch_job_id = embedding_batch_job_id
if score_trace_url is not None:
eval_run.score_trace_url = score_trace_url

# Always update timestamp
eval_run.updated_at = now()
@@ -336,6 +339,8 @@ def save_score(
Updated EvaluationRun instance, or None if not found
"""
from app.core.db import engine
from app.core.cloud.storage import get_cloud_storage
from app.core.storage_utils import upload_jsonl_to_object_store

with Session(engine) as session:
eval_run = get_evaluation_run_by_id(
Expand All @@ -344,12 +349,60 @@ def save_score(
organization_id=organization_id,
project_id=project_id,
)
-        if eval_run:
-            update_evaluation_run(session=session, eval_run=eval_run, score=score)
-            logger.info(
-                f"[save_score] Saved score | evaluation_id={eval_run_id} | "
-                f"traces={len(score.get('traces', []))}"
-            )
if not eval_run:
return None

traces = score.get("traces", [])
summary_score = score.get("summary_scores", [])
score_trace_url = None

if traces:
try:
storage = get_cloud_storage(session=session, project_id=project_id)
score_trace_url = upload_jsonl_to_object_store(
storage=storage,
results=traces,
filename=f"traces_{eval_run_id}.json",
subdirectory=f"evaluations/score/{eval_run_id}",
)
if score_trace_url:
logger.info(
f"[save_score] uploaded traces to S3 | "
f"evaluation_id={eval_run_id} | url={score_trace_url} | "
f"traces_count={len(traces)}"
)
else:
logger.warning(
f"[save_score] failed to upload traces to S3, "
f"falling back to DB storage | evaluation_id={eval_run_id}"
)
except Exception as e:
logger.error(
f"[save_score] Error uploading traces to S3: {e} | "
f"evaluation_id={eval_run_id}",
exc_info=True,
)

# If the trace data is stored at the S3 URL, only the summary score is kept in the DB
# TODO: Evaluate whether this behaviour is needed, or drop storing score data in the DB entirely
if score_trace_url:
db_score = {"summary_scores": summary_score}
else:
# fallback to store data in db if failed to store in s3
db_score = score

update_evaluation_run(
session=session,
eval_run=eval_run,
score=db_score,
score_trace_url=score_trace_url,
)

logger.info(
f"[save_score] Saved score | evaluation_id={eval_run_id} | "
f"traces={len(score.get('traces', []))}"
)

return eval_run


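In short, save_score now tries to push the per-trace results to object storage and keeps only the summary in the DB row when that succeeds, falling back to the previous behaviour (full score in the DB) otherwise. A simplified sketch of that decision, not the literal implementation (which also performs the upload and logging shown above):

```python
def choose_db_score(score: dict, score_trace_url: str | None) -> dict:
    """What ends up in EvaluationRun.score, depending on whether the S3 upload succeeded."""
    if score_trace_url:
        # Traces live in S3 (referenced by score_trace_url); keep only the summary in the DB.
        return {"summary_scores": score.get("summary_scores", [])}
    # Upload failed or there were no traces: store the full score in the DB as before.
    return score
```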
7 changes: 7 additions & 0 deletions backend/app/models/evaluation.py
@@ -250,6 +250,13 @@ class EvaluationRun(SQLModel, table=True):
description="Object store URL of processed evaluation results for future reference",
sa_column_kwargs={"comment": "S3 URL of processed evaluation results"},
)
score_trace_url: str | None = SQLField(
default=None,
description="S3 URL per-trace score data is stored",
sa_column_kwargs={
"comment": "S3 URL where per-trace evaluation scores are stored"
},
)
total_items: int = SQLField(
default=0,
description="Total number of items evaluated (set during processing)",
44 changes: 41 additions & 3 deletions backend/app/services/evaluations/evaluation.py
@@ -189,6 +189,9 @@ def get_evaluation_with_scores(
Returns:
Tuple of (EvaluationRun or None, error_message or None)
"""
from app.core.cloud.storage import get_cloud_storage
from app.core.storage_utils import load_json_from_object_store

logger.info(
f"[get_evaluation_with_scores] Fetching status for evaluation run | "
f"evaluation_id={evaluation_id} | "
@@ -227,9 +230,41 @@ def get_evaluation_with_scores(
return eval_run, None

# Check if we already have cached traces
-    has_cached_traces = eval_run.score is not None and "traces" in eval_run.score
-    if not resync_score and has_cached_traces:
-        return eval_run, None
has_cached_traces_s3 = eval_run.score_trace_url is not None
has_cached_traces_db = eval_run.score is not None and "traces" in eval_run.score
if not resync_score:
if has_cached_traces_s3:
try:
storage = get_cloud_storage(session=session, project_id=project_id)
traces = load_json_from_object_store(
storage=storage, url=eval_run.score_trace_url
)
if traces is not None:
eval_run.score = {
"summary_scores": (eval_run.score or {}).get(
"summary_scores", []
),
"traces": traces,
}
logger.info(
f"[get_evaluation_with_scores] Loaded traces from S3 | "
f"evaluation_id={evaluation_id} | "
f"traces_count={len(traces)}"
)
return eval_run, None
except Exception as e:
logger.error(
f"[get_evaluation_with_scores] Error loading traces from S3: {e} | "
f"evaluation_id={evaluation_id}",
exc_info=True,
)

if has_cached_traces_db:
logger.info(
f"[get_evaluation_with_scores] Returning traces from DB | "
f"evaluation_id={evaluation_id}"
)
return eval_run, None

langfuse = get_langfuse_client(
session=session,
Expand Down Expand Up @@ -289,4 +324,7 @@ def get_evaluation_with_scores(
score=score,
)

if eval_run:
eval_run.score = score

return eval_run, None
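Together with save_score above, the read path now prefers the S3 copy of the traces and only falls back to traces embedded in the DB row (and, failing that, to re-fetching from Langfuse). A rough sketch of that precedence, with the CloudStorage and EvaluationRun import paths assumed from the modules touched in this diff and the resync/Langfuse handling omitted:

```python
from app.core.cloud.storage import CloudStorage  # import path assumed
from app.core.storage_utils import load_json_from_object_store
from app.models.evaluation import EvaluationRun  # import path assumed


def resolve_cached_traces(
    eval_run: EvaluationRun, storage: CloudStorage
) -> list | None:
    """Simplified precedence for where cached traces come from."""
    if eval_run.score_trace_url is not None:
        # New behaviour: per-trace scores live in object storage.
        traces = load_json_from_object_store(
            storage=storage, url=eval_run.score_trace_url
        )
        if traces is not None:
            return traces
    if eval_run.score is not None and "traces" in eval_run.score:
        # Legacy rows (or S3 failures): traces are still embedded in the DB score.
        return eval_run.score["traces"]
    # No cached traces anywhere; the caller re-fetches scores from Langfuse.
    return None
```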
98 changes: 98 additions & 0 deletions backend/app/tests/core/test_storage_utils.py
@@ -0,0 +1,98 @@
"""Tests for storage_utils.py - upload and load functions for object store."""

import json
from io import BytesIO
from unittest.mock import MagicMock

import pytest

from app.core.cloud.storage import CloudStorageError
from app.core.storage_utils import (
load_json_from_object_store,
upload_jsonl_to_object_store,
)


class TestUploadJsonlToObjectStore:
"""Test uploading JSON content to object store."""

def test_upload_success(self) -> None:
"""Verify successful upload returns URL and content is correct."""
mock_storage = MagicMock()
mock_storage.put.return_value = "s3://bucket/path/traces.json"

results = [{"trace_id": "t1", "score": 0.9}]

url = upload_jsonl_to_object_store(
storage=mock_storage,
results=results,
filename="traces.json",
subdirectory="evaluations/score/70",
)

assert url == "s3://bucket/path/traces.json"
mock_storage.put.assert_called_once()

# Verify content is valid JSON
call_args = mock_storage.put.call_args
upload_file = call_args.kwargs.get("source")
content = upload_file.file.read().decode("utf-8")
assert json.loads(content) == results

def test_upload_returns_none_on_error(self) -> None:
"""Verify returns None on CloudStorageError."""
mock_storage = MagicMock()
mock_storage.put.side_effect = CloudStorageError("Upload failed")

url = upload_jsonl_to_object_store(
storage=mock_storage,
results=[{"id": 1}],
filename="test.json",
subdirectory="test",
)

assert url is None

Comment on lines +16 to +55

⚠️ Potential issue | 🟡 Minor

Add parameter type hints to test methods.

Annotate self to satisfy the type-hint requirement for function parameters.

✍️ Suggested update
-    def test_upload_success(self) -> None:
+    def test_upload_success(self: "TestUploadJsonlToObjectStore") -> None:
         """Verify successful upload returns URL and content is correct."""
         mock_storage = MagicMock()
         mock_storage.put.return_value = "s3://bucket/path/traces.json"
@@
-    def test_upload_returns_none_on_error(self) -> None:
+    def test_upload_returns_none_on_error(
+        self: "TestUploadJsonlToObjectStore",
+    ) -> None:
         """Verify returns None on CloudStorageError."""
         mock_storage = MagicMock()
         mock_storage.put.side_effect = CloudStorageError("Upload failed")

As per coding guidelines, **/*.py: Always add type hints to all function parameters and return values in Python code.

🤖 Prompt for AI Agents
In `@backend/app/tests/core/test_storage_utils.py` around lines 16-55, the test
methods in class TestUploadJsonlToObjectStore lack type annotations on the
implicit self parameter; update the signatures of test_upload_success and
test_upload_returns_none_on_error to annotate self (e.g.,
self: "TestUploadJsonlToObjectStore") while keeping the existing -> None return
types, and run the tests to confirm there are no behavioural changes.


class TestLoadJsonFromObjectStore:
"""Test loading JSON from object store."""

def test_load_success(self) -> None:
"""Verify successful load returns parsed JSON."""
mock_storage = MagicMock()
test_data = [{"id": 1, "value": "test"}]
mock_storage.stream.return_value = BytesIO(
json.dumps(test_data).encode("utf-8")
)

result = load_json_from_object_store(
storage=mock_storage,
url="s3://bucket/path/file.json",
)

assert result == test_data
mock_storage.stream.assert_called_once_with("s3://bucket/path/file.json")

def test_load_returns_none_on_error(self) -> None:
"""Verify returns None on CloudStorageError."""
mock_storage = MagicMock()
mock_storage.stream.side_effect = CloudStorageError("Download failed")

result = load_json_from_object_store(
storage=mock_storage,
url="s3://bucket/file.json",
)

assert result is None

def test_load_returns_none_on_invalid_json(self) -> None:
"""Verify returns None on invalid JSON content."""
mock_storage = MagicMock()
mock_storage.stream.return_value = BytesIO(b"not valid json")

result = load_json_from_object_store(
storage=mock_storage,
url="s3://bucket/file.json",
)

assert result is None
Comment on lines +57 to +98

⚠️ Potential issue | 🟡 Minor

Add parameter type hints to test methods.

Same type-hint requirement applies to this class’ methods.

✍️ Suggested update
-    def test_load_success(self) -> None:
+    def test_load_success(self: "TestLoadJsonFromObjectStore") -> None:
         """Verify successful load returns parsed JSON."""
         mock_storage = MagicMock()
         test_data = [{"id": 1, "value": "test"}]
@@
-    def test_load_returns_none_on_error(self) -> None:
+    def test_load_returns_none_on_error(
+        self: "TestLoadJsonFromObjectStore",
+    ) -> None:
         """Verify returns None on CloudStorageError."""
         mock_storage = MagicMock()
         mock_storage.stream.side_effect = CloudStorageError("Download failed")
@@
-    def test_load_returns_none_on_invalid_json(self) -> None:
+    def test_load_returns_none_on_invalid_json(
+        self: "TestLoadJsonFromObjectStore",
+    ) -> None:
         """Verify returns None on invalid JSON content."""
         mock_storage = MagicMock()
         mock_storage.stream.return_value = BytesIO(b"not valid json")

As per coding guidelines, **/*.py: Always add type hints to all function parameters and return values in Python code.

🤖 Prompt for AI Agents
In `@backend/app/tests/core/test_storage_utils.py` around lines 57-98, the test
methods in TestLoadJsonFromObjectStore lack parameter type hints; update the
signatures of test_load_success, test_load_returns_none_on_error, and
test_load_returns_none_on_invalid_json to annotate the implicit self parameter
(self: "TestLoadJsonFromObjectStore") while keeping the existing -> None return
types, annotate any local test helpers (e.g., mock_storage: MagicMock) if added,
and keep the bodies unchanged so the assertions against
load_json_from_object_store and mock_storage.stream continue to work.
