Streaming JSON & text in one request
Learn how to stream JSON and text in a single request with ReadableStream to improve your app's UX.
Introduction
When building applications with large language models (LLMs), you typically want to stream results back to the user to provide a better experience. This is often done with a simple setup:
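import OpenAI from "openai";

const openai = new OpenAI(); // reads OPENAI_API_KEY from the environment

async function POST(req: Request) {
  const { question } = await req.json();
  const stream = await openai.chat.completions.create({
    stream: true,
    messages: [
      {
        role: "user",
        content: question,
      },
    ],
    // ... (model and other options)
  });
  // The SDK returns an async-iterable Stream object, not a ReadableStream;
  // toReadableStream() converts it to bytes (one JSON chunk per line)
  return new Response(stream.toReadableStream());
}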
Building RAG (Retrieval-Augmented Generation)
This setup works, but it can be enhanced by grounding the answer in additional data and returning that data, such as sources and citations, alongside it. This approach is known as Retrieval-Augmented Generation (RAG). RAG works by:
- Retrieving data from a source (e.g., a database, web service, or file).
- Augmenting the LLM prompt with the retrieved data.
- Generating a response based on the augmented prompt.
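To make the "augment" step concrete, here is a minimal sketch of folding retrieved results into the prompt before calling the LLM. The SearchResultItem shape ({ title, url, snippet }) and the buildAugmentedPrompt helper are hypothetical; real systems differ in how they format context.

```typescript
// Hypothetical shape of one retrieved document
type SearchResultItem = { title: string; url: string; snippet: string };

// Builds a prompt that asks the model to answer from numbered sources,
// so the answer can cite them as [1], [2], ...
function buildAugmentedPrompt(question: string, results: SearchResultItem[]): string {
  const sources = results
    .map((r, i) => `[${i + 1}] ${r.title} (${r.url})\n${r.snippet}`)
    .join("\n\n");
  return [
    "Answer the question using only the sources below. Cite sources as [n].",
    "",
    sources,
    "",
    `Question: ${question}`,
  ].join("\n");
}

const prompt = buildAugmentedPrompt("What is RAG?", [
  { title: "RAG overview", url: "https://example.com/rag", snippet: "RAG retrieves documents..." },
]);
console.log(prompt);
```

In a handler, a string like this would be sent as the user message content in place of the raw question.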
Implementing retrieval
Let’s start with retrieval: a search function that returns a list of search results based on the user’s question.
async function POST(req: Request) {
  const { question } = await req.json();
  const searchResults = await getSearchResults(question);
  // ...
}
The internal workings of getSearchResults are complex, so let’s save that for another day. Now the question is: how do we stream back both the generated answer and the search results?
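Since getSearchResults is left as a black box, the sketch below is a hypothetical in-memory stand-in that makes the later examples runnable locally. It ranks a tiny hard-coded corpus by counting how many query words appear in each document; a real implementation would query a search index, vector store, or web API instead. The SearchResultItem shape is an assumption.

```typescript
// Hypothetical result shape; the real getSearchResults may return more fields
type SearchResultItem = { title: string; url: string; snippet: string };

// Tiny hard-coded corpus standing in for a real data source
const CORPUS: SearchResultItem[] = [
  { title: "Streams API", url: "https://example.com/streams", snippet: "ReadableStream lets you consume data chunk by chunk." },
  { title: "Generators", url: "https://example.com/generators", snippet: "Async generators yield values over time." },
  { title: "Cooking", url: "https://example.com/cooking", snippet: "How to boil an egg." },
];

// Scores each document by how many query words occur (as substrings) in its
// title or snippet, then returns the top `limit` documents with a non-zero score
async function getSearchResults(question: string, limit = 2): Promise<SearchResultItem[]> {
  const words = question.toLowerCase().split(/\W+/).filter(Boolean);
  return CORPUS
    .map((doc) => {
      const text = `${doc.title} ${doc.snippet}`.toLowerCase();
      const score = words.filter((w) => text.includes(w)).length;
      return { doc, score };
    })
    .filter((s) => s.score > 0)
    .sort((a, b) => b.score - a.score)
    .slice(0, limit)
    .map((s) => s.doc);
}
```

It is async only so that it matches how the handler awaits it.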
Merging streams
First, we need to understand how the Response class handles streams. According to MDN, the Response() constructor accepts a ReadableStream as its body, so we can build our own stream and return it directly.
const encoder = new TextEncoder();
const stream = new ReadableStream({
  async start(controller) {
    // TODO: add results + answer to stream
    controller.enqueue(encoder.encode());
    controller.close();
  },
});
return new Response(stream);
Since the response body is a byte stream (its chunks must be Uint8Array values), we must use the TextEncoder to encode the payload (and the TextDecoder to read the data back).
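As a quick illustration of that byte boundary, the sketch below encodes a JSON payload with TextEncoder, splits the bytes in the middle of a multi-byte character (as a network read might), and decodes them with TextDecoder. Passing { stream: true } tells the decoder to hold an incomplete character until the next call; the split point is an arbitrary choice for the demo.

```typescript
const encoder = new TextEncoder();
const decoder = new TextDecoder();

// {"text":"café"} is 16 bytes in UTF-8 because "é" takes two bytes
const bytes = encoder.encode(JSON.stringify({ text: "café" }));

// Split inside the two-byte "é", as a network read might
const first = bytes.slice(0, 13);
const second = bytes.slice(13);

// { stream: true } buffers the dangling byte instead of emitting U+FFFD
let decoded = decoder.decode(first, { stream: true });
decoded += decoder.decode(second); // final call without the option flushes the buffer

const parsed = JSON.parse(decoded);
console.log(parsed.text); // café
```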
Streaming JSON
To begin, let’s stream just the search results back to the user.
async function POST(req: Request) {
  const { question } = await req.json();
  const searchResults = await getSearchResults(question);
  const encoder = new TextEncoder();
  const encodedSearchResults = encoder.encode(JSON.stringify(searchResults));
  const resultStream = new ReadableStream({
    async start(controller) {
      controller.enqueue(encodedSearchResults);
      controller.close();
    },
  });
  // ...
  return new Response(resultStream);
}
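A quick way to sanity-check this JSON-only version is to read the stream back the way a client would. This sketch assumes a runtime with the Fetch API's Response and ReadableStream globals (modern browsers, Deno, or Node 18+) and uses made-up search results; it wraps the stream in a Response and calls json().

```typescript
const encoder = new TextEncoder();
const searchResults = [{ title: "Streams API", url: "https://example.com/streams" }]; // sample data

const resultStream = new ReadableStream<Uint8Array>({
  start(controller) {
    controller.enqueue(encoder.encode(JSON.stringify(searchResults)));
    controller.close();
  },
});

// json() waits for the whole body and parses it as one document
async function readBack() {
  return new Response(resultStream).json();
}
```

Because json() needs the complete body, this only works while the stream carries a single JSON document; it cannot deliver results and answer text incrementally.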
Now we have the search results in the stream. Great! But including the LLM stream is more complex.
Since we have both JSON and plain text, we need to unify them into a single format for easy parsing on the client. Here’s the type we’ll use:
type StreamChunk =
  | {
      type: "search_results";
      results: Array<SearchResultItem>;
    }
  | {
      type: "answer_text";
      text: string;
    };
We expect the endpoint to stream JSON chunks that can each be parsed into the StreamChunk type.
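On the receiving side, each chunk still arrives as untyped JSON. A small runtime check like the hypothetical parseStreamChunk below can narrow parsed JSON to StreamChunk before the UI uses it; the SearchResultItem shape is again an assumption.

```typescript
type SearchResultItem = { title: string; url: string; snippet: string };

type StreamChunk =
  | { type: "search_results"; results: Array<SearchResultItem> }
  | { type: "answer_text"; text: string };

// Parses one serialized chunk, checks the discriminant and the top-level
// payload field, and throws if the data matches neither variant
function parseStreamChunk(raw: string): StreamChunk {
  const data: unknown = JSON.parse(raw);
  if (typeof data !== "object" || data === null) {
    throw new Error("chunk is not an object");
  }
  const obj = data as Record<string, unknown>;
  if (obj.type === "search_results" && Array.isArray(obj.results)) {
    return { type: "search_results", results: obj.results as SearchResultItem[] };
  }
  if (obj.type === "answer_text" && typeof obj.text === "string") {
    return { type: "answer_text", text: obj.text };
  }
  throw new Error(`unknown chunk: ${raw}`);
}

const chunk = parseStreamChunk('{"type":"answer_text","text":"Hello"}');
if (chunk.type === "answer_text") console.log(chunk.text); // Hello
```

The discriminant (type) lets TypeScript narrow each branch, so UI code can switch on chunk.type safely.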
Enter generators
Fortunately, async generators are an effective solution for this. They are functions that can await asynchronous work and yield values one at a time as data becomes available, which makes them a natural fit for our problem.
async function* generateAnswer(question: string): AsyncGenerator<StreamChunk> {
  const searchResults = await getSearchResults(question);
  yield { type: "search_results", results: searchResults };
  const llmStream = await openai.chat.completions.create({
    stream: true,
    messages: [{ role: "user", content: question }],
    // ...
  });
  // toReadableStream() turns the SDK stream into newline-delimited JSON bytes
  for await (const chunk of transformLlmStream(llmStream.toReadableStream())) {
    yield { type: "answer_text", text: chunk };
  }
}
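To see the pattern on its own, here is a self-contained sketch with no search or LLM calls: the async generator awaits a timer between yields, and a for await...of loop receives each chunk as soon as it is yielded. The delay and chunk values are made up for illustration.

```typescript
type Chunk =
  | { type: "search_results"; results: string[] }
  | { type: "answer_text"; text: string };

const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

// Yields one results chunk first, then the answer word by word
async function* demo(): AsyncGenerator<Chunk> {
  yield { type: "search_results", results: ["doc-1", "doc-2"] };
  for (const word of ["Streams", "are", "neat"]) {
    await sleep(10); // stands in for waiting on the LLM
    yield { type: "answer_text", text: word };
  }
}

// Collects every chunk; each one arrives as soon as the generator yields it
async function collect(): Promise<Chunk[]> {
  const received: Chunk[] = [];
  for await (const chunk of demo()) {
    received.push(chunk);
  }
  return received;
}

collect().then((chunks) => console.log(chunks.length)); // 4
```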
Handling the LLM response
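We need to process the LLM response, which is also a stream. Assuming the OpenAI Node SDK, the stream object returned by create() can be converted with toReadableStream() into bytes containing one JSON-encoded chunk per line. We decode those bytes, split them into lines, parse each line, and extract the text delta. Because a single read can end partway through a line, incomplete lines stay in a buffer until the rest arrives.
async function* transformLlmStream(
  stream: ReadableStream<Uint8Array>
): AsyncGenerator<string> {
  const reader = stream.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // { stream: true } keeps multi-byte characters that span reads intact
    buffer += decoder.decode(value, { stream: true });
    // A read may hold several lines or only part of one:
    // parse the complete lines and keep the remainder buffered
    const lines = buffer.split("\n");
    buffer = lines.pop() ?? "";
    for (const line of lines) {
      if (!line.trim()) continue;
      const json = JSON.parse(line);
      // OpenAI chat completion chunks carry new text in choices[0].delta.content
      const text = json.choices?.[0]?.delta?.content;
      if (text) yield text;
    }
  }
}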
Integrating the transformed stream
With transformLlmStream in place, the generateAnswer generator shown earlier is complete: it yields a single search_results chunk, then one answer_text chunk for each text fragment the LLM produces.
Streaming generated values
We can now create a stream from a generator. Here’s how:
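export function createStream(generator: AsyncGenerator<StreamChunk>) {
  const encoder = new TextEncoder();
  return new ReadableStream<Uint8Array>({
    async start(controller) {
      try {
        for await (const chunk of generator) {
          // Terminate each JSON object with "\n" (newline-delimited JSON) so the
          // client can find chunk boundaries even when network reads split them
          controller.enqueue(encoder.encode(JSON.stringify(chunk) + "\n"));
        }
        controller.close();
      } catch (err) {
        // Propagate search or LLM failures to whoever reads the stream
        controller.error(err);
      }
    },
  });
}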
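On the client, the response body can be read back into StreamChunk objects. This sketch assumes the server separates chunks with newlines (newline-delimited JSON); without a delimiter, the client has no reliable way to tell where one JSON object ends and the next begins. It pipes the bytes through TextDecoderStream (available in browsers, Deno, and Node 18+) and buffers any partial line. The readStreamChunks name and the SearchResultItem shape are hypothetical.

```typescript
// Same union as in the article; SearchResultItem's shape is an assumption
type SearchResultItem = { title: string; url: string; snippet: string };
type StreamChunk =
  | { type: "search_results"; results: Array<SearchResultItem> }
  | { type: "answer_text"; text: string };

// Reads newline-delimited JSON from a byte stream and yields parsed chunks
async function* readStreamChunks(body: ReadableStream<Uint8Array>): AsyncGenerator<StreamChunk> {
  const reader = body.pipeThrough(new TextDecoderStream()).getReader();
  let buffer = "";
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += value;
    const lines = buffer.split("\n");
    buffer = lines.pop() ?? ""; // keep the trailing partial line for the next read
    for (const line of lines) {
      if (line.trim()) yield JSON.parse(line) as StreamChunk;
    }
  }
  // A final line without a trailing "\n" is still a complete chunk
  if (buffer.trim()) yield JSON.parse(buffer) as StreamChunk;
}
```

In a browser, body would typically be res.body from a fetch() POST to the route (for example, a hypothetical /api/answer endpoint), and each yielded chunk can update the UI as it arrives.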
Stitching everything together
Combining all the pieces together, we get this:
async function POST(req: Request) {
  const { question } = await req.json();
  const generator = generateAnswer(question);
  const stream = createStream(generator);
  return new Response(stream);
}
And voila, that’s it!
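Once chunks reach the client, each variant updates a different part of the UI. As a hypothetical sketch (the AnswerState shape and the applyChunk name are assumptions, not part of the server code above), a reducer can replace the source list on search_results and append text on answer_text.

```typescript
type SearchResultItem = { title: string; url: string; snippet: string };
type StreamChunk =
  | { type: "search_results"; results: Array<SearchResultItem> }
  | { type: "answer_text"; text: string };

// Hypothetical UI state built up while the response streams in
type AnswerState = { sources: SearchResultItem[]; answer: string };

// Returns a new state: results replace the sources, text is appended
function applyChunk(state: AnswerState, chunk: StreamChunk): AnswerState {
  switch (chunk.type) {
    case "search_results":
      return { ...state, sources: chunk.results };
    case "answer_text":
      return { ...state, answer: state.answer + chunk.text };
  }
}

const chunks: StreamChunk[] = [
  { type: "search_results", results: [{ title: "Docs", url: "https://example.com", snippet: "..." }] },
  { type: "answer_text", text: "Hello, " },
  { type: "answer_text", text: "world" },
];
const finalState = chunks.reduce<AnswerState>(applyChunk, { sources: [], answer: "" });
console.log(finalState.answer); // Hello, world
```

Keeping the update pure makes it easy to plug into a framework's state setter, calling it once per chunk as the stream is read.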