diff --git a/ethexe/common/src/db.rs b/ethexe/common/src/db.rs index cb50920e126..b6cff813b7f 100644 --- a/ethexe/common/src/db.rs +++ b/ethexe/common/src/db.rs @@ -265,7 +265,7 @@ mod tests { #[test] fn ensure_types_unchanged() { const EXPECTED_TYPE_INFO_HASH: &str = - "d43d8ab319fb6d934231dba55950c9825e28c6ecf603e8076a90e0cab3855671"; + "bc1ef49ee0cb886020f6cb0d88f5259df238917ee8d49f8df2404db0da7417fa"; let types = [ meta_type::(), diff --git a/ethexe/common/src/malachite.rs b/ethexe/common/src/malachite.rs index 0e7e50a112f..ea21122522b 100644 --- a/ethexe/common/src/malachite.rs +++ b/ethexe/common/src/malachite.rs @@ -42,7 +42,7 @@ pub enum Transaction { Injected(SignedInjectedTransaction), } -/// Placeholder; shape firms up once executor plumbing lands. +/// Reserved per-MB limits for scheduled task progress. #[derive(Clone, Debug, Default, PartialEq, Eq, Encode, Decode, TypeInfo)] #[cfg_attr(feature = "std", derive(Serialize, Deserialize))] pub struct ProgressTasksLimits {} diff --git a/ethexe/malachite/core/src/streaming.rs b/ethexe/malachite/core/src/streaming.rs index 7f9d3c37778..b310fd492c0 100644 --- a/ethexe/malachite/core/src/streaming.rs +++ b/ethexe/malachite/core/src/streaming.rs @@ -11,7 +11,7 @@ use std::{ cmp::Ordering, - collections::{BTreeMap, BinaryHeap, HashSet}, + collections::{BTreeMap, BTreeSet, BinaryHeap, HashSet}, }; use parity_scale_codec::{Decode, Encode, Error as CodecError, Input, Output}; @@ -26,6 +26,12 @@ use crate::{ types::Address, }; +const MAX_STREAM_MESSAGES: u64 = 16; +const MAX_STREAMS_PER_PEER: usize = 64; +const MAX_STREAMS_TOTAL: usize = 1024; + +type StreamKey = (PeerId, StreamId); + /// Min-heap wrapper that orders `StreamMessage`s by ascending sequence. struct MinSeq(StreamMessage); @@ -83,13 +89,15 @@ struct StreamState { buffer: MinHeap, init_info: Option, seen_sequences: HashSet, - total_messages: usize, - fin_received: bool, + total_messages: Option, } impl StreamState { fn is_done(&self) -> bool { - self.init_info.is_some() && self.fin_received && self.buffer.len() == self.total_messages + self.init_info.is_some() + && self + .total_messages + .is_some_and(|total| self.buffer.len() == total) } fn insert(&mut self, msg: StreamMessage) -> Option { @@ -97,8 +105,7 @@ impl StreamState { self.init_info = msg.content.as_data().and_then(|p| p.as_init()).cloned(); } if msg.is_fin() { - self.fin_received = true; - self.total_messages = msg.sequence as usize + 1; + self.total_messages = Some(msg.sequence as usize + 1); } self.buffer.push(msg); if self.is_done() { @@ -172,13 +179,13 @@ impl ProposalParts { } } -// TODO: #5473 `PartStreamsMap` has no per-peer cap, no total cap, and no -// eviction for streams that never receive a valid `Fin`. Pinned by the -// (ignored) regression test -// `streaming::tests::part_streams_map_grows_unbounded_under_fin_sequence_attack`. #[derive(Default)] pub struct PartStreamsMap { - streams: BTreeMap<(PeerId, StreamId), StreamState>, + streams: BTreeMap, + peer_streams: BTreeMap, + recencies: BTreeMap, + recency_order: BTreeSet<(u64, PeerId, StreamId)>, + next_recency: u64, } impl PartStreamsMap { @@ -186,6 +193,11 @@ impl PartStreamsMap { Self::default() } + #[cfg(test)] + pub fn len(&self) -> usize { + self.streams.len() + } + /// Insert a part. Returns `Some(parts)` once the stream is /// complete (all parts seen + Fin received). Subsequent calls for /// the same `(peer, stream)` after completion return `None` — the @@ -195,20 +207,87 @@ impl PartStreamsMap { peer_id: PeerId, msg: StreamMessage, ) -> Option { - let stream_id = msg.stream_id.clone(); - let state = self - .streams - .entry((peer_id, stream_id.clone())) - .or_default(); - if !state.seen_sequences.insert(msg.sequence) { + let key = (peer_id, msg.stream_id.clone()); + if msg.sequence >= MAX_STREAM_MESSAGES { + self.remove_stream(&key); return None; } - let result = state.insert(msg); - if state.is_done() { - self.streams.remove(&(peer_id, stream_id)); + if !self.streams.contains_key(&key) { + self.make_room_for_new_stream(peer_id); + } + + let result = { + let state = self.streams.entry(key.clone()).or_insert_with(|| { + *self.peer_streams.entry(peer_id).or_default() += 1; + StreamState::default() + }); + if !state.seen_sequences.insert(msg.sequence) { + return None; + } + state.insert(msg) + }; + if result.is_some() { + self.remove_stream(&key); + } else { + self.touch_stream(&key); } result } + + fn make_room_for_new_stream(&mut self, peer_id: PeerId) { + if self + .peer_streams + .get(&peer_id) + .is_some_and(|count| *count >= MAX_STREAMS_PER_PEER) + && let Some(key) = self.oldest_stream_for_peer(peer_id) + { + self.remove_stream(&key); + } + + if self.streams.len() >= MAX_STREAMS_TOTAL + && let Some((_, peer_id, stream_id)) = self.recency_order.iter().next().cloned() + { + self.remove_stream(&(peer_id, stream_id)); + } + } + + fn oldest_stream_for_peer(&self, peer_id: PeerId) -> Option { + self.recency_order + .iter() + .find(|(_, candidate_peer, _)| *candidate_peer == peer_id) + .map(|(_, peer_id, stream_id)| (*peer_id, stream_id.clone())) + } + + fn touch_stream(&mut self, key: &StreamKey) { + let recency = self.next_recency; + self.next_recency = self + .next_recency + .checked_add(1) + .expect("stream recency counter overflowed"); + + if let Some(old_recency) = self.recencies.insert(key.clone(), recency) { + self.recency_order + .remove(&(old_recency, key.0, key.1.clone())); + } + self.recency_order.insert((recency, key.0, key.1.clone())); + } + + fn remove_stream(&mut self, key: &StreamKey) { + if self.streams.remove(key).is_none() { + return; + } + + if let Some(recency) = self.recencies.remove(key) { + self.recency_order.remove(&(recency, key.0, key.1.clone())); + } + + if let Some(count) = self.peer_streams.get_mut(&key.0) { + *count -= 1; + if *count == 0 { + self.peer_streams.remove(&key.0); + } + } + } } #[cfg(test)] @@ -256,6 +335,22 @@ mod tests { StreamMessage::new(stream_id, seq, StreamContent::Fin) } + fn fill_global_cap(map: &mut PartStreamsMap, start_stream: u64) { + let mut stream = start_stream; + for peer_byte in 2..=250 { + for _ in 0..MAX_STREAMS_PER_PEER { + if map.len() == MAX_STREAMS_TOTAL { + return; + } + let p = peer_id(peer_byte); + let s = sid(stream); + assert!(map.insert(p, msg(s, 0, init_part(stream))).is_none()); + stream += 1; + } + } + assert_eq!(map.len(), MAX_STREAMS_TOTAL); + } + #[test] fn complete_in_order_assembles() { let mut map = PartStreamsMap::new(); @@ -273,6 +368,28 @@ mod tests { assert_eq!(done.data_block_bytes(), Some(&b"hello"[..])); } + #[test] + fn completed_stream_releases_slot() { + let mut map = PartStreamsMap::new(); + let p = peer_id(1); + let s = sid(11); + + assert!(map.insert(p, msg(s.clone(), 0, init_part(11))).is_none()); + assert!( + map.insert(p, msg(s.clone(), 1, data_part(b"done"))) + .is_none() + ); + assert!(map.insert(p, fin_msg(s.clone(), 2)).is_some()); + + assert!( + !map.streams.contains_key(&(p, s)), + "completed stream must be removed from PartStreamsMap" + ); + assert_eq!(map.streams.len(), 0); + assert!(map.recencies.is_empty()); + assert!(map.recency_order.is_empty()); + } + #[test] fn complete_out_of_order_assembles() { let mut map = PartStreamsMap::new(); @@ -313,43 +430,152 @@ mod tests { assert!(map.insert(p, fin_msg(s2.clone(), 2)).is_none()); } - /// REPRODUCES: a single peer can grow `PartStreamsMap` without - /// bound by either (a) opening fresh `stream_id`s and never sending - /// `Fin`, or (b) sending a `Fin` with a `sequence` far above any - /// part it actually delivers so the `total_messages == buffer.len()` - /// gate is unreachable. #[test] - #[ignore = "tracks issue #5473 in streaming.rs: unbounded PartStreamsMap"] - fn part_streams_map_grows_unbounded_under_fin_sequence_attack() { + fn per_peer_cap_evicts_oldest_and_accepts_new_stream() { let mut map = PartStreamsMap::new(); let p = peer_id(1); - // Attack A: a peer opens many streams and never finalises. - // 100 distinct stream_ids, each with Init + Data but no Fin. - for stream_idx in 0..100u64 { + let first = sid(0xA000_0000); + for stream_idx in 0..MAX_STREAMS_PER_PEER as u64 { let s = sid(0xA000_0000 + stream_idx); assert!(map.insert(p, msg(s.clone(), 0, init_part(1))).is_none()); - assert!(map.insert(p, msg(s, 1, data_part(b"x"))).is_none()); } - // Attack B: cheaper still — one message per stream, Fin with a - // far-future sequence. `total_messages` becomes - // `u64::MAX as usize + 1` (wraps to 0 in release, panics in - // debug), but the `is_done` gate `buffer.len() == total_messages` - // is unreachable for any sane traffic. 100 more streams. - for stream_idx in 0..100u64 { - let s = sid(0xB000_0000 + stream_idx); - assert!(map.insert(p, fin_msg(s, u64::MAX / 2)).is_none()); + assert_eq!(map.streams.len(), MAX_STREAMS_PER_PEER); + assert_eq!(map.peer_streams.get(&p), Some(&MAX_STREAMS_PER_PEER)); + + let newest = sid(0xB000_0000); + assert!( + map.insert(p, msg(newest.clone(), 0, init_part(1))) + .is_none() + ); + + assert_eq!(map.streams.len(), MAX_STREAMS_PER_PEER); + assert!(!map.streams.contains_key(&(p, first))); + assert!(map.streams.contains_key(&(p, newest.clone()))); + + assert!( + map.insert(p, msg(newest.clone(), 1, data_part(b"new"))) + .is_none() + ); + assert!(map.insert(p, fin_msg(newest.clone(), 2)).is_some()); + assert!(!map.streams.contains_key(&(p, newest))); + assert_eq!(map.peer_streams.get(&p), Some(&(MAX_STREAMS_PER_PEER - 1))); + } + + #[test] + fn global_cap_evicts_oldest_and_stays_bounded() { + let mut map = PartStreamsMap::new(); + let first_peer = peer_id(1); + let first_stream = sid(10); + + assert!( + map.insert(first_peer, msg(first_stream.clone(), 0, init_part(10))) + .is_none() + ); + fill_global_cap(&mut map, 1_000); + assert_eq!(map.len(), MAX_STREAMS_TOTAL); + + let new_peer = peer_id(251); + let new_stream = sid(20_000); + assert!( + map.insert(new_peer, msg(new_stream.clone(), 0, init_part(20_000))) + .is_none() + ); + + assert_eq!(map.len(), MAX_STREAMS_TOTAL); + assert!(!map.streams.contains_key(&(first_peer, first_stream))); + assert!(map.streams.contains_key(&(new_peer, new_stream))); + } + + #[test] + fn valid_parts_refresh_existing_stream_recency() { + let mut map = PartStreamsMap::new(); + let p = peer_id(1); + let refreshed = sid(50); + let stale = sid(51); + + assert!( + map.insert(p, msg(refreshed.clone(), 0, init_part(50))) + .is_none() + ); + assert!( + map.insert(p, msg(stale.clone(), 0, init_part(51))) + .is_none() + ); + assert!( + map.insert(p, msg(refreshed.clone(), 1, data_part(b"fresh"))) + .is_none() + ); + + fill_global_cap(&mut map, 2_000); + + let new_peer = peer_id(251); + let new_stream = sid(30_000); + assert!( + map.insert(new_peer, msg(new_stream.clone(), 0, init_part(30_000))) + .is_none() + ); + + assert!(map.streams.contains_key(&(p, refreshed))); + assert!(!map.streams.contains_key(&(p, stale))); + assert!(map.streams.contains_key(&(new_peer, new_stream))); + } + + #[test] + fn malformed_far_future_fin_evicts_only_its_stream() { + let mut map = PartStreamsMap::new(); + let p = peer_id(1); + let s = sid(30); + let other = sid(31); + + assert!(map.insert(p, msg(s.clone(), 0, init_part(30))).is_none()); + assert!( + map.insert(p, msg(other.clone(), 0, init_part(31))) + .is_none() + ); + assert!(map.streams.contains_key(&(p, s.clone()))); + assert!( + map.insert(p, fin_msg(s.clone(), MAX_STREAM_MESSAGES)) + .is_none() + ); + + assert!(!map.streams.contains_key(&(p, s))); + assert!(map.streams.contains_key(&(p, other.clone()))); + assert_eq!(map.peer_streams.get(&p), Some(&1)); + assert!(map.recencies.contains_key(&(p, other))); + } + + #[test] + fn completed_stream_releases_slot_after_cap_eviction() { + let mut map = PartStreamsMap::new(); + let p = peer_id(1); + let first = sid(40); + + assert!( + map.insert(p, msg(first.clone(), 0, init_part(40))) + .is_none() + ); + for stream_idx in 1..MAX_STREAMS_PER_PEER as u64 { + let s = sid(40 + stream_idx); + assert!(map.insert(p, msg(s, 0, init_part(40))).is_none()); } + assert_eq!(map.streams.len(), MAX_STREAMS_PER_PEER); - // Desired behaviour: a single peer cannot hold > a bounded - // number of in-flight stream slots. The exact cap is up to the - // fix, but it must be much smaller than the 200 we just pushed. + let over_cap = sid(10_000); assert!( - map.streams.len() < 200, - "PartStreamsMap grew to {} entries under a single-peer flood — \ - needs per-peer cap + GC for never-finalised / bogus-Fin streams", - map.streams.len(), + map.insert(p, msg(over_cap.clone(), 0, init_part(40))) + .is_none() + ); + assert!(!map.streams.contains_key(&(p, first.clone()))); + assert!(map.streams.contains_key(&(p, over_cap.clone()))); + + assert!( + map.insert(p, msg(over_cap.clone(), 1, data_part(b"ok"))) + .is_none() ); + assert!(map.insert(p, fin_msg(over_cap.clone(), 2)).is_some()); + assert!(!map.streams.contains_key(&(p, over_cap))); + assert_eq!(map.streams.len(), MAX_STREAMS_PER_PEER - 1); } } diff --git a/ethexe/malachite/service/src/externalities.rs b/ethexe/malachite/service/src/externalities.rs index 9a02cd66fb3..dc793b71ace 100644 --- a/ethexe/malachite/service/src/externalities.rs +++ b/ethexe/malachite/service/src/externalities.rs @@ -55,7 +55,7 @@ use ethexe_malachite_core::{Block, Externalities}; use gprimitives::H256; use parity_scale_codec::Encode; use std::{ - collections::VecDeque, + collections::{HashSet, VecDeque}, sync::{Arc, Mutex, RwLock}, }; use tokio::sync::{Notify, mpsc}; @@ -122,11 +122,7 @@ impl Externalities for EthexeExternalities { // Propagate `last_advanced_eb` forward — the latest // `AdvanceTillEthereumBlock` in this MB wins; otherwise we // inherit the parent's value (zero if pre-genesis). - let parent_advanced = if parent.is_zero() { - H256::zero() - } else { - self.db.mb_meta(parent).last_advanced_eb - }; + let parent_advanced = self.parent_last_advanced_eb(parent); let last_advanced = payload .iter() .rev() @@ -139,7 +135,7 @@ impl Externalities for EthexeExternalities { // CAS-store transactions first so the contract — "if // CompactMb exists, transactions are reachable" — holds // unconditionally. - let transactions_hash = self.db.set_transactions(payload.clone()); + let transactions_hash = self.db.set_transactions(payload); self.db.set_mb_compact_block( mb_hash, CompactMb { @@ -227,11 +223,7 @@ impl Externalities for EthexeExternalities { // `parent_hash` is the consensus envelope hash of the parent // (zero for genesis). Use it directly to seed the producer's // `last_advanced_eb` lookup. - let parent_advanced = if parent_mb_hash.is_zero() { - H256::zero() - } else { - self.db.mb_meta(parent_mb_hash).last_advanced_eb - }; + let parent_advanced = self.parent_last_advanced_eb(parent_mb_hash); let (advance, injected) = self.wait_for_proposable_content(parent_advanced).await; @@ -294,7 +286,7 @@ impl Externalities for EthexeExternalities { // MB → the touched-set seed is empty. let mut touched = match advance { Some(advanced_eb) => eb_touched_programs(&self.db, parent_advanced, advanced_eb)?, - None => std::collections::HashSet::new(), + None => HashSet::new(), }; let initial_touched_count = touched.len(); if initial_touched_count > MAX_TOUCHED_PROGRAMS_PER_MB as usize { @@ -384,7 +376,9 @@ impl Externalities for EthexeExternalities { None }; - while let Some(Transaction::Injected(_)) = next { + let mut injected = Vec::new(); + while let Some(Transaction::Injected(signed)) = next { + injected.push(signed); next = iter.next(); } @@ -398,14 +392,17 @@ impl Externalities for EthexeExternalities { // `ProgressTasksLimits` is empty today; when fields are added, // bound them here. - let Some(Transaction::ProcessQueues { limits: pq_limits }) = iter.next() else { + let Some(Transaction::ProcessQueues { + limits: process_queues_limits, + }) = iter.next() + else { warn!("validate: MB shape violation — expected `ProcessQueues` bookend"); return Ok(false); }; - if pq_limits.gas_allowance > crate::MalachiteConfig::DEFAULT_GAS_ALLOWANCE { + if process_queues_limits.gas_allowance > crate::MalachiteConfig::DEFAULT_GAS_ALLOWANCE { warn!( - allowance = pq_limits.gas_allowance, + allowance = process_queues_limits.gas_allowance, cap = crate::MalachiteConfig::DEFAULT_GAS_ALLOWANCE, "validate: ProcessQueues.gas_allowance exceeds protocol cap" ); @@ -417,6 +414,37 @@ impl Externalities for EthexeExternalities { return Ok(false); } + let mut encoded_size = 0usize; + let mut seen_injected_hashes = HashSet::with_capacity(injected.len()); + for signed in &injected { + let tx_size = signed.encoded_size(); + let Some(next_size) = encoded_size.checked_add(tx_size) else { + warn!( + current_size = encoded_size, + tx_size, "validate: injected tx encoded size overflows usize — rejecting MB", + ); + return Ok(false); + }; + if next_size > MAX_INJECTED_TRANSACTIONS_SIZE_PER_MB { + warn!( + encoded_size = next_size, + cap = MAX_INJECTED_TRANSACTIONS_SIZE_PER_MB, + "validate: injected txs exceed per-MB encoded size cap — rejecting MB", + ); + return Ok(false); + } + encoded_size = next_size; + + let tx_hash = signed.data().to_hash(); + if !seen_injected_hashes.insert(tx_hash) { + warn!( + %tx_hash, + "validate: duplicate injected tx within MB — rejecting MB", + ); + return Ok(false); + } + } + // (2) Quarantine + parent-link — single synchronous check. // // Validators never wait for local sync here. The proposer's @@ -435,11 +463,7 @@ impl Externalities for EthexeExternalities { // each early-return below so operators can tune // `post_quarantine_delay` from observability rather than logs. if let Some(advance) = advance { - let parent_advanced = if parent_hash.is_zero() { - H256::zero() - } else { - self.db.mb_meta(parent_hash).last_advanced_eb - }; + let parent_advanced = self.parent_last_advanced_eb(parent_hash); let start_block_hash = self.db.globals().start_block_hash; let Some(chain_head) = *self.chain_head.read().expect("chain_head poisoned") else { @@ -503,10 +527,7 @@ impl Externalities for EthexeExternalities { // No local chain head yet. If the MB carries no injected // txs we can still accept it; otherwise we must abstain // since the checker has no anchor to walk from. - let has_injected = payload - .iter() - .any(|tx| matches!(tx, Transaction::Injected(_))); - if has_injected { + if !injected.is_empty() { warn!("validate: MB carries injected txs but no local chain head — abstaining"); return Ok(false); } @@ -521,10 +542,7 @@ impl Externalities for EthexeExternalities { // Propagating the error upward is the right call: it indicates // local DB corruption, not a peer-side issue. let checker = TxValidityChecker::new_for_mb(self.db.clone(), chain_head, parent_hash)?; - for tx in payload.iter() { - let Transaction::Injected(signed) = tx else { - continue; - }; + for signed in &injected { // `?` inside `check_tx_validity` only fires on local DB // inconsistency (a `latest_states` entry whose `state_hash` // is absent from CAS). Every malicious-tx-data path returns @@ -543,26 +561,17 @@ impl Externalities for EthexeExternalities { } } - // (4) Touched-programs cap (master's #6). Only enforced on - // the validator side — the proposer in `build_block_above` - // already shapes the MB to stay within the cap; this check - // is the participant's guard against a malicious proposer. - // - // Per master: `limit = max(initial_touched.len(), MAX_*)` — - // the proposer can't *avoid* programs already touched by EB - // events, so those set the floor for the cap. We add every - // `Transaction::Injected` destination on top of the EB-touched - // seed and reject if the union exceeds `limit`. + // (4) Touched-programs cap. The EB-touched set is the durable + // floor: the proposer can't avoid programs already touched by + // EB events, and injected destinations must not grow the union + // beyond max(floor, MAX_TOUCHED_PROGRAMS_PER_MB). // - // NOTE: there is no per-MB size cap on the validator side - // (master parity). We rely on the Malachite engine's 1 MiB - // hard cap on the encoded `Block` payload — anything larger - // never reaches `validate_block_above` in the first place. - let parent_advanced = if parent_hash.is_zero() { - H256::zero() - } else { - self.db.mb_meta(parent_hash).last_advanced_eb - }; + // The encoded-size and within-MB duplicate guards above match + // producer-side selection. This participant-side cap keeps a + // malicious proposer from forcing oversized injected batches + // through the executor even when the outer proposal fits + // Malachite's larger block payload limit. + let parent_advanced = self.parent_last_advanced_eb(parent_hash); // `?` here only fires on local DB issues: missing // `mb_program_states` for `latest_computed_mb_hash`, missing // `block_header` on a canonical ancestor of `advance`, or @@ -573,13 +582,11 @@ impl Externalities for EthexeExternalities { // reasoning as the other two `?`s in this function. let mut touched = match advance { Some(advanced_eb) => eb_touched_programs(&self.db, parent_advanced, advanced_eb)?, - None => std::collections::HashSet::new(), + None => HashSet::new(), }; let limit = touched.len().max(MAX_TOUCHED_PROGRAMS_PER_MB as usize); - for tx in payload.iter() { - if let Transaction::Injected(signed) = tx { - touched.insert(signed.data().destination); - } + for signed in &injected { + touched.insert(signed.data().destination); } if touched.len() > limit { warn!( @@ -594,6 +601,14 @@ impl Externalities for EthexeExternalities { } impl EthexeExternalities { + fn parent_last_advanced_eb(&self, parent_hash: H256) -> H256 { + if parent_hash.is_zero() { + H256::zero() + } else { + self.db.mb_meta(parent_hash).last_advanced_eb + } + } + /// True iff `prerequisite.is_zero()` (no prerequisite — genesis /// or pre-advance) or the prerequisite Eth block has been fully /// **prepared** locally. @@ -1559,7 +1574,7 @@ mod tests { let mempool = Arc::new(crate::InjectedTxMempool::new(db.clone())); let _ = mempool.set_chain_head(head); let pk = ethexe_common::PrivateKey::random(); - // Each tx carries the maximum-size payload; the pool is loaded + // Each tx carries a half-max payload; the pool is loaded // with enough of them that two fit but three don't. for (i, dest) in dests.iter().enumerate().take(3) { let tx = ethexe_common::SignedMessage::create( @@ -1604,6 +1619,87 @@ mod tests { ); } + #[tokio::test] + async fn validate_rejects_mb_exceeding_injected_size_cap() { + use ethexe_common::{ + injected::{InjectedTransaction, MAX_INJECTED_TX_PAYLOAD_SIZE}, + mock::{BlockChain, Mock}, + }; + use gprimitives::ActorId; + + let db = Database::memory(); + let chain = BlockChain::mock(2u32).setup(&db); + let head = chain.blocks[2].to_simple(); + let dests: Vec = (0..2u64).map(ActorId::from).collect(); + let parent_mb = setup_mb_with_destinations(&db, chain.mb_hash_at(1), &dests); + + let (ext, _rx) = make_externalities(db); + *ext.chain_head.write().unwrap() = Some(head); + + let pk = ethexe_common::PrivateKey::random(); + let mut transactions = Vec::new(); + for (i, dest) in dests.iter().enumerate() { + let tx = ethexe_common::SignedMessage::create( + pk.clone(), + InjectedTransaction { + destination: *dest, + payload: vec![0u8; MAX_INJECTED_TX_PAYLOAD_SIZE].try_into().unwrap(), + value: 0, + reference_block: chain.blocks[1].hash, + salt: vec![i as u8; 32].try_into().unwrap(), + }, + ) + .unwrap(); + transactions.push(Transaction::Injected(tx)); + } + transactions.push(Transaction::ProgressTasks { + limits: ProgressTasksLimits::default(), + }); + transactions.push(Transaction::ProcessQueues { + limits: ProcessQueuesLimits::default(), + }); + + assert!( + !ext.validate_block_above(parent_mb, Transactions::new(transactions)) + .await + .unwrap(), + "MB whose cumulative injected encoded size exceeds the cap must be rejected" + ); + } + + #[tokio::test] + async fn validate_rejects_within_mb_duplicate_injected_tx() { + use ethexe_common::mock::{BlockChain, Mock}; + use gprimitives::ActorId; + + let db = Database::memory(); + let chain = BlockChain::mock(2u32).setup(&db); + let head = chain.blocks[2].to_simple(); + let dest = ActorId::from([1; 32]); + let parent_mb = setup_mb_with_destinations(&db, chain.mb_hash_at(1), &[dest]); + + let (ext, _rx) = make_externalities(db); + *ext.chain_head.write().unwrap() = Some(head); + + let pk = ethexe_common::PrivateKey::random(); + let tx = signed_injected_tx(&pk, dest, chain.blocks[1].hash, 7); + let payload = Transactions::new(vec![ + Transaction::Injected(tx.clone()), + Transaction::Injected(tx), + Transaction::ProgressTasks { + limits: ProgressTasksLimits::default(), + }, + Transaction::ProcessQueues { + limits: ProcessQueuesLimits::default(), + }, + ]); + + assert!( + !ext.validate_block_above(parent_mb, payload).await.unwrap(), + "MB carrying the same injected tx twice must be rejected" + ); + } + // ------------------------------------------------------------------ // Shape & ordering checks on `validate_block_above`. // @@ -1878,9 +1974,7 @@ mod tests { assert!( !ext.validate_block_above(parent_mb, payload).await.unwrap(), - "MB whose AdvanceTillEthereumBlock regresses parent.last_advanced_eb \ - must be rejected — currently passes because validate_block_above \ - skips the strict-descendant check the producer enforces", + "MB whose AdvanceTillEthereumBlock regresses parent.last_advanced_eb must be rejected", ); }