
Commit ac417a2

fix(ChatCompletionStream): abort on async iterator break and handle errors (#699)
Previously, `break`-ing out of the async iterator did not abort the underlying request, which increased usage. Errors raised during streaming are now propagated to the async iterator's consumer instead of leaving pending reads hanging.
1 parent 64041fd commit ac417a2
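
For context, a minimal consumer-side sketch of the behavior this commit changes (the model and prompt are illustrative, not part of the commit):

```ts
import OpenAI from 'openai';

const openai = new OpenAI(); // reads OPENAI_API_KEY from the environment

const stream = openai.beta.chat.completions.stream({
  model: 'gpt-3.5-turbo',
  messages: [{ role: 'user', content: 'Say hello there!' }],
});

for await (const chunk of stream) {
  process.stdout.write(chunk.choices[0]?.delta?.content ?? '');
  // Exiting the loop early invokes the iterator's `return` handler. Before this
  // commit no such handler existed, so the HTTP response kept streaming (and
  // accruing usage) in the background; it now aborts the stream.
  break;
}
```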

2 files changed: +81 −7

src/lib/ChatCompletionRunFunctions.test.ts

+52 −1
```diff
@@ -1,5 +1,5 @@
 import OpenAI from 'openai';
-import { OpenAIError } from 'openai/error';
+import { OpenAIError, APIConnectionError } from 'openai/error';
 import { PassThrough } from 'stream';
 import {
   ParsingToolFunction,
@@ -2207,6 +2207,7 @@ describe('resource completions', () => {
       await listener.sanityCheck();
     });
   });
+
   describe('stream', () => {
     test('successful flow', async () => {
       const { fetch, handleRequest } = mockStreamingChatCompletionFetch();
@@ -2273,5 +2274,55 @@ describe('resource completions', () => {
       expect(listener.finalMessage).toEqual({ role: 'assistant', content: 'The weather is great today!' });
       await listener.sanityCheck();
     });
+    test('handles network errors', async () => {
+      const { fetch, handleRequest } = mockFetch();
+
+      const openai = new OpenAI({ apiKey: '...', fetch });
+
+      const stream = openai.beta.chat.completions.stream(
+        {
+          max_tokens: 1024,
+          model: 'gpt-3.5-turbo',
+          messages: [{ role: 'user', content: 'Say hello there!' }],
+        },
+        { maxRetries: 0 },
+      );
+
+      handleRequest(async () => {
+        throw new Error('mock request error');
+      }).catch(() => {});
+
+      async function runStream() {
+        await stream.done();
+      }
+
+      await expect(runStream).rejects.toThrow(APIConnectionError);
+    });
+    test('handles network errors on async iterator', async () => {
+      const { fetch, handleRequest } = mockFetch();
+
+      const openai = new OpenAI({ apiKey: '...', fetch });
+
+      const stream = openai.beta.chat.completions.stream(
+        {
+          max_tokens: 1024,
+          model: 'gpt-3.5-turbo',
+          messages: [{ role: 'user', content: 'Say hello there!' }],
+        },
+        { maxRetries: 0 },
+      );
+
+      handleRequest(async () => {
+        throw new Error('mock request error');
+      }).catch(() => {});
+
+      async function runStream() {
+        for await (const _event of stream) {
+          continue;
+        }
+      }
+
+      await expect(runStream).rejects.toThrow(APIConnectionError);
+    });
   });
 });
```
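
Both tests assert the same contract from different angles: whether the stream is drained via `stream.done()` or via `for await`, a failed request now surfaces as a thrown `APIConnectionError` rather than a hang. A sketch of what that means for application code (model and prompt are again illustrative):

```ts
import OpenAI from 'openai';
import { APIConnectionError } from 'openai/error';

const openai = new OpenAI({ maxRetries: 0 });

try {
  const stream = openai.beta.chat.completions.stream({
    model: 'gpt-3.5-turbo',
    messages: [{ role: 'user', content: 'Say hello there!' }],
  });
  for await (const chunk of stream) {
    process.stdout.write(chunk.choices[0]?.delta?.content ?? '');
  }
} catch (err) {
  // With this commit, a network failure rejects the pending next() promise,
  // so it lands here instead of leaving the loop awaiting forever.
  if (err instanceof APIConnectionError) {
    console.error('connection failed:', err.message);
  } else {
    throw err;
  }
}
```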

src/lib/ChatCompletionStream.ts

+29 −6
```diff
@@ -210,13 +210,16 @@ export class ChatCompletionStream
 
   [Symbol.asyncIterator](): AsyncIterator<ChatCompletionChunk> {
     const pushQueue: ChatCompletionChunk[] = [];
-    const readQueue: ((chunk: ChatCompletionChunk | undefined) => void)[] = [];
+    const readQueue: {
+      resolve: (chunk: ChatCompletionChunk | undefined) => void;
+      reject: (err: unknown) => void;
+    }[] = [];
     let done = false;
 
     this.on('chunk', (chunk) => {
       const reader = readQueue.shift();
       if (reader) {
-        reader(chunk);
+        reader.resolve(chunk);
       } else {
         pushQueue.push(chunk);
       }
@@ -225,7 +228,23 @@ export class ChatCompletionStream
     this.on('end', () => {
       done = true;
       for (const reader of readQueue) {
-        reader(undefined);
+        reader.resolve(undefined);
+      }
+      readQueue.length = 0;
+    });
+
+    this.on('abort', (err) => {
+      done = true;
+      for (const reader of readQueue) {
+        reader.reject(err);
+      }
+      readQueue.length = 0;
+    });
+
+    this.on('error', (err) => {
+      done = true;
+      for (const reader of readQueue) {
+        reader.reject(err);
       }
       readQueue.length = 0;
     });
@@ -236,13 +255,17 @@ export class ChatCompletionStream
           if (done) {
            return { value: undefined, done: true };
          }
-          return new Promise<ChatCompletionChunk | undefined>((resolve) => readQueue.push(resolve)).then(
-            (chunk) => (chunk ? { value: chunk, done: false } : { value: undefined, done: true }),
-          );
+          return new Promise<ChatCompletionChunk | undefined>((resolve, reject) =>
+            readQueue.push({ resolve, reject }),
+          ).then((chunk) => (chunk ? { value: chunk, done: false } : { value: undefined, done: true }));
        }
        const chunk = pushQueue.shift()!;
        return { value: chunk, done: false };
      },
+      return: async () => {
+        this.abort();
+        return { value: undefined, done: true };
+      },
     };
   }
 
```
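
The iterator here is a standard event-to-pull adapter: chunks that arrive before anyone calls `next()` buffer in `pushQueue`, and `next()` calls that arrive before any chunk park their settlement callbacks in `readQueue`. The commit's core change is parking `{ resolve, reject }` pairs rather than bare resolvers, so `'error'` and `'abort'` can fail a pending read. A standalone sketch of the same pattern, detached from `ChatCompletionStream` (the names and the `abort` callback are mine, for illustration):

```ts
import { EventEmitter } from 'node:events';

// Adapt an emitter of 'data' | 'end' | 'error' events into an AsyncIterable.
// `abort` is whatever cancels the underlying producer (e.g. an AbortController).
function toAsyncIterable<T>(source: EventEmitter, abort: () => void): AsyncIterable<T> {
  const pushQueue: T[] = []; // values produced before a consumer asked for them
  const readQueue: { resolve: (v: T | undefined) => void; reject: (e: unknown) => void }[] = [];
  let done = false;

  source.on('data', (value: T) => {
    const reader = readQueue.shift();
    if (reader) reader.resolve(value);
    else pushQueue.push(value);
  });
  source.on('end', () => {
    done = true;
    for (const reader of readQueue) reader.resolve(undefined); // graceful end
    readQueue.length = 0;
  });
  source.on('error', (err) => {
    done = true;
    // The fix mirrored from this commit: reject pending reads instead of stranding them.
    for (const reader of readQueue) reader.reject(err);
    readQueue.length = 0;
  });

  return {
    [Symbol.asyncIterator]: () => ({
      next: async (): Promise<IteratorResult<T>> => {
        if (pushQueue.length) return { value: pushQueue.shift()!, done: false };
        if (done) return { value: undefined, done: true };
        return new Promise<T | undefined>((resolve, reject) => readQueue.push({ resolve, reject })).then(
          (v) => (v === undefined ? { value: undefined, done: true } : { value: v, done: false }),
        );
      },
      // The other half of the fix: `break` in a for-await loop calls return(),
      // which must cancel the producer or the request keeps running.
      return: async (): Promise<IteratorResult<T>> => {
        abort();
        return { value: undefined, done: true };
      },
    }),
  };
}
```

Storing both callbacks is what makes the error path work: with the pre-commit shape (a bare resolver list), an `'error'` emitted while a consumer was awaiting `next()` had no way to settle that promise, so the `for await` loop simply hung.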