From a844c26fc064a4433935dbda6f253bbc8d0e777b Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla <105630300+fsdvh@users.noreply.github.com> Date: Mon, 13 Apr 2026 10:39:10 +0200 Subject: [PATCH 1/4] Add non-blocking methods for sync cache (#1) Add non-blocking API for sync cache --- src/rw_lock.rs | 40 ++++++++ src/sync.rs | 253 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 293 insertions(+) diff --git a/src/rw_lock.rs b/src/rw_lock.rs index 278f5b7..cc8e81c 100644 --- a/src/rw_lock.rs +++ b/src/rw_lock.rs @@ -105,6 +105,46 @@ impl RwLock { }) } + /// Attempts to acquire this `RwLock` with shared read access without blocking. + /// + /// Returns `Some(guard)` if the lock was acquired, or `None` if it is already + /// held by a writer. + #[inline] + pub fn try_read(&self) -> Option> { + #[cfg(feature = "parking_lot")] + { + self.0.try_read().map(RwLockReadGuard) + } + #[cfg(not(feature = "parking_lot"))] + { + match self.0.try_read() { + Ok(guard) => Some(RwLockReadGuard(guard)), + Err(std::sync::TryLockError::WouldBlock) => None, + Err(std::sync::TryLockError::Poisoned(err)) => panic!("{}", err), + } + } + } + + /// Attempts to acquire this `RwLock` with exclusive write access without blocking. + /// + /// Returns `Some(guard)` if the lock was acquired, or `None` if it is already + /// held by any readers or a writer. + #[inline] + pub fn try_write(&self) -> Option> { + #[cfg(feature = "parking_lot")] + { + self.0.try_write().map(RwLockWriteGuard) + } + #[cfg(not(feature = "parking_lot"))] + { + match self.0.try_write() { + Ok(guard) => Some(RwLockWriteGuard(guard)), + Err(std::sync::TryLockError::WouldBlock) => None, + Err(std::sync::TryLockError::Poisoned(err)) => panic!("{}", err), + } + } + } + /// Locks this `RwLock` with exclusive write access, blocking the current /// thread until it can be acquired. /// diff --git a/src/sync.rs b/src/sync.rs index df77b40..e9f6f29 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -18,6 +18,29 @@ use crate::shard::EntryOrPlaceholder; pub use crate::sync_placeholder::{EntryAction, EntryResult, GuardResult, PlaceholderGuard}; use crate::sync_placeholder::{JoinFuture, JoinResult}; +/// The result of a non-blocking cache operation. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ContendedResult { + /// The operation succeeded. For read operations the inner value holds the lookup result; + /// for write operations it holds the lifecycle request state (or `()` for [`Cache::try_insert`]). + Ok(Val), + /// The shard lock could not be acquired without blocking. The operation was not performed. + Contended, +} + +impl ContendedResult { + pub fn ok(self) -> Option { + match self { + ContendedResult::Ok(val) => Some(val), + ContendedResult::Contended => None, + } + } + + pub fn is_contended(&self) -> bool { + matches!(self, ContendedResult::Contended) + } +} + /// A concurrent cache /// /// The concurrent cache is internally composed of equally sized shards, each of which is independently @@ -247,6 +270,23 @@ impl< .is_some_and(|(shard, hash)| shard.read().contains(hash, key)) } + /// Attempts to check if a key exists in the cache without blocking. + /// Returns [`ContendedResult::Ok(true)`] if present, [`ContendedResult::Ok(false)`] if absent, + /// or [`ContendedResult::Contended`] if the shard lock could not be acquired without blocking. + pub fn try_contains_key(&self, key: &Q) -> ContendedResult + where + Q: Hash + Equivalent + ?Sized, + { + let Some((shard, hash)) = self.shard_for(key) else { + return ContendedResult::Ok(false); + }; + + shard + .try_read() + .map(|guard| ContendedResult::Ok(guard.contains(hash, key))) + .unwrap_or(ContendedResult::Contended) + } + /// Fetches an item from the cache whose key is `key`. pub fn get(&self, key: &Q) -> Option where @@ -256,6 +296,22 @@ impl< shard.read().get(hash, key).cloned() } + /// Attempts to fetch an item from the cache whose key is `key`. + /// Returns [`ContendedResult::Ok(Some(val))`] if the key is present, [`ContendedResult::Ok(None)`] if absent, + /// or [`ContendedResult::Contended`] if the shard lock could not be acquired without blocking. + pub fn try_get(&self, key: &Q) -> ContendedResult> + where + Q: Hash + Equivalent + ?Sized, + { + let Some((shard, hash)) = self.shard_for(key) else { + return ContendedResult::Ok(None); + }; + shard + .try_read() + .map(|guard| ContendedResult::Ok(guard.get(hash, key).cloned())) + .unwrap_or(ContendedResult::Contended) + } + /// Peeks an item from the cache whose key is `key`. /// Contrary to gets, peeks don't alter the key "hotness". pub fn peek(&self, key: &Q) -> Option @@ -266,6 +322,23 @@ impl< shard.read().peek(hash, key).cloned() } + /// Attempts to peek an item from the cache whose key is `key`. + /// Contrary to gets, peeks don't alter the key "hotness". + /// Returns [`ContendedResult::Ok(Some(val))`] if the key is present, [`ContendedResult::Ok(None)`] if absent, + /// or [`ContendedResult::Contended`] if the shard lock could not be acquired without blocking. + pub fn try_peek(&self, key: &Q) -> ContendedResult> + where + Q: Hash + Equivalent + ?Sized, + { + let Some((shard, hash)) = self.shard_for(key) else { + return ContendedResult::Ok(None); + }; + shard + .try_read() + .map(|guard| ContendedResult::Ok(guard.peek(hash, key).cloned())) + .unwrap_or(ContendedResult::Contended) + } + /// Remove an item from the cache whose key is `key`. /// Returns the removed entry, if any. pub fn remove(&self, key: &Q) -> Option<(Key, Val)> @@ -276,6 +349,23 @@ impl< shard.write().remove(hash, key) } + /// Attempts to remove an item from the cache whose key is `key`. + /// Returns [`ContendedResult::Ok(Some(entry))`] with the removed entry if present, [`ContendedResult::Ok(None)`] if absent, + /// or [`ContendedResult::Contended`] if the shard lock could not be acquired without blocking. + pub fn try_remove(&self, key: &Q) -> ContendedResult> + where + Q: Hash + Equivalent + ?Sized, + { + let Some((shard, hash)) = self.shard_for(key) else { + return ContendedResult::Ok(None); + }; + + shard + .try_write() + .map(|mut guard| ContendedResult::Ok(guard.remove(hash, key))) + .unwrap_or(ContendedResult::Contended) + } + /// Remove an item from the cache whose key is `key` if `f(&value)` returns `true` for that entry. /// Compared to peek and remove, this method guarantees that no new value was inserted in-between. /// @@ -337,6 +427,18 @@ impl< self.lifecycle.end_request(lcs); } + /// Attempts to insert an item in the cache with key `key`. + /// Returns [`ContendedResult::Ok`] if the item was inserted, or [`ContendedResult::Contended`] if the shard lock was contended. + pub fn try_insert(&self, key: Key, value: Val) -> ContendedResult<()> { + match self.try_insert_with_lifecycle(key, value) { + ContendedResult::Ok(lcs) => { + self.lifecycle.end_request(lcs); + ContendedResult::Ok(()) + } + ContendedResult::Contended => ContendedResult::Contended, + } + } + /// Inserts an item in the cache with key `key`. pub fn insert_with_lifecycle(&self, key: Key, value: Val) -> L::RequestState { let mut lcs = self.lifecycle.begin_request(); @@ -349,6 +451,28 @@ impl< lcs } + /// Attempts to insert an item in the cache with key `key`. + /// Returns [`ContendedResult::Ok`] with the lifecycle request state if the item was inserted, + /// or [`ContendedResult::Contended`] if the shard lock was contended. + pub fn try_insert_with_lifecycle( + &self, + key: Key, + value: Val, + ) -> ContendedResult { + let (shard, hash) = self.shard_for(&key).unwrap(); + + shard + .try_write() + .map(|mut guard| { + let mut lcs = self.lifecycle.begin_request(); + let result = guard.insert(&mut lcs, hash, key, value, InsertStrategy::Insert); + // result cannot err with the Insert strategy + debug_assert!(result.is_ok()); + ContendedResult::Ok(lcs) + }) + .unwrap_or(ContendedResult::Contended) + } + /// Clear all items from the cache pub fn clear(&self) { for s in self.shards.iter() { @@ -1499,4 +1623,133 @@ mod tests { } } } + + // --- Non-blocking method tests --- + + #[test] + fn test_contended_result_helpers() { + let ok: ContendedResult = ContendedResult::Ok(42); + assert!(!ok.is_contended()); + assert_eq!(ok.ok(), Some(42)); + + let contended: ContendedResult = ContendedResult::Contended; + assert!(contended.is_contended()); + assert_eq!(contended.ok(), None); + } + + #[test] + fn test_try_contains_key() { + let cache = Cache::new(100); + cache.insert(1, 10); + + assert_eq!(cache.try_contains_key(&1), ContendedResult::Ok(true)); + assert_eq!(cache.try_contains_key(&2), ContendedResult::Ok(false)); + } + + #[test] + fn test_try_contains_key_contended() { + let cache = Cache::new(100); + cache.insert(1, 10); + // Hold write locks on all shards so try_read is blocked. + let _guards: Vec<_> = cache.shards.iter().map(|s| s.write()).collect(); + assert_eq!(cache.try_contains_key(&1), ContendedResult::Contended); + } + + #[test] + fn test_try_get() { + let cache = Cache::new(100); + cache.insert(1, 10); + + assert_eq!(cache.try_get(&1), ContendedResult::Ok(Some(10))); + assert_eq!(cache.try_get(&2), ContendedResult::Ok(None)); + } + + #[test] + fn test_try_get_contended() { + let cache = Cache::new(100); + cache.insert(1, 10); + let _guards: Vec<_> = cache.shards.iter().map(|s| s.write()).collect(); + assert_eq!(cache.try_get(&1), ContendedResult::Contended); + } + + #[test] + fn test_try_peek() { + let cache = Cache::new(100); + cache.insert(1, 10); + + assert_eq!(cache.try_peek(&1), ContendedResult::Ok(Some(10))); + assert_eq!(cache.try_peek(&2), ContendedResult::Ok(None)); + } + + #[test] + fn test_try_peek_contended() { + let cache = Cache::new(100); + cache.insert(1, 10); + let _guards: Vec<_> = cache.shards.iter().map(|s| s.write()).collect(); + assert_eq!(cache.try_peek(&1), ContendedResult::Contended); + } + + #[test] + fn test_try_remove() { + let cache = Cache::new(100); + cache.insert(1, 10); + + assert_eq!(cache.try_remove(&1), ContendedResult::Ok(Some((1, 10)))); + assert_eq!(cache.try_remove(&1), ContendedResult::Ok(None)); + assert_eq!(cache.try_remove(&99), ContendedResult::Ok(None)); + } + + #[test] + fn test_try_remove_contended() { + let cache = Cache::new(100); + cache.insert(1, 10); + // Hold read locks on all shards so try_write is blocked. + let guards: Vec<_> = cache.shards.iter().map(|s| s.read()).collect(); + assert_eq!(cache.try_remove(&1), ContendedResult::Contended); + drop(guards); + // Item must still be present since the remove did not happen. + assert_eq!(cache.get(&1), Some(10)); + } + + #[test] + fn test_try_insert() { + let cache = Cache::new(100); + + assert_eq!(cache.try_insert(1, 10), ContendedResult::Ok(())); + assert_eq!(cache.get(&1), Some(10)); + + // Insert same key overwrites the previous value. + assert_eq!(cache.try_insert(1, 20), ContendedResult::Ok(())); + assert_eq!(cache.get(&1), Some(20)); + } + + #[test] + fn test_try_insert_contended() { + let cache = Cache::new(100); + let guards: Vec<_> = cache.shards.iter().map(|s| s.read()).collect(); + assert_eq!(cache.try_insert(1, 10), ContendedResult::Contended); + drop(guards); + assert_eq!(cache.get(&1), None); + } + + #[test] + fn test_try_insert_with_lifecycle() { + let cache = Cache::new(100); + + // Successful insert returns the lifecycle request state. + let result = cache.try_insert_with_lifecycle(1, 10); + assert!(!result.is_contended()); + let lcs = result.ok().unwrap(); + cache.lifecycle.end_request(lcs); + assert_eq!(cache.get(&1), Some(10)); + + // Contended when a read lock is held. + let guards: Vec<_> = cache.shards.iter().map(|s| s.read()).collect(); + assert_eq!( + cache.try_insert_with_lifecycle(2, 20), + ContendedResult::Contended + ); + drop(guards); + assert_eq!(cache.get(&2), None); + } } From 1b13f573e24793b5563b59ffc4be520679a9f40a Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla <105630300+fsdvh@users.noreply.github.com> Date: Mon, 13 Apr 2026 10:40:23 +0200 Subject: [PATCH 2/4] Expose shard_index method (#2) Expose shard_index method --- src/sync.rs | 37 ++++++++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/src/sync.rs b/src/sync.rs index e9f6f29..0057c64 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -231,6 +231,37 @@ impl< self.shards.iter().map(|s| s.read().hits()).sum() } + #[inline] + fn compute_shard_index(&self, hash: u64) -> u64 { + // Give preference to the bits in the middle of the hash. When choosing the + // shard, rotate the hash by usize::BITS / 2 so we avoid the lower bits and + // the highest 7 bits that hashbrown uses internally for probing, improving + // the real entropy available to each hashbrown shard. + hash.rotate_right(usize::BITS / 2) & self.shards_mask + } + + /// Returns the shard index for the given key. + /// + /// The returned index is guaranteed to be in `[0, num_shards())`. + /// + /// # Use cases + /// + /// - **Batching**: group keys by shard index before acquiring shard locks, so + /// each lock is taken only once per batch instead of once per key. + /// + /// # Notes + /// + /// The mapping from key to shard index depends on the [`BuildHasher`] supplied + /// at construction time. If two `Cache` instances are built with different + /// hashers, the same key may map to different shard indices. + /// + /// [`BuildHasher`]: std::hash::BuildHasher + #[inline] + pub fn shard_index + ?Sized>(&self, key: &Q) -> usize { + let hash = self.hash_builder.hash_one(key); + self.compute_shard_index(hash) as usize + } + #[inline] fn shard_for( &self, @@ -243,11 +274,7 @@ impl< Q: Hash + Equivalent + ?Sized, { let hash = self.hash_builder.hash_one(key); - // When choosing the shard, rotate the hash bits usize::BITS / 2 so that we - // give preference to the bits in the middle of the hash. - // Internally hashbrown uses the lower bits for start of probing + the 7 highest, - // so by picking something else we improve the real entropy available to each hashbrown shard. - let shard_idx = (hash.rotate_right(usize::BITS / 2) & self.shards_mask) as usize; + let shard_idx = self.compute_shard_index(hash) as usize; self.shards.get(shard_idx).map(|s| (s, hash)) } From c76a9db8eacbf8c76a6adbbcf512f43a81ef69e4 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla <105630300+fsdvh@users.noreply.github.com> Date: Mon, 27 Apr 2026 18:39:00 +0200 Subject: [PATCH 3/4] Add wait-free methods (#3) * Add non-blocking methods * Do not leak lcs * Dedicated result * More * Simplify * Cleanup + some docs * Docs * Use a feature flag * Posion lock * Remove feature and more tests * Update src/sync.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Simplify signature * Simplify * clippy * More docs * Update README.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update src/sync.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * AI suggestions * Update src/sync.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- README.md | 1 + src/sync.rs | 188 +++++++++++++++++++++++----------------------------- 2 files changed, 84 insertions(+), 105 deletions(-) diff --git a/README.md b/README.md index 8c986e8..046d03c 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ Lightweight and high performance concurrent cache optimized for low cache overhe * Scales well with the number of threads * Atomic operations with `get_or_insert` and `get_value_or_guard` functions * Atomic async operations with `get_or_insert_async` and `get_value_or_guard_async` functions +* Non-blocking `try_get`, `try_insert`, `try_remove`, and related methods that return an error instead of blocking: typically `Err(LockContention)`, or `Err((Key, Val))` for `try_insert`/`try_insert_with_lifecycle` so inputs are preserved * Closure-based `entry` API for atomic inspect-and-act patterns (keep, remove, replace) * Supports item pinning * Iteration and draining diff --git a/src/sync.rs b/src/sync.rs index 0057c64..7f6f5f3 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -18,29 +18,22 @@ use crate::shard::EntryOrPlaceholder; pub use crate::sync_placeholder::{EntryAction, EntryResult, GuardResult, PlaceholderGuard}; use crate::sync_placeholder::{JoinFuture, JoinResult}; -/// The result of a non-blocking cache operation. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum ContendedResult { - /// The operation succeeded. For read operations the inner value holds the lookup result; - /// for write operations it holds the lifecycle request state (or `()` for [`Cache::try_insert`]). - Ok(Val), - /// The shard lock could not be acquired without blocking. The operation was not performed. - Contended, -} - -impl ContendedResult { - pub fn ok(self) -> Option { - match self { - ContendedResult::Ok(val) => Some(val), - ContendedResult::Contended => None, - } - } +/// Error returned by non-blocking cache operations that do not consume their +/// inputs when the relevant shard lock could not be acquired immediately. +/// +/// This is used by borrowed-key/read-path operations. Non-blocking operations +/// that consume owned inputs may instead return those inputs on contention. +#[derive(Debug)] +pub struct LockContention; - pub fn is_contended(&self) -> bool { - matches!(self, ContendedResult::Contended) +impl std::fmt::Display for LockContention { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Lock Contention") } } +impl std::error::Error for LockContention {} + /// A concurrent cache /// /// The concurrent cache is internally composed of equally sized shards, each of which is independently @@ -298,20 +291,20 @@ impl< } /// Attempts to check if a key exists in the cache without blocking. - /// Returns [`ContendedResult::Ok(true)`] if present, [`ContendedResult::Ok(false)`] if absent, - /// or [`ContendedResult::Contended`] if the shard lock could not be acquired without blocking. - pub fn try_contains_key(&self, key: &Q) -> ContendedResult + /// Returns `Ok(true)` if present, `Ok(false)` if absent, + /// or `Err(LockContention)` if the shard lock could not be acquired without blocking. + pub fn try_contains_key(&self, key: &Q) -> Result where Q: Hash + Equivalent + ?Sized, { let Some((shard, hash)) = self.shard_for(key) else { - return ContendedResult::Ok(false); + return Ok(false); }; - shard - .try_read() - .map(|guard| ContendedResult::Ok(guard.contains(hash, key))) - .unwrap_or(ContendedResult::Contended) + match shard.try_read() { + Some(guard) => Ok(guard.contains(hash, key)), + None => Err(LockContention), + } } /// Fetches an item from the cache whose key is `key`. @@ -324,19 +317,20 @@ impl< } /// Attempts to fetch an item from the cache whose key is `key`. - /// Returns [`ContendedResult::Ok(Some(val))`] if the key is present, [`ContendedResult::Ok(None)`] if absent, - /// or [`ContendedResult::Contended`] if the shard lock could not be acquired without blocking. - pub fn try_get(&self, key: &Q) -> ContendedResult> + /// Returns `Ok(Some(val))` if the key is present, `Ok(None)` if absent, + /// or `Err(LockContention)` if the shard lock could not be acquired without blocking. + pub fn try_get(&self, key: &Q) -> Result, LockContention> where Q: Hash + Equivalent + ?Sized, { let Some((shard, hash)) = self.shard_for(key) else { - return ContendedResult::Ok(None); + return Ok(None); }; - shard - .try_read() - .map(|guard| ContendedResult::Ok(guard.get(hash, key).cloned())) - .unwrap_or(ContendedResult::Contended) + + match shard.try_read() { + Some(guard) => Ok(guard.get(hash, key).cloned()), + None => Err(LockContention), + } } /// Peeks an item from the cache whose key is `key`. @@ -351,19 +345,19 @@ impl< /// Attempts to peek an item from the cache whose key is `key`. /// Contrary to gets, peeks don't alter the key "hotness". - /// Returns [`ContendedResult::Ok(Some(val))`] if the key is present, [`ContendedResult::Ok(None)`] if absent, - /// or [`ContendedResult::Contended`] if the shard lock could not be acquired without blocking. - pub fn try_peek(&self, key: &Q) -> ContendedResult> + /// Returns `Ok(Some(val))` if the key is present, `Ok(None)` if absent, + /// or `Err(LockContention)` if the shard lock could not be acquired without blocking. + pub fn try_peek(&self, key: &Q) -> Result, LockContention> where Q: Hash + Equivalent + ?Sized, { let Some((shard, hash)) = self.shard_for(key) else { - return ContendedResult::Ok(None); + return Ok(None); }; - shard - .try_read() - .map(|guard| ContendedResult::Ok(guard.peek(hash, key).cloned())) - .unwrap_or(ContendedResult::Contended) + match shard.try_read() { + Some(guard) => Ok(guard.peek(hash, key).cloned()), + None => Err(LockContention), + } } /// Remove an item from the cache whose key is `key`. @@ -377,20 +371,20 @@ impl< } /// Attempts to remove an item from the cache whose key is `key`. - /// Returns [`ContendedResult::Ok(Some(entry))`] with the removed entry if present, [`ContendedResult::Ok(None)`] if absent, - /// or [`ContendedResult::Contended`] if the shard lock could not be acquired without blocking. - pub fn try_remove(&self, key: &Q) -> ContendedResult> + /// Returns `Ok(Some(entry))` with the removed entry if present, `Ok(None)` if absent, + /// or `Err(LockContention)` if the shard lock could not be acquired without blocking. + pub fn try_remove(&self, key: &Q) -> Result, LockContention> where Q: Hash + Equivalent + ?Sized, { let Some((shard, hash)) = self.shard_for(key) else { - return ContendedResult::Ok(None); + return Ok(None); }; - shard - .try_write() - .map(|mut guard| ContendedResult::Ok(guard.remove(hash, key))) - .unwrap_or(ContendedResult::Contended) + match shard.try_write() { + Some(mut guard) => Ok(guard.remove(hash, key)), + None => Err(LockContention), + } } /// Remove an item from the cache whose key is `key` if `f(&value)` returns `true` for that entry. @@ -454,16 +448,13 @@ impl< self.lifecycle.end_request(lcs); } - /// Attempts to insert an item in the cache with key `key`. - /// Returns [`ContendedResult::Ok`] if the item was inserted, or [`ContendedResult::Contended`] if the shard lock was contended. - pub fn try_insert(&self, key: Key, value: Val) -> ContendedResult<()> { - match self.try_insert_with_lifecycle(key, value) { - ContendedResult::Ok(lcs) => { - self.lifecycle.end_request(lcs); - ContendedResult::Ok(()) - } - ContendedResult::Contended => ContendedResult::Contended, - } + /// Attempts to insert an item in the cache with key `key` without blocking. + /// Returns `Ok(())` if the item was inserted, or `Err((key, value))` if the shard lock + /// could not be acquired without blocking. + pub fn try_insert(&self, key: Key, value: Val) -> Result<(), (Key, Val)> { + let lcs = self.try_insert_with_lifecycle(key, value)?; + self.lifecycle.end_request(lcs); + Ok(()) } /// Inserts an item in the cache with key `key`. @@ -478,26 +469,26 @@ impl< lcs } - /// Attempts to insert an item in the cache with key `key`. - /// Returns [`ContendedResult::Ok`] with the lifecycle request state if the item was inserted, - /// or [`ContendedResult::Contended`] if the shard lock was contended. + /// Attempts to insert an item in the cache with key `key` without blocking. + /// Returns `Ok(lcs)` with the lifecycle request state if the item was inserted, + /// or `Err((key, value))` if the shard lock could not be acquired without blocking. pub fn try_insert_with_lifecycle( &self, key: Key, value: Val, - ) -> ContendedResult { + ) -> Result { let (shard, hash) = self.shard_for(&key).unwrap(); - shard - .try_write() - .map(|mut guard| { + match shard.try_write() { + Some(mut shard) => { let mut lcs = self.lifecycle.begin_request(); - let result = guard.insert(&mut lcs, hash, key, value, InsertStrategy::Insert); + let result = shard.insert(&mut lcs, hash, key, value, InsertStrategy::Insert); // result cannot err with the Insert strategy debug_assert!(result.is_ok()); - ContendedResult::Ok(lcs) - }) - .unwrap_or(ContendedResult::Contended) + Ok(lcs) + } + _ => Err((key, value)), + } } /// Clear all items from the cache @@ -1652,25 +1643,13 @@ mod tests { } // --- Non-blocking method tests --- - - #[test] - fn test_contended_result_helpers() { - let ok: ContendedResult = ContendedResult::Ok(42); - assert!(!ok.is_contended()); - assert_eq!(ok.ok(), Some(42)); - - let contended: ContendedResult = ContendedResult::Contended; - assert!(contended.is_contended()); - assert_eq!(contended.ok(), None); - } - #[test] fn test_try_contains_key() { let cache = Cache::new(100); cache.insert(1, 10); - assert_eq!(cache.try_contains_key(&1), ContendedResult::Ok(true)); - assert_eq!(cache.try_contains_key(&2), ContendedResult::Ok(false)); + assert!(cache.try_contains_key(&1).is_ok_and(|v| v)); + assert!(cache.try_contains_key(&2).is_ok_and(|v| !v)); } #[test] @@ -1679,7 +1658,7 @@ mod tests { cache.insert(1, 10); // Hold write locks on all shards so try_read is blocked. let _guards: Vec<_> = cache.shards.iter().map(|s| s.write()).collect(); - assert_eq!(cache.try_contains_key(&1), ContendedResult::Contended); + assert!(cache.try_contains_key(&1).is_err()); } #[test] @@ -1687,8 +1666,8 @@ mod tests { let cache = Cache::new(100); cache.insert(1, 10); - assert_eq!(cache.try_get(&1), ContendedResult::Ok(Some(10))); - assert_eq!(cache.try_get(&2), ContendedResult::Ok(None)); + assert!(cache.try_get(&1).is_ok_and(|v| matches!(v, Some(10)))); + assert!(cache.try_get(&2).is_ok_and(|v| v.is_none())); } #[test] @@ -1696,7 +1675,7 @@ mod tests { let cache = Cache::new(100); cache.insert(1, 10); let _guards: Vec<_> = cache.shards.iter().map(|s| s.write()).collect(); - assert_eq!(cache.try_get(&1), ContendedResult::Contended); + assert!(cache.try_get(&1).is_err()); } #[test] @@ -1704,8 +1683,8 @@ mod tests { let cache = Cache::new(100); cache.insert(1, 10); - assert_eq!(cache.try_peek(&1), ContendedResult::Ok(Some(10))); - assert_eq!(cache.try_peek(&2), ContendedResult::Ok(None)); + assert!(cache.try_peek(&1).is_ok_and(|v| matches!(v, Some(10)))); + assert!(cache.try_peek(&2).is_ok_and(|v| v.is_none())); } #[test] @@ -1713,7 +1692,7 @@ mod tests { let cache = Cache::new(100); cache.insert(1, 10); let _guards: Vec<_> = cache.shards.iter().map(|s| s.write()).collect(); - assert_eq!(cache.try_peek(&1), ContendedResult::Contended); + assert!(cache.try_peek(&1).is_err()); } #[test] @@ -1721,9 +1700,11 @@ mod tests { let cache = Cache::new(100); cache.insert(1, 10); - assert_eq!(cache.try_remove(&1), ContendedResult::Ok(Some((1, 10)))); - assert_eq!(cache.try_remove(&1), ContendedResult::Ok(None)); - assert_eq!(cache.try_remove(&99), ContendedResult::Ok(None)); + assert!(cache + .try_remove(&1) + .is_ok_and(|v| matches!(v, Some((1, 10))))); + assert!(cache.try_remove(&1).is_ok_and(|v| v.is_none())); + assert!(cache.try_remove(&99).is_ok_and(|v| v.is_none())); } #[test] @@ -1732,7 +1713,7 @@ mod tests { cache.insert(1, 10); // Hold read locks on all shards so try_write is blocked. let guards: Vec<_> = cache.shards.iter().map(|s| s.read()).collect(); - assert_eq!(cache.try_remove(&1), ContendedResult::Contended); + assert!(cache.try_remove(&1).is_err()); drop(guards); // Item must still be present since the remove did not happen. assert_eq!(cache.get(&1), Some(10)); @@ -1742,11 +1723,11 @@ mod tests { fn test_try_insert() { let cache = Cache::new(100); - assert_eq!(cache.try_insert(1, 10), ContendedResult::Ok(())); + assert_eq!(cache.try_insert(1, 10), Ok(())); assert_eq!(cache.get(&1), Some(10)); // Insert same key overwrites the previous value. - assert_eq!(cache.try_insert(1, 20), ContendedResult::Ok(())); + assert_eq!(cache.try_insert(1, 20), Ok(())); assert_eq!(cache.get(&1), Some(20)); } @@ -1754,7 +1735,7 @@ mod tests { fn test_try_insert_contended() { let cache = Cache::new(100); let guards: Vec<_> = cache.shards.iter().map(|s| s.read()).collect(); - assert_eq!(cache.try_insert(1, 10), ContendedResult::Contended); + assert_eq!(cache.try_insert(1, 10), Err((1, 10))); drop(guards); assert_eq!(cache.get(&1), None); } @@ -1765,17 +1746,14 @@ mod tests { // Successful insert returns the lifecycle request state. let result = cache.try_insert_with_lifecycle(1, 10); - assert!(!result.is_contended()); + assert!(result.is_ok()); let lcs = result.ok().unwrap(); cache.lifecycle.end_request(lcs); assert_eq!(cache.get(&1), Some(10)); // Contended when a read lock is held. let guards: Vec<_> = cache.shards.iter().map(|s| s.read()).collect(); - assert_eq!( - cache.try_insert_with_lifecycle(2, 20), - ContendedResult::Contended - ); + assert_eq!(cache.try_insert_with_lifecycle(2, 20), Err((2, 20))); drop(guards); assert_eq!(cache.get(&2), None); } From 9a15d9492ce355cd8f1fc0d643e7fdb4df0e4dc4 Mon Sep 17 00:00:00 2001 From: Faiaz Sanaulla Date: Wed, 29 Apr 2026 10:15:35 +0200 Subject: [PATCH 4/4] Move req init outside of the lock --- src/sync.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/sync.rs b/src/sync.rs index 7f6f5f3..55b1580 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -22,8 +22,9 @@ use crate::sync_placeholder::{JoinFuture, JoinResult}; /// inputs when the relevant shard lock could not be acquired immediately. /// /// This is used by borrowed-key/read-path operations. Non-blocking operations -/// that consume owned inputs may instead return those inputs on contention. -#[derive(Debug)] +/// that consume owned inputs (e.g. `try_insert`) instead return those inputs +/// on contention so the caller can retry or discard without losing data. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct LockContention; impl std::fmt::Display for LockContention { @@ -450,7 +451,8 @@ impl< /// Attempts to insert an item in the cache with key `key` without blocking. /// Returns `Ok(())` if the item was inserted, or `Err((key, value))` if the shard lock - /// could not be acquired without blocking. + /// could not be acquired without blocking. Lock contention is the only failure + /// mode: the inputs are returned so the caller can retry or discard them. pub fn try_insert(&self, key: Key, value: Val) -> Result<(), (Key, Val)> { let lcs = self.try_insert_with_lifecycle(key, value)?; self.lifecycle.end_request(lcs); @@ -472,16 +474,20 @@ impl< /// Attempts to insert an item in the cache with key `key` without blocking. /// Returns `Ok(lcs)` with the lifecycle request state if the item was inserted, /// or `Err((key, value))` if the shard lock could not be acquired without blocking. + /// Lock contention is the only failure mode: the inputs are returned so the + /// caller can retry or discard them. pub fn try_insert_with_lifecycle( &self, key: Key, value: Val, ) -> Result { + // Tradeoff: begin_request is called before acquiring the shard lock to avoid holding + // the lock during potentially expensive lifecycle initialization. + let mut lcs = self.lifecycle.begin_request(); let (shard, hash) = self.shard_for(&key).unwrap(); match shard.try_write() { Some(mut shard) => { - let mut lcs = self.lifecycle.begin_request(); let result = shard.insert(&mut lcs, hash, key, value, InsertStrategy::Insert); // result cannot err with the Insert strategy debug_assert!(result.is_ok());