|
import { EventSourceParserStream } from 'eventsource-parser/stream'; |
|
import type { ParsedEvent } from 'eventsource-parser'; |
|
|
|
type TextStreamUpdate = { |
|
done: boolean; |
|
value: string; |
|
|
|
citations?: any; |
|
|
|
error?: any; |
|
usage?: ResponseUsage; |
|
}; |
|
|
|
type ResponseUsage = { |
|
|
|
prompt_tokens: number; |
|
|
|
completion_tokens: number; |
|
|
|
total_tokens: number; |
|
|
|
[other: string]: unknown; |
|
}; |
|
|
|
|
|
|
|
export async function createOpenAITextStream( |
|
responseBody: ReadableStream<Uint8Array>, |
|
splitLargeDeltas: boolean |
|
): Promise<AsyncGenerator<TextStreamUpdate>> { |
|
const eventStream = responseBody |
|
.pipeThrough(new TextDecoderStream()) |
|
.pipeThrough(new EventSourceParserStream()) |
|
.getReader(); |
|
let iterator = openAIStreamToIterator(eventStream); |
|
if (splitLargeDeltas) { |
|
iterator = streamLargeDeltasAsRandomChunks(iterator); |
|
} |
|
return iterator; |
|
} |
|
|
|
async function* openAIStreamToIterator( |
|
reader: ReadableStreamDefaultReader<ParsedEvent> |
|
): AsyncGenerator<TextStreamUpdate> { |
|
while (true) { |
|
const { value, done } = await reader.read(); |
|
if (done) { |
|
yield { done: true, value: '' }; |
|
break; |
|
} |
|
if (!value) { |
|
continue; |
|
} |
|
const data = value.data; |
|
if (data.startsWith('[DONE]')) { |
|
yield { done: true, value: '' }; |
|
break; |
|
} |
|
|
|
try { |
|
const parsedData = JSON.parse(data); |
|
console.log(parsedData); |
|
|
|
if (parsedData.error) { |
|
yield { done: true, value: '', error: parsedData.error }; |
|
break; |
|
} |
|
|
|
if (parsedData.citations) { |
|
yield { done: false, value: '', citations: parsedData.citations }; |
|
continue; |
|
} |
|
|
|
yield { |
|
done: false, |
|
value: parsedData.choices?.[0]?.delta?.content ?? '', |
|
usage: parsedData.usage |
|
}; |
|
} catch (e) { |
|
console.error('Error extracting delta from SSE event:', e); |
|
} |
|
} |
|
} |
|
|
|
|
|
|
|
async function* streamLargeDeltasAsRandomChunks( |
|
iterator: AsyncGenerator<TextStreamUpdate> |
|
): AsyncGenerator<TextStreamUpdate> { |
|
for await (const textStreamUpdate of iterator) { |
|
if (textStreamUpdate.done) { |
|
yield textStreamUpdate; |
|
return; |
|
} |
|
if (textStreamUpdate.citations) { |
|
yield textStreamUpdate; |
|
continue; |
|
} |
|
let content = textStreamUpdate.value; |
|
if (content.length < 5) { |
|
yield { done: false, value: content }; |
|
continue; |
|
} |
|
while (content != '') { |
|
const chunkSize = Math.min(Math.floor(Math.random() * 3) + 1, content.length); |
|
const chunk = content.slice(0, chunkSize); |
|
yield { done: false, value: chunk }; |
|
|
|
|
|
if (document?.visibilityState !== 'hidden') { |
|
await sleep(5); |
|
} |
|
content = content.slice(chunkSize); |
|
} |
|
} |
|
} |
|
|
|
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); |
|
|