Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 229 additions & 5 deletions tesseract_core/runtime/array_encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,25 @@ class JsonArrayData(BaseModel):
model_config = ConfigDict(extra="forbid")


class CudaIpcArrayData(BaseModel):
    """Data structure for CUDA IPC shared GPU memory handles.

    The handle is the base64-encoded 64-byte cudaIpcMemHandle_t.
    The device is the CUDA device ordinal the memory lives on.
    storage_offset is the byte offset within the cudaMalloc allocation.
    storage_size is the total size in bytes of the cudaMalloc allocation.

    The numeric fields are constrained to be non-negative: they arrive in a
    serialized payload and are used as a device ordinal and as raw byte
    counts, so a negative value is never meaningful.
    """

    handle: StrictStr = Field(
        description="Base64-encoded cudaIpcMemHandle_t (64 bytes)"
    )
    device: int = Field(ge=0, description="CUDA device ordinal")
    storage_offset: int = Field(
        default=0, ge=0, description="Byte offset within allocation"
    )
    storage_size: int = Field(ge=0, description="Total allocation size in bytes")
    # Discriminator value that selects this variant of the array payload.
    encoding: Literal["cuda_ipc"]
    # Unknown keys are rejected rather than silently dropped.
    model_config = ConfigDict(extra="forbid")


class EncodedArrayModel(BaseModel):
"""Base class for general encoded arrays.

Expand All @@ -107,7 +126,7 @@ class EncodedArrayModel(BaseModel):
object_type: Literal["array"]
shape: tuple[PositiveInt, ...]
dtype: AllowedDtypes
data: BinrefArrayData | Base64ArrayData | JsonArrayData
data: BinrefArrayData | Base64ArrayData | JsonArrayData | CudaIpcArrayData
model_config = ConfigDict(extra="forbid")


Expand Down Expand Up @@ -315,6 +334,197 @@ def _load_binref_arraydict(val: ArrayDict, base_dir: str | Path | None) -> np.nd
return np.frombuffer(buffer, dtype=dtype).reshape(shape)


def _has_cuda_array_interface(obj: Any) -> bool:
    """Tell whether ``obj`` carries a ``__cuda_array_interface__`` attribute.

    Objects exposing that attribute are treated by this module as arrays
    whose data lives in GPU device memory. Only the presence of the attribute
    is checked; its contents are not inspected.
    """
    # A private sentinel distinguishes "attribute missing" from any real value.
    absent = object()
    return getattr(obj, "__cuda_array_interface__", absent) is not absent


def _get_cuda_array_info(arr: Any) -> tuple[int, int, tuple[int, ...], str]:
    """Extract (device_ptr, nbytes, shape, numpy_dtype_name) from a CUDA array.

    Reads the ``__cuda_array_interface__`` mapping of ``arr``:

    * ``data[0]`` is returned as the device pointer,
    * ``shape`` is returned as a tuple,
    * ``typestr`` (e.g. ``"<f4"``, ``"|b1"``) is converted to a NumPy dtype
      whose ``name`` is returned,
    * ``nbytes`` is ``prod(shape) * itemsize`` (a 0-d array occupies one item).

    Raises:
        ValueError: if the interface reports explicit ``strides`` that do not
            describe a C-contiguous layout. The byte size computed here (and
            a consumer that rebuilds the array from shape and dtype alone)
            would otherwise describe the wrong memory.
    """
    import math

    iface = arr.__cuda_array_interface__
    data_ptr = iface["data"][0]
    shape = tuple(iface["shape"])
    dtype = np.dtype(iface["typestr"])  # e.g. "<f4", "|b1"

    # ``strides`` is optional; None/absent means C-contiguous. When present,
    # verify it matches the contiguous layout. Axes of length 1 may carry any
    # stride, and an array with a zero-length axis has no elements to misread.
    strides = iface.get("strides")
    if strides is not None and 0 not in shape:
        expected = dtype.itemsize
        for dim, stride in zip(reversed(shape), reversed(tuple(strides))):
            if dim != 1 and stride != expected:
                raise ValueError(
                    "cuda_ipc encoding requires a C-contiguous CUDA array, "
                    f"got shape {shape} with strides {tuple(strides)}"
                )
            expected *= dim

    # math.prod works on arbitrary-precision Python ints (no fixed-width
    # overflow) and returns 1 for an empty shape, covering the 0-d case.
    nbytes = math.prod(shape) * dtype.itemsize
    return data_ptr, nbytes, shape, dtype.name


# ---------------------------------------------------------------------------
# CUDA IPC via ctypes (framework-agnostic, no torch/cupy dependency)
# ---------------------------------------------------------------------------

# Process-wide cache for the loaded CUDA runtime library; stays None until
# _get_cudart() successfully loads it.
_CUDART_HANDLE = None
# Size in bytes of the IPC handle buffers allocated by the helpers below.
_CUDA_IPC_HANDLE_SIZE = 64 # cudaIpcMemHandle_t is always 64 bytes


def _get_cudart():
    """Lazily load the CUDA runtime shared library and cache the handle.

    The first successful load is stored in the module-level ``_CUDART_HANDLE``
    and returned on every later call.

    Lookup order:
      1. Names resolved through ``ctypes.util.find_library``.
      2. Well-known file names handed directly to the dynamic loader,
         including versioned sonames (runtime-only installs typically ship
         ``libcudart.so.<major>`` without the unversioned symlink).

    A candidate that is found but fails to load is skipped, so the remaining
    candidates are still tried.

    Returns:
        The ``ctypes.CDLL`` handle for the CUDA runtime.

    Raises:
        RuntimeError: if no candidate could be loaded.
    """
    global _CUDART_HANDLE
    if _CUDART_HANDLE is not None:
        return _CUDART_HANDLE

    import ctypes
    import ctypes.util

    def _try_load(target):
        # Return the loaded library, or None when the loader rejects it.
        try:
            return ctypes.CDLL(target)
        except OSError:
            return None

    # Try common names for the CUDA runtime library
    for name in ("cudart", "cudart64_12", "cudart64_11"):
        path = ctypes.util.find_library(name)
        if not path:
            continue
        lib = _try_load(path)
        if lib is not None:
            _CUDART_HANDLE = lib
            return _CUDART_HANDLE

    # Fallback: try loading directly
    for path in (
        "libcudart.so",
        "libcudart.so.13",
        "libcudart.so.12",
        "libcudart.so.11.0",
        "libcudart.dylib",
        "cudart64_12.dll",
        "cudart64_110.dll",
    ):
        lib = _try_load(path)
        if lib is not None:
            _CUDART_HANDLE = lib
            return _CUDART_HANDLE

    raise RuntimeError(
        "Could not find CUDA runtime library (libcudart). "
        "Make sure CUDA is installed and LD_LIBRARY_PATH is set."
    )


def _cuda_ipc_get_mem_handle(device_ptr: int) -> bytes:
    """Obtain the IPC handle for a device pointer via ``cudaIpcGetMemHandle``.

    Args:
        device_ptr: Device address, passed to the runtime as a ``void*``.

    Returns:
        The raw contents of the ``_CUDA_IPC_HANDLE_SIZE``-byte handle buffer.

    Raises:
        RuntimeError: if the runtime call returns a non-zero status.
    """
    import ctypes

    runtime = _get_cudart()
    # Output buffer the runtime fills with the opaque handle.
    handle_buf = (ctypes.c_byte * _CUDA_IPC_HANDLE_SIZE)()
    status = runtime.cudaIpcGetMemHandle(
        ctypes.byref(handle_buf), ctypes.c_void_p(device_ptr)
    )
    if status == 0:
        return bytes(handle_buf)
    raise RuntimeError(f"cudaIpcGetMemHandle failed with error code {status}")


def _cuda_ipc_open_mem_handle(handle_bytes: bytes, device: int) -> int:
    """Call cudaIpcOpenMemHandle. Returns a device pointer (int).

    Args:
        handle_bytes: Raw IPC handle; must be exactly
            ``_CUDA_IPC_HANDLE_SIZE`` bytes long.
        device: CUDA device ordinal made current (``cudaSetDevice``) before
            the handle is opened. The previous device is not restored.

    Returns:
        The device pointer produced by the runtime, as a Python int.

    Raises:
        ValueError: if ``handle_bytes`` has the wrong length. This is checked
            before the CUDA runtime is touched.
        RuntimeError: if ``cudaSetDevice`` or ``cudaIpcOpenMemHandle`` returns
            a non-zero status, or the returned pointer is null.
    """
    import ctypes

    if len(handle_bytes) != _CUDA_IPC_HANDLE_SIZE:
        raise ValueError(
            f"CUDA IPC handle must be exactly {_CUDA_IPC_HANDLE_SIZE} bytes, "
            f"got {len(handle_bytes)}"
        )

    # cudaIpcOpenMemHandle takes the handle struct *by value*. ctypes passes
    # array instances as pointers, so the handle is wrapped in a Structure,
    # which ctypes marshals by value.
    class _IpcMemHandle(ctypes.Structure):
        _fields_ = [("reserved", ctypes.c_byte * _CUDA_IPC_HANDLE_SIZE)]

    cudart = _get_cudart()

    # Set the target device
    ret = cudart.cudaSetDevice(device)
    if ret != 0:
        raise RuntimeError(f"cudaSetDevice({device}) failed with error code {ret}")

    handle = _IpcMemHandle.from_buffer_copy(handle_bytes)
    dev_ptr = ctypes.c_void_p()
    # cudaIpcMemLazyEnablePeerAccess = 0x01
    ret = cudart.cudaIpcOpenMemHandle(
        ctypes.byref(dev_ptr), handle, ctypes.c_uint(0x01)
    )
    if ret != 0:
        raise RuntimeError(f"cudaIpcOpenMemHandle failed with error code {ret}")
    # c_void_p.value is None for a NULL pointer.
    if dev_ptr.value is None:
        raise RuntimeError("cudaIpcOpenMemHandle returned a null pointer")
    return dev_ptr.value


def _cuda_ipc_close_mem_handle(device_ptr: int) -> None:
    """Release an IPC-opened device pointer through ``cudaIpcCloseMemHandle``.

    Args:
        device_ptr: Device address, passed to the runtime as a ``void*``.

    Raises:
        RuntimeError: if the runtime call returns a non-zero status.
    """
    import ctypes

    status = _get_cudart().cudaIpcCloseMemHandle(ctypes.c_void_p(device_ptr))
    if status == 0:
        return
    raise RuntimeError(f"cudaIpcCloseMemHandle failed with error code {status}")


# ---------------------------------------------------------------------------
# CUDA IPC array encode / decode
# ---------------------------------------------------------------------------


def _dump_cuda_ipc_arraydict(arr: Any) -> ArrayDict:
    """Serialize a CUDA array as a JSON-compatible dict holding an IPC handle.

    The array must expose ``__cuda_array_interface__``. No array data is
    copied: the result carries the base64-encoded IPC handle for the array's
    device pointer, together with shape, dtype name, device ordinal and the
    array's size in bytes.

    The device ordinal is taken from ``arr.device.id`` when present, else
    from a non-None ``arr.device.index``, and defaults to 0 otherwise.

    NOTE(review): ``storage_offset`` is always written as 0 and
    ``storage_size`` as the array's own byte size. If the array's pointer is
    not the start of its underlying allocation, the consumer may need a real
    offset -- confirm against the allocator in use.

    Raises:
        ValueError: if ``arr`` does not expose ``__cuda_array_interface__``.
    """
    if not _has_cuda_array_interface(arr):
        raise ValueError(
            "cuda_ipc encoding requires a CUDA array "
            f"(object with __cuda_array_interface__), got {type(arr).__name__}"
        )

    device_ptr, total_bytes, dims, dtype_str = _get_cuda_array_info(arr)

    # Ask the CUDA runtime for a handle other processes can open.
    ipc_handle = _cuda_ipc_get_mem_handle(device_ptr)

    # Work out which device the array lives on; fall back to ordinal 0.
    ordinal = 0
    if hasattr(arr, "device"):
        dev_obj = arr.device
        if hasattr(dev_obj, "id"):
            # CuPy arrays expose .device.id
            ordinal = dev_obj.id
        else:
            # torch tensors expose .device.index (may be None)
            torch_index = getattr(dev_obj, "index", None)
            if torch_index is not None:
                ordinal = torch_index

    payload = {
        "handle": pybase64.b64encode_as_string(ipc_handle),
        "device": ordinal,
        "storage_offset": 0,
        "storage_size": total_bytes,
        "encoding": "cuda_ipc",
    }
    return {
        "object_type": "array",
        "shape": list(dims),
        "dtype": dtype_str,
        "data": payload,
    }


def _load_cuda_ipc_arraydict(val: ArrayDict) -> Any:
    """Load a CUDA array from a JSON dict with a CUDA IPC handle.

    The calling process must share the IPC namespace with the producer
    (e.g. both run with --ipc=host on Docker) and see the same GPU.

    Returns a CuPy ndarray wrapping the IPC device pointer (zero-copy),
    positioned ``storage_offset`` bytes into the opened allocation.
    CuPy is the only hard requirement for decoding.

    The opened IPC mapping is wrapped as unowned memory (``owner=None``) and
    is not closed by this function.

    Args:
        val: Dict with ``shape``, ``dtype`` and a ``data`` mapping holding
            ``handle`` (base64), ``device``, ``storage_size`` and optionally
            ``storage_offset`` (defaults to 0).

    Raises:
        RuntimeError: if CuPy is not installed, or a CUDA runtime call fails.
        ValueError: if the described array does not fit inside
            ``storage_size`` starting at ``storage_offset``.
    """
    try:
        import cupy
    except ImportError:
        raise RuntimeError(
            "cuda_ipc decoding requires CuPy to be installed "
            "(pip install cupy-cuda12x or similar)."
        ) from None

    import math

    data = val["data"]
    handle_bytes = pybase64.b64decode(data["handle"], validate=True)
    device = data["device"]
    storage_size = data["storage_size"]
    # Honor the advertised byte offset instead of assuming the array starts
    # at the base of the allocation.
    storage_offset = data.get("storage_offset", 0)

    dtype = np.dtype(val["dtype"])
    shape = tuple(val["shape"])

    # Validate the payload before mapping any GPU memory, so a malformed
    # description can neither read out of bounds nor leave a mapping open.
    nbytes = math.prod(shape) * dtype.itemsize
    if storage_offset < 0 or storage_offset + nbytes > storage_size:
        raise ValueError(
            f"cuda_ipc array of {nbytes} bytes at offset {storage_offset} "
            f"does not fit in an allocation of {storage_size} bytes"
        )

    dev_ptr = _cuda_ipc_open_mem_handle(handle_bytes, device)

    mem = cupy.cuda.UnownedMemory(dev_ptr, storage_size, owner=None)
    memptr = cupy.cuda.MemoryPointer(mem, storage_offset)
    return cupy.ndarray(shape, dtype=dtype, memptr=memptr)


def _coerce_shape_dtype(
arr: ArrayLike,
expected_shape: ShapeType,
Expand Down Expand Up @@ -468,6 +678,10 @@ def decode_array(
base_dir = join_paths(base_dir, subdir)
data = _load_binref_arraydict(val.model_dump(), base_dir)

elif val.data.encoding == "cuda_ipc":
# Returns a CuPy ndarray on GPU — skip numpy coercion
return _load_cuda_ipc_arraydict(val.model_dump())

# keep checking for "raw" for backwards compat
elif val.data.encoding in {"json", "raw"}:
data = np.asarray(val.data.buffer).reshape(val.shape)
Expand Down Expand Up @@ -509,16 +723,26 @@ def encode_array(
"""
from tesseract_core.runtime.config import get_config

context = info.context if info.context else {}
array_encoding = context.get("array_encoding", "json")

# For cuda_ipc, skip numpy conversion — array stays on GPU
if array_encoding == "cuda_ipc":
if not info.mode_is_json():
return arr
if not _has_cuda_array_interface(arr):
raise ValueError(
"cuda_ipc encoding requires a CUDA array "
f"(object with __cuda_array_interface__), got {type(arr).__name__}"
)
return _dump_cuda_ipc_arraydict(arr)

# Convert to a NumPy array if necessary
arr = python_to_array(arr, info, expected_shape, expected_dtype)

context = info.context if info.context else {}

# Python mode -> just return the array without encoding
if not info.mode_is_json():
return arr

array_encoding = context.get("array_encoding", "json")
if array_encoding == "base64":
return _dump_base64_arraydict(arr)
elif array_encoding == "binref":
Expand Down
4 changes: 3 additions & 1 deletion tesseract_core/runtime/file_interactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

PathLike = str | Path

supported_format_type = Literal["json", "json+base64", "json+binref"]
supported_format_type = Literal["json", "json+base64", "json+binref", "json+cuda_ipc"]
SUPPORTED_FORMATS = get_args(supported_format_type)


Expand All @@ -36,6 +36,8 @@ def output_to_bytes(
"base_dir": base_dir,
"binref_dir": binref_dir,
}
elif format == "json+cuda_ipc":
context = {"array_encoding": "cuda_ipc"}
else:
raise ValueError(
f"Unsupported format {format} (must be one of {SUPPORTED_FORMATS})"
Expand Down
15 changes: 13 additions & 2 deletions tesseract_core/sdk/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,8 @@ def serve(
memory: str | None = None,
input_path: str | Path | None = None,
output_path: str | Path | None = None,
output_format: Literal["json", "json+base64", "json+binref"] | None = None,
output_format: Literal["json", "json+base64", "json+binref", "json+cuda_ipc"]
| None = None,
docker_args: list[str] | None = None,
runtime_config: dict[str, Any] | None = None,
skip_health_check: bool = False,
Expand Down Expand Up @@ -779,6 +780,15 @@ def serve(
raise ValueError("Network must be specified if network_alias is provided")
extra_args.extend(["--network-alias", network_alias])

# CUDA IPC requires shared IPC namespace between host and container
if output_format == "json+cuda_ipc":
extra_args.extend(["--ipc=host"])
if not gpus:
logger.warning(
"json+cuda_ipc output format requires GPU access. "
"Consider passing gpus=['all'] or specific GPU IDs."
)

if docker_args:
extra_args.extend(docker_args)

Expand Down Expand Up @@ -958,7 +968,8 @@ def run_tesseract(
memory: str | None = None,
input_path: str | Path | None = None,
output_path: str | Path | None = None,
output_format: Literal["json", "json+base64", "json+binref"] | None = None,
output_format: Literal["json", "json+base64", "json+binref", "json+cuda_ipc"]
| None = None,
output_file: str | None = None,
docker_args: list[str] | None = None,
stream_logs: bool | Callable[[str], None] = False,
Expand Down
Loading
Loading