Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions src/rw_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,46 @@ impl<T: ?Sized> RwLock<T> {
})
}

/// Attempts to acquire this `RwLock` with shared read access without blocking.
///
/// Returns `Some(guard)` if the lock was acquired, or `None` if it is already
/// held by a writer.
#[inline]
pub fn try_read(&self) -> Option<RwLockReadGuard<'_, T>> {
Comment thread
fsdvh marked this conversation as resolved.
#[cfg(feature = "parking_lot")]
Comment thread
fsdvh marked this conversation as resolved.
{
self.0.try_read().map(RwLockReadGuard)
}
#[cfg(not(feature = "parking_lot"))]
{
match self.0.try_read() {
Ok(guard) => Some(RwLockReadGuard(guard)),
Err(std::sync::TryLockError::WouldBlock) => None,
Err(std::sync::TryLockError::Poisoned(err)) => panic!("{}", err),
}
Comment on lines +108 to +124
Copy link

Copilot AI Apr 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The non-blocking lock APIs are documented/implemented here as panicking on poison, but when the crate is built with the shuttle feature it uses the RwLock wrapper in src/shim.rs, whose try_read/try_write currently convert all errors (including poisoning) into None. That means Cache::try_* can silently treat a poisoned lock as mere contention under shuttle. Align the shuttle implementation with these semantics (panic on poison, None only for WouldBlock) so behavior is consistent across feature sets.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@arthurprs I think this is a valid suggestion, but it's outside the scope of this PR, wdyt?

}
Comment thread
fsdvh marked this conversation as resolved.
}

/// Attempts to acquire this `RwLock` with exclusive write access without blocking.
///
/// Returns `Some(guard)` if the lock was acquired, or `None` if it is already
/// held by any readers or a writer.
#[inline]
pub fn try_write(&self) -> Option<RwLockWriteGuard<'_, T>> {
#[cfg(feature = "parking_lot")]
{
self.0.try_write().map(RwLockWriteGuard)
}
#[cfg(not(feature = "parking_lot"))]
{
match self.0.try_write() {
Ok(guard) => Some(RwLockWriteGuard(guard)),
Err(std::sync::TryLockError::WouldBlock) => None,
Err(std::sync::TryLockError::Poisoned(err)) => panic!("{}", err),
}
}
}

/// Locks this `RwLock` with exclusive write access, blocking the current
/// thread until it can be acquired.
///
Expand Down
253 changes: 253 additions & 0 deletions src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,29 @@ use crate::shard::EntryOrPlaceholder;
pub use crate::sync_placeholder::{EntryAction, EntryResult, GuardResult, PlaceholderGuard};
use crate::sync_placeholder::{JoinFuture, JoinResult};

/// The result of a non-blocking cache operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContendedResult<Val> {
/// The operation succeeded. For read operations the inner value holds the lookup result;
/// for write operations it holds the lifecycle request state (or `()` for [`Cache::try_insert`]).
Ok(Val),
Comment thread
fsdvh marked this conversation as resolved.
Outdated
/// The shard lock could not be acquired without blocking. The operation was not performed.
Contended,
}

impl<Val> ContendedResult<Val> {
pub fn ok(self) -> Option<Val> {
match self {
ContendedResult::Ok(val) => Some(val),
ContendedResult::Contended => None,
}
}

pub fn is_contended(&self) -> bool {
matches!(self, ContendedResult::Contended)
}
}

/// A concurrent cache
///
/// The concurrent cache is internally composed of equally sized shards, each of which is independently
Expand Down Expand Up @@ -247,6 +270,23 @@ impl<
.is_some_and(|(shard, hash)| shard.read().contains(hash, key))
}

/// Attempts to check if a key exists in the cache without blocking.
/// Returns [`ContendedResult::Ok(true)`] if present, [`ContendedResult::Ok(false)`] if absent,
/// or [`ContendedResult::Contended`] if the shard lock could not be acquired without blocking.
pub fn try_contains_key<Q>(&self, key: &Q) -> ContendedResult<bool>
where
Q: Hash + Equivalent<Key> + ?Sized,
Comment thread
fsdvh marked this conversation as resolved.
{
let Some((shard, hash)) = self.shard_for(key) else {
return ContendedResult::Ok(false);
};

shard
.try_read()
.map(|guard| ContendedResult::Ok(guard.contains(hash, key)))
.unwrap_or(ContendedResult::Contended)
}
Comment thread
fsdvh marked this conversation as resolved.

/// Fetches an item from the cache whose key is `key`.
pub fn get<Q>(&self, key: &Q) -> Option<Val>
where
Expand All @@ -256,6 +296,22 @@ impl<
shard.read().get(hash, key).cloned()
}

/// Attempts to fetch an item from the cache whose key is `key`.
/// Returns [`ContendedResult::Ok(Some(val))`] if the key is present, [`ContendedResult::Ok(None)`] if absent,
/// or [`ContendedResult::Contended`] if the shard lock could not be acquired without blocking.
pub fn try_get<Q>(&self, key: &Q) -> ContendedResult<Option<Val>>
where
Q: Hash + Equivalent<Key> + ?Sized,
{
let Some((shard, hash)) = self.shard_for(key) else {
return ContendedResult::Ok(None);
};
shard
.try_read()
.map(|guard| ContendedResult::Ok(guard.get(hash, key).cloned()))
.unwrap_or(ContendedResult::Contended)
}

/// Peeks an item from the cache whose key is `key`.
/// Contrary to gets, peeks don't alter the key "hotness".
pub fn peek<Q>(&self, key: &Q) -> Option<Val>
Expand All @@ -266,6 +322,23 @@ impl<
shard.read().peek(hash, key).cloned()
}

/// Attempts to peek an item from the cache whose key is `key`.
/// Contrary to gets, peeks don't alter the key "hotness".
/// Returns [`ContendedResult::Ok(Some(val))`] if the key is present, [`ContendedResult::Ok(None)`] if absent,
/// or [`ContendedResult::Contended`] if the shard lock could not be acquired without blocking.
pub fn try_peek<Q>(&self, key: &Q) -> ContendedResult<Option<Val>>
where
Q: Hash + Equivalent<Key> + ?Sized,
{
let Some((shard, hash)) = self.shard_for(key) else {
return ContendedResult::Ok(None);
};
shard
.try_read()
.map(|guard| ContendedResult::Ok(guard.peek(hash, key).cloned()))
.unwrap_or(ContendedResult::Contended)
}

/// Remove an item from the cache whose key is `key`.
/// Returns the removed entry, if any.
pub fn remove<Q>(&self, key: &Q) -> Option<(Key, Val)>
Expand All @@ -276,6 +349,23 @@ impl<
shard.write().remove(hash, key)
}

/// Attempts to remove an item from the cache whose key is `key`.
/// Returns [`ContendedResult::Ok(Some(entry))`] with the removed entry if present, [`ContendedResult::Ok(None)`] if absent,
/// or [`ContendedResult::Contended`] if the shard lock could not be acquired without blocking.
pub fn try_remove<Q>(&self, key: &Q) -> ContendedResult<Option<(Key, Val)>>
where
Q: Hash + Equivalent<Key> + ?Sized,
{
let Some((shard, hash)) = self.shard_for(key) else {
return ContendedResult::Ok(None);
};

shard
.try_write()
.map(|mut guard| ContendedResult::Ok(guard.remove(hash, key)))
.unwrap_or(ContendedResult::Contended)
}

/// Remove an item from the cache whose key is `key` if `f(&value)` returns `true` for that entry.
/// Compared to peek and remove, this method guarantees that no new value was inserted in-between.
///
Expand Down Expand Up @@ -337,6 +427,18 @@ impl<
self.lifecycle.end_request(lcs);
}

/// Attempts to insert an item in the cache with key `key`.
/// Returns [`ContendedResult::Ok`] if the item was inserted, or [`ContendedResult::Contended`] if the shard lock was contended.
pub fn try_insert(&self, key: Key, value: Val) -> ContendedResult<()> {
match self.try_insert_with_lifecycle(key, value) {
ContendedResult::Ok(lcs) => {
self.lifecycle.end_request(lcs);
ContendedResult::Ok(())
}
ContendedResult::Contended => ContendedResult::Contended,
}
Comment thread
fsdvh marked this conversation as resolved.
Outdated
}

/// Inserts an item in the cache with key `key`.
pub fn insert_with_lifecycle(&self, key: Key, value: Val) -> L::RequestState {
let mut lcs = self.lifecycle.begin_request();
Expand All @@ -349,6 +451,28 @@ impl<
lcs
}

/// Attempts to insert an item in the cache with key `key`.
/// Returns [`ContendedResult::Ok`] with the lifecycle request state if the item was inserted,
/// or [`ContendedResult::Contended`] if the shard lock was contended.
pub fn try_insert_with_lifecycle(
&self,
key: Key,
value: Val,
) -> ContendedResult<L::RequestState> {
let (shard, hash) = self.shard_for(&key).unwrap();

shard
.try_write()
.map(|mut guard| {
let mut lcs = self.lifecycle.begin_request();
let result = guard.insert(&mut lcs, hash, key, value, InsertStrategy::Insert);
// result cannot err with the Insert strategy
debug_assert!(result.is_ok());
ContendedResult::Ok(lcs)
})
.unwrap_or(ContendedResult::Contended)
}

/// Clear all items from the cache
pub fn clear(&self) {
for s in self.shards.iter() {
Expand Down Expand Up @@ -1499,4 +1623,133 @@ mod tests {
}
}
}

// --- Non-blocking method tests ---

#[test]
fn test_contended_result_helpers() {
let ok: ContendedResult<i32> = ContendedResult::Ok(42);
Comment thread
fsdvh marked this conversation as resolved.
Outdated
assert!(!ok.is_contended());
assert_eq!(ok.ok(), Some(42));

let contended: ContendedResult<i32> = ContendedResult::Contended;
assert!(contended.is_contended());
assert_eq!(contended.ok(), None);
}

#[test]
fn test_try_contains_key() {
let cache = Cache::new(100);
cache.insert(1, 10);

assert_eq!(cache.try_contains_key(&1), ContendedResult::Ok(true));
assert_eq!(cache.try_contains_key(&2), ContendedResult::Ok(false));
}

#[test]
fn test_try_contains_key_contended() {
let cache = Cache::new(100);
cache.insert(1, 10);
// Hold write locks on all shards so try_read is blocked.
let _guards: Vec<_> = cache.shards.iter().map(|s| s.write()).collect();
assert_eq!(cache.try_contains_key(&1), ContendedResult::Contended);
}

#[test]
fn test_try_get() {
let cache = Cache::new(100);
cache.insert(1, 10);

assert_eq!(cache.try_get(&1), ContendedResult::Ok(Some(10)));
assert_eq!(cache.try_get(&2), ContendedResult::Ok(None));
}

#[test]
fn test_try_get_contended() {
let cache = Cache::new(100);
cache.insert(1, 10);
let _guards: Vec<_> = cache.shards.iter().map(|s| s.write()).collect();
assert_eq!(cache.try_get(&1), ContendedResult::Contended);
}

#[test]
fn test_try_peek() {
let cache = Cache::new(100);
cache.insert(1, 10);

assert_eq!(cache.try_peek(&1), ContendedResult::Ok(Some(10)));
assert_eq!(cache.try_peek(&2), ContendedResult::Ok(None));
}

#[test]
fn test_try_peek_contended() {
let cache = Cache::new(100);
cache.insert(1, 10);
let _guards: Vec<_> = cache.shards.iter().map(|s| s.write()).collect();
assert_eq!(cache.try_peek(&1), ContendedResult::Contended);
}

#[test]
fn test_try_remove() {
let cache = Cache::new(100);
cache.insert(1, 10);

assert_eq!(cache.try_remove(&1), ContendedResult::Ok(Some((1, 10))));
assert_eq!(cache.try_remove(&1), ContendedResult::Ok(None));
assert_eq!(cache.try_remove(&99), ContendedResult::Ok(None));
}

#[test]
fn test_try_remove_contended() {
let cache = Cache::new(100);
cache.insert(1, 10);
// Hold read locks on all shards so try_write is blocked.
let guards: Vec<_> = cache.shards.iter().map(|s| s.read()).collect();
assert_eq!(cache.try_remove(&1), ContendedResult::Contended);
drop(guards);
// Item must still be present since the remove did not happen.
assert_eq!(cache.get(&1), Some(10));
}

#[test]
fn test_try_insert() {
let cache = Cache::new(100);

assert_eq!(cache.try_insert(1, 10), ContendedResult::Ok(()));
assert_eq!(cache.get(&1), Some(10));

// Insert same key overwrites the previous value.
assert_eq!(cache.try_insert(1, 20), ContendedResult::Ok(()));
assert_eq!(cache.get(&1), Some(20));
}

#[test]
fn test_try_insert_contended() {
let cache = Cache::new(100);
let guards: Vec<_> = cache.shards.iter().map(|s| s.read()).collect();
assert_eq!(cache.try_insert(1, 10), ContendedResult::Contended);
drop(guards);
assert_eq!(cache.get(&1), None);
}

#[test]
fn test_try_insert_with_lifecycle() {
let cache = Cache::new(100);

// Successful insert returns the lifecycle request state.
let result = cache.try_insert_with_lifecycle(1, 10);
assert!(!result.is_contended());
let lcs = result.ok().unwrap();
cache.lifecycle.end_request(lcs);
assert_eq!(cache.get(&1), Some(10));

// Contended when a read lock is held.
let guards: Vec<_> = cache.shards.iter().map(|s| s.read()).collect();
assert_eq!(
cache.try_insert_with_lifecycle(2, 20),
ContendedResult::Contended
);
drop(guards);
assert_eq!(cache.get(&2), None);
}
}
Loading