-
Notifications
You must be signed in to change notification settings - Fork 54
feat: seekable reader #530
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -17,7 +17,7 @@ | |||||
|
|
||||||
| use std::{ | ||||||
| collections::HashMap, | ||||||
| io::{ErrorKind, Read}, | ||||||
| io::{ErrorKind, Read, Seek, SeekFrom}, | ||||||
| str::FromStr, | ||||||
| }; | ||||||
|
|
||||||
|
|
@@ -35,10 +35,57 @@ use crate::{ | |||||
| util, | ||||||
| }; | ||||||
|
|
||||||
| /// Byte offset and record count of a single Avro data block. | ||||||
| /// | ||||||
| /// Captured automatically as blocks are read during forward iteration. | ||||||
| /// Use with [`super::Reader::seek_to_block`] to jump back to a previously-read block. | ||||||
| #[derive(Debug, Clone, Copy, PartialEq, Eq)] | ||||||
| pub struct BlockPosition { | ||||||
| /// Byte offset in the stream where this block starts (before the object-count varint). | ||||||
| pub offset: u64, | ||||||
| /// Total number of records in this block. | ||||||
| pub message_count: usize, | ||||||
| } | ||||||
|
|
||||||
| /// Wraps an inner reader and tracks the current byte position. | ||||||
| /// | ||||||
| /// Avoids requiring `Seek` just to know how many bytes have been consumed. | ||||||
| /// When the inner reader also implements `Seek`, seeking updates the tracked position. | ||||||
| #[derive(Debug, Clone)] | ||||||
| struct PositionTracker<R> { | ||||||
| inner: R, | ||||||
| pos: u64, | ||||||
| } | ||||||
|
|
||||||
| impl<R> PositionTracker<R> { | ||||||
| fn new(inner: R) -> Self { | ||||||
| Self { inner, pos: 0 } | ||||||
| } | ||||||
|
|
||||||
| fn position(&self) -> u64 { | ||||||
| self.pos | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| impl<R: Read> Read for PositionTracker<R> { | ||||||
| fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { | ||||||
| let n = self.inner.read(buf)?; | ||||||
| self.pos += n as u64; | ||||||
| Ok(n) | ||||||
| } | ||||||
|
splix marked this conversation as resolved.
|
||||||
| } | ||||||
|
|
||||||
| impl<R: Seek> Seek for PositionTracker<R> { | ||||||
| fn seek(&mut self, from: SeekFrom) -> std::io::Result<u64> { | ||||||
| self.pos = self.inner.seek(from)?; | ||||||
| Ok(self.pos) | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| /// Internal Block reader. | ||||||
| #[derive(Debug, Clone)] | ||||||
| pub(super) struct Block<'r, R> { | ||||||
| reader: R, | ||||||
| reader: PositionTracker<R>, | ||||||
| /// Internal buffering to reduce allocation. | ||||||
| buf: Vec<u8>, | ||||||
| buf_idx: usize, | ||||||
|
|
@@ -51,6 +98,10 @@ pub(super) struct Block<'r, R> { | |||||
| pub(super) user_metadata: HashMap<String, Vec<u8>>, | ||||||
| names_refs: Names, | ||||||
| human_readable: bool, | ||||||
| /// Byte offset where data blocks begin (right after header and sync marker). | ||||||
| pub(super) data_start: u64, | ||||||
| /// Position and record count of the currently loaded block. | ||||||
| pub(super) current_block_info: Option<BlockPosition>, | ||||||
| } | ||||||
|
|
||||||
| impl<'r, R: Read> Block<'r, R> { | ||||||
|
|
@@ -60,7 +111,7 @@ impl<'r, R: Read> Block<'r, R> { | |||||
| human_readable: bool, | ||||||
| ) -> AvroResult<Block<'r, R>> { | ||||||
| let mut block = Block { | ||||||
| reader, | ||||||
| reader: PositionTracker::new(reader), | ||||||
| codec: Codec::Null, | ||||||
| writer_schema: Schema::Null, | ||||||
| schemata, | ||||||
|
|
@@ -71,9 +122,12 @@ impl<'r, R: Read> Block<'r, R> { | |||||
| user_metadata: Default::default(), | ||||||
| names_refs: Default::default(), | ||||||
| human_readable, | ||||||
| data_start: 0, | ||||||
| current_block_info: None, | ||||||
| }; | ||||||
|
|
||||||
| block.read_header()?; | ||||||
| block.data_start = block.reader.position(); | ||||||
| Ok(block) | ||||||
| } | ||||||
|
|
||||||
|
|
@@ -142,6 +196,7 @@ impl<'r, R: Read> Block<'r, R> { | |||||
| /// the block. The objects are stored in an internal buffer to the `Reader`. | ||||||
| fn read_block_next(&mut self) -> AvroResult<()> { | ||||||
| assert!(self.is_empty(), "Expected self to be empty!"); | ||||||
| let block_start = self.reader.position(); | ||||||
| match util::read_long(&mut self.reader).map_err(Error::into_details) { | ||||||
| Ok(block_len) => { | ||||||
| self.message_count = block_len as usize; | ||||||
|
|
@@ -156,6 +211,11 @@ impl<'r, R: Read> Block<'r, R> { | |||||
| return Err(Details::GetBlockMarker.into()); | ||||||
| } | ||||||
|
|
||||||
| self.current_block_info = Some(BlockPosition { | ||||||
| offset: block_start, | ||||||
| message_count: block_len as usize, | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I actually wrote this part specifically this way to ensure it would not lost, if a refactoring or other change are applied. It cannot rely on the meaning of the self.message_count and its current value if those two will be separated into different places of code
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can move
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not arguing, any of this would work and I can change, that's no problem. But I'm curios to understand why you think this would be better? My idea is just using the source of truth variable, so there is no way it gets a wrong number. Using the self.message_count works with the current code, but it doesn't give any guarantee if it changes. So I'm wondering why the second approach is better? |
||||||
| }); | ||||||
|
|
||||||
| // NOTE (JAB): This doesn't fit this Reader pattern very well. | ||||||
| // `self.buf` is a growable buffer that is reused as the reader is iterated. | ||||||
| // For non `Codec::Null` variants, `decompress` will allocate a new `Vec` | ||||||
|
|
@@ -295,6 +355,36 @@ impl<'r, R: Read> Block<'r, R> { | |||||
| } | ||||||
| } | ||||||
|
|
||||||
| impl<R: Read + Seek> Block<'_, R> { | ||||||
| /// Seek the underlying stream to `offset` and read the block there. | ||||||
| /// Validates the sync marker to confirm it's a real block boundary. | ||||||
| /// Returns an error if no valid block can be read at the offset | ||||||
| /// (e.g., the offset is at or past EOF). | ||||||
| pub(super) fn seek_to_block(&mut self, offset: u64) -> AvroResult<()> { | ||||||
| self.reader | ||||||
| .seek(SeekFrom::Start(offset)) | ||||||
| .map_err(Details::SeekToBlock)?; | ||||||
|
splix marked this conversation as resolved.
Outdated
|
||||||
|
|
||||||
| self.buf.clear(); | ||||||
| self.buf_idx = 0; | ||||||
| self.message_count = 0; | ||||||
| self.current_block_info = None; | ||||||
|
|
||||||
| // read_block_next treats UnexpectedEof as a clean end-of-stream | ||||||
| // (returns Ok with message_count=0). That's correct for forward | ||||||
| // iteration but wrong here — the caller asked for a specific block. | ||||||
| self.read_block_next()?; | ||||||
| if self.is_empty() { | ||||||
|
splix marked this conversation as resolved.
Outdated
|
||||||
| return Err(Details::SeekToBlock(std::io::Error::new( | ||||||
| std::io::ErrorKind::UnexpectedEof, | ||||||
| format!("no block at offset {offset}"), | ||||||
| )) | ||||||
| .into()); | ||||||
| } | ||||||
| Ok(()) | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> { | ||||||
| let result = metadata | ||||||
| .get("avro.codec") | ||||||
|
|
||||||
Uh oh!
There was an error while loading. Please reload this page.