    return <div>Loading...</div>;
  }

  // streams.openai is an array of TextStreamPart
  const toolCall = streams.openai.find(
    (stream) => stream.type === "tool-call" && stream.toolName === "getWeather"
  );
  const toolResult = streams.openai.find((stream) => stream.type === "tool-result");

  const textDeltas = streams.openai.filter((stream) => stream.type === "text-delta");
  const text = textDeltas.map((delta) => delta.textDelta).join("");

  const weatherLocation = toolCall ? toolCall.args.location : undefined;
  const weather = toolResult ? toolResult.result.temperature : undefined;

  return (
    <div>
      <h2>OpenAI response:</h2>
      <p>{text}</p>
      <h2>Weather:</h2>
      <p>
        {weatherLocation
          ? `The weather in ${weatherLocation} is ${weather} degrees.`
          : "No weather data"}
      </p>
    </div>
  );
}
```
### Using `toolTask`
In the example above, we defined a tool that is used by the `aiStreamingWithTools` task. You can also define a Trigger.dev task that doubles as a tool; it will automatically be invoked with `triggerAndWait` when the tool is called. This is done using the `toolTask` function:
```ts
import { openai } from "@ai-sdk/openai";
import { logger, metadata, schemaTask, toolTask } from "@trigger.dev/sdk/v3";
import { streamText, type TextStreamPart } from "ai";
import { z } from "zod";

export const getWeather = toolTask({
  id: "get-weather",
  description: "Get the weather for a location",
  // Define the parameters for the tool, which also becomes the task payload
  parameters: z.object({
    location: z.string(),
  }),
  run: async ({ location }) => {
    // Return mock data
    return {
      location,
      temperature: 72 + Math.floor(Math.random() * 21) - 10,
    };
  },
});

export type STREAMS = {
  // Give the stream a type of TextStreamPart, along with the tools
  openai: TextStreamPart<{ getWeather: typeof getWeather.tool }>;
};

export const aiStreamingWithTools = schemaTask({
  id: "ai-streaming-with-tools",
  description: "Stream data from the AI SDK and use tools",
  schema: z.object({
    model: z.string().default("gpt-4o-mini"),
    prompt: z
      .string()
      .default(
        "Based on the temperature, will I need to wear extra clothes today in San Francisco? Please be detailed."
      ),
  }),
  run: async ({ model, prompt }) => {
    logger.info("Running OpenAI model", { model, prompt });

    const result = streamText({
      model: openai(model),
      prompt,
      tools: {
        getWeather: getWeather.tool, // pass getWeather.tool as a tool
      },
      maxSteps: 5, // Allow streamText to repeatedly call the model
    });

    // Pass the fullStream to the metadata system
    const stream = await metadata.stream("openai", result.fullStream);

    let text = "";

    for await (const chunk of stream) {
      logger.log("Received chunk", { chunk });

      // chunk is a TextStreamPart
      if (chunk.type === "text-delta") {
        text += chunk.textDelta;
      }
    }

    return { text };
  },
});
```
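If you want to kick off `aiStreamingWithTools` from your backend, a minimal sketch looks like the following. The import path for the task type and the payload values are illustrative; the handle returned by `tasks.trigger` includes the run id and a public access token that the frontend can use with `useRealtimeRunWithStreams`.

```ts
import { tasks } from "@trigger.dev/sdk/v3";
// Hypothetical path to the file where aiStreamingWithTools is defined
import type { aiStreamingWithTools } from "./trigger/ai-streaming-with-tools";

const handle = await tasks.trigger<typeof aiStreamingWithTools>("ai-streaming-with-tools", {
  model: "gpt-4o-mini",
  prompt: "Will I need to wear extra clothes today in San Francisco?",
});

// Pass these to the frontend so it can subscribe to the run and its streams
console.log(handle.id, handle.publicAccessToken);
```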
# runs.subscribeToBatch
Subscribes to all changes for runs in a batch.
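A minimal usage sketch (the batch id `batch_1234` below is a placeholder for the id returned when you trigger a batch):

```ts
import { runs } from "@trigger.dev/sdk/v3";

// Yields the updated run object each time any run in the batch changes
for await (const run of runs.subscribeToBatch("batch_1234")) {
  console.log(`Run ${run.id} is now ${run.status}`);
}
```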