Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions apps/sim/app/api/workflows/[id]/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { normalizeName } from '@/executor/constants'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata, IterationContext } from '@/executor/execution/types'
import type { NormalizedBlockOutput, StreamingExecution } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { Serializer } from '@/serializer'
import { CORE_TRIGGER_TYPES, type CoreTriggerType } from '@/stores/logs/filters/types'

Expand Down Expand Up @@ -467,17 +468,17 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
}

return NextResponse.json(filteredResult)
} catch (error: any) {
const errorMessage = error.message || 'Unknown error'
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Non-SSE execution failed: ${errorMessage}`)

const executionResult = error.executionResult
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined

return NextResponse.json(
{
success: false,
output: executionResult?.output,
error: executionResult?.error || error.message || 'Execution failed',
error: executionResult?.error || errorMessage || 'Execution failed',
metadata: executionResult?.metadata
? {
duration: executionResult.metadata.duration,
Expand Down Expand Up @@ -788,11 +789,11 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:

// Cleanup base64 cache for this execution
await cleanupExecutionBase64Cache(executionId)
} catch (error: any) {
const errorMessage = error.message || 'Unknown error'
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`)

const executionResult = error.executionResult
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined

sendEvent({
type: 'execution:error',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
} from '@/lib/workflows/triggers/triggers'
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { coerceValue } from '@/executor/utils/start-block'
import { subscriptionKeys } from '@/hooks/queries/subscription'
import { useExecutionStream } from '@/hooks/use-execution-stream'
Expand Down Expand Up @@ -76,17 +77,6 @@ function normalizeErrorMessage(error: unknown): string {
return WORKFLOW_EXECUTION_FAILURE_MESSAGE
}

function isExecutionResult(value: unknown): value is ExecutionResult {
if (!isRecord(value)) return false
return typeof value.success === 'boolean' && isRecord(value.output)
}

function extractExecutionResult(error: unknown): ExecutionResult | null {
if (!isRecord(error)) return null
const candidate = error.executionResult
return isExecutionResult(candidate) ? candidate : null
}

export function useWorkflowExecution() {
const queryClient = useQueryClient()
const currentWorkflow = useCurrentWorkflow()
Expand Down Expand Up @@ -1138,11 +1128,11 @@ export function useWorkflowExecution() {

const handleExecutionError = (error: unknown, options?: { executionId?: string }) => {
const normalizedMessage = normalizeErrorMessage(error)
const executionResultFromError = extractExecutionResult(error)

let errorResult: ExecutionResult

if (executionResultFromError) {
if (hasExecutionResult(error)) {
const executionResultFromError = error.executionResult
const logs = Array.isArray(executionResultFromError.logs) ? executionResultFromError.logs : []

errorResult = {
Expand Down
5 changes: 2 additions & 3 deletions apps/sim/background/schedule-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
} from '@/lib/workflows/schedules/utils'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types'
import type { ExecutionResult } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { MAX_CONSECUTIVE_FAILURES } from '@/triggers/constants'

const logger = createLogger('TriggerScheduleExecution')
Expand Down Expand Up @@ -231,8 +231,7 @@ async function runWorkflowExecution({
} catch (error: unknown) {
logger.error(`[${requestId}] Early failure in scheduled workflow ${payload.workflowId}`, error)

const errorWithResult = error as { executionResult?: ExecutionResult }
const executionResult = errorWithResult?.executionResult
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }

await loggingSession.safeCompleteWithError({
Expand Down
15 changes: 8 additions & 7 deletions apps/sim/background/webhook-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { loadDeployedWorkflowState } from '@/lib/workflows/persistence/utils'
import { getWorkflowById } from '@/lib/workflows/utils'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types'
import type { ExecutionResult } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { safeAssign } from '@/tools/safe-assign'
import { getTrigger, isTriggerValid } from '@/triggers'

Expand Down Expand Up @@ -578,12 +578,13 @@ async function executeWebhookJobInternal(
deploymentVersionId,
})

const errorWithResult = error as { executionResult?: ExecutionResult }
const executionResult = errorWithResult?.executionResult || {
success: false,
output: {},
logs: [],
}
const executionResult = hasExecutionResult(error)
? error.executionResult
: {
success: false,
output: {},
logs: [],
}
const { traceSpans } = buildTraceSpans(executionResult)

await loggingSession.safeCompleteWithError({
Expand Down
5 changes: 2 additions & 3 deletions apps/sim/background/workflow-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-m
import { getWorkflowById } from '@/lib/workflows/utils'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types'
import type { ExecutionResult } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import type { CoreTriggerType } from '@/stores/logs/filters/types'

const logger = createLogger('TriggerWorkflowExecution')
Expand Down Expand Up @@ -160,8 +160,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
executionId,
})

const errorWithResult = error as { executionResult?: ExecutionResult }
const executionResult = errorWithResult?.executionResult
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }

await loggingSession.safeCompleteWithError({
Expand Down
31 changes: 31 additions & 0 deletions apps/sim/executor/errors/child-workflow-error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import type { TraceSpan } from '@/lib/logs/types'
import type { ExecutionResult } from '@/executor/types'

interface ChildWorkflowErrorOptions {
message: string
childWorkflowName: string
childTraceSpans?: TraceSpan[]
executionResult?: ExecutionResult
cause?: Error
}

/**
* Error raised when a child workflow execution fails.
*/
export class ChildWorkflowError extends Error {
readonly childTraceSpans: TraceSpan[]
readonly childWorkflowName: string
readonly executionResult?: ExecutionResult

constructor(options: ChildWorkflowErrorOptions) {
super(options.message, { cause: options.cause })
this.name = 'ChildWorkflowError'
this.childWorkflowName = options.childWorkflowName
this.childTraceSpans = options.childTraceSpans ?? []
this.executionResult = options.executionResult
}

static isChildWorkflowError(error: unknown): error is ChildWorkflowError {
return error instanceof ChildWorkflowError
}
}
23 changes: 13 additions & 10 deletions apps/sim/executor/execution/block-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
isSentinelBlockType,
} from '@/executor/constants'
import type { DAGNode } from '@/executor/dag/builder'
import { ChildWorkflowError } from '@/executor/errors/child-workflow-error'
import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types'
import {
generatePauseContextId,
Expand Down Expand Up @@ -213,24 +214,26 @@ export class BlockExecutor {
? resolvedInputs
: ((block.config?.params as Record<string, any> | undefined) ?? {})

if (blockLog) {
blockLog.endedAt = new Date().toISOString()
blockLog.durationMs = duration
blockLog.success = false
blockLog.error = errorMessage
blockLog.input = input
}

const errorOutput: NormalizedBlockOutput = {
error: errorMessage,
}

if (error && typeof error === 'object' && 'childTraceSpans' in error) {
errorOutput.childTraceSpans = (error as any).childTraceSpans
if (ChildWorkflowError.isChildWorkflowError(error)) {
errorOutput.childTraceSpans = error.childTraceSpans
errorOutput.childWorkflowName = error.childWorkflowName
}

this.state.setBlockOutput(node.id, errorOutput, duration)

if (blockLog) {
blockLog.endedAt = new Date().toISOString()
blockLog.durationMs = duration
blockLog.success = false
blockLog.error = errorMessage
blockLog.input = input
blockLog.output = this.filterOutputForLog(block, errorOutput)
}

logger.error(
phase === 'input_resolution' ? 'Failed to resolve block inputs' : 'Block execution failed',
{
Expand Down
6 changes: 3 additions & 3 deletions apps/sim/executor/execution/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import type {
PausePoint,
ResumeStatus,
} from '@/executor/types'
import { normalizeError } from '@/executor/utils/errors'
import { attachExecutionResult, normalizeError } from '@/executor/utils/errors'

const logger = createLogger('ExecutionEngine')

Expand Down Expand Up @@ -170,8 +170,8 @@ export class ExecutionEngine {
metadata: this.context.metadata,
}

if (error && typeof error === 'object') {
;(error as any).executionResult = executionResult
if (error instanceof Error) {
attachExecutionResult(error, executionResult)
}
throw error
}
Expand Down
44 changes: 23 additions & 21 deletions apps/sim/executor/handlers/workflow/workflow-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import type { TraceSpan } from '@/lib/logs/types'
import type { BlockOutput } from '@/blocks/types'
import { Executor } from '@/executor'
import { BlockType, DEFAULTS, HTTP } from '@/executor/constants'
import { ChildWorkflowError } from '@/executor/errors/child-workflow-error'
import type {
BlockHandler,
ExecutionContext,
ExecutionResult,
StreamingExecution,
} from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { buildAPIUrl, buildAuthHeaders } from '@/executor/utils/http'
import { parseJSON } from '@/executor/utils/json'
import { lazyCleanupInputMapping } from '@/executor/utils/lazy-cleanup'
Expand Down Expand Up @@ -137,39 +139,39 @@ export class WorkflowBlockHandler implements BlockHandler {
)

return mappedResult
} catch (error: any) {
} catch (error: unknown) {
logger.error(`Error executing child workflow ${workflowId}:`, error)

const { workflows } = useWorkflowRegistry.getState()
const workflowMetadata = workflows[workflowId]
const childWorkflowName = workflowMetadata?.name || workflowId

const originalError = error.message || 'Unknown error'
const wrappedError = new Error(
`Error in child workflow "${childWorkflowName}": ${originalError}`
)
const originalError = error instanceof Error ? error.message : 'Unknown error'
let childTraceSpans: WorkflowTraceSpan[] = []
let executionResult: ExecutionResult | undefined

if (error.executionResult?.logs) {
const executionResult = error.executionResult as ExecutionResult
if (hasExecutionResult(error) && error.executionResult.logs) {
executionResult = error.executionResult

logger.info(`Extracting child trace spans from error.executionResult`, {
hasLogs: (executionResult.logs?.length ?? 0) > 0,
logCount: executionResult.logs?.length ?? 0,
})

const childTraceSpans = this.captureChildWorkflowLogs(
executionResult,
childWorkflowName,
ctx
)
childTraceSpans = this.captureChildWorkflowLogs(executionResult, childWorkflowName, ctx)

logger.info(`Captured ${childTraceSpans.length} child trace spans from failed execution`)
;(wrappedError as any).childTraceSpans = childTraceSpans
} else if (error.childTraceSpans && Array.isArray(error.childTraceSpans)) {
;(wrappedError as any).childTraceSpans = error.childTraceSpans
} else if (ChildWorkflowError.isChildWorkflowError(error)) {
childTraceSpans = error.childTraceSpans
}

throw wrappedError
throw new ChildWorkflowError({
message: `Error in child workflow "${childWorkflowName}": ${originalError}`,
childWorkflowName,
childTraceSpans,
executionResult,
cause: error instanceof Error ? error : undefined,
})
}
}

Expand Down Expand Up @@ -441,11 +443,11 @@ export class WorkflowBlockHandler implements BlockHandler {

if (!success) {
logger.warn(`Child workflow ${childWorkflowName} failed`)
const error = new Error(
`Error in child workflow "${childWorkflowName}": ${childResult.error || 'Child workflow execution failed'}`
)
;(error as any).childTraceSpans = childTraceSpans || []
throw error
throw new ChildWorkflowError({
message: `Error in child workflow "${childWorkflowName}": ${childResult.error || 'Child workflow execution failed'}`,
childWorkflowName,
childTraceSpans: childTraceSpans || [],
})
}

return {
Expand Down
35 changes: 34 additions & 1 deletion apps/sim/executor/utils/errors.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,39 @@
import type { ExecutionContext } from '@/executor/types'
import type { ExecutionContext, ExecutionResult } from '@/executor/types'
import type { SerializedBlock } from '@/serializer/types'

/**
* Interface for errors that carry an ExecutionResult.
* Used when workflow execution fails and we want to preserve partial results.
*/
export interface ErrorWithExecutionResult extends Error {
executionResult: ExecutionResult
}

/**
* Type guard to check if an error carries an ExecutionResult.
* Validates that executionResult has required fields (success, output).
*/
export function hasExecutionResult(error: unknown): error is ErrorWithExecutionResult {
if (
!(error instanceof Error) ||
!('executionResult' in error) ||
error.executionResult == null ||
typeof error.executionResult !== 'object'
) {
return false
}

const result = error.executionResult as Record<string, unknown>
return typeof result.success === 'boolean' && result.output != null
}

/**
* Attaches an ExecutionResult to an error for propagation to parent workflows.
*/
export function attachExecutionResult(error: Error, executionResult: ExecutionResult): void {
Object.assign(error, { executionResult })
}

export interface BlockExecutionErrorDetails {
block: SerializedBlock
error: Error | string
Expand Down
9 changes: 7 additions & 2 deletions apps/sim/hooks/use-execution-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,13 @@ export function useExecutionStream() {
})

if (!response.ok) {
const error = await response.json()
throw new Error(error.error || 'Failed to start execution')
const errorResponse = await response.json()
const error = new Error(errorResponse.error || 'Failed to start execution')
// Attach the execution result from server response for error handling
if (errorResponse && typeof errorResponse === 'object') {
Object.assign(error, { executionResult: errorResponse })
}
throw error
}

if (!response.body) {
Expand Down
Loading