Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 32 additions & 14 deletions ethexe/processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,10 @@ use core::num::NonZero;
use ethexe_common::{
CodeAndIdUnchecked, ProgramStates, Schedule,
ecdsa::VerifiedData,
events::{BlockRequestEvent, MirrorRequestEvent, mirror::MessageQueueingRequestedEvent},
events::{
BlockRequestEvent, MirrorRequestEvent,
mirror::{ExecutableBalanceTopUpRequestedEvent, MessageQueueingRequestedEvent},
},
gear::Message,
injected::InjectedTransaction,
};
Expand Down Expand Up @@ -175,6 +178,7 @@ mod thread_pool;

// Default amount of programs in one chunk to be processed in parallel.
pub const DEFAULT_CHUNK_SIZE: NonZero<usize> = NonZero::new(16).unwrap();
const EXECUTION_FOR_REPLY_TOP_UP_VALUE_PER_GAS: u128 = 100;

#[derive(thiserror::Error, Debug)]
pub enum ProcessorError {
Expand Down Expand Up @@ -470,6 +474,7 @@ pub struct ExecutableDataForReply {
pub payload: Vec<u8>,
pub value: u128,
pub gas_allowance: u64,
pub with_top_up: bool,
}

#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -497,6 +502,7 @@ impl OverlaidProcessor {
payload,
value,
gas_allowance,
with_top_up,
} = executable;

let known_programs = program_states.keys().copied().collect::<Vec<_>>();
Expand All @@ -518,22 +524,34 @@ impl OverlaidProcessor {

let transitions = InBlockTransitions::new(height, program_states, Schedule::default());

let transitions = self.0.handle_injected_and_events(
transitions,
vec![],
vec![BlockRequestEvent::Mirror {
let mut events = Vec::with_capacity(if with_top_up { 2 } else { 1 });

if with_top_up {
events.push(BlockRequestEvent::Mirror {
actor_id: program_id,
event: MirrorRequestEvent::MessageQueueingRequested(
MessageQueueingRequestedEvent {
id: MessageId::zero(),
source,
payload: payload.clone(),
value,
call_reply: true,
event: MirrorRequestEvent::ExecutableBalanceTopUpRequested(
ExecutableBalanceTopUpRequestedEvent {
value: u128::from(gas_allowance)
.saturating_mul(EXECUTION_FOR_REPLY_TOP_UP_VALUE_PER_GAS),
},
),
}],
)?;
});
}

events.push(BlockRequestEvent::Mirror {
actor_id: program_id,
event: MirrorRequestEvent::MessageQueueingRequested(MessageQueueingRequestedEvent {
id: MessageId::zero(),
source,
payload: payload.clone(),
value,
call_reply: true,
}),
});

let transitions = self
.0
.handle_injected_and_events(transitions, vec![], events)?;

let transitions = OverlaidRunContext::new(
self.0.db.clone(),
Expand Down
123 changes: 121 additions & 2 deletions ethexe/processor/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::*;
use anyhow::{Result, anyhow};
use ethexe_common::{
DEFAULT_BLOCK_GAS_LIMIT, OUTGOING_MESSAGES_SOFT_LIMIT, PROGRAM_MODIFICATIONS_SOFT_LIMIT,
PrivateKey, ScheduledTask, SignedMessage,
PrivateKey, ScheduledTask, SignedMessage, StateHashWithQueueSize,
db::*,
events::{
BlockRequestEvent, MirrorRequestEvent, RouterRequestEvent,
Expand All @@ -14,7 +14,10 @@ use ethexe_common::{
},
mock::*,
};
use ethexe_runtime_common::{RUNTIME_ID, WAIT_UP_TO_SAFE_DURATION, state::MessageQueue};
use ethexe_runtime_common::{
RUNTIME_ID, WAIT_UP_TO_SAFE_DURATION,
state::{MessageQueue, Storage},
};
use gear_core::{
ids::prelude::CodeIdExt,
message::{ErrorReplyReason, ReplyCode, SuccessReplyReason},
Expand Down Expand Up @@ -1162,6 +1165,7 @@ async fn overlay_execution() {
payload: demo_async::Command::Common.encode(),
value: 0,
gas_allowance: DEFAULT_BLOCK_GAS_LIMIT,
with_top_up: false,
};
let reply_result = overlaid_processor
.execute_for_reply(executable)
Expand Down Expand Up @@ -1255,6 +1259,7 @@ async fn overlay_execution_returns_messages_sent_to_users() {
payload: b"PING".to_vec(),
value: 0,
gas_allowance: DEFAULT_BLOCK_GAS_LIMIT,
with_top_up: false,
})
.await
.unwrap();
Expand All @@ -1268,6 +1273,120 @@ async fn overlay_execution_returns_messages_sent_to_users() {
assert!(message.reply_details.is_none());
}

#[tokio::test]
async fn overlay_execution_with_top_up_works_for_depleted_programs() {
init_logger();

let (mut processor, chain, [code_id]) =
setup_test_env_and_load_codes([demo_ping::WASM_BINARY]).await;
let block1 = chain.blocks[1].to_simple();
let user_id = ActorId::from(10);
let actor_id = ActorId::from(0x10000);

let mut handler = setup_handler(processor.db.clone(), block1.header.height);
handler
.handle_router_event(RouterRequestEvent::ProgramCreated(ProgramCreatedEvent {
actor_id,
code_id,
}))
.expect("failed to create new program");
handler
.handle_mirror_event(
actor_id,
MirrorRequestEvent::ExecutableBalanceTopUpRequested(
ExecutableBalanceTopUpRequestedEvent {
value: 350_000_000_000,
},
),
)
.expect("failed to top up balance");
handler
.handle_mirror_event(
actor_id,
MirrorRequestEvent::MessageQueueingRequested(MessageQueueingRequestedEvent {
id: MessageId::from(1),
source: user_id,
payload: vec![],
value: 0,
call_reply: false,
}),
)
.expect("failed to queue init");

let transitions = processor
.process_queues(
handler.transitions,
block1.header.height,
block1.header.timestamp,
DEFAULT_BLOCK_GAS_LIMIT,
None,
)
.await
.expect("failed to initialize program");
processor.db.set_program_code_id(actor_id, code_id);
let FinalizedBlockTransitions { states, .. } = transitions.finalize();

let original_state = states
.get(&actor_id)
.copied()
.expect("initialized program state");
let mut depleted_program_state = processor
.db
.program_state(original_state.hash)
.expect("program state in database");
depleted_program_state.executable_balance = 0;
let depleted_hash = processor.db.write_program_state(depleted_program_state);
let mut depleted_states = states.clone();
depleted_states.insert(
actor_id,
StateHashWithQueueSize {
hash: depleted_hash,
canonical_queue_size: original_state.canonical_queue_size,
injected_queue_size: original_state.injected_queue_size,
},
);

let block2 = chain.blocks[2].to_simple();
let executable = ExecutableDataForReply {
height: block2.header.height,
timestamp: block2.header.timestamp,
program_states: depleted_states.clone(),
source: user_id,
program_id: actor_id,
payload: b"PING".to_vec(),
value: 0,
gas_allowance: DEFAULT_BLOCK_GAS_LIMIT,
with_top_up: false,
};
let reply_without_top_up = processor
.clone()
.overlaid()
.execute_for_reply(executable)
.await
.expect("overlay execution without top-up returns an error reply");
assert!(reply_without_top_up.reply.code.is_error());

let reply_with_top_up = processor
.clone()
.overlaid()
.execute_for_reply(ExecutableDataForReply {
height: block2.header.height,
timestamp: block2.header.timestamp,
program_states: depleted_states,
source: user_id,
program_id: actor_id,
payload: b"PING".to_vec(),
value: 0,
gas_allowance: DEFAULT_BLOCK_GAS_LIMIT,
with_top_up: true,
})
.await
.expect("overlay execution with top-up succeeds");

assert_eq!(reply_with_top_up.reply.payload, b"PONG");
assert!(reply_with_top_up.reply.code.is_success());
}

#[tokio::test]
async fn injected_ping_pong() {
init_logger();
Expand Down
3 changes: 3 additions & 0 deletions ethexe/rpc/src/apis/program.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub trait Program {
program_id: H160,
payload: Bytes,
value: u128,
with_top_up: Option<bool>,
) -> jsonrpsee::core::RpcResult<CalculateReplyForHandleResult>;

#[method(name = "program_ids")]
Expand Down Expand Up @@ -135,6 +136,7 @@ impl ProgramServer for ProgramApi {
program_id: H160,
payload: Bytes,
value: u128,
with_top_up: Option<bool>,
) -> jsonrpsee::core::RpcResult<CalculateReplyForHandleResult> {
let mb_hash = utils::latest_computed_mb(&self.db)?;
let block = utils::block_at_or_latest_synced(&self.db, None)?;
Expand All @@ -151,6 +153,7 @@ impl ProgramServer for ProgramApi {
payload: payload.to_vec(),
value,
gas_allowance: self.gas_allowance,
with_top_up: with_top_up.unwrap_or(false),
};

// TODO (breathx): spawn in a new thread and catch panics. (?) Generally catch runtime panics (?).
Expand Down
13 changes: 12 additions & 1 deletion ethexe/sdk/src/mirror.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,16 @@ impl<'a> Mirror<'a> {
payload: impl AsRef<[u8]>,
value: u128,
) -> Result<CalculateReplyForHandleResult> {
self.calculate_reply_for_handle_at(payload, value, None)
self.calculate_reply_for_handle_at(payload, value, None, false)
.await
}

pub async fn calculate_reply_for_handle_with_top_up(
&self,
payload: impl AsRef<[u8]>,
value: u128,
) -> Result<CalculateReplyForHandleResult> {
self.calculate_reply_for_handle_at(payload, value, None, true)
.await
}

Expand All @@ -117,6 +126,7 @@ impl<'a> Mirror<'a> {
payload: impl AsRef<[u8]>,
value: u128,
at: Option<H256>,
with_top_up: bool,
) -> Result<CalculateReplyForHandleResult> {
let sender_address = self.api.ethereum_client.sender_address();
let source: ActorId = sender_address.into();
Expand All @@ -129,6 +139,7 @@ impl<'a> Mirror<'a> {
destination.to_address_lossy(),
payload.as_ref().to_vec().into(),
value,
Some(with_top_up),
)
.map_err(Into::into)
.await
Expand Down
Loading