feat(spider-tdl): Add wire format to serialize/deserialize task inputs and task outputs.#308
feat(spider-tdl): Add wire format to serialize/deserialize task inputs and task outputs.#308LinZhihao-723 wants to merge 6 commits intoy-scope:mainfrom
Conversation
WalkthroughAdds a new workspace crate Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes 🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@components/spider-tdl/src/error.rs`:
- Around line 3-4: Update the doc comment on TdlError to reference the correct
FFI type: replace the incorrect ExecutionResult::Error mention with the actual
type used in ffi.rs (TaskExecutionResult or its Error variant), e.g.,
"TaskExecutionResult::Error", so the documentation accurately points to the FFI
payload container for TdlError.
In `@components/spider-tdl/src/ffi.rs`:
- Around line 76-79: The as_slice method currently dereferences raw
pointer/length from the FFI-safe CArray but is not marked unsafe; change its
signature to pub unsafe fn as_slice(&self) -> &'borrow_lifetime [ElementType] so
callers must uphold the safety invariants when calling, or alternatively add an
explicit unsafe constructor like pub unsafe fn from_raw_parts(pointer: *const
ElementType, length: usize) -> CArray and keep a safe as_slice that only
operates on validated/constructed instances; update all call sites of
CArray::as_slice, CArray::from_raw_parts (or similar) to reflect the chosen
approach so dereferencing the raw pointer is explicitly unsafe.
- Around line 178-189: into_result currently reclaims a caller-provided raw
pointer/length (via Box::from_raw) without being marked unsafe; change the
signature of TaskExecutionResult::into_result to be unsafe fn into_result(self)
-> Result<Vec<u8>, Vec<u8>> and update all call sites to call it within an
unsafe block, keeping the existing safety doc comment that the caller must
guarantee pointer/length validity and ownership; this makes it clear that using
into_result requires the caller to uphold the invariants around
TaskExecutionResult.pointer and TaskExecutionResult.length before Box::from_raw
is invoked.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: f9a9fac5-b46d-4b73-a178-1c0207dd6149
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (8)
Cargo.tomlcomponents/spider-tdl/Cargo.tomlcomponents/spider-tdl/src/error.rscomponents/spider-tdl/src/ffi.rscomponents/spider-tdl/src/lib.rscomponents/spider-tdl/src/std.rscomponents/spider-tdl/src/task_context.rscomponents/spider-tdl/src/wire.rs
sitaowang1998
left a comment
There was a problem hiding this comment.
Why are we having both unframe_payloads and StreamDeserializer at the same time, with overlapping functionality?
| } | ||
| let count_bytes: [u8; COUNT_HEADER_LEN] = data[..COUNT_HEADER_LEN] | ||
| .try_into() | ||
| .expect("slice length checked above"); |
There was a problem hiding this comment.
This error message is confusing.
There was a problem hiding this comment.
This is actually not reachable, so the error message is more like an internal comment (still don't want to use unwrap).
Rewrote using first_chunk to avoid this pattern.
| } | ||
| let len_bytes: [u8; FIELD_LEN_PREFIX_LEN] = data[pos..pos + FIELD_LEN_PREFIX_LEN] | ||
| .try_into() | ||
| .expect("slice length checked above"); |
| let len_bytes: [u8; FIELD_LEN_PREFIX_LEN] = self.data | ||
| [self.pos..self.pos + FIELD_LEN_PREFIX_LEN] | ||
| .try_into() | ||
| .expect("slice length checked above"); |
They're not overlapping.
|
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (2)
components/spider-tdl/src/wire.rs (2)
368-397: Consider limiting allocation based on buffer size to prevent DoS.When parsing untrusted wire data,
Vec::with_capacity(count)at line 377 uses the count directly from the wire header. A malicious count (e.g.,u32::MAX) could trigger a massive allocation attempt before validating that the buffer actually contains that many payloads.A defensive approach would be to bound the capacity by the remaining buffer size divided by minimum frame size (4 bytes for length prefix + at least 0 bytes payload).
🛡️ Suggested defensive allocation
fn unframe_payloads(data: &[u8]) -> Result<Vec<Vec<u8>>, WireError> { let count_bytes = data.first_chunk::<COUNT_HEADER_LEN>() .ok_or(WireError::InvalidFormat( "buffer too small for the payload count header", ))?; let count = u32::from_le_bytes(*count_bytes) as usize; let mut pos = COUNT_HEADER_LEN; - let mut payloads = Vec::with_capacity(count); + // Bound capacity by what the remaining buffer could possibly hold to mitigate DoS. + let max_possible = (data.len().saturating_sub(pos)) / FIELD_LEN_PREFIX_LEN; + let mut payloads = Vec::with_capacity(count.min(max_possible)); for _ in 0..count {🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/spider-tdl/src/wire.rs` around lines 368 - 397, In unframe_payloads, don't trust the wire-provided count when calling Vec::with_capacity(count); instead compute a safe upper bound (e.g., remaining_bytes = data.len() - COUNT_HEADER_LEN; min_count = remaining_bytes / (FIELD_LEN_PREFIX_LEN /*+ min payload size*/)); then use Vec::with_capacity(min(count as usize, min_count)) or otherwise cap the allocation to avoid huge allocations from a malicious count; reference the function unframe_payloads and the symbols COUNT_HEADER_LEN, FIELD_LEN_PREFIX_LEN, count and data.len() when applying the fix.
600-603: Inconsistent import style:stdvsr#std.The imports mix
std(line 601) andr#std(line 602) for what appears to be the same module. Consider using one form consistently.✨ Suggested fix
use super::*; use crate::{ - std::{int32, int64}, - r#std::{Bytes, List, Map, int8, int16}, + r#std::{int8, int16, int32, int64, Bytes, List, Map}, };🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/spider-tdl/src/wire.rs` around lines 600 - 603, The imports are inconsistent: the code imports crate::std::{int32, int64} and crate::r#std::{Bytes, List, Map, int8, int16}; standardize to one form (either use crate::std or crate::r#std) throughout the use statement to avoid confusion and potential compiler issues—update the use line that references crate::std and crate::r#std so both sets of symbols (int32, int64, Bytes, List, Map, int8, int16) are imported from the same module name (e.g., crate::r#std or crate::std) and remove the mixed form.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@components/spider-tdl/src/wire.rs`:
- Around line 203-212: The doc comment for method append incorrectly states it
forwards WireFrameBuilder::append_payload's return values while the
implementation calls append_serialize; update the documentation to reference the
actual method used (WireFrameBuilder::append_serialize) or change the
implementation to call append_payload to match the docs. Locate the pub fn
append<ValueType: serde::Serialize>(&mut self, value: &ValueType) -> Result<(),
WireError> and either correct the doc line to mention append_serialize or
replace the call to self.builder.append_serialize(value) with
self.builder.append_payload(value) so the doc and code are consistent.
---
Nitpick comments:
In `@components/spider-tdl/src/wire.rs`:
- Around line 368-397: In unframe_payloads, don't trust the wire-provided count
when calling Vec::with_capacity(count); instead compute a safe upper bound
(e.g., remaining_bytes = data.len() - COUNT_HEADER_LEN; min_count =
remaining_bytes / (FIELD_LEN_PREFIX_LEN /*+ min payload size*/)); then use
Vec::with_capacity(min(count as usize, min_count)) or otherwise cap the
allocation to avoid huge allocations from a malicious count; reference the
function unframe_payloads and the symbols COUNT_HEADER_LEN,
FIELD_LEN_PREFIX_LEN, count and data.len() when applying the fix.
- Around line 600-603: The imports are inconsistent: the code imports
crate::std::{int32, int64} and crate::r#std::{Bytes, List, Map, int8, int16};
standardize to one form (either use crate::std or crate::r#std) throughout the
use statement to avoid confusion and potential compiler issues—update the use
line that references crate::std and crate::r#std so both sets of symbols (int32,
int64, Bytes, List, Map, int8, int16) are imported from the same module name
(e.g., crate::r#std or crate::std) and remove the mixed form.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: eb24cae4-bff8-4db1-a3d4-d9f2c5326784
📒 Files selected for processing (1)
components/spider-tdl/src/wire.rs
| /// # Errors | ||
| /// | ||
| /// Returns an error if: | ||
| /// | ||
| /// * Forwards [`WireFrameBuilder::append_payload`]'s return values on failure. | ||
| pub fn append<ValueType: serde::Serialize>( | ||
| &mut self, | ||
| value: &ValueType, | ||
| ) -> Result<(), WireError> { | ||
| self.builder.append_serialize(value) |
There was a problem hiding this comment.
Documentation references incorrect method.
The doc comment states it forwards WireFrameBuilder::append_payload's errors, but the implementation calls append_serialize.
📝 Proposed fix
/// # Errors
///
/// Returns an error if:
///
- /// * Forwards [`WireFrameBuilder::append_payload`]'s return values on failure.
+ /// * Forwards [`WireFrameBuilder::append_serialize`]'s return values on failure.
pub fn append<ValueType: serde::Serialize>(📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| /// # Errors | |
| /// | |
| /// Returns an error if: | |
| /// | |
| /// * Forwards [`WireFrameBuilder::append_payload`]'s return values on failure. | |
| pub fn append<ValueType: serde::Serialize>( | |
| &mut self, | |
| value: &ValueType, | |
| ) -> Result<(), WireError> { | |
| self.builder.append_serialize(value) | |
| /// # Errors | |
| /// | |
| /// Returns an error if: | |
| /// | |
| /// * Forwards [`WireFrameBuilder::append_serialize`]'s return values on failure. | |
| pub fn append<ValueType: serde::Serialize>( | |
| &mut self, | |
| value: &ValueType, | |
| ) -> Result<(), WireError> { | |
| self.builder.append_serialize(value) |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@components/spider-tdl/src/wire.rs` around lines 203 - 212, The doc comment
for method append incorrectly states it forwards
WireFrameBuilder::append_payload's return values while the implementation calls
append_serialize; update the documentation to reference the actual method used
(WireFrameBuilder::append_serialize) or change the implementation to call
append_payload to match the docs. Locate the pub fn append<ValueType:
serde::Serialize>(&mut self, value: &ValueType) -> Result<(), WireError> and
either correct the doc line to mention append_serialize or replace the call to
self.builder.append_serialize(value) with self.builder.append_payload(value) so
the doc and code are consistent.
Description
spider-tdlcrate foundation: #304.This PR implements the wire format for serializing and deserializing task inputs and outputs, with the design goal of minimizing memory copies required.
For task inputs,
TaskInputsSerializeris designed to serialize task inputs' payloads into the wire format in a streaming flavor. The serialized byte buffer will be transmitted all the way down to the TDL package library without needing to unpack. Inside the TDL package task execution, the byte buffer will be deserialized into a task param struct in place. The memory copies involved:For network transmission and inter-process transmission, they're abstracted by the libraries we use so there might be hidden inefficiency, but this is out of our control. For the buffer serialization and deserialization, this is the best we can do to reduce the number of memory copies needed.
For task outputs,
TaskOutputsSerializeris designed to serialize a tuple of task outputs of a concrete type into the wire format, while each frame is encoded using msgpack (which is the format we store in JCB). The serialized buffer is passed all the way back to the storage layer, and unpacked into raw payloads. The storage layer never interprets these payload bytes. These payloads can be serialized directly usingTaskInputsSerializerfor child tasks. And overall, it has the same efficiency as the task inputs serialization/deserialization, which minimizes the number of memory copies required.Checklist
breaking change.
Validation performed
Summary by CodeRabbit