From c1a7f4102588dfa38d5b57521b7f2a6d396fd79c Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Mon, 6 Apr 2026 10:46:06 -0700 Subject: [PATCH 1/8] Fix hitl stream --- .../docs/en/blocks/human-in-the-loop.mdx | 27 +- .../[executionId]/[contextId]/route.ts | 7 + .../executor/human-in-the-loop-manager.ts | 272 ++++++++++++++++-- 3 files changed, 284 insertions(+), 22 deletions(-) diff --git a/apps/docs/content/docs/en/blocks/human-in-the-loop.mdx b/apps/docs/content/docs/en/blocks/human-in-the-loop.mdx index e28f5d3e918..d4e4705e22b 100644 --- a/apps/docs/content/docs/en/blocks/human-in-the-loop.mdx +++ b/apps/docs/content/docs/en/blocks/human-in-the-loop.mdx @@ -93,17 +93,36 @@ Access resume data in downstream blocks using ``. ### REST API - Programmatically resume workflows: + Programmatically resume workflows using the resume endpoint. The `contextId` is available from the block's `resumeEndpoint` output or from the paused execution detail. ```bash - POST /api/workflows/{workflowId}/executions/{executionId}/resume/{blockId} + POST /api/resume/{workflowId}/{executionId}/{contextId} + Content-Type: application/json { - "approved": true, - "comments": "Looks good to proceed" + "input": { + "approved": true, + "comments": "Looks good to proceed" + } } ``` + The response includes a new `executionId` for the resumed execution: + + ```json + { + "status": "started", + "executionId": "", + "message": "Resume execution started." + } + ``` + + To poll execution progress after resuming, connect to the SSE stream: + + ```bash + GET /api/workflows/{workflowId}/executions/{resumeExecutionId}/stream + ``` + Build custom approval UIs or integrate with existing systems. diff --git a/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts b/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts index 194d9dfcdd0..5b0910d6548 100644 --- a/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts +++ b/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts @@ -3,6 +3,7 @@ import { type NextRequest, NextResponse } from 'next/server' import { AuthType } from '@/lib/auth/hybrid' import { generateRequestId } from '@/lib/core/utils/request' import { generateId } from '@/lib/core/utils/uuid' +import { setExecutionMeta } from '@/lib/execution/event-buffer' import { preprocessExecution } from '@/lib/execution/preprocessing' import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager' import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils' @@ -125,6 +126,12 @@ export async function POST( }) } + await setExecutionMeta(enqueueResult.resumeExecutionId, { + status: 'active', + userId, + workflowId, + }) + PauseResumeManager.startResumeExecution({ resumeEntryId: enqueueResult.resumeEntryId, resumeExecutionId: enqueueResult.resumeExecutionId, diff --git a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts index 7b9f0c66ece..3c43b6fb616 100644 --- a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts +++ b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts @@ -5,11 +5,23 @@ import { and, asc, desc, eq, inArray, lt, type SQL, sql } from 'drizzle-orm' import type { Edge } from 'reactflow' import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits' import { generateId } from '@/lib/core/utils/uuid' +import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/event-buffer' import { preprocessExecution } from '@/lib/execution/preprocessing' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core' +import type { ExecutionEvent } from '@/lib/workflows/executor/execution-events' import { ExecutionSnapshot } from '@/executor/execution/snapshot' -import type { ExecutionResult, PausePoint, SerializedSnapshot } from '@/executor/types' +import type { + ChildWorkflowContext, + ExecutionCallbacks, + IterationContext, +} from '@/executor/execution/types' +import type { + ExecutionResult, + PausePoint, + SerializedSnapshot, + StreamingExecution, +} from '@/executor/types' import { filterOutputForLog } from '@/executor/utils/output-filter' import type { SerializedConnection } from '@/serializer/types' @@ -771,36 +783,260 @@ export class PauseResumeManager { actorUserId: metadata.userId, }) + const workflowId = pausedExecution.workflowId + const eventWriter = createExecutionEventWriter(resumeExecutionId) + await setExecutionMeta(resumeExecutionId, { + status: 'active', + userId: metadata.userId, + workflowId, + }) + + let localEventSeq = 0 + const writeBufferedEvent = (event: ExecutionEvent) => { + localEventSeq++ + event.eventId = localEventSeq + eventWriter.write(event).catch(() => {}) + } + + writeBufferedEvent({ + type: 'execution:started', + timestamp: new Date().toISOString(), + executionId: resumeExecutionId, + workflowId, + data: { startTime: new Date().toISOString() }, + } as ExecutionEvent) + + const callbacks: ExecutionCallbacks = { + onBlockStart: async ( + blockId: string, + blockName: string, + blockType: string, + executionOrder: number, + iterationContext?: IterationContext, + childWorkflowContext?: ChildWorkflowContext + ) => { + writeBufferedEvent({ + type: 'block:started', + timestamp: new Date().toISOString(), + executionId: resumeExecutionId, + workflowId, + data: { + blockId, + blockName, + blockType, + executionOrder, + ...(iterationContext && { + iterationCurrent: iterationContext.iterationCurrent, + iterationTotal: iterationContext.iterationTotal, + iterationType: iterationContext.iterationType, + iterationContainerId: iterationContext.iterationContainerId, + ...(iterationContext.parentIterations?.length && { + parentIterations: iterationContext.parentIterations, + }), + }), + ...(childWorkflowContext && { + childWorkflowBlockId: childWorkflowContext.parentBlockId, + childWorkflowName: childWorkflowContext.workflowName, + }), + }, + } as ExecutionEvent) + }, + onBlockComplete: async ( + blockId: string, + blockName: string, + blockType: string, + callbackData: Record, + iterationContext?: IterationContext, + childWorkflowContext?: ChildWorkflowContext + ) => { + const output = callbackData.output as Record | undefined + const hasError = output?.error + const sharedData = { + blockId, + blockName, + blockType, + input: callbackData.input, + durationMs: (callbackData.executionTime as number) || 0, + startedAt: callbackData.startedAt, + executionOrder: callbackData.executionOrder, + endedAt: callbackData.endedAt, + ...(iterationContext && { + iterationCurrent: iterationContext.iterationCurrent, + iterationTotal: iterationContext.iterationTotal, + iterationType: iterationContext.iterationType, + iterationContainerId: iterationContext.iterationContainerId, + ...(iterationContext.parentIterations?.length && { + parentIterations: iterationContext.parentIterations, + }), + }), + ...(childWorkflowContext && { + childWorkflowBlockId: childWorkflowContext.parentBlockId, + childWorkflowName: childWorkflowContext.workflowName, + }), + ...(callbackData.childWorkflowInstanceId && { + childWorkflowInstanceId: callbackData.childWorkflowInstanceId, + }), + } + + writeBufferedEvent({ + type: hasError ? 'block:error' : 'block:completed', + timestamp: new Date().toISOString(), + executionId: resumeExecutionId, + workflowId, + data: hasError + ? { ...sharedData, error: output?.error } + : { ...sharedData, output }, + } as ExecutionEvent) + }, + onChildWorkflowInstanceReady: ( + blockId: string, + childWorkflowInstanceId: string, + iterationContext?: IterationContext, + executionOrder?: number + ) => { + writeBufferedEvent({ + type: 'block:childWorkflowStarted', + timestamp: new Date().toISOString(), + executionId: resumeExecutionId, + workflowId, + data: { + blockId, + childWorkflowInstanceId, + ...(iterationContext && { + iterationCurrent: iterationContext.iterationCurrent, + iterationContainerId: iterationContext.iterationContainerId, + }), + ...(executionOrder !== undefined && { executionOrder }), + }, + } as ExecutionEvent) + }, + onStream: async (streamingExec: StreamingExecution) => { + const blockId = (streamingExec.execution as Record).blockId as string + const reader = streamingExec.stream.getReader() + const decoder = new TextDecoder() + try { + while (true) { + const { done, value } = await reader.read() + if (done) break + const chunk = decoder.decode(value, { stream: true }) + writeBufferedEvent({ + type: 'stream:chunk', + timestamp: new Date().toISOString(), + executionId: resumeExecutionId, + workflowId, + data: { blockId, chunk }, + } as ExecutionEvent) + } + writeBufferedEvent({ + type: 'stream:done', + timestamp: new Date().toISOString(), + executionId: resumeExecutionId, + workflowId, + data: { blockId }, + } as ExecutionEvent) + } catch (streamError) { + logger.error('Error streaming block content during resume', { + resumeExecutionId, + blockId, + error: streamError instanceof Error ? streamError.message : String(streamError), + }) + } finally { + try { + await reader.cancel().catch(() => {}) + } catch {} + } + }, + } + const timeoutController = createTimeoutAbortController( preprocessingResult.executionTimeout?.async ) let result: ExecutionResult + let finalMetaStatus: 'complete' | 'error' | 'cancelled' = 'complete' try { result = await executeWorkflowCore({ snapshot: resumeSnapshot, - callbacks: {}, + callbacks, loggingSession, - skipLogCreation: true, // Reuse existing log entry - includeFileBase64: true, // Enable base64 hydration - base64MaxBytes: undefined, // Use default limit + skipLogCreation: true, + includeFileBase64: true, + base64MaxBytes: undefined, abortSignal: timeoutController.signal, }) + + if ( + result.status === 'cancelled' && + timeoutController.isTimedOut() && + timeoutController.timeoutMs + ) { + const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs) + logger.info('Resume execution timed out', { + resumeExecutionId, + timeoutMs: timeoutController.timeoutMs, + }) + await loggingSession.markAsFailed(timeoutErrorMessage) + + writeBufferedEvent({ + type: 'execution:error', + timestamp: new Date().toISOString(), + executionId: resumeExecutionId, + workflowId, + data: { + error: timeoutErrorMessage, + duration: result.metadata?.duration || 0, + }, + } as ExecutionEvent) + finalMetaStatus = 'error' + } else if (result.status === 'cancelled') { + writeBufferedEvent({ + type: 'execution:cancelled', + timestamp: new Date().toISOString(), + executionId: resumeExecutionId, + workflowId, + data: { duration: result.metadata?.duration || 0 }, + } as ExecutionEvent) + finalMetaStatus = 'cancelled' + } else { + writeBufferedEvent({ + type: 'execution:completed', + timestamp: new Date().toISOString(), + executionId: resumeExecutionId, + workflowId, + data: { + success: result.success, + output: result.output, + duration: result.metadata?.duration || 0, + startTime: result.metadata?.startTime || new Date().toISOString(), + endTime: result.metadata?.endTime || new Date().toISOString(), + }, + } as ExecutionEvent) + finalMetaStatus = 'complete' + } + } catch (execError) { + writeBufferedEvent({ + type: 'execution:error', + timestamp: new Date().toISOString(), + executionId: resumeExecutionId, + workflowId, + data: { + error: execError instanceof Error ? execError.message : String(execError), + duration: 0, + }, + } as ExecutionEvent) + finalMetaStatus = 'error' + throw execError } finally { timeoutController.cleanup() - } - - if ( - result.status === 'cancelled' && - timeoutController.isTimedOut() && - timeoutController.timeoutMs - ) { - const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs) - logger.info('Resume execution timed out', { - resumeExecutionId, - timeoutMs: timeoutController.timeoutMs, - }) - await loggingSession.markAsFailed(timeoutErrorMessage) + try { + await eventWriter.close() + } catch (closeError) { + logger.warn('Failed to close event writer for resume', { + resumeExecutionId, + error: closeError instanceof Error ? closeError.message : String(closeError), + }) + } + setExecutionMeta(resumeExecutionId, { status: finalMetaStatus }).catch(() => {}) } return result From afb852223ca400836ac86ca0481b0ca854225a8c Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Mon, 6 Apr 2026 11:53:49 -0700 Subject: [PATCH 2/8] fix hitl pause persistence --- .../app/api/workflows/[id]/execute/route.ts | 30 ++------- apps/sim/background/schedule-execution.ts | 34 ++-------- apps/sim/background/webhook-execution.ts | 34 ++-------- apps/sim/background/workflow-execution.ts | 29 +-------- .../workflows/executor/execute-workflow.ts | 38 +---------- .../workflows/executor/pause-persistence.ts | 64 +++++++++++++++++++ .../executor/queued-workflow-execution.ts | 18 +----- 7 files changed, 88 insertions(+), 159 deletions(-) create mode 100644 apps/sim/lib/workflows/executor/pause-persistence.ts diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index d5b484dfaca..7e3ccb86587 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -41,7 +41,7 @@ import { } from '@/lib/uploads/utils/user-file-base64.server' import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core' import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/execution-events' -import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager' +import { handlePostExecutionPauseState } from '@/lib/workflows/executor/pause-persistence' import { DIRECT_WORKFLOW_JOB_NAME, type QueuedWorkflowExecutionPayload, @@ -903,6 +903,8 @@ async function handleExecutePost( abortSignal: timeoutController.signal, }) + await handlePostExecutionPauseState({ result, workflowId, executionId, loggingSession }) + if ( result.status === 'cancelled' && timeoutController.isTimedOut() && @@ -1359,31 +1361,7 @@ async function handleExecutePost( runFromBlock: resolvedRunFromBlock, }) - if (result.status === 'paused') { - if (!result.snapshotSeed) { - reqLogger.error('Missing snapshot seed for paused execution') - await loggingSession.markAsFailed('Missing snapshot seed for paused execution') - } else { - try { - await PauseResumeManager.persistPauseResult({ - workflowId, - executionId, - pausePoints: result.pausePoints || [], - snapshotSeed: result.snapshotSeed, - executorUserId: result.metadata?.userId, - }) - } catch (pauseError) { - reqLogger.error('Failed to persist pause result', { - error: pauseError instanceof Error ? pauseError.message : String(pauseError), - }) - await loggingSession.markAsFailed( - `Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}` - ) - } - } - } else { - await PauseResumeManager.processQueuedResumes(executionId) - } + await handlePostExecutionPauseState({ result, workflowId, executionId, loggingSession }) if (result.status === 'cancelled') { if (timeoutController.isTimedOut() && timeoutController.timeoutMs) { diff --git a/apps/sim/background/schedule-execution.ts b/apps/sim/background/schedule-execution.ts index 007f4d5b071..634adcaf753 100644 --- a/apps/sim/background/schedule-execution.ts +++ b/apps/sim/background/schedule-execution.ts @@ -13,7 +13,7 @@ import { executeWorkflowCore, wasExecutionFinalizedByCore, } from '@/lib/workflows/executor/execution-core' -import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager' +import { handlePostExecutionPauseState } from '@/lib/workflows/executor/pause-persistence' import { blockExistsInDeployment, loadDeployedWorkflowState, @@ -237,33 +237,13 @@ async function runWorkflowExecution({ timeoutMs: timeoutController.timeoutMs, }) await loggingSession.markAsFailed(timeoutErrorMessage) - } else if (executionResult.status === 'paused') { - if (!executionResult.snapshotSeed) { - logger.error(`[${requestId}] Missing snapshot seed for paused execution`, { - executionId, - }) - await loggingSession.markAsFailed('Missing snapshot seed for paused execution') - } else { - try { - await PauseResumeManager.persistPauseResult({ - workflowId: payload.workflowId, - executionId, - pausePoints: executionResult.pausePoints || [], - snapshotSeed: executionResult.snapshotSeed, - executorUserId: executionResult.metadata?.userId, - }) - } catch (pauseError) { - logger.error(`[${requestId}] Failed to persist pause result`, { - executionId, - error: pauseError instanceof Error ? pauseError.message : String(pauseError), - }) - await loggingSession.markAsFailed( - `Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}` - ) - } - } } else { - await PauseResumeManager.processQueuedResumes(executionId) + await handlePostExecutionPauseState({ + result: executionResult, + workflowId: payload.workflowId, + executionId, + loggingSession, + }) } await loggingSession.waitForPostExecution() diff --git a/apps/sim/background/webhook-execution.ts b/apps/sim/background/webhook-execution.ts index 7926ea0e382..d78a1401389 100644 --- a/apps/sim/background/webhook-execution.ts +++ b/apps/sim/background/webhook-execution.ts @@ -16,7 +16,7 @@ import { executeWorkflowCore, wasExecutionFinalizedByCore, } from '@/lib/workflows/executor/execution-core' -import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager' +import { handlePostExecutionPauseState } from '@/lib/workflows/executor/pause-persistence' import { loadDeployedWorkflowState } from '@/lib/workflows/persistence/utils' import { resolveOAuthAccountId } from '@/app/api/auth/oauth/utils' import { getBlock } from '@/blocks' @@ -205,33 +205,13 @@ async function handleExecutionResult( timeoutMs: ctx.timeoutController.timeoutMs, }) await ctx.loggingSession.markAsFailed(timeoutErrorMessage) - } else if (executionResult.status === 'paused') { - if (!executionResult.snapshotSeed) { - logger.error(`[${ctx.requestId}] Missing snapshot seed for paused execution`, { - executionId: ctx.executionId, - }) - await ctx.loggingSession.markAsFailed('Missing snapshot seed for paused execution') - } else { - try { - await PauseResumeManager.persistPauseResult({ - workflowId: ctx.workflowId, - executionId: ctx.executionId, - pausePoints: executionResult.pausePoints || [], - snapshotSeed: executionResult.snapshotSeed, - executorUserId: executionResult.metadata?.userId, - }) - } catch (pauseError) { - logger.error(`[${ctx.requestId}] Failed to persist pause result`, { - executionId: ctx.executionId, - error: pauseError instanceof Error ? pauseError.message : String(pauseError), - }) - await ctx.loggingSession.markAsFailed( - `Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}` - ) - } - } } else { - await PauseResumeManager.processQueuedResumes(ctx.executionId) + await handlePostExecutionPauseState({ + result: executionResult, + workflowId: ctx.workflowId, + executionId: ctx.executionId, + loggingSession: ctx.loggingSession, + }) } await ctx.loggingSession.waitForPostExecution() diff --git a/apps/sim/background/workflow-execution.ts b/apps/sim/background/workflow-execution.ts index 316ccfc5bbb..0990879ba69 100644 --- a/apps/sim/background/workflow-execution.ts +++ b/apps/sim/background/workflow-execution.ts @@ -10,7 +10,7 @@ import { executeWorkflowCore, wasExecutionFinalizedByCore, } from '@/lib/workflows/executor/execution-core' -import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager' +import { handlePostExecutionPauseState } from '@/lib/workflows/executor/pause-persistence' import { ExecutionSnapshot } from '@/executor/execution/snapshot' import type { ExecutionMetadata } from '@/executor/execution/types' import { hasExecutionResult } from '@/executor/utils/errors' @@ -148,33 +148,8 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) { timeoutMs: timeoutController.timeoutMs, }) await loggingSession.markAsFailed(timeoutErrorMessage) - } else if (result.status === 'paused') { - if (!result.snapshotSeed) { - logger.error(`[${requestId}] Missing snapshot seed for paused execution`, { - executionId, - }) - await loggingSession.markAsFailed('Missing snapshot seed for paused execution') - } else { - try { - await PauseResumeManager.persistPauseResult({ - workflowId, - executionId, - pausePoints: result.pausePoints || [], - snapshotSeed: result.snapshotSeed, - executorUserId: result.metadata?.userId, - }) - } catch (pauseError) { - logger.error(`[${requestId}] Failed to persist pause result`, { - executionId, - error: pauseError instanceof Error ? pauseError.message : String(pauseError), - }) - await loggingSession.markAsFailed( - `Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}` - ) - } - } } else { - await PauseResumeManager.processQueuedResumes(executionId) + await handlePostExecutionPauseState({ result, workflowId, executionId, loggingSession }) } await loggingSession.waitForPostExecution() diff --git a/apps/sim/lib/workflows/executor/execute-workflow.ts b/apps/sim/lib/workflows/executor/execute-workflow.ts index b8c430406c4..18d42472a6f 100644 --- a/apps/sim/lib/workflows/executor/execute-workflow.ts +++ b/apps/sim/lib/workflows/executor/execute-workflow.ts @@ -3,7 +3,7 @@ import { generateId } from '@/lib/core/utils/uuid' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { captureServerEvent } from '@/lib/posthog/server' import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core' -import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager' +import { handlePostExecutionPauseState } from '@/lib/workflows/executor/pause-persistence' import { ExecutionSnapshot } from '@/executor/execution/snapshot' import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types' import type { ExecutionResult, StreamingExecution } from '@/executor/types' @@ -127,41 +127,7 @@ export async function executeWorkflow( ) } - if (result.status === 'paused') { - if (!result.snapshotSeed) { - logger.error(`[${requestId}] Missing snapshot seed for paused execution`, { - executionId, - }) - await loggingSession.markAsFailed('Missing snapshot seed for paused execution') - } else { - try { - await PauseResumeManager.persistPauseResult({ - workflowId, - executionId, - pausePoints: result.pausePoints || [], - snapshotSeed: result.snapshotSeed, - executorUserId: result.metadata?.userId, - }) - } catch (pauseError) { - logger.error(`[${requestId}] Failed to persist pause result`, { - executionId, - error: pauseError instanceof Error ? pauseError.message : String(pauseError), - }) - await loggingSession.markAsFailed( - `Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}` - ) - } - } - } else { - try { - await PauseResumeManager.processQueuedResumes(executionId) - } catch (resumeError) { - logger.error(`[${requestId}] Failed to process queued resumes`, { - executionId, - error: resumeError instanceof Error ? resumeError.message : String(resumeError), - }) - } - } + await handlePostExecutionPauseState({ result, workflowId, executionId, loggingSession }) if (streamConfig?.skipLoggingComplete) { return { diff --git a/apps/sim/lib/workflows/executor/pause-persistence.ts b/apps/sim/lib/workflows/executor/pause-persistence.ts new file mode 100644 index 00000000000..c522fb5eeae --- /dev/null +++ b/apps/sim/lib/workflows/executor/pause-persistence.ts @@ -0,0 +1,64 @@ +import { createLogger } from '@sim/logger' +import type { LoggingSession } from '@/lib/logs/execution/logging-session' +import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager' +import type { ExecutionResult } from '@/executor/types' + +const logger = createLogger('PausePersistence') + +interface HandlePostExecutionPauseStateArgs { + result: ExecutionResult + workflowId: string + executionId: string + loggingSession: LoggingSession +} + +/** + * Handles pause persistence and resume queue processing after `executeWorkflowCore` returns. + * + * Every caller of `executeWorkflowCore` must call this after execution completes + * to ensure HITL pause state is persisted to the database and queued resumes are drained. + * + * - If execution is paused with a valid snapshot: persists to `paused_executions` table + * - If execution is paused without a snapshot: marks execution as failed + * - If execution is not paused: processes any queued resume entries + */ +export async function handlePostExecutionPauseState({ + result, + workflowId, + executionId, + loggingSession, +}: HandlePostExecutionPauseStateArgs): Promise { + if (result.status === 'paused') { + if (!result.snapshotSeed) { + logger.error('Missing snapshot seed for paused execution', { executionId }) + await loggingSession.markAsFailed('Missing snapshot seed for paused execution') + } else { + try { + await PauseResumeManager.persistPauseResult({ + workflowId, + executionId, + pausePoints: result.pausePoints || [], + snapshotSeed: result.snapshotSeed, + executorUserId: result.metadata?.userId, + }) + } catch (pauseError) { + logger.error('Failed to persist pause result', { + executionId, + error: pauseError instanceof Error ? pauseError.message : String(pauseError), + }) + await loggingSession.markAsFailed( + `Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}` + ) + } + } + } else { + try { + await PauseResumeManager.processQueuedResumes(executionId) + } catch (resumeError) { + logger.error('Failed to process queued resumes', { + executionId, + error: resumeError instanceof Error ? resumeError.message : String(resumeError), + }) + } + } +} diff --git a/apps/sim/lib/workflows/executor/queued-workflow-execution.ts b/apps/sim/lib/workflows/executor/queued-workflow-execution.ts index c60ba860a11..1a8b0c24ba0 100644 --- a/apps/sim/lib/workflows/executor/queued-workflow-execution.ts +++ b/apps/sim/lib/workflows/executor/queued-workflow-execution.ts @@ -15,7 +15,7 @@ import { createExecutionCallbacks, type ExecutionEvent, } from '@/lib/workflows/executor/execution-events' -import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager' +import { handlePostExecutionPauseState } from '@/lib/workflows/executor/pause-persistence' import { ExecutionSnapshot } from '@/executor/execution/snapshot' import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types' import type { BlockLog, NormalizedBlockOutput } from '@/executor/types' @@ -194,21 +194,7 @@ export async function executeQueuedWorkflowJob( ) } - if (result.status === 'paused') { - if (!result.snapshotSeed) { - await loggingSession.markAsFailed('Missing snapshot seed for paused execution') - } else { - await PauseResumeManager.persistPauseResult({ - workflowId, - executionId, - pausePoints: result.pausePoints || [], - snapshotSeed: result.snapshotSeed, - executorUserId: result.metadata?.userId, - }) - } - } else { - await PauseResumeManager.processQueuedResumes(executionId) - } + await handlePostExecutionPauseState({ result, workflowId, executionId, loggingSession }) const outputWithBase64 = payload.includeFileBase64 ? await hydrateUserFilesWithBase64(result.output, { From 42c0774e11b9a40583699a4dd4b27e4b37a47486 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Mon, 6 Apr 2026 12:47:36 -0700 Subject: [PATCH 3/8] Fix /stream endpoint allowing api key usage --- .../executions/[executionId]/stream/route.ts | 20 +++++-------------- .../executor/human-in-the-loop-manager.ts | 4 +--- 2 files changed, 6 insertions(+), 18 deletions(-) diff --git a/apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts b/apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts index 0893209c961..ad2f94722d1 100644 --- a/apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts +++ b/apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts @@ -1,6 +1,6 @@ import { createLogger } from '@sim/logger' import { type NextRequest, NextResponse } from 'next/server' -import { checkHybridAuth } from '@/lib/auth/hybrid' +import { getSession } from '@/lib/auth' import { SSE_HEADERS } from '@/lib/core/utils/sse' import { type ExecutionStreamStatus, @@ -29,14 +29,14 @@ export async function GET( const { id: workflowId, executionId } = await params try { - const auth = await checkHybridAuth(req, { requireWorkflowId: false }) - if (!auth.success || !auth.userId) { - return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 }) + const session = await getSession() + if (!session?.user?.id) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) } const workflowAuthorization = await authorizeWorkflowByWorkspacePermission({ workflowId, - userId: auth.userId, + userId: session.user.id, action: 'read', }) if (!workflowAuthorization.allowed) { @@ -46,16 +46,6 @@ export async function GET( ) } - if ( - auth.apiKeyType === 'workspace' && - workflowAuthorization.workflow?.workspaceId !== auth.workspaceId - ) { - return NextResponse.json( - { error: 'API key is not authorized for this workspace' }, - { status: 403 } - ) - } - const meta = await getExecutionMeta(executionId) if (!meta) { return NextResponse.json({ error: 'Execution buffer not found or expired' }, { status: 404 }) diff --git a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts index 3c43b6fb616..25320d1c966 100644 --- a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts +++ b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts @@ -883,9 +883,7 @@ export class PauseResumeManager { timestamp: new Date().toISOString(), executionId: resumeExecutionId, workflowId, - data: hasError - ? { ...sharedData, error: output?.error } - : { ...sharedData, output }, + data: hasError ? { ...sharedData, error: output?.error } : { ...sharedData, output }, } as ExecutionEvent) }, onChildWorkflowInstanceReady: ( From a38d0c4df45aa91e69a156240c11f39ce48f9eb8 Mon Sep 17 00:00:00 2001 From: waleed Date: Mon, 6 Apr 2026 13:18:45 -0700 Subject: [PATCH 4/8] resume page cleanup --- .../[executionId]/resume-page-client.tsx | 61 ++++++++++++++++--- 1 file changed, 52 insertions(+), 9 deletions(-) diff --git a/apps/sim/app/resume/[workflowId]/[executionId]/resume-page-client.tsx b/apps/sim/app/resume/[workflowId]/[executionId]/resume-page-client.tsx index ec34adc43ca..12f0e5926e5 100644 --- a/apps/sim/app/resume/[workflowId]/[executionId]/resume-page-client.tsx +++ b/apps/sim/app/resume/[workflowId]/[executionId]/resume-page-client.tsx @@ -6,6 +6,7 @@ import { useRouter } from 'next/navigation' import { Badge, Button, + Code, Input, Label, Table, @@ -155,7 +156,6 @@ function getBlockNameFromSnapshot( const parsed = JSON.parse(executionSnapshot.snapshot) const workflowState = parsed?.workflow if (!workflowState?.blocks || !Array.isArray(workflowState.blocks)) return null - // Blocks are stored as an array of serialized blocks with id and metadata.name const block = workflowState.blocks.find((b: { id: string }) => b.id === blockId) return block?.metadata?.name || null } catch { @@ -163,6 +163,47 @@ function getBlockNameFromSnapshot( } } +function renderStructuredValuePreview(value: any) { + if (value === null || value === undefined) { + return + } + + if (typeof value === 'object') { + return ( +
+ +
+ ) + } + + const stringValue = String(value) + return ( +
+ {stringValue} +
+ ) +} + export default function ResumeExecutionPage({ params, initialExecutionDetail, @@ -874,8 +915,11 @@ export default function ResumeExecutionPage({ Refresh @@ -1123,11 +1168,7 @@ export default function ResumeExecutionPage({ {row.name} {row.type} - - - {formatStructureValue(row.value)} - - + {renderStructuredValuePreview(row.value)} ))} @@ -1243,6 +1284,8 @@ export default function ResumeExecutionPage({ }} placeholder='{"example": "value"}' rows={6} + spellCheck={false} + className='min-h-[180px] border-[var(--border-1)] bg-[var(--surface-3)] font-mono text-[12px] leading-5' /> @@ -1267,10 +1310,10 @@ export default function ResumeExecutionPage({ {/* Footer */}
Date: Mon, 6 Apr 2026 13:35:11 -0700 Subject: [PATCH 5/8] fix type --- .../resume/[workflowId]/[executionId]/resume-page-client.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/sim/app/resume/[workflowId]/[executionId]/resume-page-client.tsx b/apps/sim/app/resume/[workflowId]/[executionId]/resume-page-client.tsx index 12f0e5926e5..f6d2ef0182c 100644 --- a/apps/sim/app/resume/[workflowId]/[executionId]/resume-page-client.tsx +++ b/apps/sim/app/resume/[workflowId]/[executionId]/resume-page-client.tsx @@ -163,7 +163,7 @@ function getBlockNameFromSnapshot( } } -function renderStructuredValuePreview(value: any) { +function renderStructuredValuePreview(value: unknown) { if (value === null || value === undefined) { return } From 0cdaa4960d6835c43cd09c4246a5f036102a578f Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Mon, 6 Apr 2026 14:03:09 -0700 Subject: [PATCH 6/8] make resume sync --- .../[executionId]/[contextId]/route.ts | 26 ++++++++- .../app/api/workflows/[id]/execute/route.ts | 55 ++++++++++++------- apps/sim/hooks/use-execution-stream.ts | 5 ++ apps/sim/lib/a2a/utils.ts | 11 +++- .../workflows/executor/execution-events.ts | 17 ++++++ .../executor/human-in-the-loop-manager.ts | 18 +++++- .../executor/queued-workflow-execution.ts | 14 +++++ 7 files changed, 123 insertions(+), 23 deletions(-) diff --git a/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts b/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts index 5b0910d6548..f854b144d98 100644 --- a/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts +++ b/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts @@ -132,14 +132,36 @@ export async function POST( workflowId, }) - PauseResumeManager.startResumeExecution({ + const resumeArgs = { resumeEntryId: enqueueResult.resumeEntryId, resumeExecutionId: enqueueResult.resumeExecutionId, pausedExecution: enqueueResult.pausedExecution, contextId: enqueueResult.contextId, resumeInput: enqueueResult.resumeInput, userId: enqueueResult.userId, - }).catch((error) => { + } + + const isApiCaller = access.auth?.authType === AuthType.API_KEY + + if (isApiCaller) { + const result = await PauseResumeManager.startResumeExecution(resumeArgs) + + return NextResponse.json({ + success: result.success, + executionId: enqueueResult.resumeExecutionId, + output: result.output, + error: result.error, + metadata: result.metadata + ? { + duration: result.metadata.duration, + startTime: result.metadata.startTime, + endTime: result.metadata.endTime, + } + : undefined, + }) + } + + PauseResumeManager.startResumeExecution(resumeArgs).catch((error) => { logger.error('Failed to start resume execution', { workflowId, parentExecutionId: executionId, diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index 7e3ccb86587..86a4a722eb8 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -1400,25 +1400,42 @@ async function handleExecutePost( return } - sendEvent({ - type: 'execution:completed', - timestamp: new Date().toISOString(), - executionId, - workflowId, - data: { - success: result.success, - output: includeFileBase64 - ? await hydrateUserFilesWithBase64(result.output, { - requestId, - executionId, - maxBytes: base64MaxBytes, - }) - : result.output, - duration: result.metadata?.duration || 0, - startTime: result.metadata?.startTime || startTime.toISOString(), - endTime: result.metadata?.endTime || new Date().toISOString(), - }, - }) + const sseOutput = includeFileBase64 + ? await hydrateUserFilesWithBase64(result.output, { + requestId, + executionId, + maxBytes: base64MaxBytes, + }) + : result.output + + if (result.status === 'paused') { + sendEvent({ + type: 'execution:paused', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { + output: sseOutput, + duration: result.metadata?.duration || 0, + startTime: result.metadata?.startTime || startTime.toISOString(), + endTime: result.metadata?.endTime || new Date().toISOString(), + }, + }) + } else { + sendEvent({ + type: 'execution:completed', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { + success: result.success, + output: sseOutput, + duration: result.metadata?.duration || 0, + startTime: result.metadata?.startTime || startTime.toISOString(), + endTime: result.metadata?.endTime || new Date().toISOString(), + }, + }) + } finalMetaStatus = 'complete' } catch (error: unknown) { const isTimeout = isTimeoutError(error) || timeoutController.isTimedOut() diff --git a/apps/sim/hooks/use-execution-stream.ts b/apps/sim/hooks/use-execution-stream.ts index 1198a179bea..728cf52b6d2 100644 --- a/apps/sim/hooks/use-execution-stream.ts +++ b/apps/sim/hooks/use-execution-stream.ts @@ -9,6 +9,7 @@ import type { ExecutionCompletedData, ExecutionErrorData, ExecutionEvent, + ExecutionPausedData, ExecutionStartedData, StreamChunkData, StreamDoneData, @@ -74,6 +75,9 @@ export async function processSSEStream( case 'execution:completed': callbacks.onExecutionCompleted?.(event.data) break + case 'execution:paused': + callbacks.onExecutionPaused?.(event.data) + break case 'execution:error': callbacks.onExecutionError?.(event.data) break @@ -114,6 +118,7 @@ export async function processSSEStream( export interface ExecutionStreamCallbacks { onExecutionStarted?: (data: ExecutionStartedData) => void onExecutionCompleted?: (data: ExecutionCompletedData) => void + onExecutionPaused?: (data: ExecutionPausedData) => void onExecutionError?: (data: ExecutionErrorData) => void onExecutionCancelled?: (data: ExecutionCancelledData) => void onBlockStarted?: (data: BlockStartedData) => void diff --git a/apps/sim/lib/a2a/utils.ts b/apps/sim/lib/a2a/utils.ts index 5a704e20aea..9dd1b879050 100644 --- a/apps/sim/lib/a2a/utils.ts +++ b/apps/sim/lib/a2a/utils.ts @@ -264,7 +264,7 @@ export interface ParsedSSEChunk { /** Final success flag if this chunk contains the final event */ finalSuccess?: boolean /** Terminal task state if known */ - terminalState?: 'completed' | 'failed' | 'canceled' + terminalState?: 'completed' | 'failed' | 'canceled' | 'input-required' /** Final artifacts if present on terminal event */ finalArtifacts?: Artifact[] /** Whether this chunk indicates the stream is done */ @@ -326,6 +326,15 @@ export function parseWorkflowSSEChunk(chunk: string): ParsedSSEChunk { result.finalSuccess = parsed.data?.success !== false result.terminalState = result.finalSuccess ? 'completed' : 'failed' result.isDone = true + } else if (parsed.type === 'execution:paused') { + if (parsed.data?.output?.content) { + result.finalContent = parsed.data.output.content + } else if (parsed.data?.output) { + result.finalContent = JSON.stringify(parsed.data.output) + } + result.finalSuccess = true + result.terminalState = 'input-required' + result.isDone = true } else if (parsed.type === 'execution:cancelled') { result.finalSuccess = false result.terminalState = 'canceled' diff --git a/apps/sim/lib/workflows/executor/execution-events.ts b/apps/sim/lib/workflows/executor/execution-events.ts index 31eaacae6cd..1fe95bb4657 100644 --- a/apps/sim/lib/workflows/executor/execution-events.ts +++ b/apps/sim/lib/workflows/executor/execution-events.ts @@ -8,6 +8,7 @@ import type { SubflowType } from '@/stores/workflows/workflow/types' export type ExecutionEventType = | 'execution:started' | 'execution:completed' + | 'execution:paused' | 'execution:error' | 'execution:cancelled' | 'block:started' @@ -53,6 +54,20 @@ export interface ExecutionCompletedEvent extends BaseExecutionEvent { } } +/** + * Execution paused event (HITL block waiting for human input) + */ +export interface ExecutionPausedEvent extends BaseExecutionEvent { + type: 'execution:paused' + workflowId: string + data: { + output: any + duration: number + startTime: string + endTime: string + } +} + /** * Execution error event */ @@ -196,6 +211,7 @@ export interface StreamDoneEvent extends BaseExecutionEvent { export type ExecutionEvent = | ExecutionStartedEvent | ExecutionCompletedEvent + | ExecutionPausedEvent | ExecutionErrorEvent | ExecutionCancelledEvent | BlockStartedEvent @@ -207,6 +223,7 @@ export type ExecutionEvent = export type ExecutionStartedData = ExecutionStartedEvent['data'] export type ExecutionCompletedData = ExecutionCompletedEvent['data'] +export type ExecutionPausedData = ExecutionPausedEvent['data'] export type ExecutionErrorData = ExecutionErrorEvent['data'] export type ExecutionCancelledData = ExecutionCancelledEvent['data'] export type BlockStartedData = BlockStartedEvent['data'] diff --git a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts index 25320d1c966..a6a6cab064c 100644 --- a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts +++ b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts @@ -292,7 +292,7 @@ export class PauseResumeManager { }) } - static async startResumeExecution(args: StartResumeExecutionArgs): Promise { + static async startResumeExecution(args: StartResumeExecutionArgs): Promise { const { resumeEntryId, resumeExecutionId, pausedExecution, contextId, resumeInput, userId } = args @@ -357,6 +357,8 @@ export class PauseResumeManager { }) await PauseResumeManager.processQueuedResumes(pausedExecution.executionId) + + return result } catch (error) { await PauseResumeManager.markResumeFailed({ resumeEntryId, @@ -995,6 +997,20 @@ export class PauseResumeManager { data: { duration: result.metadata?.duration || 0 }, } as ExecutionEvent) finalMetaStatus = 'cancelled' + } else if (result.status === 'paused') { + writeBufferedEvent({ + type: 'execution:paused', + timestamp: new Date().toISOString(), + executionId: resumeExecutionId, + workflowId, + data: { + output: result.output, + duration: result.metadata?.duration || 0, + startTime: result.metadata?.startTime || new Date().toISOString(), + endTime: result.metadata?.endTime || new Date().toISOString(), + }, + } as ExecutionEvent) + finalMetaStatus = 'complete' } else { writeBufferedEvent({ type: 'execution:completed', diff --git a/apps/sim/lib/workflows/executor/queued-workflow-execution.ts b/apps/sim/lib/workflows/executor/queued-workflow-execution.ts index 1a8b0c24ba0..75831b27b95 100644 --- a/apps/sim/lib/workflows/executor/queued-workflow-execution.ts +++ b/apps/sim/lib/workflows/executor/queued-workflow-execution.ts @@ -216,6 +216,20 @@ export async function executeQueuedWorkflowJob( }, }) await setExecutionMeta(executionId, { status: 'cancelled' }) + } else if (result.status === 'paused') { + await eventWriter.write({ + type: 'execution:paused', + timestamp: new Date().toISOString(), + executionId, + workflowId, + data: { + output: outputWithBase64, + duration: result.metadata?.duration || 0, + startTime: result.metadata?.startTime || metadata.startTime, + endTime: result.metadata?.endTime || new Date().toISOString(), + }, + }) + await setExecutionMeta(executionId, { status: 'complete' }) } else { await eventWriter.write({ type: 'execution:completed', From 13302547c1f9865d21054dc3b586fb82a5bbf1b6 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Mon, 6 Apr 2026 14:04:56 -0700 Subject: [PATCH 7/8] fix types --- .../lib/workflows/executor/human-in-the-loop-manager.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts index a6a6cab064c..2c0d00fcbb7 100644 --- a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts +++ b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts @@ -875,9 +875,9 @@ export class PauseResumeManager { childWorkflowBlockId: childWorkflowContext.parentBlockId, childWorkflowName: childWorkflowContext.workflowName, }), - ...(callbackData.childWorkflowInstanceId && { - childWorkflowInstanceId: callbackData.childWorkflowInstanceId, - }), + ...(callbackData.childWorkflowInstanceId + ? { childWorkflowInstanceId: callbackData.childWorkflowInstanceId } + : {}), } writeBufferedEvent({ @@ -911,7 +911,8 @@ export class PauseResumeManager { } as ExecutionEvent) }, onStream: async (streamingExec: StreamingExecution) => { - const blockId = (streamingExec.execution as Record).blockId as string + const blockId = (streamingExec.execution as unknown as Record) + .blockId as string const reader = streamingExec.stream.getReader() const decoder = new TextDecoder() try { From e0147eb1f250347637ac5b9507012b111773ecf2 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Mon, 6 Apr 2026 16:04:25 -0700 Subject: [PATCH 8/8] address bugbot comments --- .../api/resume/[workflowId]/[executionId]/[contextId]/route.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts b/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts index f854b144d98..1a75d8aa598 100644 --- a/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts +++ b/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts @@ -148,6 +148,7 @@ export async function POST( return NextResponse.json({ success: result.success, + status: result.status ?? (result.success ? 'completed' : 'failed'), executionId: enqueueResult.resumeExecutionId, output: result.output, error: result.error,