From b530d3091c89c821d22b8e0e28da6a2334e17b0a Mon Sep 17 00:00:00 2001 From: 0xbigapple Date: Thu, 16 Apr 2026 20:07:34 +0800 Subject: [PATCH] fix(net): fix RejectedExecutionException during shutdown trxHandlePool --- .../TransactionsMsgHandler.java | 27 ++++++- .../TransactionsMsgHandlerTest.java | 76 +++++++++++++++++++ 2 files changed, 100 insertions(+), 3 deletions(-) diff --git a/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java b/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java index 0436b48d374..961646de4ee 100644 --- a/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java +++ b/framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java @@ -3,6 +3,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import lombok.Getter; @@ -44,6 +45,7 @@ public class TransactionsMsgHandler implements TronMsgHandler { private BlockingQueue queue = new LinkedBlockingQueue(); + private volatile boolean isClosed = false; private int threadNum = Args.getInstance().getValidateSignThreadNum(); private final String trxEsName = "trx-msg-handler"; private ExecutorService trxHandlePool = ExecutorServiceManager.newThreadPoolExecutor( @@ -58,8 +60,14 @@ public void init() { } public void close() { - ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName); + isClosed = true; + // Stop the scheduler first so no new tasks are drained from smartContractQueue. ExecutorServiceManager.shutdownAndAwaitTermination(smartContractExecutor, smartEsName); + // Then shutdown the worker pool to finish already-submitted tasks. + ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName); + // Discard any remaining items and release references. + smartContractQueue.clear(); + queue.clear(); } public boolean isBusy() { @@ -68,6 +76,10 @@ public boolean isBusy() { @Override public void processMessage(PeerConnection peer, TronMessage msg) throws P2pException { + if (isClosed) { + logger.warn("TransactionsMsgHandler is closed, drop message"); + return; + } TransactionsMessage transactionsMessage = (TransactionsMessage) msg; check(peer, transactionsMessage); for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) { @@ -78,6 +90,10 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep int trxHandlePoolQueueSize = 0; int dropSmartContractCount = 0; for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) { + if (isClosed) { + logger.warn("TransactionsMsgHandler is closed during processing, stop submit"); + break; + } int type = trx.getRawData().getContract(0).getType().getNumber(); if (type == ContractType.TriggerSmartContract_VALUE || type == ContractType.CreateSmartContract_VALUE) { @@ -87,8 +103,13 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep dropSmartContractCount++; } } else { - ExecutorServiceManager.submit( - trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx))); + try { + ExecutorServiceManager.submit( + trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx))); + } catch (RejectedExecutionException e) { + logger.warn("Submit task to {} failed", trxEsName); + break; + } } } diff --git a/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java b/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java index db8aac00c60..3d40127add6 100644 --- a/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java +++ b/framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java @@ -3,6 +3,7 @@ import com.google.protobuf.Any; import com.google.protobuf.ByteString; import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -20,7 +21,10 @@ import org.tron.common.TestConstants; import org.tron.common.runtime.TvmTestUtils; import org.tron.common.utils.ByteArray; +import org.tron.core.ChainBaseManager; import org.tron.core.config.args.Args; +import org.tron.core.exception.P2pException; +import org.tron.core.exception.P2pException.TypeEnum; import org.tron.core.net.TronNetDelegate; import org.tron.core.net.message.adv.TransactionMessage; import org.tron.core.net.message.adv.TransactionsMessage; @@ -132,6 +136,78 @@ public void testProcessMessage() { } } + @Test + public void testProcessMessageAfterClose() throws Exception { + TransactionsMsgHandler handler = new TransactionsMsgHandler(); + handler.init(); + handler.close(); + + PeerConnection peer = Mockito.mock(PeerConnection.class); + TransactionsMessage msg = Mockito.mock(TransactionsMessage.class); + + handler.processMessage(peer, msg); + + Mockito.verify(msg, Mockito.never()).getTransactions(); + Mockito.verifyNoInteractions(peer); + } + + @Test + public void testHandleTransaction() throws Exception { + TransactionsMsgHandler handler = new TransactionsMsgHandler(); + try { + TronNetDelegate tronNetDelegate = Mockito.mock(TronNetDelegate.class); + AdvService advService = Mockito.mock(AdvService.class); + ChainBaseManager chainBaseManager = Mockito.mock(ChainBaseManager.class); + + Field f1 = TransactionsMsgHandler.class.getDeclaredField("tronNetDelegate"); + f1.setAccessible(true); + f1.set(handler, tronNetDelegate); + Field f2 = TransactionsMsgHandler.class.getDeclaredField("advService"); + f2.setAccessible(true); + f2.set(handler, advService); + Field f3 = TransactionsMsgHandler.class.getDeclaredField("chainBaseManager"); + f3.setAccessible(true); + f3.set(handler, chainBaseManager); + + PeerConnection peer = Mockito.mock(PeerConnection.class); + + BalanceContract.TransferContract tc = BalanceContract.TransferContract.newBuilder() + .setAmount(10) + .setOwnerAddress(ByteString.copyFrom(ByteArray.fromHexString("121212a9cf"))) + .setToAddress(ByteString.copyFrom(ByteArray.fromHexString("232323a9cf"))) + .build(); + long now = System.currentTimeMillis(); + Protocol.Transaction trx = Protocol.Transaction.newBuilder().setRawData( + Protocol.Transaction.raw.newBuilder() + .setTimestamp(now) + .setExpiration(now + 60_000) + .setRefBlockNum(1) + .addContract(Protocol.Transaction.Contract.newBuilder() + .setType(Protocol.Transaction.Contract.ContractType.TransferContract) + .setParameter(Any.pack(tc)).build()).build()) + .build(); + TransactionMessage trxMsg = new TransactionMessage(trx); + + Method handleTx = TransactionsMsgHandler.class.getDeclaredMethod( + "handleTransaction", PeerConnection.class, TransactionMessage.class); + handleTx.setAccessible(true); + + // happy path → push and broadcast + Mockito.when(chainBaseManager.getNextBlockSlotTime()).thenReturn(now); + handleTx.invoke(handler, peer, trxMsg); + Mockito.verify(advService).broadcast(trxMsg); + + // P2pException BAD_TRX → disconnect + Mockito.doThrow(new P2pException(TypeEnum.BAD_TRX, "bad")) + .when(tronNetDelegate).pushTransaction(Mockito.any()); + handleTx.invoke(handler, peer, trxMsg); + Mockito.verify(peer).setBadPeer(true); + Mockito.verify(peer).disconnect(Protocol.ReasonCode.BAD_TX); + } finally { + handler.close(); + } + } + class TrxEvent { @Getter