Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ rust-version = "1.71"

[features]
default = ["ahash", "parking_lot"]
non-blocking = []
sharded-lock = ["dep:crossbeam-utils"]
shuttle = ["dep:shuttle"]
stats = []
Comment thread
fsdvh marked this conversation as resolved.
Expand Down
34 changes: 34 additions & 0 deletions src/rw_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,40 @@ impl<T: ?Sized> RwLock<T> {
})
}

/// Attempts to acquire this `RwLock` with shared read access without blocking.
///
/// Returns `Some(guard)` if the lock was acquired, or `None` if it is already
/// held by a writer.
#[cfg(feature = "non-blocking")]
#[inline]
pub fn try_read(&self) -> Option<RwLockReadGuard<'_, T>> {
Comment thread
fsdvh marked this conversation as resolved.
#[cfg(feature = "parking_lot")]
Comment thread
fsdvh marked this conversation as resolved.
{
self.0.try_read().map(RwLockReadGuard)
}
#[cfg(not(feature = "parking_lot"))]
{
self.0.try_read().ok().map(RwLockReadGuard)
}
Comment thread
fsdvh marked this conversation as resolved.
}

/// Attempts to acquire this `RwLock` with exclusive write access without blocking.
///
/// Returns `Some(guard)` if the lock was acquired, or `None` if it is already
/// held by any readers or a writer.
#[cfg(feature = "non-blocking")]
#[inline]
pub fn try_write(&self) -> Option<RwLockWriteGuard<'_, T>> {
#[cfg(feature = "parking_lot")]
{
self.0.try_write().map(RwLockWriteGuard)
}
#[cfg(not(feature = "parking_lot"))]
{
self.0.try_write().ok().map(RwLockWriteGuard)
Comment thread
fsdvh marked this conversation as resolved.
Outdated
}
}

/// Locks this `RwLock` with exclusive write access, blocking the current
/// thread until it can be acquired.
///
Expand Down
132 changes: 132 additions & 0 deletions src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,31 @@ use crate::shard::EntryOrPlaceholder;
pub use crate::sync_placeholder::{EntryAction, EntryResult, GuardResult, PlaceholderGuard};
use crate::sync_placeholder::{JoinFuture, JoinResult};

#[cfg(feature = "non-blocking")]
/// The result of a non-blocking cache operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContendedResult<Val> {
/// The operation succeeded. For read operations the inner value holds the lookup result;
/// for write operations it holds the lifecycle request state (or `()` for [`Cache::try_insert`]).
Ok(Val),
Comment thread
fsdvh marked this conversation as resolved.
Outdated
/// The shard lock could not be acquired without blocking. The operation was not performed.
Contended,
}

#[cfg(feature = "non-blocking")]
impl<Val> ContendedResult<Val> {
pub fn ok(self) -> Option<Val> {
match self {
ContendedResult::Ok(val) => Some(val),
ContendedResult::Contended => None,
}
}

pub fn is_contended(&self) -> bool {
matches!(self, ContendedResult::Contended)
}
}

/// A concurrent cache
///
/// The concurrent cache is internally composed of equally sized shards, each of which is independently
Expand Down Expand Up @@ -247,6 +272,24 @@ impl<
.is_some_and(|(shard, hash)| shard.read().contains(hash, key))
}

/// Attempts to check if a key exists in the cache without blocking.
/// Returns [`ContendedResult::Ok(true)`] if present, [`ContendedResult::Ok(false)`] if absent,
/// or [`ContendedResult::Contended`] if the shard lock could not be acquired without blocking.
#[cfg(feature = "non-blocking")]
pub fn try_contains_key<Q>(&self, key: &Q) -> ContendedResult<bool>
where
Q: Hash + Equivalent<Key> + ?Sized,
Comment thread
fsdvh marked this conversation as resolved.
{
let Some((shard, hash)) = self.shard_for(key) else {
return ContendedResult::Ok(false);
};

shard
.try_read()
.map(|guard| ContendedResult::Ok(guard.contains(hash, key)))
.unwrap_or(ContendedResult::Contended)
}
Comment thread
fsdvh marked this conversation as resolved.

/// Fetches an item from the cache whose key is `key`.
pub fn get<Q>(&self, key: &Q) -> Option<Val>
where
Expand All @@ -256,6 +299,23 @@ impl<
shard.read().get(hash, key).cloned()
}

/// Attempts to fetch an item from the cache whose key is `key`.
/// Returns [`ContendedResult::Ok(Some(val))`] if the key is present, [`ContendedResult::Ok(None)`] if absent,
/// or [`ContendedResult::Contended`] if the shard lock could not be acquired without blocking.
#[cfg(feature = "non-blocking")]
pub fn try_get<Q>(&self, key: &Q) -> ContendedResult<Option<Val>>
where
Q: Hash + Equivalent<Key> + ?Sized,
{
let Some((shard, hash)) = self.shard_for(key) else {
return ContendedResult::Ok(None);
};
shard
.try_read()
.map(|guard| ContendedResult::Ok(guard.get(hash, key).cloned()))
.unwrap_or(ContendedResult::Contended)
}

/// Peeks an item from the cache whose key is `key`.
/// Contrary to gets, peeks don't alter the key "hotness".
pub fn peek<Q>(&self, key: &Q) -> Option<Val>
Expand All @@ -266,6 +326,24 @@ impl<
shard.read().peek(hash, key).cloned()
}

/// Attempts to peek an item from the cache whose key is `key`.
/// Contrary to gets, peeks don't alter the key "hotness".
/// Returns [`ContendedResult::Ok(Some(val))`] if the key is present, [`ContendedResult::Ok(None)`] if absent,
/// or [`ContendedResult::Contended`] if the shard lock could not be acquired without blocking.
#[cfg(feature = "non-blocking")]
pub fn try_peek<Q>(&self, key: &Q) -> ContendedResult<Option<Val>>
where
Q: Hash + Equivalent<Key> + ?Sized,
{
let Some((shard, hash)) = self.shard_for(key) else {
return ContendedResult::Ok(None);
};
shard
.try_read()
.map(|guard| ContendedResult::Ok(guard.peek(hash, key).cloned()))
.unwrap_or(ContendedResult::Contended)
}

/// Remove an item from the cache whose key is `key`.
/// Returns the removed entry, if any.
pub fn remove<Q>(&self, key: &Q) -> Option<(Key, Val)>
Expand All @@ -276,6 +354,24 @@ impl<
shard.write().remove(hash, key)
}

/// Attempts to remove an item from the cache whose key is `key`.
/// Returns [`ContendedResult::Ok(Some(entry))`] with the removed entry if present, [`ContendedResult::Ok(None)`] if absent,
/// or [`ContendedResult::Contended`] if the shard lock could not be acquired without blocking.
#[cfg(feature = "non-blocking")]
pub fn try_remove<Q>(&self, key: &Q) -> ContendedResult<Option<(Key, Val)>>
where
Q: Hash + Equivalent<Key> + ?Sized,
{
let Some((shard, hash)) = self.shard_for(key) else {
return ContendedResult::Ok(None);
};

shard
.try_write()
.map(|mut guard| ContendedResult::Ok(guard.remove(hash, key)))
.unwrap_or(ContendedResult::Contended)
}

/// Remove an item from the cache whose key is `key` if `f(&value)` returns `true` for that entry.
/// Compared to peek and remove, this method guarantees that no new value was inserted in-between.
///
Expand Down Expand Up @@ -337,6 +433,19 @@ impl<
self.lifecycle.end_request(lcs);
}

/// Attempts to insert an item in the cache with key `key`.
/// Returns [`ContendedResult::Ok`] if the item was inserted, or [`ContendedResult::Contended`] if the shard lock was contended.
#[cfg(feature = "non-blocking")]
pub fn try_insert(&self, key: Key, value: Val) -> ContendedResult<()> {
match self.try_insert_with_lifecycle(key, value) {
ContendedResult::Ok(lcs) => {
self.lifecycle.end_request(lcs);
ContendedResult::Ok(())
}
ContendedResult::Contended => ContendedResult::Contended,
}
}

/// Inserts an item in the cache with key `key`.
pub fn insert_with_lifecycle(&self, key: Key, value: Val) -> L::RequestState {
let mut lcs = self.lifecycle.begin_request();
Expand All @@ -349,6 +458,29 @@ impl<
lcs
}

/// Attempts to insert an item in the cache with key `key`.
/// Returns [`ContendedResult::Ok`] with the lifecycle request state if the item was inserted,
/// or [`ContendedResult::Contended`] if the shard lock was contended.
#[cfg(feature = "non-blocking")]
pub fn try_insert_with_lifecycle(
&self,
key: Key,
value: Val,
) -> ContendedResult<L::RequestState> {
let (shard, hash) = self.shard_for(&key).unwrap();

shard
.try_write()
.map(|mut guard| {
let mut lcs = self.lifecycle.begin_request();
let result = guard.insert(&mut lcs, hash, key, value, InsertStrategy::Insert);
// result cannot err with the Insert strategy
debug_assert!(result.is_ok());
ContendedResult::Ok(lcs)
})
.unwrap_or(ContendedResult::Contended)
}

/// Clear all items from the cache
pub fn clear(&self) {
for s in self.shards.iter() {
Expand Down