108 changes: 108 additions & 0 deletions README.md
data (like incoming email addresses), check carefully before relaying information
side to the trusted side (for instance: instruct the untrusted side to provide responses in a
JSON message you can parse in your code before handing off).

## Streaming

ContextWindow supports streaming responses for real-time token display. Streaming lets you
receive and display tokens as they arrive from the LLM provider, which shortens perceived
response time.

### Basic Streaming

The simplest streaming usage involves providing a callback function that receives chunks as they arrive:

```go
callback := func(chunk contextwindow.StreamChunk) error {
    if !chunk.Done {
        fmt.Print(chunk.Delta)
    }
    return nil
}

response, err := cw.CallModelStreaming(ctx, callback)
if err != nil {
    log.Fatalf("Failed to call model: %v", err)
}
```

The callback receives `StreamChunk` objects containing:
- `Delta`: The incremental text/token content for this chunk
- `Done`: Whether the stream has completed
- `Metadata`: Provider-specific metadata (optional)
- `Error`: Any streaming error that occurred
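
For orientation, a plausible Go shape for this type (field types here are inferred from the examples in this README, not copied from the library source):

```go
// Sketch of StreamChunk; the real definition may differ in detail.
type StreamChunk struct {
    Delta    string         // incremental text for this chunk
    Done     bool           // true on the final chunk of the stream
    Metadata map[string]any // optional provider-specific metadata
    Error    error          // non-nil if the stream failed mid-flight
}
```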

### Callback Error Handling

If your callback returns a non-nil error, streaming stops immediately. This lets you implement
early cancellation:

```go
callback := func(chunk contextwindow.StreamChunk) error {
    if chunk.Error != nil {
        return chunk.Error
    }
    if !chunk.Done {
        fmt.Print(chunk.Delta)
    }
    // Return error to cancel stream early if needed
    return nil
}
```

### Fallback Behavior

If your model doesn't support streaming (it implements neither `StreamingCapable` nor
`StreamingOptsCapable`), `CallModelStreaming` automatically falls back to the buffered `CallModel`
method. This preserves backward compatibility: your code works with both streaming and
non-streaming models.
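
Conceptually, the dispatch is a capability check. A minimal sketch, assuming a hypothetical interface shape (the real `StreamingCapable` signature may differ):

```go
// Hypothetical sketch of the fallback dispatch, not the library's code.
type streamingCapable interface {
    CallStreaming(ctx context.Context, cb func(contextwindow.StreamChunk) error) (string, error)
}

func callWithFallback(ctx context.Context, model any,
    buffered func(context.Context) (string, error),
    cb func(contextwindow.StreamChunk) error,
) (string, error) {
    if s, ok := model.(streamingCapable); ok {
        return s.CallStreaming(ctx, cb) // stream natively
    }
    return buffered(ctx) // fall back to the buffered call
}
```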

### Streaming with Options

You can use `CallModelStreamingWithOpts` to disable tools or pass other options:

```go
opts := contextwindow.CallModelOpts{
    DisableTools: true,
}

response, err := cw.CallModelStreamingWithOpts(ctx, opts, callback)
```

### Tool Calls During Streaming

Tool calls work seamlessly with streaming. When the model issues a tool call mid-stream, the tool
executes once the call has been fully received, and its result is streamed back through the
callback. The complete response (including tool results) is persisted after the stream finishes,
just like a non-streaming call.
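
For instance, a condensed version of the tool-call flow from the Gemini example added in this PR:

```go
cw.AddTool(
    contextwindow.NewTool("get_current_time", "Gets the current time").
        AddStringParameter("timezone", "IANA timezone name", false),
    contextwindow.ToolRunnerFunc(func(ctx context.Context, args json.RawMessage) (string, error) {
        return time.Now().UTC().Format(time.RFC3339), nil
    }),
)

// Tool output and the model's follow-up text arrive through the same
// callback; the merged response is returned and persisted at the end.
response, err := cw.CallModelStreaming(ctx, callback)
```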

### Performance Characteristics

- **Latency**: Streaming provides faster time-to-first-token compared to buffered responses
- **Memory**: Streaming has memory overhead similar to non-streaming (the complete response is still accumulated)
- **Persistence**: The complete response is persisted after streaming completes, maintaining the same
database consistency as non-streaming calls

### Provider-Specific Behaviors

Different providers handle streaming differently:

- **OpenAI**: Streams token deltas directly
- **Claude**: Uses event-based streaming (message_start, content_block_delta, etc.)
- **Gemini**: Streams content parts incrementally

The `StreamChunk.Metadata` field may contain provider-specific information. The library abstracts
these differences, so your callback code works the same across all providers.
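
Because keys and value types vary by provider, read `Metadata` defensively with type assertions. The `"tokens"` key below mirrors the usage in the example file and may not be emitted by every provider:

```go
callback := func(chunk contextwindow.StreamChunk) error {
    if chunk.Metadata != nil {
        // Provider-specific; always check the type assertion's ok value.
        if tokens, ok := chunk.Metadata["tokens"].(int); ok {
            fmt.Fprintf(os.Stderr, "[tokens so far: %d]\n", tokens)
        }
    }
    if !chunk.Done {
        fmt.Print(chunk.Delta)
    }
    return nil
}
```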

See `_examples/streaming/main.go` for complete examples including progress tracking and tool calls.

## Context Management and Stats

For building table views or managing multiple conversations, you can efficiently get statistics
The `GetContextStats` method uses a single aggregation query with existing indexes,
making it efficient even with many contexts and large conversation histories.

## Testing

This project uses Go build tags to separate unit tests from integration tests:

- **Unit tests**: Run with `go test` (default). These tests don't require API keys or make external API calls.
- **Integration tests**: Run with `go test -tags=integration`. These tests require API keys and make real API calls to LLM providers.

Integration tests are behind the `integration` build tag because they:
- Require API keys (OPENAI_API_KEY, ANTHROPIC_API_KEY, GOOGLE_GENAI_API_KEY, etc.)
- Make real API calls that consume quota
- May be rate-limited or fail due to network issues
- Are slower and more expensive to run

For IDE support (VS Code, GoLand, etc.), configure your editor with the `-tags=integration` build flag so that integration test files are recognized and can be run individually.
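
As a concrete illustration, an integration test file starts with the build tag; a minimal sketch (the test name is a placeholder):

```go
//go:build integration

package contextwindow_test

import (
    "os"
    "testing"
)

func TestLiveModelCall(t *testing.T) {
    if os.Getenv("OPENAI_API_KEY") == "" {
        t.Skip("OPENAI_API_KEY not set")
    }
    // ... create a model and make a real API call here ...
}
```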

## Maturity

This is alpha-quality code. Happy to get feedback or PRs or whatever. Publishing this more
258 changes: 258 additions & 0 deletions _examples/streaming/gemini/main.go
// This program demonstrates ContextWindow streaming with the Gemini
// provider: basic streaming, progress tracking, and tool calls.
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "os"
    "strings"
    "time"

    "github.com/superfly/contextwindow"
)

func main() {
    ctx := context.Background()

    // Example 1: Basic streaming
    fmt.Println("=== Example 1: Basic Streaming ===")
    basicStreamingExample(ctx)
    fmt.Println()

    // Example 2: Advanced streaming with progress tracking
    fmt.Println("=== Example 2: Advanced Streaming with Progress Tracking ===")
    advancedStreamingExample(ctx)
    fmt.Println()

    // Example 3: Streaming with tool calls
    fmt.Println("=== Example 3: Streaming with Tool Calls ===")
    toolCallStreamingExample(ctx)
}

// basicStreamingExample demonstrates the simplest streaming usage.
func basicStreamingExample(ctx context.Context) {
    // reads GOOGLE_GENAI_API_KEY or GEMINI_API_KEY
    model, err := contextwindow.NewGeminiModel(contextwindow.ModelGemini20Flash)
    if err != nil {
        log.Fatalf("Failed to create model: %v", err)
    }

    db, err := contextwindow.NewContextDB(":memory:")
    if err != nil {
        log.Fatalf("Failed to create database: %v", err)
    }
    defer db.Close()

    cw, err := contextwindow.NewContextWindow(db, model, "")
    if err != nil {
        log.Fatalf("Failed to create context window: %v", err)
    }
    defer cw.Close()

    if err := cw.AddPrompt("Write a short haiku about programming."); err != nil {
        log.Fatalf("Failed to add prompt: %v", err)
    }

    // Basic streaming callback: print deltas as they arrive
    callback := func(chunk contextwindow.StreamChunk) error {
        if !chunk.Done {
            fmt.Print(chunk.Delta)
        }
        return nil
    }

    response, err := cw.CallModelStreaming(ctx, callback)
    if err != nil {
        log.Fatalf("Failed to call model: %v", err)
    }

    fmt.Printf("\n\n[Complete response: %s]\n", response)
}

// advancedStreamingExample demonstrates streaming with progress tracking and metadata.
func advancedStreamingExample(ctx context.Context) {
    // reads GOOGLE_GENAI_API_KEY or GEMINI_API_KEY
    model, err := contextwindow.NewGeminiModel(contextwindow.ModelGemini20Flash)
    if err != nil {
        log.Fatalf("Failed to create model: %v", err)
    }

    db, err := contextwindow.NewContextDB(":memory:")
    if err != nil {
        log.Fatalf("Failed to create database: %v", err)
    }
    defer db.Close()

    cw, err := contextwindow.NewContextWindow(db, model, "")
    if err != nil {
        log.Fatalf("Failed to create context window: %v", err)
    }
    defer cw.Close()

    if err := cw.AddPrompt("Explain quantum computing in simple terms, in about 100 words."); err != nil {
        log.Fatalf("Failed to add prompt: %v", err)
    }

    var (
        chunkCount  int
        startTime   = time.Now()
        lastUpdate  = time.Now()
        accumulated strings.Builder
    )

    // Advanced callback with progress tracking
    callback := func(chunk contextwindow.StreamChunk) error {
        chunkCount++

        // Check for errors before processing the chunk's content
        if chunk.Error != nil {
            return fmt.Errorf("streaming error: %w", chunk.Error)
        }

        // Accumulate text
        if chunk.Delta != "" {
            accumulated.WriteString(chunk.Delta)
            fmt.Print(chunk.Delta)
        }

        // Update progress every 500ms
        now := time.Now()
        if now.Sub(lastUpdate) > 500*time.Millisecond {
            elapsed := now.Sub(startTime)
            text := accumulated.String()
            words := len(strings.Fields(text))
            fmt.Fprintf(os.Stderr, "\n[Progress: %d chunks, %d words, %.2fs elapsed]\n", chunkCount, words, elapsed.Seconds())
            lastUpdate = now
        }

        // Check metadata if available (example of accessing provider-specific metadata)
        if chunk.Metadata != nil {
            if tokens, ok := chunk.Metadata["tokens"].(int); ok {
                _ = tokens // tokens available in metadata if provided by model
            }
        }

        // Stream is complete
        if chunk.Done {
            elapsed := time.Since(startTime)
            fmt.Fprintf(os.Stderr,
                "\n[Stream complete: %d chunks, %d words, %.2fs total]\n",
                chunkCount, len(strings.Fields(accumulated.String())), elapsed.Seconds())
        }

        return nil
    }

    response, err := cw.CallModelStreaming(ctx, callback)
    if err != nil {
        log.Fatalf("Failed to call model: %v", err)
    }

    fmt.Printf("\n\n[Final response length: %d characters]\n", len(response))
}

// toolCallStreamingExample demonstrates streaming with tool calls.
func toolCallStreamingExample(ctx context.Context) {
    // reads GOOGLE_GENAI_API_KEY or GEMINI_API_KEY
    model, err := contextwindow.NewGeminiModel(contextwindow.ModelGemini20Flash)
    if err != nil {
        log.Fatalf("Failed to create model: %v", err)
    }

    db, err := contextwindow.NewContextDB(":memory:")
    if err != nil {
        log.Fatalf("Failed to create database: %v", err)
    }
    defer db.Close()

    cw, err := contextwindow.NewContextWindow(db, model, "")
    if err != nil {
        log.Fatalf("Failed to create context window: %v", err)
    }
    defer cw.Close()

    // Add a tool for getting the current time
    cw.AddTool(
        contextwindow.NewTool("get_current_time", "Gets the current time in a specified timezone").
            AddStringParameter("timezone", "The timezone (e.g., 'America/New_York', 'UTC')", false),
        contextwindow.ToolRunnerFunc(func(ctx context.Context, args json.RawMessage) (string, error) {
            var params struct {
                Timezone string `json:"timezone"`
            }
            if err := json.Unmarshal(args, &params); err != nil {
                return "", err
            }

            loc := time.UTC
            if params.Timezone != "" {
                var err error
                loc, err = time.LoadLocation(params.Timezone)
                if err != nil {
                    return "", fmt.Errorf("invalid timezone: %w", err)
                }
            }

            return time.Now().In(loc).Format(time.RFC3339), nil
        }),
    )

    // Add middleware to track tool calls
    cw.AddMiddleware(&toolCallMiddleware{})

    if err := cw.AddPrompt("What time is it in New York? Use the get_current_time tool."); err != nil {
        log.Fatalf("Failed to add prompt: %v", err)
    }

    var accumulated strings.Builder

    // Streaming callback that handles tool calls
    callback := func(chunk contextwindow.StreamChunk) error {
        // Handle errors before processing the chunk's content
        if chunk.Error != nil {
            return fmt.Errorf("streaming error: %w", chunk.Error)
        }

        // Check if this chunk is related to a tool call
        if chunk.Metadata != nil {
            if toolName, ok := chunk.Metadata["tool_call"].(string); ok {
                fmt.Fprintf(os.Stderr, "\n[Tool call detected: %s]\n", toolName)
            }
        }

        // Print deltas
        if chunk.Delta != "" {
            accumulated.WriteString(chunk.Delta)
            fmt.Print(chunk.Delta)
        }

        // Stream complete
        if chunk.Done {
            fmt.Fprintf(os.Stderr, "\n[Stream complete]\n")
        }

        return nil
    }

    response, err := cw.CallModelStreaming(ctx, callback)
    if err != nil {
        log.Fatalf("Failed to call model: %v", err)
    }

    fmt.Printf("\n\n[Final response: %s]\n", response)
}

// toolCallMiddleware implements Middleware to track tool calls during streaming.
type toolCallMiddleware struct{}

func (m *toolCallMiddleware) OnToolCall(ctx context.Context, name, args string) {
    fmt.Fprintf(os.Stderr, "[Middleware] Tool called: %s with args: %s\n", name, args)
}

func (m *toolCallMiddleware) OnToolResult(ctx context.Context, name, result string, err error) {
    if err != nil {
        fmt.Fprintf(os.Stderr, "[Middleware] Tool %s returned error: %v\n", name, err)
    } else {
        fmt.Fprintf(os.Stderr, "[Middleware] Tool %s returned: %s\n", name, result)
    }
}