December 2, 2024

Realtime goes GA

Keep your users updated with real-time task progress. Now with LLM streaming support and increased limits.

Today Trigger.dev Realtime goes GA with better reliability, increased limits, and a brand new feature that allows you to stream LLM responses directly to your frontend.

As a quick overview, Realtime allows you to subscribe to runs and receive updates in real-time, from your backend or your frontend. This allows you to bridge the gap between your long-running background tasks in Trigger.dev and your application to give your users a real-time experience about the status of their tasks:

  • Show progress bars & toasts
  • AI/LLM Streaming Responses
  • Multi-step task progress
  • AI Agent Observability

Demo


In this demo we cover the following use cases:

  • Uploading an image using UploadThing and passing the URL to a background task in Trigger.dev.
  • Using the Realtime API to subscribe to runs and receive updates in real-time from a Next.js app using our Realtime React hooks.
  • Updating fal.ai status in a run using metadata to provide custom real-time updates to the user.
  • Using tags to subscribe to multiple runs at once.

How it works

The Realtime API is built on top of Electric SQL, an open-source PostgreSQL syncing engine that works over HTTP. The Trigger.dev server wraps Electric SQL and provides a simple API to subscribe to runs and get real-time updates.

Features and how to use them

After you trigger a task, you can subscribe to the run using the runs.subscribeToRun function. This function returns an async iterator that you can use to get updates on the run status.


import { runs, tasks } from "@trigger.dev/sdk/v3";
// Somewhere in your backend code
async function myBackend() {
const handle = await tasks.trigger("my-task", { some: "data" });
for await (const run of runs.subscribeToRun(handle.id)) {
// This will log the run every time it changes
console.log(run);
}
}

Alternatively, you can subscribe to changes to any run that includes a specific tag (or tags) using the runs.subscribeToRunsWithTag function.


import { runs } from "@trigger.dev/sdk/v3";
// Somewhere in your backend code
for await (const run of runs.subscribeToRunsWithTag("user:1234")) {
// This will log the run every time it changes
// for all runs with the tag "user:1234"
console.log(run);
}

Or you can subscribe to changes to any run in a batch:


import { runs, tasks } from "@trigger.dev/sdk/v3";
async function myBackend() {
const handle = await tasks.batchTrigger("my-task", [
{ payload: { some: "data" } },
{ payload: { some: "other-data" } },
]);
for await (const run of runs.subscribeToBatch([handle.batchId])) {
// This will log whenever a run in the batch changes
console.log(run);
}
}

You will receive updates whenever a run changes for the following reasons:

  • The run moves to a new state or completes.
  • Run tags are added or removed.
  • Run metadata is updated.

Realtime React hooks

You can also use the Realtime API in your frontend using our new @trigger.dev/react-hooks package. Here's an example of how you can use the useRealtimeRun hook to subscribe to a run and get updates in real-time:


"use client"; // This is needed for Next.js App Router or other RSC frameworks
import { useRealtimeRun } from "@trigger.dev/react-hooks";
export function MyComponent({ runId }: { runId: string }) {
const { run, error } = useRealtimeRun(runId);
if (error) return <div>Error: {error.message}</div>;
return (
<div>
Run: {run.id} is in {run.status} state
</div>
);
}

Or subscribe to all runs that have a specific tag:


"use client"; // This is needed for Next.js App Router or other RSC frameworks
import { useRealtimeRunsWithTag } from "@trigger.dev/react-hooks";
export function MyComponent({ tag }: { tag: string }) {
const { runs, error } = useRealtimeRunsWithTag(tag);
if (error) return <div>Error: {error.message}</div>;
return (
<div>
{runs.map((run) => (
<div key={run.id}>
Run: {run.id} is in {run.status} state
</div>
))}
</div>
);
}

Or are in a specific batch:


"use client"; // This is needed for Next.js App Router or other RSC frameworks
import { useRealtimeBatch } from "@trigger.dev/react-hooks";
export function MyComponent({ batchId }: { batchId: string }) {
const { runs, error } = useRealtimeBatch(batchId);
if (error) return <div>Error: {error.message}</div>;
return (
<div>
{runs.map((run) => (
<div key={run.id}>
Run: {run.id} is in {run.status} state
</div>
))}
</div>
);
}

Realtime streams

If you are working with streams in your tasks, for example when using the OpenAI SDK, you can forward the stream through the Realtime API to provide real-time updates to your users.


import { task, metadata } from "@trigger.dev/sdk/v3";
import OpenAI from "openai";
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
});
export type STREAMS = {
openai: OpenAI.ChatCompletionChunk;
};
export const myTask = task({
id: "my-task",
run: async (payload: { prompt: string }) => {
const completion = await openai.chat.completions.create({
messages: [{ role: "user", content: payload.prompt }],
model: "gpt-3.5-turbo",
stream: true, // 👈 Enable stream of OpenAI.ChatCompletionChunk objects
});
// Register the stream with the key "openai"
// This will "tee" the stream and send it to the metadata system
const stream = await metadata.stream("openai", completion);
let text = "";
// You can read the returned stream as an async iterator
for await (const chunk of stream) {
logger.log("Received chunk", { chunk });
// The type of the chunk is determined by the provider
text += chunk.choices.map((choice) => choice.delta?.content).join("");
}
return { text };
},
});

You can then subscribe to the stream using the runs.subscribeToRun method:


import { runs } from "@trigger.dev/sdk/v3";
import type { STREAMS } from "./trigger/my-task";
// Somewhere in your backend
async function subscribeToStream(runId: string) {
// Use a for-await loop to subscribe to the stream
for await (const part of runs.subscribeToRun(runId).withStreams<STREAMS>()) {
switch (part.type) {
case "run": {
console.log("Received run", part.run);
break;
}
case "openai": {
// part.chunk is of type OpenAI.ChatCompletionChunk
console.log("Received OpenAI chunk", part.chunk);
break;
}
}
}
}

Stream support is also available in the Realtime React hooks:


import { useRealtimeRunWithStreams } from "@trigger.dev/sdk/v3";
import type { myTask, STREAMS } from "./trigger/my-task";
// Somewhere in your React component
function MyComponent({ runId }: { runId: string }) {
const { run, streams } = useRealtimeRunWithStreams<typeof myTask, STREAMS>(
runId
);
if (!run) {
return <div>Loading...</div>;
}
const text = streams.openai
?.map((chunk) =>
chunk.choices.map((choice) => choice.delta?.content).join("")
)
.join("");
return (
<div>
<h1>Run ID: {run.id}</h1>
<h2>OpenAI response:</h2>
<p>{text}</p>
</div>
);
}

AI SDK + Streams + Tool Tasks

We've tailored Realtime streams to work seamlessly with the AI SDK, allowing you to provide real-time updates to your users as they interact with your AI models.

This demo is built with Realtime streams, the AI SDK, and Tool Tasks, which allow you to build Trigger.dev tasks that double as tools for the AI SDK.

It uses streamText to get the live weather forecast for a location and then explains the forecast in real-time using the AI SDK:


import { toolTask, schemaTask, metadata } from "@trigger.dev/sdk/v3";
import { streamText, type TextStreamPart } from "ai";
import { openai } from "@ai-sdk/openai";
export type STREAMS = {
openai: TextStreamPart<{ getWeather: typeof weatherTask.tool }>;
};
// `toolTask` are regular tasks that can be used as tools in the AI SDK
export const weatherTask = toolTask({
id: "weather",
description: "Get the weather for a location",
parameters: z.object({
location: z.string(),
}),
run: async ({ location }) => {
// return mock data
return {
location,
temperature: 72 + Math.floor(Math.random() * 21) - 10,
};
},
});
export const aiWeather = schemaTask({
id: "ai-weather",
description: "Send the fullStream from the AI SDK to the metadata system",
schema: z.object({
model: z.string().default("gpt-4o-mini"),
prompt: z.string().default("Hello, how are you?"),
}),
run: async ({ model, prompt }) => {
const result = streamText({
model: openai(model),
prompt,
tools: {
getWeather: weatherTask.tool,
},
maxSteps: 10,
experimental_telemetry: {
// 👈 Log AI SDK telemetry to trigger.dev
isEnabled: true,
functionId: "ai-weather",
},
});
// Send the fullStream, which includes tool calls and results
await metadata.stream("openai", result.fullStream);
},
});

Viewing this task in the dashboard will show all the calls to the AI SDK and the tool tasks:

You can view and run the full demo here.

Type-safety all the way down

When developing any new API, we always strive to provide the best developer experience. The Realtime API is no exception. We've made sure that the API is fully type-safe, from the backend to the frontend.

All of the Realtime APIs take a generic parameter that specifies the type of the task (or tasks). For example, when using subscribeToRun, you can specify the type of the task that you're subscribing to:


import { runs } from "@trigger.dev/sdk/v3";
import type { myTask } from "./trigger/tasks";
// Somewhere in your backend
for await (const run of runs.subscribeToRun<typeof myTask>("run_1234")) {
console.log(run.payload); // 👈 typed to the payload of myTask
console.log(run.output); // 👈 typed to the output of myTask
}

When subscribing to multiple different tasks, you can provide a union of the task types:


import { runs } from "@trigger.dev/sdk/v3";
import type { myTask, myOtherTask } from "./trigger/tasks";
// Somewhere in your backend
for await (const run of runs.subscribeToRunsWithTag<
typeof myTask | typeof myOtherTask
>("tag:1234")) {
// Now narrow the run based on the task identifier
switch (run.taskIdentifier) {
case "my-task": {
console.log(run.payload); // 👈 typed to the payload of myTask
console.log(run.output); // 👈 typed to to the output of myTask
break;
}
case "my-other-task": {
console.log(run.payload); // 👈 typed to the payload of myOtherTask
console.log(run.output); // 👈 typed to the output of myOtherTask
break;
}
}
}

When using the Realtime React hooks, you can also specify the type of the task:


import { useRealtimeRun } from "@trigger.dev/react-hooks";
import type { myTask } from "./trigger/tasks";
// 👆 make sure you are importing the type only
function MyComponent() {
const { run, error } = useRealtimeRun<typeof myTask>("run_1234");
if (error) return <div>Error: {error.message}</div>;
return (
<div>
Run: {run.id} is in {run.status} state with output: {run.output.message}
</div>
);
}

Stream support is also fully type-safe:


import { runs } from "@trigger.dev/sdk/v3";
import type { STREAMS } from "./trigger/my-task";
// Somewhere in your backend
async function subscribeToStream(runId: string) {
// Use a for-await loop to subscribe to the stream
for await (const part of runs.subscribeToRun(runId).withStreams<STREAMS>()) {
switch (part.type) {
case "run": {
console.log("Received run", part.run);
break;
}
case "openai": {
// part.chunk is of type OpenAI.ChatCompletionChunk
console.log("Received OpenAI chunk", part.chunk);
break;
}
}
}
}

Granular access control

You can create Public Access Tokens with limited scopes that can be used to restrict access when using the Realtime API, especially from your frontend:

  • It's unsafe to share your Trigger.dev secret keys publicly.
  • Public Access Tokens can have limited scopes, such as read access to specific runs.

import { auth } from "@trigger.dev/sdk/v3";
// Somewhere in your backend
const publicToken = await auth.createPublicToken({
scopes: {
read: {
runs: ["run_1234"], // 👈 This token can only access run_1234
},
},
expirationTime: "1h", // 👈 The token will expire in 1 hour
});

Now you can use this token in your frontend:


import { useRealtimeRun } from "@trigger.dev/react-hooks";
export function MyComponent({
runId,
publicAccessToken,
}: {
runId: string;
publicAccessToken: string;
}) {
const { run, error } = useRealtimeRun("run_1234", {
accessToken: publicAccessToken,
});
if (error) return <div>Error: {error.message}</div>;
return (
<div>
Run: {run.id} is in {run.status} state
</div>
);
}

You can also scope to tags and batches:


import { auth } from "@trigger.dev/sdk/v3";
// Somewhere in your backend
const publicToken = await auth.createPublicToken({
scopes: {
read: {
tags: ["user:1234"],
batch: ["batch_1234"],
},
},
expirationTime: "1h",
});

For convienence, we'll also auto-generate Public Access Tokens for you when you trigger a task that has access to the run:


import { tasks } from "@trigger.dev/sdk/v3";
// Somewhere in your backend
const handle = await tasks.trigger("my-task", { some: "data" });
// This access token can be used to subscribe to the run
console.log(handle.publicAccessToken);

In the wild

Trigger.dev Realtime already has 60+ organizations using it in production, including:

Midday.ai

Pontus from Midday.ai has been posting about using the Trigger.dev and the Realtime API to sync bank connections and show real-time status updates to their users:

And since they are also open source, you can follow along with their usage in the Midday.ai GitHub repository.

Cookbook AI

Cookbook AI, a recipe generation tool, has been using the Realtime API to provide real-time updates to their users as they generate recipes:

Papermark.io

Papermark, the open-source document tracking tool, is using the Realtime API to provide real-time updates to their users as they upload and process documents:

You can view the Realtime PR in the Papermark repository.

Going GA

The Realtime API is now generally available and ready for production use. We've been working with a number of customers to ensure that the API is stable and reliable, and we're excited to see what you build with it. We've increased the number of concurrent connections for our Hobby and Pro plans:

  • Hobby plan now supports up to 50 concurrent connections.
  • Pro plan now supports 500 concurrent connections, with the ability to increase this limit if needed.

And if you are self-hosting, Electric SQL is open-source with an Apache 2.0 license, so you can use the Realtime API when self-hosting.

Getting started

To get started with the Realtime API, you can checkout the Realtime API documentation and the React hooks documentation.

Additional resources:

As always, we're here to help if you have any questions or need assistance. You can reach out to us on Discord or X.

Ready to start building?

Build and deploy your first task in 3 minutes.

Get started now
,