> ## Documentation Index
> Fetch the complete documentation index at: https://trigger.dev/docs/llms.txt
> Use this file to discover all available pages before exploring further.

# Backend

> Three approaches to building your chat backend — chat.agent(), session iterator, or raw task primitives.

<Warning>
  The AI Agents and Prompts surface ships as part of the **v4.5 release candidate**. Install with `@trigger.dev/sdk@rc` (or pin `4.5.0-rc.0` or later) to use these features — they aren't yet on the latest stable, and APIs may still change before the 4.5.0 GA. See [supported AI SDK versions](/ai-chat/reference#compatibility) and the [AI chat changelog](/ai-chat/changelog) for details.
</Warning>

## chat.agent()

The highest-level approach. Handles message accumulation, stop signals, turn lifecycle, and auto-piping automatically.

### Simple: return a StreamTextResult

Return the `streamText` result from `run` and it's automatically piped to the frontend:

```ts theme={"theme":"css-variables"}
import { chat } from "@trigger.dev/sdk/ai";
import { streamText, stepCountIs } from "ai";
import { anthropic } from "@ai-sdk/anthropic";

export const simpleChat = chat.agent({
  id: "simple-chat",
  run: async ({ messages, signal }) => {
    return streamText({
      ...chat.toStreamTextOptions(), // prepareStep, system, telemetry (see note below)
      model: anthropic("claude-sonnet-4-5"),
      system: "You are a helpful assistant.",
      messages,
      abortSignal: signal,
      stopWhen: stepCountIs(15),
    });
  },
});
```

<Warning>
  **Always spread `chat.toStreamTextOptions()` first** (as above) so your explicit overrides win. It wires up the `prepareStep` callback behind [compaction](/ai-chat/compaction), [steering](/ai-chat/pending-messages), and [background injection](/ai-chat/background-injection), all of which silently no-op without it, and injects the system prompt from `chat.prompt()`, the resolved model (when you pass a `registry`), and telemetry metadata. Examples below keep the spread implicit for brevity, so include it in real code.
</Warning>

### Using chat.pipe() for complex flows

For complex agent flows where `streamText` is called deep inside your code, use `chat.pipe()`. It works from **anywhere inside a task** — even nested function calls.

```ts trigger/agent-chat.ts theme={"theme":"css-variables"}
import { chat } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { anthropic } from "@ai-sdk/anthropic";
import type { ModelMessage } from "ai";

export const agentChat = chat.agent({
  id: "agent-chat",
  run: async ({ messages }) => {
    // Don't return anything — chat.pipe is called inside
    await runAgentLoop(messages);
  },
});

async function runAgentLoop(messages: ModelMessage[]) {
  // ... agent logic, tool calls, etc.

  const result = streamText({
    model: anthropic("claude-sonnet-4-5"),
    messages,
    stopWhen: stepCountIs(15),
  });

  // Pipe from anywhere — no need to return it
  await chat.pipe(result);
}
```

### Custom data parts

Add custom `data-*` parts to the assistant's response message via `chat.response.write()` (from `run()`) or the `writer` parameter in lifecycle hooks. Non-transient `data-*` chunks are automatically added to `responseMessage.parts` and surface in `onTurnComplete` for persistence:

```ts theme={"theme":"css-variables"}
export const myChat = chat.agent({
  id: "my-chat",
  onBeforeTurnComplete: async ({ writer, turn }) => {
    // This data part will be in responseMessage.parts in onTurnComplete
    writer.write({
      type: "data-metadata",
      data: { turn, model: "gpt-4o", timestamp: Date.now() },
    });
  },
  onTurnComplete: async ({ responseMessage }) => {
    // responseMessage.parts includes the data-metadata part
    await db.messages.save(responseMessage);
  },
  run: async ({ messages, signal }) => {
    // Also works from run() via chat.response
    chat.response.write({
      type: "data-context",
      data: { searchResults: results },
    });

    return streamText({ model: anthropic("claude-sonnet-4-5"), messages, abortSignal: signal });
  },
});
```

Add `transient: true` to data chunks that should stream to the frontend but NOT persist in the response message. Use this for progress indicators, loading states, and other temporary UI:

```ts theme={"theme":"css-variables"}
// Transient — frontend sees it, but NOT in onTurnComplete's responseMessage
writer.write({
  type: "data-progress",
  id: "search",
  data: { percent: 50 },
  transient: true,
});
```

<Info>
  This matches the AI SDK's semantics: `data-*` chunks persist to `message.parts` by default. Only `transient: true` chunks are ephemeral. Non-data chunks (`text-delta`, `tool-*`, etc.) are handled by `streamText` and captured via `onFinish` — they don't need `chat.response`.
</Info>

<Note>
  `chat.response` and the `writer` accumulation behavior work with `chat.agent` and `chat.createSession`. If you're using [`chat.customAgent`](#raw-task-with-primitives), you own the accumulator — see the raw-task example for the manual pattern.
</Note>

### Raw streaming with `chat.stream`

For low-level stream access (piping from subtasks, reading streams by run ID), use `chat.stream`. Chunks written via `chat.stream` go directly to the realtime output — they are **NOT** accumulated into the response message regardless of the `transient` flag.

```ts theme={"theme":"css-variables"}
// Raw stream — always ephemeral, never in responseMessage
const { waitUntilComplete } = chat.stream.writer({
  execute: ({ write }) => {
    write({ type: "data-status", data: { message: "Processing..." } });
  },
});
await waitUntilComplete();
```

<Tip>
  Use `data-*` chunk types (e.g. `data-status`, `data-progress`) for custom data. The AI SDK processes these into `DataUIPart` objects in `message.parts` on the frontend. Writing the same `type` + `id` again updates the existing part instead of creating a new one — useful for live progress.
</Tip>

`chat.stream` exposes the full stream API:

| Method                                | Description                                |
| ------------------------------------- | ------------------------------------------ |
| `chat.stream.writer(options)`         | Write individual chunks via a callback     |
| `chat.stream.pipe(stream, options?)`  | Pipe a `ReadableStream` or `AsyncIterable` |
| `chat.stream.append(value, options?)` | Append raw data                            |
| `chat.stream.read(runId, options?)`   | Read the stream by run ID                  |

For piping streams from subtasks to the parent chat (via `target: "root"`), see the [Sub-agents pattern](/ai-chat/patterns/sub-agents).

### Backed by a Session

Every `chat.agent` conversation is backed by a durable [Session](/ai-chat/sessions): `externalId` is your `chatId`, `type` is `"chat.agent"`, and `taskIdentifier` is the agent's task ID. The session is the run manager. It owns the chat's runs, persists across run lifecycles, and orchestrates handoffs (idle continuation, `chat.requestUpgrade`). You rarely touch it directly, since `chat.stream`, `chat.messages`, and `chat.stopSignal` wrap everything, but `payload.sessionId` is there when you need to reach in, e.g. `sessions.open(payload.sessionId)` to write from a sub-agent or from outside the turn loop.

### Tools

Declare your tools on the agent config, then read them back (typed) from the `run()` payload. Declaring them on the config, not just on `streamText`, is what lets the SDK re-apply each tool's `toModelOutput` when it re-converts history on later turns.

```ts theme={"theme":"css-variables"}
const tools = { searchDocs };

export const myChat = chat.agent({
  id: "my-chat",
  tools,
  run: async ({ messages, tools, signal }) =>
    streamText({
      ...chat.toStreamTextOptions({ tools }),
      model: anthropic("claude-sonnet-4-5"),
      messages,
      abortSignal: signal,
      stopWhen: stepCountIs(15),
    }),
});
```

See [Tools](/ai-chat/tools) for `toModelOutput` across turns, per-turn dynamic tools, the typed run payload, and how config tools relate to skills.

### Lifecycle hooks

`chat.agent({ ... })` accepts hooks that fire in a fixed order around each turn, plus dedicated suspend/resume hooks. The full reference lives on its own page:

* [Lifecycle hooks](/ai-chat/lifecycle-hooks) — `onPreload`, `onChatStart`, `onValidateMessages`, `hydrateMessages`, `onTurnStart`, `onBeforeTurnComplete`, `onTurnComplete`, `onChatSuspend` / `onChatResume`, `exitAfterPreloadIdle`, plus how `ctx` plumbs through every callback.

**Per-turn order:** `onValidateMessages` → `hydrateMessages` → `onChatStart` (chat's first message only) → `onTurnStart` → `run()` → `onBeforeTurnComplete` → `onTurnComplete`.

### Using prompts

Use [AI Prompts](/ai/prompts) to manage your system prompt as versioned, overridable config. Store the resolved prompt in a lifecycle hook with `chat.prompt.set()`, then spread `chat.toStreamTextOptions()` into `streamText` — it includes the system prompt, model, config, and telemetry automatically.

```ts theme={"theme":"css-variables"}
import { chat } from "@trigger.dev/sdk/ai";
import { prompts } from "@trigger.dev/sdk";
import { streamText, createProviderRegistry } from "ai";
import { anthropic } from "@ai-sdk/anthropic";
import { z } from "zod";

const registry = createProviderRegistry({ anthropic });

const systemPrompt = prompts.define({
  id: "my-chat-system",
  model: "anthropic:claude-sonnet-4-5",
  config: { temperature: 0.7 },
  variables: z.object({ name: z.string() }),
  content: `You are a helpful assistant for {{name}}.`,
});

export const myChat = chat.agent({
  id: "my-chat",
  clientDataSchema: z.object({ userId: z.string() }),
  onChatStart: async ({ clientData }) => {
    const user = await db.user.findUnique({ where: { id: clientData.userId } });
    const resolved = await systemPrompt.resolve({ name: user.name });
    chat.prompt.set(resolved);
  },
  run: async ({ messages, signal }) => {
    return streamText({
      ...chat.toStreamTextOptions({ registry }), // system, model, config, telemetry
      messages,
      abortSignal: signal,
      stopWhen: stepCountIs(15),
    });
  },
});
```

`chat.toStreamTextOptions()` returns an object with `system`, `model` (resolved via the registry), `temperature`, and `experimental_telemetry` — all from the stored prompt. Properties you set after the spread (like a client-selected model) take precedence.

**Which form to call:**

| Form                                            | Use when                                                                                                                                                                                                                              |
| ----------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `chat.toStreamTextOptions()`                    | Default. Wires up `prepareStep` (compaction, steering, background injection), the stored prompt's `system` / `model` / `config`, and telemetry metadata.                                                                              |
| `chat.toStreamTextOptions({ registry })`        | You're using [Prompts](/ai/prompts) with a provider-prefixed model string (e.g. `"anthropic:claude-sonnet-4-5"`). The registry resolves the prefix to a real model instance via `createProviderRegistry({ anthropic, openai, ... })`. |
| `chat.toStreamTextOptions({ tools })`           | You want HITL tool approvals — pass the same `tools` object you give to `streamText`. The SDK then knows which tool calls need to pause on `needsApproval: true`.                                                                     |
| `chat.toStreamTextOptions({ registry, tools })` | Both of the above.                                                                                                                                                                                                                    |

<Tip>
  See [Prompts](/ai/prompts) for the full guide — defining templates, variable schemas, dashboard
  overrides, and the management SDK.
</Tip>

### Stop generation

#### How stop works

Calling `stop()` from `useChat` sends a stop signal to the running task via input streams. The task's `streamText` call aborts (if you passed `signal` or `stopSignal`), but the **run stays alive** and waits for the next message. The partial response is captured and accumulated normally.

#### Abort signals

The `run` function receives three abort signals:

| Signal         | Fires when                                  | Use for                                                                |
| -------------- | ------------------------------------------- | ---------------------------------------------------------------------- |
| `signal`       | Stop **or** cancel                          | Pass to `streamText` — handles both cases. **Use this in most cases.** |
| `stopSignal`   | Stop only (per-turn, reset each turn)       | Custom logic that should only run on user stop, not cancellation       |
| `cancelSignal` | Run cancel, expire, or maxDuration exceeded | Cleanup that should only happen on full cancellation                   |

```ts theme={"theme":"css-variables"}
export const myChat = chat.agent({
  id: "my-chat",
  run: async ({ messages, signal, stopSignal, cancelSignal }) => {
    return streamText({
      model: anthropic("claude-sonnet-4-5"),
      messages,
      abortSignal: signal, // Handles both stop and cancel
      stopWhen: stepCountIs(15),
    });
  },
});
```

<Tip>
  Use `signal` (the combined signal) in most cases. The separate `stopSignal` and `cancelSignal` are
  only needed if you want different behavior for stop vs cancel.
</Tip>

#### Detecting stop in callbacks

The `onTurnComplete` event includes a `stopped` boolean that indicates whether the user stopped generation during that turn:

```ts theme={"theme":"css-variables"}
export const myChat = chat.agent({
  id: "my-chat",
  onTurnComplete: async ({ chatId, uiMessages, stopped }) => {
    await db.chat.update({
      where: { id: chatId },
      data: { messages: uiMessages, lastStoppedAt: stopped ? new Date() : undefined },
    });
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: anthropic("claude-sonnet-4-5"), messages, abortSignal: signal });
  },
});
```

You can also check stop status from **anywhere** during a turn using `chat.isStopped()`. This is useful inside `streamText`'s `onFinish` callback where the AI SDK's `isAborted` flag can be unreliable (e.g. when using `createUIMessageStream` + `writer.merge()`):

```ts theme={"theme":"css-variables"}
import { chat } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";

export const myChat = chat.agent({
  id: "my-chat",
  run: async ({ messages, signal }) => {
    return streamText({
      model: anthropic("claude-sonnet-4-5"),
      messages,
      abortSignal: signal,
      onFinish: ({ isAborted }) => {
        // isAborted may be false even after stop when using createUIMessageStream
        const wasStopped = isAborted || chat.isStopped();
        if (wasStopped) {
          // handle stop — e.g. log analytics
        }
      },
      stopWhen: stepCountIs(15),
    });
  },
});
```

#### Cleaning up aborted messages

When stop happens mid-stream, the captured response message can contain parts in an incomplete state — tool calls stuck in `partial-call`, reasoning blocks still marked as `streaming`, etc. These can cause UI issues like permanent spinners.

`chat.agent` automatically cleans up the `responseMessage` when stop is detected before passing it to `onTurnComplete`. If you use `chat.pipe()` manually and capture response messages yourself, use `chat.cleanupAbortedParts()`:

```ts theme={"theme":"css-variables"}
const cleaned = chat.cleanupAbortedParts(rawResponseMessage);
```

This removes tool invocation parts stuck in `partial-call` state and marks any `streaming` text or reasoning parts as `done`.

<Note>
  Stop signal delivery is best-effort. There is a small race window where the model may finish
  before the stop signal arrives, in which case the turn completes normally with `stopped: false`.
  This is expected and does not require special handling.
</Note>

### Tool approvals

Tools with `needsApproval: true` pause execution until the user approves or denies via the frontend. Define the tool as normal and pass it to `streamText` — `chat.agent` handles the rest:

```ts theme={"theme":"css-variables"}
const sendEmail = tool({
  description: "Send an email. Requires human approval.",
  inputSchema: z.object({ to: z.string(), subject: z.string(), body: z.string() }),
  needsApproval: true,
  execute: async ({ to, subject, body }) => {
    await emailService.send({ to, subject, body });
    return { sent: true };
  },
});

export const myChat = chat.agent({
  id: "my-chat",
  run: async ({ messages, signal }) => {
    return streamText({
      model: anthropic("claude-sonnet-4-5"),
      messages,
      tools: { sendEmail },
      abortSignal: signal,
      stopWhen: stepCountIs(15),
    });
  },
});
```

When the model calls an approval-required tool, the turn completes with the tool in `approval-requested` state. After the user approves on the frontend, the updated message is sent back and `chat.agent` replaces it in the conversation accumulator by matching the message ID. `streamText` then executes the approved tool and continues.

See [Tool approvals](/ai-chat/frontend#tool-approvals) in the frontend docs for the UI setup.

### Persistence

To build a chat app that survives page refreshes you persist two things, both server-side from inside the agent:

1. **Conversation state.** Full `UIMessage[]` keyed by `chatId`. Written from `onTurnStart` (so the user message is durable before streaming begins) and `onTurnComplete` (so the assistant reply lands).
2. **Session state.** The transport's reconnect metadata: `publicAccessToken` and `lastEventId`. Written alongside the messages from the same hooks.

<Note>
  Sessions let the transport reconnect to an existing run after a page refresh. Without them, every page load would start a new run, losing the conversation context that was accumulated in the previous run.
</Note>

For the full per-hook breakdown, race-condition warnings (atomic `lastEventId` writes, why not to use `chat.defer` in `onTurnStart`), token renewal via the `accessToken` callback, and an end-to-end three-file example, see [Database persistence](/ai-chat/patterns/database-persistence).

### Pending messages (steering)

Users can send messages while the agent is executing tool calls. With `pendingMessages`, these messages are injected between tool-call steps, steering the agent mid-execution:

```ts theme={"theme":"css-variables"}
export const myChat = chat.agent({
  id: "my-chat",
  pendingMessages: {
    shouldInject: ({ steps }) => steps.length > 0,
  },
  run: async ({ messages, signal }) => {
    return streamText({
      ...chat.toStreamTextOptions({ registry }),
      messages,
      tools: {
        /* ... */
      },
      abortSignal: signal,
      stopWhen: stepCountIs(15),
    });
  },
});
```

On the frontend, the `usePendingMessages` hook handles sending, tracking, and rendering injection points.

<Tip>
  See [Pending Messages](/ai-chat/pending-messages) for the full guide — backend configuration,
  frontend hook, queuing vs steering, and how injection works with all three chat variants.
</Tip>

### Background injection

Inject context from background work into the conversation using `chat.inject()`. Combine with `chat.defer()` to run analysis between turns and inject results before the next response — self-review, RAG augmentation, safety checks, etc.

```ts theme={"theme":"css-variables"}
export const myChat = chat.agent({
  id: "my-chat",
  onTurnComplete: async ({ messages }) => {
    chat.defer(
      (async () => {
        const review = await generateObject({
          /* ... */
        });
        if (review.object.needsImprovement) {
          chat.inject([
            {
              role: "system",
              content: `[Self-review]\n${review.object.suggestions.join("\n")}`,
            },
          ]);
        }
      })()
    );
  },
  run: async ({ messages, signal }) => {
    return streamText({ ...chat.toStreamTextOptions({ registry }), messages, abortSignal: signal });
  },
});
```

<Tip>
  See [Background Injection](/ai-chat/background-injection) for the full guide — timing, self-review
  example, and how it differs from pending messages.
</Tip>

### Actions

Custom actions let the frontend send structured commands (undo, rollback, edit, regenerate) that modify the conversation state. **Actions are not turns**: they fire `hydrateMessages` (if set) and `onAction` only. The full surface (defining `actionSchema`, returning a model response from `onAction`, gating against pending HITL tool calls, and sending actions from the frontend) lives on its own page.

See [Actions](/ai-chat/actions).

### Chat history

Imperative API for reading and modifying the accumulated message history. Works from any hook (`onAction`, `onTurnStart`, `onBeforeTurnComplete`, `onTurnComplete`, `hydrateMessages`) or from `run()` and AI SDK tools.

<Note>
  The agent's accumulator — not `session.out` — is the source of truth for the full conversation. The `.out` stream is a bounded sliding window (roughly one turn at steady state, see [Records on `session.out`](/ai-chat/client-protocol#records-on-session-out)); the durable history lives in the agent's accumulator and is persisted to S3 between turns for fast next-run boots. `chat.history` reads and mutates that accumulator directly.
</Note>

**Reads.** Synchronous against the current accumulator state.

| Method                                        | Description                                                                                                                                                                         |
| --------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `chat.history.all()`                          | Returns a copy of the current accumulated UI messages.                                                                                                                              |
| `chat.history.getChain()`                     | Same as `all()`. Use whichever name reads better in context.                                                                                                                        |
| `chat.history.findMessage(messageId)`         | Returns the message with that id, or `undefined`.                                                                                                                                   |
| `chat.history.getPendingToolCalls()`          | Tool calls on the most recent assistant message that are still in `input-available` state (waiting on `addToolOutput`).                                                             |
| `chat.history.getResolvedToolCalls()`         | All tool calls in the chain in `output-available` or `output-error` state.                                                                                                          |
| `chat.history.extractNewToolResults(message)` | Tool results in `message` whose `toolCallId` is not already resolved in the chain. Most useful in `hydrateMessages` against an incoming wire message, before the runtime merges it. |

Each pending and resolved entry is shaped `{ toolCallId, toolName, messageId }`. Each new-result entry is `{ toolCallId, toolName, output, errorText? }`, where `errorText` is set only for `output-error` parts.

**Mutations.** Applied at lifecycle checkpoints (after hooks return). Multiple mutations in the same hook compose correctly.

| Method                                     | Description                                            |
| ------------------------------------------ | ------------------------------------------------------ |
| `chat.history.set(messages)`               | Replace all messages. Same as `chat.setMessages()`.    |
| `chat.history.remove(messageId)`           | Remove a specific message by ID.                       |
| `chat.history.rollbackTo(messageId)`       | Keep messages up to and including the given ID (undo). |
| `chat.history.replace(messageId, message)` | Replace a specific message by ID (edit).               |
| `chat.history.slice(start, end?)`          | Keep only messages in the given range.                 |

```ts theme={"theme":"css-variables"}
// Undo the last exchange in onAction
onAction: async ({ action }) => {
  if (action.type === "undo") {
    chat.history.slice(0, -2);
  }
},

// Trim history in onTurnComplete
onTurnComplete: async ({ uiMessages }) => {
  if (uiMessages.length > 50) {
    chat.history.slice(-20);
  }
},
```

The HITL reads let an action or hook decide what to do without walking the accumulator manually:

```ts theme={"theme":"css-variables"}
// Refuse a regenerate while a tool call is still awaiting an answer
onAction: async ({ action }) => {
  if (action.type === "regenerate") {
    if (chat.history.getPendingToolCalls().length > 0) return;
    chat.history.slice(0, -1);
  }
},

// Side-effect once per net-new tool result when wire messages come in
hydrateMessages: async ({ incomingMessages }) => {
  for (const msg of incomingMessages) {
    for (const r of chat.history.extractNewToolResults(msg)) {
      await onToolResolved({ id: r.toolCallId, output: r.output, errorText: r.errorText });
    }
  }
  return incomingMessages;
},
```

`extractNewToolResults` compares against the *current* chain. Inside `onTurnComplete`, the chain already contains the just-finished `responseMessage`, so it returns `[]`. Use it where the message is from outside the accumulator: `hydrateMessages` (incoming wire), `onAction` if the action carries a message, or any custom pre-merge code path.

### prepareMessages

Transform model messages before they're used anywhere — in `run()`, in compaction rebuilds, and in compaction results. Define once, applied everywhere.

Use this for Anthropic cache breaks, injecting system context, stripping PII, etc.

```ts theme={"theme":"css-variables"}
export const myChat = chat.agent({
  id: "my-chat",
  prepareMessages: ({ messages, reason }) => {
    // Add Anthropic cache breaks to the last message
    if (messages.length === 0) return messages;
    const last = messages[messages.length - 1];
    return [
      ...messages.slice(0, -1),
      {
        ...last,
        providerOptions: {
          ...last.providerOptions,
          anthropic: { cacheControl: { type: "ephemeral" } },
        },
      },
    ];
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: anthropic("claude-sonnet-4-5"), messages, abortSignal: signal });
  },
});
```

The `reason` field tells you why messages are being prepared:

| Reason                 | Description                                       |
| ---------------------- | ------------------------------------------------- |
| `"run"`                | Messages being passed to `run()` for `streamText` |
| `"compaction-rebuild"` | Rebuilding from a previous compaction summary     |
| `"compaction-result"`  | Fresh compaction just produced these messages     |

### Version upgrades

Chat agent runs are pinned to the worker version they started on. When you deploy a new version, suspended runs resume on the old code. Call `chat.requestUpgrade()` in `onTurnStart` to skip `run()` and exit immediately — the transport re-triggers the same message on the latest version. See the [Version Upgrades pattern](/ai-chat/patterns/version-upgrades) for the full guide.

### Ending a run on your terms

By default, a chat agent stays idle after each turn waiting for the next user message. Call `chat.endRun()` from `run()`, `chat.defer()`, `onBeforeTurnComplete`, or `onTurnComplete` to exit the loop once the current turn finishes — no upgrade signal, no idle wait.

```ts theme={"theme":"css-variables"}
chat.agent({
  id: "one-shot",
  run: async ({ messages, signal }) => {
    // Single-response agent — exit after this turn.
    chat.endRun();
    return streamText({ model: anthropic("claude-sonnet-4-5"), messages, abortSignal: signal });
  },
});
```

The current turn streams through normally, `onBeforeTurnComplete` / `onTurnComplete` fire, the turn-complete chunk is written, and the run exits instead of suspending. The next user message on the same `chatId` starts a fresh run via the standard continuation flow.

Use this when the agent knows its work is done (budget exhausted, goal achieved, one-shot response) rather than relying on the idle timeout. Unlike `chat.requestUpgrade()`, no `upgrade-required` signal is sent to the client, so there's no version-migration semantics.

<Warning>
  If you persist `lastEventId` to your own storage for cross-page-load resume, **don't clear it on `chat.endRun()`**. The cursor is sessionId-keyed and stays valid across Run boundaries — clearing it forces the next `sendMessages` to subscribe from `seq_num=0`, where it may hit the prior turn's stale `turn-complete` record and close the stream empty before the new Run's chunks arrive.
</Warning>

### Runtime configuration

#### chat.setTurnTimeout()

Override how long the run stays suspended waiting for the next message. Call from inside `run()`:

```ts theme={"theme":"css-variables"}
run: async ({ messages, signal }) => {
  chat.setTurnTimeout("2h"); // Wait longer for this conversation
  return streamText({ model: anthropic("claude-sonnet-4-5"), messages, abortSignal: signal });
},
```

#### chat.setIdleTimeoutInSeconds()

Override how long the run stays idle (active, using compute) after each turn:

```ts theme={"theme":"css-variables"}
run: async ({ messages, signal }) => {
  chat.setIdleTimeoutInSeconds(60); // Stay idle for 1 minute
  return streamText({ model: anthropic("claude-sonnet-4-5"), messages, abortSignal: signal });
},
```

<Info>
  Longer idle timeout means faster responses but more compute usage. Set to `0` to suspend
  immediately after each turn (minimum latency cost, slight delay on next message).
</Info>

#### Stream options

Control how `streamText` results are converted to the frontend stream via `toUIMessageStream()`. Set static defaults on the task, or override per-turn.

##### Error handling with onError

When `streamText` encounters an error mid-stream (rate limits, API failures, network errors), the `onError` callback converts it to a string that's sent to the frontend as an `{ type: "error", errorText }` chunk. The AI SDK's `useChat` receives this via its `onError` callback.

By default, the raw error message is sent to the frontend. Use `onError` to sanitize errors and avoid leaking internal details:

```ts theme={"theme":"css-variables"}
export const myChat = chat.agent({
  id: "my-chat",
  uiMessageStreamOptions: {
    onError: (error) => {
      // Log the full error server-side for debugging
      console.error("Stream error:", error);
      // Return a sanitized message — this is what the frontend sees
      if (error instanceof Error && error.message.includes("rate limit")) {
        return "Rate limited — please wait a moment and try again.";
      }
      return "Something went wrong. Please try again.";
    },
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: anthropic("claude-sonnet-4-5"), messages, abortSignal: signal });
  },
});
```

`onError` is also called for tool execution errors, so a single handler covers both LLM errors and tool failures.

On the frontend, handle the error in `useChat`:

```tsx theme={"theme":"css-variables"}
const { messages, sendMessage } = useChat({
  transport,
  onError: (error) => {
    // error.message contains the string returned by your onError handler
    toast.error(error.message);
  },
});
```

##### Reasoning and sources

Control which AI SDK features are forwarded to the frontend:

```ts theme={"theme":"css-variables"}
export const myChat = chat.agent({
  id: "my-chat",
  uiMessageStreamOptions: {
    sendReasoning: true, // Forward model reasoning (default: true)
    sendSources: true, // Forward source citations (default: false)
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: anthropic("claude-sonnet-4-5"), messages, abortSignal: signal });
  },
});
```

##### Custom message IDs

By default, response message IDs are generated using the AI SDK's built-in `generateId`. Pass a custom `generateMessageId` function to use your own ID format (e.g. UUID-v7):

```ts theme={"theme":"css-variables"}
import { v7 as uuidv7 } from "uuid";

export const myChat = chat.agent({
  id: "my-chat",
  uiMessageStreamOptions: {
    generateMessageId: () => uuidv7(),
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: anthropic("claude-sonnet-4-5"), messages, abortSignal: signal });
  },
});
```

With the `.withUIMessage()` builder, set it under `streamOptions`:

```ts theme={"theme":"css-variables"}
import { v7 as uuidv7 } from "uuid";

export const myChat = chat
  .withUIMessage<MyChatUIMessage>({
    streamOptions: {
      generateMessageId: () => uuidv7(),
      sendReasoning: true,
    },
  })
  .agent({
    id: "my-chat",
    run: async ({ messages, signal }) => {
      return streamText({ model: anthropic("claude-sonnet-4-5"), messages, abortSignal: signal });
    },
  });
```

<Info>
  The generated ID is sent to the frontend in the stream's `start` chunk, so frontend and backend
  always reference the same ID for each message. This is important for features like tool
  approvals, where the frontend resends an assistant message and the backend needs to match it
  by ID in the conversation accumulator.
</Info>

##### Per-turn overrides

Override per-turn with `chat.setUIMessageStreamOptions()` — per-turn values merge with the static config (per-turn wins on conflicts). The override is cleared automatically after each turn.

```ts theme={"theme":"css-variables"}
run: async ({ messages, clientData, signal }) => {
  // Enable reasoning only for certain models
  if (clientData.model?.includes("claude")) {
    chat.setUIMessageStreamOptions({ sendReasoning: true });
  }
  return streamText({ model: openai(clientData.model ?? "gpt-4o"), messages, abortSignal: signal });
},
```

`chat.setUIMessageStreamOptions()` works across all abstraction levels — `chat.agent()`, `chat.createSession()` / `turn.complete()`, and `chat.pipeAndCapture()`.

See [ChatUIMessageStreamOptions](/ai-chat/reference#chatuimessagestreamoptions) for the full reference.

<Note>
  `onFinish` is managed internally for response capture and cannot be overridden here. Use
  `streamText`'s `onFinish` callback for custom finish handling, or use [raw task
  mode](#raw-task-with-primitives) for full control over `toUIMessageStream()`.
</Note>

### Manual mode with task()

If you need full control over task options, use the standard `task()` with `ChatTaskPayload` and `chat.pipe()`:

```ts theme={"theme":"css-variables"}
import { task } from "@trigger.dev/sdk";
import { chat, type ChatTaskPayload } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { anthropic } from "@ai-sdk/anthropic";

export const manualChat = task({
  id: "manual-chat",
  retry: { maxAttempts: 3 },
  queue: { concurrencyLimit: 10 },
  run: async (payload: ChatTaskPayload) => {
    const result = streamText({
      model: anthropic("claude-sonnet-4-5"),
      messages: payload.messages,
      stopWhen: stepCountIs(15),
    });

    await chat.pipe(result);
  },
});
```

<Warning>
  Manual mode does not get automatic message accumulation or the `onTurnComplete`/`onChatStart`
  lifecycle hooks. The `responseMessage` field in `onTurnComplete` will be `undefined` when using
  `chat.pipe()` directly. Use `chat.agent()` for the full multi-turn experience.
</Warning>

***

## chat.createSession()

A middle ground between `chat.agent()` and raw primitives. You get an async iterator that yields `ChatTurn` objects — each turn handles stop signals, message accumulation, and turn-complete signaling automatically. You control initialization, model/tool selection, persistence, and any custom per-turn logic.

Use `chat.createSession()` inside a standard `task()`:

```ts theme={"theme":"css-variables"}
import { task } from "@trigger.dev/sdk";
import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { anthropic } from "@ai-sdk/anthropic";

export const myChat = task({
  id: "my-chat",
  run: async (payload: ChatTaskWirePayload, { signal }) => {
    // One-time initialization — just code, no hooks
    const clientData = payload.metadata as { userId: string };
    await db.chat.create({ data: { id: payload.chatId, userId: clientData.userId } });

    const session = chat.createSession(payload, {
      signal,
      idleTimeoutInSeconds: 60,
      timeout: "1h",
    });

    for await (const turn of session) {
      const result = streamText({
        model: anthropic("claude-sonnet-4-5"),
        messages: turn.messages,
        abortSignal: turn.signal,
        stopWhen: stepCountIs(15),
      });

      // Pipe, capture, accumulate, and signal turn-complete — all in one call
      await turn.complete(result);

      // Persist after each turn
      await db.chat.update({
        where: { id: turn.chatId },
        data: { messages: turn.uiMessages },
      });
    }
  },
});
```

### ChatSessionOptions

| Option                 | Type          | Default  | Description                                 |
| ---------------------- | ------------- | -------- | ------------------------------------------- |
| `signal`               | `AbortSignal` | required | Run-level cancel signal (from task context) |
| `idleTimeoutInSeconds` | `number`      | `30`     | Seconds to stay idle between turns          |
| `timeout`              | `string`      | `"1h"`   | Duration string for suspend timeout         |
| `maxTurns`             | `number`      | `100`    | Max turns before ending                     |

### ChatTurn

Each turn yielded by the iterator provides:

| Field          | Type             | Description                                            |
| -------------- | ---------------- | ------------------------------------------------------ |
| `number`       | `number`         | Turn number (0-indexed)                                |
| `chatId`       | `string`         | Chat session ID                                        |
| `trigger`      | `string`         | What triggered this turn                               |
| `clientData`   | `unknown`        | Client data from the transport                         |
| `messages`     | `ModelMessage[]` | Full accumulated model messages — pass to `streamText` |
| `uiMessages`   | `UIMessage[]`    | Full accumulated UI messages — use for persistence     |
| `signal`       | `AbortSignal`    | Combined stop+cancel signal (fresh each turn)          |
| `stopped`      | `boolean`        | Whether the user stopped generation this turn          |
| `continuation` | `boolean`        | Whether this is a continuation run                     |

| Method                       | Description                                                         |
| ---------------------------- | ------------------------------------------------------------------- |
| `turn.complete(source)`      | Pipe stream, capture response, accumulate, and signal turn-complete |
| `turn.done()`                | Just signal turn-complete (when you've piped manually)              |
| `turn.addResponse(response)` | Add a response to the accumulator manually                          |

### turn.complete() vs manual control

`turn.complete(result)` is the easy path — it handles piping, capturing the response, accumulating messages, cleaning up aborted parts, and writing the turn-complete chunk.

For more control, you can do each step manually:

```ts theme={"theme":"css-variables"}
for await (const turn of session) {
  const result = streamText({
    model: anthropic("claude-sonnet-4-5"),
    messages: turn.messages,
    abortSignal: turn.signal,
    stopWhen: stepCountIs(15),
  });

  // Manual: pipe and capture separately
  const response = await chat.pipeAndCapture(result, { signal: turn.signal });

  if (response) {
    // Custom processing before accumulating
    await turn.addResponse(response);
  }

  // Custom persistence, analytics, etc.
  await db.chat.update({ ... });

  // Must call done() when not using complete()
  await turn.done();
}
```

***

## Raw task with primitives

For full control, use a standard `task()` with the composable primitives from the `chat` namespace. You manage everything: the turn loop, stop signals, message accumulation, and turn-complete signaling.

Raw task mode also lets you call `.toUIMessageStream()` yourself with any options — including `onFinish` and `originalMessages`. This is the right choice when you need complete control over the stream conversion beyond what `chat.setUIMessageStreamOptions()` provides.

### Primitives

| Primitive                       | Description                                                                                 |
| ------------------------------- | ------------------------------------------------------------------------------------------- |
| `chat.messages`                 | Input stream for incoming messages — use `.waitWithIdleTimeout()` to wait for the next turn |
| `chat.createStopSignal()`       | Create a managed stop signal wired to the stop input stream                                 |
| `chat.pipeAndCapture(result)`   | Pipe a `StreamTextResult` to the chat stream and capture the response                       |
| `chat.writeTurnComplete()`      | Signal the frontend that the current turn is complete                                       |
| `chat.MessageAccumulator`       | Accumulates conversation messages across turns                                              |
| `chat.pipe(stream)`             | Pipe a stream to the frontend (no response capture)                                         |
| `chat.cleanupAbortedParts(msg)` | Clean up incomplete parts from a stopped response                                           |

### Example

```ts theme={"theme":"css-variables"}
import { task } from "@trigger.dev/sdk";
import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { anthropic } from "@ai-sdk/anthropic";

export const myChat = task({
  id: "my-chat-raw",
  run: async (payload: ChatTaskWirePayload, { signal: runSignal }) => {
    let currentPayload = payload;

    // Handle preload — wait for the first real message
    if (currentPayload.trigger === "preload") {
      const result = await chat.messages.waitWithIdleTimeout({
        idleTimeoutInSeconds: 60,
        timeout: "1h",
        spanName: "waiting for first message",
      });
      if (!result.ok) return;
      currentPayload = result.output;
    }

    const stop = chat.createStopSignal();
    const conversation = new chat.MessageAccumulator();

    for (let turn = 0; turn < 100; turn++) {
      stop.reset();

      const messages = await conversation.addIncoming(
        currentPayload.messages,
        currentPayload.trigger,
        turn
      );

      const combinedSignal = AbortSignal.any([runSignal, stop.signal]);

      const result = streamText({
        model: anthropic("claude-sonnet-4-5"),
        messages,
        abortSignal: combinedSignal,
        stopWhen: stepCountIs(15),
      });

      let response;
      try {
        response = await chat.pipeAndCapture(result, { signal: combinedSignal });
      } catch (error) {
        if (error instanceof Error && error.name === "AbortError") {
          if (runSignal.aborted) break;
          // Stop — fall through to accumulate partial
        } else {
          throw error;
        }
      }

      if (response) {
        const cleaned =
          stop.signal.aborted && !runSignal.aborted ? chat.cleanupAbortedParts(response) : response;
        await conversation.addResponse(cleaned);
      }

      if (runSignal.aborted) break;

      // Persist, analytics, etc.
      await db.chat.update({
        where: { id: currentPayload.chatId },
        data: { messages: conversation.uiMessages },
      });

      await chat.writeTurnComplete();

      // Wait for the next message
      const next = await chat.messages.waitWithIdleTimeout({
        idleTimeoutInSeconds: 60,
        timeout: "1h",
        spanName: "waiting for next message",
      });
      if (!next.ok) break;
      currentPayload = next.output;
    }

    stop.cleanup();
  },
});
```

### MessageAccumulator

The `MessageAccumulator` handles the transport protocol automatically:

* Turn 0: replaces messages (full history from frontend)
* Subsequent turns: appends new messages (frontend only sends the new user message)
* Regenerate: replaces messages (full history minus last assistant message)

```ts theme={"theme":"css-variables"}
const conversation = new chat.MessageAccumulator();

// Returns full accumulated ModelMessage[] for streamText
const messages = await conversation.addIncoming(payload.messages, payload.trigger, turn);

// After piping, add the response
const response = await chat.pipeAndCapture(result);
if (response) await conversation.addResponse(response);

// Access accumulated messages for persistence
conversation.uiMessages; // UIMessage[]
conversation.modelMessages; // ModelMessage[]
```
