Skip to content

Commit 171bb7f

Browse files
committed
Optimize sending of historical events
This batching reduces the time needed to stream 100k events by 3x. In this function we only care about throughput, not latency.
1 parent ad6f239 commit 171bb7f

1 file changed

Lines changed: 10 additions & 2 deletions

File tree

  • crates/hyperqueue/src/server/client

crates/hyperqueue/src/server/client/mod.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,16 @@ async fn stream_history_events<Tx: Sink<ToClientMessage, Error = HqError> + Unpi
8282
mut history: mpsc::UnboundedReceiver<Event>,
8383
) {
8484
log::debug!("Resending history started");
85-
while let Some(e) = history.recv().await {
86-
if tx.send(ToClientMessage::Event(e)).await.is_err() {
85+
86+
let mut events = Vec::with_capacity(1024);
87+
let capacity = events.capacity();
88+
while history.recv_many(&mut events, capacity).await != 0 {
89+
let events = std::mem::replace(&mut events, Vec::with_capacity(capacity));
90+
if tx
91+
.send_all(&mut futures::stream::iter(events).map(|e| Ok(ToClientMessage::Event(e))))
92+
.await
93+
.is_err()
94+
{
8795
return;
8896
}
8997
}

0 commit comments

Comments
 (0)