Skip to main content

Documentation Index

Fetch the complete documentation index at: https://trigger.dev/docs/llms.txt

Use this file to discover all available pages before exploring further.

The AI Agents and Prompts surface ships as part of the v4.5 release candidate. Install with @trigger.dev/sdk@rc (or pin 4.5.0-rc.0 or later) to use these features — they aren’t yet on the latest stable, and APIs may still change before the 4.5.0 GA. See supported AI SDK versions and the AI chat changelog for details.

chat.agent()

The highest-level approach. Handles message accumulation, stop signals, turn lifecycle, and auto-piping automatically.
To fix a custom UIMessage subtype or typed client data schema, use the ChatBuilder via chat.withUIMessage<...>() and/or chat.withClientData({ schema }). Builder-level hooks can also be chained before .agent(). See Types.
Every chat.agent conversation is backed by a durable Session — externalId is your chatId, type is "chat.agent", taskIdentifier is the agent’s task ID. The session is the run manager: it owns the chat’s runs, persists across run lifecycles, and orchestrates handoffs (idle continuation, chat.requestUpgrade). You rarely need to touch the session directly (chat.stream, chat.messages, chat.stopSignal wrap everything), but payload.sessionId is available if you want to reach in — e.g. sessions.open(payload.sessionId) to write from a sub-agent or from outside the turn loop.
Always spread chat.toStreamTextOptions() into every streamText call. It wires up the prepareStep callback that drives compaction, steering, and background injection — features that silently no-op if the spread is missing. It also injects the system prompt set via chat.prompt(), the resolved model (when a registry is provided), and telemetry metadata.Spread it first in the options object so any explicit overrides win:
streamText({
  ...chat.toStreamTextOptions(),     // or: chat.toStreamTextOptions({ registry, tools }) — see below
  messages,
  abortSignal: signal,
  // any explicit overrides go here
  stopWhen: stepCountIs(15),
});
Examples in this doc keep the spread implicit for brevity, but you should include it in real code.

Simple: return a StreamTextResult

Return the streamText result from run and it’s automatically piped to the frontend:
import { chat } from "@trigger.dev/sdk/ai";
import { streamText, stepCountIs } from "ai";
import { anthropic } from "@ai-sdk/anthropic";

export const simpleChat = chat.agent({
  id: "simple-chat",
  run: async ({ messages, signal }) => {
    return streamText({
      ...chat.toStreamTextOptions(), // prepareStep, system, telemetry — see callout above
      model: anthropic("claude-sonnet-4-5"),
      system: "You are a helpful assistant.",
      messages,
      abortSignal: signal,
      stopWhen: stepCountIs(15),
    });
  },
});

Using chat.pipe() for complex flows

For complex agent flows where streamText is called deep inside your code, use chat.pipe(). It works from anywhere inside a task — even nested function calls.
trigger/agent-chat.ts
import { chat } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { anthropic } from "@ai-sdk/anthropic";
import type { ModelMessage } from "ai";

export const agentChat = chat.agent({
  id: "agent-chat",
  run: async ({ messages }) => {
    // Don't return anything — chat.pipe is called inside
    await runAgentLoop(messages);
  },
});

async function runAgentLoop(messages: ModelMessage[]) {
  // ... agent logic, tool calls, etc.

  const result = streamText({
    model: anthropic("claude-sonnet-4-5"),
    messages,
    stopWhen: stepCountIs(15),
  });

  // Pipe from anywhere — no need to return it
  await chat.pipe(result);
}

Custom data parts

Add custom data-* parts to the assistant’s response message via chat.response.write() (from run()) or the writer parameter in lifecycle hooks. Non-transient data-* chunks are automatically added to responseMessage.parts and surface in onTurnComplete for persistence:
export const myChat = chat.agent({
  id: "my-chat",
  onBeforeTurnComplete: async ({ writer, turn }) => {
    // This data part will be in responseMessage.parts in onTurnComplete
    writer.write({
      type: "data-metadata",
      data: { turn, model: "gpt-4o", timestamp: Date.now() },
    });
  },
  onTurnComplete: async ({ responseMessage }) => {
    // responseMessage.parts includes the data-metadata part
    await db.messages.save(responseMessage);
  },
  run: async ({ messages, signal }) => {
    // Also works from run() via chat.response
    chat.response.write({
      type: "data-context",
      data: { searchResults: results },
    });

    return streamText({ model: anthropic("claude-sonnet-4-5"), messages, abortSignal: signal });
  },
});
Add transient: true to data chunks that should stream to the frontend but NOT persist in the response message. Use this for progress indicators, loading states, and other temporary UI:
// Transient — frontend sees it, but NOT in onTurnComplete's responseMessage
writer.write({
  type: "data-progress",
  id: "search",
  data: { percent: 50 },
  transient: true,
});
This matches the AI SDK’s semantics: data-* chunks persist to message.parts by default. Only transient: true chunks are ephemeral. Non-data chunks (text-delta, tool-*, etc.) are handled by streamText and captured via onFinish — they don’t need chat.response.
chat.response and the writer accumulation behavior work with chat.agent and chat.createSession. If you’re using chat.customAgent, you own the accumulator — see the raw-task example for the manual pattern.

Raw streaming with chat.stream

For low-level stream access (piping from subtasks, reading streams by run ID), use chat.stream. Chunks written via chat.stream go directly to the realtime output — they are NOT accumulated into the response message regardless of the transient flag.
// Raw stream — always ephemeral, never in responseMessage
const { waitUntilComplete } = chat.stream.writer({
  execute: ({ write }) => {
    write({ type: "data-status", data: { message: "Processing..." } });
  },
});
await waitUntilComplete();
Use data-* chunk types (e.g. data-status, data-progress) for custom data. The AI SDK processes these into DataUIPart objects in message.parts on the frontend. Writing the same type + id again updates the existing part instead of creating a new one — useful for live progress.
chat.stream exposes the full stream API:
MethodDescription
chat.stream.writer(options)Write individual chunks via a callback
chat.stream.pipe(stream, options?)Pipe a ReadableStream or AsyncIterable
chat.stream.append(value, options?)Append raw data
chat.stream.read(runId, options?)Read the stream by run ID
For piping streams from subtasks to the parent chat (via target: "root"), see the Sub-agents pattern.

Lifecycle hooks

chat.agent({ ... }) accepts hooks that fire in a fixed order around each turn, plus dedicated suspend/resume hooks. The full reference lives on its own page:
  • Lifecycle hooksonPreload, onChatStart, onValidateMessages, hydrateMessages, onTurnStart, onBeforeTurnComplete, onTurnComplete, onChatSuspend / onChatResume, exitAfterPreloadIdle, plus how ctx plumbs through every callback.
Per-turn order: onValidateMessageshydrateMessagesonChatStart (chat’s first message only) → onTurnStartrun()onBeforeTurnCompleteonTurnComplete.

Using prompts

Use AI Prompts to manage your system prompt as versioned, overridable config. Store the resolved prompt in a lifecycle hook with chat.prompt.set(), then spread chat.toStreamTextOptions() into streamText — it includes the system prompt, model, config, and telemetry automatically.
import { chat } from "@trigger.dev/sdk/ai";
import { prompts } from "@trigger.dev/sdk";
import { streamText, createProviderRegistry } from "ai";
import { anthropic } from "@ai-sdk/anthropic";
import { z } from "zod";

const registry = createProviderRegistry({ anthropic });

const systemPrompt = prompts.define({
  id: "my-chat-system",
  model: "anthropic:claude-sonnet-4-5",
  config: { temperature: 0.7 },
  variables: z.object({ name: z.string() }),
  content: `You are a helpful assistant for {{name}}.`,
});

export const myChat = chat.agent({
  id: "my-chat",
  clientDataSchema: z.object({ userId: z.string() }),
  onChatStart: async ({ clientData }) => {
    const user = await db.user.findUnique({ where: { id: clientData.userId } });
    const resolved = await systemPrompt.resolve({ name: user.name });
    chat.prompt.set(resolved);
  },
  run: async ({ messages, signal }) => {
    return streamText({
      ...chat.toStreamTextOptions({ registry }), // system, model, config, telemetry
      messages,
      abortSignal: signal,
      stopWhen: stepCountIs(15),
    });
  },
});
chat.toStreamTextOptions() returns an object with system, model (resolved via the registry), temperature, and experimental_telemetry — all from the stored prompt. Properties you set after the spread (like a client-selected model) take precedence. Which form to call:
FormUse when
chat.toStreamTextOptions()Default. Wires up prepareStep (compaction, steering, background injection), the stored prompt’s system / model / config, and telemetry metadata.
chat.toStreamTextOptions({ registry })You’re using Prompts with a provider-prefixed model string (e.g. "anthropic:claude-sonnet-4-5"). The registry resolves the prefix to a real model instance via createProviderRegistry({ anthropic, openai, ... }).
chat.toStreamTextOptions({ tools })You want HITL tool approvals — pass the same tools object you give to streamText. The SDK then knows which tool calls need to pause on needsApproval: true.
chat.toStreamTextOptions({ registry, tools })Both of the above.
See Prompts for the full guide — defining templates, variable schemas, dashboard overrides, and the management SDK.

Stop generation

How stop works

Calling stop() from useChat sends a stop signal to the running task via input streams. The task’s streamText call aborts (if you passed signal or stopSignal), but the run stays alive and waits for the next message. The partial response is captured and accumulated normally.

Abort signals

The run function receives three abort signals:
SignalFires whenUse for
signalStop or cancelPass to streamText — handles both cases. Use this in most cases.
stopSignalStop only (per-turn, reset each turn)Custom logic that should only run on user stop, not cancellation
cancelSignalRun cancel, expire, or maxDuration exceededCleanup that should only happen on full cancellation
export const myChat = chat.agent({
  id: "my-chat",
  run: async ({ messages, signal, stopSignal, cancelSignal }) => {
    return streamText({
      model: anthropic("claude-sonnet-4-5"),
      messages,
      abortSignal: signal, // Handles both stop and cancel
      stopWhen: stepCountIs(15),
    });
  },
});
Use signal (the combined signal) in most cases. The separate stopSignal and cancelSignal are only needed if you want different behavior for stop vs cancel.

Detecting stop in callbacks

The onTurnComplete event includes a stopped boolean that indicates whether the user stopped generation during that turn:
export const myChat = chat.agent({
  id: "my-chat",
  onTurnComplete: async ({ chatId, uiMessages, stopped }) => {
    await db.chat.update({
      where: { id: chatId },
      data: { messages: uiMessages, lastStoppedAt: stopped ? new Date() : undefined },
    });
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: anthropic("claude-sonnet-4-5"), messages, abortSignal: signal });
  },
});
You can also check stop status from anywhere during a turn using chat.isStopped(). This is useful inside streamText’s onFinish callback where the AI SDK’s isAborted flag can be unreliable (e.g. when using createUIMessageStream + writer.merge()):
import { chat } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";

export const myChat = chat.agent({
  id: "my-chat",
  run: async ({ messages, signal }) => {
    return streamText({
      model: anthropic("claude-sonnet-4-5"),
      messages,
      abortSignal: signal,
      onFinish: ({ isAborted }) => {
        // isAborted may be false even after stop when using createUIMessageStream
        const wasStopped = isAborted || chat.isStopped();
        if (wasStopped) {
          // handle stop — e.g. log analytics
        }
      },
      stopWhen: stepCountIs(15),
    });
  },
});

Cleaning up aborted messages

When stop happens mid-stream, the captured response message can contain parts in an incomplete state — tool calls stuck in partial-call, reasoning blocks still marked as streaming, etc. These can cause UI issues like permanent spinners. chat.agent automatically cleans up the responseMessage when stop is detected before passing it to onTurnComplete. If you use chat.pipe() manually and capture response messages yourself, use chat.cleanupAbortedParts():
const cleaned = chat.cleanupAbortedParts(rawResponseMessage);
This removes tool invocation parts stuck in partial-call state and marks any streaming text or reasoning parts as done.
Stop signal delivery is best-effort. There is a small race window where the model may finish before the stop signal arrives, in which case the turn completes normally with stopped: false. This is expected and does not require special handling.

Tool approvals

Tools with needsApproval: true pause execution until the user approves or denies via the frontend. Define the tool as normal and pass it to streamTextchat.agent handles the rest:
const sendEmail = tool({
  description: "Send an email. Requires human approval.",
  inputSchema: z.object({ to: z.string(), subject: z.string(), body: z.string() }),
  needsApproval: true,
  execute: async ({ to, subject, body }) => {
    await emailService.send({ to, subject, body });
    return { sent: true };
  },
});

export const myChat = chat.agent({
  id: "my-chat",
  run: async ({ messages, signal }) => {
    return streamText({
      model: anthropic("claude-sonnet-4-5"),
      messages,
      tools: { sendEmail },
      abortSignal: signal,
      stopWhen: stepCountIs(15),
    });
  },
});
When the model calls an approval-required tool, the turn completes with the tool in approval-requested state. After the user approves on the frontend, the updated message is sent back and chat.agent replaces it in the conversation accumulator by matching the message ID. streamText then executes the approved tool and continues. See Tool approvals in the frontend docs for the UI setup.

Persistence

To build a chat app that survives page refreshes you persist two things, both server-side from inside the agent:
  1. Conversation state. Full UIMessage[] keyed by chatId. Written from onTurnStart (so the user message is durable before streaming begins) and onTurnComplete (so the assistant reply lands).
  2. Session state. The transport’s reconnect metadata: publicAccessToken and lastEventId. Written alongside the messages from the same hooks.
Sessions let the transport reconnect to an existing run after a page refresh. Without them, every page load would start a new run, losing the conversation context that was accumulated in the previous run.
For the full per-hook breakdown, race-condition warnings (atomic lastEventId writes, why not to use chat.defer in onTurnStart), token renewal via the accessToken callback, and an end-to-end three-file example, see Database persistence.

Pending messages (steering)

Users can send messages while the agent is executing tool calls. With pendingMessages, these messages are injected between tool-call steps, steering the agent mid-execution:
export const myChat = chat.agent({
  id: "my-chat",
  pendingMessages: {
    shouldInject: ({ steps }) => steps.length > 0,
  },
  run: async ({ messages, signal }) => {
    return streamText({
      ...chat.toStreamTextOptions({ registry }),
      messages,
      tools: {
        /* ... */
      },
      abortSignal: signal,
      stopWhen: stepCountIs(15),
    });
  },
});
On the frontend, the usePendingMessages hook handles sending, tracking, and rendering injection points.
See Pending Messages for the full guide — backend configuration, frontend hook, queuing vs steering, and how injection works with all three chat variants.

Background injection

Inject context from background work into the conversation using chat.inject(). Combine with chat.defer() to run analysis between turns and inject results before the next response — self-review, RAG augmentation, safety checks, etc.
export const myChat = chat.agent({
  id: "my-chat",
  onTurnComplete: async ({ messages }) => {
    chat.defer(
      (async () => {
        const review = await generateObject({
          /* ... */
        });
        if (review.object.needsImprovement) {
          chat.inject([
            {
              role: "system",
              content: `[Self-review]\n${review.object.suggestions.join("\n")}`,
            },
          ]);
        }
      })()
    );
  },
  run: async ({ messages, signal }) => {
    return streamText({ ...chat.toStreamTextOptions({ registry }), messages, abortSignal: signal });
  },
});
See Background Injection for the full guide — timing, self-review example, and how it differs from pending messages.

Actions

Custom actions let the frontend send structured commands (undo, rollback, edit, regenerate) that modify the conversation state. Actions are not turns: they fire hydrateMessages (if set) and onAction only. The full surface (defining actionSchema, returning a model response from onAction, gating against pending HITL tool calls, and sending actions from the frontend) lives on its own page. See Actions.

Chat history

Imperative API for reading and modifying the accumulated message history. Works from any hook (onAction, onTurnStart, onBeforeTurnComplete, onTurnComplete, hydrateMessages) or from run() and AI SDK tools.
The agent’s accumulator — not session.out — is the source of truth for the full conversation. The .out stream is a bounded sliding window (roughly one turn at steady state, see Records on session.out); the durable history lives in the agent’s accumulator and is persisted to S3 between turns for fast next-run boots. chat.history reads and mutates that accumulator directly.
Reads. Synchronous against the current accumulator state.
MethodDescription
chat.history.all()Returns a copy of the current accumulated UI messages.
chat.history.getChain()Same as all(). Use whichever name reads better in context.
chat.history.findMessage(messageId)Returns the message with that id, or undefined.
chat.history.getPendingToolCalls()Tool calls on the most recent assistant message that are still in input-available state (waiting on addToolOutput).
chat.history.getResolvedToolCalls()All tool calls in the chain in output-available or output-error state.
chat.history.extractNewToolResults(message)Tool results in message whose toolCallId is not already resolved in the chain. Most useful in hydrateMessages against an incoming wire message, before the runtime merges it.
Each pending and resolved entry is shaped { toolCallId, toolName, messageId }. Each new-result entry is { toolCallId, toolName, output, errorText? }, where errorText is set only for output-error parts. Mutations. Applied at lifecycle checkpoints (after hooks return). Multiple mutations in the same hook compose correctly.
MethodDescription
chat.history.set(messages)Replace all messages. Same as chat.setMessages().
chat.history.remove(messageId)Remove a specific message by ID.
chat.history.rollbackTo(messageId)Keep messages up to and including the given ID (undo).
chat.history.replace(messageId, message)Replace a specific message by ID (edit).
chat.history.slice(start, end?)Keep only messages in the given range.
// Undo the last exchange in onAction
onAction: async ({ action }) => {
  if (action.type === "undo") {
    chat.history.slice(0, -2);
  }
},

// Trim history in onTurnComplete
onTurnComplete: async ({ uiMessages }) => {
  if (uiMessages.length > 50) {
    chat.history.slice(-20);
  }
},
The HITL reads let an action or hook decide what to do without walking the accumulator manually:
// Refuse a regenerate while a tool call is still awaiting an answer
onAction: async ({ action }) => {
  if (action.type === "regenerate") {
    if (chat.history.getPendingToolCalls().length > 0) return;
    chat.history.slice(0, -1);
  }
},

// Side-effect once per net-new tool result when wire messages come in
hydrateMessages: async ({ incomingMessages }) => {
  for (const msg of incomingMessages) {
    for (const r of chat.history.extractNewToolResults(msg)) {
      await onToolResolved({ id: r.toolCallId, output: r.output, errorText: r.errorText });
    }
  }
  return incomingMessages;
},
extractNewToolResults compares against the current chain. Inside onTurnComplete, the chain already contains the just-finished responseMessage, so it returns []. Use it where the message is from outside the accumulator: hydrateMessages (incoming wire), onAction if the action carries a message, or any custom pre-merge code path.

prepareMessages

Transform model messages before they’re used anywhere — in run(), in compaction rebuilds, and in compaction results. Define once, applied everywhere. Use this for Anthropic cache breaks, injecting system context, stripping PII, etc.
export const myChat = chat.agent({
  id: "my-chat",
  prepareMessages: ({ messages, reason }) => {
    // Add Anthropic cache breaks to the last message
    if (messages.length === 0) return messages;
    const last = messages[messages.length - 1];
    return [
      ...messages.slice(0, -1),
      {
        ...last,
        providerOptions: {
          ...last.providerOptions,
          anthropic: { cacheControl: { type: "ephemeral" } },
        },
      },
    ];
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: anthropic("claude-sonnet-4-5"), messages, abortSignal: signal });
  },
});
The reason field tells you why messages are being prepared:
ReasonDescription
"run"Messages being passed to run() for streamText
"compaction-rebuild"Rebuilding from a previous compaction summary
"compaction-result"Fresh compaction just produced these messages

Version upgrades

Chat agent runs are pinned to the worker version they started on. When you deploy a new version, suspended runs resume on the old code. Call chat.requestUpgrade() in onTurnStart to skip run() and exit immediately — the transport re-triggers the same message on the latest version. See the Version Upgrades pattern for the full guide.

Ending a run on your terms

By default, a chat agent stays idle after each turn waiting for the next user message. Call chat.endRun() from run(), chat.defer(), onBeforeTurnComplete, or onTurnComplete to exit the loop once the current turn finishes — no upgrade signal, no idle wait.
chat.agent({
  id: "one-shot",
  run: async ({ messages, signal }) => {
    // Single-response agent — exit after this turn.
    chat.endRun();
    return streamText({ model: anthropic("claude-sonnet-4-5"), messages, abortSignal: signal });
  },
});
The current turn streams through normally, onBeforeTurnComplete / onTurnComplete fire, the turn-complete chunk is written, and the run exits instead of suspending. The next user message on the same chatId starts a fresh run via the standard continuation flow. Use this when the agent knows its work is done (budget exhausted, goal achieved, one-shot response) rather than relying on the idle timeout. Unlike chat.requestUpgrade(), no upgrade-required signal is sent to the client, so there’s no version-migration semantics.

Runtime configuration

chat.setTurnTimeout()

Override how long the run stays suspended waiting for the next message. Call from inside run():
run: async ({ messages, signal }) => {
  chat.setTurnTimeout("2h"); // Wait longer for this conversation
  return streamText({ model: anthropic("claude-sonnet-4-5"), messages, abortSignal: signal });
},

chat.setIdleTimeoutInSeconds()

Override how long the run stays idle (active, using compute) after each turn:
run: async ({ messages, signal }) => {
  chat.setIdleTimeoutInSeconds(60); // Stay idle for 1 minute
  return streamText({ model: anthropic("claude-sonnet-4-5"), messages, abortSignal: signal });
},
Longer idle timeout means faster responses but more compute usage. Set to 0 to suspend immediately after each turn (minimum latency cost, slight delay on next message).

Stream options

Control how streamText results are converted to the frontend stream via toUIMessageStream(). Set static defaults on the task, or override per-turn.
Error handling with onError
When streamText encounters an error mid-stream (rate limits, API failures, network errors), the onError callback converts it to a string that’s sent to the frontend as an { type: "error", errorText } chunk. The AI SDK’s useChat receives this via its onError callback. By default, the raw error message is sent to the frontend. Use onError to sanitize errors and avoid leaking internal details:
export const myChat = chat.agent({
  id: "my-chat",
  uiMessageStreamOptions: {
    onError: (error) => {
      // Log the full error server-side for debugging
      console.error("Stream error:", error);
      // Return a sanitized message — this is what the frontend sees
      if (error instanceof Error && error.message.includes("rate limit")) {
        return "Rate limited — please wait a moment and try again.";
      }
      return "Something went wrong. Please try again.";
    },
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: anthropic("claude-sonnet-4-5"), messages, abortSignal: signal });
  },
});
onError is also called for tool execution errors, so a single handler covers both LLM errors and tool failures. On the frontend, handle the error in useChat:
const { messages, sendMessage } = useChat({
  transport,
  onError: (error) => {
    // error.message contains the string returned by your onError handler
    toast.error(error.message);
  },
});
Reasoning and sources
Control which AI SDK features are forwarded to the frontend:
export const myChat = chat.agent({
  id: "my-chat",
  uiMessageStreamOptions: {
    sendReasoning: true, // Forward model reasoning (default: true)
    sendSources: true, // Forward source citations (default: false)
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: anthropic("claude-sonnet-4-5"), messages, abortSignal: signal });
  },
});
Custom message IDs
By default, response message IDs are generated using the AI SDK’s built-in generateId. Pass a custom generateMessageId function to use your own ID format (e.g. UUID-v7):
import { v7 as uuidv7 } from "uuid";

export const myChat = chat.agent({
  id: "my-chat",
  uiMessageStreamOptions: {
    generateMessageId: () => uuidv7(),
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: anthropic("claude-sonnet-4-5"), messages, abortSignal: signal });
  },
});
With the .withUIMessage() builder, set it under streamOptions:
import { v7 as uuidv7 } from "uuid";

export const myChat = chat
  .withUIMessage<MyChatUIMessage>({
    streamOptions: {
      generateMessageId: () => uuidv7(),
      sendReasoning: true,
    },
  })
  .agent({
    id: "my-chat",
    run: async ({ messages, signal }) => {
      return streamText({ model: anthropic("claude-sonnet-4-5"), messages, abortSignal: signal });
    },
  });
The generated ID is sent to the frontend in the stream’s start chunk, so frontend and backend always reference the same ID for each message. This is important for features like tool approvals, where the frontend resends an assistant message and the backend needs to match it by ID in the conversation accumulator.
Per-turn overrides
Override per-turn with chat.setUIMessageStreamOptions() — per-turn values merge with the static config (per-turn wins on conflicts). The override is cleared automatically after each turn.
run: async ({ messages, clientData, signal }) => {
  // Enable reasoning only for certain models
  if (clientData.model?.includes("claude")) {
    chat.setUIMessageStreamOptions({ sendReasoning: true });
  }
  return streamText({ model: openai(clientData.model ?? "gpt-4o"), messages, abortSignal: signal });
},
chat.setUIMessageStreamOptions() works across all abstraction levels — chat.agent(), chat.createSession() / turn.complete(), and chat.pipeAndCapture(). See ChatUIMessageStreamOptions for the full reference.
onFinish is managed internally for response capture and cannot be overridden here. Use streamText’s onFinish callback for custom finish handling, or use raw task mode for full control over toUIMessageStream().

Manual mode with task()

If you need full control over task options, use the standard task() with ChatTaskPayload and chat.pipe():
import { task } from "@trigger.dev/sdk";
import { chat, type ChatTaskPayload } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { anthropic } from "@ai-sdk/anthropic";

export const manualChat = task({
  id: "manual-chat",
  retry: { maxAttempts: 3 },
  queue: { concurrencyLimit: 10 },
  run: async (payload: ChatTaskPayload) => {
    const result = streamText({
      model: anthropic("claude-sonnet-4-5"),
      messages: payload.messages,
      stopWhen: stepCountIs(15),
    });

    await chat.pipe(result);
  },
});
Manual mode does not get automatic message accumulation or the onTurnComplete/onChatStart lifecycle hooks. The responseMessage field in onTurnComplete will be undefined when using chat.pipe() directly. Use chat.agent() for the full multi-turn experience.

chat.createSession()

A middle ground between chat.agent() and raw primitives. You get an async iterator that yields ChatTurn objects — each turn handles stop signals, message accumulation, and turn-complete signaling automatically. You control initialization, model/tool selection, persistence, and any custom per-turn logic. Use chat.createSession() inside a standard task():
import { task } from "@trigger.dev/sdk";
import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { anthropic } from "@ai-sdk/anthropic";

export const myChat = task({
  id: "my-chat",
  run: async (payload: ChatTaskWirePayload, { signal }) => {
    // One-time initialization — just code, no hooks
    const clientData = payload.metadata as { userId: string };
    await db.chat.create({ data: { id: payload.chatId, userId: clientData.userId } });

    const session = chat.createSession(payload, {
      signal,
      idleTimeoutInSeconds: 60,
      timeout: "1h",
    });

    for await (const turn of session) {
      const result = streamText({
        model: anthropic("claude-sonnet-4-5"),
        messages: turn.messages,
        abortSignal: turn.signal,
        stopWhen: stepCountIs(15),
      });

      // Pipe, capture, accumulate, and signal turn-complete — all in one call
      await turn.complete(result);

      // Persist after each turn
      await db.chat.update({
        where: { id: turn.chatId },
        data: { messages: turn.uiMessages },
      });
    }
  },
});

ChatSessionOptions

OptionTypeDefaultDescription
signalAbortSignalrequiredRun-level cancel signal (from task context)
idleTimeoutInSecondsnumber30Seconds to stay idle between turns
timeoutstring"1h"Duration string for suspend timeout
maxTurnsnumber100Max turns before ending

ChatTurn

Each turn yielded by the iterator provides:
FieldTypeDescription
numbernumberTurn number (0-indexed)
chatIdstringChat session ID
triggerstringWhat triggered this turn
clientDataunknownClient data from the transport
messagesModelMessage[]Full accumulated model messages — pass to streamText
uiMessagesUIMessage[]Full accumulated UI messages — use for persistence
signalAbortSignalCombined stop+cancel signal (fresh each turn)
stoppedbooleanWhether the user stopped generation this turn
continuationbooleanWhether this is a continuation run
MethodDescription
turn.complete(source)Pipe stream, capture response, accumulate, and signal turn-complete
turn.done()Just signal turn-complete (when you’ve piped manually)
turn.addResponse(response)Add a response to the accumulator manually

turn.complete() vs manual control

turn.complete(result) is the easy path — it handles piping, capturing the response, accumulating messages, cleaning up aborted parts, and writing the turn-complete chunk. For more control, you can do each step manually:
for await (const turn of session) {
  const result = streamText({
    model: anthropic("claude-sonnet-4-5"),
    messages: turn.messages,
    abortSignal: turn.signal,
    stopWhen: stepCountIs(15),
  });

  // Manual: pipe and capture separately
  const response = await chat.pipeAndCapture(result, { signal: turn.signal });

  if (response) {
    // Custom processing before accumulating
    await turn.addResponse(response);
  }

  // Custom persistence, analytics, etc.
  await db.chat.update({ ... });

  // Must call done() when not using complete()
  await turn.done();
}

Raw task with primitives

For full control, use a standard task() with the composable primitives from the chat namespace. You manage everything: the turn loop, stop signals, message accumulation, and turn-complete signaling. Raw task mode also lets you call .toUIMessageStream() yourself with any options — including onFinish and originalMessages. This is the right choice when you need complete control over the stream conversion beyond what chat.setUIMessageStreamOptions() provides.

Primitives

PrimitiveDescription
chat.messagesInput stream for incoming messages — use .waitWithIdleTimeout() to wait for the next turn
chat.createStopSignal()Create a managed stop signal wired to the stop input stream
chat.pipeAndCapture(result)Pipe a StreamTextResult to the chat stream and capture the response
chat.writeTurnComplete()Signal the frontend that the current turn is complete
chat.MessageAccumulatorAccumulates conversation messages across turns
chat.pipe(stream)Pipe a stream to the frontend (no response capture)
chat.cleanupAbortedParts(msg)Clean up incomplete parts from a stopped response

Example

import { task } from "@trigger.dev/sdk";
import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { anthropic } from "@ai-sdk/anthropic";

export const myChat = task({
  id: "my-chat-raw",
  run: async (payload: ChatTaskWirePayload, { signal: runSignal }) => {
    let currentPayload = payload;

    // Handle preload — wait for the first real message
    if (currentPayload.trigger === "preload") {
      const result = await chat.messages.waitWithIdleTimeout({
        idleTimeoutInSeconds: 60,
        timeout: "1h",
        spanName: "waiting for first message",
      });
      if (!result.ok) return;
      currentPayload = result.output;
    }

    const stop = chat.createStopSignal();
    const conversation = new chat.MessageAccumulator();

    for (let turn = 0; turn < 100; turn++) {
      stop.reset();

      const messages = await conversation.addIncoming(
        currentPayload.messages,
        currentPayload.trigger,
        turn
      );

      const combinedSignal = AbortSignal.any([runSignal, stop.signal]);

      const result = streamText({
        model: anthropic("claude-sonnet-4-5"),
        messages,
        abortSignal: combinedSignal,
        stopWhen: stepCountIs(15),
      });

      let response;
      try {
        response = await chat.pipeAndCapture(result, { signal: combinedSignal });
      } catch (error) {
        if (error instanceof Error && error.name === "AbortError") {
          if (runSignal.aborted) break;
          // Stop — fall through to accumulate partial
        } else {
          throw error;
        }
      }

      if (response) {
        const cleaned =
          stop.signal.aborted && !runSignal.aborted ? chat.cleanupAbortedParts(response) : response;
        await conversation.addResponse(cleaned);
      }

      if (runSignal.aborted) break;

      // Persist, analytics, etc.
      await db.chat.update({
        where: { id: currentPayload.chatId },
        data: { messages: conversation.uiMessages },
      });

      await chat.writeTurnComplete();

      // Wait for the next message
      const next = await chat.messages.waitWithIdleTimeout({
        idleTimeoutInSeconds: 60,
        timeout: "1h",
        spanName: "waiting for next message",
      });
      if (!next.ok) break;
      currentPayload = next.output;
    }

    stop.cleanup();
  },
});

MessageAccumulator

The MessageAccumulator handles the transport protocol automatically:
  • Turn 0: replaces messages (full history from frontend)
  • Subsequent turns: appends new messages (frontend only sends the new user message)
  • Regenerate: replaces messages (full history minus last assistant message)
const conversation = new chat.MessageAccumulator();

// Returns full accumulated ModelMessage[] for streamText
const messages = await conversation.addIncoming(payload.messages, payload.trigger, turn);

// After piping, add the response
const response = await chat.pipeAndCapture(result);
if (response) await conversation.addResponse(response);

// Access accumulated messages for persistence
conversation.uiMessages; // UIMessage[]
conversation.modelMessages; // ModelMessage[]