Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1327,6 +1327,15 @@ func classifyWaitingContainer(waiting *v1.ContainerStateWaiting, c v1.PodConditi
case "ImagePullBackOff":
gracePeriod := config.GetK8sPluginConfig().ImagePullBackoffGracePeriod.Duration
if time.Since(t) >= gracePeriod {
if isRegistryRateLimited(waiting.Message) {
// Registry rate limiting (HTTP 429) is not caused by the user
// and is a transient infrastructure problem. Classify as a
// system-retryable failure so it does not consume the user's
// retry budget.
return pluginsCore.PhaseInfoSystemRetryableFailureWithCleanup(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{
OccurredAt: &t,
}), t
}
return pluginsCore.PhaseInfoRetryableFailureWithCleanup(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{
OccurredAt: &t,
}), t
Expand Down Expand Up @@ -1355,6 +1364,13 @@ func GetMessageAfterGracePeriod(message string, gracePeriod time.Duration) strin
return fmt.Sprintf("Grace period [%s] exceeded|%s", gracePeriod, message)
}

// isRegistryRateLimited reports whether an ImagePullBackOff message indicates
// that the image registry throttled the pull (HTTP 429 Too Many Requests) — a
// transient infrastructure failure outside the user's control.
//
// The message is free-form text produced by the container runtime, so two
// markers are accepted, both matched case-insensitively:
//   - "429 too many requests": the HTTP status text, as seen in the
//     "unexpected status code ...: 429 Too Many Requests" form.
//   - "toomanyrequests:": the registry error code followed by its detail
//     text, for messages that relay only the registry's error body without
//     the HTTP status text.
//
// An empty message never matches.
func isRegistryRateLimited(message string) bool {
	// Normalize once so both markers can be compared without regard to case.
	msg := strings.ToLower(message)
	return strings.Contains(msg, "429 too many requests") ||
		strings.Contains(msg, "toomanyrequests:")
}

func DemystifySuccess(status v1.PodStatus, info pluginsCore.TaskInfo) (pluginsCore.PhaseInfo, error) {
for _, status := range append(
append(status.InitContainerStatuses, status.ContainerStatuses...), status.EphemeralContainerStatuses...) {
Expand Down
23 changes: 23 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2575,6 +2575,29 @@ func TestDemystifyPending(t *testing.T) {
assert.True(t, taskStatus.CleanupOnFailure())
})

t.Run("ImagePullBackOffOutsideGracePeriod_RegistryRateLimited", func(t *testing.T) {
// HTTP 429 from the registry is a transient infrastructure failure,
// not a user error — it should surface as a system retryable failure.
s2 := *s.DeepCopy()
s2.Conditions[0].LastTransitionTime.Time = metav1.Now().Add(-config.GetK8sPluginConfig().ImagePullBackoffGracePeriod.Duration)
s2.ContainerStatuses = []v1.ContainerStatus{
{
Ready: false,
State: v1.ContainerState{
Waiting: &v1.ContainerStateWaiting{
Reason: "ImagePullBackOff",
Message: `Back-off pulling image "registry.example.com/foo:bar": ErrImagePull: failed to pull and unpack image "registry.example.com/foo:bar": failed to copy: httpReadSeeker: failed open: unexpected status code https://registry.example.com/v2/foo/blobs/sha256:abc: 429 Too Many Requests`,
},
},
},
}
taskStatus, err := DemystifyPending(s2, pluginsCore.TaskInfo{})
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, taskStatus.Phase())
assert.Equal(t, core.ExecutionError_SYSTEM, taskStatus.Err().Kind)
assert.True(t, taskStatus.CleanupOnFailure())
})

t.Run("InvalidImageName", func(t *testing.T) {
s.ContainerStatuses = []v1.ContainerStatus{
{
Expand Down
Loading