diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index a0b85c894d5..66b9ff5880a 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -147,7 +147,10 @@ use core::num::NonZero; use ethexe_common::{ CodeAndIdUnchecked, ProgramStates, Schedule, ecdsa::VerifiedData, - events::{BlockRequestEvent, MirrorRequestEvent, mirror::MessageQueueingRequestedEvent}, + events::{ + BlockRequestEvent, MirrorRequestEvent, + mirror::{ExecutableBalanceTopUpRequestedEvent, MessageQueueingRequestedEvent}, + }, gear::Message, injected::InjectedTransaction, }; @@ -175,6 +178,7 @@ mod thread_pool; // Default amount of programs in one chunk to be processed in parallel. pub const DEFAULT_CHUNK_SIZE: NonZero = NonZero::new(16).unwrap(); +const EXECUTION_FOR_REPLY_TOP_UP_VALUE_PER_GAS: u128 = 100; #[derive(thiserror::Error, Debug)] pub enum ProcessorError { @@ -470,6 +474,7 @@ pub struct ExecutableDataForReply { pub payload: Vec, pub value: u128, pub gas_allowance: u64, + pub with_top_up: bool, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -497,6 +502,7 @@ impl OverlaidProcessor { payload, value, gas_allowance, + with_top_up, } = executable; let known_programs = program_states.keys().copied().collect::>(); @@ -518,22 +524,34 @@ impl OverlaidProcessor { let transitions = InBlockTransitions::new(height, program_states, Schedule::default()); - let transitions = self.0.handle_injected_and_events( - transitions, - vec![], - vec![BlockRequestEvent::Mirror { + let mut events = Vec::with_capacity(if with_top_up { 2 } else { 1 }); + + if with_top_up { + events.push(BlockRequestEvent::Mirror { actor_id: program_id, - event: MirrorRequestEvent::MessageQueueingRequested( - MessageQueueingRequestedEvent { - id: MessageId::zero(), - source, - payload: payload.clone(), - value, - call_reply: true, + event: MirrorRequestEvent::ExecutableBalanceTopUpRequested( + ExecutableBalanceTopUpRequestedEvent { + value: u128::from(gas_allowance) + .saturating_mul(EXECUTION_FOR_REPLY_TOP_UP_VALUE_PER_GAS), }, ), - }], - )?; + }); + } + + events.push(BlockRequestEvent::Mirror { + actor_id: program_id, + event: MirrorRequestEvent::MessageQueueingRequested(MessageQueueingRequestedEvent { + id: MessageId::zero(), + source, + payload: payload.clone(), + value, + call_reply: true, + }), + }); + + let transitions = self + .0 + .handle_injected_and_events(transitions, vec![], events)?; let transitions = OverlaidRunContext::new( self.0.db.clone(), diff --git a/ethexe/processor/src/tests.rs b/ethexe/processor/src/tests.rs index 3e4791f2599..38b065f6e6c 100644 --- a/ethexe/processor/src/tests.rs +++ b/ethexe/processor/src/tests.rs @@ -5,7 +5,7 @@ use crate::*; use anyhow::{Result, anyhow}; use ethexe_common::{ DEFAULT_BLOCK_GAS_LIMIT, OUTGOING_MESSAGES_SOFT_LIMIT, PROGRAM_MODIFICATIONS_SOFT_LIMIT, - PrivateKey, ScheduledTask, SignedMessage, + PrivateKey, ScheduledTask, SignedMessage, StateHashWithQueueSize, db::*, events::{ BlockRequestEvent, MirrorRequestEvent, RouterRequestEvent, @@ -14,7 +14,10 @@ use ethexe_common::{ }, mock::*, }; -use ethexe_runtime_common::{RUNTIME_ID, WAIT_UP_TO_SAFE_DURATION, state::MessageQueue}; +use ethexe_runtime_common::{ + RUNTIME_ID, WAIT_UP_TO_SAFE_DURATION, + state::{MessageQueue, Storage}, +}; use gear_core::{ ids::prelude::CodeIdExt, message::{ErrorReplyReason, ReplyCode, SuccessReplyReason}, @@ -1162,6 +1165,7 @@ async fn overlay_execution() { payload: demo_async::Command::Common.encode(), value: 0, gas_allowance: DEFAULT_BLOCK_GAS_LIMIT, + with_top_up: false, }; let reply_result = overlaid_processor .execute_for_reply(executable) @@ -1255,6 +1259,7 @@ async fn overlay_execution_returns_messages_sent_to_users() { payload: b"PING".to_vec(), value: 0, gas_allowance: DEFAULT_BLOCK_GAS_LIMIT, + with_top_up: false, }) .await .unwrap(); @@ -1268,6 +1273,120 @@ async fn overlay_execution_returns_messages_sent_to_users() { assert!(message.reply_details.is_none()); } +#[tokio::test] +async fn overlay_execution_with_top_up_works_for_depleted_programs() { + init_logger(); + + let (mut processor, chain, [code_id]) = + setup_test_env_and_load_codes([demo_ping::WASM_BINARY]).await; + let block1 = chain.blocks[1].to_simple(); + let user_id = ActorId::from(10); + let actor_id = ActorId::from(0x10000); + + let mut handler = setup_handler(processor.db.clone(), block1.header.height); + handler + .handle_router_event(RouterRequestEvent::ProgramCreated(ProgramCreatedEvent { + actor_id, + code_id, + })) + .expect("failed to create new program"); + handler + .handle_mirror_event( + actor_id, + MirrorRequestEvent::ExecutableBalanceTopUpRequested( + ExecutableBalanceTopUpRequestedEvent { + value: 350_000_000_000, + }, + ), + ) + .expect("failed to top up balance"); + handler + .handle_mirror_event( + actor_id, + MirrorRequestEvent::MessageQueueingRequested(MessageQueueingRequestedEvent { + id: MessageId::from(1), + source: user_id, + payload: vec![], + value: 0, + call_reply: false, + }), + ) + .expect("failed to queue init"); + + let transitions = processor + .process_queues( + handler.transitions, + block1.header.height, + block1.header.timestamp, + DEFAULT_BLOCK_GAS_LIMIT, + None, + ) + .await + .expect("failed to initialize program"); + processor.db.set_program_code_id(actor_id, code_id); + let FinalizedBlockTransitions { states, .. } = transitions.finalize(); + + let original_state = states + .get(&actor_id) + .copied() + .expect("initialized program state"); + let mut depleted_program_state = processor + .db + .program_state(original_state.hash) + .expect("program state in database"); + depleted_program_state.executable_balance = 0; + let depleted_hash = processor.db.write_program_state(depleted_program_state); + let mut depleted_states = states.clone(); + depleted_states.insert( + actor_id, + StateHashWithQueueSize { + hash: depleted_hash, + canonical_queue_size: original_state.canonical_queue_size, + injected_queue_size: original_state.injected_queue_size, + }, + ); + + let block2 = chain.blocks[2].to_simple(); + let executable = ExecutableDataForReply { + height: block2.header.height, + timestamp: block2.header.timestamp, + program_states: depleted_states.clone(), + source: user_id, + program_id: actor_id, + payload: b"PING".to_vec(), + value: 0, + gas_allowance: DEFAULT_BLOCK_GAS_LIMIT, + with_top_up: false, + }; + let reply_without_top_up = processor + .clone() + .overlaid() + .execute_for_reply(executable) + .await + .expect("overlay execution without top-up returns an error reply"); + assert!(reply_without_top_up.reply.code.is_error()); + + let reply_with_top_up = processor + .clone() + .overlaid() + .execute_for_reply(ExecutableDataForReply { + height: block2.header.height, + timestamp: block2.header.timestamp, + program_states: depleted_states, + source: user_id, + program_id: actor_id, + payload: b"PING".to_vec(), + value: 0, + gas_allowance: DEFAULT_BLOCK_GAS_LIMIT, + with_top_up: true, + }) + .await + .expect("overlay execution with top-up succeeds"); + + assert_eq!(reply_with_top_up.reply.payload, b"PONG"); + assert!(reply_with_top_up.reply.code.is_success()); +} + #[tokio::test] async fn injected_ping_pong() { init_logger(); diff --git a/ethexe/rpc/src/apis/program.rs b/ethexe/rpc/src/apis/program.rs index ddf2ce057c3..3431dc3edfb 100644 --- a/ethexe/rpc/src/apis/program.rs +++ b/ethexe/rpc/src/apis/program.rs @@ -58,6 +58,7 @@ pub trait Program { program_id: H160, payload: Bytes, value: u128, + with_top_up: Option, ) -> jsonrpsee::core::RpcResult; #[method(name = "program_ids")] @@ -135,6 +136,7 @@ impl ProgramServer for ProgramApi { program_id: H160, payload: Bytes, value: u128, + with_top_up: Option, ) -> jsonrpsee::core::RpcResult { let mb_hash = utils::latest_computed_mb(&self.db)?; let block = utils::block_at_or_latest_synced(&self.db, None)?; @@ -151,6 +153,7 @@ impl ProgramServer for ProgramApi { payload: payload.to_vec(), value, gas_allowance: self.gas_allowance, + with_top_up: with_top_up.unwrap_or(false), }; // TODO (breathx): spawn in a new thread and catch panics. (?) Generally catch runtime panics (?). diff --git a/ethexe/sdk/src/mirror.rs b/ethexe/sdk/src/mirror.rs index dcafcc56f26..9ad9d9e2d5f 100644 --- a/ethexe/sdk/src/mirror.rs +++ b/ethexe/sdk/src/mirror.rs @@ -108,7 +108,16 @@ impl<'a> Mirror<'a> { payload: impl AsRef<[u8]>, value: u128, ) -> Result { - self.calculate_reply_for_handle_at(payload, value, None) + self.calculate_reply_for_handle_at(payload, value, None, false) + .await + } + + pub async fn calculate_reply_for_handle_with_top_up( + &self, + payload: impl AsRef<[u8]>, + value: u128, + ) -> Result { + self.calculate_reply_for_handle_at(payload, value, None, true) .await } @@ -117,6 +126,7 @@ impl<'a> Mirror<'a> { payload: impl AsRef<[u8]>, value: u128, at: Option, + with_top_up: bool, ) -> Result { let sender_address = self.api.ethereum_client.sender_address(); let source: ActorId = sender_address.into(); @@ -129,6 +139,7 @@ impl<'a> Mirror<'a> { destination.to_address_lossy(), payload.as_ref().to_vec().into(), value, + Some(with_top_up), ) .map_err(Into::into) .await