Triggers
Polling triggers aren’t currently supported. They will allow triggers to be built for APIs that don’t have webhooks, like Notion.
Webhooks AKA ExternalSource
Webhooks are added to an Integration by using ExternalSource.
An example: Stripe onPriceCreated
You can subscribe to many different events using our Stripe integration.
We’re going to run through the steps required to create the onPriceCreated event.
Here’s an example Job that uses it. Create Jobs like this while you’re developing so you can test your trigger. See the integration testing guide for more info.
import { Stripe } from "@trigger.dev/stripe";
const stripe = new Stripe({
id: "stripe",
apiKey: process.env["STRIPE_API_KEY"]!,
});
client.defineJob({
id: "stripe-on-price",
name: "Stripe On Price",
version: "0.1.0",
//this is what we're going to add in this guide
trigger: stripe.onPriceCreated(),
run: async (payload, io, ctx) => {
//do stuff with the payload
await io.logger.info("price created!", { currency: payload.currency });
},
});
Creating Tasks that list/create/delete/update webhooks
We need to create Tasks that will be used to register webhooks. These will be used in the ExternalSource.
These are created just like normal Tasks, which we’ve already covered. In this case we’re creating four tasks:
io.stripe.webhookEndpoints.create
io.stripe.webhookEndpoints.update
io.stripe.webhookEndpoints.delete
io.stripe.webhookEndpoints.list
//...
import { WebhookEndpoints } from "./webhookEndpoints";
export class Stripe implements TriggerIntegration {
//...
get webhookEndpoints() {
return new WebhookEndpoints(this.runTask.bind(this));
}
//...
}
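The `bind(this)` call in the getter matters because `runTask` is passed to another class and called from there. Here is a minimal sketch of that wiring. Everything in it is a hypothetical stand-in defined locally (`FakeClient`, `RunTask`, `WebhookEndpointsSketch`, `IntegrationSketch`, the generated `we_` ids); it is not the SDK's `runTask` signature or the real Stripe client, only an illustration of the pattern.

```typescript
// Hypothetical shapes, defined locally so the sketch runs on its own.
type FakeEndpoint = { id: string; url: string; enabled_events: string[] };
type FakeClient = { endpoints: Map<string, FakeEndpoint> };
type RunTask = <T>(key: string, callback: (client: FakeClient) => Promise<T>) => Promise<T>;

// Groups the webhook endpoint tasks; it only knows how to run a task, not how the client is built.
class WebhookEndpointsSketch {
  private runTask: RunTask;

  constructor(runTask: RunTask) {
    this.runTask = runTask;
  }

  create(key: string, params: { url: string; enabled_events: string[] }) {
    return this.runTask(key, async (client) => {
      // made-up id scheme, for illustration only
      const endpoint: FakeEndpoint = { id: `we_${client.endpoints.size + 1}`, ...params };
      client.endpoints.set(endpoint.id, endpoint);
      return endpoint;
    });
  }

  list(key: string) {
    return this.runTask(key, async (client) => ({
      data: Array.from(client.endpoints.values()),
    }));
  }
}

class IntegrationSketch {
  private client: FakeClient = { endpoints: new Map() };

  // A regular method: `this` is lost when the method is handed off unbound.
  async runTask<T>(key: string, callback: (client: FakeClient) => Promise<T>): Promise<T> {
    return callback(this.client);
  }

  get webhookEndpoints() {
    // Without bind, `this.client` would be undefined inside runTask.
    return new WebhookEndpointsSketch(this.runTask.bind(this));
  }
}
```

Each access to `webhookEndpoints` creates a new helper object, but every helper runs its tasks against the same underlying client because `runTask` stays bound to the integration.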
Creating events
An event is something that happens in the external system. In this case, it’s a price being created. We use EventSpecifications to define events.
We have many events exported from this file, but here’s the one we care about:
export const onPriceCreated: EventSpecification<OnPriceEvent> = {
//this name matches the name of the event in the Stripe API
name: "price.created",
//used for display in the Dashboard UI
title: "On Price Created",
source: "stripe.com",
icon: "stripe",
//examples appear in the Testing UI in the Dashboard. They're optional but they make the DX really nice :)
examples: [
{
id: "recurring",
name: "Recurring Price",
icon: "stripe",
payload: {
id: "price_1NYV6vI0XSgju2urKsSmI53v",
object: "price",
//...
},
},
],
//you can just cast the payload to the type you want here
parsePayload: (payload) => payload as OnPriceEvent,
//these properties are displayed in the Dashboard UI
runProperties: (payload) => [{ label: "Price ID", text: payload.id }],
};
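To make the roles of `parsePayload` and `runProperties` concrete, here is a minimal sketch that runs both against a sample payload. `MiniEventSpecification`, `PricePayload`, `describeRun`, and the `price_123` id are hypothetical stand-ins defined for this example; they are not the SDK's `EventSpecification` or Stripe's types.

```typescript
// Hypothetical, trimmed-down stand-ins for the SDK and Stripe types.
type PricePayload = { id: string; object: "price"; currency: string };

type MiniEventSpecification<TEvent> = {
  name: string;
  title: string;
  source: string;
  parsePayload: (payload: unknown) => TEvent;
  runProperties?: (payload: TEvent) => { label: string; text: string }[];
};

const onPriceCreatedSketch: MiniEventSpecification<PricePayload> = {
  name: "price.created",
  title: "On Price Created",
  source: "stripe.com",
  // a plain cast, as in the guide: no runtime validation happens here
  parsePayload: (payload) => payload as PricePayload,
  runProperties: (payload) => [{ label: "Price ID", text: payload.id }],
};

// Simulates what happens to an incoming payload: parse it, then derive display properties.
function describeRun<TEvent>(spec: MiniEventSpecification<TEvent>, raw: unknown) {
  const payload = spec.parsePayload(raw);
  return { payload, properties: spec.runProperties?.(payload) ?? [] };
}

const describedRun = describeRun(onPriceCreatedSketch, {
  id: "price_123",
  object: "price",
  currency: "usd",
});
```

Because `parsePayload` is only a cast, a malformed payload would pass through unchanged. If the external API is less predictable, validating with a schema inside `parsePayload` is a reasonable alternative.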
Adding a Trigger
We import all the events from the file above, and then we can add a trigger for the event we want.
//... other imports
//this imports all the events from the file above, as an Object
import * as events from "./events";
export class Stripe implements TriggerIntegration {
//...
//the source is used to register webhooks and to process the data when received
get source() {
return createWebhookEventSource(this);
}
//the actual trigger. This one has some configuration options (the optional params)
onPriceCreated(params?: { connect?: boolean; filter?: EventFilter }) {
return createTrigger(this.source, events.onPriceCreated, params ?? { connect: false });
}
}
//this creates a type that is the union of all the events
type StripeEvents = (typeof events)[keyof typeof events];
//this is the type of the trigger we're creating
type CreateTriggersResult<TEventSpecification extends StripeEvents> = ExternalSourceTrigger<
TEventSpecification,
ReturnType<typeof createWebhookEventSource>
>;
function createTrigger<TEventSpecification extends StripeEvents>(
source: ReturnType<typeof createWebhookEventSource>,
event: TEventSpecification,
params: TriggerParams
): CreateTriggersResult<TEventSpecification> {
return new ExternalSourceTrigger({
event,
params,
source,
options: {},
});
}
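The `(typeof events)[keyof typeof events]` type can look opaque at first. The sketch below shows the same pattern on a small local object, assuming nothing about the SDK: `eventsSketch`, `AnyEventSketch`, and `describeTrigger` are hypothetical names used only for illustration.

```typescript
// A local stand-in for the "./events" module (hypothetical, simplified event objects).
const eventsSketch = {
  onPriceCreated: { name: "price.created", title: "On Price Created" },
  onPriceUpdated: { name: "price.updated", title: "On Price Updated" },
} as const;

// Indexing the object type by all of its keys yields the union of its value types.
type AnyEventSketch = (typeof eventsSketch)[keyof typeof eventsSketch];

// Generic over the union, so the specific event type is preserved in the return value.
function describeTrigger<TEvent extends AnyEventSketch>(
  event: TEvent
): { event: TEvent; label: string } {
  return { event, label: `${event.title} (${event.name})` };
}

const priceTrigger = describeTrigger(eventsSketch.onPriceCreated);

// The literal type survives: this assignment only compiles because
// `priceTrigger.event.name` is typed as "price.created", not `string`.
const priceEventName: "price.created" = priceTrigger.event.name;
```

This is why `createTrigger` is generic: each trigger method returns a type tied to its own event, so the Job's `payload` ends up typed for that event without extra annotations.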
Creating the ExternalSource
The ExternalSource is responsible for registering webhooks and processing the data when it’s received.
//...everything we've already covered
//this defines the shape of the data that Stripe returns when we create/update a webhook
const WebhookDataSchema = z.object({
id: z.string(),
object: z.literal("webhook_endpoint"),
api_version: z.string().nullable(),
application: z.string().nullable(),
created: z.number(),
description: z.string().nullable(),
enabled_events: z.array(z.string()),
livemode: z.boolean(),
metadata: z.record(z.string()),
status: z.enum(["enabled", "disabled"]),
url: z.string(),
});
function createWebhookEventSource(
//the integration is used to register the webhook
integration: Stripe
// { connect?: boolean } comes through from the params in the ExternalSourceTrigger we defined above
): ExternalSource<Stripe, { connect?: boolean }, "HTTP", {}> {
return new ExternalSource("HTTP", {
//this needs to be unique in Trigger.dev
//there's only one stripe webhook endpoint (for all events), so we use "stripe.webhook"
id: "stripe.webhook",
//this is the schema for the params that come through from the ExternalSourceTrigger
schema: z.object({ connect: z.boolean().optional() }),
version: "0.1.0",
integration,
//if the key is the same then a webhook will be updated (with any new events added)
//if the key is different then a new webhook will be created
//in Stripe's case we can have webhooks with multiple events BUT not shared between connect and non-connect
key: (params) => `stripe.webhook${params.connect ? ".connect" : ""}`,
//the webhookHandler is called when the webhook is received; this is the last step we'll cover
handler: webhookHandler,
//this function is called when the webhook is registered
register: async (event, io, ctx) => {
const { params, source: httpSource, options } = event;
//httpSource.data is the stored data about the existing webhook that has the same key
//httpSource.data will be undefined if no webhook has been registered yet with the same key
const webhookData = WebhookDataSchema.safeParse(httpSource.data);
//this is the full list of events that we want to register (when we add more than just onPriceCreated)
const allEvents = Array.from(new Set([...options.event.desired, ...options.event.missing]));
const registeredOptions = {
event: allEvents,
};
//if there is already an active source (i.e. a webhook has been registered)
//and httpSource.data was parsed successfully
if (httpSource.active && webhookData.success) {
//there are no missing events, so we don't need to update the webhook
if (options.event.missing.length === 0) return;
//we want to update the existing webhook with the new events
//this uses the Task we created above
const updatedWebhook = await io.integration.webhookEndpoints.update("update-webhook", {
id: webhookData.data.id,
url: httpSource.url,
enabled_events: allEvents as unknown as WebhookEvents[],
});
//when registering new events, we need to return the data and the options
return {
data: WebhookDataSchema.parse(updatedWebhook),
options: registeredOptions,
};
}
//there is no active source, or httpSource.data wasn't parsed successfully,
//but we might still be able to add events to an existing webhook
const listResponse = await io.integration.webhookEndpoints.list("list-webhooks", {
limit: 100,
});
//if one of these webhooks has the URL we want, we can update it
const existingWebhook = listResponse.data.find((w) => w.url === httpSource.url);
if (existingWebhook) {
//add all the events to the webhook
const updatedWebhook = await io.integration.webhookEndpoints.update(
"update-found-webhook",
{
id: existingWebhook.id,
url: httpSource.url,
enabled_events: allEvents as unknown as WebhookEvents[],
disabled: false,
}
);
//return the data and the registered options
return {
data: WebhookDataSchema.parse(updatedWebhook),
options: registeredOptions,
};
}
//there are no matching webhooks, so we need to create a new one
const webhook = await io.integration.webhookEndpoints.create("create-webhook", {
url: httpSource.url,
enabled_events: allEvents as unknown as WebhookEvents[],
connect: params.connect,
});
//when creating a new webhook, we need to also return the secret that Stripe sends us
//the secret is used to validate the webhook payloads we receive
return {
data: WebhookDataSchema.parse(webhook),
secret: webhook.secret,
options: registeredOptions,
};
},
});
}
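The `register` function above makes a three-way decision: do nothing, update an existing webhook, or create a new one. Here is that decision pulled out into a pure function so the branches are easier to follow and test. `planRegistration`, `RegistrationPlan`, and `ExistingWebhookSketch` are hypothetical names, and unlike the real `register` this sketch takes the listed webhooks as an input instead of fetching them only when needed.

```typescript
type ExistingWebhookSketch = { id: string; url: string };

type RegistrationPlan =
  | { action: "none" }
  | { action: "update"; id: string; events: string[] }
  | { action: "create"; events: string[] };

function planRegistration(input: {
  active: boolean; // is there already an active source?
  stored?: ExistingWebhookSketch; // successfully parsed stored data, if any
  url: string; // the URL the webhook should point at
  desired: string[];
  missing: string[];
  listed: ExistingWebhookSketch[]; // webhooks that already exist in the external service
}): RegistrationPlan {
  // de-duplicated union of every event we want on the webhook
  const events = Array.from(new Set([...input.desired, ...input.missing]));

  // an active source with valid stored data: update only if something is missing
  if (input.active && input.stored) {
    if (input.missing.length === 0) return { action: "none" };
    return { action: "update", id: input.stored.id, events };
  }

  // otherwise, reuse a webhook that already points at our URL
  const found = input.listed.find((w) => w.url === input.url);
  if (found) return { action: "update", id: found.id, events };

  // nothing to reuse, so a new webhook is needed
  return { action: "create", events };
}

const examplePlan = planRegistration({
  active: false,
  url: "https://example.com/hook",
  desired: ["price.created"],
  missing: ["price.created"],
  listed: [],
});
```

Keeping the decision separate from the API calls is a design choice for this sketch only. It makes each branch testable without a network connection.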
Handling the webhook payload
When a webhook is received, we need to validate the signature, parse the payload, and return the events. Each event will turn into a payload that can trigger a Job run.
//...everything we've already covered
//this function is called when a webhook is received
async function webhookHandler(event: HandlerEvent<"HTTP">, logger: Logger) {
logger.debug("[@trigger.dev/stripe] Handling webhook payload");
//The rawEvent is a Request, from this you can get the body, headers, etc.
//The source has info on the ExternalSource that received the webhook
const { rawEvent: request, source } = event;
//no body means no events
if (!request.body) {
logger.debug("[@trigger.dev/stripe] No body found");
return { events: [] };
}
//in Stripe's case, we need to get the text from the request, and we'll use their SDK to get an event
const rawBody = await request.text();
//it's important to verify webhook payloads because anyone can send data to a URL
const signature = request.headers.get("stripe-signature");
if (signature) {
//The Stripe SDK is used to validate the signature
const stripeClient = new StripeClient("", { apiVersion: "2022-11-15" });
try {
//this will throw an error if the signature is invalid
const event = stripeClient.webhooks.constructEvent(rawBody, signature, source.secret);
return {
//more than one event can be returned, but for most APIs it will just be one
events: [
{
id: event.id,
payload: event.data.object,
source: "stripe.com",
//this name should match the EventSpecification name
name: event.type,
timestamp: new Date(event.created * 1000),
//the context can be any format, it will be passed to the Job run's context.source
context: {
apiVersion: event.api_version,
livemode: event.livemode,
request: event.request,
previousAttributes: event.data.previous_attributes,
},
},
],
//you can also return a response, which will be sent back to the webhook sender
//this is optional: a 200 is returned by default, which is normally what's required
//response: {
// status: 200,
// body: "ok",
// headers: {
// "Content-Type": "text/plain",
// },
//},
//metadata can be any format; it is stored and can be used in the next webhookHandler call
// metadata: {
// requestId: event.request,
// }
};
} catch (error) {
if (error instanceof Error) {
logger.error("[@trigger.dev/stripe] Error while validating webhook signature", {
error: { name: error.name, message: error.message },
});
} else {
logger.error("[@trigger.dev/stripe] Unknown Error while validating webhook signature");
}
return { events: [] };
}
}
//if there's no signature, we can't validate the payload
return {
events: [],
};
}
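The handler above delegates verification to the Stripe SDK. For an API without an SDK, you would need to do the check yourself, so here is a rough sketch of what such a check involves. It follows the scheme Stripe documents for the `stripe-signature` header: `t=<timestamp>` and `v1=<signature>` entries, with an HMAC-SHA256 over `<timestamp>.<body>`. The function names, the 300-second tolerance, and the test secret are assumptions for illustration. This is not the SDK's implementation; for Stripe itself, keep using `constructEvent`.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Parses a header of the form "t=<unix seconds>,v1=<hex signature>[,v1=...]".
function parseSignatureHeader(header: string): { timestamp: string; signatures: string[] } {
  let timestamp = "";
  const signatures: string[] = [];
  for (const part of header.split(",")) {
    const [key, value] = part.trim().split("=", 2);
    if (key === "t" && value) timestamp = value;
    if (key === "v1" && value) signatures.push(value);
  }
  return { timestamp, signatures };
}

// HMAC-SHA256 over "<timestamp>.<rawBody>", hex encoded.
function computeSignature(rawBody: string, timestamp: string, secret: string): string {
  return createHmac("sha256", secret).update(`${timestamp}.${rawBody}`, "utf8").digest("hex");
}

function verifySignature(
  rawBody: string,
  header: string,
  secret: string,
  toleranceSeconds = 300, // assumed tolerance for this sketch
  nowSeconds = Math.floor(Date.now() / 1000)
): boolean {
  const { timestamp, signatures } = parseSignatureHeader(header);
  if (!timestamp || signatures.length === 0) return false;

  // reject malformed or stale timestamps to limit replay attacks
  const signedAt = Number(timestamp);
  if (!Number.isFinite(signedAt)) return false;
  if (Math.abs(nowSeconds - signedAt) > toleranceSeconds) return false;

  const expected = Buffer.from(computeSignature(rawBody, timestamp, secret), "hex");
  return signatures.some((candidate) => {
    const actual = Buffer.from(candidate, "hex");
    // timingSafeEqual throws on length mismatch, so check lengths first
    return actual.length === expected.length && timingSafeEqual(actual, expected);
  });
}
```

Note that the check runs against the raw request text, which is why the handler reads `request.text()` before doing anything else. Parsing the JSON and re-serializing it can change the bytes and break the signature.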