October 17, 2024

Learn how to use Trigger.dev with Supabase to create event-driven workflows. We'll cover triggering tasks from Edge Functions, performing database operations, and building a video processing pipeline with AI transcription.

Image for Trigger anything from a database change using Supabase with Trigger.dev

Introduction

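In this article we'll learn how to use Trigger.dev with Supabase: triggering tasks from Edge Functions, performing database operations, interacting with Supabase Storage, and building a video processing pipeline with AI transcription.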

What are the benefits of using Trigger.dev with Supabase?

Supabase is an open-source Firebase alternative, providing a backend-as-a-service with a full suite of tools you can use in your projects, including a Postgres-compatible relational database, Database Webhooks, Edge Functions, authentication, instant APIs, realtime and storage.

When you use Supabase with Trigger.dev, you can trigger tasks from database events and then offload their execution to Trigger.dev’s elastic infrastructure.

This makes it easy to build event-driven workflows, written in normal code, that interact with your application, database and external services.


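import { createClient } from "@supabase/supabase-js";
import { task } from "@trigger.dev/sdk/v3";

// Assumption: the project URL and service role key are stored in environment variables with these names
const supabase = createClient(
  process.env.SUPABASE_PROJECT_URL as string,
  process.env.SUPABASE_SERVICE_ROLE_KEY as string
);

// Add a new user
export const supabaseDatabaseInsert = task({
  id: "add-new-user",
  run: async (payload: { userId: string }) => {
    const { userId } = payload;

    // Insert a new row into the user_subscriptions table with the provided userId
    const { error } = await supabase.from("user_subscriptions").insert({
      user_id: userId,
    });

    // Fail the run if the insert did not succeed
    if (error) {
      throw new Error(`Failed to add new user: ${error.message}`);
    }

    return {
      message: `New user added successfully: ${userId}`,
    };
  },
});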

Every run is observable in the Trigger.dev dashboard, giving you live trace views and detailed logs to help with development and debugging.

Image for the Runs table

Unlocking the power of Supabase and Trigger.dev

We have created a series of examples and guides which cover a range of common use cases. They can be extended and modified to get the best out of Supabase and Trigger.dev.

Triggering tasks from Supabase Edge Functions

Supabase Edge Functions are server-side functions that run on the edge, close to your users. They are executed in a secure environment and have access to the same APIs as your application. You can use them to trigger a task from an external event such as a webhook, or from a database event, with the event payload passed through to the task.

This minimal implementation triggers a 'Hello World' task whenever the Edge Function URL is visited.


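// Assumption: these import paths depend on your SDK version and project layout
import { tasks } from "npm:@trigger.dev/sdk/v3";
import type { helloWorldTask } from "../../../src/trigger/example.ts";

Deno.serve(async () => {
  await tasks.trigger<typeof helloWorldTask>(
    // Your task id
    "hello-world",
    // Your task payload
    "Hello from a Supabase Edge Function!"
  );
  return new Response("OK");
});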

To learn how to build this example yourself, check out our Triggering tasks from Supabase Edge Functions guide.

Performing database operations

You can also run CRUD operations directly from your tasks. This is useful for updating your database in response to an event (a user signing up, for example) or for adding data to a database on a repeating schedule, such as every day at 10am.

The task below demonstrates updating a user's subscription in a table by either inserting a new row or updating an existing one with the new plan, depending on whether the user already has a subscription.

It takes a payload with a userId and newPlan, and updates the user_subscriptions table in Supabase with the new plan.


import { AbortTaskRunError, task } from "@trigger.dev/sdk/v3";

// Assumes `supabase` is an initialized Supabase client (created with createClient from @supabase/supabase-js)

type PlanType = "hobby" | "pro" | "enterprise";

export const supabaseUpdateUserSubscription = task({
  id: "update-user-subscription",
  run: async (payload: { userId: string; newPlan: PlanType }) => {
    const { userId, newPlan } = payload;

    // Abort the task run without retrying if the new plan type is invalid
    if (!["hobby", "pro", "enterprise"].includes(newPlan)) {
      throw new AbortTaskRunError(
        `Invalid plan type: ${newPlan}. Allowed types are 'hobby', 'pro', or 'enterprise'.`
      );
    }

    // Query the user_subscriptions table to check if the user already has a subscription
    const { data: existingSubscriptions, error: selectError } = await supabase
      .from("user_subscriptions")
      .select("user_id")
      .eq("user_id", userId);

    // If the lookup itself failed, throw rather than treating it as "no subscription"
    if (selectError) {
      throw new Error(
        `Failed to look up user subscription: ${selectError.message}`
      );
    }

    if (!existingSubscriptions || existingSubscriptions.length === 0) {
      // If the user has no existing subscription, insert a new row
      const { error: insertError } = await supabase
        .from("user_subscriptions")
        .insert({
          user_id: userId,
          plan: newPlan,
          updated_at: new Date().toISOString(),
        });

      // If there was an error inserting the new subscription, throw an error
      if (insertError) {
        throw new Error(
          `Failed to insert user subscription: ${insertError.message}`
        );
      }
    } else {
      // If the user already has a subscription, update their existing row
      const { error: updateError } = await supabase
        .from("user_subscriptions")
        // Set the plan to the new plan and update the timestamp
        .update({ plan: newPlan, updated_at: new Date().toISOString() })
        .eq("user_id", userId);

      // If there was an error updating the subscription, throw an error
      if (updateError) {
        throw new Error(
          `Failed to update user subscription: ${updateError.message}`
        );
      }
    }

    // Return an object with the userId and newPlan
    return { userId, newPlan };
  },
});
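
The plan check at the top of this task can also live in a small type guard, so that a single list of plans drives both the TypeScript type and the runtime validation. The following is a minimal sketch, assuming the three plan names used in the task; `PLAN_TYPES` and `isPlanType` are illustrative names, not part of the Trigger.dev or Supabase SDKs.

```typescript
// Hypothetical helper: one source of truth for the allowed plans.
const PLAN_TYPES = ["hobby", "pro", "enterprise"] as const;

// Derive the union type "hobby" | "pro" | "enterprise" from the array.
type PlanType = (typeof PLAN_TYPES)[number];

// Type guard: narrows an unknown value to PlanType when it is an allowed plan.
function isPlanType(value: unknown): value is PlanType {
  return (
    typeof value === "string" &&
    (PLAN_TYPES as readonly string[]).includes(value)
  );
}
```

Inside the task, the guard could replace the inline array check: if `isPlanType(newPlan)` returns false, throw the `AbortTaskRunError` as before.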

Interacting with Supabase Storage

Supabase Storage can be used to store images, videos, or any other type of file.

Tasks that use Supabase Storage are useful for media processing, such as transcoding videos, resizing images, or applying AI manipulations.

This task demonstrates uploading a video from a videoUrl to a storage bucket using the Supabase client:


import { task } from "@trigger.dev/sdk/v3";

// Assumes `supabase` is an initialized Supabase client (created with createClient from @supabase/supabase-js)

export const supabaseStorageUpload = task({
  id: "supabase-storage-upload",
  run: async (payload: { videoUrl: string }) => {
    const { videoUrl } = payload;

    const bucket = "my_bucket";
    const objectKey = `video_${Date.now()}.mp4`;

    // Download video data as a buffer
    const response = await fetch(videoUrl);
    if (!response.ok) {
      throw new Error(`Failed to download video: HTTP ${response.status}`);
    }
    // The built-in fetch has no buffer() method, so convert the ArrayBuffer instead
    const videoBuffer = Buffer.from(await response.arrayBuffer());

    // Upload the video directly to Supabase Storage
    const { error } = await supabase.storage
      .from(bucket)
      .upload(objectKey, videoBuffer, {
        contentType: "video/mp4",
        upsert: true,
      });

    // Fail the run if the upload did not succeed
    if (error) {
      throw new Error(`Failed to upload video: ${error.message}`);
    }

    // Return the video object key and bucket
    return {
      objectKey,
      bucket,
    };
  },
});

To learn more, check out our Supabase Storage Upload examples in our docs.

Creating a video processing and AI pipeline

You can also trigger tasks from database events, such as a new row being inserted into a table. This adds realtime functionality to your tasks, allowing you to build workflows that respond to changes in your database.

In this more advanced example, we'll show you how to trigger a task when a row is added to a table in a Supabase database, using a Database Webhook and Edge Function.

This task will download a video from a URL, extract the audio using FFmpeg, transcribe the audio using Deepgram, and then update the table with the new transcription.

Triggering a task from a database event

In Supabase, we have created a Database Webhook that triggers an Edge Function called video-processing-handler whenever a new row containing a video_url is inserted into the video_uploads table. The webhook passes the new row data to the Edge Function as a payload.

This Edge Function triggers a videoProcessAndUpdate task:

video-processing-handler.ts

// Assumption: these import paths depend on your SDK version and project layout
import { tasks } from "npm:@trigger.dev/sdk/v3";
import type { videoProcessAndUpdate } from "../../../src/trigger/videoProcessAndUpdate.ts";

Deno.serve(async (req) => {
  const payload = await req.json();

  // This payload will contain the video url and id from the new row in the table
  const videoUrl = payload.record.video_url;
  const id = payload.record.id;

  // Trigger the videoProcessAndUpdate task with the videoUrl and id
  await tasks.trigger<typeof videoProcessAndUpdate>(
    "video-process-and-update",
    { videoUrl, id }
  );
  // Log the webhook payload for debugging
  console.log("Webhook payload:", payload);

  return new Response("ok");
});
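
The handler above reads `record.video_url` and `record.id` from the webhook body without checking them. The sketch below shows one way to validate the body before triggering the task. The `InsertWebhookPayload` type follows the insert payload shape described in Supabase's Database Webhooks documentation, with the record fields taken from this example; `parseVideoUpload` and its error messages are hypothetical and only illustrate the idea.

```typescript
// Body Supabase sends for an INSERT event; the record fields match the
// columns this example reads from the video_uploads row.
type InsertWebhookPayload = {
  type: "INSERT";
  table: string;
  schema: string;
  record: { id: number; video_url: string };
  old_record: null;
};

// Hypothetical helper: returns the task payload, or throws if the body is not
// an insert event carrying a numeric id and a non-empty video_url.
function parseVideoUpload(body: unknown): { id: number; videoUrl: string } {
  const payload = body as Partial<InsertWebhookPayload> | null;
  if (!payload || payload.type !== "INSERT" || !payload.record) {
    throw new Error("Expected an INSERT webhook payload with a record");
  }
  const { id, video_url } = payload.record;
  if (typeof id !== "number" || typeof video_url !== "string" || !video_url) {
    throw new Error("Record is missing a numeric id or a video_url");
  }
  return { id, videoUrl: video_url };
}
```

In the Edge Function, the result of `parseVideoUpload(await req.json())` could be passed directly as the task payload, with a 400 response returned when it throws.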

Creating the video processing task

The videoProcessAndUpdate task takes the payload from the Edge Function and uses it to fetch the video from the URL. It streams the video into FFmpeg, which writes the extracted audio to a temporary file, then transcribes the audio using Deepgram and updates the video_uploads table in Supabase with the new transcription.

videoProcessAndUpdate.ts

import { createClient as createDeepgramClient } from "@deepgram/sdk";
import { logger, task } from "@trigger.dev/sdk/v3";
import ffmpeg from "fluent-ffmpeg";
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { Readable } from "node:stream";

// Assumes `supabase` is an initialized Supabase client (created with createClient from @supabase/supabase-js)
// Assumption: the Deepgram API key is stored in an environment variable with this name
const deepgram = createDeepgramClient(process.env.DEEPGRAM_SECRET_KEY);

export const videoProcessAndUpdate = task({
  id: "video-process-and-update",
  run: async (payload: { videoUrl: string; id: number }) => {
    const { videoUrl, id } = payload;

    const outputPath = path.join(os.tmpdir(), `audio_${Date.now()}.wav`);
    const response = await fetch(videoUrl);

    // Extract the audio using FFmpeg
    await new Promise((resolve, reject) => {
      if (!response.body) {
        return reject(new Error("Failed to fetch video"));
      }

      ffmpeg(Readable.from(response.body))
        .outputOptions([
          "-vn", // Disable video output
          "-acodec pcm_s16le", // Use PCM 16-bit little-endian encoding
          "-ar 44100", // Set audio sample rate to 44.1 kHz
          "-ac 2", // Set audio channels to stereo
        ])
        .output(outputPath)
        .on("end", resolve)
        .on("error", reject)
        .run();
    });

    logger.log(`Audio extracted from video`, { outputPath });

    // Transcribe the audio using Deepgram
    const { result, error } = await deepgram.listen.prerecorded.transcribeFile(
      fs.readFileSync(outputPath),
      {
        model: "nova-2", // Use the Nova 2 model
        smart_format: true, // Automatically format the transcription
        diarize: true, // Enable speaker diarization
      }
    );

    if (error) throw error;

    // Extract the transcription from the result
    const transcription =
      result.results.channels[0].alternatives[0].paragraphs?.transcript;

    fs.unlinkSync(outputPath); // Delete the temporary audio file
    logger.log(`Temporary audio file deleted`, { outputPath });

    const { error: updateError } = await supabase
      // Same table the Database Webhook watches for new rows
      .from("video_uploads")
      // Update the transcription and video_url columns
      .update({ transcription: transcription, video_url: videoUrl })
      // Find the row by its ID
      .eq("id", id);

    if (updateError) {
      throw new Error(`Failed to update transcription: ${updateError.message}`);
    }

    return {
      message: `Transcription of the audio: ${transcription}`,
      result,
    };
  },
});
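
The transcript lookup in this task indexes straight into `channels[0].alternatives[0]`, which throws if the response contains no channels or alternatives, and it evaluates to `undefined` when `paragraphs` is absent. A more defensive lookup might look like the sketch below. The `TranscriptionResult` type is a hand-written subset of the response fields (not the Deepgram SDK's own type), and `extractTranscript` is a hypothetical helper.

```typescript
// Hand-written subset of the fields the task reads from the Deepgram result.
type TranscriptionResult = {
  results: {
    channels: Array<{
      alternatives: Array<{
        transcript?: string;
        paragraphs?: { transcript: string };
      }>;
    }>;
  };
};

// Hypothetical helper: prefer the paragraph-formatted transcript, fall back to
// the plain transcript, and return null when neither is available.
function extractTranscript(result: TranscriptionResult): string | null {
  const alternative = result.results.channels[0]?.alternatives[0];
  return alternative?.paragraphs?.transcript ?? alternative?.transcript ?? null;
}
```

Returning `null` makes the "no transcript" case explicit, so the task can decide whether to fail the run or store an empty value instead of writing `undefined` to the table.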


To build this example yourself, check out our Triggering tasks from database events guide in our docs.

Next steps

In this article we've covered how to trigger tasks from Supabase Edge Functions, perform database operations and interact with Supabase Storage. We've also shown you how to trigger tasks from database events, and build a more complex workflow that interacts with your database and external services.

To follow along, you can find all of the guides and examples from this article in our docs.

If you found this article useful, please consider sharing it with anyone you think would be interested in using Supabase with Trigger.dev.

Sign up to Trigger.dev and get started today.
