Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions avro/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,9 @@ pub enum Details {
#[error("Failed to read block marker bytes: {0}")]
ReadBlockMarker(#[source] std::io::Error),

#[error("Failed to seek to block: {0}")]
SeekToBlock(#[source] std::io::Error),

#[error("Read into buffer failed: {0}")]
ReadIntoBuf(#[source] std::io::Error),

Expand Down
2 changes: 1 addition & 1 deletion avro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ pub use error::Error;
reason = "Still need to export it until we remove it completely"
)]
pub use reader::{
Reader,
BlockPosition, Reader,
datum::{from_avro_datum, from_avro_datum_reader_schemata, from_avro_datum_schemata},
read_marker,
single_object::{GenericSingleObjectReader, SpecificSingleObjectReader},
Expand Down
170 changes: 163 additions & 7 deletions avro/src/reader/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
// specific language governing permissions and limitations
// under the License.

use log::warn;
use serde::de::DeserializeOwned;
use serde_json::from_slice;
use std::io::IoSliceMut;
use std::{
collections::HashMap,
io::{ErrorKind, Read},
io::{ErrorKind, Read, Seek, SeekFrom},
str::FromStr,
};

use log::warn;
use serde::de::DeserializeOwned;
use serde_json::from_slice;

use crate::{
AvroResult, Codec, Error,
decode::{decode, decode_internal},
Expand All @@ -35,10 +35,118 @@ use crate::{
util,
};

/// Byte offset and record count of a single Avro data block.
///
/// Captured automatically as blocks are read during forward iteration.
/// Use with [`super::Reader::seek_to_block`] to jump back to a previously-read block.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BlockPosition {
// Byte offset in the stream where this block starts (before the object-count varint).
offset: u64,
// Total number of records in this block.
message_count: usize,
}

impl BlockPosition {
/// Byte offset in the stream where this block starts (before the object-count varint).
pub fn offset(&self) -> u64 {
self.offset
}

/// Total number of records in this block.
pub fn message_count(&self) -> usize {
self.message_count
}
}

/// Optional byte-offset tracker over an inner reader.
///
/// `Direct` is a transparent pass-through so the inner type's optimized
/// `read_exact`/`read_to_end`/`read_vectored` are reached unchanged.
///
/// `Tracking` mirrors those same delegations and accumulates bytes consumed,
/// exposing the offset via [`Self::position`]. Tracking is opt-in.
#[derive(Debug, Clone)]
pub(super) enum PositionTracker<R> {
Direct(R),
Tracking { inner: R, pos: u64 },
}

impl<R> PositionTracker<R> {
/// Byte offset consumed so far, or `None` when tracking is disabled.
pub(super) fn position(&self) -> Option<u64> {
match self {
Self::Direct(_) => None,
Self::Tracking { pos, .. } => Some(*pos),
}
}
}

impl<R: Read> Read for PositionTracker<R> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
match self {
Self::Direct(r) => r.read(buf),
Self::Tracking { inner, pos } => {
let n = inner.read(buf)?;
*pos += n as u64;
Ok(n)
}
}
}

fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> std::io::Result<usize> {
match self {
Self::Direct(r) => r.read_vectored(bufs),
Self::Tracking { inner, pos } => {
let n = inner.read_vectored(bufs)?;
*pos += n as u64;
Ok(n)
}
}
}

fn read_to_end(&mut self, buf: &mut Vec<u8>) -> std::io::Result<usize> {
match self {
Self::Direct(r) => r.read_to_end(buf),
Self::Tracking { inner, pos } => {
let n = inner.read_to_end(buf)?;
*pos += n as u64;
Ok(n)
}
}
}

fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
match self {
Self::Direct(r) => r.read_exact(buf),
Self::Tracking { inner, pos } => {
inner.read_exact(buf)?;
// `read_exact` either fills `buf` completely or errors, so on
// success exactly `buf.len()` bytes were consumed.
*pos += buf.len() as u64;
Ok(())
}
}
}
Comment thread
splix marked this conversation as resolved.
}

impl<R: Seek> Seek for PositionTracker<R> {
fn seek(&mut self, from: SeekFrom) -> std::io::Result<u64> {
match self {
Self::Direct(r) => r.seek(from),
Self::Tracking { inner, pos } => {
let new_pos = inner.seek(from)?;
*pos = new_pos;
Ok(new_pos)
}
}
}
}

/// Internal Block reader.
#[derive(Debug, Clone)]
pub(super) struct Block<'r, R> {
reader: R,
reader: PositionTracker<R>,
/// Internal buffering to reduce allocation.
buf: Vec<u8>,
buf_idx: usize,
Expand All @@ -51,14 +159,27 @@ pub(super) struct Block<'r, R> {
pub(super) user_metadata: HashMap<String, Vec<u8>>,
names_refs: Names,
human_readable: bool,
/// Byte offset where data blocks begin (right after header and sync marker).
pub(super) data_start: Option<u64>,
/// Position and record count of the currently loaded block.
pub(super) current_block_info: Option<BlockPosition>,
}

impl<'r, R: Read> Block<'r, R> {
pub(super) fn new(
reader: R,
schemata: Vec<&'r Schema>,
human_readable: bool,
track_positions: bool,
) -> AvroResult<Block<'r, R>> {
let reader = if track_positions {
PositionTracker::Tracking {
inner: reader,
pos: 0,
}
} else {
PositionTracker::Direct(reader)
};
let mut block = Block {
reader,
codec: Codec::Null,
Expand All @@ -71,9 +192,12 @@ impl<'r, R: Read> Block<'r, R> {
user_metadata: Default::default(),
names_refs: Default::default(),
human_readable,
data_start: None,
current_block_info: None,
};

block.read_header()?;
block.data_start = block.reader.position();
Ok(block)
}

Expand Down Expand Up @@ -142,6 +266,7 @@ impl<'r, R: Read> Block<'r, R> {
/// the block. The objects are stored in an internal buffer to the `Reader`.
fn read_block_next(&mut self) -> AvroResult<()> {
assert!(self.is_empty(), "Expected self to be empty!");
let block_start = self.reader.position();
match util::read_long(&mut self.reader).map_err(Error::into_details) {
Ok(block_len) => {
self.message_count = block_len as usize;
Expand All @@ -162,7 +287,18 @@ impl<'r, R: Read> Block<'r, R> {
// and replace `buf` with the new one, instead of reusing the same buffer.
// We can address this by using some "limited read" type to decode directly
// into the buffer. But this is fine, for now.
self.codec.decompress(&mut self.buf)
let next = self.codec.decompress(&mut self.buf);

// Make sure the position points only to a valid block
self.current_block_info = match next {
Ok(_) => block_start.map(|offset| BlockPosition {
offset,
message_count: block_len as usize,
}),
Err(_) => None,
};

next
}
Err(Details::ReadVariableIntegerBytes(io_err)) => {
if let ErrorKind::UnexpectedEof = io_err.kind() {
Expand Down Expand Up @@ -295,6 +431,26 @@ impl<'r, R: Read> Block<'r, R> {
}
}

impl<R: Read + Seek> Block<'_, R> {
/// Seek the underlying stream to `offset` and read the block there.
/// Validates the sync marker to confirm it's a real block boundary.
/// Returns an error if no valid block can be read at the offset
/// (e.g., the offset is at or past EOF).
pub(super) fn seek_to_block(&mut self, offset: u64) -> AvroResult<()> {
self.buf.clear();
self.buf_idx = 0;
self.message_count = 0;
self.current_block_info = None;

self.reader
.seek(SeekFrom::Start(offset))
.map_err(Details::SeekToBlock)?;

self.read_block_next()?;
Ok(())
}
}

fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> {
let result = metadata
.get("avro.codec")
Expand Down
Loading