December 2, 2024
Realtime goes GA
Keep your users updated with real-time task progress. Now with LLM streaming support and increased limits.
Today Trigger.dev Realtime goes GA with better reliability, increased limits, and a brand new feature that allows you to stream LLM responses directly to your frontend.
As a quick overview, Realtime lets you subscribe to runs and receive updates in real time, from your backend or your frontend. It bridges the gap between your long-running background tasks in Trigger.dev and your application, so you can show your users the live status of their tasks:
- Show progress bars & toasts
- AI/LLM Streaming Responses
- Multi-step task progress
- AI Agent Observability
Demo
In this demo we cover the following use cases:
- Uploading an image using UploadThing and passing the URL to a background task in Trigger.dev.
- Using the Realtime API to subscribe to runs and receive updates in real-time from a Next.js app using our Realtime React hooks.
- Updating fal.ai status in a run using metadata to provide custom real-time updates to the user.
- Using tags to subscribe to multiple runs at once.
How it works
The Realtime API is built on top of Electric SQL, an open-source PostgreSQL syncing engine that works over HTTP. The Trigger.dev server wraps Electric SQL and provides a simple API to subscribe to runs and get real-time updates.
Features and how to use them
After you trigger a task, you can subscribe to the run using the runs.subscribeToRun function. This function returns an async iterator that you can use to get updates on the run status.
import { runs, tasks } from "@trigger.dev/sdk/v3";

// Somewhere in your backend code
async function myBackend() {
  const handle = await tasks.trigger("my-task", { some: "data" });

  for await (const run of runs.subscribeToRun(handle.id)) {
    // This will log the run every time it changes
    console.log(run);
  }
}
Alternatively, you can subscribe to changes to any run that includes a specific tag (or tags) using the runs.subscribeToRunsWithTag function.
import { runs } from "@trigger.dev/sdk/v3";

// Somewhere in your backend code
for await (const run of runs.subscribeToRunsWithTag("user:1234")) {
  // This will log the run every time it changes
  // for all runs with the tag "user:1234"
  console.log(run);
}
Or you can subscribe to changes to any run in a batch:
import { runs, tasks } from "@trigger.dev/sdk/v3";

async function myBackend() {
  const handle = await tasks.batchTrigger("my-task", [
    { payload: { some: "data" } },
    { payload: { some: "other-data" } },
  ]);

  for await (const run of runs.subscribeToBatch([handle.batchId])) {
    // This will log whenever a run in the batch changes
    console.log(run);
  }
}
You will receive updates whenever a run changes for the following reasons:
- The run moves to a new state or completes.
- Run tags are added or removed.
- Run metadata is updated.
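Because of these triggers, a subscriber sees the same run many times, so most consumers keep only the latest snapshot per run. The following is a minimal sketch of that pattern and is not part of the SDK: `RunSnapshot`, `applyUpdate`, and `summarize` are hypothetical names, the snapshot shape is a simplified stand-in for the real run object, and the status strings and the `progress` metadata key are assumptions made for illustration.

```typescript
// Hypothetical, simplified stand-in for the run object a subscription yields.
type RunSnapshot = {
  id: string;
  status: "QUEUED" | "EXECUTING" | "COMPLETED" | "FAILED"; // assumed status names
  tags: string[];
  metadata?: { progress?: number }; // assumed metadata key, between 0 and 1
};

type LatestRuns = Record<string, RunSnapshot>;

// Keep only the most recent snapshot for each run id.
function applyUpdate(latest: LatestRuns, update: RunSnapshot): LatestRuns {
  return { ...latest, [update.id]: update };
}

function isFinished(run: RunSnapshot): boolean {
  return run.status === "COMPLETED" || run.status === "FAILED";
}

// Summarize the latest snapshots, for example to drive a progress bar.
function summarize(latest: LatestRuns) {
  const runs = Object.keys(latest).map((id) => latest[id]);
  const finished = runs.filter(isFinished).length;
  const progress =
    runs.length === 0
      ? 0
      : runs.reduce(
          (sum, run) =>
            sum + (isFinished(run) ? 1 : (run.metadata?.progress ?? 0)),
          0
        ) / runs.length;
  return { total: runs.length, finished, progress };
}
```

In a real subscriber you would call `applyUpdate` inside the `for await` loop and re-render from `summarize` after each update.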
Realtime React hooks
You can also use the Realtime API in your frontend using our new @trigger.dev/react-hooks package. Here's an example of how you can use the useRealtimeRun hook to subscribe to a run and get updates in real-time:
"use client"; // This is needed for Next.js App Router or other RSC frameworks

import { useRealtimeRun } from "@trigger.dev/react-hooks";

export function MyComponent({ runId }: { runId: string }) {
  const { run, error } = useRealtimeRun(runId);

  if (error) return <div>Error: {error.message}</div>;
  // The run is undefined until the first update arrives
  if (!run) return <div>Loading...</div>;

  return (
    <div>
      Run: {run.id} is in {run.status} state
    </div>
  );
}
Or subscribe to all runs that have a specific tag:
"use client"; // This is needed for Next.js App Router or other RSC frameworks

import { useRealtimeRunsWithTag } from "@trigger.dev/react-hooks";

export function MyComponent({ tag }: { tag: string }) {
  const { runs, error } = useRealtimeRunsWithTag(tag);

  if (error) return <div>Error: {error.message}</div>;

  return (
    <div>
      {runs.map((run) => (
        <div key={run.id}>
          Run: {run.id} is in {run.status} state
        </div>
      ))}
    </div>
  );
}
Or subscribe to all runs in a specific batch:
"use client"; // This is needed for Next.js App Router or other RSC frameworks

import { useRealtimeBatch } from "@trigger.dev/react-hooks";

export function MyComponent({ batchId }: { batchId: string }) {
  const { runs, error } = useRealtimeBatch(batchId);

  if (error) return <div>Error: {error.message}</div>;

  return (
    <div>
      {runs.map((run) => (
        <div key={run.id}>
          Run: {run.id} is in {run.status} state
        </div>
      ))}
    </div>
  );
}
Realtime streams
If you are working with streams in your tasks, for example when using the OpenAI SDK, you can forward the stream through the Realtime API to provide real-time updates to your users.
import { logger, metadata, task } from "@trigger.dev/sdk/v3";
import OpenAI from "openai";

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});

export type STREAMS = {
  openai: OpenAI.ChatCompletionChunk;
};

export const myTask = task({
  id: "my-task",
  run: async (payload: { prompt: string }) => {
    const completion = await openai.chat.completions.create({
      messages: [{ role: "user", content: payload.prompt }],
      model: "gpt-3.5-turbo",
      stream: true, // 👈 Enable stream of OpenAI.ChatCompletionChunk objects
    });

    // Register the stream with the key "openai"
    // This will "tee" the stream and send it to the metadata system
    const stream = await metadata.stream("openai", completion);

    let text = "";

    // You can read the returned stream as an async iterator
    for await (const chunk of stream) {
      logger.log("Received chunk", { chunk });

      // The type of the chunk is determined by the provider
      text += chunk.choices.map((choice) => choice.delta?.content).join("");
    }

    return { text };
  },
});
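The "tee" mentioned in the comment above can be pictured as a pass-through iterator that hands each chunk to a side channel before yielding it to the caller. This is a minimal sketch of that idea only: `tap`, `fakeCompletion`, and `demo` are hypothetical names, and how `metadata.stream` forwards chunks internally is not described in this post.

```typescript
// Pass each chunk to `forward` (the side channel), then yield it unchanged.
async function* tap<T>(
  source: AsyncIterable<T>,
  forward: (chunk: T) => void | Promise<void>
): AsyncGenerator<T> {
  for await (const chunk of source) {
    await forward(chunk);
    yield chunk;
  }
}

// A stand-in source that emits a few text chunks.
async function* fakeCompletion(): AsyncGenerator<string> {
  yield "Hello";
  yield ", ";
  yield "world";
}

// The task keeps reading the stream while the side channel receives a copy.
async function demo(): Promise<{ text: string; forwarded: string[] }> {
  const forwarded: string[] = [];
  let text = "";
  const stream = tap(fakeCompletion(), (chunk) => {
    forwarded.push(chunk);
  });
  for await (const chunk of stream) {
    text += chunk;
  }
  return { text, forwarded };
}
```

The design point is that the consumer's loop is unchanged: it still iterates the returned stream, and forwarding happens as a side effect of reading.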
You can then subscribe to the stream using the runs.subscribeToRun method:
import { runs } from "@trigger.dev/sdk/v3";
import type { STREAMS } from "./trigger/my-task";

// Somewhere in your backend
async function subscribeToStream(runId: string) {
  // Use a for-await loop to subscribe to the stream
  for await (const part of runs.subscribeToRun(runId).withStreams<STREAMS>()) {
    switch (part.type) {
      case "run": {
        console.log("Received run", part.run);
        break;
      }
      case "openai": {
        // part.chunk is of type OpenAI.ChatCompletionChunk
        console.log("Received OpenAI chunk", part.chunk);
        break;
      }
    }
  }
}
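Because each `part` is tagged with a `type`, a subscriber can fold the mixed sequence of run updates and stream chunks into a single view state. A minimal sketch, assuming simplified hypothetical shapes (`Chunk`, `Part`, `ViewState`) in place of the SDK's run object and `OpenAI.ChatCompletionChunk`; `applyPart` is likewise an illustrative name:

```typescript
// Hypothetical, trimmed-down shapes; real parts carry the full run and chunk.
type Chunk = { choices: { delta?: { content?: string | null } }[] };

type Part =
  | { type: "run"; run: { id: string; status: string } }
  | { type: "openai"; chunk: Chunk };

type ViewState = { status: string; text: string };

// Fold one part into the state; the switch narrows `part` by its `type`.
function applyPart(state: ViewState, part: Part): ViewState {
  switch (part.type) {
    case "run":
      return { ...state, status: part.run.status };
    case "openai": {
      // Chunks without content (for example a final chunk) add nothing.
      const delta = part.chunk.choices
        .map((choice) => choice.delta?.content ?? "")
        .join("");
      return { ...state, text: state.text + delta };
    }
  }
}
```

Inside the `for await` loop this becomes `state = applyPart(state, part)`, which keeps the rendering code independent of the order in which run updates and chunks arrive.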
Stream support is also available in the Realtime React hooks:
import { useRealtimeRunWithStreams } from "@trigger.dev/react-hooks";
import type { myTask, STREAMS } from "./trigger/my-task";

// Somewhere in your React component
function MyComponent({ runId }: { runId: string }) {
  const { run, streams } = useRealtimeRunWithStreams<typeof myTask, STREAMS>(
    runId
  );

  if (!run) {
    return <div>Loading...</div>;
  }

  const text = streams.openai
    ?.map((chunk) =>
      chunk.choices.map((choice) => choice.delta?.content).join("")
    )
    .join("");

  return (
    <div>
      <h1>Run ID: {run.id}</h1>
      <h2>OpenAI response:</h2>
      <p>{text}</p>
    </div>
  );
}
AI SDK + Streams + Tool Tasks
We've tailored Realtime streams to work seamlessly with the AI SDK, allowing you to provide real-time updates to your users as they interact with your AI models.
This demo is built with Realtime streams, the AI SDK, and Tool Tasks, which allow you to build Trigger.dev tasks that double as tools for the AI SDK.
It uses streamText to get the live weather forecast for a location and then explains the forecast in real-time using the AI SDK:
import { toolTask, schemaTask, metadata } from "@trigger.dev/sdk/v3";
import { streamText, type TextStreamPart } from "ai";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";

export type STREAMS = {
  openai: TextStreamPart<{ getWeather: typeof weatherTask.tool }>;
};

// A `toolTask` is a regular task that can be used as a tool in the AI SDK
export const weatherTask = toolTask({
  id: "weather",
  description: "Get the weather for a location",
  parameters: z.object({
    location: z.string(),
  }),
  run: async ({ location }) => {
    // return mock data
    return {
      location,
      temperature: 72 + Math.floor(Math.random() * 21) - 10,
    };
  },
});

export const aiWeather = schemaTask({
  id: "ai-weather",
  description: "Send the fullStream from the AI SDK to the metadata system",
  schema: z.object({
    model: z.string().default("gpt-4o-mini"),
    prompt: z.string().default("Hello, how are you?"),
  }),
  run: async ({ model, prompt }) => {
    const result = streamText({
      model: openai(model),
      prompt,
      tools: {
        getWeather: weatherTask.tool,
      },
      maxSteps: 10,
      experimental_telemetry: {
        // 👈 Log AI SDK telemetry to trigger.dev
        isEnabled: true,
        functionId: "ai-weather",
      },
    });

    // Send the fullStream, which includes tool calls and results
    await metadata.stream("openai", result.fullStream);
  },
});
Viewing this task in the dashboard will show all the calls to the AI SDK and the tool tasks:
You can view and run the full demo here.
Type-safety all the way down
When developing any new API, we always strive to provide the best developer experience. The Realtime API is no exception. We've made sure that the API is fully type-safe, from the backend to the frontend.
All of the Realtime APIs take a generic parameter that specifies the type of the task (or tasks). For example, when using subscribeToRun, you can specify the type of the task that you're subscribing to:
import { runs } from "@trigger.dev/sdk/v3";
import type { myTask } from "./trigger/tasks";

// Somewhere in your backend
for await (const run of runs.subscribeToRun<typeof myTask>("run_1234")) {
  console.log(run.payload); // 👈 typed to the payload of myTask
  console.log(run.output); // 👈 typed to the output of myTask
}
When subscribing to multiple different tasks, you can provide a union of the task types:
import { runs } from "@trigger.dev/sdk/v3";
import type { myTask, myOtherTask } from "./trigger/tasks";

// Somewhere in your backend
for await (const run of runs.subscribeToRunsWithTag<
  typeof myTask | typeof myOtherTask
>("tag:1234")) {
  // Now narrow the run based on the task identifier
  switch (run.taskIdentifier) {
    case "my-task": {
      console.log(run.payload); // 👈 typed to the payload of myTask
      console.log(run.output); // 👈 typed to the output of myTask
      break;
    }
    case "my-other-task": {
      console.log(run.payload); // 👈 typed to the payload of myOtherTask
      console.log(run.output); // 👈 typed to the output of myOtherTask
      break;
    }
  }
}
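To see why passing a union of task types lets the compiler narrow on `taskIdentifier`, here is a minimal sketch of the general TypeScript technique (a distributive conditional type that derives a run type from a task type). Everything in it is hypothetical: `Task`, `defineTask`, `RunOf`, and the two example tasks are illustrative stand-ins, not the SDK's actual types.

```typescript
// Hypothetical minimal task shape; the SDK's real task type is richer.
type Task<Id extends string, P, O> = {
  id: Id;
  run: (payload: P) => Promise<O>;
};

// Identity helper that preserves the literal type of `id`.
function defineTask<Id extends string, P, O>(task: Task<Id, P, O>): Task<Id, P, O> {
  return task;
}

// Derive a run type from a task type. Conditional types distribute over
// unions, so a union of tasks becomes a union of runs.
type RunOf<T> = T extends Task<infer Id, infer P, infer O>
  ? { taskIdentifier: Id; payload: P; output?: O }
  : never;

const resize = defineTask({
  id: "resize",
  run: async (payload: { url: string }) => ({ width: payload.url.length }),
});

const email = defineTask({
  id: "email",
  run: async (payload: { to: string }) => ({ sent: payload.to.length > 0 }),
});

type AnyRun = RunOf<typeof resize | typeof email>;

// Switching on the literal identifier narrows payload and output together.
function describeRun(run: AnyRun): string {
  switch (run.taskIdentifier) {
    case "resize":
      return `resize ${run.payload.url} -> ${run.output?.width ?? "pending"}`;
    case "email":
      return `email ${run.payload.to} -> ${run.output?.sent ?? "pending"}`;
  }
}
```

Only the task types are needed for this, which is why a type-only import of the task is enough on the subscribing side.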
When using the Realtime React hooks, you can also specify the type of the task:
import { useRealtimeRun } from "@trigger.dev/react-hooks";
import type { myTask } from "./trigger/tasks";
// 👆 make sure you are importing the type only

function MyComponent() {
  const { run, error } = useRealtimeRun<typeof myTask>("run_1234");

  if (error) return <div>Error: {error.message}</div>;
  // The run is undefined until the first update arrives
  if (!run) return <div>Loading...</div>;

  return (
    <div>
      Run: {run.id} is in {run.status} state with output:{" "}
      {run.output?.message}
    </div>
  );
}
Stream support is also fully type-safe:
import { runs } from "@trigger.dev/sdk/v3";
import type { STREAMS } from "./trigger/my-task";

// Somewhere in your backend
async function subscribeToStream(runId: string) {
  // Use a for-await loop to subscribe to the stream
  for await (const part of runs.subscribeToRun(runId).withStreams<STREAMS>()) {
    switch (part.type) {
      case "run": {
        console.log("Received run", part.run);
        break;
      }
      case "openai": {
        // part.chunk is of type OpenAI.ChatCompletionChunk
        console.log("Received OpenAI chunk", part.chunk);
        break;
      }
    }
  }
}
Granular access control
You can create Public Access Tokens with limited scopes that can be used to restrict access when using the Realtime API, especially from your frontend:
- It's unsafe to share your Trigger.dev secret keys publicly.
- Public Access Tokens can have limited scopes, such as read access to specific runs.
import { auth } from "@trigger.dev/sdk/v3";

// Somewhere in your backend
const publicToken = await auth.createPublicToken({
  scopes: {
    read: {
      runs: ["run_1234"], // 👈 This token can only access run_1234
    },
  },
  expirationTime: "1h", // 👈 The token will expire in 1 hour
});
Now you can use this token in your frontend:
import { useRealtimeRun } from "@trigger.dev/react-hooks";

export function MyComponent({
  runId,
  publicAccessToken,
}: {
  runId: string;
  publicAccessToken: string;
}) {
  // The token must include read access to this run
  const { run, error } = useRealtimeRun(runId, {
    accessToken: publicAccessToken,
  });

  if (error) return <div>Error: {error.message}</div>;
  if (!run) return <div>Loading...</div>;

  return (
    <div>
      Run: {run.id} is in {run.status} state
    </div>
  );
}
You can also scope to tags and batches:
import { auth } from "@trigger.dev/sdk/v3";

// Somewhere in your backend
const publicToken = await auth.createPublicToken({
  scopes: {
    read: {
      tags: ["user:1234"],
      batch: ["batch_1234"],
    },
  },
  expirationTime: "1h",
});
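One way to reason about these scopes is as an allow-list: a token can read a run if the run's ID, one of its tags, or its batch appears in the token's `read` scopes. The sketch below models that reading only. It is an assumption about the semantics for illustration, not Trigger.dev's actual enforcement logic, and `ReadScopes`, `RunRef`, and `canRead` are hypothetical names.

```typescript
// Hypothetical mirror of the `scopes.read` object shown above.
type ReadScopes = { runs?: string[]; tags?: string[]; batch?: string[] };

// Hypothetical description of the run a client is trying to read.
type RunRef = { id: string; tags: string[]; batchId?: string };

function contains(list: string[] | undefined, value: string): boolean {
  return list !== undefined && list.indexOf(value) !== -1;
}

// Assumed rule: any single matching scope is enough to grant read access.
function canRead(scopes: ReadScopes, run: RunRef): boolean {
  if (contains(scopes.runs, run.id)) return true;
  if (run.tags.some((tag) => contains(scopes.tags, tag))) return true;
  if (run.batchId !== undefined && contains(scopes.batch, run.batchId)) return true;
  return false;
}
```

Under this model, a token scoped to the tag `user:1234` can follow every run tagged for that user without listing run IDs up front, which is what makes tag scopes convenient for per-user frontends.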
For convenience, we also auto-generate a Public Access Token whenever you trigger a task. The token has access to the run that was triggered:
import { tasks } from "@trigger.dev/sdk/v3";

// Somewhere in your backend
const handle = await tasks.trigger("my-task", { some: "data" });

// This access token can be used to subscribe to the run
console.log(handle.publicAccessToken);
In the wild
Trigger.dev Realtime already has 60+ organizations using it in production, including:
Midday.ai
Pontus from Midday.ai has been posting about using Trigger.dev and the Realtime API to sync bank connections and show real-time status updates to their users:
And since they are also open source, you can follow along with their usage in the Midday.ai GitHub repository.
Cookbook AI
Cookbook AI, a recipe generation tool, has been using the Realtime API to provide real-time updates to their users as they generate recipes:
Papermark.io
Papermark, the open-source document tracking tool, is using the Realtime API to provide real-time updates to their users as they upload and process documents:
You can view the Realtime PR in the Papermark repository.
Going GA
The Realtime API is now generally available and ready for production use. We've been working with a number of customers to ensure that the API is stable and reliable, and we're excited to see what you build with it. We've increased the number of concurrent connections for our Hobby and Pro plans:
- Hobby plan now supports up to 50 concurrent connections.
- Pro plan now supports 500 concurrent connections, with the ability to increase this limit if needed.
And if you are self-hosting, Electric SQL is open-source with an Apache 2.0 license, so you can use the Realtime API when self-hosting.
Getting started
To get started with the Realtime API, you can check out the Realtime API documentation and the React hooks documentation.
Additional resources:
- Trigger.dev 14 minute walkthrough
- Next.js Realtime Demo
- Realtime Streams documentation
- Using tool tasks with the AI SDK
As always, we're here to help if you have any questions or need assistance. You can reach out to us on Discord or X.