diff --git a/backend/app/alembic/versions/044_add_score_trace_url_to_evaluation_run.py b/backend/app/alembic/versions/044_add_score_trace_url_to_evaluation_run.py
new file mode 100644
index 00000000..bc1f94c6
--- /dev/null
+++ b/backend/app/alembic/versions/044_add_score_trace_url_to_evaluation_run.py
@@ -0,0 +1,32 @@
+"""Add score_trace_url to evaluation_run
+
+Revision ID: 044
+Revises: 043
+Create Date: 2026-01-24 19:34:46.763908
+
+"""
+from alembic import op
+import sqlalchemy as sa
+import sqlmodel.sql.sqltypes
+
+
+revision = "044"
+down_revision = "043"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    op.add_column(
+        "evaluation_run",
+        sa.Column(
+            "score_trace_url",
+            sqlmodel.sql.sqltypes.AutoString(),
+            nullable=True,
+            comment="S3 URL where per-trace evaluation scores are stored",
+        ),
+    )
+
+
+def downgrade():
+    op.drop_column("evaluation_run", "score_trace_url")
diff --git a/backend/app/core/storage_utils.py b/backend/app/core/storage_utils.py
index 63830d7d..a3e8c720 100644
--- a/backend/app/core/storage_utils.py
+++ b/backend/app/core/storage_utils.py
@@ -15,6 +15,7 @@
 from starlette.datastructures import Headers, UploadFile
 
 from app.core.cloud.storage import CloudStorage, CloudStorageError
+from typing import Literal
 
 logger = logging.getLogger(__name__)
 
@@ -88,6 +89,7 @@ def upload_jsonl_to_object_store(
     results: list[dict],
     filename: str,
     subdirectory: str,
+    format: Literal["json", "jsonl"] = "jsonl",
 ) -> str | None:
     """
-    Upload JSONL (JSON Lines) content to object store.
+    Upload JSON or JSONL (JSON Lines) content to object store.
@@ -114,12 +116,20 @@
     # Create file path
     file_path = Path(subdirectory) / filename
 
-    # Convert results to JSONL
-    jsonl_content = "\n".join([json.dumps(result) for result in results])
+    if format == "jsonl":
+        jsonl_content = (
+            "\n".join(json.dumps(result, ensure_ascii=False) for result in results)
+            + "\n"
+        )
+        content_type = {"content-type": "application/jsonl"}
+    else:
+        jsonl_content = json.dumps(results, ensure_ascii=False)  # single JSON array
+        content_type = {"content-type": "application/json"}
+
     content_bytes = jsonl_content.encode("utf-8")
 
     # Create UploadFile-like object
-    headers = Headers({"content-type": "application/jsonl"})
+    headers = Headers(content_type)
     upload_file = UploadFile(
         filename=filename,
         file=BytesIO(content_bytes),
@@ -152,6 +162,41 @@
     return None
 
 
+def load_json_from_object_store(storage: CloudStorage, url: str) -> list | dict | None:
+    """Download and parse a JSON document from the object store.
+
+    Returns the parsed object, or None if the download or parsing fails.
+    """
+    logger.info(f"[load_json_from_object_store] Loading JSON from '{url}'")
+    try:
+        body = storage.stream(url)
+        content = body.read()
+
+        data = json.loads(content.decode("utf-8"))
+
+        logger.info(
+            f"[load_json_from_object_store] Download successful | "
+            f"url='{url}', size={len(content)} bytes"
+        )
+        return data
+    except CloudStorageError as e:
+        logger.warning(
+            f"[load_json_from_object_store] Failed to load JSON from '{url}': {e}",
+        )
+        return None
+    except json.JSONDecodeError as e:
+        logger.warning(
+            f"[load_json_from_object_store] Invalid JSON at '{url}': {e}",
+        )
+        return None
+    except Exception as e:
+        logger.warning(
+            f"[load_json_from_object_store] Unexpected error loading '{url}': {e}",
+            exc_info=True,
+        )
+        return None
+
+
 def generate_timestamped_filename(base_name: str, extension: str = "csv") -> str:
     """
     Generate a filename with timestamp.
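Note: the two helpers above are designed as a round trip — upload with format="json", read back with load_json_from_object_store. A minimal sketch, assuming a configured CloudStorage handle (obtained elsewhere via get_cloud_storage; the function name round_trip_traces is illustrative):

from app.core.storage_utils import (
    load_json_from_object_store,
    upload_jsonl_to_object_store,
)

def round_trip_traces(storage, eval_run_id: int) -> None:
    # `storage` is assumed to be a CloudStorage from get_cloud_storage(...)
    traces = [{"trace_id": "t1", "score": 0.9}]  # illustrative payload

    # format="json" writes the whole list as a single JSON array,
    # which is exactly what load_json_from_object_store parses back.
    url = upload_jsonl_to_object_store(
        storage=storage,
        results=traces,
        filename=f"traces_{eval_run_id}.json",
        subdirectory=f"evaluations/score/{eval_run_id}",
        format="json",
    )

    # Both helpers return None on failure instead of raising.
    if url is not None:
        assert load_json_from_object_store(storage=storage, url=url) == traces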
diff --git a/backend/app/crud/evaluations/core.py b/backend/app/crud/evaluations/core.py
index 6d17afe6..81c5e248 100644
--- a/backend/app/crud/evaluations/core.py
+++ b/backend/app/crud/evaluations/core.py
@@ -13,6 +13,10 @@
 from app.models.llm.request import ConfigBlob, LLMCallConfig
 from app.services.llm.jobs import resolve_config_blob
 
+from app.core.db import engine
+from app.core.cloud.storage import get_cloud_storage
+from app.core.storage_utils import upload_jsonl_to_object_store
+
 logger = logging.getLogger(__name__)
 
@@ -187,6 +191,7 @@ def update_evaluation_run(
     status: str | None = None,
     error_message: str | None = None,
     object_store_url: str | None = None,
+    score_trace_url: str | None = None,
     score: dict | None = None,
     embedding_batch_job_id: int | None = None,
 ) -> EvaluationRun:
@@ -219,6 +224,9 @@
         eval_run.score = score
     if embedding_batch_job_id is not None:
         eval_run.embedding_batch_job_id = embedding_batch_job_id
+    if score_trace_url is not None:
+        # save_score() passes "" as a sentinel to clear a stale URL
+        eval_run.score_trace_url = score_trace_url or None
 
     # Always update timestamp
     eval_run.updated_at = now()
@@ -335,7 +343,6 @@
     Returns:
         Updated EvaluationRun instance, or None if not found
     """
-    from app.core.db import engine
 
     with Session(engine) as session:
         eval_run = get_evaluation_run_by_id(
@@ -344,12 +351,62 @@
         eval_run = get_evaluation_run_by_id(
             session=session,
             eval_run_id=eval_run_id,
             organization_id=organization_id,
             project_id=project_id,
         )
-        if eval_run:
-            update_evaluation_run(session=session, eval_run=eval_run, score=score)
-            logger.info(
-                f"[save_score] Saved score | evaluation_id={eval_run_id} | "
-                f"traces={len(score.get('traces', []))}"
-            )
+        if not eval_run:
+            return None
+
+        traces = score.get("traces", [])
+        summary_scores = score.get("summary_scores", [])
+        # "" is a sentinel: it clears any previously stored URL when there are no traces
+        score_trace_url: str | None = "" if not traces else None
+
+        if traces:
+            try:
+                storage = get_cloud_storage(session=session, project_id=project_id)
+                score_trace_url = upload_jsonl_to_object_store(
+                    storage=storage,
+                    results=traces,
+                    filename=f"traces_{eval_run_id}.json",
+                    subdirectory=f"evaluations/score/{eval_run_id}",
+                    format="json",
+                )
+                if score_trace_url:
+                    logger.info(
+                        f"[save_score] Uploaded traces to S3 | "
+                        f"evaluation_id={eval_run_id} | url={score_trace_url} | "
+                        f"traces_count={len(traces)}"
+                    )
+                else:
+                    logger.warning(
+                        f"[save_score] Failed to upload traces to S3, "
+                        f"falling back to DB storage | evaluation_id={eval_run_id}"
+                    )
+            except Exception as e:
+                logger.error(
+                    f"[save_score] Error uploading traces to S3: {e} | "
+                    f"evaluation_id={eval_run_id}",
+                    exc_info=True,
+                )
+
+        # When traces were uploaded to S3, keep only the summary scores in the DB.
+        # TODO: decide whether keeping any score data in the DB is still needed.
+        if score_trace_url:
+            db_score = {"summary_scores": summary_scores}
+        else:
+            # Fall back to storing the full score in the DB if the S3 upload failed
+            db_score = score
+
+        update_evaluation_run(
+            session=session,
+            eval_run=eval_run,
+            score=db_score,
+            score_trace_url=score_trace_url,
+        )
+
+        logger.info(
+            f"[save_score] Saved score | evaluation_id={eval_run_id} | "
+            f"traces={len(traces)}"
+        )
+        return eval_run
diff --git a/backend/app/models/evaluation.py b/backend/app/models/evaluation.py
index 6ae4542f..5aa8bf29 100644
--- a/backend/app/models/evaluation.py
+++ b/backend/app/models/evaluation.py
@@ -250,6 +250,13 @@ class EvaluationRun(SQLModel, table=True):
         description="Object store URL of processed evaluation results for future reference",
         sa_column_kwargs={"comment": "S3 URL of processed evaluation results"},
     )
+    score_trace_url: str | None = SQLField(
+        default=None,
+        description="S3 URL where per-trace score data is stored",
+        sa_column_kwargs={
+            "comment": "S3 URL where per-trace evaluation scores are stored"
+        },
+    )
     total_items: int = SQLField(
         default=0,
         description="Total number of items evaluated (set during processing)",
diff --git a/backend/app/services/evaluations/evaluation.py b/backend/app/services/evaluations/evaluation.py
index 785eb02a..8c3d0165 100644
--- a/backend/app/services/evaluations/evaluation.py
+++ b/backend/app/services/evaluations/evaluation.py
@@ -18,6 +18,8 @@
 from app.models.evaluation import EvaluationRun
 from app.services.llm.providers import LLMProvider
 from app.utils import get_langfuse_client, get_openai_client
+from app.core.cloud.storage import get_cloud_storage
+from app.core.storage_utils import load_json_from_object_store
 
 logger = logging.getLogger(__name__)
 
@@ -189,6 +191,7 @@
     Returns:
         Tuple of (EvaluationRun or None, error_message or None)
     """
+
     logger.info(
         f"[get_evaluation_with_scores] Fetching status for evaluation run | "
        f"evaluation_id={evaluation_id} | "
@@ -227,9 +230,41 @@
         return eval_run, None
 
     # Check if we already have cached traces
-    has_cached_traces = eval_run.score is not None and "traces" in eval_run.score
-    if not resync_score and has_cached_traces:
-        return eval_run, None
+    has_cached_traces_s3 = eval_run.score_trace_url is not None
+    has_cached_traces_db = eval_run.score is not None and "traces" in eval_run.score
+    if not resync_score:
+        if has_cached_traces_s3:
+            try:
+                storage = get_cloud_storage(session=session, project_id=project_id)
+                traces = load_json_from_object_store(
+                    storage=storage, url=eval_run.score_trace_url
+                )
+                if traces is not None:
+                    eval_run.score = {
+                        "summary_scores": (eval_run.score or {}).get(
+                            "summary_scores", []
+                        ),
+                        "traces": traces,
+                    }
+                    logger.info(
+                        f"[get_evaluation_with_scores] Loaded traces from S3 | "
+                        f"evaluation_id={evaluation_id} | "
+                        f"traces_count={len(traces)}"
+                    )
+                    return eval_run, None
+            except Exception as e:
+                logger.error(
+                    f"[get_evaluation_with_scores] Error loading traces from S3: {e} | "
+                    f"evaluation_id={evaluation_id}",
+                    exc_info=True,
+                )
+
+        if has_cached_traces_db:
+            logger.info(
+                f"[get_evaluation_with_scores] Returning traces from DB | "
+                f"evaluation_id={evaluation_id}"
+            )
+            return eval_run, None
 
     langfuse = get_langfuse_client(
         session=session,
@@ -289,4 +324,7 @@
         score=score,
     )
 
+    if eval_run:
+        eval_run.score = score
+
     return eval_run, None
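To make the storage split concrete: after a successful save_score(), data lands as sketched below. The metric names, trace values, and bucket name are illustrative, not fixtures from this change:

# Input to save_score() (illustrative):
score = {
    "summary_scores": [{"name": "accuracy", "avg": 0.9}],
    "traces": [{"trace_id": "t1", "score": 0.9}],
}

# DB row (evaluation_run) after a successful S3 upload:
#   score           -> {"summary_scores": [{"name": "accuracy", "avg": 0.9}]}
#   score_trace_url -> "s3://<bucket>/evaluations/score/<id>/traces_<id>.json"
#
# S3 object at score_trace_url (a single JSON array):
#   [{"trace_id": "t1", "score": 0.9}]
#
# get_evaluation_with_scores() later reassembles the full dict by merging the
# DB summary_scores with the traces loaded from score_trace_url; if the upload
# failed, the full dict stays in the DB and score_trace_url remains NULL.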
= "s3://bucket/path/traces.json" + + results = [{"trace_id": "t1", "score": 0.9}, {"trace_id": "t2", "score": 0.8}] + + url = upload_jsonl_to_object_store( + storage=mock_storage, + results=results, + filename="traces.json", + subdirectory="evaluations/score/70", + format="json", + ) + + assert url == "s3://bucket/path/traces.json" + mock_storage.put.assert_called_once() + + # Verify content is valid JSON array + call_args = mock_storage.put.call_args + upload_file = call_args.kwargs.get("source") + content = upload_file.file.read().decode("utf-8") + assert json.loads(content) == results + + def test_upload_jsonl_file_success(self) -> None: + """Verify successful JSONL format upload returns URL with correct content.""" + mock_storage = MagicMock() + mock_storage.put.return_value = "s3://bucket/path/traces.jsonl" + + results = [{"trace_id": "t1", "score": 0.9}, {"trace_id": "t2", "score": 0.8}] + + url = upload_jsonl_to_object_store( + storage=mock_storage, + results=results, + filename="traces.jsonl", + subdirectory="evaluations/score/70", + format="jsonl", + ) + + assert url == "s3://bucket/path/traces.jsonl" + mock_storage.put.assert_called_once() + + # Verify content is valid JSONL (one JSON object per line) + call_args = mock_storage.put.call_args + upload_file = call_args.kwargs.get("source") + content = upload_file.file.read().decode("utf-8") + parsed_results = [json.loads(line) for line in content.strip().split("\n")] + assert parsed_results == results + + # ==================== Upload Failure Tests ==================== + + def test_upload_returns_none_on_storage_error(self) -> None: + """Verify returns None on CloudStorageError.""" + mock_storage = MagicMock() + mock_storage.put.side_effect = CloudStorageError("Upload failed") + + url = upload_jsonl_to_object_store( + storage=mock_storage, + results=[{"id": 1}], + filename="test.json", + subdirectory="test", + format="json", + ) + + assert url is None + + +class TestLoadJsonFromObjectStore: + """Test loading JSON from object store.""" + + # ==================== Load Success Tests ==================== + + def test_load_success(self) -> None: + """Verify successful load returns parsed JSON.""" + mock_storage = MagicMock() + test_data = [{"id": 1, "value": "test"}] + mock_storage.stream.return_value = BytesIO( + json.dumps(test_data).encode("utf-8") + ) + + result = load_json_from_object_store( + storage=mock_storage, + url="s3://bucket/path/file.json", + ) + + assert result == test_data + mock_storage.stream.assert_called_once_with("s3://bucket/path/file.json") + + # ==================== Load Failure Tests ==================== + + def test_load_returns_none_on_storage_error(self) -> None: + """Verify returns None on CloudStorageError.""" + mock_storage = MagicMock() + mock_storage.stream.side_effect = CloudStorageError("Download failed") + + result = load_json_from_object_store( + storage=mock_storage, + url="s3://bucket/file.json", + ) + + assert result is None + + def test_load_returns_none_on_invalid_json(self) -> None: + """Verify returns None on invalid JSON content.""" + mock_storage = MagicMock() + mock_storage.stream.return_value = BytesIO(b"not valid json") + + result = load_json_from_object_store( + storage=mock_storage, + url="s3://bucket/file.json", + ) + + assert result is None diff --git a/backend/app/tests/crud/evaluations/test_score_storage.py b/backend/app/tests/crud/evaluations/test_score_storage.py new file mode 100644 index 00000000..87a82845 --- /dev/null +++ 
diff --git a/backend/app/tests/crud/evaluations/test_score_storage.py b/backend/app/tests/crud/evaluations/test_score_storage.py
new file mode 100644
index 00000000..87a82845
--- /dev/null
+++ b/backend/app/tests/crud/evaluations/test_score_storage.py
@@ -0,0 +1,110 @@
+"""Tests for save_score() S3 upload functionality."""
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from app.crud.evaluations.core import save_score
+from app.models import EvaluationRun
+
+
+class TestSaveScoreS3Upload:
+    """Test save_score() S3 upload functionality."""
+
+    @pytest.fixture
+    def mock_eval_run(self):
+        """Create a mock EvaluationRun."""
+        eval_run = MagicMock(spec=EvaluationRun)
+        eval_run.id = 100
+        return eval_run
+
+    @patch("app.crud.evaluations.core.update_evaluation_run")
+    @patch("app.crud.evaluations.core.get_evaluation_run_by_id")
+    @patch("app.crud.evaluations.core.upload_jsonl_to_object_store")
+    @patch("app.crud.evaluations.core.get_cloud_storage")
+    @patch("app.crud.evaluations.core.engine")
+    def test_uploads_traces_to_s3_and_stores_summary_only(
+        self,
+        mock_engine,
+        mock_get_storage,
+        mock_upload,
+        mock_get_eval,
+        mock_update,
+        mock_eval_run,
+    ) -> None:
+        """Verify traces uploaded to S3, only summary_scores stored in DB."""
+        mock_get_eval.return_value = mock_eval_run
+        mock_get_storage.return_value = MagicMock()
+        mock_upload.return_value = "s3://bucket/traces.json"
+
+        score = {
+            "summary_scores": [{"name": "accuracy", "avg": 0.9}],
+            "traces": [{"trace_id": "t1"}],
+        }
+
+        save_score(eval_run_id=100, organization_id=1, project_id=1, score=score)
+
+        # Verify upload was called with traces
+        mock_upload.assert_called_once()
+        assert mock_upload.call_args.kwargs["results"] == [{"trace_id": "t1"}]
+
+        # Verify DB gets summary only, not traces
+        call_kwargs = mock_update.call_args.kwargs
+        assert call_kwargs["score"] == {
+            "summary_scores": [{"name": "accuracy", "avg": 0.9}]
+        }
+        assert call_kwargs["score_trace_url"] == "s3://bucket/traces.json"
+
+    @patch("app.crud.evaluations.core.update_evaluation_run")
+    @patch("app.crud.evaluations.core.get_evaluation_run_by_id")
+    @patch("app.crud.evaluations.core.upload_jsonl_to_object_store")
+    @patch("app.crud.evaluations.core.get_cloud_storage")
+    @patch("app.crud.evaluations.core.engine")
+    def test_fallback_to_db_when_s3_fails(
+        self,
+        mock_engine,
+        mock_get_storage,
+        mock_upload,
+        mock_get_eval,
+        mock_update,
+        mock_eval_run,
+    ) -> None:
+        """Verify full score stored in DB when S3 upload fails."""
+        mock_get_eval.return_value = mock_eval_run
+        mock_get_storage.return_value = MagicMock()
+        mock_upload.return_value = None  # S3 failed
+
+        score = {
+            "summary_scores": [{"name": "accuracy", "avg": 0.9}],
+            "traces": [{"trace_id": "t1"}],
+        }
+
+        save_score(eval_run_id=100, organization_id=1, project_id=1, score=score)
+
+        # Full score stored in DB as fallback
+        call_kwargs = mock_update.call_args.kwargs
+        assert call_kwargs["score"] == score
+        assert call_kwargs["score_trace_url"] is None
+
+    @patch("app.crud.evaluations.core.update_evaluation_run")
+    @patch("app.crud.evaluations.core.get_evaluation_run_by_id")
+    @patch("app.crud.evaluations.core.upload_jsonl_to_object_store")
+    @patch("app.crud.evaluations.core.get_cloud_storage")
+    @patch("app.crud.evaluations.core.engine")
+    def test_no_s3_upload_when_no_traces(
+        self,
+        mock_engine,
+        mock_get_storage,
+        mock_upload,
+        mock_get_eval,
+        mock_update,
+        mock_eval_run,
+    ) -> None:
+        """Verify S3 upload skipped when traces is empty."""
+        mock_get_eval.return_value = mock_eval_run
+
+        score = {"summary_scores": [{"name": "accuracy", "avg": 0.9}], "traces": []}
+
+        save_score(eval_run_id=100, organization_id=1, project_id=1, score=score)
+
+        mock_upload.assert_not_called()
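A detail these tests rely on: unittest.mock.patch must target the name in the namespace where it is looked up, not where it was originally defined. Because core.py binds the engine via `from app.core.db import engine`, the effective patch target is the copy living in app.crud.evaluations.core. A minimal sketch:

from unittest.mock import patch

# save_score() resolves `engine` through its own module globals, so patch
# the binding in app.crud.evaluations.core:
with patch("app.crud.evaluations.core.engine"):
    ...  # save_score() now sees the mock engine

# Patching the defining module does not touch core's already-imported binding:
with patch("app.core.db.engine"):
    ...  # save_score() still sees the real engine here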
diff --git a/backend/app/tests/services/__init__.py b/backend/app/tests/services/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/backend/app/tests/services/evaluations/__init__.py b/backend/app/tests/services/evaluations/__init__.py
new file mode 100644
index 00000000..29303195
--- /dev/null
+++ b/backend/app/tests/services/evaluations/__init__.py
@@ -0,0 +1 @@
+"""Evaluation service tests."""
diff --git a/backend/app/tests/services/evaluations/test_evaluation_service_s3.py b/backend/app/tests/services/evaluations/test_evaluation_service_s3.py
new file mode 100644
index 00000000..60465b87
--- /dev/null
+++ b/backend/app/tests/services/evaluations/test_evaluation_service_s3.py
@@ -0,0 +1,152 @@
+"""Tests for get_evaluation_with_scores() S3 retrieval."""
+
+from collections.abc import Callable
+from typing import Optional
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from app.models import EvaluationRun
+from app.services.evaluations.evaluation import get_evaluation_with_scores
+
+
+class TestGetEvaluationWithScoresS3:
+    """Test get_evaluation_with_scores() S3 retrieval."""
+
+    @pytest.fixture
+    def eval_run_factory(self) -> Callable[..., MagicMock]:
+        """Factory that creates a MagicMock(spec=EvaluationRun) with given attrs."""
+
+        def _factory(
+            *,
+            id: int,
+            status: str,
+            score: dict,
+            score_trace_url: Optional[str] = None,
+            dataset_name: Optional[str] = None,
+            run_name: Optional[str] = None,
+        ) -> MagicMock:
+            eval_run = MagicMock(spec=EvaluationRun)
+            eval_run.id = id
+            eval_run.status = status
+            eval_run.score = score
+            eval_run.score_trace_url = score_trace_url
+            eval_run.dataset_name = dataset_name
+            eval_run.run_name = run_name
+            return eval_run
+
+        return _factory
+
+    @patch("app.services.evaluations.evaluation.get_evaluation_run_by_id")
+    @patch("app.services.evaluations.evaluation.load_json_from_object_store")
+    @patch("app.services.evaluations.evaluation.get_cloud_storage")
+    def test_loads_traces_from_s3(
+        self,
+        mock_get_storage: MagicMock,
+        mock_load: MagicMock,
+        mock_get_eval: MagicMock,
+        eval_run_factory: Callable[..., MagicMock],
+    ) -> None:
+        """Verify traces loaded from S3 and score reconstructed."""
+        eval_run = eval_run_factory(
+            id=100,
+            status="completed",
+            score={"summary_scores": [{"name": "accuracy", "avg": 0.9}]},
+            score_trace_url="s3://bucket/traces.json",
+            dataset_name="test_dataset",
+            run_name="test_run",
+        )
+        mock_get_eval.return_value = eval_run
+        mock_get_storage.return_value = MagicMock()
+        mock_load.return_value = [{"trace_id": "s3_trace"}]
+
+        result, error = get_evaluation_with_scores(
+            session=MagicMock(),
+            evaluation_id=100,
+            organization_id=1,
+            project_id=1,
+            get_trace_info=True,
+            resync_score=False,
+        )
+
+        assert error is None
+        mock_load.assert_called_once()
+        assert result.score["traces"] == [{"trace_id": "s3_trace"}]
+        assert result.score["summary_scores"] == [{"name": "accuracy", "avg": 0.9}]
+
+    @patch("app.services.evaluations.evaluation.get_evaluation_run_by_id")
+    @patch("app.services.evaluations.evaluation.get_cloud_storage")
+    def test_returns_db_traces_when_no_s3_url(
+        self,
+        mock_get_storage: MagicMock,
+        mock_get_eval: MagicMock,
+        eval_run_factory: Callable[..., MagicMock],
+    ) -> None:
+        """Verify DB traces returned when no S3 URL."""
+        eval_run = eval_run_factory(
+            id=101,
+            status="completed",
+            score={
+                "summary_scores": [{"name": "accuracy", "avg": 0.85}],
+                "traces": [{"trace_id": "db_trace"}],
+            },
+        )
+        mock_get_eval.return_value = eval_run
+
+        result, error = get_evaluation_with_scores(
+            session=MagicMock(),
+            evaluation_id=101,
+            organization_id=1,
+            project_id=1,
+            get_trace_info=True,
+            resync_score=False,
+        )
+
+        assert error is None
+        mock_get_storage.assert_not_called()
+        assert result.score["traces"] == [{"trace_id": "db_trace"}]
+
+    @patch("app.services.evaluations.evaluation.save_score")
+    @patch("app.services.evaluations.evaluation.fetch_trace_scores_from_langfuse")
+    @patch("app.services.evaluations.evaluation.get_langfuse_client")
+    @patch("app.services.evaluations.evaluation.get_evaluation_run_by_id")
+    @patch("app.services.evaluations.evaluation.load_json_from_object_store")
+    @patch("app.services.evaluations.evaluation.get_cloud_storage")
+    def test_resync_bypasses_cache_and_fetches_langfuse(
+        self,
+        mock_get_storage: MagicMock,
+        mock_load: MagicMock,
+        mock_get_eval: MagicMock,
+        mock_get_langfuse: MagicMock,
+        mock_fetch_langfuse: MagicMock,
+        mock_save_score: MagicMock,
+        eval_run_factory: Callable[..., MagicMock],
+    ) -> None:
+        """Verify resync=True skips S3/DB and fetches from Langfuse."""
+        eval_run = eval_run_factory(
+            id=100,
+            status="completed",
+            score={"summary_scores": [{"name": "accuracy", "avg": 0.9}]},
+            score_trace_url="s3://bucket/traces.json",
+            dataset_name="test_dataset",
+            run_name="test_run",
+        )
+        mock_get_eval.return_value = eval_run
+        mock_get_langfuse.return_value = MagicMock()
+        mock_fetch_langfuse.return_value = {
+            "summary_scores": [],
+            "traces": [{"trace_id": "new"}],
+        }
+        mock_save_score.return_value = eval_run
+
+        get_evaluation_with_scores(
+            session=MagicMock(),
+            evaluation_id=100,
+            organization_id=1,
+            project_id=1,
+            get_trace_info=True,
+            resync_score=True,
+        )
+
+        mock_load.assert_not_called()  # S3 skipped
+        mock_fetch_langfuse.assert_called_once()  # Langfuse called