798 changes: 798 additions & 0 deletions EDGE_CASES_AND_RECOVERY.md

1,100 changes: 1,100 additions & 0 deletions FRONTEND_TRANSPORT_DESIGN.md

420 changes: 420 additions & 0 deletions FRONTEND_TRANSPORT_SUMMARY.md

533 changes: 533 additions & 0 deletions INTEGRATION_VALUE_ASSESSMENT.md

1,035 changes: 1,035 additions & 0 deletions PGFLOW_VERCEL_AI_SDK_INTEGRATION.md

440 changes: 440 additions & 0 deletions PRAGMATIC_HYBRID_ARCHITECTURE.md

679 changes: 679 additions & 0 deletions SPLIT_ARCHITECTURE.md

792 changes: 792 additions & 0 deletions UNIFIED_STREAM_ARCHITECTURE.md

641 changes: 641 additions & 0 deletions examples/vercel-ai-sdk-integration/COMPLETE_EXAMPLE.md

148 changes: 148 additions & 0 deletions examples/vercel-ai-sdk-integration/README.md
@@ -0,0 +1,148 @@
# Vercel AI SDK Integration with Pgflow

This example demonstrates how to use the **pgflow client in the frontend** as a custom `ChatTransport` for the Vercel AI SDK's `useChat` hook, with streaming support.

## Architecture

```
Frontend (React)
├─ useChat({ transport: PgflowChatTransport })
└─ PgflowClient (browser) → Supabase Realtime

Backend (Supabase Edge Functions)
└─ Pgflow Flows with streaming context
   └─ ctx.stream.emitText(chunk)
```

## Features

- ✅ **Frontend pgflow client** - Direct Supabase connection, no API routes
- ✅ **Streaming responses** - LLM tokens streamed in real-time via Supabase Realtime
- ✅ **Type-safe** - Full TypeScript support end-to-end
- ✅ **Progress indicators** - Show intermediate workflow steps
- ✅ **Auto-reconnection** - Supabase handles network failures
- ✅ **RLS security** - Database policies enforce access control

## Quick Start

### 1. Install Dependencies

```bash
npm install @pgflow/client @pgflow/dsl @ai-sdk/react ai @supabase/supabase-js
```

### 2. Create Streaming Flow

See `./backend/flows/streaming-chat.ts` for a complete example.

```typescript
import { Flow } from '@pgflow/dsl';

export const ChatFlow = new Flow<{ message: string }>({ slug: 'streaming_chat' })
  .step('generate', async (input, ctx) => {
    // Stream the LLM response (llm is a placeholder for your streaming LLM client)
    let fullText = '';
    for await (const chunk of llm.stream(input.message)) {
      fullText += chunk;
      await ctx.stream.emitText(chunk);
    }
    return { response: fullText };
  });
```

### 3. Set Up Frontend Transport

See `./frontend/lib/pgflow-chat-transport.ts` for implementation.

```typescript
import { PgflowChatTransport } from './lib/pgflow-chat-transport';

const transport = new PgflowChatTransport(supabase, 'streaming_chat');

const { messages, sendMessage } = useChat({ transport });
```

## File Structure

```
examples/vercel-ai-sdk-integration/
├── README.md                          (this file)
├── backend/
│   ├── flows/
│   │   └── streaming-chat.ts          # Example flow with streaming
│   ├── helpers/
│   │   ├── streaming-context.ts       # Streaming context implementation
│   │   └── openai-adapter.ts          # OpenAI streaming helper
│   └── types/
│       └── streaming-events.ts        # TypeScript types for events
├── frontend/
│   ├── lib/
│   │   └── pgflow-chat-transport.ts   # Custom ChatTransport implementation
│   ├── components/
│   │   └── chat.tsx                   # Example chat UI
│   └── hooks/
│       └── use-pgflow-chat.ts         # React hook wrapper
└── supabase/
    └── migrations/
        └── 001_streaming_support.sql  # Database setup
```

## How It Works

### 1. Backend: Streaming Context

Flows receive a `StreamingContext` that allows emitting incremental data:

```typescript
.step('generate', async (input, ctx) => {
  // ctx.stream is the streaming context
  await ctx.stream.emitText('Hello');
  await ctx.stream.emitText(' world');

  return { response: 'Hello world' };
})
```

Events are broadcast via Supabase Realtime to connected clients.
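
The context itself comes from the example helper in `./backend/helpers/streaming-context.ts` (not rendered in this diff); pgflow does not ship one yet (see Next Steps). A minimal sketch of the idea, assuming each run gets its own broadcast channel — the channel name, event name, and payload shape below are illustrative, not a pgflow API:

```typescript
import type { SupabaseClient } from '@supabase/supabase-js';

// Hypothetical event shape; the real one belongs in ./backend/types/streaming-events.ts
interface StreamEvent {
  type: 'text-delta' | 'reasoning' | 'data';
  key?: string;
  payload: unknown;
}

// Sketch: a streaming context bound to a single run's broadcast channel
export function createStreamingContext(supabase: SupabaseClient, runId: string) {
  // Channel naming is an assumption; use whatever convention the frontend subscribes to.
  // Depending on your supabase-js version you may need to subscribe() before send().
  const channel = supabase.channel(`pgflow:run:${runId}`);

  const broadcast = (event: StreamEvent) =>
    channel.send({ type: 'broadcast', event: 'stream', payload: event });

  return {
    emitText: (chunk: string) => broadcast({ type: 'text-delta', payload: chunk }),
    emitReasoning: (text: string) => broadcast({ type: 'reasoning', payload: text }),
    emitData: (key: string, payload: unknown) => broadcast({ type: 'data', key, payload }),
  };
}
```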

### 2. Frontend: PgflowChatTransport

The custom transport:
- Starts pgflow flows when messages are sent
- Subscribes to streaming events via Supabase Realtime
- Converts pgflow events → AI SDK `UIMessageChunk`s
- Handles reconnection automatically

```typescript
const transport = new PgflowChatTransport(supabase, 'streaming_chat');

// useChat automatically uses the transport
const { messages, sendMessage, status } = useChat({ transport });
```
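
A condensed sketch of the transport, assuming the per-run channel and event shape from the backend sketch above. The exact `ChatTransport` method signatures and `UIMessageChunk` type depend on your AI SDK version, and `run.run_id`, the channel name, and the message shape are assumptions — see `./frontend/lib/pgflow-chat-transport.ts` for the full implementation:

```typescript
import { PgflowClient } from '@pgflow/client';
import type { SupabaseClient } from '@supabase/supabase-js';

// Simplified shapes for illustration only
type TextDeltaChunk = { type: 'text-delta'; text: string };
type SimpleMessage = { role: string; content: string };

export class PgflowChatTransport {
  private pgflow: PgflowClient;

  constructor(private supabase: SupabaseClient, private flowSlug: string) {
    this.pgflow = new PgflowClient(supabase);
  }

  async sendMessages({ messages }: { messages: SimpleMessage[] }) {
    const lastMessage = messages[messages.length - 1];

    // Start the flow directly from the browser via the pgflow client
    const run = await this.pgflow.startFlow(this.flowSlug, {
      message: lastMessage.content,
    });

    // Bridge Realtime broadcast events into a stream of UI chunks
    return new ReadableStream<TextDeltaChunk>({
      start: (controller) => {
        this.supabase
          .channel(`pgflow:run:${run.run_id}`) // assumed channel naming (see backend sketch)
          .on('broadcast', { event: 'stream' }, ({ payload }) => {
            if (payload.type === 'text-delta') {
              controller.enqueue({ type: 'text-delta', text: payload.payload });
            }
          })
          .subscribe();

        // In a real implementation: close the stream and remove the channel
        // when the run completes or fails (e.g. via the run's completion event).
      },
    });
  }
}
```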

### 3. Event Flow

```
1. User sends message
   → useChat calls transport.sendMessages()

2. Transport starts pgflow flow
   → pgflow.startFlow('streaming_chat', { message })

3. Backend flow executes
   → ctx.stream.emitText('chunk')
   → Broadcasts to Supabase channel

4. Frontend receives event
   → BroadcastStepStreamEvent
   → Mapped to UIMessageChunk { type: 'text-delta', text: 'chunk' }
   → useChat updates UI
```
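
The mapping in step 4 can be a small pure function. A sketch with illustrative shapes (the real `BroadcastStepStreamEvent` belongs in `./backend/types/streaming-events.ts`, and the chunk types must match what your AI SDK version expects):

```typescript
// Illustrative shapes only
type BroadcastStepStreamEvent =
  | { type: 'text-delta'; payload: string }
  | { type: 'reasoning'; payload: string }
  | { type: 'data'; key: string; payload: unknown };

type UIMessageChunk =
  | { type: 'text-delta'; text: string }
  | { type: 'reasoning-delta'; text: string }
  | { type: 'data'; name: string; data: unknown };

export function toUIMessageChunk(event: BroadcastStepStreamEvent): UIMessageChunk {
  switch (event.type) {
    case 'text-delta':
      return { type: 'text-delta', text: event.payload };
    case 'reasoning':
      return { type: 'reasoning-delta', text: event.payload };
    case 'data':
      return { type: 'data', name: event.key, data: event.payload };
  }
}
```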

## Next Steps

See the implementation files in `./backend` and `./frontend` for complete examples.
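
For example, `./frontend/hooks/use-pgflow-chat.ts` wraps the transport and `useChat` into a single hook. A rough sketch — the environment variable names are assumptions:

```typescript
import { useMemo } from 'react';
import { useChat } from '@ai-sdk/react';
import { createClient } from '@supabase/supabase-js';
import { PgflowChatTransport } from '../lib/pgflow-chat-transport';

const supabase = createClient(
  process.env.NEXT_PUBLIC_SUPABASE_URL!,
  process.env.NEXT_PUBLIC_SUPABASE_ANON_KEY!,
);

// Wraps useChat with the pgflow transport for a given flow slug
export function usePgflowChat(flowSlug = 'streaming_chat') {
  const transport = useMemo(
    () => new PgflowChatTransport(supabase, flowSlug),
    [flowSlug],
  );

  return useChat({ transport });
}
```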

For production use, you'll need to:
1. Implement the streaming context in pgflow executor
2. Add RLS policies for security
3. Deploy flows to Supabase Edge Functions
4. Configure Supabase Realtime channels
211 changes: 211 additions & 0 deletions examples/vercel-ai-sdk-integration/backend/flows/streaming-chat.ts
@@ -0,0 +1,211 @@
/**
 * Example: Streaming Chat Flow for Vercel AI SDK Integration
 *
 * This flow demonstrates how to use pgflow with a streaming context
 * to provide real-time updates to a frontend using Vercel AI SDK's useChat hook.
 *
 * Key Features:
 * - Streams LLM responses token-by-token
 * - Emits intermediate progress (reasoning, search results)
 * - Type-safe input/output
 * - Works with PgflowChatTransport on the frontend
 */

import { Flow } from '@pgflow/dsl';
import type { StreamingContext } from '../helpers/streaming-context';
import { streamOpenAIResponse } from '../helpers/streaming-context';
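
// The two helpers above are not rendered in this diff. Assumed shapes, for reference only:
//
//   interface StreamingContext {
//     emitText(chunk: string): Promise<void>;
//     emitReasoning(text: string): Promise<void>;
//     emitData(key: string, payload: unknown): Promise<void>;
//   }
//
//   streamOpenAIResponse(stream, ctx) consumes an OpenAI streaming completion,
//   forwards each token via emitText() on the given context, and resolves with
//   the full response text.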

// Mock implementations (replace with real implementations)
import { OpenAI } from 'openai';

/**
 * Flow input type
 */
interface ChatInput {
  message: string;
  conversationId: string;
  userId: string;
  history?: Array<{ role: string; content: string }>;
}

/**
 * Streaming chat flow
 */
export const StreamingChatFlow = new Flow<ChatInput>({
  slug: 'streaming_chat',
})

  /**
   * Step 1: Classify user intent
   * Shows reasoning to user
   */
  .step('classify_intent', async (input, ctx: { stream: StreamingContext }) => {
    // Show progress
    await ctx.stream.emitReasoning('Analyzing your message...');

    const openai = new OpenAI({
      apiKey: process.env.OPENAI_API_KEY,
    });

    const response = await openai.chat.completions.create({
      model: 'gpt-3.5-turbo',
      messages: [{
        role: 'user',
        content: `Classify the intent of this message in one word: "${input.message}"`
      }],
      temperature: 0,
    });

    const intent = response.choices[0]?.message?.content || 'general';

    // Emit the classification result
    await ctx.stream.emitData('intent', {
      classification: intent,
      confidence: 0.9,
    });

    return { intent };
  })

  /**
   * Step 2: Retrieve relevant context
   * Shows search progress to user
   */
  .step('retrieve_context', async (input, ctx: { stream: StreamingContext }) => {
    await ctx.stream.emitReasoning('Searching knowledge base...');

    // Simulate vector search (replace with real implementation)
    await new Promise(resolve => setTimeout(resolve, 500));

    const mockResults = [
      { id: '1', content: 'Document 1 content...', score: 0.95 },
      { id: '2', content: 'Document 2 content...', score: 0.87 },
      { id: '3', content: 'Document 3 content...', score: 0.76 },
    ];

    // Emit search results as they come in
    await ctx.stream.emitData('search_results', {
      count: mockResults.length,
      sources: mockResults.map(r => ({ id: r.id, score: r.score })),
    });

    return {
      context: mockResults.map(r => r.content).join('\n\n'),
      sources: mockResults,
    };
  })

  /**
   * Step 3: Generate streaming response
   * Streams LLM tokens in real-time
   */
  .step('generate_response', async (input, ctx: { stream: StreamingContext }) => {
    await ctx.stream.emitReasoning('Generating response...');

    const openai = new OpenAI({
      apiKey: process.env.OPENAI_API_KEY,
    });

    // Create streaming completion
    const stream = await openai.chat.completions.create({
      model: 'gpt-4',
      messages: [
        {
          role: 'system',
          content: `You are a helpful assistant. Use the following context to answer the user's question:

Context:
${input.context}

Answer concisely and cite sources when possible.`
        },
        ...(input.history || []),
        {
          role: 'user',
          content: input.message,
        }
      ],
      stream: true,
      temperature: 0.7,
    });

    // Stream response through pgflow streaming context
    // Each token is emitted to the frontend immediately
    const fullResponse = await streamOpenAIResponse(stream, ctx.stream);

    return {
      response: fullResponse,
      model: 'gpt-4',
      tokensUsed: fullResponse.split(' ').length * 1.3, // Rough estimate
    };
  })

  /**
   * Step 4: Format and finalize
   * Add metadata, citations, etc.
   */
  .step('finalize', async (input, ctx: { stream: StreamingContext }) => {
    // Add citations if sources were used
    const citations = input.sources.slice(0, 3).map((s: any, i: number) =>
      `[${i + 1}] Source ${s.id}`
    );

    await ctx.stream.emitData('citations', citations);

    return {
      response: input.response,
      citations,
      metadata: {
        intent: input.intent,
        sourcesUsed: input.sources.length,
        model: input.model,
        tokensUsed: input.tokensUsed,
      },
    };
  });

/**
 * Alternative: Simpler streaming chat without context retrieval
 */
export const SimpleStreamingChatFlow = new Flow<{
  message: string;
  conversationId: string;
}>({ slug: 'simple_streaming_chat' })

  .step('generate', async (input, ctx: { stream: StreamingContext }) => {
    const openai = new OpenAI({
      apiKey: process.env.OPENAI_API_KEY,
    });

    const stream = await openai.chat.completions.create({
      model: 'gpt-4',
      messages: [{ role: 'user', content: input.message }],
      stream: true,
    });

    const response = await streamOpenAIResponse(stream, ctx.stream);

    return { response };
  });

/**
 * Usage in a Supabase Edge Function:
 *
 * ```typescript
 * import { serve } from 'https://deno.land/std/http/server.ts';
 * import { createClient } from 'https://esm.sh/@supabase/supabase-js@2';
 * import { StreamingChatFlow } from './flows/streaming-chat.ts';
 *
 * serve(async (req) => {
 *   const supabase = createClient(
 *     Deno.env.get('SUPABASE_URL')!,
 *     Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!
 *   );
 *
 *   // Flow execution happens here
 *   // Streaming context is provided automatically by the executor
 *
 *   return new Response('Flow started', { status: 200 });
 * });
 * ```
 */
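//
// If you run flows with pgflow's edge worker instead, registering the flow is
// typically a one-liner (sketch only — confirm the current API in the pgflow docs;
// the streaming-context wiring above still has to be provided by your executor):
//
//   import { EdgeWorker } from '@pgflow/edge-worker';
//   import { StreamingChatFlow } from './flows/streaming-chat.ts';
//
//   EdgeWorker.start(StreamingChatFlow);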