Skip to content
This repository was archived by the owner on Sep 5, 2025. It is now read-only.
Closed

V2 #37

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ bytes = ["dep:bytes"]
[dependencies]
bytes = { version = "1", optional = true }
byteorder = "1.5.0"
byteview = { version = "0.6.1" }
byteview = { version = "~0.7.0" }
interval-heap = "0.0.5"
log = "0.4.22"
path-absolutize = "3.1.1"
Expand Down
25 changes: 21 additions & 4 deletions benches/value_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ use criterion::{criterion_group, criterion_main, Criterion};
use rand::{Rng, RngCore};
use std::{
collections::BTreeMap,
fs::File,
io::BufReader,
sync::{Arc, RwLock},
};
use value_log::{
BlobCache, Compressor, Config, IndexReader, IndexWriter, UserKey, UserValue, ValueHandle,
ValueLog, ValueLogId,
BlobCache, BlobFileId, Compressor, Config, FDCache, IndexReader, IndexWriter, UserKey,
UserValue, ValueHandle, ValueLog, ValueLogId,
};

/// Inner state of the mock index: a lock-guarded, ordered map from user key
/// to its value handle plus a `u32`.
/// NOTE(review): the `u32` is presumably the value size — confirm against the index writer.
type MockIndexInner = RwLock<BTreeMap<UserKey, (ValueHandle, u32)>>;
Expand Down Expand Up @@ -89,6 +91,13 @@ impl BlobCache for NoCacher {
fn insert(&self, _: ValueLogId, _: &ValueHandle, _: UserValue) {}
}

/// No-op file descriptor cache: nothing is ever stored, so every lookup misses.
impl FDCache for NoCacher {
    /// Drops the reader instead of caching it.
    fn insert(&self, _vlog_id: ValueLogId, _blob_file_id: BlobFileId, _fd: BufReader<File>) {}

    /// Always reports a cache miss.
    fn get(&self, _vlog_id: ValueLogId, _blob_file_id: BlobFileId) -> Option<BufReader<File>> {
        None
    }
}

fn prefetch(c: &mut Criterion) {
let mut group = c.benchmark_group("prefetch range");

Expand All @@ -101,7 +110,11 @@ fn prefetch(c: &mut Criterion) {
let folder = tempfile::tempdir().unwrap();
let vl_path = folder.path();

let value_log = ValueLog::open(vl_path, Config::<_, NoCompressor>::new(NoCacher)).unwrap();
let value_log = ValueLog::open(
vl_path,
Config::<_, _, NoCompressor>::new(NoCacher, NoCacher),
)
.unwrap();

let mut writer = value_log.get_writer().unwrap();

Expand Down Expand Up @@ -184,7 +197,11 @@ fn load_value(c: &mut Criterion) {
let folder = tempfile::tempdir().unwrap();
let vl_path = folder.path();

let value_log = ValueLog::open(vl_path, Config::<_, NoCompressor>::new(NoCacher)).unwrap();
let value_log = ValueLog::open(
vl_path,
Config::<_, _, NoCompressor>::new(NoCacher, NoCacher),
)
.unwrap();

let mut writer = value_log.get_writer().unwrap();

Expand Down
6 changes: 0 additions & 6 deletions renovate.json

This file was deleted.

12 changes: 8 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,29 @@
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use crate::{blob_cache::BlobCache, compression::Compressor};
use crate::{blob_cache::BlobCache, compression::Compressor, fd_cache::BlobFileId, FDCache};

/// Value log configuration
pub struct Config<BC: BlobCache, C: Compressor + Clone> {
pub struct Config<BC: BlobCache, FDC: FDCache, C: Compressor + Clone> {
/// Target size of vLog segments
pub(crate) segment_size_bytes: u64,

/// Blob cache to use
pub(crate) blob_cache: BC,

/// File descriptor cache to use
pub(crate) fd_cache: FDC,

/// Compression to use
pub(crate) compression: Option<C>,
}

impl<BC: BlobCache, C: Compressor + Clone + Default> Config<BC, C> {
impl<BC: BlobCache, FDC: FDCache, C: Compressor + Clone + Default> Config<BC, FDC, C> {
/// Creates a new configuration builder.
pub fn new(blob_cache: BC) -> Self {
pub fn new(blob_cache: BC, fd_cache: FDC) -> Self {
Self {
blob_cache,
fd_cache,
compression: None,
segment_size_bytes: 128 * 1_024 * 1_024,
}
Expand Down
19 changes: 19 additions & 0 deletions src/fd_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use crate::ValueLogId;
use std::{fs::File, io::BufReader};

/// Unique identifier of a value log blob file.
///
/// This is another name for `SegmentId`.
pub type BlobFileId = u64;

/// Cache for file descriptors of blob files that have already been opened.
///
/// Reusing a cached descriptor reduces the number of times the same blob file
/// has to be opened when it is accessed repeatedly.
pub trait FDCache: Clone {
    /// Looks up the cached file descriptor for a blob file of the given value log.
    ///
    /// Returns `None` if no descriptor could be found.
    fn get(&self, vlog_id: ValueLogId, blob_file_id: BlobFileId) -> Option<BufReader<File>>;

    /// Stores a file descriptor for a blob file of the given value log.
    fn insert(&self, vlog_id: ValueLogId, blob_file_id: BlobFileId, fd: BufReader<File>);
}
18 changes: 11 additions & 7 deletions src/gc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

pub mod report;

use crate::{id::SegmentId, BlobCache, Compressor, ValueLog};
use crate::{id::SegmentId, BlobCache, Compressor, FDCache, ValueLog};

/// GC strategy
#[allow(clippy::module_name_repetitions)]
pub trait GcStrategy<BC: BlobCache, C: Compressor + Clone> {
pub trait GcStrategy<BC: BlobCache, FD: FDCache, C: Compressor + Clone> {
/// Picks segments based on a predicate.
fn pick(&self, value_log: &ValueLog<BC, C>) -> Vec<SegmentId>;
fn pick(&self, value_log: &ValueLog<BC, FD, C>) -> Vec<SegmentId>;
}

/// Picks segments that have a certain percentage of stale blobs
Expand All @@ -32,8 +32,10 @@ impl StaleThresholdStrategy {
}
}

impl<BC: BlobCache, C: Compressor + Clone> GcStrategy<BC, C> for StaleThresholdStrategy {
fn pick(&self, value_log: &ValueLog<BC, C>) -> Vec<SegmentId> {
impl<BC: BlobCache, FDC: FDCache, C: Compressor + Clone> GcStrategy<BC, FDC, C>
for StaleThresholdStrategy
{
fn pick(&self, value_log: &ValueLog<BC, FDC, C>) -> Vec<SegmentId> {
value_log
.manifest
.segments
Expand Down Expand Up @@ -62,9 +64,11 @@ impl SpaceAmpStrategy {
}
}

impl<BC: BlobCache, C: Compressor + Clone> GcStrategy<BC, C> for SpaceAmpStrategy {
impl<BC: BlobCache, FDC: FDCache, C: Compressor + Clone> GcStrategy<BC, FDC, C>
for SpaceAmpStrategy
{
#[allow(clippy::cast_precision_loss, clippy::significant_drop_tightening)]
fn pick(&self, value_log: &ValueLog<BC, C>) -> Vec<SegmentId> {
fn pick(&self, value_log: &ValueLog<BC, FDC, C>) -> Vec<SegmentId> {
let space_amp_target = self.0;
let current_space_amp = value_log.space_amp();

Expand Down
17 changes: 4 additions & 13 deletions src/key_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl KeyRange {

/// Returns `true` if the ranges overlap partially or fully.
#[must_use]
pub fn overlaps_with_bounds(&self, bounds: &(Bound<UserKey>, Bound<UserKey>)) -> bool {
pub fn overlaps_with_bounds(&self, bounds: &(Bound<&[u8]>, Bound<&[u8]>)) -> bool {
let (lo, hi) = bounds;
let (my_lo, my_hi) = self.as_tuple();

Expand Down Expand Up @@ -294,30 +294,21 @@ mod tests {
#[test]
fn inclusive() {
let key_range = KeyRange(UserKey::from("key1"), UserKey::from("key5"));
let bounds = (
Included(UserKey::from("key1")),
Included(UserKey::from("key5")),
);
let bounds = (Included(b"key1" as &[u8]), Included(b"key5" as &[u8]));
assert!(key_range.overlaps_with_bounds(&bounds));
}

#[test]
fn exclusive() {
let key_range = KeyRange(UserKey::from("key1"), UserKey::from("key5"));
let bounds = (
Excluded(UserKey::from("key0")),
Excluded(UserKey::from("key6")),
);
let bounds = (Excluded(b"key0" as &[u8]), Excluded(b"key6" as &[u8]));
assert!(key_range.overlaps_with_bounds(&bounds));
}

#[test]
fn no_overlap() {
let key_range = KeyRange(UserKey::from("key1"), UserKey::from("key5"));
let bounds = (
Excluded(UserKey::from("key5")),
Excluded(UserKey::from("key6")),
);
let bounds = (Excluded(b"key5" as &[u8]), Excluded(b"key6" as &[u8]));
assert!(!key_range.overlaps_with_bounds(&bounds));
}

Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
#![cfg_attr(not(feature = "bytes"), forbid(unsafe_code))]

mod blob_cache;
mod fd_cache;

#[doc(hidden)]
pub mod coding;
Expand Down Expand Up @@ -82,6 +83,7 @@ pub use {
compression::Compressor,
config::Config,
error::{Error, Result},
fd_cache::{BlobFileId, FDCache},
gc::report::GcReport,
gc::{GcStrategy, SpaceAmpStrategy, StaleThresholdStrategy},
handle::ValueHandle,
Expand Down
4 changes: 4 additions & 0 deletions src/segment/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ impl<C: Compressor + Clone> Reader<C> {
self.compression = compressor;
self
}

/// Consumes the reader and returns its underlying buffered file reader.
pub(crate) fn into_inner(self) -> BufReader<File> {
self.inner
}
}

impl<C: Compressor + Clone> Iterator for Reader<C> {
Expand Down
20 changes: 17 additions & 3 deletions src/slice/slice_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@ impl Slice {
Self(Bytes::from_static(&[]))
}

#[must_use]
#[doc(hidden)]
/// Creates a slice of `len` bytes, all initialized to zero.
pub fn with_size(len: usize) -> Self {
    Self(Bytes::from(vec![0_u8; len]))
}

#[must_use]
#[doc(hidden)]
/// Creates a slice of `len` bytes without zeroing the buffer first.
///
/// The buffer comes from `get_unzeroed_builder` and is frozen into the slice.
/// NOTE(review): contents are presumably unspecified until overwritten — confirm.
pub fn with_size_unzeroed(len: usize) -> Self {
    let builder = Self::get_unzeroed_builder(len);
    Self(builder.freeze())
}

fn get_unzeroed_builder(len: usize) -> BytesMut {
// Use `with_capacity` & `set_len` to avoid zeroing the buffer
let mut builder = BytesMut::with_capacity(len);
Expand Down Expand Up @@ -63,9 +76,10 @@ impl Slice {

#[must_use]
#[doc(hidden)]
pub fn with_size(len: usize) -> Self {
let bytes = vec![0; len];
Self(Bytes::from(bytes))
pub fn get_mut(&mut self) -> Option<impl std::ops::DerefMut<Target = [u8]> + '_> {
todo!();

Option::<&mut [u8]>::None
}

/// Constructs a [`Slice`] from an I/O reader by pulling in `len` bytes.
Expand Down
17 changes: 14 additions & 3 deletions src/slice/slice_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ impl Slice {
Self(ByteView::new(&[]))
}

#[must_use]
#[doc(hidden)]
/// Creates a slice of `len` bytes by delegating to `ByteView::with_size`.
pub fn with_size(len: usize) -> Self {
    let view = ByteView::with_size(len);
    Self(view)
}

#[must_use]
#[doc(hidden)]
/// Creates a slice of `len` bytes by delegating to `ByteView::with_size_unzeroed`.
///
/// NOTE(review): as the name suggests, the contents are presumably not
/// zero-initialized — confirm against the `byteview` documentation.
pub fn with_size_unzeroed(len: usize) -> Self {
    let view = ByteView::with_size_unzeroed(len);
    Self(view)
}

#[doc(hidden)]
#[must_use]
pub fn slice(&self, range: impl std::ops::RangeBounds<usize>) -> Self {
Expand All @@ -35,11 +47,10 @@ impl Slice {
Self(ByteView::fused(left, right))
}

// TODO: change to unzeroed and provide a _zeroed method instead
#[must_use]
#[doc(hidden)]
pub fn with_size(len: usize) -> Self {
Self(ByteView::with_size(len))
/// Returns mutable access to the slice's bytes if the inner `ByteView` grants it,
/// or `None` otherwise.
pub fn get_mut(&mut self) -> Option<impl std::ops::DerefMut<Target = [u8]> + '_> {
    let view = &mut self.0;
    view.get_mut()
}

/// Constructs a [`Slice`] from an I/O reader by pulling in `len` bytes.
Expand Down
Loading
Loading