import { serve } from "@upstash/workflow/nextjs";
import { realtime } from "@/lib/realtime";
import { WorkflowAbort } from "@upstash/workflow";
type WorkflowPayload = {
userId: string;
action: string;
};
export const { POST } = serve<WorkflowPayload>(
async (context) => {
const { userId, action } = context.requestPayload;
const workflowRunId = context.workflowRunId;
// Create a channel based on the workflow run ID
const channel = realtime.channel(workflowRunId);
// Emit run start event
await context.run("start-workflow", () =>
channel.emit("workflow.update", {
type: "runStart",
workflowRunId,
timestamp: Date.now(),
})
);
// Step 1: Data Validation
try {
await context.run("validate-data", async () => {
// Emit step start
await channel.emit("workflow.update", {
type: "stepStart",
workflowRunId,
stepName: "validate-data",
timestamp: Date.now(),
});
// Your validation logic
if (!userId || !action) {
throw new Error("Missing required fields");
}
const result = { valid: true, userId, action };
// sleep 500 ms
await new Promise((resolve) => setTimeout(resolve, 500));
// Emit step completion
await channel.emit("workflow.update", {
type: "stepFinish",
workflowRunId,
stepName: "validate-data",
timestamp: Date.now(),
result,
});
return result;
});
} catch (error) {
if (error instanceof WorkflowAbort) {
throw error;
}
// Emit failure event
await channel.emit("workflow.update", {
type: "stepFail",
workflowRunId,
stepName: "validate-data",
timestamp: Date.now(),
error: error instanceof Error ? error.message : "Unknown error",
});
throw error;
}
// Additional steps follow the same pattern...
// Emit run completion
await channel.emit("workflow.update", {
type: "runFinish",
workflowRunId,
timestamp: Date.now(),
status: "success",
});
return { success: true, workflowRunId };
},
{
// Handle workflow failures
failureFunction: async ({ context }) => {
const workflowRunId = context.workflowRunId;
const channel = realtime.channel(workflowRunId);
await channel.emit("workflow.update", {
type: "runFinish",
workflowRunId,
timestamp: Date.now(),
status: "failed",
error: "Workflow execution failed",
});
},
}
);