Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions rocketmq-client/benches/produce_accumulator_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
//!
//! Run with: cargo bench --bench produce_accumulator_benchmark

use std::collections::BinaryHeap;
use std::collections::HashMap;
use std::hint::black_box;
use std::sync::atomic::AtomicU64;
Expand Down Expand Up @@ -211,6 +212,66 @@ async fn bench_capacity_reservation(
start.elapsed()
}

#[derive(Clone, Eq, PartialEq)]
struct BenchDeadline {
deadline_tick: usize,
sequence: usize,
}

impl Ord for BenchDeadline {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
other
.deadline_tick
.cmp(&self.deadline_tick)
.then_with(|| other.sequence.cmp(&self.sequence))
}
}

impl PartialOrd for BenchDeadline {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

fn bench_guard_full_scan_low_activity(num_keys: usize, ticks: usize) -> usize {
let mut deadlines = vec![ticks + 1; num_keys];
deadlines[0] = ticks / 2;
let mut active = vec![true; num_keys];
let mut processed = 0usize;

for tick in 0..ticks {
for index in 0..num_keys {
if active[index] && deadlines[index] <= tick {
active[index] = false;
processed += 1;
}
}
}

black_box(processed)
}

fn bench_guard_deadline_heap_low_activity(num_keys: usize, ticks: usize) -> usize {
let mut heap = BinaryHeap::new();
for sequence in 0..num_keys {
let deadline_tick = if sequence == 0 { ticks / 2 } else { ticks + 1 };
heap.push(BenchDeadline {
deadline_tick,
sequence,
});
}

let mut processed = 0usize;
for tick in 0..ticks {
while heap.peek().is_some_and(|deadline| deadline.deadline_tick <= tick) {
heap.pop();
processed += 1;
}
}

black_box(processed)
}

/// Benchmark: Concurrent reads and writes with HashMap
async fn bench_hashmap_mutex_mixed_ops(num_keys: usize, read_ops: usize, write_ops: usize) -> Duration {
let map: Arc<Mutex<HashMap<TestKey, Arc<Mutex<TestBatch>>>>> = Arc::new(Mutex::new(HashMap::new()));
Expand Down Expand Up @@ -379,6 +440,22 @@ fn bench_capacity_reservation_control(c: &mut Criterion) {
group.finish();
}

fn bench_guard_deadline_scheduler(c: &mut Criterion) {
let mut group = c.benchmark_group("guard_deadline_scheduler");
let num_keys = 64;
let ticks = 1024;

group.throughput(Throughput::Elements((num_keys * ticks) as u64));
group.bench_function("FullScan_64keys_low_activity", |b| {
b.iter(|| bench_guard_full_scan_low_activity(num_keys, ticks));
});
group.bench_function("DeadlineHeap_64keys_low_activity", |b| {
b.iter(|| bench_guard_deadline_heap_low_activity(num_keys, ticks));
});

group.finish();
}

/// Benchmark Group: Mixed Read/Write Operations
fn bench_mixed_operations(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
Expand Down Expand Up @@ -464,6 +541,7 @@ criterion_group!(
benches,
bench_concurrent_insert,
bench_capacity_reservation_control,
bench_guard_deadline_scheduler,
bench_mixed_operations,
bench_multi_topic_scenario,
bench_high_contention
Expand Down
Loading
Loading