Introducing Realtime Streams v2: Unlimited, Reliable, and Type-Safe

Eric Allam

CTO, Trigger.dev

If you've ever tried to stream AI completions or real-time data from a long-running task to your frontend, you know the pain: connection drops, arbitrary limits, and wrestling with type safety across your entire stack. Today, we're solving all of that with Realtime Streams v2.

What's new

Streams v2 is a complete overhaul of how you stream data from Trigger.dev tasks, powered by S2. Here's what changed:

Feature                       | Streams v1    | Streams v2
Maximum stream length         | 2,000 chunks  | Unlimited
Active streams per run        | 5             | Unlimited
Maximum streams per run       | 10            | Unlimited
Stream retention              | 1 day         | 28 days
Maximum stream size           | 10 MB         | 300 MiB
Multiple sources per stream   | No            | Yes
Automatic resumption          | No            | Yes

But the numbers only tell part of the story. Let's talk about what this means for building real applications.

The reliability problem

Streaming long-running AI outputs is inherently fragile. Network hiccups happen. Connections drop. In v1, these issues meant lost data and broken streams. Your users would see partial responses, or worse, nothing at all.

Streams v2 fixes this at the protocol level. Both sides of the stream (writing from your task and reading from your frontend) now automatically resume from the last successful chunk. Lost your connection for 10 seconds during a 5-minute Claude reasoning task? No problem. The stream picks up right where it left off.

This isn't a retry mechanism you have to implement. It's built into the core.

A new developer experience

The biggest improvement isn't in the infrastructure; it's in how you use it. We've completely rethought the API around a simple idea: define once, use everywhere, with full type safety.

Define your streams

Instead of scattering stream keys across your codebase, define them once:


// app/streams.ts
import { streams, InferStreamType } from "@trigger.dev/sdk";
import { UIMessageChunk } from "ai";

export const aiStream = streams.define<UIMessageChunk>({
  id: "ai",
});

export const progressStream = streams.define<{ step: string; percent: number }>({
  id: "progress",
});

// Export types for your frontend
export type AIStreamPart = InferStreamType<typeof aiStream>;
export type ProgressStreamPart = InferStreamType<typeof progressStream>;

That's it. Now you have fully-typed, reusable stream definitions that work across your entire application.

Pipe from your tasks

The new streams.pipe() API replaces metadata.stream() with a cleaner, more flexible interface:


import { task } from "@trigger.dev/sdk";
import { openai } from "@ai-sdk/openai";
import { streamText } from "ai";
import { aiStream, progressStream } from "@/app/streams";

export const generateContent = task({
  id: "generate-content",
  run: async (payload: { prompt: string }) => {
    // Update progress
    await progressStream.append({ step: "Starting AI generation", percent: 0 });

    const result = streamText({
      model: openai("gpt-4o"),
      prompt: payload.prompt,
    });

    // Pipe the AI stream - returns immediately, doesn't block
    const { stream, waitUntilComplete } = aiStream.pipe(result.toUIMessageStream());

    // You can iterate locally if needed
    for await (const chunk of stream) {
      console.log("Chunk generated:", chunk);
    }

    // Or just wait for completion
    await waitUntilComplete();

    await progressStream.append({ step: "Complete", percent: 100 });

    return { success: true };
  },
});

Notice how we're using the same defined streams? That means:

  • Type safety - TypeScript knows exactly what type each chunk is
  • No magic strings - Stream IDs are centralized
  • Easy refactoring - Change the ID once, it updates everywhere
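
For instance, because progressStream is defined with a { step: string; percent: number } chunk type, a malformed append is caught at compile time. A minimal sketch (the payload values are made up; the @ts-expect-error comments just mark lines TypeScript rejects):

import { progressStream } from "@/app/streams";

// Inside a task's run function:

// OK - matches the { step: string; percent: number } chunk type
await progressStream.append({ step: "Uploading assets", percent: 40 });

// @ts-expect-error - percent must be a number, so this fails to compile
await progressStream.append({ step: "Uploading assets", percent: "40" });

// @ts-expect-error - the required step field is missing
await progressStream.append({ percent: 75 });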

Read from anywhere

The real power shows when you consume streams. The same defined stream works in your backend, your frontend, anywhere:


// Backend
import { aiStream } from "@/app/streams";

async function processInBackend(runId: string) {
  const stream = await aiStream.read(runId);

  for await (const chunk of stream) {
    console.log("Received:", chunk); // Fully typed!
  }
}
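
And because read() hands back an async iterable, it composes with plain web streams too. Here's a minimal sketch (the route path and NDJSON framing are illustrative choices, not part of the Streams API) that re-serves a run's AI stream from a Next.js route handler:

// app/api/stream/route.ts (hypothetical path) - forwards a run's AI stream as NDJSON
import { aiStream } from "@/app/streams";

export async function GET(req: Request) {
  const runId = new URL(req.url).searchParams.get("runId");
  if (!runId) return new Response("Missing runId", { status: 400 });

  const stream = await aiStream.read(runId);
  const encoder = new TextEncoder();

  // Re-emit each chunk as one JSON line so any HTTP client can consume it
  const body = new ReadableStream<Uint8Array>({
    async start(controller) {
      for await (const chunk of stream) {
        controller.enqueue(encoder.encode(JSON.stringify(chunk) + "\n"));
      }
      controller.close();
    },
  });

  return new Response(body, {
    headers: { "Content-Type": "application/x-ndjson" },
  });
}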

Advanced options

Streams v2 gives you fine-grained control when you need it:


const stream = await aiStream.read(runId, {
  timeoutInSeconds: 300, // Wait up to 5 minutes
  startIndex: lastChunk + 1, // Resume from a specific chunk
  signal: controller.signal, // Cancel with AbortController
});

The startIndex option is particularly powerful. It lets you implement robust retry logic that resumes exactly where you left off, rather than re-fetching the entire stream.

React integration: useRealtimeStream

For frontend developers, we've built a new hook that's as simple as it gets:


"use client";
import { useRealtimeStream } from "@trigger.dev/react-hooks";
import { aiStream } from "@/app/streams";
export function StreamViewer({ runId, accessToken }) {
const { parts, error } = useRealtimeStream(aiStream, runId, {
accessToken,
timeoutInSeconds: 600,
throttleInMs: 50, // Control update frequency
});
if (error) return <div>Error: {error.message}</div>;
if (!parts) return <div>Loading...</div>;
return (
<div>
{parts.map((part, i) => (
<span key={i}>{part}</span>
))}
</div>
);
}

Pass in your defined stream, and you get:

  • Full type safety - parts is typed as your stream's chunk type
  • Automatic reconnection - Built into the protocol
  • Throttled updates - Don't re-render on every single chunk
  • Error handling - Properly typed errors

Compare this to the old useRealtimeRunWithStreams hook, which required manual type annotations and returned all streams in a single object. The new hook is focused, type-safe, and declarative.
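
To use the hook you need a run ID and a Public Access Token for that run. One common way to get both (a sketch; the file paths and function names are placeholders) is to trigger the task from a server action and pass the values down to the component above:

// app/actions.ts (hypothetical location)
"use server";

import { tasks } from "@trigger.dev/sdk";
// Import the task type from wherever you defined generateContent (path is a placeholder)
import type { generateContent } from "@/trigger/generate-content";

export async function startGeneration(prompt: string) {
  // Trigger the task and keep the run handle
  const handle = await tasks.trigger<typeof generateContent>("generate-content", {
    prompt,
  });

  // The handle includes a run-scoped Public Access Token that is safe to send to the browser
  return { runId: handle.id, accessToken: handle.publicAccessToken };
}

Pass the returned runId and accessToken to StreamViewer as props and the hook handles the rest.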

Dashboard visibility

Streams are now first-class citizens in the Trigger.dev dashboard. You can:

  • Watch streams in real-time as your tasks generate them
  • Inspect historical streams for completed runs
  • Debug streaming issues with full chunk visibility

This makes debugging production issues dramatically easier. See exactly what your task streamed, when, and in what order.

Default streams

For simple cases where you only need one stream per run, you can skip the stream key entirely:


// In your task: pipe any source stream to the run's default stream
const { waitUntilComplete } = streams.pipe(sourceStream);

// In your frontend (or backend)
const stream = await streams.read(runId); // Uses the default stream

This reduces boilerplate while keeping the flexibility to add named streams later.

Building resumable streams

One of the most powerful features of Streams v2 is the ability to build truly resilient stream consumption with the startIndex option. This lets you resume reading from exactly where you left off, making it trivial to implement retry logic that doesn't re-fetch data:


import { aiStream, type AIStreamPart } from "@/app/streams";

async function consumeStreamWithResumption(runId: string) {
  let lastChunkIndex = 0;
  const allChunks: AIStreamPart[] = [];
  let attempts = 0;
  const maxRetries = 3;

  while (attempts < maxRetries) {
    try {
      // Resume from where we left off
      const stream = await aiStream.read(runId, {
        startIndex: lastChunkIndex,
        timeoutInSeconds: 120,
      });

      for await (const chunk of stream) {
        allChunks.push(chunk);
        lastChunkIndex++;
      }

      // Success! Break out of retry loop
      break;
    } catch (error) {
      attempts++;

      if (attempts >= maxRetries) {
        throw new Error(`Failed after ${maxRetries} attempts`);
      }

      console.log(`Retry attempt ${attempts} from chunk ${lastChunkIndex}`);

      // Exponential backoff
      await new Promise((resolve) =>
        setTimeout(resolve, 1000 * Math.pow(2, attempts))
      );
    }
  }

  return allChunks;
}

This pattern is incredibly useful for:

  • Long-running AI completions - 10-minute Claude responses that might hit network issues
  • Server-side processing - Background jobs that consume streams without blocking
  • Batch operations - Processing thousands of streamed items with guaranteed delivery

The key insight: the stream is stored durably for 28 days. You can read it whenever you want, from wherever you want, and pick up exactly where you left off. No data loss, no duplicate processing.

Getting started

Start using Streams v2 today by upgrading your packages to version 4.1.0 or later:


npm install @trigger.dev/sdk@latest @trigger.dev/react-hooks@latest

Then follow our comprehensive guide to start defining and using streams.

Migrating from v1

If you're using metadata.stream(), migration is straightforward:

Before (v1):


import { metadata, task } from "@trigger.dev/sdk";

export const myTask = task({
  id: "my-task",
  run: async (payload) => {
    const stream = getStream();
    await metadata.stream("my-stream", stream);
  },
});

After (v2):


import { streams, task } from "@trigger.dev/sdk";

// 1. Define your stream
export const myStream = streams.define<string>({
  id: "my-stream",
});

// 2. Use it in your task
export const myTask = task({
  id: "my-task",
  run: async (payload) => {
    const stream = getStream();
    const { waitUntilComplete } = myStream.pipe(stream);
    await waitUntilComplete();
  },
});

The old API will continue to work, but we recommend migrating to get the benefits of unlimited streams, automatic resumption, and better type safety.


Ready to try it? Check out our complete Realtime Streams v2 documentation or jump straight into our example projects.

Questions? Join us in our Discord community or reach out on Twitter/X.
