From 6da11142d4f429f37da3f9911680db2d537ce34c Mon Sep 17 00:00:00 2001 From: Jing Yang Date: Thu, 3 Apr 2025 21:21:00 +0800 Subject: [PATCH 1/7] feat: add FDCache trait --- benches/value_log.rs | 25 +++++++++++++++++---- src/config.rs | 12 ++++++---- src/fd_cache.rs | 19 ++++++++++++++++ src/gc/mod.rs | 18 +++++++++------ src/lib.rs | 2 ++ src/value_log.rs | 35 ++++++++++++++++++++--------- tests/accidental_drop_rc.rs | 5 ++++- tests/basic_gc.rs | 5 ++++- tests/basic_kv.rs | 10 +++++++-- tests/common.rs | 12 +++++++++- tests/compression.rs | 5 ++++- tests/gc_space_amp.rs | 5 ++++- tests/recovery.rs | 20 +++++++++++++---- tests/recovery_fail.rs | 10 +++++++-- tests/recovery_mac_ds_store.rs | 10 +++++++-- tests/rollover_index_fail_finish.rs | 5 ++++- tests/space_amp.rs | 10 +++++++-- tests/vlog_clear.rs | 10 +++++++-- tests/vlog_v1_load_fixture.rs | 4 ++-- 19 files changed, 174 insertions(+), 48 deletions(-) create mode 100644 src/fd_cache.rs diff --git a/benches/value_log.rs b/benches/value_log.rs index 9826ade..4c27f0d 100644 --- a/benches/value_log.rs +++ b/benches/value_log.rs @@ -2,11 +2,13 @@ use criterion::{criterion_group, criterion_main, Criterion}; use rand::{Rng, RngCore}; use std::{ collections::BTreeMap, + fs::File, + io::BufReader, sync::{Arc, RwLock}, }; use value_log::{ - BlobCache, Compressor, Config, IndexReader, IndexWriter, UserKey, UserValue, ValueHandle, - ValueLog, ValueLogId, + BlobCache, BlobFileId, Compressor, Config, FDCache, IndexReader, IndexWriter, UserKey, + UserValue, ValueHandle, ValueLog, ValueLogId, }; type MockIndexInner = RwLock>; @@ -89,6 +91,13 @@ impl BlobCache for NoCacher { fn insert(&self, _: ValueLogId, _: &ValueHandle, _: UserValue) {} } +impl FDCache for NoCacher { + fn get(&self, _: ValueLogId, _: BlobFileId) -> Option> { + None + } + fn insert(&self, _: ValueLogId, _: BlobFileId, _: BufReader) {} +} + fn prefetch(c: &mut Criterion) { let mut group = c.benchmark_group("prefetch range"); @@ -101,7 +110,11 @@ fn prefetch(c: &mut Criterion) { let folder = tempfile::tempdir().unwrap(); let vl_path = folder.path(); - let value_log = ValueLog::open(vl_path, Config::<_, NoCompressor>::new(NoCacher)).unwrap(); + let value_log = ValueLog::open( + vl_path, + Config::<_, _, NoCompressor>::new(NoCacher, NoCacher), + ) + .unwrap(); let mut writer = value_log.get_writer().unwrap(); @@ -184,7 +197,11 @@ fn load_value(c: &mut Criterion) { let folder = tempfile::tempdir().unwrap(); let vl_path = folder.path(); - let value_log = ValueLog::open(vl_path, Config::<_, NoCompressor>::new(NoCacher)).unwrap(); + let value_log = ValueLog::open( + vl_path, + Config::<_, _, NoCompressor>::new(NoCacher, NoCacher), + ) + .unwrap(); let mut writer = value_log.get_writer().unwrap(); diff --git a/src/config.rs b/src/config.rs index fdc5e9e..d384af0 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2,25 +2,29 @@ // This source code is licensed under both the Apache 2.0 and MIT License // (found in the LICENSE-* files in the repository) -use crate::{blob_cache::BlobCache, compression::Compressor}; +use crate::{blob_cache::BlobCache, compression::Compressor, fd_cache::BlobFileId, FDCache}; /// Value log configuration -pub struct Config { +pub struct Config { /// Target size of vLog segments pub(crate) segment_size_bytes: u64, /// Blob cache to use pub(crate) blob_cache: BC, + /// File descriptor cache to use + pub(crate) fd_cache: FDC, + /// Compression to use pub(crate) compression: C, } -impl Config { +impl Config { /// Creates a new configuration builder. - pub fn new(blob_cache: BC) -> Self { + pub fn new(blob_cache: BC, fd_cache: FDC) -> Self { Self { blob_cache, + fd_cache, compression: Default::default(), segment_size_bytes: 128 * 1_024 * 1_024, } diff --git a/src/fd_cache.rs b/src/fd_cache.rs new file mode 100644 index 0000000..80857dd --- /dev/null +++ b/src/fd_cache.rs @@ -0,0 +1,19 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + +use crate::ValueLogId; +use std::{fs::File, io::BufReader}; + +/// The unique identifier for a value log blob file. Another name for SegmentId +pub type BlobFileId = u64; + +/// File descriptor cache, to cache file descriptors after an open() syscall. +/// Reduces the number of open() syscalls needed accessing the same blob file. +pub trait FDCache: Clone { + /// Caches a file descriptor + fn insert(&self, vlog_id: ValueLogId, blob_file_id: BlobFileId, fd: BufReader); + + /// Retrieves a file descriptor from the cache, or `None` if it could not be found + fn get(&self, vlog_id: ValueLogId, blob_file_id: BlobFileId) -> Option>; +} diff --git a/src/gc/mod.rs b/src/gc/mod.rs index 054fff4..57deeb4 100644 --- a/src/gc/mod.rs +++ b/src/gc/mod.rs @@ -4,13 +4,13 @@ pub mod report; -use crate::{id::SegmentId, BlobCache, Compressor, ValueLog}; +use crate::{id::SegmentId, BlobCache, Compressor, FDCache, ValueLog}; /// GC strategy #[allow(clippy::module_name_repetitions)] -pub trait GcStrategy { +pub trait GcStrategy { /// Picks segments based on a predicate. - fn pick(&self, value_log: &ValueLog) -> Vec; + fn pick(&self, value_log: &ValueLog) -> Vec; } /// Picks segments that have a certain percentage of stale blobs @@ -32,8 +32,10 @@ impl StaleThresholdStrategy { } } -impl GcStrategy for StaleThresholdStrategy { - fn pick(&self, value_log: &ValueLog) -> Vec { +impl GcStrategy + for StaleThresholdStrategy +{ + fn pick(&self, value_log: &ValueLog) -> Vec { value_log .manifest .segments @@ -62,9 +64,11 @@ impl SpaceAmpStrategy { } } -impl GcStrategy for SpaceAmpStrategy { +impl GcStrategy + for SpaceAmpStrategy +{ #[allow(clippy::cast_precision_loss, clippy::significant_drop_tightening)] - fn pick(&self, value_log: &ValueLog) -> Vec { + fn pick(&self, value_log: &ValueLog) -> Vec { let space_amp_target = self.0; let current_space_amp = value_log.space_amp(); diff --git a/src/lib.rs b/src/lib.rs index 2365424..b7eb44e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -51,6 +51,7 @@ #![cfg_attr(not(feature = "bytes"), forbid(unsafe_code))] mod blob_cache; +mod fd_cache; #[doc(hidden)] pub mod coding; @@ -82,6 +83,7 @@ pub use { compression::Compressor, config::Config, error::{Error, Result}, + fd_cache::{BlobFileId, FDCache}, gc::report::GcReport, gc::{GcStrategy, SpaceAmpStrategy, StaleThresholdStrategy}, handle::ValueHandle, diff --git a/src/value_log.rs b/src/value_log.rs index caffbe3..5b7cc5a 100644 --- a/src/value_log.rs +++ b/src/value_log.rs @@ -11,7 +11,7 @@ use crate::{ scanner::{Scanner, SizeMap}, segment::merge::MergeReader, version::Version, - BlobCache, Compressor, Config, GcStrategy, IndexReader, SegmentReader, SegmentWriter, + BlobCache, Compressor, Config, FDCache, GcStrategy, IndexReader, SegmentReader, SegmentWriter, UserValue, ValueHandle, }; use std::{ @@ -43,10 +43,12 @@ fn unlink_blob_files(base_path: &Path, ids: &[SegmentId]) { /// A disk-resident value log #[derive(Clone)] -pub struct ValueLog(Arc>); +pub struct ValueLog( + Arc>, +); -impl std::ops::Deref for ValueLog { - type Target = ValueLogInner; +impl std::ops::Deref for ValueLog { + type Target = ValueLogInner; fn deref(&self) -> &Self::Target { &self.0 @@ -54,7 +56,7 @@ impl std::ops::Deref for ValueLog { } #[allow(clippy::module_name_repetitions)] -pub struct ValueLogInner { +pub struct ValueLogInner { /// Unique value log ID id: u64, @@ -62,11 +64,14 @@ pub struct ValueLogInner { pub path: PathBuf, /// Value log configuration - config: Config, + config: Config, /// In-memory blob cache blob_cache: BC, + /// In-memory FD cache + fd_cache: FDC, + /// Segment manifest #[doc(hidden)] pub manifest: SegmentManifest, @@ -80,7 +85,7 @@ pub struct ValueLogInner { pub rollover_guard: Mutex<()>, } -impl ValueLog { +impl ValueLog { /// Creates or recovers a value log in the given directory. /// /// # Errors @@ -88,7 +93,7 @@ impl ValueLog { /// Will return `Err` if an IO error occurs. pub fn open>( path: P, // TODO: move path into config? - config: Config, + config: Config, ) -> crate::Result { let path = path.into(); @@ -143,7 +148,7 @@ impl ValueLog { /// Creates a new empty value log in a directory. pub(crate) fn create_new>( path: P, - config: Config, + config: Config, ) -> crate::Result { let path = absolute_path(path.into()); log::trace!("Creating value-log at {}", path.display()); @@ -174,6 +179,7 @@ impl ValueLog { } let blob_cache = config.blob_cache.clone(); + let fd_cache = config.fd_cache.clone(); let manifest = SegmentManifest::create_new(&path)?; Ok(Self(Arc::new(ValueLogInner { @@ -181,13 +187,17 @@ impl ValueLog { config, path, blob_cache, + fd_cache, manifest, id_generator: IdGenerator::default(), rollover_guard: Mutex::new(()), }))) } - pub(crate) fn recover>(path: P, config: Config) -> crate::Result { + pub(crate) fn recover>( + path: P, + config: Config, + ) -> crate::Result { let path = path.into(); log::info!("Recovering vLog at {}", path.display()); @@ -204,6 +214,7 @@ impl ValueLog { } let blob_cache = config.blob_cache.clone(); + let fd_cache = config.fd_cache.clone(); let manifest = SegmentManifest::recover(&path)?; let highest_id = manifest @@ -220,6 +231,7 @@ impl ValueLog { config, path, blob_cache, + fd_cache, manifest, id_generator: IdGenerator::new(highest_id + 1), rollover_guard: Mutex::new(()), @@ -270,6 +282,7 @@ impl ValueLog { return Ok(None); }; + // TODO: this is the part with the repeated fopen() to the same file let mut reader = BufReader::new(File::open(&segment.path)?); reader.seek(std::io::SeekFrom::Start(vhandle.offset))?; let mut reader = SegmentReader::with_reader(vhandle.segment_id, reader) @@ -499,7 +512,7 @@ impl ValueLog { /// Will return `Err` if an IO error occurs. pub fn apply_gc_strategy( &self, - strategy: &impl GcStrategy, + strategy: &impl GcStrategy, index_reader: &R, index_writer: W, ) -> crate::Result { diff --git a/tests/accidental_drop_rc.rs b/tests/accidental_drop_rc.rs index 8154ebe..4822f53 100644 --- a/tests/accidental_drop_rc.rs +++ b/tests/accidental_drop_rc.rs @@ -17,7 +17,10 @@ fn accidental_drop_rc() -> value_log::Result<()> { let index = MockIndex::default(); - let value_log = ValueLog::open(vl_path, Config::<_, NoCompressor>::new(NoCacher))?; + let value_log = ValueLog::open( + vl_path, + Config::<_, _, NoCompressor>::new(NoCacher, NoCacher), + )?; for key in ["a", "b"] { let value = &key; diff --git a/tests/basic_gc.rs b/tests/basic_gc.rs index aed495d..42bf976 100644 --- a/tests/basic_gc.rs +++ b/tests/basic_gc.rs @@ -11,7 +11,10 @@ fn basic_gc() -> value_log::Result<()> { let index = MockIndex::default(); - let value_log = ValueLog::open(vl_path, Config::<_, NoCompressor>::new(NoCacher))?; + let value_log = ValueLog::open( + vl_path, + Config::<_, _, NoCompressor>::new(NoCacher, NoCacher), + )?; { let items = ["a", "b", "c", "d", "e"]; diff --git a/tests/basic_kv.rs b/tests/basic_kv.rs index 61071b6..9ddb9e3 100644 --- a/tests/basic_kv.rs +++ b/tests/basic_kv.rs @@ -14,7 +14,10 @@ fn basic_kv() -> value_log::Result<()> { let items = ["a", "b", "c", "d", "e"]; { - let value_log = ValueLog::open(vl_path, Config::<_, NoCompressor>::new(NoCacher))?; + let value_log = ValueLog::open( + vl_path, + Config::<_, _, NoCompressor>::new(NoCacher, NoCacher), + )?; { let mut index_writer = MockIndexWriter(index.clone()); @@ -37,7 +40,10 @@ fn basic_kv() -> value_log::Result<()> { } { - let value_log = ValueLog::open(vl_path, Config::<_, NoCompressor>::new(NoCacher))?; + let value_log = ValueLog::open( + vl_path, + Config::<_, _, NoCompressor>::new(NoCacher, NoCacher), + )?; assert_eq!(1, value_log.segment_count()); diff --git a/tests/common.rs b/tests/common.rs index 200081e..73c2112 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -4,10 +4,13 @@ use std::{ collections::BTreeMap, + fs::File, + io::BufReader, sync::{Arc, RwLock}, }; use value_log::{ - BlobCache, Compressor, IndexReader, IndexWriter, UserKey, UserValue, ValueHandle, ValueLogId, + BlobCache, BlobFileId, Compressor, FDCache, IndexReader, IndexWriter, UserKey, UserValue, + ValueHandle, ValueLogId, }; type MockIndexInner = RwLock>; @@ -90,3 +93,10 @@ impl BlobCache for NoCacher { fn insert(&self, _: ValueLogId, _: &ValueHandle, _: UserValue) {} } + +impl FDCache for NoCacher { + fn get(&self, _: ValueLogId, _: BlobFileId) -> Option> { + None + } + fn insert(&self, _: ValueLogId, _: BlobFileId, _: BufReader) {} +} diff --git a/tests/compression.rs b/tests/compression.rs index 274052f..e419617 100644 --- a/tests/compression.rs +++ b/tests/compression.rs @@ -23,7 +23,10 @@ fn compression() -> value_log::Result<()> { let index = MockIndex::default(); - let value_log = ValueLog::open(vl_path, Config::<_, Lz4Compressor>::new(NoCacher))?; + let value_log = ValueLog::open( + vl_path, + Config::<_, _, Lz4Compressor>::new(NoCacher, NoCacher), + )?; let mut index_writer = MockIndexWriter(index.clone()); let mut writer = value_log.get_writer()?; diff --git a/tests/gc_space_amp.rs b/tests/gc_space_amp.rs index 729c726..a38827e 100644 --- a/tests/gc_space_amp.rs +++ b/tests/gc_space_amp.rs @@ -11,7 +11,10 @@ fn gc_space_amp_target_1() -> value_log::Result<()> { let index = MockIndex::default(); - let value_log = ValueLog::open(vl_path, Config::<_, NoCompressor>::new(NoCacher))?; + let value_log = ValueLog::open( + vl_path, + Config::<_, _, NoCompressor>::new(NoCacher, NoCacher), + )?; assert_eq!(0.0, value_log.space_amp()); assert_eq!(0.0, value_log.manifest.stale_ratio()); diff --git a/tests/recovery.rs b/tests/recovery.rs index 6f436a8..5399ff8 100644 --- a/tests/recovery.rs +++ b/tests/recovery.rs @@ -14,7 +14,10 @@ fn basic_recovery() -> value_log::Result<()> { let items = ["a", "b", "c", "d", "e"]; { - let value_log = ValueLog::open(vl_path, Config::<_, NoCompressor>::new(NoCacher))?; + let value_log = ValueLog::open( + vl_path, + Config::<_, _, NoCompressor>::new(NoCacher, NoCacher), + )?; { let mut index_writer = MockIndexWriter(index.clone()); @@ -51,7 +54,10 @@ fn basic_recovery() -> value_log::Result<()> { } { - let value_log = ValueLog::open(vl_path, Config::<_, NoCompressor>::new(NoCacher))?; + let value_log = ValueLog::open( + vl_path, + Config::<_, _, NoCompressor>::new(NoCacher, NoCacher), + )?; value_log.scan_for_stats(index.read().unwrap().values().cloned().map(Ok))?; @@ -79,7 +85,10 @@ fn recovery_delete_unfinished() -> value_log::Result<()> { let vl_path = folder.path(); { - let value_log = ValueLog::open(vl_path, Config::<_, NoCompressor>::new(NoCacher))?; + let value_log = ValueLog::open( + vl_path, + Config::<_, _, NoCompressor>::new(NoCacher, NoCacher), + )?; let mut writer = value_log.get_writer()?; writer.write("a", "a")?; @@ -93,7 +102,10 @@ fn recovery_delete_unfinished() -> value_log::Result<()> { } { - let value_log = ValueLog::open(vl_path, Config::<_, NoCompressor>::new(NoCacher))?; + let value_log = ValueLog::open( + vl_path, + Config::<_, _, NoCompressor>::new(NoCacher, NoCacher), + )?; assert_eq!(1, value_log.segment_count()); } diff --git a/tests/recovery_fail.rs b/tests/recovery_fail.rs index 6a4e5d6..e0062c7 100644 --- a/tests/recovery_fail.rs +++ b/tests/recovery_fail.rs @@ -14,7 +14,10 @@ fn recovery_fail() -> value_log::Result<()> { let items = ["a", "b", "c", "d", "e"]; { - let value_log = ValueLog::open(vl_path, Config::<_, NoCompressor>::new(NoCacher))?; + let value_log = ValueLog::open( + vl_path, + Config::<_, _, NoCompressor>::new(NoCacher, NoCacher), + )?; for _ in 0..2 { let mut index_writer = MockIndexWriter(index.clone()); @@ -45,7 +48,10 @@ fn recovery_fail() -> value_log::Result<()> { { matches!( - ValueLog::open(vl_path, Config::<_, NoCompressor>::new(NoCacher)), + ValueLog::open( + vl_path, + Config::<_, _, NoCompressor>::new(NoCacher, NoCacher) + ), Err(value_log::Error::Unrecoverable), ); } diff --git a/tests/recovery_mac_ds_store.rs b/tests/recovery_mac_ds_store.rs index 1f2e220..9f06d3f 100644 --- a/tests/recovery_mac_ds_store.rs +++ b/tests/recovery_mac_ds_store.rs @@ -10,7 +10,10 @@ fn recovery_mac_ds_store() -> value_log::Result<()> { let vl_path = folder.path(); { - let value_log = ValueLog::open(vl_path, Config::<_, NoCompressor>::new(NoCacher))?; + let value_log = ValueLog::open( + vl_path, + Config::<_, _, NoCompressor>::new(NoCacher, NoCacher), + )?; let mut writer = value_log.get_writer()?; writer.write("a", "a")?; @@ -22,7 +25,10 @@ fn recovery_mac_ds_store() -> value_log::Result<()> { assert!(ds_store.try_exists()?); { - let value_log = ValueLog::open(vl_path, Config::<_, NoCompressor>::new(NoCacher))?; + let value_log = ValueLog::open( + vl_path, + Config::<_, _, NoCompressor>::new(NoCacher, NoCacher), + )?; assert_eq!(1, value_log.segment_count()); } assert!(ds_store.try_exists()?); diff --git a/tests/rollover_index_fail_finish.rs b/tests/rollover_index_fail_finish.rs index 1f1bcfa..915c70a 100644 --- a/tests/rollover_index_fail_finish.rs +++ b/tests/rollover_index_fail_finish.rs @@ -24,7 +24,10 @@ fn rollover_index_fail_finish() -> value_log::Result<()> { let index = MockIndex::default(); - let value_log = ValueLog::open(vl_path, Config::<_, NoCompressor>::new(NoCacher))?; + let value_log = ValueLog::open( + vl_path, + Config::<_, _, NoCompressor>::new(NoCacher, NoCacher), + )?; let items = ["a", "b", "c", "d", "e"]; diff --git a/tests/space_amp.rs b/tests/space_amp.rs index bc7e602..3c80162 100644 --- a/tests/space_amp.rs +++ b/tests/space_amp.rs @@ -11,7 +11,10 @@ fn worst_case_space_amp() -> value_log::Result<()> { let index = MockIndex::default(); - let value_log = ValueLog::open(vl_path, Config::<_, NoCompressor>::new(NoCacher))?; + let value_log = ValueLog::open( + vl_path, + Config::<_, _, NoCompressor>::new(NoCacher, NoCacher), + )?; assert_eq!(0.0, value_log.space_amp()); assert_eq!(0.0, value_log.manifest.stale_ratio()); @@ -51,7 +54,10 @@ fn no_overlap_space_amp() -> value_log::Result<()> { let index = MockIndex::default(); - let value_log = ValueLog::open(vl_path, Config::<_, NoCompressor>::new(NoCacher))?; + let value_log = ValueLog::open( + vl_path, + Config::<_, _, NoCompressor>::new(NoCacher, NoCacher), + )?; assert_eq!(0.0, value_log.manifest.stale_ratio()); assert_eq!(0.0, value_log.space_amp()); diff --git a/tests/vlog_clear.rs b/tests/vlog_clear.rs index b353d82..68ed1d6 100644 --- a/tests/vlog_clear.rs +++ b/tests/vlog_clear.rs @@ -27,7 +27,10 @@ fn vlog_clear() -> value_log::Result<()> { let items = ["a", "b", "c", "d", "e"]; { - let value_log = ValueLog::open(vl_path, Config::::new(NoCacher))?; + let value_log = ValueLog::open( + vl_path, + Config::::new(NoCacher, NoCacher), + )?; for _ in 0..5 { let mut index_writer = MockIndexWriter(index.clone()); @@ -58,7 +61,10 @@ fn vlog_clear() -> value_log::Result<()> { } { - let value_log = ValueLog::open(vl_path, Config::::new(NoCacher))?; + let value_log = ValueLog::open( + vl_path, + Config::::new(NoCacher, NoCacher), + )?; value_log.scan_for_stats(index.read().unwrap().values().cloned().map(Ok))?; diff --git a/tests/vlog_v1_load_fixture.rs b/tests/vlog_v1_load_fixture.rs index 1491c67..2707ebb 100644 --- a/tests/vlog_v1_load_fixture.rs +++ b/tests/vlog_v1_load_fixture.rs @@ -8,7 +8,7 @@ use value_log::{Config, ValueLog}; fn vlog_load_v1() -> value_log::Result<()> { let path = std::path::Path::new("test_fixture/v1_vlog"); - let value_log = ValueLog::open(path, Config::<_, NoCompressor>::new(NoCacher))?; + let value_log = ValueLog::open(path, Config::<_, _, NoCompressor>::new(NoCacher, NoCacher))?; let count = { let mut count = 0; @@ -32,7 +32,7 @@ fn vlog_load_v1() -> value_log::Result<()> { fn vlog_load_v1_corrupt() -> value_log::Result<()> { let path = std::path::Path::new("test_fixture/v1_vlog_corrupt"); - let value_log = ValueLog::open(path, Config::<_, NoCompressor>::new(NoCacher))?; + let value_log = ValueLog::open(path, Config::<_, _, NoCompressor>::new(NoCacher, NoCacher))?; assert_eq!(2, value_log.verify()?); From f009d6978864ae1ecbf2d5142144f421abbb8c93 Mon Sep 17 00:00:00 2001 From: Jing Yang Date: Fri, 4 Apr 2025 22:58:25 +0800 Subject: [PATCH 2/7] feat: cache fd --- benches/value_log.rs | 4 ++-- src/fd_cache.rs | 8 ++++---- src/value_log.rs | 16 ++++++++++++++-- tests/common.rs | 4 ++-- 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/benches/value_log.rs b/benches/value_log.rs index 4c27f0d..f777bca 100644 --- a/benches/value_log.rs +++ b/benches/value_log.rs @@ -92,10 +92,10 @@ impl BlobCache for NoCacher { } impl FDCache for NoCacher { - fn get(&self, _: ValueLogId, _: BlobFileId) -> Option> { + fn get(&self, _: ValueLogId, _: BlobFileId) -> Option { None } - fn insert(&self, _: ValueLogId, _: BlobFileId, _: BufReader) {} + fn insert(&self, _: ValueLogId, _: BlobFileId, _: File) {} } fn prefetch(c: &mut Criterion) { diff --git a/src/fd_cache.rs b/src/fd_cache.rs index 80857dd..dfd6aa2 100644 --- a/src/fd_cache.rs +++ b/src/fd_cache.rs @@ -8,12 +8,12 @@ use std::{fs::File, io::BufReader}; /// The unique identifier for a value log blob file. Another name for SegmentId pub type BlobFileId = u64; -/// File descriptor cache, to cache file descriptors after an open() syscall. -/// Reduces the number of open() syscalls needed accessing the same blob file. +/// File descriptor cache, to cache file descriptors after an fopen(). +/// Reduces the number of fopen() needed when accessing the same blob file. pub trait FDCache: Clone { /// Caches a file descriptor - fn insert(&self, vlog_id: ValueLogId, blob_file_id: BlobFileId, fd: BufReader); + fn insert(&self, vlog_id: ValueLogId, blob_file_id: BlobFileId, fd: File); /// Retrieves a file descriptor from the cache, or `None` if it could not be found - fn get(&self, vlog_id: ValueLogId, blob_file_id: BlobFileId) -> Option>; + fn get(&self, vlog_id: ValueLogId, blob_file_id: BlobFileId) -> Option; } diff --git a/src/value_log.rs b/src/value_log.rs index 5b7cc5a..e52b427 100644 --- a/src/value_log.rs +++ b/src/value_log.rs @@ -282,8 +282,17 @@ impl ValueLog { return Ok(None); }; - // TODO: this is the part with the repeated fopen() to the same file - let mut reader = BufReader::new(File::open(&segment.path)?); + let fd = match self.fd_cache.get(self.id, vhandle.segment_id) { + Some(fd) => fd, + None => { + let fd = File::open(&segment.path)?; + let fd_clone = fd.try_clone()?; + self.fd_cache.insert(self.id, vhandle.segment_id, fd_clone); + fd + } + }; + + let mut reader = BufReader::new(fd); reader.seek(std::io::SeekFrom::Start(vhandle.offset))?; let mut reader = SegmentReader::with_reader(vhandle.segment_id, reader) .use_compression(self.config.compression.clone()); @@ -315,6 +324,9 @@ impl ValueLog { self.blob_cache.insert(self.id, &value_handle, val); } + // cache the fd for future use, must ensure to always use SeekFrom::Start + // self.fd_cache.insert(self.id, vhandle.segment_id, reader.inner); + Ok(Some(val)) } diff --git a/tests/common.rs b/tests/common.rs index 73c2112..438118e 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -95,8 +95,8 @@ impl BlobCache for NoCacher { } impl FDCache for NoCacher { - fn get(&self, _: ValueLogId, _: BlobFileId) -> Option> { + fn get(&self, _: ValueLogId, _: BlobFileId) -> Option { None } - fn insert(&self, _: ValueLogId, _: BlobFileId, _: BufReader) {} + fn insert(&self, _: ValueLogId, _: BlobFileId, _: File) {} } From e0032291f3885bb2603d711d4ee44dc74212de06 Mon Sep 17 00:00:00 2001 From: Jing Yang Date: Mon, 7 Apr 2025 22:10:48 +0800 Subject: [PATCH 3/7] fix: cache BufReader instead --- benches/value_log.rs | 4 ++-- src/fd_cache.rs | 4 ++-- src/segment/reader.rs | 4 ++++ src/value_log.rs | 15 +++++---------- 4 files changed, 13 insertions(+), 14 deletions(-) diff --git a/benches/value_log.rs b/benches/value_log.rs index f777bca..95daa5f 100644 --- a/benches/value_log.rs +++ b/benches/value_log.rs @@ -92,10 +92,10 @@ impl BlobCache for NoCacher { } impl FDCache for NoCacher { - fn get(&self, _: ValueLogId, _: BlobFileId) -> Option { + fn get(&self, _: ValueLogId, _: BlobFileId) -> Option> { None } - fn insert(&self, _: ValueLogId, _: BlobFileId, _: File) {} + fn insert(&self, _: ValueLogId, _: BlobFileId, _: BufReader) {} } fn prefetch(c: &mut Criterion) { diff --git a/src/fd_cache.rs b/src/fd_cache.rs index dfd6aa2..64b1f70 100644 --- a/src/fd_cache.rs +++ b/src/fd_cache.rs @@ -12,8 +12,8 @@ pub type BlobFileId = u64; /// Reduces the number of fopen() needed when accessing the same blob file. pub trait FDCache: Clone { /// Caches a file descriptor - fn insert(&self, vlog_id: ValueLogId, blob_file_id: BlobFileId, fd: File); + fn insert(&self, vlog_id: ValueLogId, blob_file_id: BlobFileId, fd: BufReader); /// Retrieves a file descriptor from the cache, or `None` if it could not be found - fn get(&self, vlog_id: ValueLogId, blob_file_id: BlobFileId) -> Option; + fn get(&self, vlog_id: ValueLogId, blob_file_id: BlobFileId) -> Option>; } diff --git a/src/segment/reader.rs b/src/segment/reader.rs index 1c11063..5244cf1 100644 --- a/src/segment/reader.rs +++ b/src/segment/reader.rs @@ -59,6 +59,10 @@ impl Reader { self.compression = Some(compressor); self } + + pub(crate) fn into_inner(self) -> BufReader { + self.inner + } } impl Iterator for Reader { diff --git a/src/value_log.rs b/src/value_log.rs index e52b427..b3e3aa9 100644 --- a/src/value_log.rs +++ b/src/value_log.rs @@ -282,17 +282,11 @@ impl ValueLog { return Ok(None); }; - let fd = match self.fd_cache.get(self.id, vhandle.segment_id) { + let mut reader = match self.fd_cache.get(self.id, vhandle.segment_id) { Some(fd) => fd, - None => { - let fd = File::open(&segment.path)?; - let fd_clone = fd.try_clone()?; - self.fd_cache.insert(self.id, vhandle.segment_id, fd_clone); - fd - } + None => BufReader::new(File::open(&segment.path)?), }; - let mut reader = BufReader::new(fd); reader.seek(std::io::SeekFrom::Start(vhandle.offset))?; let mut reader = SegmentReader::with_reader(vhandle.segment_id, reader) .use_compression(self.config.compression.clone()); @@ -324,8 +318,9 @@ impl ValueLog { self.blob_cache.insert(self.id, &value_handle, val); } - // cache the fd for future use, must ensure to always use SeekFrom::Start - // self.fd_cache.insert(self.id, vhandle.segment_id, reader.inner); + // cache the BufReader for future use, must ensure to always use SeekFrom::Start when using it from cache + self.fd_cache + .insert(self.id, vhandle.segment_id, reader.into_inner()); Ok(Some(val)) } From 29f00a2b9f853b0f6d2ca0d1b7006adb8e34a8cc Mon Sep 17 00:00:00 2001 From: Jing Yang Date: Mon, 7 Apr 2025 22:20:51 +0800 Subject: [PATCH 4/7] fix: fix NoCacher to use BufReader --- tests/common.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/common.rs b/tests/common.rs index 438118e..f758069 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -95,8 +95,8 @@ impl BlobCache for NoCacher { } impl FDCache for NoCacher { - fn get(&self, _: ValueLogId, _: BlobFileId) -> Option { + fn get(&self, _: ValueLogId, _: BlobFileId) -> Option> { None } - fn insert(&self, _: ValueLogId, _: BlobFileId, _: File) {} + fn insert(&self, _: ValueLogId, _: BlobFileId, _: BufReader) {} } From a917d1219d337e0cfb60a34b46cc5aaa4a6cdb39 Mon Sep 17 00:00:00 2001 From: Jing Yang Date: Sat, 12 Apr 2025 10:28:16 +0800 Subject: [PATCH 5/7] feat: add test --- tests/basic_kv.rs | 43 ++++++++++++++++++++++++++++++++++++++++++- tests/common.rs | 37 +++++++++++++++++++++++++++++++++++-- 2 files changed, 77 insertions(+), 3 deletions(-) diff --git a/tests/basic_kv.rs b/tests/basic_kv.rs index 9ddb9e3..fed48d4 100644 --- a/tests/basic_kv.rs +++ b/tests/basic_kv.rs @@ -1,6 +1,6 @@ mod common; -use common::{MockIndex, MockIndexWriter, NoCacher, NoCompressor}; +use common::{InMemCacher, MockIndex, MockIndexWriter, NoCacher, NoCompressor}; use test_log::test; use value_log::{Config, IndexWriter, KeyRange, Slice, ValueLog}; @@ -72,3 +72,44 @@ fn basic_kv() -> value_log::Result<()> { Ok(()) } + +#[test] +fn get_with_cached_fd() -> value_log::Result<()> { + let folder = tempfile::tempdir()?; + let vl_path = folder.path(); + let items = ["a", "b", "c", "d", "e"]; + let index = MockIndex::default(); + + let fd_cache = InMemCacher::default(); + let value_log = ValueLog::open( + vl_path, + Config::<_, _, NoCompressor>::new(NoCacher, fd_cache.clone()), + )?; + + let mut index_writer = MockIndexWriter(index.clone()); + let mut writer = value_log.get_writer()?; + + for key in items { + let value = key.repeat(10_000); + let value = value.as_bytes(); + + let key = key.as_bytes(); + + let vhandle = writer.get_next_value_handle(); + index_writer.insert_indirect(key, vhandle, value.len() as u32)?; + + writer.write(key, value)?; + } + value_log.register_writer(writer)?; + + for (key, (vhandle, _)) in index.read().unwrap().iter() { + let item = value_log.get(vhandle)?.unwrap(); + assert_eq!(item, key.repeat(10_000)); + } + + let index_len = index.read().unwrap().len() as u32; + // The first item will cache the fd, subsequent accesses will all be cache hits + assert_eq!(fd_cache.get_hit_count(), index_len - 1); + + Ok(()) +} diff --git a/tests/common.rs b/tests/common.rs index f758069..2da1fe4 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -3,10 +3,11 @@ // (found in the LICENSE-* files in the repository) use std::{ - collections::BTreeMap, + cell::RefCell, + collections::{BTreeMap, HashMap}, fs::File, io::BufReader, - sync::{Arc, RwLock}, + sync::{Arc, Mutex, RwLock}, }; use value_log::{ BlobCache, BlobFileId, Compressor, FDCache, IndexReader, IndexWriter, UserKey, UserValue, @@ -100,3 +101,35 @@ impl FDCache for NoCacher { } fn insert(&self, _: ValueLogId, _: BlobFileId, _: BufReader) {} } + +#[derive(Clone, Default)] +pub struct InMemCacher { + fd_cache: Arc>>, + fd_hit_count: Arc>, +} +impl InMemCacher { + pub(crate) fn get_hit_count(&self) -> u32 { + *self.fd_hit_count.borrow() + } +} + +impl FDCache for InMemCacher { + fn get(&self, vlog_id: ValueLogId, blob_file_id: BlobFileId) -> Option> { + let lock = self.fd_cache.lock().unwrap(); + let fd = match lock.get(&(vlog_id, blob_file_id)) { + Some(fd) => fd, + None => return None, + }; + + let fd_clone = fd.try_clone().unwrap(); + *self.fd_hit_count.borrow_mut() += 1; + Some(BufReader::new(fd_clone)) + } + + fn insert(&self, vlog_id: ValueLogId, blob_file_id: BlobFileId, fd: BufReader) { + self.fd_cache + .lock() + .unwrap() + .insert((vlog_id, blob_file_id), fd.into_inner()); + } +} From 71d5db84bc52c3f144f8e61e266a939124426a47 Mon Sep 17 00:00:00 2001 From: Jing Yang Date: Sun, 13 Apr 2025 16:46:09 +0800 Subject: [PATCH 6/7] feat: add miss count --- tests/basic_kv.rs | 2 ++ tests/common.rs | 11 ++++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/tests/basic_kv.rs b/tests/basic_kv.rs index fed48d4..4890d65 100644 --- a/tests/basic_kv.rs +++ b/tests/basic_kv.rs @@ -100,6 +100,7 @@ fn get_with_cached_fd() -> value_log::Result<()> { writer.write(key, value)?; } + value_log.register_writer(writer)?; for (key, (vhandle, _)) in index.read().unwrap().iter() { @@ -109,6 +110,7 @@ fn get_with_cached_fd() -> value_log::Result<()> { let index_len = index.read().unwrap().len() as u32; // The first item will cache the fd, subsequent accesses will all be cache hits + assert_eq!(fd_cache.get_miss_count(), 1); assert_eq!(fd_cache.get_hit_count(), index_len - 1); Ok(()) diff --git a/tests/common.rs b/tests/common.rs index 2da1fe4..6dd585f 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -7,6 +7,7 @@ use std::{ collections::{BTreeMap, HashMap}, fs::File, io::BufReader, + ops::Add, sync::{Arc, Mutex, RwLock}, }; use value_log::{ @@ -106,11 +107,16 @@ impl FDCache for NoCacher { pub struct InMemCacher { fd_cache: Arc>>, fd_hit_count: Arc>, + fd_miss_count: Arc>, } impl InMemCacher { pub(crate) fn get_hit_count(&self) -> u32 { *self.fd_hit_count.borrow() } + + pub(crate) fn get_miss_count(&self) -> u32 { + *self.fd_miss_count.borrow() + } } impl FDCache for InMemCacher { @@ -118,7 +124,10 @@ impl FDCache for InMemCacher { let lock = self.fd_cache.lock().unwrap(); let fd = match lock.get(&(vlog_id, blob_file_id)) { Some(fd) => fd, - None => return None, + None => { + *self.fd_miss_count.borrow_mut() += 1; + return None; + } }; let fd_clone = fd.try_clone().unwrap(); From a77faf49284f87ba1f02df4f4b38eb7fc5c442e8 Mon Sep 17 00:00:00 2001 From: Jing Yang Date: Sun, 13 Apr 2025 16:52:40 +0800 Subject: [PATCH 7/7] chore: update comment --- tests/basic_kv.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/basic_kv.rs b/tests/basic_kv.rs index 4890d65..01be4c9 100644 --- a/tests/basic_kv.rs +++ b/tests/basic_kv.rs @@ -109,7 +109,7 @@ fn get_with_cached_fd() -> value_log::Result<()> { } let index_len = index.read().unwrap().len() as u32; - // The first item will cache the fd, subsequent accesses will all be cache hits + // The first item will be a miss and cache the fd, subsequent accesses will all be cache hits assert_eq!(fd_cache.get_miss_count(), 1); assert_eq!(fd_cache.get_hit_count(), index_len - 1);