Skip to content

Commit e0a2de2

Browse files
authored
fix: make validation errors recoverable by llm (#2054)
1 parent f05905a commit e0a2de2

File tree

5 files changed

+55
-104
lines changed

5 files changed

+55
-104
lines changed

packages/xl-ai/src/streamTool/filterValidOperations.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ export async function* filterValidOperations<T>(
2929
let forceNewOperation = false;
3030
for await (const chunk of operationsStream) {
3131
const operation = chunk.operation;
32+
3233
if (operation.ok) {
3334
yield {
3435
operation: operation.value,

packages/xl-ai/src/streamTool/preprocess.test.ts

Lines changed: 32 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
import { BlockNoteEditor } from "@blocknote/core";
22
import { beforeEach, describe, expect, it } from "vitest";
33
import { tools } from "../api/formats/json/tools/index.js";
4-
import {
5-
preprocessOperationsNonStreaming,
6-
preprocessOperationsStreaming,
7-
} from "./preprocess.js";
4+
import { preprocessOperationsStreaming } from "./preprocess.js";
85
import { StreamTool } from "./streamTool.js";
96

107
const addOperationValid = {
@@ -148,68 +145,37 @@ describe("preprocess", () => {
148145
});
149146
});
150147

151-
describe("preprocessOperationsNonStreaming", () => {
152-
it("should pass valid operations", async () => {
153-
async function* mockStream() {
154-
yield {
155-
partialOperation: addOperationValid,
156-
isUpdateToPreviousOperation: false,
157-
isPossiblyPartial: false,
158-
metadata: undefined,
159-
};
160-
}
161-
162-
const results = await collectStreamToArray(
163-
preprocessOperationsNonStreaming(mockStream(), streamTools),
164-
);
165-
166-
expect(results.length).toBe(1);
167-
});
168-
169-
it("should throw an error on invalid operations (invalid id)", async () => {
170-
async function* mockStream() {
171-
yield {
172-
partialOperation: addOperationInvalidId,
173-
isUpdateToPreviousOperation: false,
174-
isPossiblyPartial: false,
175-
metadata: undefined,
176-
};
177-
}
178-
179-
await expect(
180-
collectStreamToArray(
181-
preprocessOperationsNonStreaming(mockStream(), streamTools),
182-
),
183-
).rejects.toThrow();
184-
});
185-
186-
it("should throw an error on invalid operations (invalid type)", async () => {
187-
async function* mockStream() {
188-
yield {
189-
partialOperation: invalidOperationType,
190-
isUpdateToPreviousOperation: false,
191-
isPossiblyPartial: false,
192-
metadata: undefined,
193-
};
194-
}
195-
196-
await expect(
197-
collectStreamToArray(
198-
preprocessOperationsNonStreaming(mockStream(), streamTools),
199-
),
200-
).rejects.toThrow();
201-
});
202-
203-
it("should handle empty operation streams", async () => {
204-
async function* mockStream() {
205-
// Empty stream
206-
}
207-
208-
const results = await collectStreamToArray(
209-
preprocessOperationsNonStreaming(mockStream(), streamTools),
210-
);
148+
it("should throw an error on invalid operations (invalid id)", async () => {
149+
async function* mockStream() {
150+
yield {
151+
partialOperation: addOperationInvalidId,
152+
isUpdateToPreviousOperation: false,
153+
isPossiblyPartial: false,
154+
metadata: undefined,
155+
};
156+
}
157+
158+
await expect(
159+
collectStreamToArray(
160+
preprocessOperationsStreaming(mockStream(), streamTools),
161+
),
162+
).rejects.toThrow();
163+
});
211164

212-
expect(results).toHaveLength(0);
213-
});
165+
it("should throw an error on invalid operations (invalid type)", async () => {
166+
async function* mockStream() {
167+
yield {
168+
partialOperation: invalidOperationType,
169+
isUpdateToPreviousOperation: false,
170+
isPossiblyPartial: false,
171+
metadata: undefined,
172+
};
173+
}
174+
175+
await expect(
176+
collectStreamToArray(
177+
preprocessOperationsStreaming(mockStream(), streamTools),
178+
),
179+
).rejects.toThrow();
214180
});
215181
});
Lines changed: 8 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { getErrorMessage } from "@ai-sdk/provider-utils";
12
import { ChunkExecutionError } from "./ChunkExecutionError.js";
23
import { filterValidOperations } from "./filterValidOperations.js";
34
import { StreamTool, StreamToolCall } from "./streamTool.js";
@@ -36,47 +37,16 @@ export async function* preprocessOperationsStreaming<
3637
(chunk) => {
3738
if (!chunk.isPossiblyPartial) {
3839
// only throw if the operation is not possibly partial
39-
40-
throw new ChunkExecutionError("invalid operation: " + chunk.operation.error, chunk);
40+
throw new ChunkExecutionError(
41+
`Invalid operation. ${getErrorMessage(chunk.operation.error)}`,
42+
chunk,
43+
{
44+
cause: chunk.operation.error,
45+
},
46+
);
4147
}
4248
},
4349
);
4450

4551
yield* validOperationsStream;
4652
}
47-
48-
/**
49-
* Validates an stream of operations and throws an error if an invalid operation is found.
50-
*
51-
* TODO: remove
52-
*
53-
* @deprecated
54-
*/
55-
export async function* preprocessOperationsNonStreaming<
56-
T extends StreamTool<any>[],
57-
>(
58-
operationsStream: AsyncIterable<{
59-
partialOperation: any;
60-
isUpdateToPreviousOperation: boolean;
61-
isPossiblyPartial: boolean;
62-
metadata: any;
63-
}>,
64-
streamTools: T,
65-
): AsyncGenerator<PreprocessOperationResult<T>> {
66-
// from partial operations to valid / invalid operations
67-
const validatedOperationsStream = toValidatedOperations(
68-
operationsStream,
69-
streamTools,
70-
);
71-
72-
// filter valid operations, invalid operations should throw an error
73-
const validOperationsStream = filterValidOperations(
74-
validatedOperationsStream,
75-
(chunk) => {
76-
throw new Error("invalid operation: " + chunk.operation.error);
77-
},
78-
);
79-
80-
// yield results
81-
yield* validOperationsStream;
82-
}

packages/xl-ai/src/streamTool/toValidatedOperations.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,18 @@ export async function* toValidatedOperations<T extends StreamTool<any>[]>(
2222
metadata: any;
2323
}> {
2424
for await (const chunk of partialObjectStream) {
25+
if (!chunk.partialOperation.type) {
26+
yield {
27+
operation: {
28+
ok: false,
29+
error: "The `type` property of an operation is required.",
30+
},
31+
isUpdateToPreviousOperation: chunk.isUpdateToPreviousOperation,
32+
isPossiblyPartial: chunk.isPossiblyPartial,
33+
metadata: chunk.metadata,
34+
};
35+
continue;
36+
}
2537
const func = streamTools.find(
2638
(f) => f.name === chunk.partialOperation.type,
2739
)!;

packages/xl-ai/src/streamTool/vercelAiSdk/util/chatHandlers.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ export async function setupToolCallStreaming(
129129

130130
if (result.status === "rejected") {
131131
if (result.reason instanceof ChunkExecutionError) {
132+
// all errors thrown in the pipeline should be ChunkExecutionErrors,
133+
// so we can retrieve the chunk that caused the error
132134
error = result.reason;
133135
} else {
134136
if (!chat.error) {

0 commit comments

Comments
 (0)