Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions arrow-flight/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ cli = ["arrow-array/chrono-tz", "arrow-cast/prettyprint", "tonic/tls-webpki-root
[dev-dependencies]
arrow-cast = { workspace = true, features = ["prettyprint"] }
assert_cmd = "2.0.8"
criterion = { workspace = true, default-features = false, features = ["async_tokio"] }
http = "1.1.0"
http-body = "1.0.0"
hyper-util = "0.1"
Expand Down Expand Up @@ -105,3 +106,18 @@ required-features = ["flight-sql", "tls-ring"]
name = "flight_sql_client_cli"
path = "tests/flight_sql_client_cli.rs"
required-features = ["cli", "flight-sql", "tls-ring"]

[[bench]]
name = "flight_encode"
path = "benches/flight_encode.rs"
harness = false

[[bench]]
name = "flight_decode"
path = "benches/flight_decode.rs"
harness = false

[[bench]]
name = "roundtrip"
path = "benches/roundtrip.rs"
harness = false
153 changes: 153 additions & 0 deletions arrow-flight/benches/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/ Licensed to the Apache Software Foundation (ASF) under one
Comment thread
Rich-T-kid marked this conversation as resolved.
Outdated
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::{Arc, RwLock};

use arrow_array::{
Array, ArrayRef, DictionaryArray, Int32Array, Int64Array, ListArray, RecordBatch, StringArray,
types::Int32Type,
};
use arrow_buffer::OffsetBuffer;
use arrow_flight::{
Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
HandshakeRequest, HandshakeResponse, PollInfo, PutResult, SchemaResult, Ticket,
flight_service_server::{FlightService, FlightServiceServer},
};
use arrow_schema::{DataType, Field, Schema};
use bytes::Bytes;
use futures::{StreamExt, TryStreamExt, stream::BoxStream};
use hyper_util::rt::TokioIo;
use tonic::{
Request, Response, Status, Streaming,
transport::{Channel, Endpoint, Server},
};

pub type Builder = fn(usize) -> ArrayRef;

pub const TYPES: &[(&str, Builder)] = &[
("fixed", fixed),
("nested", nested),
("variable", variable),
("dict", dict),
];

fn fixed(n: usize) -> ArrayRef {
Arc::new(Int64Array::from_iter_values(0..n as i64))
}

fn variable(n: usize) -> ArrayRef {
Arc::new(StringArray::from_iter_values(
(0..n).map(|i| format!("variable_string_{i}{}", "_".repeat(i % 16))),
))
}

fn nested(n: usize) -> ArrayRef {
let values = Int32Array::from_iter_values(0..(n * 4) as i32);
let offsets = OffsetBuffer::<i32>::from_lengths(std::iter::repeat_n(4usize, n));
let field = Arc::new(Field::new_list_field(DataType::Int32, false));
Arc::new(ListArray::new(field, offsets, Arc::new(values), None))
}

fn dict(n: usize) -> ArrayRef {
let keys = Int32Array::from_iter_values((0..n).map(|i| (i % 32) as i32));
let values = StringArray::from_iter_values((0..32).map(|i| format!("dictionary_value_{i:03}")));
Arc::new(DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap())
}

pub fn build_batch(name: &str, rows: usize, cols: usize, build: Builder) -> RecordBatch {
let arrays: Vec<ArrayRef> = (0..cols).map(|_| build(rows)).collect();
let fields: Vec<Field> = arrays
.iter()
.enumerate()
.map(|(i, a)| Field::new(format!("column_{i}_{name}"), a.data_type().clone(), false))
.collect();
RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays).unwrap()
}

#[derive(Clone, Default)]
pub struct BenchServer {
frames: Arc<RwLock<Vec<FlightData>>>,
}

impl BenchServer {
#[allow(dead_code)]
pub fn set_frames(&self, frames: Vec<FlightData>) {
*self.frames.write().unwrap() = frames;
}
}

fn unimpl<T>() -> Result<T, Status> {
Err(Status::unimplemented(""))
}

#[rustfmt::skip]
#[tonic::async_trait]
impl FlightService for BenchServer {
type HandshakeStream = BoxStream<'static, Result<HandshakeResponse, Status>>;
type ListFlightsStream = BoxStream<'static, Result<FlightInfo, Status>>;
type DoGetStream = BoxStream<'static, Result<FlightData, Status>>;
type DoPutStream = BoxStream<'static, Result<PutResult, Status>>;
type DoActionStream = BoxStream<'static, Result<arrow_flight::Result, Status>>;
type ListActionsStream = BoxStream<'static, Result<ActionType, Status>>;
type DoExchangeStream = BoxStream<'static, Result<FlightData, Status>>;

async fn do_get(&self, _: Request<Ticket>) -> Result<Response<Self::DoGetStream>, Status> {
let frames = self.frames.read().unwrap().clone();
Ok(Response::new(futures::stream::iter(frames.into_iter().map(Ok)).boxed()))
}

async fn do_put(&self, req: Request<Streaming<FlightData>>) -> Result<Response<Self::DoPutStream>, Status> {
let _: Vec<FlightData> = req.into_inner().try_collect().await?;
let ack = PutResult { app_metadata: Bytes::new() };
Ok(Response::new(futures::stream::iter([Ok(ack)]).boxed()))
}

async fn do_exchange(&self, req: Request<Streaming<FlightData>>) -> Result<Response<Self::DoExchangeStream>, Status> {
Ok(Response::new(req.into_inner().boxed()))
}

async fn handshake(&self, _: Request<Streaming<HandshakeRequest>>) -> Result<Response<Self::HandshakeStream>, Status> { unimpl() }
async fn list_flights(&self, _: Request<Criteria>) -> Result<Response<Self::ListFlightsStream>, Status> { unimpl() }
async fn get_flight_info(&self, _: Request<FlightDescriptor>) -> Result<Response<FlightInfo>, Status> { unimpl() }
async fn poll_flight_info(&self, _: Request<FlightDescriptor>) -> Result<Response<PollInfo>, Status> { unimpl() }
async fn get_schema(&self, _: Request<FlightDescriptor>) -> Result<Response<SchemaResult>, Status> { unimpl() }
async fn do_action(&self, _: Request<Action>) -> Result<Response<Self::DoActionStream>, Status> { unimpl() }
async fn list_actions(&self, _: Request<Empty>) -> Result<Response<Self::ListActionsStream>, Status> { unimpl() }
}
pub async fn start_server() -> (Channel, BenchServer) {
const DUMMY_URL: &str = "http://localhost:50051";

let bench_server = BenchServer::default();

let (client, server) = tokio::io::duplex(1024 * 1024);

let mut client = Some(client);
let channel = Endpoint::try_from(DUMMY_URL)
.expect("Invalid dummy URL for building an endpoint. This should never happen")
.connect_with_connector_lazy(tower::service_fn(move |_| {
let client = client
.take()
.expect("Client taken twice. This should never happen");
async move { Ok::<_, std::io::Error>(TokioIo::new(client)) }
}));
tokio::spawn(
Server::builder()
.add_service(FlightServiceServer::new(bench_server.clone()))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))),
);
(channel, bench_server)
}
Comment thread
Rich-T-kid marked this conversation as resolved.
Outdated
Empty file.
63 changes: 63 additions & 0 deletions arrow-flight/benches/flight_encode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::RecordBatch;
use arrow_flight::{FlightClient, encode::FlightDataEncoderBuilder};
use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
use futures::TryStreamExt;
use tonic::transport::Channel;

mod common;
use common::{TYPES, build_batch, start_server};

const ROWS: [usize; 2] = [8 * 1024, 64 * 1024];
const COLS: [usize; 2] = [1, 8];

async fn send(channel: Channel, batch: RecordBatch) {
let mut client = FlightClient::new(channel);
let frames = FlightDataEncoderBuilder::new().build(futures::stream::iter([Ok(batch)]));
let _: Vec<_> = client
.do_put(frames)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems to be benchmarking do_put not really flight_encode 🤔

@Rich-T-kid Rich-T-kid Jun 2, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally I wanted to do an end-to-end benchmark, but I think it makes more sense to have smaller, more focused benchmarks. I updated the benchmarks to include one round-trip benchmark and another that benchmarks FlightDataEncoder::encode_batch() directly. Other benchmarks can be added later organically.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Image 6-2-26 at 3 57 PM majority of the time is being spent int arrow-ipc as expected, this is what #10044 aims to address. its not directly wired up to to arrow-flight yet, but the results seems to be good so far.

.await
.unwrap()
.try_collect()
.await
.unwrap();
}

fn bench(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
let (channel, _) = rt.block_on(start_server());
let mut g = c.benchmark_group("flight_encode");

for &(name, build) in TYPES {
for &rows in &ROWS {
for &cols in &COLS {
let batch = build_batch(name, rows, cols, build);
let id = BenchmarkId::new(name, format!("{rows}x{cols}"));
g.throughput(Throughput::Bytes(batch.get_array_memory_size() as u64));
g.bench_with_input(id, &batch, |b, batch| {
b.to_async(&rt)
.iter(|| send(channel.clone(), batch.clone()));
});
}
}
}
}

criterion_group!(benches, bench);
criterion_main!(benches);
62 changes: 62 additions & 0 deletions arrow-flight/benches/roundtrip.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::RecordBatch;
use arrow_flight::{FlightClient, encode::FlightDataEncoderBuilder};
use criterion::{BatchSize, BenchmarkId, Criterion, criterion_group, criterion_main};
use futures::TryStreamExt;
use tonic::transport::Channel;

mod common;
use common::{TYPES, build_batch, start_server};

const ROWS: [usize; 2] = [8 * 1024, 64 * 1024];
const COLS: [usize; 2] = [8, 16];

async fn roundtrip(channel: Channel, batch: RecordBatch) {
let mut client = FlightClient::new(channel);
let frames = FlightDataEncoderBuilder::new().build(futures::stream::iter([Ok(batch)]));
let resp = client.do_exchange(frames).await.unwrap();
let _: Vec<RecordBatch> = resp.try_collect().await.unwrap();
}

fn bench(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
let (channel, _) = rt.block_on(start_server());
let mut g = c.benchmark_group("flight_roundtrip");

for &(name, build) in TYPES {
for &rows in &ROWS {
for &cols in &COLS {
//let batch = build_batch(name, rows, cols, build);
let id = BenchmarkId::new(name, format!("{rows}x{cols}"));
//g.throughput(Throughput::Bytes(batch.get_array_memory_size() as u64));
g.bench_function(id, |b| {
let batch = build_batch(name, rows, cols, build);
b.to_async(&rt).iter_batched(
|| (channel.clone(), batch.clone()),
|(ch, b)| roundtrip(ch, b),
BatchSize::SmallInput,
);
});
}
}
}
}

criterion_group!(benches, bench);
criterion_main!(benches);
Loading