Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pkg/engine/chaos_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,11 @@ func (e *ChaosEngine) runCycle(ctx context.Context, runtime ContainerRuntime) {
e.lastEvent = &event
e.history = append(e.history, event)
if len(e.history) > MaxHistorySize {
e.history = e.history[len(e.history)-MaxHistorySize:]
// Prevent memory leak by creating a new slice instead of just slicing
// which would keep the underlying ever-growing array in memory.
trimmed := make([]utils.EventRecord, MaxHistorySize)
copy(trimmed, e.history[len(e.history)-MaxHistorySize:])
e.history = trimmed
}
e.mu.Unlock()

Expand Down
58 changes: 55 additions & 3 deletions pkg/engine/docker_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"path/filepath"
"runtime"
"strings"
"sync"
"time"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client"
)

Expand All @@ -22,6 +24,14 @@ type DockerClient struct {
allowedTargets map[string]bool
networkManager *NetworkChaosManager
resourceManager *ResourceChaosManager

idCache map[string]cachedEntry
idCacheMu sync.RWMutex
}

// cachedEntry is the value type stored in DockerClient.idCache: a container
// ID paired with the instant after which it should no longer be served.
type cachedEntry struct {
// id is the container ID recorded by cacheID.
id string
// expiresAt is compared against time.Now() by getContainerID; the entry is
// only used while the current time is still before it.
expiresAt time.Time
}

func tryConnectWithOpts(allowedTargets []string, opt client.Opt) (*DockerClient, error) {
Expand All @@ -47,6 +57,7 @@ func tryConnectWithOpts(allowedTargets []string, opt client.Opt) (*DockerClient,
allowedTargets: allowed,
networkManager: NewNetworkChaosManager(),
resourceManager: NewResourceChaosManager(),
idCache: make(map[string]cachedEntry),
}, nil
}

Expand Down Expand Up @@ -125,25 +136,61 @@ func (d *DockerClient) assertAllowed(name string) error {
}

// getContainerID resolves name to a Docker container ID.
//
// A non-expired idCache entry is returned without contacting the daemon.
// Otherwise two filtered ContainerList queries are tried in order: first a
// container-name filter, then a com.docker.compose.service label filter.
// Every returned container is still compared against name exactly, because
// the name filter is a regular expression built from the unescaped name and
// can therefore match more than the intended container. The first exact match
// is stored through cacheID and returned.
//
// The ContainerList error is returned unchanged if a query fails; a
// "container not found" error is returned when neither query produces an
// exact match.
func (d *DockerClient) getContainerID(ctx context.Context, name string) (string, error) {
	// Read the cache under the read lock only; the expiry check runs on the
	// copied entry after the lock is released.
	d.idCacheMu.RLock()
	entry, ok := d.idCache[name]
	d.idCacheMu.RUnlock()
	if ok && time.Now().Before(entry.expiresAt) {
		return entry.id, nil
	}

	// Server-side filters keep the response limited to candidate containers
	// instead of listing everything and filtering locally.
	lookups := []struct{ key, value string }{
		{"name", "^/?" + name + "$"},
		{"label", "com.docker.compose.service=" + name},
	}

	// The label lookup runs whenever the name lookup produced no exact match,
	// not only when it produced no containers at all: an over-matching name
	// regex must not suppress the compose-service fallback.
	for _, l := range lookups {
		args := filters.NewArgs()
		args.Add(l.key, l.value)
		containers, err := d.cli.ContainerList(ctx, types.ContainerListOptions{
			All:     true,
			Filters: args,
		})
		if err != nil {
			return "", err
		}

		for _, c := range containers {
			// Names reported by the daemon may carry a leading slash.
			for _, n := range c.Names {
				if strings.TrimPrefix(n, "/") == name {
					d.cacheID(name, c.ID)
					return c.ID, nil
				}
			}
			if c.Labels["com.docker.compose.service"] == name {
				d.cacheID(name, c.ID)
				return c.ID, nil
			}
		}
	}
	return "", fmt.Errorf("container not found: %s", name)
}

// cacheID records id as the resolved container ID for name in idCache.
// The entry is stamped to expire 30 seconds after the moment it is written,
// and the write happens while holding the idCacheMu write lock.
func (d *DockerClient) cacheID(name, id string) {
	const ttl = 30 * time.Second

	d.idCacheMu.Lock()
	defer d.idCacheMu.Unlock()

	deadline := time.Now().Add(ttl)
	d.idCache[name] = cachedEntry{id: id, expiresAt: deadline}
}

func (d *DockerClient) getContainerInfo(ctx context.Context, id string) (*ContainerInfo, error) {
c, err := d.cli.ContainerInspect(ctx, id)
if err != nil {
Expand Down Expand Up @@ -343,7 +390,9 @@ func (d *DockerClient) ExecCommand(ctx context.Context, name string, cmd []strin
}

// Poll until the exec command finishes, respecting context cancellation
for i := 0; i < 50; i++ { // max 5 seconds wait (50 * 100ms)
// using exponential backoff to reduce CPU usage
delay := 50 * time.Millisecond
for i := 0; i < 50; i++ {
select {
case <-ctx.Done():
return -1, fmt.Errorf("exec cancelled in container %s: %w", name, ctx.Err())
Expand All @@ -356,7 +405,10 @@ func (d *DockerClient) ExecCommand(ctx context.Context, name string, cmd []strin
if !inspectResp.Running {
return inspectResp.ExitCode, nil
}
time.Sleep(100 * time.Millisecond)
time.Sleep(delay)
if delay < 500 * time.Millisecond {
delay *= 2
}
}

return -1, fmt.Errorf("timeout waiting for exec command to complete in container %s", name)
Expand Down
45 changes: 31 additions & 14 deletions pkg/engine/kubernetes_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,24 +81,36 @@ func (k *KubernetesClient) assertAllowed(name string) error {
return nil
}

// findPod finds a pod by label "app=name" or name prefix, respecting context cancellation.
func (k *KubernetesClient) findPod(ctx context.Context, name string) (*corev1.Pod, error) {
// Try label selector first (most reliable for Deployments/StatefulSets)
pods, err := k.clientset.CoreV1().Pods(k.namespace).List(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("app=%s", name),
})
if err == nil && len(pods.Items) > 0 {
// Prefer Running pods
for _, p := range pods.Items {
if p.Status.Phase == corev1.PodRunning && p.DeletionTimestamp == nil {
return &p, nil
// Try exact pod name match first (O(1) API call)
p, err := k.clientset.CoreV1().Pods(k.namespace).Get(ctx, name, metav1.GetOptions{})
if err == nil && p.DeletionTimestamp == nil && p.Status.Phase == corev1.PodRunning {
return p, nil
}

// Try common label selectors (Server-side filtering, much faster than listing all)
selectors := []string{
fmt.Sprintf("app=%s", name),
fmt.Sprintf("app.kubernetes.io/name=%s", name),
}

for _, selector := range selectors {
pods, err := k.clientset.CoreV1().Pods(k.namespace).List(ctx, metav1.ListOptions{
LabelSelector: selector,
})
if err == nil && len(pods.Items) > 0 {
// Prefer Running pods
for _, p := range pods.Items {
if p.Status.Phase == corev1.PodRunning && p.DeletionTimestamp == nil {
return &p, nil
}
}
return &pods.Items[0], nil
}
return &pods.Items[0], nil
}

// Fallback: find by name prefix
pods, err = k.clientset.CoreV1().Pods(k.namespace).List(ctx, metav1.ListOptions{})
// Fallback: find by name prefix (expensive in large namespaces)
pods, err := k.clientset.CoreV1().Pods(k.namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to list pods in namespace '%s': %w", k.namespace, err)
}
Expand Down Expand Up @@ -255,7 +267,9 @@ func (k *KubernetesClient) injectEphemeralNetshoot(ctx context.Context, pod *cor
}

// Wait for the ephemeral container to become Running (up to 30 seconds)
// using exponential backoff to reduce API server load
deadline := time.Now().Add(30 * time.Second)
delay := 100 * time.Millisecond
for time.Now().Before(deadline) {
select {
case <-ctx.Done():
Expand All @@ -277,7 +291,10 @@ func (k *KubernetesClient) injectEphemeralNetshoot(ctx context.Context, pod *cor
}
}

time.Sleep(500 * time.Millisecond)
time.Sleep(delay)
if delay < 2 * time.Second {
delay *= 2
}
}

return fmt.Errorf("timeout: chaos-netshoot container in pod '%s' did not become Running within 30s", pod.Name)
Expand Down
Loading