diff --git a/CHANGELOG.md b/CHANGELOG.md index 63412d2..96b5f25 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,10 @@ The minor version will be incremented upon a breaking change and the patch versi - richat-v10.0.0 +### Fixes + +- richat: fix unknown unsubscribe id handler in pubsub ([#209](https://github.com/lamports-dev/richat/pull/209)) + ### Breaking - richat: remove entries from subscribe_accounts subscription ([#210](https://github.com/lamports-dev/richat/pull/210)) diff --git a/richat/src/pubsub/server.rs b/richat/src/pubsub/server.rs index eb9404f..52279d0 100644 --- a/richat/src/pubsub/server.rs +++ b/richat/src/pubsub/server.rs @@ -24,7 +24,9 @@ use { rt::tokio::{TokioExecutor, TokioIo}, server::conn::auto::Builder as ServerBuilder, }, - jsonrpsee_types::{Extensions, ResponsePayload, TwoPointZero}, + jsonrpsee_types::{ + ErrorCode, ErrorObject, ErrorObjectOwned, Extensions, ResponsePayload, TwoPointZero, + }, richat_shared::jsonrpc::helpers::get_x_subscription_id, solana_nohash_hasher::IntMap, solana_rpc_client_api::response::RpcVersionInfo, @@ -304,16 +306,16 @@ impl PubSubServer { let _ = tx.send(()); }, Ok(WriteRequest::Message { message, tx }) => { - let result = match message.config { + let result: Result = match message.config { SubscribeConfig::GetVersion => { let version = solana_version::Version::default(); - serde_json::to_value(&RpcVersionInfo { + Ok(serde_json::to_value(&RpcVersionInfo { solana_core: version.to_string(), feature_set: Some(version.feature_set), }) - .expect("json serialization never fail") + .expect("json serialization never fail")) }, - SubscribeConfig::GetVersionRichat => VERSION.create_grpc_version_info().value(), + SubscribeConfig::GetVersionRichat => Ok(VERSION.create_grpc_version_info().value()), SubscribeConfig::Unsubscribe { id } => { let (tx, rx) = oneshot::channel(); if clients_tx.send(ClientRequest::Unsubscribe { @@ -333,8 +335,14 @@ impl PubSubServer { ) .decrement(1); } + Ok(true.into()) + } else { + Err(ErrorObject::owned::<()>( + ErrorCode::InvalidParams.code(), + "Invalid subscription id.", + None, + )) } - removed.into() }, config => { let (tx, rx) = oneshot::channel(); @@ -353,13 +361,17 @@ impl PubSubServer { "method" => method.as_str() ) .increment(1); - id.into() + Ok(id.into()) }, }; + let payload = match result { + Ok(value) => ResponsePayload::success(value), + Err(error) => ResponsePayload::error(error), + }; let vec = serde_json::to_vec(&jsonrpsee_types::Response { jsonrpc: Some(TwoPointZero), - payload: ResponsePayload::success(result), + payload, id: message.id, extensions: Extensions::default(), // doesn't matter, as it is not used in serialize }).expect("json serialization never fail"); @@ -464,3 +476,41 @@ fn create_frame_close(frame: Frame) -> Result { Ok(Frame::close_raw(frame.payload.to_owned().into())) } + +#[cfg(test)] +mod tests { + use { + super::*, + jsonrpsee_types::Id, + serde_json::{Value, json}, + }; + + // Agave returns this exact error for unsubscribe with an unknown id. + #[test] + fn unsubscribe_invalid_id_wire_format_matches_agave() { + let error = ErrorObject::owned::<()>( + ErrorCode::InvalidParams.code(), + "Invalid subscription id.", + None, + ); + let vec = serde_json::to_vec(&jsonrpsee_types::Response { + jsonrpc: Some(TwoPointZero), + payload: ResponsePayload::::error(error), + id: Id::Number(42), + extensions: Extensions::default(), + }) + .unwrap(); + let parsed: Value = serde_json::from_slice(&vec).unwrap(); + assert_eq!( + parsed, + json!({ + "jsonrpc": "2.0", + "error": { + "code": -32602, + "message": "Invalid subscription id.", + }, + "id": 42, + }) + ); + } +}