Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions apps/sim/app/api/cron/cleanup-stale-executions/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { verifyCronAuth } from '@/lib/auth/internal'
import { JOB_RETENTION_HOURS, JOB_STATUS } from '@/lib/core/async-jobs'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { workflowMetrics } from '@/lib/monitoring/metrics'

const logger = createLogger('CleanupStaleExecutions')

Expand All @@ -32,6 +33,7 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
executionId: workflowExecutionLogs.executionId,
workflowId: workflowExecutionLogs.workflowId,
startedAt: workflowExecutionLogs.startedAt,
trigger: workflowExecutionLogs.trigger,
})
.from(workflowExecutionLogs)
.where(
Expand Down Expand Up @@ -72,6 +74,13 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
staleDurationMinutes,
})

// Crashed workers never reach a LoggingSession completion path, so this
// is the only place these failures can be counted toward the error rate.
workflowMetrics.recordExecutionCompleted({
trigger: execution.trigger,
status: 'failed',
})
Comment thread
TheodoreSpeaks marked this conversation as resolved.

cleaned++
} catch (error) {
logger.error(`Failed to clean up execution ${execution.executionId}:`, {
Expand Down
18 changes: 18 additions & 0 deletions apps/sim/executor/execution/block-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { redactApiKeys } from '@/lib/core/security/redaction'
import { normalizeStringArray } from '@/lib/core/utils/arrays'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { compactExecutionPayload } from '@/lib/execution/payloads/serializer'
import { workflowMetrics } from '@/lib/monitoring/metrics'
import {
containsUserFileWithMetadata,
hydrateUserFilesWithBase64,
Expand Down Expand Up @@ -239,6 +240,7 @@ export class BlockExecutor {
if (normalizedOutput.childTraceSpans && Array.isArray(normalizedOutput.childTraceSpans)) {
blockLog.childTraceSpans = normalizedOutput.childTraceSpans
}
this.recordBlockMetric(block, true, duration)
}

const { childTraceSpans: _traces, ...outputForState } = normalizedOutput
Expand Down Expand Up @@ -284,6 +286,21 @@ export class BlockExecutor {
}
}

/**
 * Reports one block run to the workflow metrics recorder.
 *
 * @param block - Serialized block whose metadata id and configured operation are reported.
 * @param success - Whether the block run succeeded.
 * @param durationMs - Duration of the block run in milliseconds.
 */
private recordBlockMetric(block: SerializedBlock, success: boolean, durationMs: number): void {
  const configuredOperation = block.config?.params?.operation

  // Operation is user-configured; only forward registry-style identifiers
  // (letters, digits, `_`, `-`, at most 64 chars) so dynamic values can't
  // explode CloudWatch dimension cardinality. Anything else is dropped.
  let safeOperation: string | undefined
  if (
    typeof configuredOperation === 'string' &&
    /^[a-zA-Z0-9_-]{1,64}$/.test(configuredOperation)
  ) {
    safeOperation = configuredOperation
  }

  const blockType = block.metadata?.id || 'unknown'

  workflowMetrics.recordBlockExecuted({
    blockType,
    operation: safeOperation,
    success,
    durationMs,
  })
}

private buildNodeMetadata(node: DAGNode): WorkflowNodeMetadata {
const metadata = node?.metadata ?? {}
return {
Expand Down Expand Up @@ -371,6 +388,7 @@ export class BlockExecutor {
if (ChildWorkflowError.isChildWorkflowError(error) && error.childTraceSpans.length > 0) {
blockLog.childTraceSpans = error.childTraceSpans
}
this.recordBlockMetric(block, false, duration)
}

this.execLogger.error(
Expand Down
129 changes: 129 additions & 0 deletions apps/sim/lib/logs/execution/logging-session.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,24 @@ const {
completeWorkflowExecutionMock,
startWorkflowExecutionMock,
loadWorkflowStateForExecutionMock,
recordExecutionStartedMock,
recordExecutionCompletedMock,
recordExecutionPausedMock,
} = vi.hoisted(() => ({
completeWorkflowExecutionMock: vi.fn(),
startWorkflowExecutionMock: vi.fn(),
loadWorkflowStateForExecutionMock: vi.fn(),
recordExecutionStartedMock: vi.fn(),
recordExecutionCompletedMock: vi.fn(),
recordExecutionPausedMock: vi.fn(),
}))

// Replace the metrics module with spies so tests can assert exactly which
// workflow metrics are recorded. The factory references only the mocks
// created via vi.hoisted above.
vi.mock('@/lib/monitoring/metrics', () => ({
workflowMetrics: {
recordExecutionStarted: recordExecutionStartedMock,
recordExecutionCompleted: recordExecutionCompletedMock,
recordExecutionPaused: recordExecutionPausedMock,
},
}))

vi.mock('@sim/db', () => ({
Expand Down Expand Up @@ -648,3 +662,118 @@ describe('LoggingSession.markExecutionAsFailed workflowId scoping', () => {
expect(combined).toContain('force_failed')
})
})

/**
 * Verifies which workflow metrics LoggingSession records on each lifecycle
 * path (start, complete, error, cancellation, pause, markAsFailed, fallback)
 * and that a completion metric is recorded at most once per session.
 */
describe('LoggingSession workflow metrics', () => {
beforeEach(() => {
// Reset call history, then give every mocked dependency a benign default
// so each test starts from a clean baseline.
vi.clearAllMocks()
startWorkflowExecutionMock.mockResolvedValue({})
completeWorkflowExecutionMock.mockResolvedValue({})
loadWorkflowStateForExecutionMock.mockResolvedValue({
blocks: {},
edges: [],
loops: {},
parallels: {},
})
// Default DB lookup reports the execution as still running; individual
// tests override this to simulate other persisted states.
dbMocks.selectLimit.mockResolvedValue([{ status: 'running' }])
dbMocks.execute.mockResolvedValue(undefined)
})

it('emits ExecutionStarted on start and not on resume', async () => {
const session = new LoggingSession('wf-1', 'exec-1', 'api', 'req-1')
await session.start({ workspaceId: 'ws-1' })
expect(recordExecutionStartedMock).toHaveBeenCalledTimes(1)
expect(recordExecutionStartedMock).toHaveBeenCalledWith({ trigger: 'api' })

// Clear the first call so the resume assertion only observes the second start().
recordExecutionStartedMock.mockClear()
const resumeSession = new LoggingSession('wf-1', 'exec-1', 'api', 'req-1')
// skipLogCreation: true is the resume path under test here.
await resumeSession.start({ workspaceId: 'ws-1', skipLogCreation: true })
expect(recordExecutionStartedMock).not.toHaveBeenCalled()
})

it('emits a success completion with trigger and duration', async () => {
const session = new LoggingSession('wf-1', 'exec-1', 'webhook', 'req-1')
await session.complete({ totalDurationMs: 500 })

// The trigger comes from the session constructor; the duration is passed through.
expect(recordExecutionCompletedMock).toHaveBeenCalledTimes(1)
expect(recordExecutionCompletedMock).toHaveBeenCalledWith({
trigger: 'webhook',
status: 'success',
durationMs: 500,
})
})

it('emits a failed completion via completeWithError', async () => {
const session = new LoggingSession('wf-1', 'exec-1', 'schedule', 'req-1')
await session.completeWithError({ totalDurationMs: 250, error: { message: 'boom' } })

expect(recordExecutionCompletedMock).toHaveBeenCalledTimes(1)
expect(recordExecutionCompletedMock).toHaveBeenCalledWith({
trigger: 'schedule',
status: 'failed',
durationMs: 250,
})
})

it('emits a cancelled completion via completeWithCancellation', async () => {
const session = new LoggingSession('wf-1', 'exec-1', 'manual', 'req-1')
await session.completeWithCancellation({ totalDurationMs: 100 })

// NOTE(review): unlike the sibling tests, this one does not assert the call count.
expect(recordExecutionCompletedMock).toHaveBeenCalledWith({
trigger: 'manual',
status: 'cancelled',
durationMs: 100,
})
})

it('emits ExecutionPaused (not a completion) on pause, then failed if markAsFailed follows', async () => {
const session = new LoggingSession('wf-1', 'exec-1', 'api', 'req-1')
await session.completeWithPause({ totalDurationMs: 100 })

// A pause is recorded through its own metric and must not count as a completion.
expect(recordExecutionPausedMock).toHaveBeenCalledTimes(1)
expect(recordExecutionPausedMock).toHaveBeenCalledWith({ trigger: 'api' })
expect(recordExecutionCompletedMock).not.toHaveBeenCalled()

// A later markAsFailed may still emit the (single) completion metric;
// no duration is expected on that path.
await session.markAsFailed('pause persistence failed')
expect(recordExecutionCompletedMock).toHaveBeenCalledTimes(1)
expect(recordExecutionCompletedMock).toHaveBeenCalledWith({
trigger: 'api',
status: 'failed',
durationMs: undefined,
})
})

it('does not double-emit when markAsFailed runs after a completed session', async () => {
const session = new LoggingSession('wf-1', 'exec-1', 'api', 'req-1')
await session.complete({ totalDurationMs: 500 })
await session.markAsFailed('timeout')

// Only the first (success) completion is recorded; the later failure is suppressed.
expect(recordExecutionCompletedMock).toHaveBeenCalledTimes(1)
expect(recordExecutionCompletedMock).toHaveBeenCalledWith({
trigger: 'api',
status: 'success',
durationMs: 500,
})
})

it('emits exactly one completion when the primary write fails and the fallback succeeds', async () => {
const session = new LoggingSession('wf-1', 'exec-1', 'api', 'req-1')
// First completion write rejects, the next one resolves — simulating a
// failed primary write followed by a successful fallback write.
completeWorkflowExecutionMock
.mockRejectedValueOnce(new Error('finalize failed'))
.mockResolvedValueOnce({})

await session.safeCompleteWithError({ error: { message: 'boom' } })

// Duration is intentionally not pinned here; only trigger and status are checked.
expect(recordExecutionCompletedMock).toHaveBeenCalledTimes(1)
expect(recordExecutionCompletedMock).toHaveBeenCalledWith(
expect.objectContaining({ trigger: 'api', status: 'failed' })
)
})

it('skips the completion metric when the run was already cancelled elsewhere', async () => {
// Override the default so the DB lookup reports the run as already cancelled.
dbMocks.selectLimit.mockResolvedValue([{ status: 'cancelled' }])
const session = new LoggingSession('wf-1', 'exec-1', 'api', 'req-1')
await session.completeWithError({ error: { message: 'boom' } })

expect(recordExecutionCompletedMock).not.toHaveBeenCalled()
})
})
25 changes: 25 additions & 0 deletions apps/sim/lib/logs/execution/logging-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import type {
TraceSpan,
WorkflowState,
} from '@/lib/logs/types'
import { type WorkflowExecutionStatus, workflowMetrics } from '@/lib/monitoring/metrics'
import type { SerializableExecutionState } from '@/executor/execution/types'

type TriggerData = Record<string, unknown> & {
Expand Down Expand Up @@ -137,6 +138,8 @@ export class LoggingSession {
private completionAttemptFailed = false
private pendingProgressWrites = new Set<Promise<void>>()
private postExecutionPromise: Promise<void> | null = null
/** Guards against double-counting ExecutionCompleted across completion paths */
private completionMetricEmitted = false

constructor(
workflowId: string,
Expand Down Expand Up @@ -218,6 +221,12 @@ export class LoggingSession {
}
}

/**
 * Records the ExecutionCompleted metric for this session at most once.
 *
 * Later calls on the same session are ignored, so overlapping completion
 * paths cannot double-count a run.
 *
 * @param status - Final status to report for the execution.
 * @param durationMs - Total duration in milliseconds; omitted when unknown.
 */
private emitExecutionCompletedMetric(status: WorkflowExecutionStatus, durationMs?: number): void {
  if (!this.completionMetricEmitted) {
    // Set the flag before emitting so that, even if the recorder throws, a
    // later call on this session does not emit again.
    this.completionMetricEmitted = true
    const trigger = this.triggerType
    workflowMetrics.recordExecutionCompleted({ trigger, status, durationMs })
  }
}

private async completeExecutionWithFinalization(params: {
endedAt: string
totalDurationMs: number
Expand Down Expand Up @@ -325,6 +334,7 @@ export class LoggingSession {
workflowState: this.workflowState,
deploymentVersionId,
})
workflowMetrics.recordExecutionStarted({ trigger: this.triggerType })
} else {
// Resume: no cost reload needed. Billing reconciles from the usage_log
// ledger (pre-pause rows already exist) plus the live cost summary.
Expand Down Expand Up @@ -364,6 +374,7 @@ export class LoggingSession {
})

this.completed = true
this.emitExecutionCompletedMetric('success', duration)
Comment thread
greptile-apps[bot] marked this conversation as resolved.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Success path emits zero duration

Low Severity

The successful `complete` path coerces a missing `totalDurationMs` to 0 and always passes that number into `emitExecutionCompletedMetric`, so `ExecutionDuration` is emitted as 0 ms. The error, cancel, and fallback paths omit or clamp the duration when it is unknown, so this path can skew duration percentiles.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit fa8b4bd. Configure here.


if (traceSpans && traceSpans.length > 0) {
try {
Expand Down Expand Up @@ -500,6 +511,7 @@ export class LoggingSession {
})

this.completed = true
this.emitExecutionCompletedMetric('failed', Math.max(1, durationMs))
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated

try {
const { PlatformEvents, createOTelSpansForWorkflowExecution } = await import(
Expand Down Expand Up @@ -593,6 +605,7 @@ export class LoggingSession {
})

this.completed = true
this.emitExecutionCompletedMetric('cancelled', Math.max(1, durationMs))

try {
const { PlatformEvents, createOTelSpansForWorkflowExecution } = await import(
Expand Down Expand Up @@ -688,6 +701,7 @@ export class LoggingSession {
})

this.completed = true
workflowMetrics.recordExecutionPaused({ trigger: this.triggerType })

try {
const { PlatformEvents, createOTelSpansForWorkflowExecution } = await import(
Expand Down Expand Up @@ -779,6 +793,7 @@ export class LoggingSession {
workflowState: this.workflowState,
deploymentVersionId,
})
workflowMetrics.recordExecutionStarted({ trigger: this.triggerType })
Comment thread
TheodoreSpeaks marked this conversation as resolved.
Outdated

if (this.requestId) {
logger.debug(
Expand Down Expand Up @@ -969,6 +984,7 @@ export class LoggingSession {
this.requestId,
this.workflowId
)
this.emitExecutionCompletedMetric('failed')
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated
}

static async markExecutionAsFailed(
Expand Down Expand Up @@ -1056,6 +1072,15 @@ export class LoggingSession {

this.completed = true

if (params.status === 'pending') {
workflowMetrics.recordExecutionPaused({ trigger: this.triggerType })
} else {
this.emitExecutionCompletedMetric(
params.status === 'failed' || params.status === 'cancelled' ? params.status : 'success',
params.totalDurationMs || 0
)
Comment thread
TheodoreSpeaks marked this conversation as resolved.
Comment thread
TheodoreSpeaks marked this conversation as resolved.
}
Comment thread
TheodoreSpeaks marked this conversation as resolved.

logger.info(
`[${this.requestId || 'unknown'}] Cost-only fallback succeeded for execution ${this.executionId}`
)
Expand Down
Loading
Loading