Skip to content

Commit 7909919

Browse files
committed
feat(host): add streaming HTTP support with integration test and Gemini proxy component
Signed-off-by: Aditya <aditya.salunkh919@gmail.com>
1 parent db332aa commit 7909919

File tree

21 files changed

+2733
-6
lines changed

21 files changed

+2733
-6
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ tests/output
66
!tests/fixtures/*.wasm
77
!crates/wash-runtime/tests/fixtures/*.wasm
88

9+
!crates/wash-runtime/tests/fixtures/*.wasm
10+
911
# Ignore IDE specific files
1012
.vscode
1113

Cargo.lock

Lines changed: 24 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ schemars = { version = "0.8", default-features = false }
116116
git2 = { version = "0.19", default-features = false }
117117
hostname = { version = "0.4", default-features = false }
118118
http-body-util = { version = "0.1.3", default-features = false }
119+
hyper-util = { version = "0.1", default-features = false }
119120
names = { version = "0.14", default-features = false }
120121
semver = { version = "1.0.26", default-features = false }
121122
serde = { version = "1.0.219", default-features = false }
@@ -125,6 +126,7 @@ sha2 = { version = "0.10.8", default-features = false }
125126
sysinfo = { version = "0.32", default-features = false, features = ["system"] }
126127
tar = { version = "0.4", default-features = false }
127128
tempfile = { version = "3.15.0", default-features = false }
129+
test-log = { version = "0.2", default-features = false, features = ["trace"] }
128130
tokio = { version = "1.45.1", default-features = false, features = ["full"] }
129131
tokio-util = { version = "0.7", default-features = false }
130132
tokio-rustls = { version = "0.26", default-features = false }

crates/wash-runtime/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,4 +83,7 @@ pbjson-build = { workspace = true, default-features = true }
8383
tokio = { workspace = true, features = ["full"] }
8484
tracing-subscriber = { workspace = true }
8585
reqwest = { workspace = true }
86+
hyper-util = { workspace = true, features = ["tokio", "client", "client-legacy", "http1"] }
87+
http-body-util = { workspace = true }
88+
test-log = { workspace = true }
8689
gag = "1.0"

crates/wash-runtime/src/host/http.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -619,7 +619,6 @@ pub async fn handle_component_request(
619619
Some(scheme) if scheme == &hyper::http::uri::Scheme::HTTP => Scheme::Http,
620620
Some(scheme) if scheme == &hyper::http::uri::Scheme::HTTPS => Scheme::Https,
621621
Some(scheme) => Scheme::Other(scheme.as_str().to_string()),
622-
// Fallback to HTTP if no scheme is present
623622
None => Scheme::Http,
624623
};
625624
let req = store.data_mut().new_incoming_request(scheme, req)?;
@@ -642,13 +641,8 @@ pub async fn handle_component_request(
642641
});
643642

644643
match receiver.await {
645-
// If the client calls `response-outparam::set` then one of these
646-
// methods will be called.
647644
Ok(Ok(resp)) => Ok(resp),
648645
Ok(Err(e)) => Err(e.into()),
649-
650-
// Otherwise the `sender` will get dropped along with the `Store`
651-
// meaning that the oneshot will get disconnected
652646
Err(e) => {
653647
if let Err(task_error) = task.await {
654648
error!(err = ?task_error, "error receiving http response");
491 KB
Binary file not shown.
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
//! Integration test for streaming Gemini proxy component
2+
3+
use anyhow::Result;
4+
use futures::stream;
5+
use http_body_util::combinators::BoxBody;
6+
use http_body_util::{BodyExt, StreamBody};
7+
use hyper::{
8+
StatusCode,
9+
body::{Bytes, Frame},
10+
client::conn::http1,
11+
};
12+
use std::{collections::HashMap, net::SocketAddr, sync::Arc};
13+
14+
mod common;
15+
use common::find_available_port;
16+
17+
use wash_runtime::{
18+
engine::Engine,
19+
host::{
20+
HostApi, HostBuilder,
21+
http::{DevRouter, HttpServer},
22+
},
23+
plugin::wasi_logging::WasiLogging,
24+
types::{Component, LocalResources, Workload, WorkloadStartRequest},
25+
wit::WitInterface,
26+
};
27+
28+
const HTTP_STREAMING_GEMINI_WASM: &[u8] = include_bytes!("./fixtures/http_ai_proxy.wasm");
29+
30+
#[test_log::test(tokio::test)]
31+
async fn wasi_http_gemini_proxy() -> Result<()> {
32+
println!("\n🚀 STREAMING GEMINI PROXY TEST\n");
33+
34+
let engine = Engine::builder().build()?;
35+
let port = find_available_port().await?;
36+
let addr: SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
37+
let http_handler = DevRouter::default();
38+
let http_plugin = HttpServer::new(http_handler, addr);
39+
40+
let host = HostBuilder::new()
41+
.with_engine(engine)
42+
.with_http_handler(Arc::new(http_plugin))
43+
.with_plugin(Arc::new(WasiLogging {}))?
44+
.build()?
45+
.start()
46+
.await?;
47+
48+
println!("✓ Host started on {addr}");
49+
50+
let req = WorkloadStartRequest {
51+
workload_id: uuid::Uuid::new_v4().to_string(),
52+
workload: Workload {
53+
namespace: "test".to_string(),
54+
name: "gemini-proxy".to_string(),
55+
annotations: HashMap::new(),
56+
service: None,
57+
components: vec![Component {
58+
bytes: bytes::Bytes::from_static(HTTP_STREAMING_GEMINI_WASM),
59+
local_resources: LocalResources {
60+
memory_limit_mb: 512,
61+
cpu_limit: 2,
62+
config: HashMap::new(),
63+
environment: HashMap::new(),
64+
volume_mounts: vec![],
65+
allowed_hosts: vec!["generativelanguage.googleapis.com".to_string()],
66+
},
67+
pool_size: 1,
68+
max_invocations: 100,
69+
}],
70+
host_interfaces: vec![
71+
WitInterface {
72+
namespace: "wasi".to_string(),
73+
package: "http".to_string(),
74+
interfaces: ["incoming-handler".to_string()].into_iter().collect(),
75+
version: Some(semver::Version::parse("0.2.2").unwrap()),
76+
config: {
77+
let mut config = HashMap::new();
78+
config.insert("host".to_string(), "gemini-proxy".to_string());
79+
config
80+
},
81+
},
82+
WitInterface {
83+
namespace: "wasi".to_string(),
84+
package: "logging".to_string(),
85+
interfaces: ["logging".to_string()].into_iter().collect(),
86+
version: Some(semver::Version::parse("0.1.0-draft").unwrap()),
87+
config: HashMap::new(),
88+
},
89+
],
90+
volumes: vec![],
91+
},
92+
};
93+
94+
host.workload_start(req).await?;
95+
println!("✓ Workload deployed\n");
96+
97+
let prompt = "Explain how AI works";
98+
let body_stream = stream::iter([Ok::<_, hyper::Error>(Frame::data(Bytes::from(prompt)))]);
99+
100+
// Create HTTP client first
101+
let stream = tokio::net::TcpStream::connect(addr).await?;
102+
let io = hyper_util::rt::TokioIo::new(stream);
103+
let (mut sender, conn) = http1::Builder::new()
104+
.preserve_header_case(true)
105+
.title_case_headers(false)
106+
.handshake(io)
107+
.await?;
108+
109+
tokio::spawn(async move {
110+
if let Err(err) = conn.await {
111+
eprintln!("Connection error: {err:?}");
112+
}
113+
});
114+
115+
// Build request with relative URI but explicit Host header
116+
let mut request = hyper::Request::builder()
117+
.method(hyper::Method::POST)
118+
.uri("/gemini-proxy")
119+
.header("content-type", "text/plain")
120+
.body(BoxBody::new(StreamBody::new(body_stream)))?;
121+
122+
request.headers_mut().insert(
123+
hyper::header::HOST,
124+
hyper::header::HeaderValue::from_static("gemini-proxy"),
125+
);
126+
127+
let response = sender.send_request(request).await?;
128+
129+
assert_eq!(StatusCode::OK, response.status());
130+
131+
println!("\nResponse status: {:?}", response.status());
132+
println!("Response headers: {:?}", response.headers());
133+
println!("\n=== Streaming Response ===");
134+
135+
// Track streaming metrics
136+
let (_parts, body) = response.into_parts();
137+
let mut body_stream = body;
138+
let start_time = std::time::Instant::now();
139+
let mut chunk_count = 0;
140+
let mut total_bytes = 0;
141+
let mut response_text = String::new();
142+
143+
while let Some(frame) = body_stream.frame().await {
144+
match frame {
145+
Ok(frame) => {
146+
if let Some(chunk) = frame.data_ref() {
147+
chunk_count += 1;
148+
total_bytes += chunk.len();
149+
150+
let elapsed = start_time.elapsed();
151+
println!(
152+
"[{:.7}s] Chunk #{} received ({} bytes)",
153+
elapsed.as_secs_f64(),
154+
chunk_count,
155+
chunk.len()
156+
);
157+
158+
if let Ok(text) = std::str::from_utf8(chunk) {
159+
response_text.push_str(text);
160+
print!("{}", text);
161+
use std::io::Write;
162+
std::io::stdout().flush().unwrap();
163+
}
164+
}
165+
}
166+
Err(e) => {
167+
eprintln!("\nError reading frame: {e}");
168+
break;
169+
}
170+
}
171+
}
172+
173+
let total_time = start_time.elapsed();
174+
println!(
175+
"\n=== End (Total: {:.7}s, {} chunks, {} bytes) ===\n",
176+
total_time.as_secs_f64(),
177+
chunk_count,
178+
total_bytes
179+
);
180+
181+
// Assertions to verify streaming behavior
182+
assert!(
183+
chunk_count >= 10,
184+
"Expected at least 10 chunks for streaming response, got {}",
185+
chunk_count
186+
);
187+
188+
assert!(
189+
total_bytes > 100,
190+
"Expected substantial response content, got {} bytes",
191+
total_bytes
192+
);
193+
194+
assert!(
195+
!response_text.is_empty(),
196+
"Expected non-empty response text"
197+
);
198+
199+
// Verify that streaming actually took time (not instant)
200+
assert!(
201+
total_time.as_millis() > 100,
202+
"Response came too fast ({:?}ms), may not be streaming",
203+
total_time.as_millis()
204+
);
205+
206+
println!("✓ All streaming assertions passed!");
207+
208+
Ok(())
209+
}

examples/http-ai-proxy/.gitignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# Rust build artifacts
2+
target/
3+
4+
# Wash build artifacts
5+
build/
6+

0 commit comments

Comments
 (0)