import { serve } from "@upstash/workflow/nextjs";
import { realtime } from "@/lib/realtime";
type WorkflowPayload = {
userId: string;
action: string;
};
export const { POST } = serve<WorkflowPayload>(
async (context) => {
const { userId, action } = context.requestPayload;
const workflowRunId = context.workflowRunId;
const channel = realtime.channel(workflowRunId);
// Emit run start
await context.run("start-workflow", () =>
channel.emit("workflow.update", {
type: "runStart",
workflowRunId,
timestamp: Date.now(),
})
);
// Step 1: Initial Processing
await context.run("initial-processing", async () => {
await channel.emit("workflow.update", {
type: "stepStart",
workflowRunId,
stepName: "initial-processing",
timestamp: Date.now(),
});
// Your processing logic
const result = {
preprocessed: true,
userId,
action,
requiresApproval: true
};
await channel.emit("workflow.update", {
type: "stepFinish",
workflowRunId,
stepName: "initial-processing",
timestamp: Date.now(),
result,
});
return result;
});
// Step 2: Wait for Human Approval
const eventId = `approval-${workflowRunId}`;
const [{ eventData, timeout }] = await Promise.all([
// Wait for approval event
context.waitForEvent<{ approved: boolean }>(
"wait-for-approval",
eventId,
{ timeout: "5m" }
),
// Notify frontend that we're waiting
context.run("notify-waiting", () =>
channel.emit("workflow.update", {
type: "waitingForInput",
workflowRunId,
eventId,
message: `Waiting for approval to process action: ${action}`,
timestamp: Date.now(),
})
),
]);
// Handle timeout
if (timeout) {
await channel.emit("workflow.update", {
type: "stepFail",
workflowRunId,
stepName: "wait-for-approval",
timestamp: Date.now(),
error: "Approval timeout - no response received within 5 minutes",
});
return { success: false, reason: "timeout" };
}
// Notify that input was resolved
await channel.emit("workflow.update", {
type: "inputResolved",
workflowRunId,
eventId,
timestamp: Date.now(),
});
const status = eventData.approved ? "approved" : "rejected"
// Step 3: Process based on approval
await context.run(`process-${status}`, async () => {
await channel.emit("workflow.update", {
type: "stepStart",
workflowRunId,
stepName: `process-${status}`,
timestamp: Date.now(),
});
const result = {
status,
processedAt: Date.now(),
action,
userId,
};
await channel.emit("workflow.update", {
type: "stepFinish",
workflowRunId,
stepName: `process-${status}`,
timestamp: Date.now(),
result,
});
return result;
});
// Step 4: Finalize (only if approved)
if (eventData.approved) {
// Additional steps...
}
// Emit completion
await channel.emit("workflow.update", {
type: "runFinish",
workflowRunId,
timestamp: Date.now(),
status: "success",
});
return {
success: true,
approved: eventData.approved,
workflowRunId
};
},
{
failureFunction: async ({ context }) => {
const workflowRunId = context.workflowRunId;
const channel = realtime.channel(workflowRunId);
await channel.emit("workflow.update", {
type: "runFinish",
workflowRunId,
timestamp: Date.now(),
status: "failed",
error: "Workflow execution failed",
});
},
}
);