diff --git a/tesseract_core/runtime/array_encoding.py b/tesseract_core/runtime/array_encoding.py index e266b303..76cc55a8 100644 --- a/tesseract_core/runtime/array_encoding.py +++ b/tesseract_core/runtime/array_encoding.py @@ -98,6 +98,25 @@ class JsonArrayData(BaseModel): model_config = ConfigDict(extra="forbid") +class CudaIpcArrayData(BaseModel): + """Data structure for CUDA IPC shared GPU memory handles. + + The handle is the base64-encoded 64-byte cudaIpcMemHandle_t. + The device is the CUDA device ordinal the memory lives on. + storage_offset is the byte offset within the cudaMalloc allocation. + storage_size is the total size in bytes of the cudaMalloc allocation. + """ + + handle: StrictStr = Field( + description="Base64-encoded cudaIpcMemHandle_t (64 bytes)" + ) + device: int = Field(description="CUDA device ordinal") + storage_offset: int = Field(default=0, description="Byte offset within allocation") + storage_size: int = Field(description="Total allocation size in bytes") + encoding: Literal["cuda_ipc"] + model_config = ConfigDict(extra="forbid") + + class EncodedArrayModel(BaseModel): """Base class for general encoded arrays. @@ -107,7 +126,7 @@ class EncodedArrayModel(BaseModel): object_type: Literal["array"] shape: tuple[PositiveInt, ...] dtype: AllowedDtypes - data: BinrefArrayData | Base64ArrayData | JsonArrayData + data: BinrefArrayData | Base64ArrayData | JsonArrayData | CudaIpcArrayData model_config = ConfigDict(extra="forbid") @@ -315,6 +334,197 @@ def _load_binref_arraydict(val: ArrayDict, base_dir: str | Path | None) -> np.nd return np.frombuffer(buffer, dtype=dtype).reshape(shape) +def _has_cuda_array_interface(obj: Any) -> bool: + """Check if an object exposes the __cuda_array_interface__ protocol. + + This protocol is supported by PyTorch, CuPy, JAX, Numba, and any + CUDA-aware Python library. It indicates the object holds data in + GPU device memory. 
+ """ + return hasattr(obj, "__cuda_array_interface__") + + +def _get_cuda_array_info(arr: Any) -> tuple[int, int, tuple[int, ...], str]: + """Extract (device_ptr, nbytes, shape, numpy_dtype_str) from a CUDA array. + + Works with any object that implements __cuda_array_interface__ (v2+): + PyTorch tensors, CuPy arrays, JAX DeviceArrays, Numba device arrays, etc. + """ + iface = arr.__cuda_array_interface__ + data_ptr = iface["data"][0] + shape = tuple(iface["shape"]) + typestr = iface["typestr"] # e.g. " bytes: + """Call cudaIpcGetMemHandle for a device pointer. Returns 64 raw bytes.""" + import ctypes + + cudart = _get_cudart() + handle = (ctypes.c_byte * _CUDA_IPC_HANDLE_SIZE)() + ret = cudart.cudaIpcGetMemHandle(ctypes.byref(handle), ctypes.c_void_p(device_ptr)) + if ret != 0: + raise RuntimeError(f"cudaIpcGetMemHandle failed with error code {ret}") + return bytes(handle) + + +def _cuda_ipc_open_mem_handle(handle_bytes: bytes, device: int) -> int: + """Call cudaIpcOpenMemHandle. Returns a device pointer (int).""" + import ctypes + + cudart = _get_cudart() + + # Set the target device + ret = cudart.cudaSetDevice(device) + if ret != 0: + raise RuntimeError(f"cudaSetDevice({device}) failed with error code {ret}") + + handle = (ctypes.c_byte * _CUDA_IPC_HANDLE_SIZE)(*handle_bytes) + dev_ptr = ctypes.c_void_p() + # cudaIpcMemLazyEnablePeerAccess = 0x01 + ret = cudart.cudaIpcOpenMemHandle( + ctypes.byref(dev_ptr), handle, ctypes.c_uint(0x01) + ) + if ret != 0: + raise RuntimeError(f"cudaIpcOpenMemHandle failed with error code {ret}") + if dev_ptr.value is None: + raise RuntimeError("cudaIpcOpenMemHandle returned a null pointer") + return dev_ptr.value + + +def _cuda_ipc_close_mem_handle(device_ptr: int) -> None: + """Call cudaIpcCloseMemHandle to release an IPC-opened device pointer.""" + import ctypes + + cudart = _get_cudart() + ret = cudart.cudaIpcCloseMemHandle(ctypes.c_void_p(device_ptr)) + if ret != 0: + raise RuntimeError(f"cudaIpcCloseMemHandle failed with 
error code {ret}") + + +# --------------------------------------------------------------------------- +# CUDA IPC array encode / decode +# --------------------------------------------------------------------------- + + +def _dump_cuda_ipc_arraydict(arr: Any) -> ArrayDict: + """Dump a CUDA array to a JSON dict with a CUDA IPC handle. + + Works with any object that implements __cuda_array_interface__: + PyTorch tensors, CuPy arrays, JAX DeviceArrays, etc. + + The IPC handle allows another process on the same host (with --ipc=host) + to access the GPU memory directly without any CPU round-trip. + """ + if not _has_cuda_array_interface(arr): + raise ValueError( + "cuda_ipc encoding requires a CUDA array " + f"(object with __cuda_array_interface__), got {type(arr).__name__}" + ) + + data_ptr, nbytes, shape, dtype_name = _get_cuda_array_info(arr) + + # Get the IPC handle for this device pointer + handle_bytes = _cuda_ipc_get_mem_handle(data_ptr) + + # Determine device ordinal + device = 0 + # CuPy arrays expose .device.id, torch tensors expose .device.index + if hasattr(arr, "device"): + dev = arr.device + if hasattr(dev, "id"): + device = dev.id # CuPy + elif hasattr(dev, "index") and dev.index is not None: + device = dev.index # PyTorch + + return { + "object_type": "array", + "shape": list(shape), + "dtype": dtype_name, + "data": { + "handle": pybase64.b64encode_as_string(handle_bytes), + "device": device, + "storage_offset": 0, + "storage_size": nbytes, + "encoding": "cuda_ipc", + }, + } + + +def _load_cuda_ipc_arraydict(val: ArrayDict) -> Any: + """Load a CUDA array from a JSON dict with a CUDA IPC handle. + + The calling process must share the IPC namespace with the producer + (e.g. both run with --ipc=host on Docker) and see the same GPU. + + Returns a CuPy ndarray wrapping the IPC device pointer (zero-copy). + CuPy is the only hard requirement for decoding; downstream code can + convert to PyTorch/JAX via DLPack or __cuda_array_interface__. 
+ """ + try: + import cupy + except ImportError: + raise RuntimeError( + "cuda_ipc decoding requires CuPy to be installed " + "(pip install cupy-cuda12x or similar)." + ) from None + + data = val["data"] + handle_bytes = pybase64.b64decode(data["handle"], validate=True) + device = data["device"] + storage_size = data["storage_size"] + + dtype = np.dtype(val["dtype"]) + shape = tuple(val["shape"]) + + dev_ptr = _cuda_ipc_open_mem_handle(handle_bytes, device) + + mem = cupy.cuda.UnownedMemory(dev_ptr, storage_size, owner=None) + memptr = cupy.cuda.MemoryPointer(mem, 0) + return cupy.ndarray(shape, dtype=dtype, memptr=memptr) + + def _coerce_shape_dtype( arr: ArrayLike, expected_shape: ShapeType, @@ -468,6 +678,10 @@ def decode_array( base_dir = join_paths(base_dir, subdir) data = _load_binref_arraydict(val.model_dump(), base_dir) + elif val.data.encoding == "cuda_ipc": + # Returns a CuPy ndarray on GPU — skip numpy coercion + return _load_cuda_ipc_arraydict(val.model_dump()) + # keep checking for "raw" for backwards compat elif val.data.encoding in {"json", "raw"}: data = np.asarray(val.data.buffer).reshape(val.shape) @@ -509,16 +723,26 @@ def encode_array( """ from tesseract_core.runtime.config import get_config + context = info.context if info.context else {} + array_encoding = context.get("array_encoding", "json") + + # For cuda_ipc, skip numpy conversion — array stays on GPU + if array_encoding == "cuda_ipc": + if not info.mode_is_json(): + return arr + if not _has_cuda_array_interface(arr): + raise ValueError( + "cuda_ipc encoding requires a CUDA array " + f"(object with __cuda_array_interface__), got {type(arr).__name__}" + ) + return _dump_cuda_ipc_arraydict(arr) + # Convert to a NumPy array if necessary arr = python_to_array(arr, info, expected_shape, expected_dtype) - context = info.context if info.context else {} - # Python mode -> just return the array without encoding if not info.mode_is_json(): return arr - - array_encoding = 
context.get("array_encoding", "json") if array_encoding == "base64": return _dump_base64_arraydict(arr) elif array_encoding == "binref": diff --git a/tesseract_core/runtime/file_interactions.py b/tesseract_core/runtime/file_interactions.py index 1a0eac76..dda45a7d 100644 --- a/tesseract_core/runtime/file_interactions.py +++ b/tesseract_core/runtime/file_interactions.py @@ -11,7 +11,7 @@ PathLike = str | Path -supported_format_type = Literal["json", "json+base64", "json+binref"] +supported_format_type = Literal["json", "json+base64", "json+binref", "json+cuda_ipc"] SUPPORTED_FORMATS = get_args(supported_format_type) @@ -36,6 +36,8 @@ def output_to_bytes( "base_dir": base_dir, "binref_dir": binref_dir, } + elif format == "json+cuda_ipc": + context = {"array_encoding": "cuda_ipc"} else: raise ValueError( f"Unsupported format {format} (must be one of {SUPPORTED_FORMATS})" diff --git a/tesseract_core/sdk/engine.py b/tesseract_core/sdk/engine.py index aaf66955..3d651738 100644 --- a/tesseract_core/sdk/engine.py +++ b/tesseract_core/sdk/engine.py @@ -645,7 +645,8 @@ def serve( memory: str | None = None, input_path: str | Path | None = None, output_path: str | Path | None = None, - output_format: Literal["json", "json+base64", "json+binref"] | None = None, + output_format: Literal["json", "json+base64", "json+binref", "json+cuda_ipc"] + | None = None, docker_args: list[str] | None = None, runtime_config: dict[str, Any] | None = None, skip_health_check: bool = False, @@ -779,6 +780,15 @@ def serve( raise ValueError("Network must be specified if network_alias is provided") extra_args.extend(["--network-alias", network_alias]) + # CUDA IPC requires shared IPC namespace between host and container + if output_format == "json+cuda_ipc": + extra_args.extend(["--ipc=host"]) + if not gpus: + logger.warning( + "json+cuda_ipc output format requires GPU access. " + "Consider passing gpus=['all'] or specific GPU IDs." 
+ ) + if docker_args: extra_args.extend(docker_args) @@ -958,7 +968,8 @@ def run_tesseract( memory: str | None = None, input_path: str | Path | None = None, output_path: str | Path | None = None, - output_format: Literal["json", "json+base64", "json+binref"] | None = None, + output_format: Literal["json", "json+base64", "json+binref", "json+cuda_ipc"] + | None = None, output_file: str | None = None, docker_args: list[str] | None = None, stream_logs: bool | Callable[[str], None] = False, diff --git a/tesseract_core/sdk/tesseract.py b/tesseract_core/sdk/tesseract.py index 5685a1f5..ef33c091 100644 --- a/tesseract_core/sdk/tesseract.py +++ b/tesseract_core/sdk/tesseract.py @@ -124,7 +124,9 @@ def from_image( memory: str | None = None, input_path: str | Path | None = None, output_path: str | Path | None = None, - output_format: Literal["json", "json+base64", "json+binref"] = "json+base64", + output_format: Literal[ + "json", "json+base64", "json+binref", "json+cuda_ipc" + ] = "json+base64", docker_args: list[str] | None = None, runtime_config: dict[str, Any] | None = None, stream_logs: BoolOrCallable = False, @@ -228,7 +230,9 @@ def from_tesseract_api( tesseract_api: str | Path | ModuleType, input_path: Path | None = None, output_path: Path | None = None, - output_format: Literal["json", "json+base64", "json+binref"] = "json+base64", + output_format: Literal[ + "json", "json+base64", "json+binref", "json+cuda_ipc" + ] = "json+base64", runtime_config: dict[str, Any] | None = None, stream_logs: BoolOrCallable = False, ) -> Tesseract: @@ -348,9 +352,11 @@ def serve(self) -> None: host_ip = self._spawn_config["host_ip"] self._lastlog = None output_path = self._spawn_config.get("output_path") + output_format = self._spawn_config.get("output_format", "json+base64") self._client = HTTPClient( f"http://{host_ip}:{container.host_port}", output_path=Path(output_path) if output_path else None, + output_format=output_format, timeout=self._timeout, ) @@ -663,6 +669,13 @@ def 
_encode_array(arr: Any, b64: bool = True) -> dict: } +def _encode_array_cuda_ipc(arr: Any) -> dict: + """Encode a CUDA tensor via IPC handle for cross-process GPU sharing.""" + from tesseract_core.runtime.array_encoding import _dump_cuda_ipc_arraydict + + return _dump_cuda_ipc_arraydict(arr) + + def _decode_array( encoded_arr: dict, output_path: str | Path | None = None ) -> np.ndarray: @@ -714,6 +727,10 @@ def _decode_array( data = f.read(num_bytes) arr = np.frombuffer(data, dtype=dtype) + elif encoding == "cuda_ipc": + from tesseract_core.runtime.array_encoding import _load_cuda_ipc_arraydict + + return _load_cuda_ipc_arraydict(encoded_arr) else: raise ValueError(f"Unexpected array encoding {encoding}. Cannot decode.") @@ -728,10 +745,12 @@ def __init__( self, url: str, output_path: str | Path | None = None, + output_format: str = "json+base64", timeout: float | tuple[float, float] | None = None, ) -> None: self._url = self._sanitize_url(url) self._output_path = output_path + self._output_format = output_format self._timeout = timeout self._session = requests.Session() self._session.headers["Content-Type"] = "application/json" @@ -763,9 +782,26 @@ def _request( url = f"{self.url}/{endpoint.lstrip('/')}" if payload: - encoded_payload = _tree_map( - _encode_array, payload, is_leaf=lambda x: hasattr(x, "__array__") - ) + if self._output_format == "json+cuda_ipc": + # For CUDA IPC: encode GPU arrays via IPC handles, + # fall back to base64 for CPU arrays + def _encode_leaf(x: Any) -> dict: + if hasattr(x, "__cuda_array_interface__"): + return _encode_array_cuda_ipc(x) + return _encode_array(x) + + encoded_payload = _tree_map( + _encode_leaf, + payload, + is_leaf=lambda x: ( + hasattr(x, "__array__") + or hasattr(x, "__cuda_array_interface__") + ), + ) + else: + encoded_payload = _tree_map( + _encode_array, payload, is_leaf=lambda x: hasattr(x, "__array__") + ) else: encoded_payload = None diff --git a/tests/test_cuda_ipc.py b/tests/test_cuda_ipc.py new file mode 
100644 index 00000000..8694231f --- /dev/null +++ b/tests/test_cuda_ipc.py @@ -0,0 +1,425 @@ +#!/usr/bin/env python3 +# Copyright 2025 Pasteur Labs. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""End-to-end tests for CUDA IPC array encoding. + +Run on a GPU machine with: + python tests/test_cuda_ipc.py + +Requires: cupy (for encode/decode) and optionally torch (for interop tests). +The CUDA IPC implementation is framework-agnostic — it works with any +object that implements __cuda_array_interface__ (CuPy, PyTorch, JAX, Numba). +""" + +import multiprocessing +import sys +import tempfile +from pathlib import Path + +import numpy as np +import pytest + +try: + import cupy + + _CUDA_AVAILABLE = cupy.cuda.runtime.getDeviceCount() > 0 +except (ImportError, Exception): + _CUDA_AVAILABLE = False + +try: + import torch as _torch + + _TORCH_AVAILABLE = _torch.cuda.is_available() +except ImportError: + _TORCH_AVAILABLE = False + +requires_cuda = pytest.mark.skipif( + not _CUDA_AVAILABLE, reason="CUDA + CuPy not available" +) +requires_torch_cuda = pytest.mark.skipif( + not _TORCH_AVAILABLE, reason="PyTorch CUDA not available" +) + + +def check_cuda_available(): + """For standalone script mode.""" + if not _CUDA_AVAILABLE: + print("SKIP: CUDA + CuPy not available") + sys.exit(0) + print( + f"CUDA available: device 0 = {cupy.cuda.runtime.getDeviceProperties(0)['name'].decode()}" + ) + + +# ── Test 1: Low-level encode/decode round-trip (CuPy) ─────────────────── + + +@requires_cuda +def test_roundtrip_same_process(): + """Encode a CuPy CUDA array, decode it, verify data matches.""" + from tesseract_core.runtime.array_encoding import ( + _dump_cuda_ipc_arraydict, + _load_cuda_ipc_arraydict, + ) + + print("\n=== Test 1: Same-process encode/decode round-trip ===") + + original = cupy.random.randn(64, 128, dtype=cupy.float32) + encoded = _dump_cuda_ipc_arraydict(original) + + # Verify the encoded dict structure + assert encoded["object_type"] == "array" + assert 
encoded["shape"] == [64, 128] + assert encoded["dtype"] == "float32" + assert encoded["data"]["encoding"] == "cuda_ipc" + assert "handle" in encoded["data"] + assert "device" in encoded["data"] + assert "storage_size" in encoded["data"] + print( + f" Encoded: shape={encoded['shape']}, dtype={encoded['dtype']}, " + f"device={encoded['data']['device']}, " + f"handle_len={len(encoded['data']['handle'])}" + ) + + # Decode — returns a CuPy array + decoded = _load_cuda_ipc_arraydict(encoded) + assert isinstance(decoded, cupy.ndarray) + assert decoded.shape == (64, 128) + assert decoded.dtype == cupy.float32 + + # Verify data matches + cupy.testing.assert_array_equal(original, decoded) + print(" PASSED: Data matches after same-process round-trip") + + +# ── Test 2: Cross-process IPC ─────────────────────────────────────────── + + +def _producer(queue, shape, dtype_name): + """Producer: create a CuPy array and send its IPC-encoded dict.""" + import cupy + + from tesseract_core.runtime.array_encoding import _dump_cuda_ipc_arraydict + + dtype = np.dtype(dtype_name) + arr = cupy.arange(int(np.prod(shape)), dtype=dtype).reshape(shape) + encoded = _dump_cuda_ipc_arraydict(arr) + + # Send encoded dict (small JSON-safe dict) over the queue + queue.put(encoded) + # Also send the expected values for verification + queue.put(cupy.asnumpy(arr).tolist()) + + # Wait for consumer to signal it's done reading + queue.get() # blocks until consumer is done + + +def _consumer(queue): + """Consumer: receive the IPC-encoded dict and reconstruct the array.""" + import cupy + + from tesseract_core.runtime.array_encoding import _load_cuda_ipc_arraydict + + encoded = queue.get() + expected_values = queue.get() + + decoded = _load_cuda_ipc_arraydict(encoded) + assert isinstance(decoded, cupy.ndarray) + + actual_values = cupy.asnumpy(decoded).tolist() + assert actual_values == expected_values, ( + f"Data mismatch!\n Expected: {expected_values[:5]}...\n Got: {actual_values[:5]}..." 
+ ) + + # Signal producer we're done + queue.put("done") + return True + + +@requires_cuda +def test_cross_process_ipc(): + """Test CUDA IPC handle transfer between two processes.""" + print("\n=== Test 2: Cross-process CUDA IPC ===") + + ctx = multiprocessing.get_context("spawn") + queue = ctx.Queue() + + shape = (4, 8) + + producer = ctx.Process(target=_producer, args=(queue, shape, "float32")) + consumer = ctx.Process(target=_consumer, args=(queue,)) + + producer.start() + consumer.start() + + consumer.join(timeout=30) + producer.join(timeout=30) + + if consumer.exitcode != 0: + print(" FAILED: Consumer process exited with error") + sys.exit(1) + if producer.exitcode != 0: + print(" FAILED: Producer process exited with error") + sys.exit(1) + + print(" PASSED: Cross-process CUDA IPC round-trip successful") + + +# ── Test 3: SDK client-side encode/decode ─────────────────────────────── + + +@requires_cuda +def test_sdk_encode_decode(): + """Test the SDK-level _encode_array_cuda_ipc / _decode_array functions.""" + from tesseract_core.sdk.tesseract import _decode_array, _encode_array_cuda_ipc + + print("\n=== Test 3: SDK client-side encode/decode ===") + + original = cupy.random.randn(32, 64).astype(cupy.float64) + encoded = _encode_array_cuda_ipc(original) + + assert encoded["data"]["encoding"] == "cuda_ipc" + print(f" Encoded via SDK: shape={encoded['shape']}, dtype={encoded['dtype']}") + + decoded = _decode_array(encoded) + assert isinstance(decoded, cupy.ndarray) + assert decoded.shape == (32, 64) + assert decoded.dtype == cupy.float64 + cupy.testing.assert_array_equal(original, decoded) + print(" PASSED: SDK encode/decode round-trip matches") + + +# ── Test 4: Multiple dtypes ───────────────────────────────────────────── + + +@requires_cuda +def test_multiple_dtypes(): + """Test CUDA IPC with various dtypes.""" + from tesseract_core.runtime.array_encoding import ( + _dump_cuda_ipc_arraydict, + _load_cuda_ipc_arraydict, + ) + + print("\n=== Test 4: Multiple 
dtype support ===") + + test_cases = [ + ("float16", (10, 20)), + ("float32", (5, 5, 5)), + ("float64", (100,)), + ("int32", (8, 8)), + ("int64", (3, 3, 3)), + ("int8", (256,)), + ("uint8", (16, 16)), + ("bool", (4, 4)), + ] + + for dtype_name, shape in test_cases: + dtype = np.dtype(dtype_name) + if dtype_name == "bool": + original = cupy.random.randint(0, 2, shape).astype(dtype) + elif dtype.kind == "i" or dtype.kind == "u": + original = cupy.random.randint(0, 100, shape).astype(dtype) + else: + original = cupy.random.randn(*shape).astype(dtype) + + encoded = _dump_cuda_ipc_arraydict(original) + decoded = _load_cuda_ipc_arraydict(encoded) + + cupy.testing.assert_array_equal(original, decoded) + print(f" {dtype_name:15s} shape={shape!s:15s} OK") + + print(" PASSED: All dtypes round-trip correctly") + + +# ── Test 5: Large tensor (benchmark) ──────────────────────────────────── + + +@requires_cuda +def test_large_tensor_benchmark(): + """Benchmark CUDA IPC vs base64 for a large array.""" + import time + + from tesseract_core.runtime.array_encoding import ( + _dump_cuda_ipc_arraydict, + _load_cuda_ipc_arraydict, + ) + from tesseract_core.sdk.tesseract import _decode_array, _encode_array + + print("\n=== Test 5: Large tensor benchmark ===") + + # 256 MB array + size = 64 * 1024 * 1024 # 64M float32 = 256 MB + original_gpu = cupy.random.randn(size, dtype=cupy.float32) + original_cpu = cupy.asnumpy(original_gpu) + + # Benchmark base64 + t0 = time.perf_counter() + encoded_b64 = _encode_array(original_cpu, b64=True) + t_encode_b64 = time.perf_counter() - t0 + + t0 = time.perf_counter() + _decode_array(encoded_b64) + t_decode_b64 = time.perf_counter() - t0 + + # Benchmark CUDA IPC + t0 = time.perf_counter() + encoded_ipc = _dump_cuda_ipc_arraydict(original_gpu) + t_encode_ipc = time.perf_counter() - t0 + + t0 = time.perf_counter() + decoded_ipc = _load_cuda_ipc_arraydict(encoded_ipc) + t_decode_ipc = time.perf_counter() - t0 + + mb = size * 4 / (1024 * 1024) + print(f" 
Tensor size: {mb:.0f} MB") + print( + f" base64 encode: {t_encode_b64 * 1000:8.2f} ms " + f"decode: {t_decode_b64 * 1000:8.2f} ms " + f"total: {(t_encode_b64 + t_decode_b64) * 1000:8.2f} ms" + ) + print( + f" cuda_ipc encode: {t_encode_ipc * 1000:8.2f} ms " + f"decode: {t_decode_ipc * 1000:8.2f} ms " + f"total: {(t_encode_ipc + t_decode_ipc) * 1000:8.2f} ms" + ) + speedup = (t_encode_b64 + t_decode_b64) / max(t_encode_ipc + t_decode_ipc, 1e-9) + print(f" Speedup: {speedup:.1f}x") + + # Verify correctness + decoded_ipc_cpu = cupy.asnumpy(decoded_ipc) + np.testing.assert_array_equal(original_cpu, decoded_ipc_cpu) + print(" PASSED: Large tensor data verified correct") + + +# ── Test 6: PyTorch interop ───────────────────────────────────────────── + + +@requires_torch_cuda +def test_torch_encode_cupy_decode(): + """Encode a PyTorch CUDA tensor, decode as CuPy array.""" + import torch + + from tesseract_core.runtime.array_encoding import ( + _dump_cuda_ipc_arraydict, + _load_cuda_ipc_arraydict, + ) + + print("\n=== Test 6: PyTorch encode → CuPy decode ===") + + original = torch.randn(32, 64, device="cuda:0", dtype=torch.float32) + encoded = _dump_cuda_ipc_arraydict(original) + + # Decode returns CuPy array + decoded = _load_cuda_ipc_arraydict(encoded) + assert hasattr(decoded, "__cuda_array_interface__") + + # Compare via numpy + expected = original.cpu().numpy() + actual = ( + cupy.asnumpy(decoded) + if isinstance(decoded, cupy.ndarray) + else np.asarray(decoded) + ) + np.testing.assert_array_equal(expected, actual) + print(" PASSED: PyTorch → CuPy round-trip matches") + + +@requires_torch_cuda +def test_cupy_encode_torch_consume(): + """Encode a CuPy array, consume the decoded result in PyTorch via as_tensor.""" + import torch + + from tesseract_core.runtime.array_encoding import ( + _dump_cuda_ipc_arraydict, + _load_cuda_ipc_arraydict, + ) + + print( + "\n=== Test 7: CuPy encode → PyTorch consume via __cuda_array_interface__ ===" + ) + + original = 
cupy.random.randn(16, 32).astype(cupy.float32) + encoded = _dump_cuda_ipc_arraydict(original) + + decoded_cupy = _load_cuda_ipc_arraydict(encoded) + + # Convert CuPy → PyTorch via torch.as_tensor / __cuda_array_interface__ (zero-copy) + decoded_torch = torch.as_tensor(decoded_cupy, device="cuda:0") + assert decoded_torch.is_cuda + assert decoded_torch.shape == (16, 32) + + expected = cupy.asnumpy(original) + actual = decoded_torch.cpu().numpy() + np.testing.assert_array_equal(expected, actual) + print(" PASSED: CuPy → PyTorch consume via as_tensor works") + + +# ── Test 8: Full Tesseract API with cuda_ipc output format ────────────── + + +@requires_cuda +def test_tesseract_api_cuda_ipc(): + """Test a real Tesseract with json+cuda_ipc output format via LocalClient.""" + print("\n=== Test 8: Tesseract API with json+cuda_ipc format ===") + + # Create a minimal tesseract_api module in a temp directory + api_code = """ +import numpy as np +from pydantic import BaseModel +from tesseract_core.runtime import Array + +class InputSchema(BaseModel): + x: Array[(None,), "float32"] + +class OutputSchema(BaseModel): + y: Array[(None,), "float32"] + +def apply(inputs: InputSchema) -> OutputSchema: + x_np = np.asarray(inputs.x) + y_np = x_np * 2.0 + 1.0 + return {"y": y_np} +""" + + with tempfile.TemporaryDirectory() as tmpdir: + api_path = Path(tmpdir) / "tesseract_api.py" + api_path.write_text(api_code) + + from tesseract_core.sdk.tesseract import Tesseract + + t = Tesseract.from_tesseract_api( + api_path, + output_format="json+cuda_ipc", + ) + + x = np.array([1.0, 2.0, 3.0, 4.0], dtype=np.float32) + result = t.apply({"x": x}) + + y = np.asarray(result["y"]) + expected = x * 2.0 + 1.0 + np.testing.assert_allclose(y, expected, rtol=1e-6) + print(f" Input: {x}") + print(f" Output: {y}") + print(f" Expected: {expected}") + print(" PASSED: Tesseract API produces correct results") + + +# ── Main ──────────────────────────────────────────────────────────────── + + +if __name__ == "__main__": + check_cuda_available() + + 
test_roundtrip_same_process() + test_cross_process_ipc() + test_sdk_encode_decode() + test_multiple_dtypes() + test_large_tensor_benchmark() + if _TORCH_AVAILABLE: + test_torch_encode_cupy_decode() + test_cupy_encode_torch_consume() + test_tesseract_api_cuda_ipc() + + print("\n" + "=" * 60) + print("ALL TESTS PASSED") + print("=" * 60)