Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/11768.enhance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a multi-proxy regression test that reproduces concurrent TUS chunk-upload corruption on shared NFS and locks in the chunk-store fix.
1 change: 1 addition & 0 deletions changes/11769.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support the TUS Checksum extension on storage proxy uploads: clients may now send `Upload-Checksum: sha256 <base64>` and the server rejects mismatched chunks with HTTP 460.
1 change: 1 addition & 0 deletions changes/11770.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `GET /upload/status` endpoint and `X-Backend-Ai-*` progress response headers so clients can recover after a storage proxy crash by resending only the byte ranges still missing.
151 changes: 142 additions & 9 deletions src/ai/backend/storage/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from __future__ import annotations

import asyncio
import base64
import binascii
import logging
import os
import urllib.parse
Expand Down Expand Up @@ -47,7 +49,9 @@
from ai.backend.storage import __version__
from ai.backend.storage.dto.context import StorageRootCtx
from ai.backend.storage.errors import (
ChunkChecksumMismatchError,
InvalidAPIParameters,
InvalidUploadChecksumHeaderError,
TusSessionNotFoundError,
UploadChunkExceedsTotalSizeError,
UploadOffsetMismatchError,
Expand All @@ -56,7 +60,7 @@
ZipArchiveStreamReader,
)
from ai.backend.storage.services.upload.tus_session import TusUploadSession
from ai.backend.storage.services.upload.types import TusUploadSessionArgs
from ai.backend.storage.services.upload.types import TusSessionState, TusUploadSessionArgs
from ai.backend.storage.types import SENTINEL, TusChunkUploadStreamReader
from ai.backend.storage.utils import (
CheckParamSource,
Expand Down Expand Up @@ -400,6 +404,10 @@ class Params(TypedDict):
f"Upload-Offset {client_offset} is out of range [0, {total_size}]"
)

expected_checksum_hex = _parse_upload_checksum_header(
request.headers.get("Upload-Checksum")
)

async with ctx.get_volume(token_data["volume"]) as volume:
session_dir = (
volume.mangle_vfpath(token_data["vfid"]) / ".upload" / token_data["session"]
Expand All @@ -426,6 +434,10 @@ class Params(TypedDict):
f"Chunk at offset {client_offset} with length {written.length} "
f"exceeds declared size {total_size}"
)
if expected_checksum_hex is not None and expected_checksum_hex != written.sha256:
raise ChunkChecksumMismatchError(
f"Chunk at offset {client_offset} failed SHA-256 verification"
)
commit_result = await session.commit_chunk(
offset=client_offset,
chunk_path=written.path,
Expand All @@ -450,9 +462,81 @@ class Params(TypedDict):
upload_offset=state.committed_offset,
upload_length=total_size,
)
headers.update(_progress_response_headers(state))
return web.Response(status=HTTPStatus.NO_CONTENT, headers=headers)


async def tus_upload_status(request: web.Request) -> web.Response:
"""
GET /upload/status β€” JSON snapshot of an in-flight upload session.

Used by clients to recover after a Storage Proxy crash or network error:
they discover which byte ranges are still missing and resend only those.
"""
ctx: RootContext = request.app["ctx"]
secret = ctx.local_config.storage_proxy.secret

class Params(TypedDict):
token: UploadTokenData
dst_dir: str

async with cast(
AbstractAsyncContextManager[Params],
check_params(
request,
t.Dict(
{
t.Key("token"): tx.JsonWebToken(
secret=secret,
inner_iv=upload_token_data_iv,
),
t.Key("dst_dir", default=None): t.Null | t.String,
},
),
read_from=CheckParamSource.QUERY,
),
) as params:
token_data = params["token"]
total_size = int(token_data["size"])
async with ctx.get_volume(token_data["volume"]) as volume:
session_dir = (
volume.mangle_vfpath(token_data["vfid"]) / ".upload" / token_data["session"]
)
session = TusUploadSession(
TusUploadSessionArgs(
session_dir=session_dir,
session_id=token_data["session"],
total_size=total_size,
valkey_client=ctx.valkey_tus_client,
lock_factory=ctx.tus_lock_factory,
)
)
if not await session.exists():
raise TusSessionNotFoundError
state = await session.read_state()
body = {
"session": token_data["session"],
"total_size": total_size,
"committed_offset": state.committed_offset,
"chunks_received": [rec.offset for rec in state.committed_chunks],
"missing_ranges": [
{"offset": offset, "length": length}
for offset, length in state.missing_ranges()
],
"progress_percent": state.progress_percent(),
"status": state.status,
}
log.trace(
"TUS upload status: session={}, progress={}/{} ({:.1f}%), status={}",
token_data["session"],
state.committed_offset,
total_size,
state.progress_percent(),
state.status,
)
return web.json_response(body, status=HTTPStatus.OK)


# TUS-related request/response headers that the storage-proxy accepts and emits,
# advertised via Access-Control-{Allow,Expose}-Headers so browser clients can use
# them across origins.
Expand All @@ -468,16 +552,25 @@ class Params(TypedDict):
# - Upload-Offset: byte position from which the next PATCH must continue;
# required on PATCH requests and present on HEAD/PATCH responses. The core
# resumability primitive of TUS.
# - Upload-Checksum: `<algorithm> <base64>` payload integrity check for PATCH
# bodies (TUS Checksum extension). Only `sha256` is accepted; the server
# verifies the PATCH body's digest against the supplied value and returns
# 460 on mismatch.
# - Content-Type: PATCH bodies must be `application/offset+octet-stream` per
# the TUS spec.
_TUS_HEADER_LIST = "Tus-Resumable, Upload-Length, Upload-Metadata, Upload-Offset, Content-Type"
_TUS_HEADER_LIST = (
"Tus-Resumable, Upload-Length, Upload-Metadata, Upload-Offset, Upload-Checksum, Content-Type"
)
_PROGRESS_HEADER_LIST = (
"X-Backend-Ai-Chunks-Received, X-Backend-Ai-Progress-Percent, X-Backend-Ai-Total-Expected"
)


def _prepare_tus_session_headers(*, upload_offset: int, upload_length: int) -> dict[str, str]:
return {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Headers": _TUS_HEADER_LIST,
"Access-Control-Expose-Headers": _TUS_HEADER_LIST,
"Access-Control-Expose-Headers": f"{_TUS_HEADER_LIST}, {_PROGRESS_HEADER_LIST}",
"Access-Control-Allow-Methods": "*",
"Cache-Control": "no-store",
"Tus-Resumable": "1.0.0",
Expand All @@ -486,22 +579,59 @@ def _prepare_tus_session_headers(*, upload_offset: int, upload_length: int) -> d
}


def _parse_upload_checksum_header(raw: str | None) -> str | None:
"""
Parse a TUS Checksum extension header value into a hex SHA-256 digest.

Returns ``None`` if no header was sent. Raises
``InvalidUploadChecksumHeaderError`` for malformed values or unsupported
algorithms (only ``sha256`` is accepted).
"""
if raw is None:
return None
parts = raw.strip().split(None, 1)
if len(parts) != 2:
raise InvalidUploadChecksumHeaderError(
f"Upload-Checksum must be '<algorithm> <base64>', got {raw!r}"
)
algorithm, encoded = parts
if algorithm.lower() != "sha256":
raise InvalidUploadChecksumHeaderError(
f"Unsupported checksum algorithm: {algorithm}. Only 'sha256' is accepted."
)
try:
digest = base64.b64decode(encoded, validate=True)
except (binascii.Error, ValueError) as e:
raise InvalidUploadChecksumHeaderError(
f"Invalid base64 in Upload-Checksum: {encoded!r}"
) from e
if len(digest) != 32:
raise InvalidUploadChecksumHeaderError(f"sha256 digest must be 32 bytes, got {len(digest)}")
return digest.hex()


def _progress_response_headers(state: TusSessionState) -> dict[str, str]:
return {
"X-Backend-Ai-Chunks-Received": str(len(state.committed_chunks)),
"X-Backend-Ai-Progress-Percent": str(state.progress_percent()),
"X-Backend-Ai-Total-Expected": str(state.total_size),
}


async def tus_options(request: web.Request) -> web.Response:
"""
Let clients discover the supported features of our tus.io server-side implementation.
"""
ctx: RootContext = request.app["ctx"]
headers = {}
headers["Access-Control-Allow-Origin"] = "*"
headers["Access-Control-Allow-Headers"] = (
"Tus-Resumable, Upload-Length, Upload-Metadata, Upload-Offset, Content-Type"
)
headers["Access-Control-Expose-Headers"] = (
"Tus-Resumable, Upload-Length, Upload-Metadata, Upload-Offset, Content-Type"
)
headers["Access-Control-Allow-Headers"] = _TUS_HEADER_LIST
headers["Access-Control-Expose-Headers"] = _TUS_HEADER_LIST
headers["Access-Control-Allow-Methods"] = "*"
headers["Tus-Resumable"] = "1.0.0"
headers["Tus-Version"] = "1.0.0"
headers["Tus-Extension"] = "checksum"
headers["Tus-Checksum-Algorithm"] = "sha256"
headers["Tus-Max-Size"] = str(
int(BinarySize.from_str(ctx.local_config.storage_proxy.max_upload_size)),
)
Expand Down Expand Up @@ -584,4 +714,7 @@ async def init_client_app(ctx: RootContext) -> web.Application:
r.add_route("HEAD", tus_check_session)
r.add_route("PATCH", tus_upload_part)

status_resource = app.router.add_resource("/upload/status")
status_resource.add_route("GET", tus_upload_status)

return app
6 changes: 5 additions & 1 deletion src/ai/backend/storage/errors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@
QuotaTreeNotFoundError,
)
from .upload import (
ChunkChecksumMismatchError,
ChunkConflictError,
InvalidUploadChecksumHeaderError,
TusSessionNotFoundError,
UploadChunkExceedsTotalSizeError,
UploadSessionCorruptedError,
Expand Down Expand Up @@ -97,10 +99,12 @@
"ServiceNotInitializedError",
"UploadOffsetMismatchError",
# upload
"ChunkChecksumMismatchError",
"ChunkConflictError",
"InvalidUploadChecksumHeaderError",
"TusSessionNotFoundError",
"UploadChunkExceedsTotalSizeError",
"UploadSessionCorruptedError",
"TusSessionNotFoundError",
# vfolder
"VFolderNotFoundError",
"InvalidSubpathError",
Expand Down
43 changes: 43 additions & 0 deletions src/ai/backend/storage/errors/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,46 @@ def error_code(self) -> ErrorCode:
operation=ErrorOperation.READ,
error_detail=ErrorDetail.INTERNAL_ERROR,
)


class _ChecksumMismatch(web.HTTPClientError):
"""
TUS Checksum extension defines HTTP 460 for checksum mismatches.
aiohttp does not ship a built-in class for this code, so we declare one.
"""

status_code = 460


class ChunkChecksumMismatchError(BackendAIError, _ChecksumMismatch):
"""
Raised when ``Upload-Checksum`` header does not match the SHA-256 digest
of the received chunk body (HTTP 460 per TUS Checksum extension).
"""

error_type = "https://api.backend.ai/probs/storage/chunk-checksum-mismatch"
error_title = "Upload Chunk Checksum Mismatch"

def error_code(self) -> ErrorCode:
return ErrorCode(
domain=ErrorDomain.STORAGE_PROXY,
operation=ErrorOperation.UPDATE,
error_detail=ErrorDetail.MISMATCH,
)


class InvalidUploadChecksumHeaderError(BackendAIError, web.HTTPBadRequest):
"""
Raised when ``Upload-Checksum`` header is malformed or specifies an
unsupported algorithm (only ``sha256`` is accepted).
"""

error_type = "https://api.backend.ai/probs/storage/invalid-upload-checksum"
error_title = "Invalid Upload-Checksum header"

def error_code(self) -> ErrorCode:
return ErrorCode(
domain=ErrorDomain.STORAGE_PROXY,
operation=ErrorOperation.REQUEST,
error_detail=ErrorDetail.INVALID_PARAMETERS,
)
41 changes: 41 additions & 0 deletions src/ai/backend/storage/services/upload/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,47 @@ def find_at_offset(self, offset: int) -> ChunkMetadata | None:
return None
return None

def missing_ranges(self) -> list[tuple[int, int]]:
"""
Return the list of ``(offset, length)`` byte ranges in
``[0, total_size)`` that are NOT yet covered by any received chunk.

Overlapping or out-of-bounds received chunks are tolerated: ranges
are coalesced and clipped against the declared total size.
"""
if self.total_size <= 0:
return []
covered: list[tuple[int, int]] = []
for rec in self.committed_chunks:
start = max(0, rec.offset)
end = min(self.total_size, rec.end)
if start < end:
covered.append((start, end))
if not covered:
return [(0, self.total_size)]
covered.sort()
merged: list[tuple[int, int]] = [covered[0]]
for start, end in covered[1:]:
last_start, last_end = merged[-1]
if start <= last_end:
merged[-1] = (last_start, max(last_end, end))
else:
merged.append((start, end))
gaps: list[tuple[int, int]] = []
cursor = 0
for start, end in merged:
if cursor < start:
gaps.append((cursor, start - cursor))
cursor = end
if cursor < self.total_size:
gaps.append((cursor, self.total_size - cursor))
return gaps

def progress_percent(self) -> float:
if self.total_size <= 0:
return 100.0
return round(100.0 * self.committed_offset / self.total_size, 2)

@classmethod
def empty(cls, session_id: TusSessionId, total_size: int) -> Self:
return cls(session_id=session_id, total_size=total_size)
Expand Down
Loading
Loading