diff --git a/src/client.rs b/src/client.rs index ad5ab65..4fa7555 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,72 +1,72 @@ //! ## WebSocket[](#websocket "Permalink to this headline") -//! -//! The WebSocket protocol brings bi-directional (soft) real-time and wire traffic efficient connections to the browser. +//! +//! The WebSocket protocol brings bi-directional (soft) real-time and wire traffic efficient connections to the browser. //! Today (2018) WebSocket is universally supported in browsers, network equipment, servers and client languages. -//! -//! Despite having opened completely new possibilities on the Web, WebSocket defines an API for -//! application developers at the *message* level, and *point-to-point*, requiring users who want to use +//! +//! Despite having opened completely new possibilities on the Web, WebSocket defines an API for +//! application developers at the *message* level, and *point-to-point*, requiring users who want to use //! WebSocket connections in their applications to define their own semantics on top of it. -//! -//! The Web Application Messaging Protocol (WAMP) aims to provide application developers with the right level of semantics, -//! with what they need to handle messaging and communication between components in distributed applications at +//! +//! The Web Application Messaging Protocol (WAMP) aims to provide application developers with the right level of semantics, +//! with what they need to handle messaging and communication between components in distributed applications at //! a convenient and abstracted way. -//! -//! WAMP was initially defined as a WebSocket sub-protocol, which provided **Publish & Subscribe (PubSub)** functionality -//! as well as **routed Remote Procedure Calls (rRPC)** for procedures implemented in a WAMP router. -//! Feedback from implementers and users of this was included in a second version of the protocol which this document defines. +//! +//! WAMP was initially defined as a WebSocket sub-protocol, which provided **Publish & Subscribe (PubSub)** functionality +//! as well as **routed Remote Procedure Calls (rRPC)** for procedures implemented in a WAMP router. +//! Feedback from implementers and users of this was included in a second version of the protocol which this document defines. //! Among the changes was that WAMP can now run over any transport which is message-oriented, ordered, reliable, and bi-directional. -//! +//! //! > If you want to read more about WebSocket, we recommend two blog posts of the creators of WAMP;) -//! > +//! > //! > - [WebSocket - Why, what, and - can I use it?][1] //! > - [Dissecting WebSocket’s Overhead][2] //! //! ## WAMP[](#wamp "Permalink to this headline") -//! -//! WAMP is a routed protocol, with all components connecting to a WAMP Router, where the WAMP Router performs message +//! +//! WAMP is a routed protocol, with all components connecting to a WAMP Router, where the WAMP Router performs message //! routing between the components, and provides two messaging patterns in one Web native protocol: -//! +//! //! - **Publish & Subscribe (PubSub)** and //! - routed **Remote Procedure Calls (rRPC)** //! -//! Publish & Subscribe (PubSub) is an established messaging pattern where a component, the Subscriber, informs the router -//! that it wants to receive information on a topic (i.e., it subscribes to a topic). Another component, a Publisher, +//! Publish & Subscribe (PubSub) is an established messaging pattern where a component, the Subscriber, informs the router +//! that it wants to receive information on a topic (i.e., it subscribes to a topic). Another component, a Publisher, //! can then publish to this topic, and the router distributes events to all Subscribers. -//! -//! Routed Remote Procedure Calls (rRPCs) rely on the same sort of decoupling that is used by the Publish & Subscribe pattern. -//! A component, the Callee, announces to the router that it provides a certain procedure, identified by a procedure name. -//! Other components, Callers, can then call the procedure, with the router invoking the procedure on the Callee, -//! receiving the procedure’s result, and then forwarding this result back to the Caller. Routed RPCs differ from +//! +//! Routed Remote Procedure Calls (rRPCs) rely on the same sort of decoupling that is used by the Publish & Subscribe pattern. +//! A component, the Callee, announces to the router that it provides a certain procedure, identified by a procedure name. +//! Other components, Callers, can then call the procedure, with the router invoking the procedure on the Callee, +//! receiving the procedure’s result, and then forwarding this result back to the Caller. Routed RPCs differ from //! traditional client-server RPCs in that the router serves as an intermediary between the Caller and the Callee. -//! +//! //! **Advantages of decoupling and routed RPCs** -//! -//! The decoupling in routed RPCs arises from the fact that the Caller is no longer required to have knowledge of the Callee; -//! it merely needs to know the identifier of the procedure it wants to call. There no longer is a need for a direct network connection +//! +//! The decoupling in routed RPCs arises from the fact that the Caller is no longer required to have knowledge of the Callee; +//! it merely needs to know the identifier of the procedure it wants to call. There no longer is a need for a direct network connection //! or path between the caller and the callee, since all messages are routed at the WAMP level. -//! +//! //! This approach enables a whole range of possibilities: -//! -//! - calling into procedures in components which are not reachable from outside at the network level (e.g. on a NATted connection), +//! +//! - calling into procedures in components which are not reachable from outside at the network level (e.g. on a NATted connection), //! but which can establish an outgoing network connection to the WAMP router. //! -//! - This decoupling of transport and application layer traffic allows a “reversal of command” where a +//! - This decoupling of transport and application layer traffic allows a “reversal of command” where a //! cloud-based system can securely control remote devices -//! - It also allows to treat frontend and backend components (microservices) the same, and it even allows +//! - It also allows to treat frontend and backend components (microservices) the same, and it even allows //! to develop backend code in the browser ([Free Your Code - Backends in the Browser][3]). -//! - Since no ports on edge devices need to be opened for WAMP to work (in both directions), +//! - Since no ports on edge devices need to be opened for WAMP to work (in both directions), //! the remote attack surface of these (potentially many) devices is completely closed ([Security in the IoT][4]). //! -//! - Finally, since the Caller is not aware where, or even who is processing the call (and it should not care!), -//! it is easily possible to make application components highly-available (using hot standby components) +//! - Finally, since the Caller is not aware where, or even who is processing the call (and it should not care!), +//! it is easily possible to make application components highly-available (using hot standby components) //! or scale-out application components ([Scaling microservices with Crossbar.io][5]). //! //! **Summary** -//! -//! Combining the Publish & Subscribe and routed Remote Procedure Calls in one Web native, real-time transport protocol (WebSocket) -//! allows WAMP to be used for the entire messaging requirements of component and microservice based applications, reducing technology +//! +//! Combining the Publish & Subscribe and routed Remote Procedure Calls in one Web native, real-time transport protocol (WebSocket) +//! allows WAMP to be used for the entire messaging requirements of component and microservice based applications, reducing technology //! stack complexity and overhead, providing a capable and secure fundament for applications to rely on. -//! +//! //! [1]: https://crossbario.com/blog/Websocket-Why-What-Can-I-Use-It/ //! [2]: https://crossbario.com/blog/Dissecting-Websocket-Overhead/ //! [3]: https://crossbario.com/blog/Free-Your-Code-Backends-in-the-Browser/ @@ -90,17 +90,17 @@ use std::{ use futures::{channel::oneshot, Future}; use intmap::IntMap; use log::{debug, error, info, trace, warn}; -use rmp_serde::{Deserializer as RMPDeserializer, Serializer}; -use serde::{Deserialize, Serialize}; -use url::Url; use parity_ws::{ connect, util::Token, CloseCode, Error as WSError, ErrorKind as WSErrorKind, Handler, Handshake, Message as WSMessage, Request, Result as WSResult, Sender, }; +use rmp_serde::{Deserializer as RMPDeserializer, Serializer}; +use serde::{Deserialize, Serialize}; +use url::Url; use crate::{ messages::{ - CallOptions, ClientRoles, Dict, ErrorDetails, ErrorType, HelloDetails, InvocationDetails, + ClientRoles, Dict, ErrorDetails, ErrorType, HelloDetails, InvocationDetails, List, MatchingPolicy, Message, PublishOptions, Reason, RegisterOptions, ResultDetails, SubscribeOptions, WelcomeDetails, YieldOptions, URI, }, @@ -203,6 +203,7 @@ trait MessageSender { impl MessageSender for ConnectionInfo { fn send_message(&self, message: Message) -> WampResult<()> { debug!("Sending message {:?} via {}", message, self.protocol); + let send_result = if self.protocol == WAMP_JSON { // Send the json message self.sender @@ -979,7 +980,7 @@ impl Client { }) } - /// Unregister procedure + /// Unregister procedure pub fn unregister( &mut self, registration: Registration, @@ -1036,6 +1037,18 @@ impl Client { procedure: URI, args: Option, kwargs: Option, + ) -> Pin>>> { + let empty_options = HashMap::new(); + self.call_with_options(procedure, args, kwargs, empty_options) + } + + /// Call the procedure with custom options + pub fn call_with_options( + &mut self, + procedure: URI, + args: Option, + kwargs: Option, + options: Dict, ) -> Pin>>> { info!("Calling {:?} with {:?} | {:?}", procedure, args, kwargs); @@ -1047,14 +1060,8 @@ impl Client { info.call_requests.insert(request_id, complete); - info.send_message(Message::Call( - request_id, - CallOptions::new(), - procedure, - args, - kwargs, - )) - .unwrap(); + info.send_message(Message::Call(request_id, options, procedure, args, kwargs)) + .unwrap(); Box::pin(async { receiver.await.unwrap_or(Err(CallError { @@ -1100,7 +1107,7 @@ impl Client { }) } - /// Disconnect from router gracefully + /// Disconnect from router gracefully pub fn shutdown(&mut self) -> Pin>>> { let mut info = self.connection_info.lock().unwrap(); diff --git a/src/messages/mod.rs b/src/messages/mod.rs index 9244cb1..5a6001b 100644 --- a/src/messages/mod.rs +++ b/src/messages/mod.rs @@ -33,7 +33,7 @@ pub enum Message { Registered(ID, ID), Unregister(ID, ID), Unregistered(ID), - Call(ID, CallOptions, URI, Option, Option), + Call(ID, Dict, URI, Option, Option), Invocation(ID, ID, InvocationDetails, Option, Option), Yield(ID, YieldOptions, Option, Option), Result(ID, ResultDetails, Option, Option), @@ -542,7 +542,7 @@ mod test { use super::{ types::{ - CallOptions, ClientRoles, ErrorDetails, ErrorType, EventDetails, HelloDetails, + ClientRoles, ErrorDetails, ErrorType, EventDetails, HelloDetails, InvocationDetails, PublishOptions, Reason, RegisterOptions, ResultDetails, RouterRoles, SubscribeOptions, Value, WelcomeDetails, YieldOptions, URI, }, @@ -794,7 +794,7 @@ mod test { two_way_test!( Message::Call( 7_814_135, - CallOptions::new(), + HashMap::new(), URI::new("com.myapp.ping"), None, None @@ -805,7 +805,7 @@ mod test { two_way_test!( Message::Call( 764_346, - CallOptions::new(), + HashMap::new(), URI::new("com.myapp.echo"), Some(vec![Value::String("a value".to_string())]), None @@ -820,7 +820,7 @@ mod test { two_way_test!( Message::Call( 764_346, - CallOptions::new(), + HashMap::new(), URI::new("com.myapp.compute"), Some(Vec::new()), Some(kwargs) diff --git a/src/messages/types/options.rs b/src/messages/types/options.rs index 205e52e..ab774bc 100644 --- a/src/messages/types/options.rs +++ b/src/messages/types/options.rs @@ -55,9 +55,6 @@ pub struct RegisterOptions { pub invocation_policy: InvocationPolicy, } -#[derive(PartialEq, Debug, Default, Serialize, Deserialize)] -pub struct CallOptions {} - #[derive(PartialEq, Debug, Default, Serialize, Deserialize)] pub struct YieldOptions {} @@ -147,12 +144,6 @@ impl RegisterOptions { } } -impl CallOptions { - pub fn new() -> CallOptions { - CallOptions {} - } -} - impl YieldOptions { pub fn new() -> YieldOptions { YieldOptions {} diff --git a/src/router/rpc/mod.rs b/src/router/rpc/mod.rs index ef8ed6b..78fa7c6 100644 --- a/src/router/rpc/mod.rs +++ b/src/router/rpc/mod.rs @@ -4,7 +4,7 @@ use log::{debug, info}; use crate::{ messages::{ - CallOptions, ErrorType, InvocationDetails, Message, Reason, RegisterOptions, ResultDetails, + ErrorType, InvocationDetails, Message, Reason, RegisterOptions, ResultDetails, YieldOptions, URI, }, Dict, Error, ErrorKind, List, MatchingPolicy, WampResult, ID, @@ -107,7 +107,7 @@ impl ConnectionHandler { pub fn handle_call( &mut self, request_id: ID, - _options: CallOptions, + _options: Dict, procedure: URI, args: Option, kwargs: Option,