Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/kimi_cli/ui/shell/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from kimi_cli.ui.shell.echo import render_user_echo_text
from kimi_cli.ui.shell.mcp_status import render_mcp_prompt
from kimi_cli.ui.shell.migration_nudge import print_migration_goodbye
from kimi_cli.ui.shell.placeholders import ImagePathResolutionError
from kimi_cli.ui.shell.prompt import (
BgTaskCounts,
CustomPromptSession,
Expand Down Expand Up @@ -363,6 +364,9 @@ async def _route_prompt_events(
resume_prompt.clear()
await idle_events.put(_PromptEvent(kind="cwd_lost"))
return
except ImagePathResolutionError as exc:
console.print(f"[yellow]{exc}[/yellow]")
continue
except Exception:
logger.exception("Prompt router crashed")
resume_prompt.clear()
Expand Down
204 changes: 199 additions & 5 deletions src/kimi_cli/ui/shell/placeholders.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from io import BytesIO
from pathlib import Path
from typing import Literal, Protocol
from urllib.parse import unquote, urlparse

from PIL import Image

Expand All @@ -33,6 +34,22 @@

_TEXT_PASTE_CHAR_THRESHOLD = get_env_int("KIMI_CLI_PASTE_CHAR_THRESHOLD", 1000)
_TEXT_PASTE_LINE_THRESHOLD = get_env_int("KIMI_CLI_PASTE_LINE_THRESHOLD", 15)
_INLINE_IMAGE_MAX_BYTES = get_env_int(
"KIMI_CLI_INLINE_IMAGE_MAX_BYTES",
20 * 1024 * 1024,
)

_IMAGE_PATH_BODY = (
r"(?:file://)?"
r"(?:(?:~(?=[/\\])|\.{1,2}(?=[/\\])|[/\\]|[A-Za-z]:[/\\])|(?:[^\s\"'<>()[\]{}!]+[/\\]))"
r"(?:\\.|[^\"'\r\n<>])+?"
r"\.(?:png|jpe?g|webp|gif|heic|heif|bmp|svg)"
)
_IMAGE_PATH_RE = re.compile(
rf"\"(?P<double>{_IMAGE_PATH_BODY})\"|'(?P<single>{_IMAGE_PATH_BODY})'|(?P<bare>{_IMAGE_PATH_BODY})",
re.IGNORECASE,
)
_WINDOWS_DRIVE_RE = re.compile(r"^[a-zA-Z]:[/\\]")


def sanitize_surrogates(text: str) -> str:
Expand Down Expand Up @@ -88,6 +105,111 @@ def _build_image_part(image_bytes: bytes, mime_type: str) -> ImageURLPart:
)


class ImagePathResolutionError(Exception):
"""Raised when an explicit local image path cannot be attached."""


@dataclass(frozen=True, slots=True)
class ImagePathCandidate:
start: int
end: int
raw: str
path_text: str


def _find_image_path_candidates(text: str) -> list[ImagePathCandidate]:
candidates: list[ImagePathCandidate] = []
for match in _IMAGE_PATH_RE.finditer(text):
path_text = match.group("double") or match.group("single") or match.group("bare")
candidates.append(
ImagePathCandidate(
start=match.start(),
end=match.end(),
raw=match.group(0),
path_text=path_text,
)
)
return candidates


def _expand_image_path(path_text: str) -> Path:
raw = path_text.replace("\\ ", " ")
if raw.lower().startswith("file://"):
parsed = urlparse(raw)
raw = unquote(parsed.path)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve UNC host when expanding file:// image paths

In _expand_image_path, file:// URIs are rebuilt from parsed.path only, so the authority component is dropped. On Windows, an input like file://server/share/screenshot.png should resolve to a UNC path (\\server\share\screenshot.png), but this code turns it into /share/screenshot.png (then usually into a cwd-relative path), which causes false "no longer accessible" errors and prevents attaching valid network-share images.

Useful? React with 👍 / 👎.

if _WINDOWS_DRIVE_RE.match(raw.lstrip("/")):
raw = raw.lstrip("/")
path = Path(raw).expanduser()
if not path.is_absolute():
path = Path.cwd() / path
return path


def _is_explicit_missing_path(path_text: str) -> bool:
raw = path_text.replace("\\ ", " ")
return (
raw.lower().startswith("file://")
or raw.startswith(("~", "/", "\\", "./", ".\\", "../", "..\\"))
or _WINDOWS_DRIVE_RE.match(raw) is not None
)


def _sniff_image_mime(image_bytes: bytes, path: Path) -> str | None:
head = image_bytes[:512]
if head.startswith(b"\x89PNG\r\n\x1a\n"):
return "image/png"
if head.startswith(b"\xff\xd8\xff"):
return "image/jpeg"
if head.startswith((b"GIF87a", b"GIF89a")):
return "image/gif"
if head.startswith(b"BM"):
return "image/bmp"
if head.startswith(b"RIFF") and head[8:12] == b"WEBP":
return "image/webp"
if b"ftyp" in head[:32] and path.suffix.lower() in {".heic", ".heif"}:
return mimetypes.guess_type(path.name)[0] or f"image/{path.suffix[1:].lower()}"

stripped = head.lstrip().lower()
if path.suffix.lower() == ".svg" and (
stripped.startswith(b"<svg") or stripped.startswith(b"<?xml") or b"<svg" in stripped[:256]
):
return "image/svg+xml"
return None


def _read_image_parts(path: Path) -> list[ContentPart]:
try:
size = path.stat().st_size
except OSError as exc:
raise ImagePathResolutionError(
f"Image at {path} was no longer accessible; "
"save it to a persistent location and try again."
) from exc

if size > _INLINE_IMAGE_MAX_BYTES:
raise ImagePathResolutionError(
f"Image at {path} is too large to attach inline "
f"({size} bytes; limit {_INLINE_IMAGE_MAX_BYTES})."
)

try:
image_bytes = path.read_bytes()
except OSError as exc:
raise ImagePathResolutionError(
f"Image at {path} could not be read; save it to a persistent location and try again."
) from exc

mime_type = _sniff_image_mime(image_bytes, path)
if mime_type is None:
raise ImagePathResolutionError(f"Image at {path} is not a supported image file.")

return wrap_media_part(
_build_image_part(image_bytes, mime_type),
tag="image",
attrs={"path": str(path)},
)


type CachedAttachmentKind = Literal["image"]


Expand Down Expand Up @@ -433,8 +555,14 @@ class ResolvedPromptCommand:


class PromptPlaceholderManager:
def __init__(self, attachment_cache: AttachmentCache | None = None) -> None:
def __init__(
self,
attachment_cache: AttachmentCache | None = None,
*,
model_capabilities: set[str] | None = None,
) -> None:
self._attachment_cache = attachment_cache or AttachmentCache()
self._model_capabilities = model_capabilities
self._text_handler = PastedTextPlaceholderHandler()
self._image_handler = ImagePlaceholderHandler(self._attachment_cache)
self._handlers: tuple[PlaceholderHandler, ...] = (
Expand All @@ -446,26 +574,42 @@ def __init__(self, attachment_cache: AttachmentCache | None = None) -> None:
def attachment_cache(self) -> AttachmentCache:
return self._attachment_cache

def update_model_capabilities(self, model_capabilities: set[str]) -> None:
self._model_capabilities = model_capabilities

def maybe_placeholderize_pasted_text(self, text: str) -> str:
return self._text_handler.maybe_placeholderize(text)

def create_image_placeholder(self, image: Image.Image) -> str | None:
return self._image_handler.create_placeholder(image)

def resolve_command(self, command: str) -> ResolvedPromptCommand:
def resolve_command(
self, command: str, *, attach_literal_images: bool = True
) -> ResolvedPromptCommand:
content: list[ContentPart] = []
resolved_chunks: list[str] = []
cursor = 0
attached_image_paths: set[Path] = set()

while match := self._find_next_match(command, cursor):
if match.start > cursor:
literal = command[cursor : match.start]
content.append(TextPart(text=literal))
self._append_literal_content(
literal,
content,
attached_image_paths,
attach_images=attach_literal_images,
)
resolved_chunks.append(literal)

resolved_content = match.handler.resolve_content(match)
if resolved_content is None:
content.append(TextPart(text=match.raw))
self._append_literal_content(
match.raw,
content,
attached_image_paths,
attach_images=attach_literal_images,
)
resolved_chunks.append(match.raw)
else:
content.extend(resolved_content)
Expand All @@ -476,7 +620,12 @@ def resolve_command(self, command: str) -> ResolvedPromptCommand:

if cursor < len(command):
literal = command[cursor:]
content.append(TextPart(text=literal))
self._append_literal_content(
literal,
content,
attached_image_paths,
attach_images=attach_literal_images,
)
resolved_chunks.append(literal)

return ResolvedPromptCommand(
Expand Down Expand Up @@ -529,3 +678,48 @@ def _rewrite_command(
parts.append(command[cursor:])

return "".join(parts)

def _supports_image_input(self) -> bool:
return self._model_capabilities is None or "image_in" in self._model_capabilities

def _append_literal_content(
self,
literal: str,
content: list[ContentPart],
attached_image_paths: set[Path],
*,
attach_images: bool,
) -> None:
if not literal:
return
if not attach_images or not self._supports_image_input():
content.append(TextPart(text=literal))
return

cursor = 0
for candidate in _find_image_path_candidates(literal):
if candidate.start > cursor:
content.append(TextPart(text=literal[cursor : candidate.start]))

path = _expand_image_path(candidate.path_text)
try:
resolved_path = path.resolve(strict=True)
except OSError as exc:
if _is_explicit_missing_path(candidate.path_text):
raise ImagePathResolutionError(
f"Image at {path} was no longer accessible; "
"save it to a persistent location and try again."
) from exc
content.append(TextPart(text=candidate.raw))
cursor = candidate.end
continue

if resolved_path not in attached_image_paths:
content.extend(_read_image_parts(resolved_path))
attached_image_paths.add(resolved_path)
else:
content.append(TextPart(text=candidate.raw))
cursor = candidate.end
Comment thread
he-yufeng marked this conversation as resolved.

if cursor < len(literal):
content.append(TextPart(text=literal[cursor:]))
19 changes: 15 additions & 4 deletions src/kimi_cli/ui/shell/prompt.py
Original file line number Diff line number Diff line change
Expand Up @@ -1208,7 +1208,9 @@ def __init__(
self._last_history_content: str | None = None
self._mode: PromptMode = PromptMode.AGENT
self._thinking = thinking
self._placeholder_manager = PromptPlaceholderManager()
self._placeholder_manager = PromptPlaceholderManager(
model_capabilities=set(self._model_capabilities)
)
# Keep the old attribute for test compatibility and for any external imports.
self._attachment_cache = self._placeholder_manager.attachment_cache
self._last_tip_rotate_time: float = time.monotonic()
Expand Down Expand Up @@ -1891,7 +1893,10 @@ def _get_placeholder_manager(self) -> PromptPlaceholderManager:
manager = getattr(self, "_placeholder_manager", None)
if manager is None:
attachment_cache = getattr(self, "_attachment_cache", None)
manager = PromptPlaceholderManager(attachment_cache=attachment_cache)
manager = PromptPlaceholderManager(
attachment_cache=attachment_cache,
model_capabilities=set(self._model_capabilities),
)
self._placeholder_manager = manager
self._attachment_cache = manager.attachment_cache
return manager
Expand Down Expand Up @@ -2059,13 +2064,19 @@ async def _prompt_once(self, *, append_history: bool | None) -> UserInput:
self._last_submission_was_running = was_running
if append_history is None:
append_history = not was_running
user_input = self._build_user_input(command)
if append_history:
self._append_history_entry(command)
self._tip_rotation_index += 1
return self._build_user_input(command)
return user_input

def _build_user_input(self, command: str) -> UserInput:
resolved = self._get_placeholder_manager().resolve_command(command)
manager = self._get_placeholder_manager()
manager.update_model_capabilities(set(self._model_capabilities))
resolved = manager.resolve_command(
command,
attach_literal_images=self._mode == PromptMode.AGENT,
)

return UserInput(
mode=self._mode,
Expand Down
Loading