Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Lightweight and high performance concurrent cache optimized for low cache overhe
* Scales well with the number of threads
* Atomic operations with `get_or_insert` and `get_value_or_guard` functions
* Atomic async operations with `get_or_insert_async` and `get_value_or_guard_async` functions
* Non-blocking `try_get`, `try_insert`, `try_remove`, and related methods that return an error instead of blocking: typically `Err(LockContention)`, or `Err((Key, Val))` for `try_insert`/`try_insert_with_lifecycle` so inputs are preserved
* Closure-based `entry` API for atomic inspect-and-act patterns (keep, remove, replace)
* Supports item pinning
* Iteration and draining
Expand Down
40 changes: 40 additions & 0 deletions src/rw_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,46 @@ impl<T: ?Sized> RwLock<T> {
})
}

/// Attempts to acquire this `RwLock` with shared read access without blocking.
///
/// Returns `Some(guard)` if the lock was acquired, or `None` if it is already
/// held by a writer.
#[inline]
pub fn try_read(&self) -> Option<RwLockReadGuard<'_, T>> {
#[cfg(feature = "parking_lot")]
{
self.0.try_read().map(RwLockReadGuard)
}
#[cfg(not(feature = "parking_lot"))]
{
match self.0.try_read() {
Ok(guard) => Some(RwLockReadGuard(guard)),
Err(std::sync::TryLockError::WouldBlock) => None,
Err(std::sync::TryLockError::Poisoned(err)) => panic!("{}", err),
}
}
}

/// Attempts to acquire this `RwLock` with exclusive write access without blocking.
///
/// Returns `Some(guard)` if the lock was acquired, or `None` if it is already
/// held by any readers or a writer.
#[inline]
pub fn try_write(&self) -> Option<RwLockWriteGuard<'_, T>> {
#[cfg(feature = "parking_lot")]
{
self.0.try_write().map(RwLockWriteGuard)
}
#[cfg(not(feature = "parking_lot"))]
{
match self.0.try_write() {
Ok(guard) => Some(RwLockWriteGuard(guard)),
Err(std::sync::TryLockError::WouldBlock) => None,
Err(std::sync::TryLockError::Poisoned(err)) => panic!("{}", err),
}
}
}

/// Locks this `RwLock` with exclusive write access, blocking the current
/// thread until it can be acquired.
///
Expand Down
237 changes: 237 additions & 0 deletions src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,23 @@ use crate::shard::EntryOrPlaceholder;
pub use crate::sync_placeholder::{EntryAction, EntryResult, GuardResult, PlaceholderGuard};
use crate::sync_placeholder::{JoinFuture, JoinResult};

/// Error returned by non-blocking cache operations that do not consume their
/// inputs when the relevant shard lock could not be acquired immediately.
///
/// This is used by borrowed-key/read-path operations. Non-blocking operations
/// that consume owned inputs (e.g. `try_insert`) instead return those inputs
/// on contention so the caller can retry or discard without losing data.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct LockContention;

impl std::fmt::Display for LockContention {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Lock Contention")
}
}

impl std::error::Error for LockContention {}

/// A concurrent cache
///
/// The concurrent cache is internally composed of equally sized shards, each of which is independently
Expand Down Expand Up @@ -274,6 +291,23 @@ impl<
.is_some_and(|(shard, hash)| shard.read().contains(hash, key))
}

/// Attempts to check if a key exists in the cache without blocking.
/// Returns `Ok(true)` if present, `Ok(false)` if absent,
/// or `Err(LockContention)` if the shard lock could not be acquired without blocking.
pub fn try_contains_key<Q>(&self, key: &Q) -> Result<bool, LockContention>
where
Q: Hash + Equivalent<Key> + ?Sized,
{
let Some((shard, hash)) = self.shard_for(key) else {
return Ok(false);
};

match shard.try_read() {
Some(guard) => Ok(guard.contains(hash, key)),
None => Err(LockContention),
}
}

/// Fetches an item from the cache whose key is `key`.
pub fn get<Q>(&self, key: &Q) -> Option<Val>
where
Expand All @@ -283,6 +317,23 @@ impl<
shard.read().get(hash, key).cloned()
}

/// Attempts to fetch an item from the cache whose key is `key`.
/// Returns `Ok(Some(val))` if the key is present, `Ok(None)` if absent,
/// or `Err(LockContention)` if the shard lock could not be acquired without blocking.
pub fn try_get<Q>(&self, key: &Q) -> Result<Option<Val>, LockContention>
where
Q: Hash + Equivalent<Key> + ?Sized,
{
let Some((shard, hash)) = self.shard_for(key) else {
return Ok(None);
};

match shard.try_read() {
Some(guard) => Ok(guard.get(hash, key).cloned()),
None => Err(LockContention),
}
}

/// Peeks an item from the cache whose key is `key`.
/// Contrary to gets, peeks don't alter the key "hotness".
pub fn peek<Q>(&self, key: &Q) -> Option<Val>
Expand All @@ -293,6 +344,23 @@ impl<
shard.read().peek(hash, key).cloned()
}

/// Attempts to peek an item from the cache whose key is `key`.
/// Contrary to gets, peeks don't alter the key "hotness".
/// Returns `Ok(Some(val))` if the key is present, `Ok(None)` if absent,
/// or `Err(LockContention)` if the shard lock could not be acquired without blocking.
pub fn try_peek<Q>(&self, key: &Q) -> Result<Option<Val>, LockContention>
where
Q: Hash + Equivalent<Key> + ?Sized,
{
let Some((shard, hash)) = self.shard_for(key) else {
return Ok(None);
};
match shard.try_read() {
Some(guard) => Ok(guard.peek(hash, key).cloned()),
None => Err(LockContention),
}
}

/// Remove an item from the cache whose key is `key`.
/// Returns the removed entry, if any.
pub fn remove<Q>(&self, key: &Q) -> Option<(Key, Val)>
Expand All @@ -303,6 +371,23 @@ impl<
shard.write().remove(hash, key)
}

/// Attempts to remove an item from the cache whose key is `key`.
/// Returns `Ok(Some(entry))` with the removed entry if present, `Ok(None)` if absent,
/// or `Err(LockContention)` if the shard lock could not be acquired without blocking.
pub fn try_remove<Q>(&self, key: &Q) -> Result<Option<(Key, Val)>, LockContention>
where
Q: Hash + Equivalent<Key> + ?Sized,
{
let Some((shard, hash)) = self.shard_for(key) else {
return Ok(None);
};

match shard.try_write() {
Some(mut guard) => Ok(guard.remove(hash, key)),
None => Err(LockContention),
}
}

/// Remove an item from the cache whose key is `key` if `f(&value)` returns `true` for that entry.
/// Compared to peek and remove, this method guarantees that no new value was inserted in-between.
///
Expand Down Expand Up @@ -364,6 +449,16 @@ impl<
self.lifecycle.end_request(lcs);
}

/// Attempts to insert an item in the cache with key `key` without blocking.
/// Returns `Ok(())` if the item was inserted, or `Err((key, value))` if the shard lock
/// could not be acquired without blocking. Lock contention is the only failure
/// mode: the inputs are returned so the caller can retry or discard them.
pub fn try_insert(&self, key: Key, value: Val) -> Result<(), (Key, Val)> {
let lcs = self.try_insert_with_lifecycle(key, value)?;
self.lifecycle.end_request(lcs);
Ok(())
}

/// Inserts an item in the cache with key `key`.
pub fn insert_with_lifecycle(&self, key: Key, value: Val) -> L::RequestState {
let mut lcs = self.lifecycle.begin_request();
Expand All @@ -376,6 +471,32 @@ impl<
lcs
}

/// Attempts to insert an item in the cache with key `key` without blocking.
/// Returns `Ok(lcs)` with the lifecycle request state if the item was inserted,
/// or `Err((key, value))` if the shard lock could not be acquired without blocking.
/// Lock contention is the only failure mode: the inputs are returned so the
/// caller can retry or discard them.
pub fn try_insert_with_lifecycle(
&self,
key: Key,
value: Val,
) -> Result<L::RequestState, (Key, Val)> {
// Tradeoff: begin_request is called before acquiring the shard lock to avoid holding
// the lock during potentially expensive lifecycle initialization.
let mut lcs = self.lifecycle.begin_request();
let (shard, hash) = self.shard_for(&key).unwrap();

match shard.try_write() {
Some(mut shard) => {
let result = shard.insert(&mut lcs, hash, key, value, InsertStrategy::Insert);
// result cannot err with the Insert strategy
debug_assert!(result.is_ok());
Ok(lcs)
}
_ => Err((key, value)),
}
}

/// Clear all items from the cache
pub fn clear(&self) {
for s in self.shards.iter() {
Expand Down Expand Up @@ -1526,4 +1647,120 @@ mod tests {
}
}
}

// --- Non-blocking method tests ---
#[test]
fn test_try_contains_key() {
let cache = Cache::new(100);
cache.insert(1, 10);

assert!(cache.try_contains_key(&1).is_ok_and(|v| v));
assert!(cache.try_contains_key(&2).is_ok_and(|v| !v));
}

#[test]
fn test_try_contains_key_contended() {
let cache = Cache::new(100);
cache.insert(1, 10);
// Hold write locks on all shards so try_read is blocked.
let _guards: Vec<_> = cache.shards.iter().map(|s| s.write()).collect();
assert!(cache.try_contains_key(&1).is_err());
}

#[test]
fn test_try_get() {
let cache = Cache::new(100);
cache.insert(1, 10);

assert!(cache.try_get(&1).is_ok_and(|v| matches!(v, Some(10))));
assert!(cache.try_get(&2).is_ok_and(|v| v.is_none()));
}

#[test]
fn test_try_get_contended() {
let cache = Cache::new(100);
cache.insert(1, 10);
let _guards: Vec<_> = cache.shards.iter().map(|s| s.write()).collect();
assert!(cache.try_get(&1).is_err());
}

#[test]
fn test_try_peek() {
let cache = Cache::new(100);
cache.insert(1, 10);

assert!(cache.try_peek(&1).is_ok_and(|v| matches!(v, Some(10))));
assert!(cache.try_peek(&2).is_ok_and(|v| v.is_none()));
}

#[test]
fn test_try_peek_contended() {
let cache = Cache::new(100);
cache.insert(1, 10);
let _guards: Vec<_> = cache.shards.iter().map(|s| s.write()).collect();
assert!(cache.try_peek(&1).is_err());
}

#[test]
fn test_try_remove() {
let cache = Cache::new(100);
cache.insert(1, 10);

assert!(cache
.try_remove(&1)
.is_ok_and(|v| matches!(v, Some((1, 10)))));
assert!(cache.try_remove(&1).is_ok_and(|v| v.is_none()));
assert!(cache.try_remove(&99).is_ok_and(|v| v.is_none()));
}

#[test]
fn test_try_remove_contended() {
let cache = Cache::new(100);
cache.insert(1, 10);
// Hold read locks on all shards so try_write is blocked.
let guards: Vec<_> = cache.shards.iter().map(|s| s.read()).collect();
assert!(cache.try_remove(&1).is_err());
drop(guards);
// Item must still be present since the remove did not happen.
assert_eq!(cache.get(&1), Some(10));
}

#[test]
fn test_try_insert() {
let cache = Cache::new(100);

assert_eq!(cache.try_insert(1, 10), Ok(()));
assert_eq!(cache.get(&1), Some(10));

// Insert same key overwrites the previous value.
assert_eq!(cache.try_insert(1, 20), Ok(()));
assert_eq!(cache.get(&1), Some(20));
}

#[test]
fn test_try_insert_contended() {
let cache = Cache::new(100);
let guards: Vec<_> = cache.shards.iter().map(|s| s.read()).collect();
assert_eq!(cache.try_insert(1, 10), Err((1, 10)));
drop(guards);
assert_eq!(cache.get(&1), None);
}

#[test]
fn test_try_insert_with_lifecycle() {
let cache = Cache::new(100);

// Successful insert returns the lifecycle request state.
let result = cache.try_insert_with_lifecycle(1, 10);
assert!(result.is_ok());
let lcs = result.ok().unwrap();
cache.lifecycle.end_request(lcs);
assert_eq!(cache.get(&1), Some(10));

// Contended when a read lock is held.
let guards: Vec<_> = cache.shards.iter().map(|s| s.read()).collect();
assert_eq!(cache.try_insert_with_lifecycle(2, 20), Err((2, 20)));
drop(guards);
assert_eq!(cache.get(&2), None);
}
}
Loading