Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 67 additions & 1 deletion python/lib/sift_client/_internal/low_level_wrappers/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import annotations

from abc import ABC
from typing import Any, Callable
from typing import Any, Awaitable, Callable

from sift_py.grpc.cache import with_cache, with_force_refresh


class LowLevelClientBase(ABC):
Expand Down Expand Up @@ -50,3 +52,67 @@ async def _handle_pagination(
if max_results and len(results) > max_results:
results = results[:max_results]
return results

@staticmethod
async def _call_with_cache(
stub_method: Callable[..., Awaitable[Any]],
request: Any,
*,
use_cache: bool = True,
force_refresh: bool = False,
ttl: int | None = None,
) -> Any:
"""Call a gRPC stub method with cache control.

This is a convenience method for low-level wrappers to easily enable caching
on their gRPC calls without manually constructing metadata.

Args:
stub_method: The gRPC stub method to call (e.g., stub.GetData).
request: The protobuf request object.
use_cache: Whether to enable caching for this request. Default: True.
force_refresh: Whether to force refresh the cache. Default: False.
ttl: Optional custom TTL in seconds. If not provided, uses the default TTL.

Returns:
The response from the gRPC call.

Example:
# Enable caching
response = await self._call_with_cache(
stub.GetData,
request,
use_cache=True,
)

# Force refresh
response = await self._call_with_cache(
stub.GetData,
request,
force_refresh=True,
)

# With custom TTL
response = await self._call_with_cache(
stub.GetData,
request,
use_cache=True,
ttl=7200, # 2 hours
)

# Ignore cache
response = await self._call_with_cache(
stub.GetData,
request,
use_cache=False,
)
"""
if not use_cache:
return await stub_method(request)

if force_refresh:
metadata = with_force_refresh(ttl=ttl)
else:
metadata = with_cache(ttl=ttl)

return await stub_method(request, metadata=metadata)
37 changes: 33 additions & 4 deletions python/lib/sift_client/_internal/low_level_wrappers/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ def _update_name_id_map(self, channels: list[Channel]):
)
self.channel_cache.name_id_map[channel.name] = str(channel.id_)

# TODO: Cache calls. Only read cache if end_time is more than 30 min in the past.
# Also, consider manually caching full channel data and evaluating start/end times while ignoring pagination. Do this ful caching at a higher level though to handle case where pagination fails.
async def _get_data_impl(
self,
*,
Expand All @@ -86,8 +84,27 @@ async def _get_data_impl(
page_size: int | None = None,
page_token: str | None = None,
order_by: str | None = None,
use_cache: bool = False,
force_refresh: bool = False,
cache_ttl: int | None = None,
) -> tuple[list[Any], str | None]:
"""Get the data for a channel during a run."""
"""Get the data for a channel during a run.

Args:
channel_ids: List of channel IDs to fetch data for.
run_id: Optional run ID to filter data.
start_time: Optional start time for the data range.
end_time: End time for the data range.
page_size: Number of results per page.
page_token: Token for pagination.
order_by: Field to order results by.
use_cache: Whether to enable caching for this request. Default: False.
force_refresh: Whether to force refresh the cache. Default: False.
cache_ttl: Optional custom TTL in seconds for cached responses.

Returns:
Tuple of (data list, next page token).
"""
queries = [
Query(channel=ChannelQuery(channel_id=channel_id, run_id=run_id))
for channel_id in channel_ids
Expand All @@ -102,7 +119,19 @@ async def _get_data_impl(
}

request = GetDataRequest(**request_kwargs)
response = await self._grpc_client.get_stub(DataServiceStub).GetData(request)

# Use cache helper if caching is enabled
if use_cache or force_refresh:
response = await self._call_with_cache(
self._grpc_client.get_stub(DataServiceStub).GetData,
request,
use_cache=use_cache,
force_refresh=force_refresh,
ttl=cache_ttl,
Copy link
Contributor

@ian-sift ian-sift Oct 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could these just always be passed and defer the if use_cache or force_refresh: logic to the grpc interceptor? idk if it could be tagged onto get_stub or something

)
else:
response = await self._grpc_client.get_stub(DataServiceStub).GetData(request)

response = cast("GetDataResponse", response)
return response.data, response.next_page_token # type: ignore # mypy doesn't know RepeatedCompositeFieldContainer can be treated like a list

Expand Down
10 changes: 8 additions & 2 deletions python/lib/sift_client/_internal/low_level_wrappers/ping.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,24 @@ class PingLowLevelClient(LowLevelClientBase, WithGrpcClient):
It handles common concerns like error handling and retries.
"""

_cache_results: bool
"""Whether to cache the results of the ping request. Used for testing."""

def __init__(self, grpc_client: GrpcClient) -> None:
    """Initialize the PingLowLevelClient.

    Args:
        grpc_client: The gRPC client to use for making API calls.
    """
    super().__init__(grpc_client=grpc_client)
    # Caching of ping responses is opt-in and defaults to off; per the class
    # attribute docs this flag exists so tests can exercise the caching path.
    self._cache_results = False

async def ping(self, _force_refresh: bool = False) -> str:
    """Send a ping request to the server in the current event loop.

    Args:
        _force_refresh: When True, bypass any cached ping response and
            force a fresh round-trip to the server.

    Returns:
        The server's ping response string.
    """
    # The stub must be fetched here so it is bound to the currently
    # running event loop.
    ping_stub = self._grpc_client.get_stub(PingServiceStub)
    raw_response = await self._call_with_cache(
        ping_stub.Ping,
        PingRequest(),
        use_cache=self._cache_results,
        force_refresh=_force_refresh,
        ttl=1,
    )
    return cast("PingResponse", raw_response).response
2 changes: 2 additions & 0 deletions python/lib/sift_client/_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pytest

from sift_client import SiftClient, SiftConnectionConfig
from sift_client.transport import CacheConfig, CacheMode
from sift_client.util.util import AsyncAPIs


Expand All @@ -26,6 +27,7 @@ def sift_client() -> SiftClient:
grpc_url=grpc_url,
rest_url=rest_url,
use_ssl=True,
cache_config=CacheConfig(mode=CacheMode.CLEAR_ON_INIT),
)
)

Expand Down
Loading
Loading