A Task is a cached unit of work in a Job Run that is logged to the Trigger.dev UI.

Any interaction with an external service (database or API) should be wrapped in a Task. Failing to do so could result in repeated work when runs are resumed.

Why do you need tasks?

Tasks are a key building block of how Trigger.dev works, and failing to use them leads to unpredictable behavior. Tasks allow pieces of work inside a Job Run to be cached, and the results of those Tasks to be reused.

This is very important because for a Job Run to be resumable (e.g. after a serverless function timeout, or because of a call to io.wait()), we need to call the Job.run function multiple times. If we didn’t cache the results of Tasks, then we would be repeating work on each run.

client.defineJob({
  id: "new-user",
  name: "Run when a new user signs up",
  version: "1.0.0",
  trigger: eventTrigger({
    name: "new.user",
    schema: z.object({
      userId: z.string(),
    }),
  }),
  integrations: {
    resend,
  },
  run: async (payload, io, ctx) => {
    // This code is not wrapped in a Task, so it will run twice: once when the run first starts, and once after the wait
    const user = await prisma.user.findUniqueOrThrow({
      where: { id: payload.userId },
      select: { email: true, name: true },
    });

    // This code will run once, because the resend integration creates a task with the "welcome-email" cacheKey
    await io.resend.sendEmail("welcome-email", {
      to: user.email,
      from: "[email protected]",
      subject: "Welcome!",
      html: welcomeEmail(user.name),
    });

    // This code will run once, because io.wait creates a task with the "wait" cacheKey
    await io.wait("wait", 60 * 60 * 3); // wait for 3 hours

    // This code will run once, because we're manually creating a task with the "my-task" cacheKey
    const response = await io.runTask(
      "my-task",
      async () => {
        return await longRunningCode(payload.userId);
      },
      { name: "My Task" }
    );

    return response;
  },
});
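
To make the replay behavior concrete, here is a minimal toy model of task caching. It is a sketch under stated assumptions, not the Trigger.dev implementation: the `runTask`, `run`, and `calls` names are hypothetical, the cache is a plain in-memory object, and everything is synchronous for brevity. It shows that code outside a task repeats each time the run function is executed, while code inside a task executes once.

```typescript
// Toy model only: hypothetical names, in-memory cache, synchronous for brevity.
type TaskCache = Record<string, unknown>;

// Runs `fn` only when no result is stored under `cacheKey`;
// otherwise returns the stored result without calling `fn`.
function runTask<T>(cache: TaskCache, cacheKey: string, fn: () => T): T {
  if (Object.prototype.hasOwnProperty.call(cache, cacheKey)) {
    return cache[cacheKey] as T;
  }
  const result = fn();
  cache[cacheKey] = result;
  return result;
}

// Counters that stand in for side effects on external services.
const calls = { unwrappedQuery: 0, wrappedEmail: 0 };

// Stand-in for a Job's run function.
function run(cache: TaskCache): string {
  calls.unwrappedQuery += 1; // not wrapped in a task: repeats on every execution
  return runTask(cache, "welcome-email", () => {
    calls.wrappedEmail += 1; // wrapped in a task: executes once per run
    return "sent";
  });
}

// The cache belongs to one run and survives across executions of `run`.
const runCache: TaskCache = {};
const firstResult = run(runCache); // initial execution
const resumedResult = run(runCache); // re-execution after a resume
```

After both executions, `calls.unwrappedQuery` is 2 and `calls.wrappedEmail` is 1, and both executions return the same result.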

As well as powering the resumable nature of Trigger.dev, Tasks also provide:

  • Retryable – If a Task fails, it can be retried. You can configure how (or if) a Task is retried. Full details in the io.runTask() SDK reference.
  • Logging – Tasks are logged, so you can see what happened in a Run. Find out more about viewing runs.

Task Cache Keys

The first param of all Tasks is a cacheKey. This is a unique identifier for the Task inside that Run. It is used for storing the cached result of a task. It is also used to identify the Task in the Viewing Runs Dashboard.

It’s important that cacheKeys are unique inside an individual Job Run.
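
Uniqueness is easy to break when tasks are created in a loop. The sketch below shows one way to derive a distinct cacheKey per item. The `cacheKeyFor` and `countDistinct` helpers are hypothetical and not part of the SDK, and the user IDs are made up for illustration.

```typescript
// Hypothetical helper: builds a cacheKey that is unique per item in a loop.
function cacheKeyFor(prefix: string, id: string): string {
  return `${prefix}-${id}`;
}

// Counts how many distinct keys a list contains.
function countDistinct(keys: string[]): number {
  const seen: Record<string, boolean> = {};
  let count = 0;
  for (const key of keys) {
    if (!seen[key]) {
      seen[key] = true;
      count += 1;
    }
  }
  return count;
}

const userIds = ["u1", "u2", "u3"]; // illustrative data

// A fixed key is identical on every iteration, so the keys collide.
const fixedKeys = userIds.map(() => "send-email");

// A key derived from the item's ID stays unique within the run.
const derivedKeys = userIds.map((id) => cacheKeyFor("send-email", id));

const distinctFixed = countDistinct(fixedKeys); // 1
const distinctDerived = countDistinct(derivedKeys); // 3
```

Inside a job, the derived key would be passed as the first argument, for example `io.runTask(cacheKeyFor("send-email", userId), ...)`.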

Creating Tasks

There are 3 ways of using tasks in your code:

Using io.runTask()

The io.runTask() function allows you to run a Task manually. It takes a cacheKey and a function to run. The function will only be run if the Task is not already cached.

const response = await io.runTask("my-task", async (task) => {
  return await longRunningCode(payload.userId);
});

The callback function is passed a task object, which can be useful for providing an idempotency key to an external service. For example, Stripe:

Our Stripe Integration handles this for you automatically; the example below is for illustration only.

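import Stripe from "stripe";

// The non-null assertion assumes STRIPE_SECRET_KEY is set in the environment
const stripe = new Stripe(process.env.STRIPE_SECRET_KEY!, {
  apiVersion: "2020-08-27",
});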

await io.runTask("create-customer", async (task) => {
  await stripe.customers.create(
    {
      email: "[email protected]",
    },
    {
      idempotencyKey: task.idempotencyKey,
    }
  );
});
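
The idempotency key matters when a task is retried after the request already reached the external service. The sketch below uses a hypothetical in-memory `FakeCustomerApi` (not Stripe's client) to illustrate the general behavior of an idempotent endpoint: a repeated request with the same key returns the original response instead of creating a duplicate. The key string stands in for `task.idempotencyKey`.

```typescript
// Hypothetical in-memory stand-in for an external API that honours idempotency keys.
interface Customer {
  id: string;
  email: string;
}

class FakeCustomerApi {
  private byKey: Record<string, Customer> = {};
  private nextId = 1;
  created = 0; // number of customers actually created

  create(email: string, idempotencyKey: string): Customer {
    if (Object.prototype.hasOwnProperty.call(this.byKey, idempotencyKey)) {
      // Same key seen before: return the original response, create nothing.
      return this.byKey[idempotencyKey];
    }
    const customer: Customer = { id: `cus_${this.nextId}`, email };
    this.nextId += 1;
    this.byKey[idempotencyKey] = customer;
    this.created += 1;
    return customer;
  }
}

const api = new FakeCustomerApi();
const idempotencyKey = "example-task-key"; // stands in for task.idempotencyKey

const firstAttempt = api.create("user@example.com", idempotencyKey);
// A retry of the same task, e.g. after a timeout, sends the same key.
const retriedAttempt = api.create("user@example.com", idempotencyKey);
```

Both attempts return the same customer, and `api.created` stays at 1.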

runTask also takes an optional 3rd argument, which allows you to customize how the Task is displayed and run. For example, you can supply a name and some properties to be displayed in the Viewing Runs Dashboard:

const response = await io.runTask(
  "my-task",
  async (task) => {
    return await longRunningCode(payload.userId);
  },
  {
    name: "My Task",
    properties: [
      {
        label: "User ID",
        value: payload.userId,
      },
    ],
    icon: "user",
  }
);

See the io.runTask() SDK reference for more information.

Using io.integration.runTask()

All of our Integration packages expose a runTask() function. The main differences between this and io.runTask() are:

  • Adds an additional callback parameter that provides the underlying authenticated integration client.
  • Automatically sets the icon property on the Task.
  • Configures sensible defaults for retries and error handling.

The following example uses the GitHub integration’s runTask function to create a project card when a new user signs up:

import { Github } from "@trigger.dev/github";

const github = new Github({
  id: "github",
});

client.defineJob({
  id: "create-project-card",
  name: "Create Project Card",
  version: "1.0.0",
  trigger: eventTrigger({
    name: "new.user",
  }),
  integrations: {
    github,
  },
  run: async (payload, io, ctx) => {
    await io.github.runTask(
      "create-card",
      async (client, task) => {
        // client is an authenticated GitHub client (https://github.com/octokit/octokit.js)
        return client.rest.projects.createCard({
          // Environment variables are strings, but column_id must be a number
          column_id: Number(process.env.GITHUB_PROJECT_COLUMN_ID),
          note: `New User ${payload.user.name} signed up!`,
        });
      },
      { name: "Create card" }
    );
  },
});

Using an Integration Task Wrapper Function

Our Integration packages also expose a number of task wrapper functions. These are functions that wrap a common task for that integration. For example, the Slack integration exposes a postMessage() function:

import { Slack } from "@trigger.dev/slack";

const slack = new Slack({
  id: "slack",
});

client.defineJob({
  id: "send-welcome-message",
  name: "Send welcome message",
  version: "1.0.0",
  trigger: eventTrigger({
    name: "new.user",
  }),
  integrations: {
    slack,
  },
  run: async (payload, io, ctx) => {
    await io.slack.postMessage("send-message", {
      channel: process.env.SLACK_CHANNEL_ID,
      text: `New user ${payload.user.name} signed up!`,
    });
  },
});

All task wrapper functions take a cacheKey as the first argument, because they are Tasks under the hood. Think of them as a convenience wrapper around io.runTask().

We strive to document all of the task wrapper functions in our Integration packages. For example, check out our GitHub integration task docs.

Subtasks

You can break up a task into multiple subtasks. This is useful for breaking up a long-running task into smaller chunks, while consolidating the logging into a single task in the dashboard with children.

We currently support nesting up to 5 levels.

const response = await io.runTask("parent-task", async (task) => {
  await io.runTask("child-1", async () => {
    // do something
  });

  await io.runTask("child-2", async () => {
    // do something
  });
});

Task cacheKeys are automatically scoped to the parent task. So for example, you can reuse a cacheKey inside a parent task and it will not conflict with another top-level task.

const response = await io.runTask("parent-task", async (task) => {
  await io.runTask("child-1", async () => {
    // do something
  });

  await io.runTask("child-2", async () => {
    // do something
  });
});

// This will not conflict with the child-1 task above
// (named differently from `response` to avoid redeclaring it in the same scope)
const topLevelResponse = await io.runTask("child-1", async (task) => {
  // do something
});
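
One way to picture this scoping is to treat the effective key as the child's cacheKey combined with the keys of its ancestors. This is only a mental model: the `scopedKey` helper and the "/" separator are assumptions made for illustration, and how Trigger.dev actually derives scoped keys is not specified here.

```typescript
// Hypothetical: scope a cacheKey by joining it to the keys of its ancestor tasks.
function scopedKey(parentPath: string[], cacheKey: string): string {
  return parentPath.concat([cacheKey]).join("/");
}

// "child-1" used inside "parent-task"
const nestedKey = scopedKey(["parent-task"], "child-1");

// "child-1" used at the top level of the run
const topLevelKey = scopedKey([], "child-1");

// The two effective keys differ, so the tasks do not share a cached result.
const keysConflict = nestedKey === topLevelKey;
```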

Extracting Common Tasks

Subtasks allow you to DRY up any repeating task code into a single function. For example, if you have a common task that sends a welcome email, you can extract that into a function:

const sendWelcomeEmail = async (cacheKey: string, io: IO, resend: Resend, userId: string) => {
  return await io.runTask(cacheKey, async () => {
    const user = await io.runTask("fetch-user", async () => {
      return prisma.user.findUniqueOrThrow({
        where: { id: userId },
        select: { email: true, name: true },
      });
    });

    // Use the resend integration passed in as an argument; the plain IO type has no resend property
    await resend.sendEmail("📧", {
      to: user.email,
      from: "[email protected]",
      subject: "Welcome!",
      html: welcomeEmail(user.name),
    });
  });
};

client.defineJob({
  id: "new-user",
  name: "Run when a new user signs up",
  version: "1.0.0",
  trigger: eventTrigger({
    name: "new.user",
    schema: z.object({
      userId: z.string(),
    }),
  }),
  integrations: {
    resend,
  },
  run: async (payload, io, ctx) => {
    await sendWelcomeEmail("🫡", io, io.resend, payload.userId);
  },
});

Always make sure you pass a unique cacheKey to the extracted function, so the tasks inside the function are not accidentally reused.

Limitations

A single task has an upper bound on its execution duration, which must be less than the serverless function execution timeout of the platform you deploy to. For more information, see our Limits docs.
