Streaming JSON & text in one request

Learn how to stream both JSON and text to a client in a single request using a ReadableStream.

Introduction

When building applications with large language models (LLMs), you typically want to stream results back to the user to provide a better experience. This is often done with a simple setup:

async function POST(req: Request) {
  const { question } = await req.json();

  const stream = await openai.chat.completions.create({
    stream: true, 
    messages: [
      {
        role: "user",
        content: question,
      },
    ],
    // ...
  });

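  // The SDK returns an async iterable, so convert it to a ReadableStream for Response.
  return new Response(stream.toReadableStream());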
}

Building RAG (Retrieval-Augmented Generation)

While this setup is useful, answers improve when the model can draw on external data, and the response can then include sources and citations. This approach is known as Retrieval-Augmented Generation (RAG). RAG works by:

  1. Retrieving data relevant to the question from a source (e.g., a database, web service, or file).
  2. Augmenting the prompt sent to the LLM with the retrieved data.
  3. Generating a response grounded in the augmented prompt.
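The augmentation step can be sketched as plain string building. This is only an illustration: the `SearchResultItem` shape and the `buildAugmentedPrompt` helper are assumptions, not part of the code in this post.

```typescript
// Hypothetical shape of a retrieved item; the post does not define it.
type SearchResultItem = { title: string; url: string; snippet: string };

// Step 2 (augment): fold the retrieved snippets into the prompt as numbered sources.
function buildAugmentedPrompt(
  question: string,
  results: SearchResultItem[]
): string {
  const sources = results
    .map((r, i) => `[${i + 1}] ${r.title} (${r.url})\n${r.snippet}`)
    .join("\n\n");

  return [
    "Answer the question using only the sources below.",
    "Cite sources by their number.",
    "",
    sources,
    "",
    `Question: ${question}`,
  ].join("\n");
}
```

The resulting string would be sent as the user message in place of the bare question.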

Implementing retrieval

Let’s start by implementing a search function that returns a list of search results based on the user’s question.

async function POST(req: Request) {
  const { question } = await req.json();
  const searchResults = await getSearchResults(question);
  // ...
}

The internal workings of getSearchResults are complex, so let’s save that for another day. Now, the question is, how do we stream back both the generated answer and the search results?
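For local experiments, a stand-in is enough to follow the rest of the post. The sketch below is not the real implementation: the in-memory `documents` list, the `SearchResultItem` shape, and the naive keyword scoring are all assumptions made for illustration.

```typescript
// Hypothetical result shape; the post does not define it.
type SearchResultItem = { title: string; url: string; snippet: string };

// Hypothetical in-memory corpus standing in for a real index.
const documents: SearchResultItem[] = [
  {
    title: "Streams API",
    url: "https://example.com/streams",
    snippet: "ReadableStream lets you consume data in chunks.",
  },
  {
    title: "Generators",
    url: "https://example.com/generators",
    snippet: "Generator functions yield values lazily.",
  },
];

// Rank documents by how many words of the question they contain.
async function getSearchResults(
  question: string,
  limit = 3
): Promise<SearchResultItem[]> {
  const terms = question.toLowerCase().split(/\W+/).filter(Boolean);

  return documents
    .map((doc) => {
      const text = `${doc.title} ${doc.snippet}`.toLowerCase();
      const score = terms.filter((term) => text.includes(term)).length;
      return { doc, score };
    })
    .filter(({ score }) => score > 0)
    .sort((a, b) => b.score - a.score)
    .slice(0, limit)
    .map(({ doc }) => doc);
}
```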

Merging streams

First, we need to understand how the Response class handles streams. According to MDN, the Response() constructor accepts a ReadableStream as its body, and Response.body exposes that body as a ReadableStream on the receiving side.

const encoder = new TextEncoder();
const stream = new ReadableStream({
  async start(controller) {
    // TODO: add results + answer to stream
    controller.enqueue(encoder.encode());
    controller.close();
  },
});

return new Response(stream);

Since the response expects a byte stream, we must use the TextEncoder to encode the payload (and the TextDecoder to read this data).
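A small round trip shows why the decoder matters when data arrives in pieces. This sketch splits the bytes by hand to mimic a network boundary that falls in the middle of a multi-byte character.

```typescript
const encoder = new TextEncoder();
const decoder = new TextDecoder();

// encode() turns a string into UTF-8 bytes (a Uint8Array).
const bytes = encoder.encode("héllo");

// Simulate a network split in the middle of the two-byte "é".
const first = bytes.slice(0, 2);
const second = bytes.slice(2);

// With { stream: true } the decoder holds back the incomplete character
// and completes it when the next chunk arrives.
const text = decoder.decode(first, { stream: true }) + decoder.decode(second);
```

Without the `stream` option, the first call would emit a replacement character for the dangling byte.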

Streaming JSON

To begin, let’s stream the search results back to the user first.

async function POST(req: Request) {
  const { question } = await req.json();

  const searchResults = await getSearchResults(question);

  const encoder = new TextEncoder();
  const encodedSearchResults = encoder.encode(JSON.stringify(searchResults));

  const resultStream = new ReadableStream({
    async start(controller) {
      controller.enqueue(encodedSearchResults);
      controller.close();
    },
  });
  // ...
  return new Response(resultStream);
}

Now we have the search results in the stream. Great! But including the LLM stream is more complex.

Since we have both JSON and plain text, we need to unify them into a single format for easy parsing on the client. Here’s the type we’ll use:

type StreamChunk =
  | {
      type: "search_results";
      results: Array<SearchResultItem>;
    }
  | {
      type: "answer_text";
      text: string;
    };

We expect the endpoint to stream JSON chunks that can be parsed into the StreamChunk type.
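On the receiving side, each raw JSON string can be checked against this type before it is used. The `parseStreamChunk` helper and the `SearchResultItem` shape below are assumptions for illustration; the validation is deliberately shallow.

```typescript
// Hypothetical item shape; the post does not define it.
type SearchResultItem = { title: string; url: string };

type StreamChunk =
  | { type: "search_results"; results: Array<SearchResultItem> }
  | { type: "answer_text"; text: string };

// Parse one JSON string and confirm it matches one of the two variants.
function parseStreamChunk(raw: string): StreamChunk {
  const value: unknown = JSON.parse(raw);

  if (typeof value === "object" && value !== null && "type" in value) {
    const chunk = value as { type: unknown; results?: unknown; text?: unknown };

    if (chunk.type === "search_results" && Array.isArray(chunk.results)) {
      return {
        type: "search_results",
        results: chunk.results as SearchResultItem[],
      };
    }
    if (chunk.type === "answer_text" && typeof chunk.text === "string") {
      return { type: "answer_text", text: chunk.text };
    }
  }

  throw new Error(`Unexpected stream chunk: ${raw}`);
}
```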

Enter generators

Fortunately, async generators are an effective solution for this. These are functions that can await asynchronous work and yield values as they become available, which suits a response made of search results followed by streamed text.
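As a minimal illustration of the mechanism, unrelated to LLMs, here is a generator that waits before each value and a consumer that reads it with `for await`. The `countdown` and `collect` names are made up for this sketch.

```typescript
// Yields from, from - 1, ..., 1, pausing briefly before each value.
async function* countdown(from: number): AsyncGenerator<number> {
  for (let n = from; n > 0; n--) {
    // Simulate waiting on I/O before each value is ready.
    await new Promise((resolve) => setTimeout(resolve, 10));
    yield n;
  }
}

// Drains any async iterable into an array.
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const items: T[] = [];
  for await (const item of source) {
    items.push(item);
  }
  return items;
}
```

The consumer receives each value as soon as it is yielded, without waiting for the generator to finish.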

async function* generateAnswer(question: string): AsyncGenerator<StreamChunk> {
  // Send the search results first, as a single chunk.
  const searchResults = await getSearchResults(question);
  yield { type: "search_results", results: searchResults };

  // Then forward the answer text piece by piece as the LLM produces it.
  const llmStream = await openai.chat.completions.create(...);
  for await (const chunk of transformLlmStream(llmStream.toReadableStream())) {
    yield { type: "answer_text", text: chunk };
  }
}

Handling the LLM response

We need to process the LLM response, which is also a stream. We’ll extract the text value by decoding and parsing the JSON.

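async function* transformLlmStream(
  stream: ReadableStream<Uint8Array>
): AsyncGenerator<string> {
  const reader = stream.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();

    if (done) break;

    // A chunk can hold several JSON lines or only part of one,
    // so buffer the decoded text and parse complete lines only.
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split("\n");
    buffer = lines.pop() ?? "";

    for (const line of lines) {
      if (!line.trim()) continue;
      const json = JSON.parse(line);
      // OpenAI chat completion chunks carry text in choices[0].delta.content.
      const text = json.choices?.[0]?.delta?.content;
      if (text) yield text;
    }
  }
}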

Integrating the transformed stream

Now, we integrate this into our answer generator.

async function* generateAnswer(question: string): AsyncGenerator<StreamChunk> {
  const searchResults = await getSearchResults(question);
  yield { type: "search_results", results: searchResults };

  const llmStream = await openai.chat.completions.create(...);
  // transformLlmStream expects a byte stream, so convert the SDK stream first.
  for await (const chunk of transformLlmStream(llmStream.toReadableStream())) {
    yield { type: "answer_text", text: chunk };
  }
}

Streaming generated values

We can now create a stream from a generator. Here’s how:

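export function createStream(generator: AsyncGenerator<StreamChunk>) {
  const encoder = new TextEncoder();
  return new ReadableStream<Uint8Array>({
    async start(controller) {
      try {
        for await (const chunk of generator) {
          // One JSON object per line, so the client can split on "\n".
          controller.enqueue(encoder.encode(JSON.stringify(chunk) + "\n"));
        }
        controller.close();
      } catch (error) {
        // Surface generator failures to the reader instead of hanging.
        controller.error(error);
      }
    },
  });
}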

Stitching everything together

Combining all the pieces together, we get this:

async function POST(req: Request) {
  const { question } = await req.json();
  const generator = generateAnswer(question);
  const stream = createStream(generator);
  return new Response(stream);
}
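On the client, the response body can be read back into StreamChunk values. The sketch below rests on several assumptions: that each chunk is written as one JSON object followed by a newline, that the endpoint lives at a hypothetical `/api/ask` route, and that `SearchResultItem` has the shape shown. The `readChunks` and `ask` names are made up for illustration.

```typescript
// Hypothetical item shape; the post does not define it.
type SearchResultItem = { title: string; url: string };

type StreamChunk =
  | { type: "search_results"; results: Array<SearchResultItem> }
  | { type: "answer_text"; text: string };

// Turn a byte stream of newline-delimited JSON into StreamChunk values.
async function* readChunks(
  body: ReadableStream<Uint8Array>
): AsyncGenerator<StreamChunk> {
  const reader = body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // Keep the last, possibly incomplete, line in the buffer.
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split("\n");
    buffer = lines.pop() ?? "";

    for (const line of lines) {
      if (line.trim()) yield JSON.parse(line) as StreamChunk;
    }
  }

  // Handle a final line that was not newline-terminated.
  if (buffer.trim()) yield JSON.parse(buffer) as StreamChunk;
}

// Example usage against the hypothetical /api/ask route.
async function ask(question: string): Promise<string> {
  const res = await fetch("/api/ask", {
    method: "POST",
    body: JSON.stringify({ question }),
  });
  if (!res.body) throw new Error("Response has no body");

  let answer = "";
  for await (const chunk of readChunks(res.body)) {
    if (chunk.type === "search_results") {
      console.log("sources", chunk.results);
    } else {
      answer += chunk.text;
    }
  }
  return answer;
}
```

Buffering by line keeps parsing correct even when the network delivers half of a JSON object in one read and the rest in the next.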

And voilà, that’s it!
