Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 61 additions & 54 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,72 +1,72 @@
//! ## WebSocket[](#websocket "Permalink to this headline")
//!
//! The WebSocket protocol brings bi-directional (soft) real-time and wire traffic efficient connections to the browser.
//!
//! The WebSocket protocol brings bi-directional (soft) real-time and wire traffic efficient connections to the browser.
//! Today (2018) WebSocket is universally supported in browsers, network equipment, servers and client languages.
//!
//! Despite having opened completely new possibilities on the Web, WebSocket defines an API for
//! application developers at the *message* level, and *point-to-point*, requiring users who want to use
//!
//! Despite having opened completely new possibilities on the Web, WebSocket defines an API for
//! application developers at the *message* level, and *point-to-point*, requiring users who want to use
//! WebSocket connections in their applications to define their own semantics on top of it.
//!
//! The Web Application Messaging Protocol (WAMP) aims to provide application developers with the right level of semantics,
//! with what they need to handle messaging and communication between components in distributed applications at
//!
//! The Web Application Messaging Protocol (WAMP) aims to provide application developers with the right level of semantics,
//! with what they need to handle messaging and communication between components in distributed applications at
//! a convenient and abstracted way.
//!
//! WAMP was initially defined as a WebSocket sub-protocol, which provided **Publish & Subscribe (PubSub)** functionality
//! as well as **routed Remote Procedure Calls (rRPC)** for procedures implemented in a WAMP router.
//! Feedback from implementers and users of this was included in a second version of the protocol which this document defines.
//!
//! WAMP was initially defined as a WebSocket sub-protocol, which provided **Publish & Subscribe (PubSub)** functionality
//! as well as **routed Remote Procedure Calls (rRPC)** for procedures implemented in a WAMP router.
//! Feedback from implementers and users of this was included in a second version of the protocol which this document defines.
//! Among the changes was that WAMP can now run over any transport which is message-oriented, ordered, reliable, and bi-directional.
//!
//!
//! > If you want to read more about WebSocket, we recommend two blog posts of the creators of WAMP;)
//! >
//! >
//! > - [WebSocket - Why, what, and - can I use it?][1]
//! > - [Dissecting WebSocket’s Overhead][2]
//!
//! ## WAMP[](#wamp "Permalink to this headline")
//!
//! WAMP is a routed protocol, with all components connecting to a WAMP Router, where the WAMP Router performs message
//!
//! WAMP is a routed protocol, with all components connecting to a WAMP Router, where the WAMP Router performs message
//! routing between the components, and provides two messaging patterns in one Web native protocol:
//!
//!
//! - **Publish & Subscribe (PubSub)** and
//! - routed **Remote Procedure Calls (rRPC)**
//!
//! Publish & Subscribe (PubSub) is an established messaging pattern where a component, the Subscriber, informs the router
//! that it wants to receive information on a topic (i.e., it subscribes to a topic). Another component, a Publisher,
//! Publish & Subscribe (PubSub) is an established messaging pattern where a component, the Subscriber, informs the router
//! that it wants to receive information on a topic (i.e., it subscribes to a topic). Another component, a Publisher,
//! can then publish to this topic, and the router distributes events to all Subscribers.
//!
//! Routed Remote Procedure Calls (rRPCs) rely on the same sort of decoupling that is used by the Publish & Subscribe pattern.
//! A component, the Callee, announces to the router that it provides a certain procedure, identified by a procedure name.
//! Other components, Callers, can then call the procedure, with the router invoking the procedure on the Callee,
//! receiving the procedure’s result, and then forwarding this result back to the Caller. Routed RPCs differ from
//!
//! Routed Remote Procedure Calls (rRPCs) rely on the same sort of decoupling that is used by the Publish & Subscribe pattern.
//! A component, the Callee, announces to the router that it provides a certain procedure, identified by a procedure name.
//! Other components, Callers, can then call the procedure, with the router invoking the procedure on the Callee,
//! receiving the procedure’s result, and then forwarding this result back to the Caller. Routed RPCs differ from
//! traditional client-server RPCs in that the router serves as an intermediary between the Caller and the Callee.
//!
//!
//! **Advantages of decoupling and routed RPCs**
//!
//! The decoupling in routed RPCs arises from the fact that the Caller is no longer required to have knowledge of the Callee;
//! it merely needs to know the identifier of the procedure it wants to call. There no longer is a need for a direct network connection
//!
//! The decoupling in routed RPCs arises from the fact that the Caller is no longer required to have knowledge of the Callee;
//! it merely needs to know the identifier of the procedure it wants to call. There no longer is a need for a direct network connection
//! or path between the caller and the callee, since all messages are routed at the WAMP level.
//!
//!
//! This approach enables a whole range of possibilities:
//!
//! - calling into procedures in components which are not reachable from outside at the network level (e.g. on a NATted connection),
//!
//! - calling into procedures in components which are not reachable from outside at the network level (e.g. on a NATted connection),
//! but which can establish an outgoing network connection to the WAMP router.
//!
//! - This decoupling of transport and application layer traffic allows a “reversal of command” where a
//! - This decoupling of transport and application layer traffic allows a “reversal of command” where a
//! cloud-based system can securely control remote devices
//! - It also allows to treat frontend and backend components (microservices) the same, and it even allows
//! - It also allows to treat frontend and backend components (microservices) the same, and it even allows
//! to develop backend code in the browser ([Free Your Code - Backends in the Browser][3]).
//! - Since no ports on edge devices need to be opened for WAMP to work (in both directions),
//! - Since no ports on edge devices need to be opened for WAMP to work (in both directions),
//! the remote attack surface of these (potentially many) devices is completely closed ([Security in the IoT][4]).
//!
//! - Finally, since the Caller is not aware where, or even who is processing the call (and it should not care!),
//! it is easily possible to make application components highly-available (using hot standby components)
//! - Finally, since the Caller is not aware where, or even who is processing the call (and it should not care!),
//! it is easily possible to make application components highly-available (using hot standby components)
//! or scale-out application components ([Scaling microservices with Crossbar.io][5]).
//!
//! **Summary**
//!
//! Combining the Publish & Subscribe and routed Remote Procedure Calls in one Web native, real-time transport protocol (WebSocket)
//! allows WAMP to be used for the entire messaging requirements of component and microservice based applications, reducing technology
//!
//! Combining the Publish & Subscribe and routed Remote Procedure Calls in one Web native, real-time transport protocol (WebSocket)
//! allows WAMP to be used for the entire messaging requirements of component and microservice based applications, reducing technology
//! stack complexity and overhead, providing a capable and secure fundament for applications to rely on.
//!
//!
//! [1]: https://crossbario.com/blog/Websocket-Why-What-Can-I-Use-It/
//! [2]: https://crossbario.com/blog/Dissecting-Websocket-Overhead/
//! [3]: https://crossbario.com/blog/Free-Your-Code-Backends-in-the-Browser/
Expand All @@ -90,17 +90,17 @@ use std::{
use futures::{channel::oneshot, Future};
use intmap::IntMap;
use log::{debug, error, info, trace, warn};
use rmp_serde::{Deserializer as RMPDeserializer, Serializer};
use serde::{Deserialize, Serialize};
use url::Url;
use parity_ws::{
connect, util::Token, CloseCode, Error as WSError, ErrorKind as WSErrorKind, Handler,
Handshake, Message as WSMessage, Request, Result as WSResult, Sender,
};
use rmp_serde::{Deserializer as RMPDeserializer, Serializer};
use serde::{Deserialize, Serialize};
use url::Url;

use crate::{
messages::{
CallOptions, ClientRoles, Dict, ErrorDetails, ErrorType, HelloDetails, InvocationDetails,
ClientRoles, Dict, ErrorDetails, ErrorType, HelloDetails, InvocationDetails,
List, MatchingPolicy, Message, PublishOptions, Reason, RegisterOptions, ResultDetails,
SubscribeOptions, WelcomeDetails, YieldOptions, URI,
},
Expand Down Expand Up @@ -203,6 +203,7 @@ trait MessageSender {
impl MessageSender for ConnectionInfo {
fn send_message(&self, message: Message) -> WampResult<()> {
debug!("Sending message {:?} via {}", message, self.protocol);

let send_result = if self.protocol == WAMP_JSON {
// Send the json message
self.sender
Expand Down Expand Up @@ -979,7 +980,7 @@ impl Client {
})
}

/// Unregister procedure
/// Unregister procedure
pub fn unregister(
&mut self,
registration: Registration,
Expand Down Expand Up @@ -1036,6 +1037,18 @@ impl Client {
procedure: URI,
args: Option<List>,
kwargs: Option<Dict>,
) -> Pin<Box<dyn Future<Output = Result<(List, Dict), CallError>>>> {
let empty_options = HashMap::new();
self.call_with_options(procedure, args, kwargs, empty_options)
}

/// Call the procedure with custom options
pub fn call_with_options(
&mut self,
procedure: URI,
args: Option<List>,
kwargs: Option<Dict>,
options: Dict,
) -> Pin<Box<dyn Future<Output = Result<(List, Dict), CallError>>>> {
info!("Calling {:?} with {:?} | {:?}", procedure, args, kwargs);

Expand All @@ -1047,14 +1060,8 @@ impl Client {

info.call_requests.insert(request_id, complete);

info.send_message(Message::Call(
request_id,
CallOptions::new(),
procedure,
args,
kwargs,
))
.unwrap();
info.send_message(Message::Call(request_id, options, procedure, args, kwargs))
.unwrap();

Box::pin(async {
receiver.await.unwrap_or(Err(CallError {
Expand Down Expand Up @@ -1100,7 +1107,7 @@ impl Client {
})
}

/// Disconnect from router gracefully
/// Disconnect from router gracefully
pub fn shutdown(&mut self) -> Pin<Box<dyn Future<Output = Result<(), CallError>>>> {
let mut info = self.connection_info.lock().unwrap();

Expand Down
10 changes: 5 additions & 5 deletions src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub enum Message {
Registered(ID, ID),
Unregister(ID, ID),
Unregistered(ID),
Call(ID, CallOptions, URI, Option<List>, Option<Dict>),
Call(ID, Dict, URI, Option<List>, Option<Dict>),
Invocation(ID, ID, InvocationDetails, Option<List>, Option<Dict>),
Yield(ID, YieldOptions, Option<List>, Option<Dict>),
Result(ID, ResultDetails, Option<List>, Option<Dict>),
Expand Down Expand Up @@ -542,7 +542,7 @@ mod test {

use super::{
types::{
CallOptions, ClientRoles, ErrorDetails, ErrorType, EventDetails, HelloDetails,
ClientRoles, ErrorDetails, ErrorType, EventDetails, HelloDetails,
InvocationDetails, PublishOptions, Reason, RegisterOptions, ResultDetails, RouterRoles,
SubscribeOptions, Value, WelcomeDetails, YieldOptions, URI,
},
Expand Down Expand Up @@ -794,7 +794,7 @@ mod test {
two_way_test!(
Message::Call(
7_814_135,
CallOptions::new(),
HashMap::new(),
URI::new("com.myapp.ping"),
None,
None
Expand All @@ -805,7 +805,7 @@ mod test {
two_way_test!(
Message::Call(
764_346,
CallOptions::new(),
HashMap::new(),
URI::new("com.myapp.echo"),
Some(vec![Value::String("a value".to_string())]),
None
Expand All @@ -820,7 +820,7 @@ mod test {
two_way_test!(
Message::Call(
764_346,
CallOptions::new(),
HashMap::new(),
URI::new("com.myapp.compute"),
Some(Vec::new()),
Some(kwargs)
Expand Down
9 changes: 0 additions & 9 deletions src/messages/types/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,6 @@ pub struct RegisterOptions {
pub invocation_policy: InvocationPolicy,
}

#[derive(PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct CallOptions {}

#[derive(PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct YieldOptions {}

Expand Down Expand Up @@ -147,12 +144,6 @@ impl RegisterOptions {
}
}

impl CallOptions {
pub fn new() -> CallOptions {
CallOptions {}
}
}

impl YieldOptions {
pub fn new() -> YieldOptions {
YieldOptions {}
Expand Down
4 changes: 2 additions & 2 deletions src/router/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use log::{debug, info};

use crate::{
messages::{
CallOptions, ErrorType, InvocationDetails, Message, Reason, RegisterOptions, ResultDetails,
ErrorType, InvocationDetails, Message, Reason, RegisterOptions, ResultDetails,
YieldOptions, URI,
},
Dict, Error, ErrorKind, List, MatchingPolicy, WampResult, ID,
Expand Down Expand Up @@ -107,7 +107,7 @@ impl ConnectionHandler {
pub fn handle_call(
&mut self,
request_id: ID,
_options: CallOptions,
_options: Dict,
procedure: URI,
args: Option<List>,
kwargs: Option<Dict>,
Expand Down