File size: 3,402 Bytes
0608db0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
import { EventSourceParserStream } from 'eventsource-parser/stream';
import type { ParsedEvent } from 'eventsource-parser';
/**
 * One incremental update produced by the stream generators in this file.
 * Each update carries a text delta and, optionally, citations, an error
 * payload or token usage copied from the parsed SSE event.
 */
type TextStreamUpdate = {
/** True on the terminal update; the generators in this file stop right after yielding it. */
done: boolean;
/** Text delta for this update; the empty string when the update carries no text. */
value: string;
/** Copied as-is from the `citations` field of the parsed SSE payload, when that field is present. */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
citations?: any;
/** Copied as-is from the `error` field of the parsed SSE payload; emitted together with `done: true`. */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
error?: any;
/** Copied from the `usage` field of the parsed SSE payload, when that field is present. */
usage?: ResponseUsage;
};
/**
 * Token accounting attached to a stream update.
 * The three named counters are required; any additional keys are accepted
 * through the index signature and typed as `unknown`, so they must be
 * narrowed before use.
 */
type ResponseUsage = {
/** Including images and tools if any */
prompt_tokens: number;
/** The tokens generated */
completion_tokens: number;
/** Sum of the above two fields */
total_tokens: number;
/** Any other fields that aren't part of the base OpenAI spec */
[other: string]: unknown;
};
// createOpenAITextStream turns an SSE response body into an async generator of text updates.
// The bytes are decoded to text, parsed into SSE events, and each event is mapped to a TextStreamUpdate.
// When splitLargeDeltas is true, the updates are additionally routed through
// streamLargeDeltasAsRandomChunks so that big deltas are re-emitted as small random-sized pieces.
export async function createOpenAITextStream(
  responseBody: ReadableStream<Uint8Array>,
  splitLargeDeltas: boolean
): Promise<AsyncGenerator<TextStreamUpdate>> {
  const decodedText = responseBody.pipeThrough(new TextDecoderStream());
  const parsedEvents = decodedText.pipeThrough(new EventSourceParserStream());
  const updates = openAIStreamToIterator(parsedEvents.getReader());

  return splitLargeDeltas ? streamLargeDeltasAsRandomChunks(updates) : updates;
}
/**
 * Maps a reader of parsed SSE events onto a sequence of text stream updates.
 *
 * For every event read from `reader`:
 * - end of stream, or a payload starting with `[DONE]`, yields a terminal
 *   `{ done: true, value: '' }` update and stops;
 * - a JSON payload with a truthy `error` yields a terminal update carrying that error and stops;
 * - a JSON payload with a truthy `citations` yields a non-terminal update carrying only the citations;
 * - any other JSON payload yields the first choice's `delta.content` (or `''`) plus the payload's `usage`.
 *
 * Payloads that fail to parse are logged with `console.error` and skipped, so
 * a single malformed event does not abort the stream.
 *
 * @param reader Reader over parsed SSE events. It is cancelled when the generator finishes,
 *   whether it ran to completion, was returned early by the consumer, or threw.
 * @returns An async generator of updates; the last update it yields has `done: true`.
 */
async function* openAIStreamToIterator(
  reader: ReadableStreamDefaultReader<ParsedEvent>
): AsyncGenerator<TextStreamUpdate> {
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        yield { done: true, value: '' };
        break;
      }
      if (!value) {
        continue;
      }

      const data = value.data;
      if (data.startsWith('[DONE]')) {
        yield { done: true, value: '' };
        break;
      }

      try {
        const parsedData = JSON.parse(data);

        if (parsedData.error) {
          yield { done: true, value: '', error: parsedData.error };
          break;
        }

        if (parsedData.citations) {
          yield { done: false, value: '', citations: parsedData.citations };
          continue;
        }

        yield {
          done: false,
          value: parsedData.choices?.[0]?.delta?.content ?? '',
          usage: parsedData.usage
        };
      } catch (e) {
        // Best effort: report the malformed event and keep consuming the stream.
        console.error('Error extracting delta from SSE event:', e);
      }
    }
  } finally {
    // Nothing else can read from the locked stream once this generator stops, so cancel it
    // to release the underlying source instead of leaving it open after `[DONE]`, an error
    // payload, or an early exit by the consumer.
    try {
      await reader.cancel();
    } catch {
      // The stream already errored; there is nothing left to release.
    }
  }
}
// streamLargeDeltasAsRandomChunks re-emits large deltas (5 or more characters) as random sized chunks of 1-3 characters
// This is to simulate a more fluid streaming, even though some providers may send large chunks of text at once
//
// - Terminal (done) updates are forwarded and end the generator.
// - Updates carrying citations, and deltas shorter than 5 characters, are forwarded unchanged.
// - When a large delta carries usage, the usage is attached to the last chunk produced from it,
//   so token accounting is not lost by the re-chunking.
// - A chunk never ends between the two halves of a surrogate pair; it is extended by one code unit
//   instead, so consumers never see half of an astral character (e.g. an emoji).
async function* streamLargeDeltasAsRandomChunks(
  iterator: AsyncGenerator<TextStreamUpdate>
): AsyncGenerator<TextStreamUpdate> {
  for await (const textStreamUpdate of iterator) {
    if (textStreamUpdate.done) {
      yield textStreamUpdate;
      return;
    }

    if (textStreamUpdate.citations) {
      yield textStreamUpdate;
      continue;
    }

    const { value: content, usage } = textStreamUpdate;
    if (content.length < 5) {
      // Forward the original update so fields such as usage survive.
      yield textStreamUpdate;
      continue;
    }

    let offset = 0;
    while (offset < content.length) {
      const randomSize = Math.floor(Math.random() * 3) + 1;
      let end = Math.min(offset + randomSize, content.length);

      // Keep surrogate pairs together: if the chunk would end on a high surrogate, take its partner too.
      const lastUnit = content.charCodeAt(end - 1);
      if (end < content.length && lastUnit >= 0xd800 && lastUnit <= 0xdbff) {
        end += 1;
      }

      const chunk = content.slice(offset, end);
      const isLastChunk = end >= content.length;
      if (isLastChunk && usage !== undefined) {
        yield { done: false, value: chunk, usage };
      } else {
        yield { done: false, value: chunk };
      }

      // Do not sleep if the tab is hidden
      // Timers are throttled to 1s in hidden tabs
      // `typeof` guards environments where `document` is not declared at all (SSR, workers).
      if (typeof document === 'undefined' || document.visibilityState !== 'hidden') {
        await sleep(5);
      }

      offset = end;
    }
  }
}
/** Returns a promise that is resolved by a timer scheduled `ms` milliseconds from now. */
const sleep = (ms: number) =>
  new Promise((wake) => {
    setTimeout(wake, ms);
  });
|