Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
Expand Down Expand Up @@ -44,6 +45,7 @@ public class TransactionsMsgHandler implements TronMsgHandler {

private BlockingQueue<Runnable> queue = new LinkedBlockingQueue();

private volatile boolean isClosed = false;
private int threadNum = Args.getInstance().getValidateSignThreadNum();
private final String trxEsName = "trx-msg-handler";
private ExecutorService trxHandlePool = ExecutorServiceManager.newThreadPoolExecutor(
Expand All @@ -58,8 +60,14 @@ public void init() {
}

public void close() {
ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName);
isClosed = true;
// Stop the scheduler first so no new tasks are drained from smartContractQueue.
ExecutorServiceManager.shutdownAndAwaitTermination(smartContractExecutor, smartEsName);
// Then shutdown the worker pool to finish already-submitted tasks.
ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName);
// Discard any remaining items and release references.
smartContractQueue.clear();
queue.clear();
}

public boolean isBusy() {
Expand All @@ -68,6 +76,10 @@ public boolean isBusy() {

@Override
public void processMessage(PeerConnection peer, TronMessage msg) throws P2pException {
if (isClosed) {
logger.warn("TransactionsMsgHandler is closed, drop message");
return;
}
TransactionsMessage transactionsMessage = (TransactionsMessage) msg;
check(peer, transactionsMessage);
for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) {
Expand All @@ -78,6 +90,10 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
int trxHandlePoolQueueSize = 0;
int dropSmartContractCount = 0;
for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) {
if (isClosed) {
logger.warn("TransactionsMsgHandler is closed during processing, stop submit");
break;
}
int type = trx.getRawData().getContract(0).getType().getNumber();
if (type == ContractType.TriggerSmartContract_VALUE
|| type == ContractType.CreateSmartContract_VALUE) {
Expand All @@ -87,8 +103,13 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
dropSmartContractCount++;
}
} else {
ExecutorServiceManager.submit(
trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx)));
try {
ExecutorServiceManager.submit(
trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx)));
} catch (RejectedExecutionException e) {
logger.warn("Submit task to {} failed", trxEsName);
break;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -20,7 +21,10 @@
import org.tron.common.TestConstants;
import org.tron.common.runtime.TvmTestUtils;
import org.tron.common.utils.ByteArray;
import org.tron.core.ChainBaseManager;
import org.tron.core.config.args.Args;
import org.tron.core.exception.P2pException;
import org.tron.core.exception.P2pException.TypeEnum;
import org.tron.core.net.TronNetDelegate;
import org.tron.core.net.message.adv.TransactionMessage;
import org.tron.core.net.message.adv.TransactionsMessage;
Expand Down Expand Up @@ -132,6 +136,78 @@ public void testProcessMessage() {
}
}

@Test
public void testProcessMessageAfterClose() throws Exception {
TransactionsMsgHandler handler = new TransactionsMsgHandler();
handler.init();
handler.close();

PeerConnection peer = Mockito.mock(PeerConnection.class);
TransactionsMessage msg = Mockito.mock(TransactionsMessage.class);

handler.processMessage(peer, msg);

Mockito.verify(msg, Mockito.never()).getTransactions();
Mockito.verifyNoInteractions(peer);
}

@Test
public void testHandleTransaction() throws Exception {
TransactionsMsgHandler handler = new TransactionsMsgHandler();
try {
TronNetDelegate tronNetDelegate = Mockito.mock(TronNetDelegate.class);
AdvService advService = Mockito.mock(AdvService.class);
ChainBaseManager chainBaseManager = Mockito.mock(ChainBaseManager.class);

Field f1 = TransactionsMsgHandler.class.getDeclaredField("tronNetDelegate");
f1.setAccessible(true);
f1.set(handler, tronNetDelegate);
Field f2 = TransactionsMsgHandler.class.getDeclaredField("advService");
f2.setAccessible(true);
f2.set(handler, advService);
Field f3 = TransactionsMsgHandler.class.getDeclaredField("chainBaseManager");
f3.setAccessible(true);
f3.set(handler, chainBaseManager);

PeerConnection peer = Mockito.mock(PeerConnection.class);

BalanceContract.TransferContract tc = BalanceContract.TransferContract.newBuilder()
.setAmount(10)
.setOwnerAddress(ByteString.copyFrom(ByteArray.fromHexString("121212a9cf")))
.setToAddress(ByteString.copyFrom(ByteArray.fromHexString("232323a9cf")))
.build();
long now = System.currentTimeMillis();
Protocol.Transaction trx = Protocol.Transaction.newBuilder().setRawData(
Protocol.Transaction.raw.newBuilder()
.setTimestamp(now)
.setExpiration(now + 60_000)
.setRefBlockNum(1)
.addContract(Protocol.Transaction.Contract.newBuilder()
.setType(Protocol.Transaction.Contract.ContractType.TransferContract)
.setParameter(Any.pack(tc)).build()).build())
.build();
TransactionMessage trxMsg = new TransactionMessage(trx);

Method handleTx = TransactionsMsgHandler.class.getDeclaredMethod(
"handleTransaction", PeerConnection.class, TransactionMessage.class);
handleTx.setAccessible(true);

// happy path → push and broadcast
Mockito.when(chainBaseManager.getNextBlockSlotTime()).thenReturn(now);
handleTx.invoke(handler, peer, trxMsg);
Mockito.verify(advService).broadcast(trxMsg);

// P2pException BAD_TRX → disconnect
Mockito.doThrow(new P2pException(TypeEnum.BAD_TRX, "bad"))
.when(tronNetDelegate).pushTransaction(Mockito.any());
handleTx.invoke(handler, peer, trxMsg);
Mockito.verify(peer).setBadPeer(true);
Mockito.verify(peer).disconnect(Protocol.ReasonCode.BAD_TX);
} finally {
handler.close();
}
}

class TrxEvent {

@Getter
Expand Down
Loading