-
Notifications
You must be signed in to change notification settings - Fork 3.7k
feat(metrics): emit workflow execution and per-block metrics to CloudWatch #4931
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
64a80b4
912349e
8673e5c
83bfcbd
fa8b4bd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ import type { | |
| TraceSpan, | ||
| WorkflowState, | ||
| } from '@/lib/logs/types' | ||
| import { type WorkflowExecutionStatus, workflowMetrics } from '@/lib/monitoring/metrics' | ||
| import type { SerializableExecutionState } from '@/executor/execution/types' | ||
|
|
||
| type TriggerData = Record<string, unknown> & { | ||
|
|
@@ -137,6 +138,10 @@ export class LoggingSession { | |
| private completionAttemptFailed = false | ||
| private pendingProgressWrites = new Set<Promise<void>>() | ||
| private postExecutionPromise: Promise<void> | null = null | ||
| /** Guards against double-counting ExecutionStarted across start/fallback paths */ | ||
| private startMetricEmitted = false | ||
| /** Guards against double-counting ExecutionCompleted across completion paths */ | ||
| private completionMetricEmitted = false | ||
|
|
||
| constructor( | ||
| workflowId: string, | ||
|
|
@@ -218,6 +223,18 @@ export class LoggingSession { | |
| } | ||
| } | ||
|
|
||
| private emitExecutionStartedMetric(): void { | ||
| if (this.startMetricEmitted) return | ||
| this.startMetricEmitted = true | ||
| workflowMetrics.recordExecutionStarted({ trigger: this.triggerType }) | ||
| } | ||
|
|
||
| private emitExecutionCompletedMetric(status: WorkflowExecutionStatus, durationMs?: number): void { | ||
| if (this.completionMetricEmitted) return | ||
| this.completionMetricEmitted = true | ||
| workflowMetrics.recordExecutionCompleted({ trigger: this.triggerType, status, durationMs }) | ||
| } | ||
|
|
||
| private async completeExecutionWithFinalization(params: { | ||
| endedAt: string | ||
| totalDurationMs: number | ||
|
|
@@ -325,6 +342,7 @@ export class LoggingSession { | |
| workflowState: this.workflowState, | ||
| deploymentVersionId, | ||
| }) | ||
| this.emitExecutionStartedMetric() | ||
| } else { | ||
| // Resume: no cost reload needed. Billing reconciles from the usage_log | ||
| // ledger (pre-pause rows already exist) plus the live cost summary. | ||
|
|
@@ -364,6 +382,7 @@ export class LoggingSession { | |
| }) | ||
|
|
||
| this.completed = true | ||
| this.emitExecutionCompletedMetric('success', duration) | ||
|
greptile-apps[bot] marked this conversation as resolved.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Success path emits zero duration (Low Severity): Successful… Reviewed by Cursor Bugbot for commit fa8b4bd. Configure here. |
||
|
|
||
| if (traceSpans && traceSpans.length > 0) { | ||
| try { | ||
|
|
@@ -500,6 +519,7 @@ export class LoggingSession { | |
| }) | ||
|
|
||
| this.completed = true | ||
| this.emitExecutionCompletedMetric('failed', Math.max(1, durationMs)) | ||
|
cursor[bot] marked this conversation as resolved.
Outdated
|
||
|
|
||
| try { | ||
| const { PlatformEvents, createOTelSpansForWorkflowExecution } = await import( | ||
|
|
@@ -593,6 +613,7 @@ export class LoggingSession { | |
| }) | ||
|
|
||
| this.completed = true | ||
| this.emitExecutionCompletedMetric('cancelled', Math.max(1, durationMs)) | ||
|
|
||
| try { | ||
| const { PlatformEvents, createOTelSpansForWorkflowExecution } = await import( | ||
|
|
@@ -688,6 +709,7 @@ export class LoggingSession { | |
| }) | ||
|
|
||
| this.completed = true | ||
| workflowMetrics.recordExecutionPaused({ trigger: this.triggerType }) | ||
|
|
||
| try { | ||
| const { PlatformEvents, createOTelSpansForWorkflowExecution } = await import( | ||
|
|
@@ -779,6 +801,7 @@ export class LoggingSession { | |
| workflowState: this.workflowState, | ||
| deploymentVersionId, | ||
| }) | ||
| this.emitExecutionStartedMetric() | ||
|
|
||
| if (this.requestId) { | ||
| logger.debug( | ||
|
|
@@ -969,6 +992,7 @@ export class LoggingSession { | |
| this.requestId, | ||
| this.workflowId | ||
| ) | ||
| this.emitExecutionCompletedMetric('failed') | ||
|
cursor[bot] marked this conversation as resolved.
Outdated
cursor[bot] marked this conversation as resolved.
Outdated
|
||
| } | ||
|
|
||
| static async markExecutionAsFailed( | ||
|
|
@@ -1056,6 +1080,15 @@ export class LoggingSession { | |
|
|
||
| this.completed = true | ||
|
|
||
| if (params.status === 'pending') { | ||
| workflowMetrics.recordExecutionPaused({ trigger: this.triggerType }) | ||
| } else { | ||
| this.emitExecutionCompletedMetric( | ||
| params.status === 'failed' || params.status === 'cancelled' ? params.status : 'success', | ||
| params.totalDurationMs | ||
| ) | ||
|
TheodoreSpeaks marked this conversation as resolved.
TheodoreSpeaks marked this conversation as resolved.
|
||
| } | ||
|
TheodoreSpeaks marked this conversation as resolved.
|
||
|
|
||
| logger.info( | ||
| `[${this.requestId || 'unknown'}] Cost-only fallback succeeded for execution ${this.executionId}` | ||
| ) | ||
|
|
||


Uh oh!
There was an error while loading. Please reload this page.