Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 232 additions & 24 deletions ethexe/malachite/service/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,24 @@
//!
//! - Every tx carries `reference_block: H256`. The tx is valid as
//! long as `ref_block.height + VALIDITY_WINDOW > head.height`.
//! - On insert we drop any tx whose `ref_block` is already outside
//! the validity window relative to the latest observed head, or
//! whose `ref_block` is not yet in the database.
//! - On insert we drop a tx only when its `ref_block` is known to
//! the local DB AND already past the validity window. A
//! not-yet-replicated `ref_block` is tolerated — the producer's
//! EB lags the observer by O(seconds) in normal operation, and
//! `fetch` already filters non-ancestors. To make this work
//! during the cold-start window (before the observer's first
//! `set_chain_head` tick) the head height is seeded from the
//! DB's `latest_synced_eb` so the `is_expired` gate is active
//! from process boot.
//! - On fetch we return only txs whose `ref_block` is a canonical
//! ancestor of the given `head`. Non-ancestors are kept — a
//! reorg can make them eligible again.
//! - On forget (finalized MB) we remove the tx from the pool and
//! remember its hash in a seen-hash table. Subsequent inserts
//! of the same tx are rejected. Seen-hashes age out by the
//! same `VALIDITY_WINDOW` rule as pool entries.
//! of the same tx are rejected. `purge_expired` only evicts a
//! `seen` entry when its `ref_block` is known AND past the
//! validity window — mirroring the insert tolerance so the
//! dedup gate survives until the producer's EB catches up.
//!
//! The pool makes heavy use of `ethexe_db::Database::block_header` to
//! resolve `reference_block` into heights and to walk ancestor links;
Expand Down Expand Up @@ -144,13 +152,36 @@ pub const DEFAULT_POOL_CAPACITY: usize = 10_000;
// `VALIDITY_WINDOW` ages them out. Needs a per-sender quota keyed on the
// recovered ECDSA address.

/// Pool entry — the signed tx plus the head height observed when it
/// was inserted. The head height anchors the grace window applied
/// when the tx's `reference_block` doesn't resolve via the local DB:
/// `purge_expired` keeps such an entry while
/// `head_height - inserted_at_head_height < VALIDITY_WINDOW` and
/// evicts it once that age is crossed.
#[derive(Debug)]
struct PoolEntry {
tx: SignedInjectedTransaction,
inserted_at_head_height: u32,
}

/// Seen entry — committed tx ref_block plus the head height observed
/// when `forget` ran. Mirrors [`PoolEntry`]'s grace-window policy
/// for the dedup table.
#[derive(Debug)]
struct SeenEntry {
ref_block: H256,
seen_at_head_height: u32,
}

/// Pool state behind a single mutex — operations are short, contention low.
#[derive(Debug, Default)]
struct Inner {
pool: HashMap<HashOf<InjectedTransaction>, SignedInjectedTransaction>,
/// Recently committed txs (tx_hash → ref_block) for dedup. Aged out with the validity window.
seen: HashMap<HashOf<InjectedTransaction>, H256>,
pool: HashMap<HashOf<InjectedTransaction>, PoolEntry>,
/// Recently committed txs (tx_hash → [`SeenEntry`]) for dedup. Aged out with the validity window.
seen: HashMap<HashOf<InjectedTransaction>, SeenEntry>,
/// Latest chain head height — drives age-out of pool/seen entries.
/// Seeded from `db.globals().latest_synced_eb` at construction so
/// the `is_expired` gate is active during the cold-start window.
latest_head_height: Option<u32>,
}

Expand All @@ -169,8 +200,18 @@ impl InjectedTxMempool {
}

pub fn with_capacity(db: Database, capacity: usize) -> Self {
// Seed `latest_head_height` from the DB's last-synced EB so the
// `is_expired` gate in `insert` is active during the cold-start
// window — between process boot and the observer's first
// `set_chain_head` tick. Without this, fast-sync nodes accept
// arbitrarily-old txs that the very next chain-head advance
// would purge, misleading RPC clients with a hollow `Accept`.
let initial_head_height = db.globals().latest_synced_eb.header.height;
Self {
inner: Mutex::new(Inner::default()),
inner: Mutex::new(Inner {
latest_head_height: Some(initial_head_height),
..Default::default()
}),
db,
capacity,
new_tx_notify: Arc::new(Notify::new()),
Expand Down Expand Up @@ -229,38 +270,72 @@ impl InjectedTxMempool {

/// Evict pool entries and seen-hashes whose `reference_block` has
/// aged out relative to `head_height`.
///
/// Eviction policy per entry:
/// - `ref_block` known AND past validity window → evict (canonical
/// expiry).
/// - `ref_block` unknown to the local DB AND the entry has lived
/// at least `VALIDITY_WINDOW` blocks since it was inserted
/// (grace window expired) → evict. Bounded back-pressure for
/// txs whose ref_block never lands or is bogus.
/// - Otherwise → keep. Mirrors `insert`'s best-effort tolerance for
/// not-yet-replicated ref_blocks; without this, a lagging
/// observer would silently purge txs the local RPC just
/// `Accept`ed, and would break the `forget`→`seen` dedup gate
/// for committed txs whose ref_block hasn't replicated.
fn purge_expired(inner: &mut Inner, head_height: u32, db: &Database) {
inner.pool.retain(|tx_hash, tx| {
let ref_block = tx.data().reference_block;
inner.pool.retain(|tx_hash, entry| {
let ref_block = entry.tx.data().reference_block;
match db.block_header(ref_block).map(|h| h.height) {
Some(h) if !Self::is_expired(head_height, h) => true,
Some(h) => {
Some(h) if Self::is_expired(head_height, h) => {
trace!(
%tx_hash, %ref_block, ref_height = h, head_height,
"dropping expired tx from pool",
);
false
}
None => {
Some(_) => true,
None if Self::grace_expired(head_height, entry.inserted_at_head_height) => {
trace!(
%tx_hash, %ref_block,
"dropping tx with unknown ref_block from pool",
inserted_at_head_height = entry.inserted_at_head_height,
head_height,
"dropping tx with unresolved ref_block from pool — grace window expired",
);
false
}
None => true,
}
});

inner.seen.retain(|tx_hash, ref_block| {
match db.block_header(*ref_block).map(|h| h.height) {
Some(h) if !Self::is_expired(head_height, h) => true,
_ => {
trace!(%tx_hash, ref_block = %ref_block, "dropping expired seen-hash");
inner.seen.retain(|tx_hash, entry| {
match db.block_header(entry.ref_block).map(|h| h.height) {
Some(h) if Self::is_expired(head_height, h) => {
trace!(%tx_hash, ref_block = %entry.ref_block, "dropping expired seen-hash");
false
}
Some(_) => true,
None if Self::grace_expired(head_height, entry.seen_at_head_height) => {
trace!(
%tx_hash,
ref_block = %entry.ref_block,
seen_at_head_height = entry.seen_at_head_height,
head_height,
"dropping seen-hash with unresolved ref_block — grace window expired",
);
false
}
None => true,
}
});
}

/// Grace-window check for entries whose `reference_block` is not
/// (yet) in the local DB. Mirrors [`Self::is_expired`]'s comparison
/// shape against the entry's insertion-time head height.
fn grace_expired(head_height: u32, inserted_at_head_height: u32) -> bool {
head_height.saturating_sub(inserted_at_head_height) >= VALIDITY_WINDOW as u32
}
}

#[async_trait]
Expand Down Expand Up @@ -345,8 +420,19 @@ impl Mempool for InjectedTxMempool {
return Err(MempoolInsertError::PoolFull);
}

// Stamp the insertion head height so `purge_expired` can apply
// a bounded grace window to entries whose `ref_block` never
// resolves (lagging observer / bogus client input). Cold-start
// gets the DB's `latest_synced_eb` height via `with_capacity`.
let inserted_at_head_height = inner.latest_head_height.unwrap_or(0);
let pool_len_after = inner.pool.len() + 1;
inner.pool.insert(tx_hash, tx);
inner.pool.insert(
tx_hash,
PoolEntry {
tx,
inserted_at_head_height,
},
);
info!(
%tx_hash,
%ref_block,
Expand Down Expand Up @@ -382,8 +468,8 @@ impl Mempool for InjectedTxMempool {
let result: Vec<_> = inner
.pool
.values()
.filter(|tx| ancestors.contains(&tx.data().reference_block))
.cloned()
.filter(|entry| ancestors.contains(&entry.tx.data().reference_block))
.map(|entry| entry.tx.clone())
.collect();
info!(
head_hash = %head.hash,
Expand All @@ -398,10 +484,17 @@ impl Mempool for InjectedTxMempool {

async fn forget(&self, committed: &[SignedInjectedTransaction]) {
let mut inner = self.inner.lock().expect("poisoned mempool");
let seen_at_head_height = inner.latest_head_height.unwrap_or(0);
for tx in committed {
let tx_hash = tx.data().to_hash();
inner.pool.remove(&tx_hash);
inner.seen.insert(tx_hash, tx.data().reference_block);
inner.seen.insert(
tx_hash,
SeenEntry {
ref_block: tx.data().reference_block,
seen_at_head_height,
},
);
}
}

Expand Down Expand Up @@ -668,6 +761,121 @@ mod tests {
);
}

/// Regression for the cold-start expiry bypass: before the fix the
/// `is_expired` gate in `insert` was guarded on
/// `latest_head_height.is_some()`, so any insert that landed in the
/// window between process boot and the observer's first
/// `set_chain_head` tick skipped the expiry check entirely. RPC
/// returned `Accept` for arbitrarily-old txs that the very next
/// chain-head advance silently purged. The fix seeds
/// `latest_head_height` from `db.globals().latest_synced_eb` at
/// construction so the gate is active from boot.
#[test]
fn cold_start_insert_rejects_expired_ref_block_using_latest_synced_eb() {
let db = Database::memory();
// Post-fast-sync state: a long chain in `block_header` and a
// `latest_synced_eb` pointer set by the observer at some prior
// run. The current process has NOT yet ticked `set_chain_head`.
let chain = linear_chain(&db, (VALIDITY_WINDOW as usize) + 5);
let last = chain.last().expect("chain non-empty");
db.globals_mutate(|g| g.latest_synced_eb = *last);

let pool = InjectedTxMempool::with_capacity(db, 4);
let pk = PrivateKey::random();

// tx anchored at block 1 (height 1). Tip is at height
// `VALIDITY_WINDOW + 4`, so `1 + WINDOW <= tip_height` —
// expired by any sane head proxy.
let expired_tx = signed_tx(&pk, ActorId::zero(), chain[1].hash, 0);
assert!(
matches!(
pool.insert(expired_tx),
Err(MempoolInsertError::ExpiredRefBlock),
),
"cold-start insert must apply `is_expired` using \
`latest_synced_eb` as the head proxy when the observer \
has not yet ticked — otherwise public RPC returns Accept \
for txs that the first `set_chain_head` would purge",
);
assert_eq!(pool.len(), 0);
}

/// Regression for the insert→purge race on a lagging observer: the
/// `insert` path tolerates not-yet-replicated `ref_block`s (the
/// producer's EB lags the observer by O(seconds)). Before the
/// fix, the very next `set_chain_head` ran `purge_expired` which
/// evicted the tx on the `_ => false` arm — orphaning the
/// promise the local RPC just `Accept`ed. The fix keeps such
/// entries within a `VALIDITY_WINDOW`-block grace period.
#[test]
fn purge_expired_keeps_tx_with_unresolved_ref_block_within_grace() {
let db = Database::memory();
let chain = linear_chain(&db, 3);
let pool = InjectedTxMempool::with_capacity(db, 8);
let pk = PrivateKey::random();

// Simulate the lag: client posts a tx anchored at a ref_block
// the producer knows but our observer hasn't synced yet.
let unsynced_ref_block = H256::from([0xCA; 32]);
let tx = signed_tx(&pk, ActorId::zero(), unsynced_ref_block, 0);
pool.insert(tx).expect("insert tolerates unknown ref_block");
assert_eq!(pool.len(), 1);

// The next chain-head advance triggers `purge_expired`. The
// ref_block is still unknown — that's the exact race the
// grace window covers.
pool.set_chain_head(chain[1]);
assert_eq!(
pool.len(),
1,
"tx with not-yet-replicated ref_block must survive the \
very next set_chain_head — grace window of VALIDITY_WINDOW \
blocks. The producer's EB normally lands within seconds.",
);
}

/// Regression for the forget→purge dedup bypass: `forget()`
/// stamps every committed tx into `seen`. Before the fix, if the
/// committed tx's `ref_block` hadn't yet replicated to this
/// validator's DB, the next `set_chain_head` evicted the `seen`
/// entry via `_ => false` — letting the same network-committed
/// tx re-enter the local pool. The grace-window fix retains the
/// `seen` entry for `VALIDITY_WINDOW` blocks past `forget` even
/// when the ref_block is unknown.
#[test]
fn forget_then_purge_keeps_seen_for_unresolved_ref_block_within_grace() {
let db = Database::memory();
let chain = linear_chain(&db, 3);
let pool = InjectedTxMempool::with_capacity(db, 8);
let pk = PrivateKey::random();

// Network committed this tx; our observer hasn't synced its
// ref_block yet.
let unsynced_ref_block = H256::from([0xCA; 32]);
let tx = signed_tx(&pk, ActorId::zero(), unsynced_ref_block, 0);

// process_finalized → forget() for a tx we never pooled
// locally. The `seen` table stamps tx_hash → (ref_block, head=0).
futures::executor::block_on(pool.forget(std::slice::from_ref(&tx)));

// Sanity: dedup gate is active immediately after forget.
assert!(matches!(
pool.insert(tx.clone()),
Err(MempoolInsertError::AlreadyCommitted),
));

// Next chain-head advance fires `purge_expired`. With the
// grace-window fix the seen entry survives — dedup gate
// intact.
pool.set_chain_head(chain[1]);
assert!(
matches!(pool.insert(tx), Err(MempoolInsertError::AlreadyCommitted),),
"forgotten tx with not-yet-replicated ref_block must remain \
in `seen` across the next set_chain_head — otherwise a \
re-submitted committed tx slips back into the local pool",
);
}

#[test]
fn forget_moves_committed_to_seen_table() {
let db = Database::memory();
Expand Down
Loading