diff --git a/src/delimited.rs b/src/delimited.rs index b9f88420..0ff65fab 100644 --- a/src/delimited.rs +++ b/src/delimited.rs @@ -77,22 +77,28 @@ impl LineDelimiter { let is_escape = &mut self.is_escape; let is_quote = &mut self.is_quote; - let mut record_ends = val.iter().enumerate().filter_map(|(idx, v)| { - if *is_escape { - *is_escape = false; - None - } else if *v == ESCAPE { - *is_escape = true; - None - } else if *v == QUOTE { - *is_quote = !*is_quote; - None - } else if *is_quote { - None - } else { - (*v == NEWLINE).then_some(idx + 1) - } - }); + let mut record_ends = val + .iter() + .enumerate() + .filter_map(|(idx, v)| { + if *is_escape { + *is_escape = false; + None + } else if *v == ESCAPE { + *is_escape = true; + None + } else if *v == QUOTE { + *is_quote = !*is_quote; + None + } else if *is_quote { + None + } else { + (*v == NEWLINE).then_some(idx + 1) + } + }) + // Materialize the split points, records_ends is a double ended iterator, so when calling next_back() the quoting/escaping logic would run in reverse, corrupting the internal state. + .collect::>() + .into_iter(); let start_offset = match self.remainder.is_empty() { true => 0, @@ -270,4 +276,38 @@ mod tests { ] ) } + + #[tokio::test] + async fn test_delimiter_quotes_stream() { + let input = vec!["x,y,z\n,\"new\nline\",\"with ", "space\""]; + let input_stream = + futures_util::stream::iter(input.into_iter().map(|s| Ok(Bytes::from(s)))); + let stream = newline_delimited_stream(input_stream); + + let results: Vec<_> = stream.try_collect().await.unwrap(); + assert_eq!( + results, + vec![ + Bytes::from("x,y,z\n"), + Bytes::from(",\"new\nline\",\"with space\"") + ] + ) + } + + #[tokio::test] + async fn test_delimiter_escape_stream() { + let input = vec!["hello\n\n\"\\ttabulated\"", "world"]; + let input_stream = + futures_util::stream::iter(input.into_iter().map(|s| Ok(Bytes::from(s)))); + let stream = newline_delimited_stream(input_stream); + + let results: Vec<_> = stream.try_collect().await.unwrap(); + assert_eq!( + results, + vec![ + Bytes::from("hello\n\n"), + Bytes::from("\"\\ttabulated\"world") + ] + ) + } }