feat(websocket): Add WebSocket transport and WASM support for the data-plane#1337
feat(websocket): Add WebSocket transport and WASM support for the data-plane#1337hackeramitkumar wants to merge 47 commits into
Conversation
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
eff05d7 to
3419b5e
Compare
fec5693 to
4be697a
Compare
18fe86d to
6be330a
Compare
73deffa to
200933a
Compare
| }) | ||
| .await; | ||
| } | ||
| Err(err) => { |
Check notice
Code scanning / CodeQL
Unused variable Note
|
|
||
| Ok::<Response<Empty<Bytes>>, Infallible>(response) | ||
| } | ||
| Err(err) => { |
Check notice
Code scanning / CodeQL
Unused variable Note
| } | ||
| }; | ||
|
|
||
| if let Err(err) = processor.process_stream( |
Check notice
Code scanning / CodeQL
Unused variable Note
| warn!(%session_id, "requested to delete unknown session"); | ||
| } | ||
| } | ||
| Some(Ok(m)) => { |
Check notice
Code scanning / CodeQL
Unused variable Note
| Some(Ok(m)) => { | ||
| error!(?m, "received unexpected message"); | ||
| } | ||
| Some(Err(e)) => { |
Check notice
Code scanning / CodeQL
Unused variable Note
d7634d1 to
bae4e58
Compare
bae4e58 to
de0dc5a
Compare
| break; | ||
| } | ||
| } | ||
| Err(err) => { |
Check notice
Code scanning / CodeQL
Unused variable Note
| Some(Ok(gloo_net::websocket::Message::Text(_))) => { | ||
| warn!("ignoring text websocket frame, expected binary protobuf frame"); | ||
| } | ||
| Some(Err(err)) => { |
Check notice
Code scanning / CodeQL
Unused variable Note
| }; | ||
|
|
||
| let payload = msg.encode_to_vec(); | ||
| if let Err(err) = sink |
Check notice
Code scanning / CodeQL
Unused variable Note
0009130 to
c6d2bde
Compare
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com> Made-with: Cursor
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
Signed-off-by: amitami2 <amitami2@cisco.com>
0ea19c2 to
ba70104
Compare
msardara
left a comment
There was a problem hiding this comment.
Great work Amit! Some nits to fix but it looks good overall.
| ) -> WebSocketStreams { | ||
| let (mut read_half, mut write_half) = websocket.split(tokio::io::split); | ||
| read_half.set_auto_close(false); | ||
| read_half.set_auto_pong(false); |
There was a problem hiding this comment.
Any reason not to set this to true? Having it set to true will be beneficial if there are middleboxes suvh as ingresses or forward proxies, as they might interrupt the connection in case of no traffic.
There was a problem hiding this comment.
understood. I will set it to true.
| cancellation_token: CancellationToken, | ||
| ) -> WebSocketStreams { | ||
| let (mut read_half, mut write_half) = websocket.split(tokio::io::split); | ||
| read_half.set_auto_close(false); |
There was a problem hiding this comment.
This should be set to true I think, otherwise we need to handle the websocket close frames
|
|
||
| let read_cancel = cancellation_token.clone(); | ||
| tokio::spawn(async move { | ||
| let mut noop_send = |_frame: Frame<'_>| async move { Result::<(), WebSocketError>::Ok(()) }; |
There was a problem hiding this comment.
This closure should send auto close and auto pong frames. A noop_send will not be able to send the control frames.
You can do something in this line:
let (tx_control, mut rx_control) = mpsc::channel::<Vec<u8>>(4);
// In the read task:
let mut send_pong = |frame: Frame<'_>| {
let tx = tx_control.clone();
let payload = frame.payload.to_vec();
async move {
let _ = tx.send(payload).await;
Ok(())
}
};
reader.read_frame(&mut send_pong)
// In the write task, select on both rx_outbound and rx_control:
tokio::select! {
Some(pong) = rx_control.recv() => {
write_half.write_frame(Frame::pong(pong.into())).await?;
}
Some(msg) = rx_outbound.recv() => {
// existing write logic
}
}There was a problem hiding this comment.
Make sense. I think we can handle both close/pong types of control signals here.
There was a problem hiding this comment.
Was this modified because of some limitation on the wasm side?
There was a problem hiding this comment.
If possible I'd try to integrate the wasm part also in the data plane and reuse SLIM as it is without creating a custom session layer using a websocket as SLIM connection. Do you think it is feasible?
| /// Replace the MLS signature key pair with externally-generated keys. | ||
| /// Used by WASM builds where keys must be generated by the MLS crypto | ||
| /// provider (WebCrypto) rather than the identity provider. | ||
| fn set_signature_keys( | ||
| &mut self, | ||
| _private_key: Vec<u8>, | ||
| _public_key: Vec<u8>, | ||
| ) -> Result<(), AuthError> { | ||
| Err(AuthError::MlsNotSupported) | ||
| } |
| return Ok(()); | ||
| } | ||
|
|
||
| #[cfg(any(feature = "native", feature = "wasm"))] |
| pub mod basic; | ||
| #[cfg(feature = "native")] | ||
| pub mod identity; | ||
| #[cfg(feature = "native")] | ||
| pub mod jwt; | ||
| #[cfg(feature = "native")] | ||
| pub mod oidc; | ||
| #[cfg(not(target_family = "windows"))] | ||
| #[cfg(all(not(target_family = "windows"), feature = "native"))] | ||
| pub mod spire; | ||
| #[cfg(feature = "native")] | ||
| pub mod static_jwt; | ||
|
|
||
| #[cfg(feature = "native")] | ||
| use slim_auth::errors::AuthError as SlimAuthError; |
There was a problem hiding this comment.
cfg_if::cfg_if! {
if #[cfg(not(target_arch = "wasm32"))] {
pub mod identity;
pub mod jwt;
pub mod oidc;
pub mod static_jwt;
#[cfg(not(target_family = "windows"))]
pub mod spire;
use slim_auth::errors::AuthError as SlimAuthError;
}
}
| use duration_string::DurationString; | ||
| #[cfg(feature = "native")] | ||
| use rustls_pki_types::ServerName; | ||
| #[cfg(feature = "native")] | ||
| use tokio_retry::RetryIf; | ||
|
|
||
| #[cfg(feature = "native")] | ||
| use display_error_chain::ErrorChainExt; | ||
| #[cfg(any(feature = "native", feature = "wasm"))] | ||
| use std::str::FromStr; | ||
| use std::{collections::HashMap, time::Duration}; | ||
| #[cfg(feature = "native")] | ||
| use tower::ServiceExt; | ||
| #[cfg(all(feature = "native", target_family = "unix"))] | ||
| use { | ||
| hyper_util::rt::TokioIo, | ||
| std::{error::Error as StdErrorTrait, path::PathBuf, sync::Arc}, | ||
| tokio::net::UnixStream, | ||
| tower::service_fn, | ||
| }; | ||
|
|
||
| #[cfg(feature = "native")] | ||
| use base64::prelude::*; | ||
| #[cfg(feature = "native")] | ||
| use http::header::{HeaderMap, HeaderName, HeaderValue}; | ||
| #[cfg(feature = "native")] | ||
| use hyper_rustls; | ||
| #[cfg(feature = "native")] | ||
| use hyper_util::client::legacy::connect::HttpConnector; | ||
| #[cfg(feature = "native")] | ||
| use hyper_util::client::legacy::connect::proxy::Tunnel; | ||
| #[cfg(feature = "native")] | ||
| use hyper_util::client::proxy::matcher::Intercept; | ||
| use schemars::JsonSchema; | ||
| use serde::{Deserialize, Serialize}; | ||
| #[cfg(feature = "native")] | ||
| use tonic::codegen::{Body, Bytes, StdError}; | ||
| #[cfg(feature = "native")] | ||
| use tonic::transport::{Channel, Uri}; | ||
| #[cfg(feature = "native")] | ||
| use tracing::warn; | ||
|
|
||
| #[cfg(feature = "native")] | ||
| use slim_auth::metadata::MetadataMap; | ||
| #[cfg(not(feature = "native"))] | ||
| type MetadataMap = HashMap<String, serde_json::Value>; | ||
|
|
||
| #[cfg(feature = "native")] | ||
| use crate::auth::ClientAuthenticator; | ||
| use crate::auth::basic::Config as BasicAuthenticationConfig; | ||
| #[cfg(feature = "native")] | ||
| use crate::auth::jwt::Config as JwtAuthenticationConfig; | ||
| #[cfg(all(feature = "native", not(target_family = "windows")))] | ||
| use crate::auth::spire::SpireConfig as SpireAuthConfig; | ||
| #[cfg(feature = "native")] | ||
| use crate::auth::static_jwt::Config as BearerAuthenticationConfig; | ||
| use crate::backoff::Strategy; | ||
| use crate::backoff::exponential::Config as ExponentialBackoff; | ||
| use crate::backoff::fixedinterval::Config as FixedIntervalBackoff; | ||
| use crate::component::configuration::Configuration; | ||
| use crate::grpc::compression::CompressionType; | ||
| use crate::grpc::errors::ConfigError; | ||
| #[cfg(feature = "native")] | ||
| use crate::grpc::headers_middleware::SetRequestHeaderLayer; | ||
| use crate::grpc::proxy::ProxyConfig; | ||
| use crate::tls::client::TlsClientConfig as TLSSetting; | ||
| #[cfg(feature = "native")] | ||
| use crate::tls::common::RustlsConfigLoader; | ||
| use crate::transport::TransportProtocol; | ||
| #[cfg(any(feature = "native", feature = "wasm"))] | ||
| use crate::websocket::client::WebSocketClientChannel; |
There was a problem hiding this comment.
| use duration_string::DurationString; | |
| #[cfg(feature = "native")] | |
| use rustls_pki_types::ServerName; | |
| #[cfg(feature = "native")] | |
| use tokio_retry::RetryIf; | |
| #[cfg(feature = "native")] | |
| use display_error_chain::ErrorChainExt; | |
| #[cfg(any(feature = "native", feature = "wasm"))] | |
| use std::str::FromStr; | |
| use std::{collections::HashMap, time::Duration}; | |
| #[cfg(feature = "native")] | |
| use tower::ServiceExt; | |
| #[cfg(all(feature = "native", target_family = "unix"))] | |
| use { | |
| hyper_util::rt::TokioIo, | |
| std::{error::Error as StdErrorTrait, path::PathBuf, sync::Arc}, | |
| tokio::net::UnixStream, | |
| tower::service_fn, | |
| }; | |
| #[cfg(feature = "native")] | |
| use base64::prelude::*; | |
| #[cfg(feature = "native")] | |
| use http::header::{HeaderMap, HeaderName, HeaderValue}; | |
| #[cfg(feature = "native")] | |
| use hyper_rustls; | |
| #[cfg(feature = "native")] | |
| use hyper_util::client::legacy::connect::HttpConnector; | |
| #[cfg(feature = "native")] | |
| use hyper_util::client::legacy::connect::proxy::Tunnel; | |
| #[cfg(feature = "native")] | |
| use hyper_util::client::proxy::matcher::Intercept; | |
| use schemars::JsonSchema; | |
| use serde::{Deserialize, Serialize}; | |
| #[cfg(feature = "native")] | |
| use tonic::codegen::{Body, Bytes, StdError}; | |
| #[cfg(feature = "native")] | |
| use tonic::transport::{Channel, Uri}; | |
| #[cfg(feature = "native")] | |
| use tracing::warn; | |
| #[cfg(feature = "native")] | |
| use slim_auth::metadata::MetadataMap; | |
| #[cfg(not(feature = "native"))] | |
| type MetadataMap = HashMap<String, serde_json::Value>; | |
| #[cfg(feature = "native")] | |
| use crate::auth::ClientAuthenticator; | |
| use crate::auth::basic::Config as BasicAuthenticationConfig; | |
| #[cfg(feature = "native")] | |
| use crate::auth::jwt::Config as JwtAuthenticationConfig; | |
| #[cfg(all(feature = "native", not(target_family = "windows")))] | |
| use crate::auth::spire::SpireConfig as SpireAuthConfig; | |
| #[cfg(feature = "native")] | |
| use crate::auth::static_jwt::Config as BearerAuthenticationConfig; | |
| use crate::backoff::Strategy; | |
| use crate::backoff::exponential::Config as ExponentialBackoff; | |
| use crate::backoff::fixedinterval::Config as FixedIntervalBackoff; | |
| use crate::component::configuration::Configuration; | |
| use crate::grpc::compression::CompressionType; | |
| use crate::grpc::errors::ConfigError; | |
| #[cfg(feature = "native")] | |
| use crate::grpc::headers_middleware::SetRequestHeaderLayer; | |
| use crate::grpc::proxy::ProxyConfig; | |
| use crate::tls::client::TlsClientConfig as TLSSetting; | |
| #[cfg(feature = "native")] | |
| use crate::tls::common::RustlsConfigLoader; | |
| use crate::transport::TransportProtocol; | |
| #[cfg(any(feature = "native", feature = "wasm"))] | |
| use crate::websocket::client::WebSocketClientChannel; | |
| use std::{collections::HashMap, str::FromStr, time::Duration}; | |
| use duration_string::DurationString; | |
| use schemars::JsonSchema; | |
| use serde::{Deserialize, Serialize}; | |
| use crate::auth::basic::Config as BasicAuthenticationConfig; | |
| use crate::backoff::Strategy; | |
| use crate::backoff::exponential::Config as ExponentialBackoff; | |
| use crate::backoff::fixedinterval::Config as FixedIntervalBackoff; | |
| use crate::component::configuration::Configuration; | |
| use crate::grpc::compression::CompressionType; | |
| use crate::grpc::errors::ConfigError; | |
| use crate::grpc::proxy::ProxyConfig; | |
| use crate::tls::client::TlsClientConfig as TLSSetting; | |
| use crate::transport::TransportProtocol; | |
| use crate::websocket::client::WebSocketClientChannel; | |
| cfg_if::cfg_if! { | |
| if #[cfg(not(target_arch = "wasm32"))] { | |
| use std::sync::Arc; | |
| use base64::prelude::*; | |
| use display_error_chain::ErrorChainExt; | |
| use http::header::{HeaderMap, HeaderName, HeaderValue}; | |
| use hyper_util::client::legacy::connect::HttpConnector; | |
| use hyper_util::client::legacy::connect::proxy::Tunnel; | |
| use hyper_util::client::proxy::matcher::Intercept; | |
| use rustls_pki_types::ServerName; | |
| use tokio_retry::RetryIf; | |
| use tonic::codegen::{Body, Bytes, StdError}; | |
| use tonic::transport::{Channel, Uri}; | |
| use tower::ServiceExt; | |
| use tracing::warn; | |
| use slim_auth::metadata::MetadataMap; | |
| use crate::auth::ClientAuthenticator; | |
| use crate::auth::jwt::Config as JwtAuthenticationConfig; | |
| use crate::auth::static_jwt::Config as BearerAuthenticationConfig; | |
| use crate::grpc::headers_middleware::SetRequestHeaderLayer; | |
| use crate::tls::common::RustlsConfigLoader; | |
| #[cfg(not(target_family = "windows"))] | |
| use crate::auth::spire::SpireConfig as SpireAuthConfig; | |
| #[cfg(target_family = "unix")] | |
| use { | |
| hyper_util::rt::TokioIo, | |
| std::{error::Error as StdErrorTrait, path::PathBuf}, | |
| tokio::net::UnixStream, | |
| tower::service_fn, | |
| }; | |
| } else { | |
| type MetadataMap = HashMap<String, serde_json::Value>; | |
| } | |
| } |
| /// timeout settings, buffer size settings, and origin settings. | ||
| pub async fn to_channel( | ||
| /// Converts the client configuration to a gRPC-only channel. | ||
| pub async fn to_grpc_channel( |
There was a problem hiding this comment.
I think we should generalize this. Here it implies that who is using the config knows whether the config will generate a grpc or a websocket channel.
We should create an enum wrapping the grpc and the websockert channels, and we should offer a common set of methods to call on the channel, which should be transparent to the caller.
We should so keep a single to_channel, that returns a Channel which is
enum Channel {
GrpcChannel
WebsocketChannel
}
Alternatively we could use a generic function, but that makes life more difficult if we need to store the channel in a container.
There was a problem hiding this comment.
Actually I did in that way only. So outside grpc folder there is one client.rs file in that we have ClientConfig that will be generic for both Websocket/gRPC. And there we have a function to_channel() that returns an enum. So that function will call this to_grpc_channel / to_websocket_channel on the basis of the transport type.
Summary
This PR introduces WebSocket as a new transport protocol for the SLIM data-plane alongside the existing gRPC transport, and adds WebAssembly (WASM) support to enable running SLIM clients in the browser.
What's Changed
WebSocket Transport (Server & Client)
websocketconfig module with server and client implementations (config/src/websocket/)hyper+fastwebsocketswith TLS/WSS supportdatapath/src/websocket/stream.rs)client-config-debug.yaml,server-config-debug.yaml,client-config-wss.yaml,server-config-wss.yamlWASM Support
slim-wasmcrate (core/slim-wasm/) — awasm-bindgenentry point for running SLIM in the browserclient_wasm.rs,common_wasm.rs)wasmandnativefeature flags across all core crates (auth,config,session,datapath,mls,service,tracing,signal, etc.)native.rs(tokio-based) andwasm.rs(console-based)mls/src/crypto.rs) for WASM-compatible crypto backendssession/src/runtime.rs) for native vs WASM task spawningexamples/browser/index.html)Config Refactoring
ClientConfigandServerConfigintoconfig/src/client.rsandconfig/src/server.rs(previously embedded ingrpc/)grpc/schema/toschema/CI / Toolchain
Dockerfile,.cargo/config.toml, GitHub Actions workflows) — LLVM 19 is no longer available onapt.llvm.orgfor Debian bookwormwasmandwebsocketbuildsType of Change
Checklist
Fixes #1312