Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 30 additions & 11 deletions benchmarks/test_array_encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,24 @@ class ArrayModel(BaseModel):
data: Array[(None,), Float64]


ENCODINGS = ["json", "base64", "binref"]
ENCODINGS = ["json", "base64", "binref", "base64+lz4", "binref+lz4"]

# Maps short encoding name to the format string used by output_to_bytes
_ENCODING_TO_FORMAT: dict[str, supported_format_type] = {
"json": "json",
"base64": "json+base64",
"binref": "json+binref",
"base64+lz4": "json+base64",
"binref+lz4": "json+binref",
}

# Maps short encoding name to extra kwargs passed to output_to_bytes
_ENCODING_TO_KWARGS: dict[str, dict] = {
"json": {},
"base64": {},
"binref": {},
"base64+lz4": {"base64_compression": "lz4"},
"binref+lz4": {"binref_compression": "lz4"},
}


Expand Down Expand Up @@ -78,44 +89,50 @@ def test_encoding(benchmark, encoding_and_size):
encoding, size = encoding_and_size
model = ArrayModel(data=create_test_array(size))
fmt = _ENCODING_TO_FORMAT[encoding]
extra_kwargs = _ENCODING_TO_KWARGS[encoding]
uses_binref = "binref" in encoding

with tempfile.TemporaryDirectory() as tmpdir:
if encoding == "binref":
if uses_binref:

def setup():
_clear_dir(tmpdir)

benchmark.pedantic(
output_to_bytes,
args=(model, fmt),
kwargs={"base_dir": tmpdir},
kwargs={"base_dir": tmpdir, **extra_kwargs},
setup=setup,
rounds=_binref_rounds(size),
)
else:
benchmark(output_to_bytes, model, fmt)
benchmark(output_to_bytes, model, fmt, **extra_kwargs)


def test_decoding(benchmark, encoding_and_size):
encoding, size = encoding_and_size
model = ArrayModel(data=create_test_array(size))
fmt = _ENCODING_TO_FORMAT[encoding]
extra_kwargs = _ENCODING_TO_KWARGS[encoding]
uses_binref = "binref" in encoding

with tempfile.TemporaryDirectory() as tmpdir:
ctx: dict[str, str] = {}
if encoding == "binref":
if uses_binref:
ctx["base_dir"] = tmpdir

encoded = output_to_bytes(model, fmt, base_dir=tmpdir)
encoded = output_to_bytes(model, fmt, base_dir=tmpdir, **extra_kwargs)

if encoding == "binref":
if uses_binref:
# binref filenames are random UUIDs, so we must re-encode in setup
# and pass the fresh payload to the decode call via a mutable wrapper.
payload = [encoded]

def setup():
_clear_dir(tmpdir)
payload[0] = output_to_bytes(model, fmt, base_dir=tmpdir)
payload[0] = output_to_bytes(
model, fmt, base_dir=tmpdir, **extra_kwargs
)

def decode():
ArrayModel.model_validate_json(payload[0], context=ctx)
Expand All @@ -129,17 +146,19 @@ def test_roundtrip(benchmark, encoding_and_size):
encoding, size = encoding_and_size
model = ArrayModel(data=create_test_array(size))
fmt = _ENCODING_TO_FORMAT[encoding]
extra_kwargs = _ENCODING_TO_KWARGS[encoding]
uses_binref = "binref" in encoding

with tempfile.TemporaryDirectory() as tmpdir:
ctx: dict[str, str] = {}
if encoding == "binref":
if uses_binref:
ctx["base_dir"] = tmpdir

def roundtrip():
enc = output_to_bytes(model, fmt, base_dir=tmpdir)
enc = output_to_bytes(model, fmt, base_dir=tmpdir, **extra_kwargs)
ArrayModel.model_validate_json(enc, context=ctx)

if encoding == "binref":
if uses_binref:

def setup():
_clear_dir(tmpdir)
Expand Down
10 changes: 10 additions & 0 deletions docs/content/using-tesseracts/array-encodings.md
Comment thread
angela-ko marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,13 @@ $ curl \
The `.bin` file references are relative to the `--output-path`.
:::
::::

### binref + lz4 compression

Set `TESSERACT_BINREF_COMPRESSION=lz4` to compress arrays in `.bin` files. Each array is compressed individually, preserving offset-based random access. The compressed size is embedded directly in the buffer path (`<file>:<offset>:<compressed_size>`).
Comment on lines +144 to +146

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This now also applies to base64, correct?


```bash
$ TESSERACT_BINREF_COMPRESSION=lz4 tesseract run vectoradd apply -f "json+binref" -o /tmp/output @examples/vectoradd/example_inputs.json
$ cat /tmp/output/results.json
{"result":{"object_type":"array","shape":[3],"dtype":"float64","data":{"buffer":"....bin:0:35","encoding":"binref","compression":"lz4"}}}
```
7,116 changes: 3,595 additions & 3,521 deletions production.uv.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ runtime = [
"numpy<=2.5.0,>=1.26",
"debugpy<=1.8.21,>=1.8.14",
"mlflow-skinny<=3.14.0,>=3.7.0",
"lz4<=4.4.5,>=4.0.0",
]
# END RUNTIME DEPENDENCIES

Expand Down
91 changes: 78 additions & 13 deletions tesseract_core/runtime/array_encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,28 @@ class ArrayDict(TypedDict):
MAX_BINREF_BUFFER_SIZE = 100 * 1024 * 1024 # 100 MB


def _lz4_frame():
    """Import the ``lz4.frame`` submodule at call time and return it."""
    from lz4 import frame as lz4_frame_module

    return lz4_frame_module
Comment on lines +67 to +70

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can live in global scope since the dep is now mandatory



def _compress(data: bytes, compression: str | None) -> bytes:
if compression is None:
return data
if compression == "lz4":
return _lz4_frame().compress(data)
raise ValueError(f"Unknown compression: {compression}")


def _decompress(data: bytes, compression: str | None) -> bytes:
if compression is None:
return data
if compression == "lz4":
return _lz4_frame().decompress(data)
raise ValueError(f"Unknown compression: {compression}")


# Base classes for the different array encodings
# The actual models are created dynamically based on the expected shape and dtype by get_array_model

Expand All @@ -79,14 +101,22 @@ class Base64ArrayData(BaseModel):
),
]
encoding: Literal["base64"]
compression: Literal["lz4"] | None = None
model_config = ConfigDict(extra="forbid")


class BinrefArrayData(BaseModel):
"""Data structure that dumps array data to binary file."""
"""Data structure that dumps array data to binary file.

The buffer field format is ``<path>[:<offset>[:<compressed_size>]]``.
When compression is set, the buffer must include ``:<compressed_size>``
so readers know how many compressed bytes to read.
"""

buffer: StrictStr = Field(pattern=r"^.+?(\:\d+)?$")
buffer: StrictStr = Field(pattern=r"^.+?(\:\d+(\:\d+)?)?$")
encoding: Literal["binref"]
compression: Literal["lz4"] | None = None

model_config = ConfigDict(extra="forbid")


Expand Down Expand Up @@ -223,6 +253,7 @@ def _dump_binref_arraydict(
subdir: Path | str | None,
current_binref_uuid: str,
max_file_size: int = MAX_BINREF_BUFFER_SIZE,
compression: str | None = None,
) -> tuple[ArrayDict, str]:
"""Dump array to json+binref encoded array dict."""
target_name = f"{current_binref_uuid}.bin"
Expand All @@ -241,28 +272,41 @@ def _dump_binref_arraydict(
target_name = join_paths(subdir, target_name)
target_path = join_paths(base_dir, target_name)

write_to_path(_fast_tobytes(arr), target_path, append=True)
blob = _compress(_fast_tobytes(arr), compression)
write_to_path(blob, target_path, append=True)
offset = current_size

if compression is not None:
data = {
"buffer": f"{target_name}:{offset}:{len(blob)}",
"encoding": "binref",
"compression": compression,
}
else:
data = {"buffer": f"{target_name}:{offset}", "encoding": "binref"}
arraydict = {
"object_type": "array",
"shape": list(arr.shape),
"dtype": arr.dtype.name,
"data": {"buffer": f"{target_name}:{offset}", "encoding": "binref"},
"data": data,
}
return arraydict, current_binref_uuid


def _dump_base64_arraydict(arr: ArrayLike) -> ArrayDict:
def _dump_base64_arraydict(arr: ArrayLike, compression: str | None = None) -> ArrayDict:
"""Dump array to json+base64 encoded array dict (plain dict, no Pydantic models)."""
blob = _compress(_fast_tobytes(arr), compression)
data: dict[str, Any] = {
"buffer": pybase64.b64encode_as_string(blob),
"encoding": "base64",
}
if compression is not None:
data["compression"] = compression
return {
"object_type": "array",
"shape": list(arr.shape),
"dtype": arr.dtype.name,
"data": {
"buffer": pybase64.b64encode_as_string(_fast_tobytes(arr)),
"encoding": "base64",
},
"data": data,
}


Expand All @@ -279,22 +323,27 @@ def _dump_json_arraydict(arr: ArrayLike) -> ArrayDict:
def _load_base64_arraydict(val: ArrayDict) -> np.ndarray:
    """Rebuild an array from a json+base64 encoded array dict.

    The base64 text in ``val["data"]["buffer"]`` is decoded with strict
    validation, passed through ``_decompress`` using the optional
    ``"compression"`` entry of ``val["data"]`` (``None`` when the key is
    absent), then interpreted with ``val["dtype"]`` and reshaped to
    ``val["shape"]``.
    """
    payload = val["data"]
    raw = pybase64.b64decode(payload["buffer"], validate=True)
    raw = _decompress(raw, payload.get("compression"))
    flat = np.frombuffer(raw, dtype=val["dtype"])
    return flat.reshape(val["shape"])


def _load_binref_arraydict(val: ArrayDict, base_dir: str | Path | None) -> np.ndarray:
"""Load array from json+binref encoded array dict."""
path_match = re.match(r"^(?P<path>.+?)(\:(?P<offset>\d+))?$", val["data"]["buffer"])
path_match = re.match(
r"^(?P<path>.+?)(\:(?P<offset>\d+)(\:(?P<compressed_size>\d+))?)?$",
val["data"]["buffer"],
)
if not path_match:
raise ValueError(
f"Invalid binref path format: {val['data']['buffer']}. "
"Expected format is '<path>[:<offset>]'."
"Expected format is '<path>[:<offset>[:<compressed_size>]]'."
)
bufferpath = path_match.group("path")
if path_match.group("offset") is None:
offset = 0
else:
offset = int(path_match.group("offset"))
compressed_size_str = path_match.group("compressed_size")

uses_relative_path = not is_absolute_path(bufferpath) and not is_url(bufferpath)
if uses_relative_path and base_dir is None:
Expand All @@ -308,10 +357,23 @@ def _load_binref_arraydict(val: ArrayDict, base_dir: str | Path | None) -> np.nd
size = 1 if len(shape) == 0 else np.prod(shape)
num_bytes = int(size * dtype.itemsize)

compression = val["data"].get("compression")

if base_dir is not None:
bufferpath = join_paths(base_dir, bufferpath)

buffer = read_from_path(bufferpath, offset=offset, length=num_bytes)
if compression is None:
buffer = read_from_path(bufferpath, offset=offset, length=num_bytes)
else:
if compressed_size_str is None:
raise ValueError(
"compressed_size is required in buffer spec when compression is set "
"(expected format: '<path>:<offset>:<compressed_size>')"
)
buffer = _decompress(
read_from_path(bufferpath, offset=offset, length=int(compressed_size_str)),
compression,
)
return np.frombuffer(buffer, dtype=dtype).reshape(shape)


Expand Down Expand Up @@ -520,7 +582,9 @@ def encode_array(

array_encoding = context.get("array_encoding", "json")
if array_encoding == "base64":
return _dump_base64_arraydict(arr)
return _dump_base64_arraydict(
arr, compression=context.get("base64_compression")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest we use a single use_compression variable instead of format-specific ones.

)
elif array_encoding == "binref":
base_dir = context.get("base_dir", get_config().output_path)
subdir = context.get("binref_dir", None)
Expand All @@ -530,6 +594,7 @@ def encode_array(
subdir=subdir,
current_binref_uuid=context.get("__binref_uuid", str(uuid4())),
max_file_size=context.get("max_file_size", MAX_BINREF_BUFFER_SIZE),
compression=context.get("binref_compression"),
)
context["__binref_uuid"] = new_binref_uuid
return data
Expand Down
25 changes: 24 additions & 1 deletion tesseract_core/runtime/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
from enum import Enum
from pathlib import Path
from textwrap import dedent
from types import UnionType
from typing import (
Annotated,
Any,
Literal,
Union,
get_args,
get_origin,
)
Expand Down Expand Up @@ -148,6 +150,22 @@ def main_callback(**kwargs: Any) -> None:
# Too late to configure here, as the API path is needed to load the Tesseract API
continue

# TODO: The Union unwrap + Literal-to-enum conversion below only exists
# because our minimal supported Typer (typer>=0.16 in pyproject.toml)
# can't handle `Literal` types and raises "Type not yet supported".
# Once the minimal Typer is bumped to a version with native `Literal`
# support, this whole branch can be removed.
#
# Unwrap Optional[...] (i.e. `X | None`) to inspect the inner type;
# since all options default to None, optionality is already handled
# and we only need the concrete type for Typer.
if get_origin(field_type) in (Union, UnionType):
non_none_args = [
arg for arg in get_args(field_type) if arg is not type(None)
]
if len(non_none_args) == 1:
field_type = non_none_args[0]

if get_origin(field_type) is Literal:
field_type = make_choice_enum(f"{field_name}Choices", get_args(field_type))

Expand Down Expand Up @@ -436,7 +454,12 @@ def _callback_wrapper(**kwargs: Any):
# so they go through stdio redirection to the log file
profiler.print_stats()

result = output_to_bytes(result, output_format, output_path)
result = output_to_bytes(
result,
output_format,
output_path,
binref_compression=config.binref_compression,
)

# write raw bytes to out_stream.buffer to support binary data (which may e.g. be piped)
if not output_file:
Expand Down
3 changes: 2 additions & 1 deletion tesseract_core/runtime/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import ast
import os
from pathlib import Path
from typing import Annotated, Any
from typing import Annotated, Any, Literal

from pydantic import (
BaseModel,
Expand Down Expand Up @@ -40,6 +40,7 @@ class RuntimeConfig(BaseModel):
output_path: str = "."
output_format: supported_format_type = "json"
output_file: str = ""
binref_compression: Literal["lz4"] | None = None

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only binref?

mlflow_tracking_uri: str = ""
mlflow_run_extra_args: Annotated[dict[str, Any], BeforeValidator(_eval_str)] = (
Field(default_factory=dict)
Expand Down
Loading
Loading