Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,9 @@ public class CommonParameter {
public long pendingTransactionTimeout;
@Getter
@Setter
public int maxTrxCacheSize;
@Getter
@Setter
public boolean nodeMetricsEnable = false;
@Getter
@Setter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class NodeConfig {
private ChannelConfig channel = new ChannelConfig();
private int maxTransactionPendingSize = 2000;
private long pendingTransactionTimeout = 60000;
private int maxTrxCacheSize = 50_000;
private int agreeNodeCount = 0;
private boolean openHistoryQueryWhenLiteFN = false;
private boolean unsolidifiedBlockCheck = false;
Expand Down Expand Up @@ -472,6 +473,11 @@ private void postProcess() {
if (dynamicConfig.checkInterval <= 0) {
dynamicConfig.checkInterval = 600;
}

// maxTrxCacheSize: minimum 2000
if (maxTrxCacheSize < 2000) {
maxTrxCacheSize = 2000;
}
}

// ===========================================================================
Expand Down
2 changes: 2 additions & 0 deletions common/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ node {
receiveTcpMinDataLength = 2048
maxTransactionPendingSize = 2000
pendingTransactionTimeout = 60000
# total cached trx across handler queues + pending + rePush
maxTrxCacheSize = 50000

# Consensus agreement
agreeNodeCount = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,7 @@ private static void applyNodeConfig(NodeConfig nc) {

PARAMETER.maxTransactionPendingSize = nc.getMaxTransactionPendingSize();
PARAMETER.pendingTransactionTimeout = nc.getPendingTransactionTimeout();
PARAMETER.maxTrxCacheSize = nc.getMaxTrxCacheSize();

PARAMETER.validContractProtoThreadNum = nc.getValidContractProtoThreads();

Expand Down
8 changes: 6 additions & 2 deletions framework/src/main/java/org/tron/core/db/Manager.java
Original file line number Diff line number Diff line change
Expand Up @@ -2065,9 +2065,13 @@ public NullifierStore getNullifierStore() {
return chainBaseManager.getNullifierStore();
}

public int getCachedTransactionSize() {
return pushTransactionQueue.size() + getPendingTransactions().size()
+ getRePushTransactions().size();
}

public boolean isTooManyPending() {
return getPendingTransactions().size() + getRePushTransactions().size()
> maxTransactionPendingSize;
return getCachedTransactionSize() > maxTransactionPendingSize;
}

private void preValidateTransactionSign(List<TransactionCapsule> txs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,4 +384,8 @@ public boolean isBlockUnsolidified() {
return headNum - solidNum >= maxUnsolidifiedBlocks;
}

public int getCachedTransactionSize() {
return dbManager.getCachedTransactionSize();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
@Component
public class TransactionsMsgHandler implements TronMsgHandler {

private static int MAX_TRX_SIZE = 50_000;
private static int MAX_SMART_CONTRACT_SUBMIT_SIZE = 100;
@Autowired
private TronNetDelegate tronNetDelegate;
Expand All @@ -40,7 +39,8 @@ public class TransactionsMsgHandler implements TronMsgHandler {
@Autowired
private ChainBaseManager chainBaseManager;

private BlockingQueue<TrxEvent> smartContractQueue = new LinkedBlockingQueue(MAX_TRX_SIZE);
private BlockingQueue<TrxEvent> smartContractQueue = new LinkedBlockingQueue(
Args.getInstance().getMaxTrxCacheSize());

private BlockingQueue<Runnable> queue = new LinkedBlockingQueue();

Expand All @@ -63,7 +63,8 @@ public void close() {
}

public boolean isBusy() {
return queue.size() + smartContractQueue.size() > MAX_TRX_SIZE;
return queue.size() + smartContractQueue.size()
+ tronNetDelegate.getCachedTransactionSize() > Args.getInstance().getMaxTrxCacheSize();
Copy link
Copy Markdown
Collaborator

@317787106 317787106 May 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[NIT] May be specify what happens when maxrxCacheSize <=0 .

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The minimum value has been set to 2000.

}

@Override
Expand Down
4 changes: 4 additions & 0 deletions framework/src/main/resources/config.conf
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ node {
fetchBlock.timeout = 200
# syncFetchBatchNum = 2000

# Maximum total number of cached transactions (handler queues + pending + rePush).
# When exceeded, the node stops accepting TRX INV messages from peers.
# maxTrxCacheSize = 50000

# Number of validate sign thread, default availableProcessors
# validateSignThreadNum = 16

Expand Down
53 changes: 53 additions & 0 deletions framework/src/test/java/org/tron/core/db/ManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
import com.google.common.collect.Sets;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -1292,6 +1295,56 @@ public void blockTrigger() {
Assert.assertEquals(TronError.ErrCode.EVENT_SUBSCRIBE_ERROR, thrown.getErrCode());
}

@Test
public void testGetCachedTransactionSize() throws Exception {
BlockingQueue<TransactionCapsule> pushQ = new LinkedBlockingQueue<>();
pushQ.add(new TransactionCapsule(Protocol.Transaction.getDefaultInstance()));
Field pushField = Manager.class.getDeclaredField("pushTransactionQueue");
pushField.setAccessible(true);
pushField.set(dbManager, pushQ);

dbManager.getPendingTransactions().clear();
dbManager.getPendingTransactions().add(
new TransactionCapsule(Protocol.Transaction.getDefaultInstance()));
dbManager.getPendingTransactions().add(
new TransactionCapsule(Protocol.Transaction.getDefaultInstance()));

dbManager.getRePushTransactions().clear();

// 1 (push) + 2 (pending) + 0 (rePush) = 3
Assert.assertEquals(3, dbManager.getCachedTransactionSize());

// cleanup
pushQ.clear();
dbManager.getPendingTransactions().clear();
}

@Test
public void testIsTooManyPendingIncludesPushQueue() throws Exception {
int threshold = Args.getInstance().getMaxTransactionPendingSize();

BlockingQueue<TransactionCapsule> pushQ = new LinkedBlockingQueue<>();
Field pushField = Manager.class.getDeclaredField("pushTransactionQueue");
pushField.setAccessible(true);
pushField.set(dbManager, pushQ);

dbManager.getPendingTransactions().clear();
dbManager.getRePushTransactions().clear();

for (int i = 0; i < threshold; i++) {
dbManager.getPendingTransactions().add(
new TransactionCapsule(Protocol.Transaction.getDefaultInstance()));
}
Assert.assertFalse(dbManager.isTooManyPending());

pushQ.add(new TransactionCapsule(Protocol.Transaction.getDefaultInstance()));
Assert.assertTrue(dbManager.isTooManyPending());

// cleanup
dbManager.getPendingTransactions().clear();
pushQ.clear();
}

public void adjustBalance(AccountStore accountStore, byte[] accountAddress, long amount)
throws BalanceInsufficientException {
Commons.adjustBalance(accountStore, accountAddress, amount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ public static void init() {
public void testProcessMessage() {
TransactionsMsgHandler transactionsMsgHandler = new TransactionsMsgHandler();
try {
Assert.assertFalse(transactionsMsgHandler.isBusy());

transactionsMsgHandler.init();

PeerConnection peer = Mockito.mock(PeerConnection.class);
Expand All @@ -54,6 +52,8 @@ public void testProcessMessage() {
field.setAccessible(true);
field.set(transactionsMsgHandler, tronNetDelegate);

Assert.assertFalse(transactionsMsgHandler.isBusy());

BalanceContract.TransferContract transferContract = BalanceContract.TransferContract
.newBuilder()
.setAmount(10)
Expand Down Expand Up @@ -132,6 +132,28 @@ public void testProcessMessage() {
}
}

@Test
public void testIsBusyWithCachedTransactions() throws Exception {
TransactionsMsgHandler handler = new TransactionsMsgHandler();

int threshold = Args.getInstance().getMaxTrxCacheSize();
TronNetDelegate tronNetDelegateMock = Mockito.mock(TronNetDelegate.class);
Field field = TransactionsMsgHandler.class.getDeclaredField("tronNetDelegate");
field.setAccessible(true);
field.set(handler, tronNetDelegateMock);

// queue and smartContractQueue are empty, but cached size > threshold
Mockito.when(tronNetDelegateMock.getCachedTransactionSize()).thenReturn(threshold + 1);
Assert.assertTrue(handler.isBusy());

// boundary: cached size == threshold, isBusy() uses strict >, so not busy
Mockito.when(tronNetDelegateMock.getCachedTransactionSize()).thenReturn(threshold);
Assert.assertFalse(handler.isBusy());

Mockito.when(tronNetDelegateMock.getCachedTransactionSize()).thenReturn(0);
Assert.assertFalse(handler.isBusy());
}

class TrxEvent {

@Getter
Expand Down
Loading