From ed3a804a4cf994bc8ac02c49f8cc851abdf14aff Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 27 Apr 2026 15:34:40 -0500 Subject: [PATCH] feat: add TowerHttpConnector MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adapts any `tower::Service, Response = http::Response>` into an `HttpConnector`, unlocking the entire `tower-http` / `tower-otel-*` ecosystem against `object_store` — `TraceLayer`, `TimeoutLayer`, `RetryLayer`, `PropagateHeaderLayer`, OpenTelemetry tower layers, etc. Body types match natively, so no wire-format conversion is required. Gated behind a new opt-in `tower` feature (off by default). The feature does **not** imply `cloud` because the whole point of the adapter is transport-agnostic — users can plug in `hyper::Client`, `ureq`, an in-process mock via `tower::service_fn`, or a wasm-friendly client without pulling reqwest in at all. Error mapping starts simple: `poll_ready` failures → `Connect`, `call` failures → `Request`. Can iterate later to mirror `HttpError::reqwest`'s richer classification if needed. `HttpService::call` takes `&self` whereas `tower::Service::call` takes `&mut self`, so each request clones the inner service. This is the standard tower idiom for shared services. Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.toml | 2 + src/client/http/mod.rs | 5 + src/client/http/tower.rs | 202 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 209 insertions(+) create mode 100644 src/client/http/tower.rs diff --git a/Cargo.toml b/Cargo.toml index b92e4d7e..b0a45122 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,6 +60,7 @@ rustls-pki-types = { version = "1.9", default-features = false, features = ["std serde = { version = "1.0", default-features = false, features = ["derive"], optional = true } serde_json = { version = "1.0", default-features = false, features = ["std"], optional = true } serde_urlencoded = { version = "0.7", optional = true } +tower = { version = "0.5", default-features = false, features = ["util"], optional = true } # Optional tokio feature tokio = { version = "1.29.0", features = ["sync", "macros", "rt", "time", "io-util"], optional = true } @@ -90,6 +91,7 @@ http = ["cloud"] tls-webpki-roots = ["reqwest?/rustls-tls-webpki-roots"] integration = ["rand", "tokio"] tokio = ["dep:tokio", "dep:tracing"] +tower = ["dep:tower"] [dev-dependencies] # In alphabetical order futures-executor = "0.3" diff --git a/src/client/http/mod.rs b/src/client/http/mod.rs index 86e1e11d..101810e1 100644 --- a/src/client/http/mod.rs +++ b/src/client/http/mod.rs @@ -25,3 +25,8 @@ pub use connection::*; mod spawn; pub use spawn::*; + +#[cfg(feature = "tower")] +mod tower; +#[cfg(feature = "tower")] +pub use tower::TowerHttpConnector; diff --git a/src/client/http/tower.rs b/src/client/http/tower.rs new file mode 100644 index 00000000..7d37c01f --- /dev/null +++ b/src/client/http/tower.rs @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Adapt a [`tower::Service`] into an [`HttpConnector`]. +//! +//! This lets users compose anything from `tower-http` (`TraceLayer`, +//! `TimeoutLayer`, `RetryLayer`, …), `tower-otel-*`, or any other +//! `tower::Service>` against `object_store`. +//! +//! ```ignore +//! use object_store::client::TowerHttpConnector; +//! use object_store::ClientOptions; +//! use tower::ServiceBuilder; +//! use tower_http::trace::TraceLayer; +//! +//! let make = move |_opts: &ClientOptions| { +//! // build a `tower::Service, +//! // Response = http::Response>` +//! // — typically by wrapping a `reqwest::Client` or `hyper::Client`. +//! let inner = my_reqwest_tower_adapter()?; +//! Ok(ServiceBuilder::new() +//! .layer(TraceLayer::new_for_http()) +//! .service(inner)) +//! }; +//! HttpBuilder::new() +//! .with_http_connector(TowerHttpConnector(make)) +//! .build(); +//! ``` + +use crate::ClientOptions; +use crate::client::{ + HttpClient, HttpConnector, HttpError, HttpErrorKind, HttpRequest, HttpResponse, HttpService, +}; +use async_trait::async_trait; +use std::error::Error as StdError; +use std::sync::Mutex; +use tower::{Service, ServiceExt}; + +/// Adapt a `tower::Service`-builder closure into an [`HttpConnector`]. +/// +/// `connect` is invoked once per [`HttpClient::new`] with the resolved +/// [`ClientOptions`] and must return a service that takes +/// `http::Request` and yields +/// `http::Response` — the body types `object_store` +/// already uses internally, so no conversion is required. +pub struct TowerHttpConnector(pub F); + +impl std::fmt::Debug for TowerHttpConnector { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TowerHttpConnector").finish_non_exhaustive() + } +} + +impl HttpConnector for TowerHttpConnector +where + F: Fn(&ClientOptions) -> crate::Result + Send + Sync + 'static, + S: Service + Clone + Send + Sync + 'static, + S::Future: Send + 'static, + S::Error: Into> + Send + Sync + 'static, +{ + fn connect(&self, options: &ClientOptions) -> crate::Result { + let svc = (self.0)(options)?; + Ok(HttpClient::new(TowerHttpService::new(svc))) + } +} + +/// `HttpService` that drives an inner `tower::Service` per request. +/// +/// `HttpService::call` takes `&self`, but `tower::Service::call` takes +/// `&mut self` and `poll_ready`/`call` are stateful, so each request +/// clones the inner service before driving it. This is the standard +/// tower idiom for shared services. +struct TowerHttpService { + /// `Mutex` only to satisfy `HttpService`'s `Send + Sync`; the + /// critical section is one `clone()`. + svc: Mutex, +} + +impl std::fmt::Debug for TowerHttpService { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TowerHttpService").finish_non_exhaustive() + } +} + +impl TowerHttpService { + fn new(svc: S) -> Self { + Self { + svc: Mutex::new(svc), + } + } + + fn clone_inner(&self) -> S { + self.svc.lock().unwrap().clone() + } +} + +#[async_trait] +impl HttpService for TowerHttpService +where + S: Service + Clone + Send + Sync + 'static, + S::Future: Send + 'static, + S::Error: Into> + Send + Sync + 'static, +{ + async fn call(&self, req: HttpRequest) -> Result { + let mut svc = self.clone_inner(); + ServiceExt::ready(&mut svc) + .await + .map_err(|e| HttpError::new(HttpErrorKind::Connect, BoxStdError(e.into())))?; + Service::call(&mut svc, req) + .await + .map_err(|e| HttpError::new(HttpErrorKind::Request, BoxStdError(e.into()))) + } +} + +/// Newtype that adapts `Box` (which itself +/// implements `StdError`) to satisfy the `E: Error + Send + Sync + 'static` +/// bound on [`HttpError::new`]. +#[derive(Debug)] +struct BoxStdError(Box); + +impl std::fmt::Display for BoxStdError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt(&self.0, f) + } +} + +impl StdError for BoxStdError { + fn source(&self) -> Option<&(dyn StdError + 'static)> { + self.0.source() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::client::{HttpRequestBody, HttpResponseBody}; + use bytes::Bytes; + use http::StatusCode; + use http_body_util::BodyExt; + use std::convert::Infallible; + use std::sync::Arc; + use std::sync::atomic::{AtomicUsize, Ordering}; + + fn ok_response(body: &'static str) -> HttpResponse { + let body = HttpResponseBody::new( + http_body_util::Full::new(Bytes::from_static(body.as_bytes())) + .map_err(|never| match never {}), + ); + let mut resp = HttpResponse::new(body); + *resp.status_mut() = StatusCode::OK; + resp + } + + #[tokio::test] + async fn happy_path_routes_request_through_tower_service() { + let calls = Arc::new(AtomicUsize::new(0)); + let calls_for_svc = Arc::clone(&calls); + let svc = tower::service_fn(move |_req: HttpRequest| { + let calls_for_svc = Arc::clone(&calls_for_svc); + async move { + calls_for_svc.fetch_add(1, Ordering::SeqCst); + Ok::<_, Infallible>(ok_response("hi")) + } + }); + + let svc = TowerHttpService::new(svc); + let req = HttpRequest::new(HttpRequestBody::empty()); + let resp = svc.call(req).await.unwrap(); + + assert_eq!(resp.status(), StatusCode::OK); + let body = resp.into_body().bytes().await.unwrap(); + assert_eq!(body.as_ref(), b"hi"); + assert_eq!(calls.load(Ordering::SeqCst), 1); + } + + #[tokio::test] + async fn call_error_maps_to_request_kind() { + #[derive(Debug, thiserror::Error)] + #[error("boom")] + struct Boom; + + let svc = tower::service_fn(|_req: HttpRequest| async { Err::(Boom) }); + let svc = TowerHttpService::new(svc); + let req = HttpRequest::new(HttpRequestBody::empty()); + let err = svc.call(req).await.unwrap_err(); + assert_eq!(err.kind(), HttpErrorKind::Request); + } +}