Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/sim/app/api/v1/admin/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,7 @@ export interface AdminDeployResult {
isDeployed: boolean
version: number
deployedAt: string
warnings?: string[]
}

export interface AdminUndeployResult {
Expand Down
108 changes: 100 additions & 8 deletions apps/sim/app/api/v1/admin/workflows/[id]/deploy/route.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
import { db, workflow } from '@sim/db'
import { db, workflow, workflowDeploymentVersion } from '@sim/db'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { and, eq } from 'drizzle-orm'
import { generateRequestId } from '@/lib/core/utils/request'
import { cleanupWebhooksForWorkflow } from '@/lib/webhooks/deploy'
import { removeMcpToolsForWorkflow, syncMcpToolsForWorkflow } from '@/lib/mcp/workflow-mcp-sync'
import {
cleanupWebhooksForWorkflow,
restorePreviousVersionWebhooks,
saveTriggerWebhooksForDeploy,
} from '@/lib/webhooks/deploy'
import {
deployWorkflow,
loadWorkflowFromNormalizedTables,
undeployWorkflow,
} from '@/lib/workflows/persistence/utils'
import { createSchedulesForDeploy, validateWorkflowSchedules } from '@/lib/workflows/schedules'
import {
cleanupDeploymentVersion,
createSchedulesForDeploy,
validateWorkflowSchedules,
} from '@/lib/workflows/schedules'
import { withAdminAuthParams } from '@/app/api/v1/admin/middleware'
import {
badRequestResponse,
Expand All @@ -28,10 +37,11 @@ interface RouteParams {

export const POST = withAdminAuthParams<RouteParams>(async (request, context) => {
const { id: workflowId } = await context.params
const requestId = generateRequestId()

try {
const [workflowRecord] = await db
.select({ id: workflow.id, name: workflow.name })
.select()
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
Expand All @@ -50,6 +60,18 @@ export const POST = withAdminAuthParams<RouteParams>(async (request, context) =>
return badRequestResponse(`Invalid schedule configuration: ${scheduleValidation.error}`)
}

const [currentActiveVersion] = await db
.select({ id: workflowDeploymentVersion.id })
.from(workflowDeploymentVersion)
.where(
and(
eq(workflowDeploymentVersion.workflowId, workflowId),
eq(workflowDeploymentVersion.isActive, true)
)
)
.limit(1)
const previousVersionId = currentActiveVersion?.id

const deployResult = await deployWorkflow({
workflowId,
deployedBy: ADMIN_ACTOR_ID,
Expand All @@ -65,22 +87,91 @@ export const POST = withAdminAuthParams<RouteParams>(async (request, context) =>
return internalErrorResponse('Failed to resolve deployment version')
}

const workflowData = workflowRecord as Record<string, unknown>

const triggerSaveResult = await saveTriggerWebhooksForDeploy({
request,
workflowId,
workflow: workflowData,
userId: workflowRecord.userId,
blocks: normalizedData.blocks,
requestId,
deploymentVersionId: deployResult.deploymentVersionId,
previousVersionId,
})

if (!triggerSaveResult.success) {
await cleanupDeploymentVersion({
workflowId,
workflow: workflowData,
requestId,
deploymentVersionId: deployResult.deploymentVersionId,
})
await undeployWorkflow({ workflowId })
return internalErrorResponse(
triggerSaveResult.error?.message || 'Failed to sync trigger configuration'
)
}

const scheduleResult = await createSchedulesForDeploy(
workflowId,
normalizedData.blocks,
db,
deployResult.deploymentVersionId
)
if (!scheduleResult.success) {
logger.warn(`Schedule creation failed for workflow ${workflowId}: ${scheduleResult.error}`)
logger.error(
`[${requestId}] Admin API: Schedule creation failed for workflow ${workflowId}: ${scheduleResult.error}`
)
await cleanupDeploymentVersion({
workflowId,
workflow: workflowData,
requestId,
deploymentVersionId: deployResult.deploymentVersionId,
})
if (previousVersionId) {
await restorePreviousVersionWebhooks({
request,
workflow: workflowData,
userId: workflowRecord.userId,
previousVersionId,
requestId,
})
}
await undeployWorkflow({ workflowId })
return internalErrorResponse(scheduleResult.error || 'Failed to create schedule')
}

logger.info(`Admin API: Deployed workflow ${workflowId} as v${deployResult.version}`)
if (previousVersionId && previousVersionId !== deployResult.deploymentVersionId) {
try {
logger.info(`[${requestId}] Admin API: Cleaning up previous version ${previousVersionId}`)
await cleanupDeploymentVersion({
workflowId,
workflow: workflowData,
requestId,
deploymentVersionId: previousVersionId,
skipExternalCleanup: true,
})
} catch (cleanupError) {
logger.error(
`[${requestId}] Admin API: Failed to clean up previous version ${previousVersionId}`,
cleanupError
)
}
}

logger.info(
`[${requestId}] Admin API: Deployed workflow ${workflowId} as v${deployResult.version}`
)

// Sync MCP tools with the latest parameter schema
await syncMcpToolsForWorkflow({ workflowId, requestId, context: 'deploy' })

const response: AdminDeployResult = {
isDeployed: true,
version: deployResult.version!,
deployedAt: deployResult.deployedAt!.toISOString(),
warnings: triggerSaveResult.warnings,
}

return singleResponse(response)
Expand All @@ -105,7 +196,6 @@ export const DELETE = withAdminAuthParams<RouteParams>(async (request, context)
return notFoundResponse('Workflow')
}

// Clean up external webhook subscriptions before undeploying
await cleanupWebhooksForWorkflow(
workflowId,
workflowRecord as Record<string, unknown>,
Expand All @@ -117,6 +207,8 @@ export const DELETE = withAdminAuthParams<RouteParams>(async (request, context)
return internalErrorResponse(result.error || 'Failed to undeploy workflow')
}

await removeMcpToolsForWorkflow(workflowId, requestId)

logger.info(`Admin API: Undeployed workflow ${workflowId}`)

const response: AdminUndeployResult = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
import { db, workflow } from '@sim/db'
import { db, workflow, workflowDeploymentVersion } from '@sim/db'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { and, eq } from 'drizzle-orm'
import { generateRequestId } from '@/lib/core/utils/request'
import { syncMcpToolsForWorkflow } from '@/lib/mcp/workflow-mcp-sync'
import { restorePreviousVersionWebhooks, saveTriggerWebhooksForDeploy } from '@/lib/webhooks/deploy'
import { activateWorkflowVersion } from '@/lib/workflows/persistence/utils'
import {
cleanupDeploymentVersion,
createSchedulesForDeploy,
validateWorkflowSchedules,
} from '@/lib/workflows/schedules'
import { withAdminAuthParams } from '@/app/api/v1/admin/middleware'
import {
badRequestResponse,
internalErrorResponse,
notFoundResponse,
singleResponse,
} from '@/app/api/v1/admin/responses'
import type { BlockState } from '@/stores/workflows/workflow/types'

const logger = createLogger('AdminWorkflowActivateVersionAPI')

Expand All @@ -18,11 +27,12 @@ interface RouteParams {
}

export const POST = withAdminAuthParams<RouteParams>(async (request, context) => {
const requestId = generateRequestId()
const { id: workflowId, versionId } = await context.params

try {
const [workflowRecord] = await db
.select({ id: workflow.id })
.select()
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
Expand All @@ -36,23 +46,161 @@ export const POST = withAdminAuthParams<RouteParams>(async (request, context) =>
return badRequestResponse('Invalid version number')
}

const [versionRow] = await db
.select({
id: workflowDeploymentVersion.id,
state: workflowDeploymentVersion.state,
})
.from(workflowDeploymentVersion)
.where(
and(
eq(workflowDeploymentVersion.workflowId, workflowId),
eq(workflowDeploymentVersion.version, versionNum)
)
)
.limit(1)

if (!versionRow?.state) {
return notFoundResponse('Deployment version')
}

const [currentActiveVersion] = await db
.select({ id: workflowDeploymentVersion.id })
.from(workflowDeploymentVersion)
.where(
and(
eq(workflowDeploymentVersion.workflowId, workflowId),
eq(workflowDeploymentVersion.isActive, true)
)
)
.limit(1)

const previousVersionId = currentActiveVersion?.id

const deployedState = versionRow.state as { blocks?: Record<string, BlockState> }
const blocks = deployedState.blocks
if (!blocks || typeof blocks !== 'object') {
return internalErrorResponse('Invalid deployed state structure')
}

const workflowData = workflowRecord as Record<string, unknown>

const scheduleValidation = validateWorkflowSchedules(blocks)
if (!scheduleValidation.isValid) {
return badRequestResponse(`Invalid schedule configuration: ${scheduleValidation.error}`)
}

const triggerSaveResult = await saveTriggerWebhooksForDeploy({
request,
workflowId,
workflow: workflowData,
userId: workflowRecord.userId,
blocks,
requestId,
deploymentVersionId: versionRow.id,
previousVersionId,
forceRecreateSubscriptions: true,
})

if (!triggerSaveResult.success) {
logger.error(
`[${requestId}] Admin API: Failed to sync triggers for workflow ${workflowId}`,
triggerSaveResult.error
)
return internalErrorResponse(
triggerSaveResult.error?.message || 'Failed to sync trigger configuration'
)
}

const scheduleResult = await createSchedulesForDeploy(workflowId, blocks, db, versionRow.id)

if (!scheduleResult.success) {
await cleanupDeploymentVersion({
workflowId,
workflow: workflowData,
requestId,
deploymentVersionId: versionRow.id,
})
if (previousVersionId) {
await restorePreviousVersionWebhooks({
request,
workflow: workflowData,
userId: workflowRecord.userId,
previousVersionId,
requestId,
})
}
return internalErrorResponse(scheduleResult.error || 'Failed to sync schedules')
}

const result = await activateWorkflowVersion({ workflowId, version: versionNum })
if (!result.success) {
await cleanupDeploymentVersion({
workflowId,
workflow: workflowData,
requestId,
deploymentVersionId: versionRow.id,
})
if (previousVersionId) {
await restorePreviousVersionWebhooks({
request,
workflow: workflowData,
userId: workflowRecord.userId,
previousVersionId,
requestId,
})
}
if (result.error === 'Deployment version not found') {
return notFoundResponse('Deployment version')
}
return internalErrorResponse(result.error || 'Failed to activate version')
}

logger.info(`Admin API: Activated version ${versionNum} for workflow ${workflowId}`)
if (previousVersionId && previousVersionId !== versionRow.id) {
try {
logger.info(
`[${requestId}] Admin API: Cleaning up previous version ${previousVersionId} webhooks/schedules`
)
await cleanupDeploymentVersion({
workflowId,
workflow: workflowData,
requestId,
deploymentVersionId: previousVersionId,
skipExternalCleanup: true,
})
logger.info(`[${requestId}] Admin API: Previous version cleanup completed`)
} catch (cleanupError) {
logger.error(
`[${requestId}] Admin API: Failed to clean up previous version ${previousVersionId}`,
cleanupError
)
}
}

await syncMcpToolsForWorkflow({
workflowId,
requestId,
state: versionRow.state,
context: 'activate',
})

logger.info(
`[${requestId}] Admin API: Activated version ${versionNum} for workflow ${workflowId}`
)

return singleResponse({
success: true,
version: versionNum,
deployedAt: result.deployedAt!.toISOString(),
warnings: triggerSaveResult.warnings,
})
} catch (error) {
logger.error(`Admin API: Failed to activate version for workflow ${workflowId}`, { error })
logger.error(
`[${requestId}] Admin API: Failed to activate version for workflow ${workflowId}`,
{
error,
}
)
return internalErrorResponse('Failed to activate deployment version')
}
})
Loading