Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 56 additions & 16 deletions src/delimited.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,28 @@ impl LineDelimiter {

let is_escape = &mut self.is_escape;
let is_quote = &mut self.is_quote;
let mut record_ends = val.iter().enumerate().filter_map(|(idx, v)| {
if *is_escape {
*is_escape = false;
None
} else if *v == ESCAPE {
*is_escape = true;
None
} else if *v == QUOTE {
*is_quote = !*is_quote;
None
} else if *is_quote {
None
} else {
(*v == NEWLINE).then_some(idx + 1)
}
});
let mut record_ends = val
.iter()
.enumerate()
.filter_map(|(idx, v)| {
if *is_escape {
*is_escape = false;
None
} else if *v == ESCAPE {
*is_escape = true;
None
} else if *v == QUOTE {
*is_quote = !*is_quote;
None
} else if *is_quote {
None
} else {
(*v == NEWLINE).then_some(idx + 1)
}
})
// Materialize the split points, records_ends is a double ended iterator, so when calling next_back() the quoting/escaping logic would run in reverse, corrupting the internal state.
.collect::<Vec<_>>()
.into_iter();
Comment thread
bboissin marked this conversation as resolved.

let start_offset = match self.remainder.is_empty() {
true => 0,
Expand Down Expand Up @@ -270,4 +276,38 @@ mod tests {
]
)
}

#[tokio::test]
async fn test_delimiter_quotes_stream() {
let input = vec!["x,y,z\n,\"new\nline\",\"with ", "space\""];
let input_stream =
futures_util::stream::iter(input.into_iter().map(|s| Ok(Bytes::from(s))));
let stream = newline_delimited_stream(input_stream);

let results: Vec<_> = stream.try_collect().await.unwrap();
assert_eq!(
results,
vec![
Bytes::from("x,y,z\n"),
Bytes::from(",\"new\nline\",\"with space\"")
]
)
}

#[tokio::test]
async fn test_delimiter_escape_stream() {
let input = vec!["hello\n\n\"\\ttabulated\"", "world"];
let input_stream =
futures_util::stream::iter(input.into_iter().map(|s| Ok(Bytes::from(s))));
let stream = newline_delimited_stream(input_stream);

let results: Vec<_> = stream.try_collect().await.unwrap();
assert_eq!(
results,
vec![
Bytes::from("hello\n\n"),
Bytes::from("\"\\ttabulated\"world")
]
)
}
}