Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions hitbox-backend/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//! - [`Backend`] - Low-level dyn-compatible trait for raw byte operations
//! - [`CacheBackend`] - High-level trait with typed operations (automatic via blanket impl)

use std::borrow::Cow;
use std::{future::Future, sync::Arc};

use async_trait::async_trait;
Expand Down Expand Up @@ -266,13 +267,13 @@ pub trait CacheBackend: Backend {
let format = self.value_format();

let decompress_timer = Timer::new();
let decompressed = self.compressor().decompress(&raw_data)?;
let decompressed = self.compressor().decompress(Cow::Borrowed(&raw_data))?;
crate::metrics::record_decompress(
backend_label.as_str(),
decompress_timer.elapsed(),
);

let decompressed_bytes = Bytes::from(decompressed);
let decompressed_bytes = Bytes::from(decompressed.into_owned());

// Deserialize using with_deserializer - context may be upgraded
let deserialize_timer = Timer::new();
Expand Down Expand Up @@ -348,7 +349,9 @@ pub trait CacheBackend: Backend {
crate::metrics::record_serialize(backend_label.as_str(), serialize_timer.elapsed());

let compress_timer = Timer::new();
let compressed_value = self.compressor().compress(&serialized_value)?;
let compressed_value = self
.compressor()
.compress(Cow::Borrowed(&serialized_value))?;
crate::metrics::record_compress(backend_label.as_str(), compress_timer.elapsed());

let compressed_len = compressed_value.len();
Expand All @@ -357,7 +360,11 @@ pub trait CacheBackend: Backend {
let result = self
.write(
key,
CacheValue::new(Bytes::from(compressed_value), value.expire(), value.stale()),
CacheValue::new(
Bytes::from(compressed_value.into_owned()),
value.expire(),
value.stale(),
),
)
.await;
crate::metrics::record_write(backend_label.as_str(), write_timer.elapsed());
Expand Down
35 changes: 21 additions & 14 deletions hitbox-backend/src/composition/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::sync::Arc;
use bytes::Bytes;
use hitbox_core::{BoxContext, CacheValue, Raw, ReadMode};
use smol_str::SmolStr;
use std::borrow::Cow;

use super::context::{CompositionContext, CompositionLayer, upgrade_context};
use super::envelope::CompositionEnvelope;
Expand Down Expand Up @@ -100,11 +101,11 @@ impl CompositionFormat {
// Compress
let compress_timer = Timer::new();
let compressed = compressor
.compress(&serialized)
.compress(Cow::Borrowed(&serialized))
.map_err(|e| FormatError::Serialize(Box::new(e)))?;
crate::metrics::record_compress(label, compress_timer.elapsed());

Ok(Bytes::from(compressed))
Ok(Bytes::from(compressed.into_owned()))
}

/// Serialize data for both layers and return raw compressed bytes without Envelope.
Expand All @@ -129,7 +130,7 @@ impl CompositionFormat {
let l1_compress_timer = Timer::new();
let l1_compressed = self
.l1_compressor
.compress(&l1_serialized)
.compress(Cow::Borrowed(&l1_serialized))
.map_err(|e| FormatError::Serialize(Box::new(e)))?;
crate::metrics::record_compress(&self.l1_label, l1_compress_timer.elapsed());

Expand All @@ -147,11 +148,14 @@ impl CompositionFormat {
let l2_compress_timer = Timer::new();
let l2_compressed = self
.l2_compressor
.compress(&l2_serialized)
.compress(Cow::Borrowed(&l2_serialized))
.map_err(|e| FormatError::Serialize(Box::new(e)))?;
crate::metrics::record_compress(&self.l2_label, l2_compress_timer.elapsed());

Ok((Bytes::from(l1_compressed), Bytes::from(l2_compressed)))
Ok((
Bytes::from(l1_compressed.into_owned()),
Bytes::from(l2_compressed.into_owned()),
))
}

/// Deserialize data from a specific layer.
Expand All @@ -175,7 +179,7 @@ impl CompositionFormat {
// Decompress
let decompress_timer = Timer::new();
let decompressed = compressor
.decompress(data)
.decompress(Cow::Borrowed(data))
.map_err(|e| FormatError::Deserialize(Box::new(e)))?;
crate::metrics::record_decompress(label, decompress_timer.elapsed());

Expand Down Expand Up @@ -214,12 +218,15 @@ impl Format for CompositionFormat {
let compress_timer = Timer::new();
let l1_compressed = self
.l1_compressor
.compress(&l1_serialized)
.compress(Cow::Borrowed(&l1_serialized))
.map_err(|e| FormatError::Serialize(Box::new(e)))?;
crate::metrics::record_compress(&self.l1_label, compress_timer.elapsed());

let composition =
CompositionEnvelope::L1(CacheValue::new(Bytes::from(l1_compressed), None, None));
let composition = CompositionEnvelope::L1(CacheValue::new(
Bytes::from(l1_compressed.into_owned()),
None,
None,
));

return composition
.serialize()
Expand All @@ -235,7 +242,7 @@ impl Format for CompositionFormat {
let l1_compress_timer = Timer::new();
let l1_compressed = self
.l1_compressor
.compress(&l1_serialized)
.compress(Cow::Borrowed(&l1_serialized))
.map_err(|e| FormatError::Serialize(Box::new(e)))?;
crate::metrics::record_compress(&self.l1_label, l1_compress_timer.elapsed());

Expand All @@ -253,14 +260,14 @@ impl Format for CompositionFormat {
let l2_compress_timer = Timer::new();
let l2_compressed = self
.l2_compressor
.compress(&l2_serialized)
.compress(Cow::Borrowed(&l2_serialized))
.map_err(|e| FormatError::Serialize(Box::new(e)))?;
crate::metrics::record_compress(&self.l2_label, l2_compress_timer.elapsed());

// Pack both compressed values into CompositionEnvelope
let composition = CompositionEnvelope::Both {
l1: CacheValue::new(Bytes::from(l1_compressed), None, None),
l2: CacheValue::new(Bytes::from(l2_compressed), None, None),
l1: CacheValue::new(Bytes::from(l1_compressed.into_owned()), None, None),
l2: CacheValue::new(Bytes::from(l2_compressed.into_owned()), None, None),
};

// Serialize the CompositionEnvelope using zero-copy repr(C) format
Expand Down Expand Up @@ -313,7 +320,7 @@ impl Format for CompositionFormat {
// Decompress the data
let decompress_timer = Timer::new();
let decompressed = compressor
.decompress(compressed_data.as_ref())
.decompress(Cow::Borrowed(compressed_data.as_ref()))
.map_err(|e| FormatError::Deserialize(Box::new(e)))?;
crate::metrics::record_decompress(label, decompress_timer.elapsed());

Expand Down
27 changes: 15 additions & 12 deletions hitbox-backend/src/compressor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
//! | `GzipCompressor` | Good | Medium | `gzip` |
//! | `ZstdCompressor` | Best | Fast | `zstd` |

use std::borrow::Cow;
use thiserror::Error;

/// Error type for compression operations.
Expand All @@ -31,22 +32,22 @@ pub enum CompressionError {
/// and `Arc<dyn Compressor>`.
pub trait Compressor: Send + Sync + std::fmt::Debug {
/// Compress the input data.
fn compress(&self, data: &[u8]) -> Result<Vec<u8>, CompressionError>;
fn compress<'a>(&self, data: Cow<'a, [u8]>) -> Result<Cow<'a, [u8]>, CompressionError>;

/// Decompress the input data.
fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, CompressionError>;
fn decompress<'a>(&self, data: Cow<'a, [u8]>) -> Result<Cow<'a, [u8]>, CompressionError>;

/// Clone this compressor into a box.
fn clone_box(&self) -> Box<dyn Compressor>;
}

// Blanket implementation for Box<dyn Compressor>
impl Compressor for Box<dyn Compressor> {
fn compress(&self, data: &[u8]) -> Result<Vec<u8>, CompressionError> {
fn compress<'a>(&self, data: Cow<'a, [u8]>) -> Result<Cow<'a, [u8]>, CompressionError> {
(**self).compress(data)
}

fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, CompressionError> {
fn decompress<'a>(&self, data: Cow<'a, [u8]>) -> Result<Cow<'a, [u8]>, CompressionError> {
(**self).decompress(data)
}

Expand All @@ -57,11 +58,11 @@ impl Compressor for Box<dyn Compressor> {

// Blanket implementation for Arc<dyn Compressor>
impl Compressor for std::sync::Arc<dyn Compressor> {
fn compress(&self, data: &[u8]) -> Result<Vec<u8>, CompressionError> {
fn compress<'a>(&self, data: Cow<'a, [u8]>) -> Result<Cow<'a, [u8]>, CompressionError> {
(**self).compress(data)
}

fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, CompressionError> {
fn decompress<'a>(&self, data: Cow<'a, [u8]>) -> Result<Cow<'a, [u8]>, CompressionError> {
(**self).decompress(data)
}

Expand All @@ -75,12 +76,12 @@ impl Compressor for std::sync::Arc<dyn Compressor> {
pub struct PassthroughCompressor;

impl Compressor for PassthroughCompressor {
fn compress(&self, data: &[u8]) -> Result<Vec<u8>, CompressionError> {
Ok(data.to_vec())
fn compress<'a>(&self, data: Cow<'a, [u8]>) -> Result<Cow<'a, [u8]>, CompressionError> {
Ok(data)
}

fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, CompressionError> {
Ok(data.to_vec())
fn decompress<'a>(&self, data: Cow<'a, [u8]>) -> Result<Cow<'a, [u8]>, CompressionError> {
Ok(data)
}

fn clone_box(&self) -> Box<dyn Compressor> {
Expand Down Expand Up @@ -208,10 +209,12 @@ mod tests {
let compressor = PassthroughCompressor;
let data = b"Hello, World!";

let compressed = compressor.compress(data).unwrap();
let compressed = compressor.compress(Cow::Borrowed(data)).unwrap();

let data = Cow::from(data);
assert_eq!(compressed, data);

let decompressed = compressor.decompress(&compressed).unwrap();
let decompressed = compressor.decompress(compressed).unwrap();
assert_eq!(decompressed, data);
}

Expand Down
17 changes: 9 additions & 8 deletions hitbox-test/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use hitbox_core::{
EntityPolicyConfig, ResponseCachePolicy,
};
use serde::{Deserialize, Serialize};
use std::borrow::Cow;

/// Test response type for backend testing
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
Expand Down Expand Up @@ -501,12 +502,12 @@ pub async fn test_url_encoded_key_json_value<B: Backend + CacheBackend>(backend:
// Decompress the data before validating format
let decompressed = backend
.compressor()
.decompress(raw_value.data())
.decompress(Cow::Borrowed(raw_value.data()))
.expect("failed to decompress");

// Verify it's valid JSON
let as_string =
String::from_utf8(decompressed.clone()).expect("Value should be valid UTF-8 JSON");
String::from_utf8(decompressed.into_owned()).expect("Value should be valid UTF-8 JSON");
assert!(
as_string.contains("\"id\"") || as_string.contains("id"),
"Value should contain JSON fields"
Expand Down Expand Up @@ -550,11 +551,11 @@ pub async fn test_url_encoded_key_bincode_value<B: Backend + CacheBackend>(backe
// Decompress the data before validating format
let decompressed = backend
.compressor()
.decompress(raw_value.data())
.decompress(Cow::Borrowed(raw_value.data()))
.expect("failed to decompress");

// Verify it's NOT readable JSON (binary format)
let as_string = String::from_utf8(decompressed.clone());
let as_string = String::from_utf8(decompressed.into_owned());
assert!(
as_string.is_err() || !as_string.unwrap().contains("\"id\""),
"Value should be in Bincode format (binary), not JSON"
Expand Down Expand Up @@ -598,12 +599,12 @@ pub async fn test_bitcode_key_json_value<B: Backend + CacheBackend>(backend: &B)
// Decompress the data before validating format
let decompressed = backend
.compressor()
.decompress(raw_value.data())
.decompress(Cow::Borrowed(raw_value.data()))
.expect("failed to decompress");

// Verify value is JSON
let as_string =
String::from_utf8(decompressed.clone()).expect("Value should be valid UTF-8 JSON");
String::from_utf8(decompressed.into_owned()).expect("Value should be valid UTF-8 JSON");
assert!(
as_string.contains("\"id\"") || as_string.contains("id"),
"Value should be in JSON format"
Expand Down Expand Up @@ -646,11 +647,11 @@ pub async fn test_bitcode_key_bincode_value<B: Backend + CacheBackend>(backend:
// Decompress the data before validating format
let decompressed = backend
.compressor()
.decompress(raw_value.data())
.decompress(Cow::Borrowed(raw_value.data()))
.expect("failed to decompress");

// Verify value is binary Bincode
let as_string = String::from_utf8(decompressed.clone());
let as_string = String::from_utf8(decompressed.into_owned());
assert!(
as_string.is_err() || !as_string.unwrap().contains("\"id\""),
"Value should be in Bincode format (binary), not JSON"
Expand Down
Loading