Realtime Streams allow you to pipe streaming data from your Trigger.dev tasks to your frontend or backend applications in real-time. This is perfect for use cases like streaming AI completions, progress updates, or any continuous data flow.
Streams v2 requires SDK version 4.1.0 or later. Make sure to upgrade your @trigger.dev/sdk
and @trigger.dev/react-hooks packages to use these features. If you’re on an earlier version,
see the metadata.stream() documentation.
Overview
Streams v2 is a major upgrade that provides:
- Unlimited stream length (previously capped at 2000 chunks)
- Unlimited active streams per run (previously 5)
- Improved reliability with automatic resumption on connection loss
- 28-day stream retention (previously 1 day)
- Multiple client streams can pipe to a single stream
- Enhanced dashboard visibility for viewing stream data in real-time
Enabling Streams v2
Streams v2 is automatically enabled when triggering runs from the SDK using 4.1.0 or later. If you aren’t triggering via the SDK, you’ll need to enable v2 streams explicitly by setting the x-trigger-realtime-streams-version header to v2 when triggering the task.
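For callers that trigger over plain HTTP, the header can be attached roughly as sketched here. Only the header name and value come from the text above; the endpoint path, the request body shape, and the buildTriggerRequest helper are assumptions for illustration and should be checked against the REST API reference.

```typescript
// Sketch: attach the Streams v2 header when triggering without the SDK.
// ASSUMPTIONS: the endpoint path and body shape are illustrative placeholders,
// and buildTriggerRequest is a hypothetical helper (not part of any SDK).
type TriggerRequest = {
  url: string;
  init: { method: "POST"; headers: Record<string, string>; body: string };
};

function buildTriggerRequest(
  baseURL: string,
  taskId: string,
  secretKey: string,
  payload: unknown
): TriggerRequest {
  return {
    url: `${baseURL}/api/v1/tasks/${encodeURIComponent(taskId)}/trigger`,
    init: {
      method: "POST",
      headers: {
        Authorization: `Bearer ${secretKey}`,
        "Content-Type": "application/json",
        // Explicitly opt in to v2 streams (header name and value from the text above)
        "x-trigger-realtime-streams-version": "v2",
      },
      body: JSON.stringify({ payload }),
    },
  };
}
```

The result can be handed to fetch(url, init); keeping the request construction separate from the network call makes the header easy to verify in a unit test.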
If you’d like to opt out of v2 streams, you can do so in one of the following two ways:
Option 1: SDK Configuration
import { auth } from "@trigger.dev/sdk";
auth.configure({
future: {
v2RealtimeStreams: false,
},
});
Option 2: Environment Variable
Set the TRIGGER_V2_REALTIME_STREAMS=0 environment variable in the environment where your backend runs (where you trigger tasks).
Limits Comparison
| Limit | Streams v1 | Streams v2 |
|---|---|---|
| Maximum stream length | 2000 chunks | Unlimited |
| Number of active streams per run | 5 | Unlimited |
| Maximum streams per run | 10 | Unlimited |
| Maximum stream TTL | 1 day | 28 days |
| Maximum stream size | 10 MB | 300 MiB |
Quick Start
The recommended workflow for using Realtime Streams v2:
- Define your streams in a shared location using streams.define()
- Use the defined stream in your tasks with .pipe(), .append(), or .writer()
- Read from the stream using .read() or the useRealtimeStream hook in React
This approach gives you full type safety, better code organization, and easier maintenance as your application grows.
Defining Typed Streams (Recommended)
The recommended way to work with streams is to define them once with streams.define(). This allows you to specify the chunk type and stream ID in one place, and then reuse that definition throughout your codebase with full type safety.
Creating a Defined Stream
Define your streams in a shared location (like app/streams.ts or trigger/streams.ts):
import { streams, InferStreamType } from "@trigger.dev/sdk";
// Define a stream with a specific type
export const aiStream = streams.define<string>({
id: "ai-output",
});
// Export the type for use in frontend components
export type AIStreamPart = InferStreamType<typeof aiStream>;
You can define streams for any JSON-serializable type:
import { streams, InferStreamType } from "@trigger.dev/sdk";
import { UIMessageChunk } from "ai";
// Stream for AI UI message chunks
export const aiStream = streams.define<UIMessageChunk>({
id: "ai",
});
// Stream for progress updates
export const progressStream = streams.define<{ step: string; percent: number }>({
id: "progress",
});
// Stream for simple text
export const logStream = streams.define<string>({
id: "logs",
});
// Export types
export type AIStreamPart = InferStreamType<typeof aiStream>;
export type ProgressStreamPart = InferStreamType<typeof progressStream>;
export type LogStreamPart = InferStreamType<typeof logStream>;
Using Defined Streams in Tasks
Once defined, you can use all stream methods on your defined stream:
import { task } from "@trigger.dev/sdk";
import { aiStream } from "./streams";
export const streamTask = task({
id: "stream-task",
run: async (payload: { prompt: string }) => {
// Get a stream from an AI service, database, etc.
const stream = await getAIStream(payload.prompt);
// Pipe the stream using your defined stream
const { stream: readableStream, waitUntilComplete } = aiStream.pipe(stream);
// Option A: Iterate over the stream locally
for await (const chunk of readableStream) {
console.log("Received chunk:", chunk);
}
// Option B: Wait for the stream to complete
await waitUntilComplete();
return { message: "Stream completed" };
},
});
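The task above calls getAIStream() without defining it. For local experiments, a stand-in like the following can fill that gap. It is a hypothetical helper, not a Trigger.dev or AI SDK function: it echoes the prompt back one word at a time through a standard ReadableStream, which is the kind of source the pipe() examples in this document pass in.

```typescript
// Hypothetical stand-in for getAIStream(): emits the prompt back one word per chunk.
// Uses only the standard ReadableStream API (global in Node 18+ and in browsers).
function getAIStream(prompt: string): ReadableStream<string> {
  const words = prompt.split(/\s+/).filter((w) => w.length > 0);
  let next = 0;
  return new ReadableStream<string>({
    pull(controller) {
      const word = words[next];
      if (word !== undefined) {
        controller.enqueue(word); // emit the next word as one chunk
        next += 1;
      } else {
        controller.close(); // no words left: end the stream
      }
    },
  });
}
```

Because the helper returns the stream synchronously, the await in the task example is harmless; a real AI client would typically return a promise or a stream of tokens instead.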
Reading from a Stream
Use the defined stream’s read() method to consume data from anywhere (frontend, backend, or another task):
import { aiStream } from "./streams";
const stream = await aiStream.read(runId);
for await (const chunk of stream) {
console.log(chunk); // chunk is typed as the stream's chunk type
}
With options:
const stream = await aiStream.read(runId, {
timeoutInSeconds: 60, // Stop if no data for 60 seconds
startIndex: 10, // Start from the 10th chunk
});
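A common follow-up is to gather chunks into an array, optionally stopping early. The sketch below relies only on what the examples above show: a read(runId, options) call that resolves to something usable with for await. The names collectChunks, StreamReader, and maxChunks are hypothetical, and the reader is passed in as a function so the helper is not tied to one stream.

```typescript
// Options mirrored from the read() example above
type ReadOptions = { timeoutInSeconds?: number; startIndex?: number };

// Hypothetical shape of a reader, e.g. (id, opts) => aiStream.read(id, opts)
type StreamReader<T> = (runId: string, options?: ReadOptions) => Promise<AsyncIterable<T>>;

async function collectChunks<T>(
  read: StreamReader<T>,
  runId: string,
  options: ReadOptions = {},
  maxChunks: number = Infinity
): Promise<T[]> {
  const chunks: T[] = [];
  if (maxChunks <= 0) return chunks;
  const stream = await read(runId, options);
  for await (const chunk of stream) {
    chunks.push(chunk);
    // Leaving the loop early stops consuming the stream
    if (chunks.length >= maxChunks) break;
  }
  return chunks;
}
```

With a defined stream this might be called as collectChunks((id, opts) => aiStream.read(id, opts), runId, { timeoutInSeconds: 60 }).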
Appending to a Stream
Use the defined stream’s append() method to add a single chunk:
import { task } from "@trigger.dev/sdk";
import { aiStream, progressStream, logStream } from "./streams";
export const appendTask = task({
id: "append-task",
run: async (payload) => {
// Append to different streams with full type safety
await logStream.append("Processing started");
await progressStream.append({ step: "Initialization", percent: 0 });
// Do some work...
await progressStream.append({ step: "Processing", percent: 50 });
await logStream.append("Step 1 complete");
// Do more work...
await progressStream.append({ step: "Complete", percent: 100 });
await logStream.append("All steps complete");
},
});
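When a task has many steps, the repeated append() calls can be driven from a list instead of being written out by hand. This is a sketch under stated assumptions: runWithProgress and Step are hypothetical names, and the append function is injected so the helper works with any function that accepts a { step, percent } chunk and returns a promise.

```typescript
type ProgressChunk = { step: string; percent: number };
type Step = { name: string; run: () => Promise<void> };

async function runWithProgress(
  steps: Step[],
  appendProgress: (chunk: ProgressChunk) => Promise<void>
): Promise<void> {
  let completed = 0;
  for (const step of steps) {
    // Report how much work was finished before this step starts
    const percent = Math.round((completed / steps.length) * 100);
    await appendProgress({ step: step.name, percent });
    await step.run();
    completed += 1;
  }
  // Final chunk once every step has run
  await appendProgress({ step: "Complete", percent: 100 });
}
```

Inside a task, the second argument could be (chunk) => progressStream.append(chunk), assuming append() resolves once the chunk has been accepted.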
Writing Multiple Chunks
Use the defined stream’s writer() method for more complex stream writing:
import { task } from "@trigger.dev/sdk";
import { logStream } from "./streams";
export const writerTask = task({
id: "writer-task",
run: async (payload) => {
const { waitUntilComplete } = logStream.writer({
execute: ({ write, merge }) => {
// Write individual chunks
write("Chunk 1");
write("Chunk 2");
// Merge another stream
const additionalStream = ReadableStream.from(["Chunk 3", "Chunk 4", "Chunk 5"]);
merge(additionalStream);
},
});
await waitUntilComplete();
},
});
Using Defined Streams in React
Defined streams work seamlessly with the useRealtimeStream hook:
"use client";
import { useRealtimeStream } from "@trigger.dev/react-hooks";
import { aiStream } from "@/app/streams";
export function StreamViewer({ accessToken, runId }: { accessToken: string; runId: string }) {
// Pass the defined stream directly - full type safety!
const { parts, error } = useRealtimeStream(aiStream, runId, {
accessToken,
timeoutInSeconds: 600,
});
if (error) return <div>Error: {error.message}</div>;
if (!parts) return <div>Loading...</div>;
return (
<div>
{parts.map((part, i) => (
<span key={i}>{part}</span>
))}
</div>
);
}
Direct Stream Methods (Without Defining)
We strongly recommend using streams.define() instead of direct methods. Defined streams provide
better organization, full type safety, and make it easier to maintain your codebase as it grows.
If you have a specific reason to avoid defined streams, you can use stream methods directly by specifying the stream key each time.
Direct Piping
import { streams, task } from "@trigger.dev/sdk";
export const directStreamTask = task({
id: "direct-stream",
run: async (payload: { prompt: string }) => {
const stream = await getAIStream(payload.prompt);
// Specify the stream key directly
const { stream: readableStream, waitUntilComplete } = streams.pipe("ai-output", stream);
await waitUntilComplete();
},
});
Direct Reading
import { streams } from "@trigger.dev/sdk";
// Specify the stream key when reading
const stream = await streams.read(runId, "ai-output");
for await (const chunk of stream) {
console.log(chunk);
}
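Direct reads do not tie the stream key to a declared chunk type, so a runtime check can be useful before a chunk is used. The sketch below is a plain TypeScript type guard for a progress-style chunk such as { step: string; percent: number }; isProgressChunk is a hypothetical helper, not part of the SDK.

```typescript
type ProgressChunk = { step: string; percent: number };

// Narrow an unknown chunk to ProgressChunk, rejecting anything malformed
function isProgressChunk(value: unknown): value is ProgressChunk {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate.step === "string" &&
    typeof candidate.percent === "number" &&
    Number.isFinite(candidate.percent)
  );
}
```

Inside the for await loop, `if (isProgressChunk(chunk)) { ... }` then gives typed access to chunk.step and chunk.percent, while unexpected chunks can be logged or skipped.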
Direct Appending
import { streams, task } from "@trigger.dev/sdk";
export const directAppendTask = task({
id: "direct-append",
run: async (payload) => {
// Specify the stream key each time
await streams.append("logs", "Processing started");
await streams.append("progress", "50%");
await streams.append("logs", "Complete");
},
});
Direct Writing
import { streams, task } from "@trigger.dev/sdk";
export const directWriterTask = task({
id: "direct-writer",
run: async (payload) => {
const { waitUntilComplete } = streams.writer("output", {
execute: ({ write, merge }) => {
write("Chunk 1");
write("Chunk 2");
},
});
await waitUntilComplete();
},
});
Default Stream
Every run has a “default” stream, allowing you to skip the stream key entirely. This is useful for simple cases where you only need one stream per run.
Using direct methods:
import { streams, task } from "@trigger.dev/sdk";
export const defaultStreamTask = task({
id: "default-stream",
run: async (payload) => {
const stream = getDataStream();
// No stream key needed - uses "default"
const { waitUntilComplete } = streams.pipe(stream);
await waitUntilComplete();
},
});
// Reading from the default stream
const readStream = await streams.read(runId);
Targeting Different Runs
You can pipe streams to parent, root, or any other run using the target option. This works with both defined streams and direct methods.
With Defined Streams
import { task } from "@trigger.dev/sdk";
import { logStream } from "./streams";
export const childTask = task({
id: "child-task",
run: async (payload, { ctx }) => {
const stream = getDataStream();
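// The calls below are alternatives: a source stream can normally be consumed
// only once, so pick one target per stream.
// Pipe to parent run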
logStream.pipe(stream, { target: "parent" });
// Pipe to root run
logStream.pipe(stream, { target: "root" });
// Pipe to self (default behavior)
logStream.pipe(stream, { target: "self" });
// Pipe to a specific run ID
logStream.pipe(stream, { target: payload.otherRunId });
},
});
With Direct Methods
import { streams, task } from "@trigger.dev/sdk";
export const childTask = task({
id: "child-task",
run: async (payload, { ctx }) => {
const stream = getDataStream();
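// The calls below are alternatives: a source stream can normally be consumed
// only once, so pick one target per stream.
// Pipe to parent run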
streams.pipe("output", stream, { target: "parent" });
// Pipe to root run
streams.pipe("output", stream, { target: "root" });
// Pipe to a specific run ID
streams.pipe("output", stream, { target: payload.otherRunId });
},
});
Streaming from Outside a Task
If you specify a target run ID, you can pipe streams from anywhere (like a Next.js API route):
import { streams } from "@trigger.dev/sdk";
import { openai } from "@ai-sdk/openai";
import { streamText } from "ai";
export async function POST(req: Request) {
const { messages, runId } = await req.json();
const result = streamText({
model: openai("gpt-4o"),
messages,
});
// Pipe AI stream to a Trigger.dev run
const { stream } = streams.pipe("ai-stream", result.toUIMessageStream(), {
target: runId,
});
return new Response(stream as any, {
headers: { "Content-Type": "text/event-stream" },
});
}
React Hook
Use the useRealtimeStream hook to subscribe to streams in your React components.
With Defined Streams (Recommended)
"use client";
import { useRealtimeStream } from "@trigger.dev/react-hooks";
import { aiStream } from "@/app/streams";
export function StreamViewer({ accessToken, runId }: { accessToken: string; runId: string }) {
// Pass the defined stream directly for full type safety
const { parts, error } = useRealtimeStream(aiStream, runId, {
accessToken,
timeoutInSeconds: 600,
onData: (chunk) => {
console.log("New chunk:", chunk); // chunk is typed!
},
});
if (error) return <div>Error: {error.message}</div>;
if (!parts) return <div>Loading...</div>;
return (
<div>
{parts.map((part, i) => (
<span key={i}>{part}</span>
))}
</div>
);
}
With Direct Stream Keys
If you prefer not to use defined streams, you can specify the stream key directly:
"use client";
import { useRealtimeStream } from "@trigger.dev/react-hooks";
export function StreamViewer({ accessToken, runId }: { accessToken: string; runId: string }) {
const { parts, error } = useRealtimeStream<string>(runId, "ai-output", {
accessToken,
timeoutInSeconds: 600,
});
if (error) return <div>Error: {error.message}</div>;
if (!parts) return <div>Loading...</div>;
return (
<div>
{parts.map((part, i) => (
<span key={i}>{part}</span>
))}
</div>
);
}
Using Default Stream
// Omit stream key to use the default stream
const { parts, error } = useRealtimeStream<string>(runId, {
accessToken,
});
Hook Options
const { parts, error } = useRealtimeStream(streamDef, runId, {
accessToken: "pk_...", // Required: Public access token
baseURL: "https://api.trigger.dev", // Optional: Custom API URL
timeoutInSeconds: 60, // Optional: Timeout (default: 60)
startIndex: 0, // Optional: Start from specific chunk
throttleInMs: 16, // Optional: Throttle updates (default: 16ms)
onData: (chunk) => {}, // Optional: Callback for each chunk
});
Complete Example: AI Streaming
Define the stream
// app/streams.ts
import { streams, InferStreamType } from "@trigger.dev/sdk";
import { UIMessageChunk } from "ai";
export const aiStream = streams.define<UIMessageChunk>({
id: "ai",
});
export type AIStreamPart = InferStreamType<typeof aiStream>;
Create the task
// trigger/ai-task.ts
import { task } from "@trigger.dev/sdk";
import { openai } from "@ai-sdk/openai";
import { streamText } from "ai";
import { aiStream } from "@/app/streams";
export const generateAI = task({
id: "generate-ai",
run: async (payload: { prompt: string }) => {
const result = streamText({
model: openai("gpt-4o"),
prompt: payload.prompt,
});
const { waitUntilComplete } = aiStream.pipe(result.toUIMessageStream());
await waitUntilComplete();
return { success: true };
},
});
Frontend component
// components/ai-stream.tsx
"use client";
import { useRealtimeStream } from "@trigger.dev/react-hooks";
import { aiStream } from "@/app/streams";
export function AIStream({ accessToken, runId }: { accessToken: string; runId: string }) {
const { parts, error } = useRealtimeStream(aiStream, runId, {
accessToken,
timeoutInSeconds: 300,
});
if (error) return <div>Error: {error.message}</div>;
if (!parts) return <div>Loading...</div>;
return (
<div className="prose">
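{/* Parts are UIMessageChunk objects, which React cannot render directly; show only text deltas */}
{parts.map((part, i) =>
part.type === "text-delta" ? <span key={i}>{part.delta}</span> : null
)}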
</div>
);
}
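In this example the stream carries UIMessageChunk objects rather than strings, so the displayed text has to be extracted from the chunks. The sketch below assumes text arrives in chunks shaped like { type: "text-delta", delta: string }, as in AI SDK v5; ChunkLike is a deliberately loose local type and textFromParts is a hypothetical helper, so verify the field names against the AI SDK version in use.

```typescript
// Loose local stand-in for the chunk shape (assumption, see lead-in)
type ChunkLike = { type: string; delta?: unknown };

// Concatenate the text deltas, ignoring every other chunk type
function textFromParts(parts: readonly ChunkLike[]): string {
  let text = "";
  for (const part of parts) {
    if (part.type === "text-delta" && typeof part.delta === "string") {
      text += part.delta;
    }
  }
  return text;
}
```

A component could then render a single element containing textFromParts(parts) instead of one element per chunk.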
Migration from v1
If you’re using the old metadata.stream() API, here’s how to migrate to the recommended v2 approach:
Step 1: Define Your Streams
Create a shared streams definition file:
// app/streams.ts or trigger/streams.ts
import { streams, InferStreamType } from "@trigger.dev/sdk";
export const myStream = streams.define<string>({
id: "my-stream",
});
export type MyStreamPart = InferStreamType<typeof myStream>;
Step 2: Update Your Tasks
Replace metadata.stream() with the defined stream’s pipe() method:
// Before (v1)
import { metadata, task } from "@trigger.dev/sdk";
export const myTask = task({
id: "my-task",
run: async (payload) => {
const stream = getDataStream();
await metadata.stream("my-stream", stream);
},
});
// After (v2 - Recommended)
import { task } from "@trigger.dev/sdk";
import { myStream } from "./streams";
export const myTask = task({
id: "my-task",
run: async (payload) => {
const stream = getDataStream();
// Don't await - returns immediately
const { waitUntilComplete } = myStream.pipe(stream);
// Optionally wait for completion
await waitUntilComplete();
},
});
Step 3: Update Your Frontend
Use the defined stream with useRealtimeStream:
// Before
const { parts, error } = useRealtimeStream<string>(runId, "my-stream", {
accessToken,
});
// After
import { myStream } from "@/app/streams";
const { parts, error } = useRealtimeStream(myStream, runId, {
accessToken,
});
Alternative: Direct Methods (Not Recommended)
If you prefer not to use defined streams, you can use direct methods:
import { streams, task } from "@trigger.dev/sdk";
export const myTask = task({
id: "my-task",
run: async (payload) => {
const stream = getDataStream();
const { waitUntilComplete } = streams.pipe("my-stream", stream);
await waitUntilComplete();
},
});
Reliability Features
Streams v2 includes automatic reliability improvements:
- Automatic resumption: If a connection is lost, both appending and reading will automatically resume from the last successful chunk
- No data loss: Network issues won’t cause stream data to be lost
- Idempotent operations: Duplicate chunks are automatically handled
These improvements happen automatically - no code changes needed.
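One way to picture how resumption and duplicate handling fit together is an index-based cursor: every chunk has a position, the reader remembers the next position it expects, and anything else is ignored. The sketch below is a conceptual model only. It does not describe Trigger.dev's internal implementation, and ChunkCursor and IndexedChunk are hypothetical names.

```typescript
type IndexedChunk<T> = { index: number; value: T };

class ChunkCursor<T> {
  private nextIndex = 0;
  readonly received: T[] = [];

  // Accept a chunk only if it is exactly the next expected one
  accept(chunk: IndexedChunk<T>): boolean {
    if (chunk.index !== this.nextIndex) return false; // duplicate or out of order
    this.received.push(chunk.value);
    this.nextIndex += 1;
    return true;
  }

  // Position a reconnecting reader would ask to resume from
  get resumeFrom(): number {
    return this.nextIndex;
  }
}
```

After a dropped connection, a reader modeled this way would request chunks starting at resumeFrom, and a chunk that is delivered twice is simply rejected by accept().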
Dashboard Integration
Streams are now visible in the Trigger.dev dashboard, allowing you to:
- View stream data in real-time as it’s generated
- Inspect historical stream data for completed runs
- Debug streaming issues with full visibility into chunk delivery
Best Practices
- Always use streams.define(): Define your streams in a shared location for better organization, type safety, and code reusability. This is the recommended approach for all streams.
- Export stream types: Use InferStreamType to export types for your frontend components
- Handle errors gracefully: Always check for errors when reading streams in your UI
- Set appropriate timeouts: Adjust timeoutInSeconds based on your use case (AI completions may need longer timeouts)
- Target parent runs: When orchestrating with child tasks, pipe to parent runs for easier consumption
- Throttle frontend updates: Use throttleInMs in useRealtimeStream to prevent excessive re-renders
- Use descriptive stream IDs: Choose clear, descriptive IDs like "ai-output" or "progress" instead of generic names
Troubleshooting
Stream not appearing in dashboard
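- Ensure Streams v2 is enabled: use SDK 4.1.0 or later (or send the v2 header when triggering without the SDK), and check that you haven’t opted out via the future flag or environment variable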
- Verify your task is actually writing to the stream
- Check that the stream key matches between writing and reading
Stream timeout errors
- Increase timeoutInSeconds in your read() or useRealtimeStream() calls
- Ensure your stream source is actively producing data
- Check network connectivity between your application and Trigger.dev
Missing chunks
- With v2, automatic resumption means chunks should not be lost when a connection drops
- Verify you’re reading from the correct stream key
- Check the startIndex option if you’re not seeing expected chunks