Skip to content
1 change: 1 addition & 0 deletions changes/11767.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix multi-proxy storage TUS uploads from corrupting shared NFS-mounted files by storing each PATCH as a metadata-tracked per-offset chunk and assembling atomically on completion.
217 changes: 127 additions & 90 deletions src/ai/backend/storage/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import logging
import os
import urllib.parse
from collections.abc import AsyncGenerator, Iterator, Mapping, MutableMapping
from collections.abc import AsyncGenerator, Iterator, Mapping
from contextlib import AbstractAsyncContextManager
from datetime import UTC, datetime
from http import HTTPStatus
Expand Down Expand Up @@ -38,20 +38,26 @@
ArchiveDownloadQueryParams,
ArchiveDownloadTokenData,
)
from ai.backend.common.files import AsyncFileWriter
from ai.backend.common.json import dump_json_str
from ai.backend.common.metrics.http import build_api_metric_middleware
from ai.backend.common.middlewares.exception import general_exception_middleware
from ai.backend.common.typed_validators import PydanticJWTValidator
from ai.backend.common.types import BinarySize, VFolderID
from ai.backend.common.types import BinarySize, TusSessionId, VFolderID
from ai.backend.logging import BraceStyleAdapter
from ai.backend.storage import __version__
from ai.backend.storage.dto.context import StorageRootCtx
from ai.backend.storage.errors import InvalidAPIParameters, UploadOffsetMismatchError
from ai.backend.storage.errors import (
InvalidAPIParameters,
TusSessionNotFoundError,
UploadChunkExceedsTotalSizeError,
UploadOffsetMismatchError,
)
from ai.backend.storage.services.file_stream.zip import (
ZipArchiveStreamReader,
)
from ai.backend.storage.types import SENTINEL
from ai.backend.storage.services.upload.tus_session import TusUploadSession
from ai.backend.storage.services.upload.types import TusUploadSessionArgs
from ai.backend.storage.types import SENTINEL, TusChunkUploadStreamReader
from ai.backend.storage.utils import (
CheckParamSource,
build_attachment_headers,
Expand All @@ -60,7 +66,6 @@

if TYPE_CHECKING:
from ai.backend.storage.context import RootContext
from ai.backend.storage.volumes.abc import AbstractVolume

log = BraceStyleAdapter(logging.getLogger(__spec__.name))

Expand Down Expand Up @@ -96,7 +101,7 @@ class UploadTokenData(TypedDict):
volume: str
vfid: VFolderID
relpath: str
session: str
session: TusSessionId
size: int


Expand Down Expand Up @@ -329,14 +334,30 @@ class Params(TypedDict):
) as params:
token_data = params["token"]
async with ctx.get_volume(token_data["volume"]) as volume:
headers = await prepare_tus_session_headers(request, token_data, volume)
session_dir = (
volume.mangle_vfpath(token_data["vfid"]) / ".upload" / token_data["session"]
)
session = TusUploadSession(
TusUploadSessionArgs(
session_dir=session_dir,
session_id=token_data["session"],
total_size=int(token_data["size"]),
valkey_client=ctx.valkey_tus_client,
lock_factory=ctx.tus_lock_factory,
)
)
if not await session.exists():
raise TusSessionNotFoundError
state = await session.read_state()
headers = _prepare_tus_session_headers(
upload_offset=state.committed_offset,
upload_length=int(token_data["size"]),
)
return web.Response(headers=headers)


async def tus_upload_part(request: web.Request) -> web.Response:
"""
Perform the chunk upload.
"""
"""Perform a TUS PATCH chunk upload."""
ctx: RootContext = request.app["ctx"]
secret = ctx.local_config.storage_proxy.secret

Expand All @@ -361,61 +382,110 @@ class Params(TypedDict):
),
) as params:
token_data = params["token"]
async with ctx.get_volume(token_data["volume"]) as volume:
headers = await prepare_tus_session_headers(request, token_data, volume)
vfpath = volume.mangle_vfpath(token_data["vfid"])
upload_temp_path: Path = vfpath / ".upload" / token_data["session"]
total_size = int(token_data["size"])

# TUS protocol requires Upload-Offset validation before appending data
upload_offset_header = request.headers.get("Upload-Offset")
if upload_offset_header is None:
raise InvalidAPIParameters(
"Missing required Upload-Offset header for TUS PATCH request"
upload_offset_header = request.headers.get("Upload-Offset")
if upload_offset_header is None:
raise InvalidAPIParameters(
"Missing required Upload-Offset header for TUS PATCH request"
)
try:
client_offset = int(upload_offset_header)
except ValueError as e:
raise InvalidAPIParameters(
f"Invalid Upload-Offset header value: {upload_offset_header}"
) from e
if client_offset < 0 or client_offset > total_size:
raise UploadOffsetMismatchError(
f"Upload-Offset {client_offset} is out of range [0, {total_size}]"
)

async with ctx.get_volume(token_data["volume"]) as volume:
session_dir = (
volume.mangle_vfpath(token_data["vfid"]) / ".upload" / token_data["session"]
)
session = TusUploadSession(
TusUploadSessionArgs(
session_dir=session_dir,
session_id=token_data["session"],
total_size=total_size,
valkey_client=ctx.valkey_tus_client,
lock_factory=ctx.tus_lock_factory,
)
)
if not await session.exists():
raise TusSessionNotFoundError

upload_stream = TusChunkUploadStreamReader(
request.content, request.content_type, DEFAULT_CHUNK_SIZE
)
written = await session.write_temp_chunk(client_offset, upload_stream)
try:
client_offset = int(upload_offset_header)
except ValueError as e:
raise InvalidAPIParameters(
f"Invalid Upload-Offset header value: {upload_offset_header}"
) from e

actual_offset = int(headers["Upload-Offset"])
if client_offset != actual_offset:
raise UploadOffsetMismatchError(
f"Upload offset mismatch: expected {actual_offset}, got {client_offset}"
if client_offset + written.length > total_size:
raise UploadChunkExceedsTotalSizeError(
f"Chunk at offset {client_offset} with length {written.length} "
f"exceeds declared size {total_size}"
)
commit_result = await session.commit_chunk(
offset=client_offset,
chunk_path=written.path,
length=written.length,
sha256=written.sha256,
)

async with AsyncFileWriter(
target_filename=upload_temp_path,
access_mode="ab",
max_chunks=DEFAULT_INFLIGHT_CHUNKS,
) as writer:
while not request.content.at_eof():
chunk = await request.content.read(DEFAULT_CHUNK_SIZE)
await writer.write(chunk)

current_size = Path(upload_temp_path).stat().st_size
if current_size >= int(token_data["size"]):
parent_dir = vfpath
except BaseException:
if written.path.exists():
await asyncio.to_thread(written.path.unlink)
raise

state = commit_result.state
if commit_result.is_final_commit:
parent_dir = volume.mangle_vfpath(token_data["vfid"])
if (dst_dir := params["dst_dir"]) is not None:
parent_dir = vfpath / dst_dir
target_path: Path = parent_dir / token_data["relpath"]
if not target_path.parent.exists():
target_path.parent.mkdir(parents=True, exist_ok=True)
upload_temp_path.rename(target_path)
try:
loop = asyncio.get_running_loop()
await loop.run_in_executor(
None,
lambda: upload_temp_path.parent.rmdir(),
)
except OSError:
pass
headers["Upload-Offset"] = str(current_size)
parent_dir = parent_dir / dst_dir
target_path = parent_dir / token_data["relpath"]
await session.assemble(target_path)
await session.cleanup()

headers = _prepare_tus_session_headers(
upload_offset=state.committed_offset,
upload_length=total_size,
)
return web.Response(status=HTTPStatus.NO_CONTENT, headers=headers)


# TUS-related request/response headers that the storage-proxy accepts and emits,
# advertised via Access-Control-{Allow,Expose}-Headers so browser clients can use
# them across origins.
#
# - Tus-Resumable: TUS protocol version (1.0.0). Required on every TUS request
# and response β€” the spec's version-negotiation handshake.
# - Upload-Length: total declared size of the file in bytes. Set on session
# creation; echoed in HEAD responses.
# - Upload-Metadata: comma-separated key/value pairs (value is base64-encoded
# per the spec) carrying user-provided metadata such as filename or content
# type. Optional; this implementation does not currently consume it but
# surfaces it to clients.
# - Upload-Offset: byte position from which the next PATCH must continue;
# required on PATCH requests and present on HEAD/PATCH responses. The core
# resumability primitive of TUS.
# - Content-Type: PATCH bodies must be `application/offset+octet-stream` per
# the TUS spec.
_TUS_HEADER_LIST = "Tus-Resumable, Upload-Length, Upload-Metadata, Upload-Offset, Content-Type"
Comment thread
jopemachine marked this conversation as resolved.


def _prepare_tus_session_headers(*, upload_offset: int, upload_length: int) -> dict[str, str]:
return {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Headers": _TUS_HEADER_LIST,
"Access-Control-Expose-Headers": _TUS_HEADER_LIST,
"Access-Control-Allow-Methods": "*",
"Cache-Control": "no-store",
"Tus-Resumable": "1.0.0",
"Upload-Offset": str(upload_offset),
"Upload-Length": str(upload_length),
}


async def tus_options(request: web.Request) -> web.Response:
"""
Let clients discover the supported features of our tus.io server-side implementation.
Expand All @@ -439,39 +509,6 @@ async def tus_options(request: web.Request) -> web.Response:
return web.Response(headers=headers)


async def prepare_tus_session_headers(
_request: web.Request,
token_data: Mapping[str, Any],
volume: AbstractVolume,
) -> MutableMapping[str, str]:
vfpath = volume.mangle_vfpath(token_data["vfid"])
upload_temp_path = vfpath / ".upload" / token_data["session"]
if not Path(upload_temp_path).exists():
raise web.HTTPNotFound(
body=dump_json_str(
{
"title": "No such upload session",
"type": "https://api.backend.ai/probs/storage/no-such-upload-session",
},
),
content_type="application/problem+json",
)
headers = {}
headers["Access-Control-Allow-Origin"] = "*"
headers["Access-Control-Allow-Headers"] = (
"Tus-Resumable, Upload-Length, Upload-Metadata, Upload-Offset, Content-Type"
)
headers["Access-Control-Expose-Headers"] = (
"Tus-Resumable, Upload-Length, Upload-Metadata, Upload-Offset, Content-Type"
)
headers["Access-Control-Allow-Methods"] = "*"
headers["Cache-Control"] = "no-store"
headers["Tus-Resumable"] = "1.0.0"
headers["Upload-Offset"] = str(Path(upload_temp_path).stat().st_size)
headers["Upload-Length"] = str(token_data["size"])
return headers


class DownloadHandler:
"""Handler class for download operations following manager's api_handler pattern.

Expand Down
10 changes: 10 additions & 0 deletions src/ai/backend/storage/api/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
ItemResult,
QuotaScopeID,
ResultSet,
TusSessionId,
VolumeMountableNodeType,
)
from ai.backend.logging import BraceStyleAdapter
Expand All @@ -89,6 +90,7 @@
StorageProxyError,
VFolderNotFoundError,
)
from ai.backend.storage.services.upload.types import TusSessionState
from ai.backend.storage.types import QuotaConfig, VFolderID
from ai.backend.storage.utils import check_params, log_manager_api_entry
from ai.backend.storage.watcher import ChownTask, MountTask, UmountTask
Expand Down Expand Up @@ -1189,6 +1191,14 @@ class Params(TypedDict):
ctx: RootContext = request.app["ctx"]
async with ctx.get_volume(params["volume"]) as volume:
session_id = await volume.prepare_upload(params["vfid"])
# Register the session in Valkey so HEAD/PATCH can authoritatively tell
# "session exists" from a Valkey state lookup rather than poking the
# filesystem.
tus_session_id = TusSessionId(session_id)
await ctx.valkey_tus_client.set_session_state(
tus_session_id,
TusSessionState.empty(tus_session_id, int(params["size"])).model_dump_json(),
)
token_data = {
"op": "upload",
"volume": params["volume"],
Expand Down
4 changes: 4 additions & 0 deletions src/ai/backend/storage/errors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
)
from .upload import (
ChunkConflictError,
TusSessionNotFoundError,
UploadChunkExceedsTotalSizeError,
UploadSessionCorruptedError,
)
from .vfolder import (
Expand Down Expand Up @@ -96,7 +98,9 @@
"UploadOffsetMismatchError",
# upload
"ChunkConflictError",
"UploadChunkExceedsTotalSizeError",
"UploadSessionCorruptedError",
"TusSessionNotFoundError",
# vfolder
"VFolderNotFoundError",
"InvalidSubpathError",
Expand Down
35 changes: 35 additions & 0 deletions src/ai/backend/storage/errors/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,41 @@ def error_code(self) -> ErrorCode:
)


class TusSessionNotFoundError(BackendAIError, web.HTTPNotFound):
"""
Raised when a TUS handler is invoked for a session that is not registered
in Valkey (never created, or its state expired by TTL).
"""

error_type = "https://api.backend.ai/probs/storage/no-such-upload-session"
error_title = "No such upload session"

def error_code(self) -> ErrorCode:
return ErrorCode(
domain=ErrorDomain.STORAGE_PROXY,
operation=ErrorOperation.READ,
error_detail=ErrorDetail.NOT_FOUND,
)


class UploadChunkExceedsTotalSizeError(BackendAIError, web.HTTPConflict):
"""
Raised when a PATCH chunk's offset+length would write past the declared
``Upload-Length`` (409 Conflict). The Upload-Offset header itself is in
range; the chunk's body simply overruns the remaining slot.
"""

error_type = "https://api.backend.ai/probs/storage/upload-chunk-exceeds-total-size"
error_title = "Upload Chunk Exceeds Total Size"

def error_code(self) -> ErrorCode:
return ErrorCode(
domain=ErrorDomain.STORAGE_PROXY,
operation=ErrorOperation.UPDATE,
error_detail=ErrorDetail.CONFLICT,
)


class UploadSessionCorruptedError(BackendAIError, web.HTTPInternalServerError):
"""
Raised when the upload session metadata stored in Valkey cannot be parsed,
Expand Down
Loading
Loading