diff --git a/src/kimi_cli/ui/shell/__init__.py b/src/kimi_cli/ui/shell/__init__.py index 73d104054..af01fe1ce 100644 --- a/src/kimi_cli/ui/shell/__init__.py +++ b/src/kimi_cli/ui/shell/__init__.py @@ -33,6 +33,7 @@ from kimi_cli.ui.shell.echo import render_user_echo_text from kimi_cli.ui.shell.mcp_status import render_mcp_prompt from kimi_cli.ui.shell.migration_nudge import print_migration_goodbye +from kimi_cli.ui.shell.placeholders import ImagePathResolutionError from kimi_cli.ui.shell.prompt import ( BgTaskCounts, CustomPromptSession, @@ -363,6 +364,9 @@ async def _route_prompt_events( resume_prompt.clear() await idle_events.put(_PromptEvent(kind="cwd_lost")) return + except ImagePathResolutionError as exc: + console.print(f"[yellow]{exc}[/yellow]") + continue except Exception: logger.exception("Prompt router crashed") resume_prompt.clear() diff --git a/src/kimi_cli/ui/shell/placeholders.py b/src/kimi_cli/ui/shell/placeholders.py index f58c39c84..245c6cd2c 100644 --- a/src/kimi_cli/ui/shell/placeholders.py +++ b/src/kimi_cli/ui/shell/placeholders.py @@ -10,6 +10,7 @@ from io import BytesIO from pathlib import Path from typing import Literal, Protocol +from urllib.parse import unquote, urlparse from PIL import Image @@ -33,6 +34,22 @@ _TEXT_PASTE_CHAR_THRESHOLD = get_env_int("KIMI_CLI_PASTE_CHAR_THRESHOLD", 1000) _TEXT_PASTE_LINE_THRESHOLD = get_env_int("KIMI_CLI_PASTE_LINE_THRESHOLD", 15) +_INLINE_IMAGE_MAX_BYTES = get_env_int( + "KIMI_CLI_INLINE_IMAGE_MAX_BYTES", + 20 * 1024 * 1024, +) + +_IMAGE_PATH_BODY = ( + r"(?:file://)?" + r"(?:(?:~(?=[/\\])|\.{1,2}(?=[/\\])|[/\\]|[A-Za-z]:[/\\])|(?:[^\s\"'<>()[\]{}!]+[/\\]))" + r"(?:\\.|[^\"'\r\n<>])+?" + r"\.(?:png|jpe?g|webp|gif|heic|heif|bmp|svg)" +) +_IMAGE_PATH_RE = re.compile( + rf"\"(?P{_IMAGE_PATH_BODY})\"|'(?P{_IMAGE_PATH_BODY})'|(?P{_IMAGE_PATH_BODY})", + re.IGNORECASE, +) +_WINDOWS_DRIVE_RE = re.compile(r"^[a-zA-Z]:[/\\]") def sanitize_surrogates(text: str) -> str: @@ -88,6 +105,111 @@ def _build_image_part(image_bytes: bytes, mime_type: str) -> ImageURLPart: ) +class ImagePathResolutionError(Exception): + """Raised when an explicit local image path cannot be attached.""" + + +@dataclass(frozen=True, slots=True) +class ImagePathCandidate: + start: int + end: int + raw: str + path_text: str + + +def _find_image_path_candidates(text: str) -> list[ImagePathCandidate]: + candidates: list[ImagePathCandidate] = [] + for match in _IMAGE_PATH_RE.finditer(text): + path_text = match.group("double") or match.group("single") or match.group("bare") + candidates.append( + ImagePathCandidate( + start=match.start(), + end=match.end(), + raw=match.group(0), + path_text=path_text, + ) + ) + return candidates + + +def _expand_image_path(path_text: str) -> Path: + raw = path_text.replace("\\ ", " ") + if raw.lower().startswith("file://"): + parsed = urlparse(raw) + raw = unquote(parsed.path) + if _WINDOWS_DRIVE_RE.match(raw.lstrip("/")): + raw = raw.lstrip("/") + path = Path(raw).expanduser() + if not path.is_absolute(): + path = Path.cwd() / path + return path + + +def _is_explicit_missing_path(path_text: str) -> bool: + raw = path_text.replace("\\ ", " ") + return ( + raw.lower().startswith("file://") + or raw.startswith(("~", "/", "\\", "./", ".\\", "../", "..\\")) + or _WINDOWS_DRIVE_RE.match(raw) is not None + ) + + +def _sniff_image_mime(image_bytes: bytes, path: Path) -> str | None: + head = image_bytes[:512] + if head.startswith(b"\x89PNG\r\n\x1a\n"): + return "image/png" + if head.startswith(b"\xff\xd8\xff"): + return "image/jpeg" + if head.startswith((b"GIF87a", b"GIF89a")): + return "image/gif" + if head.startswith(b"BM"): + return "image/bmp" + if head.startswith(b"RIFF") and head[8:12] == b"WEBP": + return "image/webp" + if b"ftyp" in head[:32] and path.suffix.lower() in {".heic", ".heif"}: + return mimetypes.guess_type(path.name)[0] or f"image/{path.suffix[1:].lower()}" + + stripped = head.lstrip().lower() + if path.suffix.lower() == ".svg" and ( + stripped.startswith(b" list[ContentPart]: + try: + size = path.stat().st_size + except OSError as exc: + raise ImagePathResolutionError( + f"Image at {path} was no longer accessible; " + "save it to a persistent location and try again." + ) from exc + + if size > _INLINE_IMAGE_MAX_BYTES: + raise ImagePathResolutionError( + f"Image at {path} is too large to attach inline " + f"({size} bytes; limit {_INLINE_IMAGE_MAX_BYTES})." + ) + + try: + image_bytes = path.read_bytes() + except OSError as exc: + raise ImagePathResolutionError( + f"Image at {path} could not be read; save it to a persistent location and try again." + ) from exc + + mime_type = _sniff_image_mime(image_bytes, path) + if mime_type is None: + raise ImagePathResolutionError(f"Image at {path} is not a supported image file.") + + return wrap_media_part( + _build_image_part(image_bytes, mime_type), + tag="image", + attrs={"path": str(path)}, + ) + + type CachedAttachmentKind = Literal["image"] @@ -433,8 +555,14 @@ class ResolvedPromptCommand: class PromptPlaceholderManager: - def __init__(self, attachment_cache: AttachmentCache | None = None) -> None: + def __init__( + self, + attachment_cache: AttachmentCache | None = None, + *, + model_capabilities: set[str] | None = None, + ) -> None: self._attachment_cache = attachment_cache or AttachmentCache() + self._model_capabilities = model_capabilities self._text_handler = PastedTextPlaceholderHandler() self._image_handler = ImagePlaceholderHandler(self._attachment_cache) self._handlers: tuple[PlaceholderHandler, ...] = ( @@ -446,26 +574,42 @@ def __init__(self, attachment_cache: AttachmentCache | None = None) -> None: def attachment_cache(self) -> AttachmentCache: return self._attachment_cache + def update_model_capabilities(self, model_capabilities: set[str]) -> None: + self._model_capabilities = model_capabilities + def maybe_placeholderize_pasted_text(self, text: str) -> str: return self._text_handler.maybe_placeholderize(text) def create_image_placeholder(self, image: Image.Image) -> str | None: return self._image_handler.create_placeholder(image) - def resolve_command(self, command: str) -> ResolvedPromptCommand: + def resolve_command( + self, command: str, *, attach_literal_images: bool = True + ) -> ResolvedPromptCommand: content: list[ContentPart] = [] resolved_chunks: list[str] = [] cursor = 0 + attached_image_paths: set[Path] = set() while match := self._find_next_match(command, cursor): if match.start > cursor: literal = command[cursor : match.start] - content.append(TextPart(text=literal)) + self._append_literal_content( + literal, + content, + attached_image_paths, + attach_images=attach_literal_images, + ) resolved_chunks.append(literal) resolved_content = match.handler.resolve_content(match) if resolved_content is None: - content.append(TextPart(text=match.raw)) + self._append_literal_content( + match.raw, + content, + attached_image_paths, + attach_images=attach_literal_images, + ) resolved_chunks.append(match.raw) else: content.extend(resolved_content) @@ -476,7 +620,12 @@ def resolve_command(self, command: str) -> ResolvedPromptCommand: if cursor < len(command): literal = command[cursor:] - content.append(TextPart(text=literal)) + self._append_literal_content( + literal, + content, + attached_image_paths, + attach_images=attach_literal_images, + ) resolved_chunks.append(literal) return ResolvedPromptCommand( @@ -529,3 +678,48 @@ def _rewrite_command( parts.append(command[cursor:]) return "".join(parts) + + def _supports_image_input(self) -> bool: + return self._model_capabilities is None or "image_in" in self._model_capabilities + + def _append_literal_content( + self, + literal: str, + content: list[ContentPart], + attached_image_paths: set[Path], + *, + attach_images: bool, + ) -> None: + if not literal: + return + if not attach_images or not self._supports_image_input(): + content.append(TextPart(text=literal)) + return + + cursor = 0 + for candidate in _find_image_path_candidates(literal): + if candidate.start > cursor: + content.append(TextPart(text=literal[cursor : candidate.start])) + + path = _expand_image_path(candidate.path_text) + try: + resolved_path = path.resolve(strict=True) + except OSError as exc: + if _is_explicit_missing_path(candidate.path_text): + raise ImagePathResolutionError( + f"Image at {path} was no longer accessible; " + "save it to a persistent location and try again." + ) from exc + content.append(TextPart(text=candidate.raw)) + cursor = candidate.end + continue + + if resolved_path not in attached_image_paths: + content.extend(_read_image_parts(resolved_path)) + attached_image_paths.add(resolved_path) + else: + content.append(TextPart(text=candidate.raw)) + cursor = candidate.end + + if cursor < len(literal): + content.append(TextPart(text=literal[cursor:])) diff --git a/src/kimi_cli/ui/shell/prompt.py b/src/kimi_cli/ui/shell/prompt.py index b820fe8a0..7599858f3 100644 --- a/src/kimi_cli/ui/shell/prompt.py +++ b/src/kimi_cli/ui/shell/prompt.py @@ -1208,7 +1208,9 @@ def __init__( self._last_history_content: str | None = None self._mode: PromptMode = PromptMode.AGENT self._thinking = thinking - self._placeholder_manager = PromptPlaceholderManager() + self._placeholder_manager = PromptPlaceholderManager( + model_capabilities=set(self._model_capabilities) + ) # Keep the old attribute for test compatibility and for any external imports. self._attachment_cache = self._placeholder_manager.attachment_cache self._last_tip_rotate_time: float = time.monotonic() @@ -1891,7 +1893,10 @@ def _get_placeholder_manager(self) -> PromptPlaceholderManager: manager = getattr(self, "_placeholder_manager", None) if manager is None: attachment_cache = getattr(self, "_attachment_cache", None) - manager = PromptPlaceholderManager(attachment_cache=attachment_cache) + manager = PromptPlaceholderManager( + attachment_cache=attachment_cache, + model_capabilities=set(self._model_capabilities), + ) self._placeholder_manager = manager self._attachment_cache = manager.attachment_cache return manager @@ -2059,13 +2064,19 @@ async def _prompt_once(self, *, append_history: bool | None) -> UserInput: self._last_submission_was_running = was_running if append_history is None: append_history = not was_running + user_input = self._build_user_input(command) if append_history: self._append_history_entry(command) self._tip_rotation_index += 1 - return self._build_user_input(command) + return user_input def _build_user_input(self, command: str) -> UserInput: - resolved = self._get_placeholder_manager().resolve_command(command) + manager = self._get_placeholder_manager() + manager.update_model_capabilities(set(self._model_capabilities)) + resolved = manager.resolve_command( + command, + attach_literal_images=self._mode == PromptMode.AGENT, + ) return UserInput( mode=self._mode, diff --git a/tests/ui_and_conv/test_prompt_placeholders.py b/tests/ui_and_conv/test_prompt_placeholders.py index 3fb6e7a8f..d4c9b1dda 100644 --- a/tests/ui_and_conv/test_prompt_placeholders.py +++ b/tests/ui_and_conv/test_prompt_placeholders.py @@ -1,16 +1,26 @@ from __future__ import annotations +import pytest from PIL import Image from kimi_cli.ui.shell import placeholders from kimi_cli.ui.shell.placeholders import ( AttachmentCache, + ImagePathResolutionError, PromptPlaceholderManager, should_placeholderize_pasted_text, ) from kimi_cli.wire.types import ImageURLPart, TextPart +def _write_png(path) -> None: + Image.new("RGB", (4, 4), color=(10, 20, 30)).save(path, format="PNG") + + +def _image_parts(content): + return [part for part in content if isinstance(part, ImageURLPart)] + + def test_placeholder_manager_serializes_text_tokens_for_history(tmp_path) -> None: manager = PromptPlaceholderManager(attachment_cache=AttachmentCache(root=tmp_path)) text_token = manager.maybe_placeholderize_pasted_text("alpha\nbeta\ngamma") @@ -139,6 +149,92 @@ def test_placeholder_manager_leaves_unknown_image_placeholder_literal() -> None: assert resolved.content == [TextPart(text="[image:missing.png,10x10]")] +def test_placeholder_manager_attaches_absolute_image_path(tmp_path) -> None: + image_path = tmp_path / "Screenshot 2026-05-07 at 5.47.51 PM.png" + _write_png(image_path) + manager = PromptPlaceholderManager(model_capabilities={"image_in"}) + + resolved = manager.resolve_command(f"look {image_path} please") + + assert resolved.resolved_text == f"look {image_path} please" + image_parts = _image_parts(resolved.content) + assert len(image_parts) == 1 + assert image_parts[0].image_url.url.startswith("data:image/png;base64,") + + +def test_placeholder_manager_attaches_parenthesized_image_path(tmp_path) -> None: + image_path = tmp_path / "thumbnail.png" + _write_png(image_path) + manager = PromptPlaceholderManager(model_capabilities={"image_in"}) + + resolved = manager.resolve_command(f"look ({image_path})") + + assert len(_image_parts(resolved.content)) == 1 + assert resolved.content[0] == TextPart(text="look (") + assert resolved.content[-1] == TextPart(text=")") + + +def test_placeholder_manager_attaches_markdown_relative_image_path(tmp_path, monkeypatch) -> None: + image_path = tmp_path / "thumbnail.png" + _write_png(image_path) + monkeypatch.chdir(tmp_path) + manager = PromptPlaceholderManager(model_capabilities={"image_in"}) + + resolved = manager.resolve_command("look ![alt](./thumbnail.png)") + + assert len(_image_parts(resolved.content)) == 1 + assert resolved.content[0] == TextPart(text="look ![alt](") + assert resolved.content[-1] == TextPart(text=")") + + +def test_placeholder_manager_keeps_duplicate_image_path_as_text(tmp_path) -> None: + image_path = tmp_path / "thumbnail.png" + _write_png(image_path) + manager = PromptPlaceholderManager(model_capabilities={"image_in"}) + + resolved = manager.resolve_command(f"compare {image_path} with {image_path}") + + assert len(_image_parts(resolved.content)) == 1 + assert resolved.content[-1] == TextPart(text=str(image_path)) + + +def test_placeholder_manager_attaches_file_url_image_path(tmp_path) -> None: + image_path = tmp_path / "thumbnail.png" + _write_png(image_path) + manager = PromptPlaceholderManager(model_capabilities={"image_in"}) + + resolved = manager.resolve_command(f"inspect {image_path.as_uri()}") + + assert len(_image_parts(resolved.content)) == 1 + + +def test_placeholder_manager_skips_image_paths_without_image_capability(tmp_path) -> None: + image_path = tmp_path / "thumbnail.png" + _write_png(image_path) + command = f"look {image_path}" + manager = PromptPlaceholderManager(model_capabilities=set()) + + resolved = manager.resolve_command(command) + + assert resolved.content == [TextPart(text=command)] + + +def test_placeholder_manager_reports_missing_explicit_image_path(tmp_path) -> None: + image_path = tmp_path / "TemporaryItems" / "NSIRD_screencaptureui_x" / "Screenshot.png" + manager = PromptPlaceholderManager(model_capabilities={"image_in"}) + + with pytest.raises(ImagePathResolutionError, match="no longer accessible"): + manager.resolve_command(f"look {image_path}") + + +def test_placeholder_manager_leaves_simple_missing_image_filename_as_text() -> None: + manager = PromptPlaceholderManager(model_capabilities={"image_in"}) + + resolved = manager.resolve_command("create missing.png") + + assert resolved.content == [TextPart(text="create missing.png")] + + def test_placeholder_manager_sanitizes_surrogates_in_pasted_text() -> None: manager = PromptPlaceholderManager() # Lone surrogate \ud83d (half of an emoji pair) must not survive into the entry. diff --git a/tests/ui_and_conv/test_prompt_tips.py b/tests/ui_and_conv/test_prompt_tips.py index fc322804b..35df0bccd 100644 --- a/tests/ui_and_conv/test_prompt_tips.py +++ b/tests/ui_and_conv/test_prompt_tips.py @@ -3,6 +3,7 @@ import os import time from collections.abc import Callable +from contextlib import nullcontext from types import SimpleNamespace from typing import Any, cast from unittest.mock import MagicMock @@ -11,6 +12,7 @@ from kimi_cli.soul import StatusSnapshot from kimi_cli.ui.shell import prompt as shell_prompt +from kimi_cli.ui.shell.placeholders import ImagePathResolutionError from kimi_cli.ui.shell.prompt import ( _GIT_STATUS_TTL, PROMPT_SYMBOL, @@ -875,6 +877,78 @@ async def prompt_async(self, **kwargs: Any) -> str: assert captured == [None] +@pytest.mark.asyncio +async def test_prompt_once_appends_history_after_command_resolves(monkeypatch) -> None: + prompt_session = object.__new__(CustomPromptSession) + prompt_session._running_prompt_delegate = None + prompt_session._tip_rotation_index = 0 + history: list[str] = [] + monkeypatch.setattr(shell_prompt, "patch_stdout", lambda **_kwargs: nullcontext()) + + class _DummySession: + async def prompt_async(self, **kwargs: Any) -> str: + return "look /tmp/missing.png" + + def build_user_input(command: str) -> UserInput: + assert history == [] + return UserInput( + mode=PromptMode.AGENT, + command=command, + resolved_command=command, + content=[], + ) + + prompt_session._session = cast(Any, _DummySession()) + prompt_session._build_user_input = build_user_input + prompt_session._append_history_entry = history.append # type: ignore[assignment] + + result = await prompt_session._prompt_once(append_history=True) + + assert result.command == "look /tmp/missing.png" + assert history == ["look /tmp/missing.png"] + + +@pytest.mark.asyncio +async def test_prompt_once_skips_history_when_command_resolution_fails(monkeypatch) -> None: + prompt_session = object.__new__(CustomPromptSession) + prompt_session._running_prompt_delegate = None + prompt_session._tip_rotation_index = 0 + history: list[str] = [] + monkeypatch.setattr(shell_prompt, "patch_stdout", lambda **_kwargs: nullcontext()) + + class _DummySession: + async def prompt_async(self, **kwargs: Any) -> str: + return "look /tmp/missing.png" + + def build_user_input(command: str) -> UserInput: + raise ImagePathResolutionError("image disappeared") + + prompt_session._session = cast(Any, _DummySession()) + prompt_session._build_user_input = build_user_input + prompt_session._append_history_entry = history.append # type: ignore[assignment] + + with pytest.raises(ImagePathResolutionError, match="image disappeared"): + await prompt_session._prompt_once(append_history=True) + + assert history == [] + + +def test_build_user_input_keeps_shell_image_paths_literal(tmp_path) -> None: + prompt_session = object.__new__(CustomPromptSession) + prompt_session._mode = PromptMode.SHELL + prompt_session._model_capabilities = {"image_in"} + + command = f"cat {tmp_path / 'missing.png'}" + + user_input = prompt_session._build_user_input(command) + + assert user_input.mode == PromptMode.SHELL + assert user_input.command == command + assert user_input.resolved_command == command + assert len(user_input.content) == 1 + assert user_input.content[0].text == command + + @pytest.mark.asyncio async def test_prompt_next_skips_history_for_running_submission() -> None: prompt_session = object.__new__(CustomPromptSession) diff --git a/tests/ui_and_conv/test_shell_prompt_router.py b/tests/ui_and_conv/test_shell_prompt_router.py index 593e3e452..dad842809 100644 --- a/tests/ui_and_conv/test_shell_prompt_router.py +++ b/tests/ui_and_conv/test_shell_prompt_router.py @@ -9,6 +9,7 @@ import kimi_cli.ui.shell as shell_module from kimi_cli.soul import Soul +from kimi_cli.ui.shell.placeholders import ImagePathResolutionError from kimi_cli.ui.shell.prompt import CwdLostError, PromptMode, UserInput from kimi_cli.wire.types import TextPart @@ -264,3 +265,39 @@ async def test_route_prompt_events_cwd_lost_posts_cwd_lost_event( event = idle_events.get_nowait() assert event.kind == "cwd_lost" assert not resume_prompt.is_set() + + +@pytest.mark.asyncio +async def test_route_prompt_events_image_path_error_reprompts( + _patched_prompt_router, + monkeypatch, +) -> None: + shell = shell_module.Shell(cast(Soul, _make_fake_soul())) + prompt_session = _FakePromptSession( + [ + (False, ImagePathResolutionError("image disappeared")), + (False, _make_user_input("retry")), + ], + ) + idle_events: asyncio.Queue[shell_module._PromptEvent] = asyncio.Queue() + resume_prompt = asyncio.Event() + resume_prompt.set() + printed: list[str] = [] + monkeypatch.setattr( + shell_module.console, "print", lambda msg="", *_, **__: printed.append(str(msg)) + ) + + task = asyncio.create_task( + shell._route_prompt_events(cast(Any, prompt_session), idle_events, resume_prompt) + ) + try: + event = await asyncio.wait_for(idle_events.get(), timeout=2.0) + finally: + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + + assert "image disappeared" in printed[0] + assert event.kind == "input" + assert event.user_input is not None + assert event.user_input.command == "retry"