Some workflows require human approval or input before proceeding. By combining Upstash Workflow with Upstash Realtime, you can build interactive workflows that pause for user input and stream real-time feedback to your frontend throughout the process. This guide shows you how to implement a human-in-the-loop workflow pattern with real-time updates.

How It Works

In a human-in-the-loop workflow:
  1. The workflow executes initial steps and emits progress events
  2. The workflow pauses at a specific point using context.waitForEvent()
  3. A “waiting for input” event is emitted to notify the frontend
  4. The user makes a decision in the frontend (approve/reject)
  5. The frontend calls an API to notify the workflow using client.notify()
  6. The workflow resumes with the user’s decision
  7. An “input resolved” event is emitted so the frontend can update its UI
  8. The workflow continues and completes based on the decision
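
The pause and resume handshake in steps 2, 5, and 6 can be pictured with a toy in-memory model. This is only a sketch for intuition: it is not how Upstash Workflow is implemented, and `PendingEvents`, `wait`, `notify`, and `expire` are hypothetical names that merely mirror the shape of the `{ eventData, timeout }` result used later in this guide.

```typescript
// Toy model of the wait/notify handshake (hypothetical, for illustration only).
type WaitResult<T> = { eventData?: T; timeout: boolean };

class PendingEvents<T> {
  private waiters = new Map<string, (result: WaitResult<T>) => void>();

  // Step 2: the workflow registers interest in an event ID and pauses.
  wait(eventId: string, onResult: (result: WaitResult<T>) => void): void {
    this.waiters.set(eventId, onResult);
  }

  // Step 5: the API call delivers the user's decision.
  // Returns false when nothing is waiting on that ID.
  notify(eventId: string, eventData: T): boolean {
    const waiter = this.waiters.get(eventId);
    if (!waiter) return false;
    this.waiters.delete(eventId);
    waiter({ eventData, timeout: false });
    return true;
  }

  // Models the timeout elapsing before any decision arrives.
  expire(eventId: string): boolean {
    const waiter = this.waiters.get(eventId);
    if (!waiter) return false;
    this.waiters.delete(eventId);
    waiter({ timeout: true });
    return true;
  }
}

const pending = new PendingEvents<{ approved: boolean }>();
const log: string[] = [];

// Step 6: the workflow resumes with the decision (or with a timeout).
const record = (result: WaitResult<{ approved: boolean }>): void => {
  if (result.timeout) log.push("timeout");
  else log.push(result.eventData?.approved ? "approved" : "rejected");
};

pending.wait("approval-run-1", record);
pending.notify("approval-run-1", { approved: true });
// log is now ["approved"]
```

Each waiter is removed once it fires, so a second notification for the same ID finds nothing to resume. Whether the real service behaves the same way for late notifications is not something this sketch claims.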

Prerequisites

  • An Upstash account with:
    • A QStash project for workflows
    • A Redis database for Realtime
  • Next.js application set up
  • Completed the basic real-time workflow setup

Event Types

For human-in-the-loop workflows, extend the event schema in your lib/realtime.ts with two additional event types:
const events = z.discriminatedUnion("type", [
  z.object({ type: z.literal("runStart"), workflowRunId: z.string(), timestamp: z.number() }),
  z.object({ type: z.literal("runFinish"), workflowRunId: z.string(), timestamp: z.number(), status: z.union([z.literal("success"), z.literal("failed")]), error: z.string().optional() }),
  z.object({ type: z.literal("stepStart"), workflowRunId: z.string(), timestamp: z.number(), stepName: z.string() }),
  z.object({ type: z.literal("stepFinish"), workflowRunId: z.string(), timestamp: z.number(), stepName: z.string(), result: z.unknown().optional() }),
  z.object({ type: z.literal("stepFail"), workflowRunId: z.string(), timestamp: z.number(), stepName: z.string(), error: z.string() }),
  z.object({ type: z.literal("waitingForInput"), workflowRunId: z.string(), eventId: z.string(), message: z.string(), timestamp: z.number() }),
  z.object({ type: z.literal("inputResolved"), workflowRunId: z.string(), eventId: z.string(), timestamp: z.number() }),
])
The new event types are:
  • waitingForInput: Emitted when the workflow pauses and needs user input
  • inputResolved: Emitted when the user provides input, so the frontend knows to clear the waiting state
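
Because every event carries a literal `type` field, TypeScript can narrow on it. The sketch below hand-writes plain types for three of the event shapes from the schema above (in a real project you would more likely derive them from the schema) and shows an exhaustive switch. `WorkflowEvent` and `describeEvent` are hypothetical names used only for illustration.

```typescript
// Hand-written equivalents of three event shapes from the schema above.
type RunStart = { type: "runStart"; workflowRunId: string; timestamp: number };
type WaitingForInput = {
  type: "waitingForInput";
  workflowRunId: string;
  eventId: string;
  message: string;
  timestamp: number;
};
type InputResolved = {
  type: "inputResolved";
  workflowRunId: string;
  eventId: string;
  timestamp: number;
};

type WorkflowEvent = RunStart | WaitingForInput | InputResolved;

// Narrowing on `type` gives each branch access to its own fields.
function describeEvent(event: WorkflowEvent): string {
  switch (event.type) {
    case "runStart":
      return `run ${event.workflowRunId} started`;
    case "waitingForInput":
      return `waiting on ${event.eventId}: ${event.message}`;
    case "inputResolved":
      return `input received for ${event.eventId}`;
    default: {
      // Compile-time check: fails to type-check if a variant is unhandled.
      const unreachable: never = event;
      return unreachable;
    }
  }
}
```

The `never` assignment in the default branch is a common way to make the compiler flag any event type added to the union later without a matching case.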

Building the Workflow

1. Create the Workflow Endpoint

Create your workflow at app/api/workflow/human-in-loop/route.ts:
import { serve } from "@upstash/workflow/nextjs";
import { realtime } from "@/lib/realtime";

type WorkflowPayload = {
  userId: string;
  action: string;
};

export const { POST } = serve<WorkflowPayload>(
  async (context) => {
    const { userId, action } = context.requestPayload;
    const workflowRunId = context.workflowRunId;
    const channel = realtime.channel(workflowRunId);

    // Emit run start
    await context.run("start-workflow", () =>
      channel.emit("workflow.update", {
        type: "runStart",
        workflowRunId,
        timestamp: Date.now(),
      })
    );

    // Step 1: Initial Processing
    await context.run("initial-processing", async () => {
      await channel.emit("workflow.update", {
        type: "stepStart",
        workflowRunId,
        stepName: "initial-processing",
        timestamp: Date.now(),
      });

      // Your processing logic
      const result = {
        preprocessed: true,
        userId,
        action,
        requiresApproval: true
      };

      await channel.emit("workflow.update", {
        type: "stepFinish",
        workflowRunId,
        stepName: "initial-processing",
        timestamp: Date.now(),
        result,
      });

      return result;
    });

    // Step 2: Wait for Human Approval
    const eventId = `approval-${workflowRunId}`;

    const [{ eventData, timeout }] = await Promise.all([
      // Wait for approval event
      context.waitForEvent<{ approved: boolean }>(
        "wait-for-approval",
        eventId,
        { timeout: "5m" }
      ),
      // Notify frontend that we're waiting
      context.run("notify-waiting", () =>
        channel.emit("workflow.update", {
          type: "waitingForInput",
          workflowRunId,
          eventId,
          message: `Waiting for approval to process action: ${action}`,
          timestamp: Date.now(),
        })
      ),
    ]);

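    // Handle timeout
    if (timeout) {
      // Emit inside context.run so the event is sent once, even if the
      // endpoint is invoked again during the run
      await context.run("notify-timeout", () =>
        channel.emit("workflow.update", {
          type: "stepFail",
          workflowRunId,
          stepName: "wait-for-approval",
          timestamp: Date.now(),
          error: "Approval timeout - no response received within 5 minutes",
        })
      );
      return { success: false, reason: "timeout" };
    }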

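    // Notify that input was resolved (wrapped in context.run so the event
    // is not emitted again when the endpoint is re-invoked for later steps)
    await context.run("notify-resolved", () =>
      channel.emit("workflow.update", {
        type: "inputResolved",
        workflowRunId,
        eventId,
        timestamp: Date.now(),
      })
    );

    const status = eventData.approved ? "approved" : "rejected";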

    // Step 3: Process based on approval
    await context.run(`process-${status}`, async () => {
      await channel.emit("workflow.update", {
        type: "stepStart",
        workflowRunId,
        stepName: `process-${status}`,
        timestamp: Date.now(),
      });

      const result = {
        status,
        processedAt: Date.now(),
        action,
        userId,
      };

      await channel.emit("workflow.update", {
        type: "stepFinish",
        workflowRunId,
        stepName: `process-${status}`,
        timestamp: Date.now(),
        result,
      });

      return result;
    });

    // Step 4: Finalize (only if approved)
    if (eventData.approved) {
      // Additional steps...
    }

    // Emit completion (as its own step, like the run start event)
    await context.run("finish-workflow", () =>
      channel.emit("workflow.update", {
        type: "runFinish",
        workflowRunId,
        timestamp: Date.now(),
        status: "success",
      })
    );

    return {
      success: true,
      approved: eventData.approved,
      workflowRunId
    };
  },
  {
    failureFunction: async ({ context }) => {
      const workflowRunId = context.workflowRunId;
      const channel = realtime.channel(workflowRunId);

      await channel.emit("workflow.update", {
        type: "runFinish",
        workflowRunId,
        timestamp: Date.now(),
        status: "failed",
        error: "Workflow execution failed",
      });
    },
  }
);
Key patterns:
  1. Using Promise.all: We start waiting for the event and emit the “waiting” notification in parallel. Because the workflow suspends at waitForEvent, a notification emitted after it would only be sent once the wait had already ended
  2. Unique event IDs: Use a unique eventId (like approval-${workflowRunId}) to identify which approval request this is
  3. Timeout handling: Always handle the timeout case when waiting for events
  4. Input resolved event: Emit inputResolved after receiving input so the frontend knows to clear the waiting UI
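
Patterns 2 and 3 can be isolated into two small pure helpers, which makes the branching easy to unit test. This is a minimal sketch: `approvalEventId` and `toOutcome` are hypothetical helper names, and treating a missing payload as a rejection is an assumption made here to fail closed, not documented behavior.

```typescript
// Shape of the value destructured from waitForEvent in the workflow above.
type ApprovalResult = { eventData?: { approved: boolean }; timeout: boolean };
type Outcome = "approved" | "rejected" | "timeout";

// Pattern 2: derive the event ID from the run ID so each run has its own.
function approvalEventId(workflowRunId: string): string {
  return `approval-${workflowRunId}`;
}

// Pattern 3: check the timeout flag before reading the payload.
function toOutcome(result: ApprovalResult): Outcome {
  if (result.timeout) return "timeout";
  // Assumption: a missing payload is treated as a rejection (fail closed).
  if (!result.eventData) return "rejected";
  return result.eventData.approved ? "approved" : "rejected";
}
```

With helpers like these, the workflow body can switch on a single `Outcome` value instead of checking `timeout` and `eventData.approved` in separate places.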

2. Create the Notify Endpoint

Create an endpoint at app/api/notify/route.ts to handle user input:
import { NextRequest, NextResponse } from "next/server";
import { workflowClient } from "@/lib/workflow";

export async function POST(request: NextRequest) {
  try {
    const body = await request.json();
    const { eventId, eventData } = body;

    if (!eventId) {
      return NextResponse.json(
        { success: false, error: "eventId is required" },
        { status: 400 }
      );
    }

    // Notify the workflow
    await workflowClient.notify({
      eventId,
      eventData,
    });

    return NextResponse.json({ success: true });
  } catch (error) {
    console.error("Error notifying workflow:", error);
    return NextResponse.json(
      { success: false, error: "Failed to notify workflow" },
      { status: 500 }
    );
  }
}
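
The endpoint above only checks that eventId is present and forwards eventData as received. If you want to reject malformed decisions before they reach the workflow, a stricter check could look like the following sketch. `parseNotifyBody` is a hypothetical helper, and it assumes the payload shape `{ approved: boolean }` used elsewhere in this guide.

```typescript
type NotifyBody = { eventId: string; eventData: { approved: boolean } };
type ParseResult =
  | { ok: true; value: NotifyBody }
  | { ok: false; error: string };

// Validates an untrusted JSON body before it is forwarded to the workflow.
function parseNotifyBody(body: unknown): ParseResult {
  if (typeof body !== "object" || body === null) {
    return { ok: false, error: "body must be a JSON object" };
  }
  const { eventId, eventData } = body as Record<string, unknown>;
  if (typeof eventId !== "string" || eventId.length === 0) {
    return { ok: false, error: "eventId is required" };
  }
  if (typeof eventData !== "object" || eventData === null) {
    return { ok: false, error: "eventData is required" };
  }
  const approved = (eventData as Record<string, unknown>).approved;
  if (typeof approved !== "boolean") {
    return { ok: false, error: "eventData.approved must be a boolean" };
  }
  // Rebuild the object so no unexpected extra fields are passed along.
  return { ok: true, value: { eventId, eventData: { approved } } };
}
```

In the route handler, a failed parse would map to a 400 response carrying the `error` string, and a successful one would pass `value` to the notify call.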

Building the Frontend

1. Extend the Custom Hook

Extend your hook from the basic example to handle waiting states:
"use client";

import { useRealtime } from "@upstash/realtime/client";
import { useState, useCallback, useRef } from "react";
import type { RealtimeEvents } from "@/lib/realtime";

interface WorkflowStep {
  stepName: string;
  status: "running" | "completed" | "failed";
  timestamp: number;
  error?: string;
  result?: unknown;
}

interface WaitingState {
  eventId: string;
  message: string;
  timestamp: number;
}

export function useWorkflowWithRealtime() {
  const [workflowRunId, setWorkflowRunId] = useState<string | null>(null);
  const [steps, setSteps] = useState<WorkflowStep[]>([]);
  const [waitingState, setWaitingState] = useState<WaitingState | null>(null);
  const [isTriggering, setIsTriggering] = useState(false);
  const [runStatus, setRunStatus] = useState<{
    status: "running" | "success" | "failed";
    error?: string;
  } | null>(null);
  
  // Track resolved event IDs to ignore historical waiting states
  const resolvedEventIdsRef = useRef<Set<string>>(new Set());

  useRealtime<RealtimeEvents>({
    enabled: !!workflowRunId,
    channels: workflowRunId ? [workflowRunId] : [],
    event: "workflow.update",
    history: true,
    onData(data) {
      if (data.type === "runStart") {
        setRunStatus({ status: "running" });
      } else if (data.type === "runFinish") {
        setRunStatus({
          status: data.status,
          error: data.error,
        });
      } else if (data.type === "inputResolved") {
        // Mark this event as resolved
        resolvedEventIdsRef.current.add(data.eventId);
        // Clear waiting state if it matches
        setWaitingState((prev) => 
          prev?.eventId === data.eventId ? null : prev
        );
      } else if (data.type === "stepStart") {
        setSteps((prev) => [
          ...prev,
          {
            stepName: data.stepName,
            status: "running",
            timestamp: data.timestamp,
          },
        ]);
      } else if (data.type === "stepFinish") {
        setSteps((prev) =>
          prev.map((step) =>
            step.stepName === data.stepName
              ? { ...step, status: "completed", result: data.result }
              : step
          )
        );
      } else if (data.type === "stepFail") {
        setSteps((prev) =>
          prev.map((step) =>
            step.stepName === data.stepName
              ? { ...step, status: "failed", error: data.error }
              : step
          )
        );
      } else if (data.type === "waitingForInput") {
        // Only show waiting state if not already resolved
        if (!resolvedEventIdsRef.current.has(data.eventId)) {
          setWaitingState({
            eventId: data.eventId,
            message: data.message,
            timestamp: data.timestamp,
          });
        }
      }
    },
  });

  const trigger = useCallback(async () => {
    setIsTriggering(true);
    setSteps([]);
    setWaitingState(null);
    setRunStatus(null);
    resolvedEventIdsRef.current.clear();

    try {
      const response = await fetch("/api/trigger", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ workflowType: "human-in-loop" }),
      });

      if (!response.ok) {
        throw new Error("Failed to trigger workflow");
      }

      const data = await response.json();
      setWorkflowRunId(data.workflowRunId);
    } catch (error) {
      console.error("Error triggering workflow:", error);
    } finally {
      setIsTriggering(false);
    }
  }, []);

  const continueWorkflow = useCallback(
    async (data: { approved: boolean }) => {
      if (!waitingState) {
        throw new Error("No workflow waiting for input");
      }

      try {
        const response = await fetch("/api/notify", {
          method: "POST",
          headers: { "Content-Type": "application/json" },
          body: JSON.stringify({
            eventId: waitingState.eventId,
            eventData: data,
          }),
        });

        if (!response.ok) {
          throw new Error("Failed to notify workflow");
        }

        // The waiting state will be cleared when we receive inputResolved event
      } catch (error) {
        console.error("Error continuing workflow:", error);
        throw error;
      }
    },
    [waitingState]
  );

  return {
    trigger,
    continueWorkflow,
    isTriggering,
    workflowRunId,
    steps,
    waitingState,
    runStatus,
  };
}
Key additions:
  • waitingState: Tracks when the workflow is waiting for input
  • resolvedEventIdsRef: Prevents showing historical waiting states when history: true fetches past events
  • continueWorkflow: Function to submit user decisions back to the workflow
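
One detail to be aware of: the stepFinish and stepFail handlers in the hook only update steps that are already in the list. The workflow emits stepFail for wait-for-approval on timeout without a preceding stepStart, so that failure would not show up in steps. A minimal sketch of an "upsert" that handles both cases is shown below; `upsertStep` is a hypothetical helper, not part of the hook above.

```typescript
type StepStatus = "running" | "completed" | "failed";
type Step = {
  stepName: string;
  status: StepStatus;
  timestamp: number;
  error?: string;
};

// Updates the step with a matching name, or appends it if it was never started.
function upsertStep(steps: Step[], update: Step): Step[] {
  const exists = steps.some((step) => step.stepName === update.stepName);
  return exists
    ? steps.map((step) =>
        step.stepName === update.stepName ? { ...step, ...update } : step
      )
    : [...steps, update];
}
```

Inside the hook this could be used as `setSteps((prev) => upsertStep(prev, { ... }))` in the stepFail branch, so a timeout is listed even though no running entry existed for it.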

2. Use the Hook with Approval UI

"use client";

import { useWorkflowWithRealtime } from "@/hooks/useWorkflowWithRealtime";

export default function WorkflowPage() {
  const {
    trigger,
    isTriggering,
    steps,
    runStatus,
    waitingState,
    continueWorkflow,
  } = useWorkflowWithRealtime();

  return (
    <div
      style={{
        maxWidth: "600px",
        margin: "40px auto",
        fontFamily: "Arial, sans-serif",
      }}
    >
      <button onClick={trigger} disabled={isTriggering}>
        {isTriggering ? "Starting..." : "Click to Trigger Workflow"}
      </button>

      <h3 style={{ marginTop: "20px" }}>Run Status:</h3>

      {runStatus && <div>{runStatus.status}</div>}

      {/* Show workflow steps */}
      <h3 style={{ marginTop: "20px" }}>Workflow Steps:</h3>
      <div>
        {steps.map((step, index) => (
          <div key={index}>
            <strong>{step.stepName}</strong>: {step.status}
            {step.error && <span> - {step.error}</span>}
          </div>
        ))}
      </div>

      {/* Show approval UI when waiting for input */}
      {waitingState && (
        <div style={{ marginTop: "20px" }} className="approval-prompt">
          <p>{waitingState.message}</p>
          <p>
            <button onClick={() => continueWorkflow({ approved: true })}>
              Click to Approve
            </button>
          </p>
          <p>
            <button onClick={() => continueWorkflow({ approved: false })}>
              Click to Reject
            </button>
          </p>
        </div>
      )}
    </div>
  );
}

How the Pattern Works

Timeline of Events

  1. Workflow starts: runStart event → Frontend shows “Running”
  2. Initial processing: stepStart and stepFinish events → Frontend shows progress
  3. Waiting for approval: waitingForInput event → Frontend shows approval buttons
  4. User clicks approve/reject: Frontend calls /api/notify
  5. Workflow resumes: inputResolved event → Frontend hides approval buttons
  6. Processing continues: More step events as workflow continues
  7. Workflow completes: runFinish event → Frontend shows final status

Handling History with resolvedEventIdsRef

When a user refreshes the page or reconnects:
  • history: true replays all past events
  • Without resolvedEventIdsRef, a resolved approval would show again
  • By tracking resolved event IDs, we skip showing old waiting states
  • This ensures the UI only shows currently waiting approvals
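
The replay logic can be modelled as a pure function over a list of events, which makes the effect of tracking resolved IDs easy to check. This is a simplified sketch of the hook's behavior, not the hook itself: `replayWaiting` and the trimmed event types are hypothetical, and the out-of-order case is included as an assumption about what a client might observe, not a statement about delivery guarantees.

```typescript
type UiEvent =
  | { type: "waitingForInput"; eventId: string; message: string }
  | { type: "inputResolved"; eventId: string };

type WaitingView = { eventId: string; message: string } | null;

// Folds a stream of events into the waiting state the UI should display.
function replayWaiting(events: UiEvent[]): WaitingView {
  const resolved = new Set<string>();
  let waiting: WaitingView = null;
  for (const event of events) {
    if (event.type === "inputResolved") {
      resolved.add(event.eventId);
      // Clear the prompt only if it belongs to the resolved event.
      if (waiting !== null && waiting.eventId === event.eventId) {
        waiting = null;
      }
    } else if (!resolved.has(event.eventId)) {
      // Show the prompt unless this request is already known to be resolved.
      waiting = { eventId: event.eventId, message: event.message };
    }
  }
  return waiting;
}
```

Replaying a resolved approval yields no prompt regardless of which of the two events is seen first, while an approval with no matching inputResolved still produces one.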

Benefits

  • Real-time feedback: Users see exactly when their approval is needed
  • No polling: Instant updates via Server-Sent Events
  • Graceful reconnection: History ensures users see full workflow state
  • Timeout handling: Workflows don’t hang indefinitely waiting for input

Full Example

For a complete working example with all steps, error handling, and full UI components, check out the Upstash Realtime example on GitHub.
