Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 145 additions & 0 deletions pkg/fleet/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ func testInstallerConfigWithDDA() map[string]installerConfig {
}
}

func testCompletedAgentStatus() *v2alpha1.DaemonSetStatus {
return &v2alpha1.DaemonSetStatus{Desired: 3, UpToDate: 3, Ready: 3}
}

func testStartRequest() remoteAPIRequest {
return remoteAPIRequest{
ID: "exp-abc",
Expand Down Expand Up @@ -1048,6 +1052,26 @@ func TestSetTaskState_PreservesOtherPackages(t *testing.T) {
assert.Equal(t, pbgo.TaskState_DONE, rc.state[1].Task.State)
}

func TestSetTaskState_PreservesAllFields(t *testing.T) {
d, rc := testDaemonWithRC([]*pbgo.PackageState{
{
Package: "datadog-operator",
StableVersion: "1.0.0",
ExperimentVersion: "2.0.0",
StableConfigVersion: "cfg-stable",
ExperimentConfigVersion: "cfg-exp",
},
})
d.setTaskState("datadog-operator", "task-1", pbgo.TaskState_DONE, nil)

require.Len(t, rc.state, 1)
pkg := rc.state[0]
assert.Equal(t, "1.0.0", pkg.StableVersion)
assert.Equal(t, "2.0.0", pkg.ExperimentVersion)
assert.Equal(t, "cfg-stable", pkg.StableConfigVersion)
assert.Equal(t, "cfg-exp", pkg.ExperimentConfigVersion)
}

func TestSetTaskState_NilClient(t *testing.T) {
d := &Daemon{}
// Must not panic when rcClient is nil.
Expand Down Expand Up @@ -1276,6 +1300,7 @@ func TestRunPendingOperationWorker_StartTerminalPhaseSetsError(t *testing.T) {

func TestRunPendingOperationWorker_CompletesStatusUpdateForNonDefaultPackage(t *testing.T) {
dda := testDDAObject(v2alpha1.ExperimentPhaseRunning)
dda.Status.Agent = testCompletedAgentStatus()
dda.Annotations[v2alpha1.AnnotationPendingTaskID] = "task-1"
dda.Annotations[v2alpha1.AnnotationPendingAction] = string(pendingIntentStart)
dda.Annotations[v2alpha1.AnnotationPendingExperimentID] = testExperimentID
Expand Down Expand Up @@ -1445,6 +1470,7 @@ func TestStartDatadogAgentExperiment_OverwritesStalePendingResultVersion(t *test

func TestRunPendingOperationWorker_RecoversPendingOperationFromAnnotations(t *testing.T) {
dda := testDDAObject(v2alpha1.ExperimentPhaseRunning)
dda.Status.Agent = testCompletedAgentStatus()
dda.Annotations[v2alpha1.AnnotationPendingTaskID] = "task-1"
dda.Annotations[v2alpha1.AnnotationPendingAction] = string(pendingIntentStart)
dda.Annotations[v2alpha1.AnnotationPendingExperimentID] = testExperimentID
Expand Down Expand Up @@ -1503,3 +1529,122 @@ func TestRunPendingOperationWorker_RecoversPromoteResultVersionFromAnnotations(t
require.NoError(t, c.Get(context.Background(), testDDANSN, got))
assert.Empty(t, got.Annotations[v2alpha1.AnnotationPendingResultVersion])
}

// --- isDaemonSetRolloutComplete tests ---

func TestIsDaemonSetRolloutComplete(t *testing.T) {
tests := []struct {
name string
agent *v2alpha1.DaemonSetStatus
want bool
}{
{
name: "nil agent — rollout not yet reported",
agent: nil,
want: false,
},
{
name: "zero desired — trivially complete (all nodes tainted/cordoned)",
agent: &v2alpha1.DaemonSetStatus{Desired: 0},
want: true,
},
{
name: "rolling out — some pods not yet updated",
agent: &v2alpha1.DaemonSetStatus{Desired: 3, UpToDate: 1, Ready: 1},
want: false,
},
{
name: "updated but none ready",
agent: &v2alpha1.DaemonSetStatus{Desired: 3, UpToDate: 3, Ready: 0},
want: false,
},
{
name: "updated but only partially ready",
agent: &v2alpha1.DaemonSetStatus{Desired: 3, UpToDate: 3, Ready: 2},
want: false,
},
{
name: "all updated and all ready",
agent: &v2alpha1.DaemonSetStatus{Desired: 3, UpToDate: 3, Ready: 3},
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.want, isDaemonSetRolloutComplete(tt.agent))
})
}
}

// --- Worker start gating tests ---

func TestRunPendingOperationWorker_StartWaitsForRollout_NilAgent(t *testing.T) {
dda := testDDAObject(v2alpha1.ExperimentPhaseRunning)
// status.Agent deliberately left nil — rollout not yet reflected in status
dda.Annotations[v2alpha1.AnnotationPendingTaskID] = "task-1"
dda.Annotations[v2alpha1.AnnotationPendingAction] = string(pendingIntentStart)
dda.Annotations[v2alpha1.AnnotationPendingExperimentID] = testExperimentID
dda.Annotations[v2alpha1.AnnotationPendingPackage] = "datadog-operator"

d, _ := testDaemon(dda, testInstallerConfigWithDDA())
d.rcClient = &mockRCClient{state: []*pbgo.PackageState{
{Package: "datadog-operator", StableConfigVersion: "stable-1"},
}}

tracker := newOperationTracker(d)
tracker.onStatusUpdate(context.Background(), newDDAStatusSnapshot(dda))

rc := d.rcClient.(*mockRCClient)
require.NotNil(t, rc.state[0].Task)
assert.Equal(t, pbgo.TaskState_RUNNING, rc.state[0].Task.State)
assert.Nil(t, rc.state[0].Task.Error)
}

func TestRunPendingOperationWorker_StartWaitsForRollout_PartialUpdate(t *testing.T) {
dda := testDDAObject(v2alpha1.ExperimentPhaseRunning)
dda.Status.Agent = &v2alpha1.DaemonSetStatus{Desired: 3, UpToDate: 1, Ready: 1}
dda.Annotations[v2alpha1.AnnotationPendingTaskID] = "task-1"
dda.Annotations[v2alpha1.AnnotationPendingAction] = string(pendingIntentStart)
dda.Annotations[v2alpha1.AnnotationPendingExperimentID] = testExperimentID
dda.Annotations[v2alpha1.AnnotationPendingPackage] = "datadog-operator"

d, _ := testDaemon(dda, testInstallerConfigWithDDA())
d.rcClient = &mockRCClient{state: []*pbgo.PackageState{
{Package: "datadog-operator", StableConfigVersion: "stable-1"},
}}

tracker := newOperationTracker(d)
tracker.onStatusUpdate(context.Background(), newDDAStatusSnapshot(dda))

rc := d.rcClient.(*mockRCClient)
require.NotNil(t, rc.state[0].Task)
assert.Equal(t, pbgo.TaskState_RUNNING, rc.state[0].Task.State)
assert.Nil(t, rc.state[0].Task.Error)
}

func TestRunPendingOperationWorker_StartDoneAfterRolloutComplete(t *testing.T) {
dda := testDDAObject(v2alpha1.ExperimentPhaseRunning)
dda.Status.Agent = testCompletedAgentStatus()
dda.Annotations[v2alpha1.AnnotationPendingTaskID] = "task-1"
dda.Annotations[v2alpha1.AnnotationPendingAction] = string(pendingIntentStart)
dda.Annotations[v2alpha1.AnnotationPendingExperimentID] = testExperimentID
dda.Annotations[v2alpha1.AnnotationPendingPackage] = "datadog-operator"

d, c := testDaemon(dda, testInstallerConfigWithDDA())
d.rcClient = &mockRCClient{state: []*pbgo.PackageState{
{Package: "datadog-operator", StableConfigVersion: "stable-1"},
}}

tracker := newOperationTracker(d)
tracker.onStatusUpdate(context.Background(), newDDAStatusSnapshot(dda))

rc := d.rcClient.(*mockRCClient)
require.NotNil(t, rc.state[0].Task)
assert.Equal(t, pbgo.TaskState_DONE, rc.state[0].Task.State)
assert.Nil(t, rc.state[0].Task.Error)
assert.Equal(t, testExperimentID, rc.state[0].ExperimentConfigVersion)

got := &v2alpha1.DatadogAgent{}
require.NoError(t, c.Get(context.Background(), testDDANSN, got))
assert.Empty(t, got.Annotations[v2alpha1.AnnotationPendingTaskID])
}
20 changes: 19 additions & 1 deletion pkg/fleet/daemon_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type ddaStatusSnapshot struct {
nsn types.NamespacedName
annotations map[string]string
experiment *v2alpha1.ExperimentStatus
agent *v2alpha1.DaemonSetStatus
}

type pendingOperation struct {
Expand Down Expand Up @@ -145,6 +146,9 @@ func newDDAStatusSnapshot(dda *v2alpha1.DatadogAgent) ddaStatusSnapshot {
if dda.Status.Experiment != nil {
snapshot.experiment = dda.Status.Experiment.DeepCopy()
}
if dda.Status.Agent != nil {
snapshot.agent = dda.Status.Agent.DeepCopy()
}
return snapshot
}

Expand All @@ -160,7 +164,7 @@ func evaluatePendingTask(snapshot ddaStatusSnapshot, task pendingOperation) (boo
switch task.intent {
case pendingIntentStart:
if phase == v2alpha1.ExperimentPhaseRunning {
return true, nil
return isDaemonSetRolloutComplete(snapshot.agent), nil

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Verify the rollout status matches the new spec

For a start task on an already-healthy DatadogAgent, this can still mark the task DONE before the experiment DaemonSet has rolled out. The Fleet start patch writes the new spec and signal together, then processStartSignal sets status.experiment.phase=running; in the same reconcile, addDDAIStatusToDDAStatus copies the current DDAI status, which is still the pre-experiment DaemonSet status until the DDAI controller updates and rolls the DaemonSet. If that old status was healthy (UpToDate == Desired && Ready == Desired), this line returns true, clears the pending annotations, and updates RC as if the rollout completed. The gate needs to ensure the status corresponds to the experiment spec/hash/generation, not just that the last reported counts were healthy.

Useful? React with 👍 / 👎.

}
case pendingIntentStop:
return isTerminalPhase(phase), nil
Expand All @@ -177,6 +181,18 @@ func evaluatePendingTask(snapshot ddaStatusSnapshot, task pendingOperation) (boo
return false, nil
}

func isDaemonSetRolloutComplete(agent *v2alpha1.DaemonSetStatus) bool {
if agent == nil {
return false
}
// Desired == 0 is trivially complete: all nodes are tainted/cordoned so
// there is nothing to roll out. This is safe because experiments update an
// existing DaemonSet rather than create a new one, so Desired never
// transiently passes through 0 mid-rollout — it stays at its current node
// count throughout the update.
return agent.UpToDate == agent.Desired && agent.Ready == agent.Desired
}

// finishPendingOperation writes the final RC state for a task.
//
// RC is updated before DDA annotations are cleared. If the daemon crashes in
Expand All @@ -200,8 +216,10 @@ func (d *Daemon) finishPendingOperation(ctx context.Context, task pendingOperati
}
}
if resultErr != nil {
ctrl.Log.Info("Task finished with error", "taskID", task.taskID, "package", task.packageName, "intent", task.intent, "error", resultErr)
d.setTaskState(task.packageName, task.taskID, pbgo.TaskState_ERROR, resultErr)
} else {
ctrl.Log.Info("Task finished successfully", "taskID", task.taskID, "package", task.packageName, "intent", task.intent)
d.setTaskState(task.packageName, task.taskID, pbgo.TaskState_DONE, nil)
}
d.taskMu.Unlock()
Expand Down
Loading