From 1117ab8f97a891371514134aab5ab387a18d3a83 Mon Sep 17 00:00:00 2001
From: yx9o The method iterates through all queues exactly once, but starts from a rotating index
+ * derived from {@code startIndex} (round-robin) to avoid always scanning from position 0 . For each queue, it computes a penalty via {@link #evaluatePenalty} using
+ * the provided {@code penalizers}. The queue with the smallest penalty is selected. Short-circuit rule: if any queue has a {@code penalty<= 0}, it is returned immediately,
+ * since no better result than 0 is expected. The input {@code queuesWithPriority} is a list of queue groups ordered by priority.
+ * For each priority group, this method delegates to {@link #selectLeastPenalty} to pick the best queue
+ * within that group and obtain its penalty. Short-circuit rule: if any priority group yields a queue whose {@code penalty <= 0},
+ * that result is returned immediately. Otherwise, it returns the queue with the smallest positive penalty among all groups.
+ * If multiple groups produce the same minimum penalty, the first encountered one wins.
+ * The priority value follows the convention that smaller numeric values indicate higher priority.
+ * For example, priority 0 is higher than priority 1.
+ *
+ * Smaller values indicate higher priority. For example:
+ * {
+
+ /**
+ * Returns the penalty value for the given MessageQueue; lower is better.
+ */
+ int penaltyOf(Q messageQueue);
+
+ /**
+ * Aggregates penalties from multiple penalizers for the same MessageQueue (by summing them up).
+ */
+ static
int evaluatePenalty(Q messageQueue, List
p : penalizers) {
+ sum += p.penaltyOf(messageQueue);
+ }
+ return sum;
+ }
+
+ /**
+ * Selects the queue with the lowest evaluated penalty from the given queue list.
+ *
+ *
queue type
+ * @return a {@code Pair} of (selected queue, penalty), or {@code null} if {@code queues} is null/empty
+ */
+ static
Pair
selectLeastPenalty(List
queues,
+ List
queue type
+ * @return a {@code Pair} of (selected queue, penalty), or {@code null} if {@code queuesWithPriority} is null/empty
+ */
+ static
Pair
selectLeastPenaltyWithPriority(List
> queuesWithPriority,
+ List
queues : queuesWithPriority) {
+ Pair
queueAndPenalty = selectLeastPenalty(queues, penalizers, startIndex);
+ int penalty = queueAndPenalty.getRight();
+ if (queueAndPenalty.getRight() <= 0) {
+ return queueAndPenalty;
+ }
+ if (penalty < bestPenalty) {
+ bestPenalty = penalty;
+ bestQueue = queueAndPenalty.getLeft();
+ }
+ }
+ return Pair.of(bestQueue, bestPenalty);
+ }
+}
\ No newline at end of file
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProvider.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProvider.java
new file mode 100644
index 00000000000..57b6e65fe5c
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProvider.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.route;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+/**
+ * A functional interface for providing priority values for message queues.
+ * This interface allows custom priority determination logic to be applied to message queues,
+ * enabling queue selection and routing based on priority levels.
+ *
the type of message queue, must extend {@link MessageQueue}
+ */
+@FunctionalInterface
+public interface MessageQueuePriorityProvider
{
+
+ /**
+ * Determines the priority value of the given message queue.
+ *
+ *
+ *
+ * This static utility method takes a list of message queues and a priority provider, + * then organizes the queues into groups based on their priority values. + * The returned list is ordered from highest priority to lowest priority. + *
+ * + * @paramthe type of message queue, must extend {@link MessageQueue} + * @param queues the list of message queues to group by priority, can be null or empty + * @param provider the priority provider to determine the priority of each queue + * @return a list of lists, where each inner list contains queues of the same priority level, + * ordered from highest priority (smallest value) to lowest priority (largest value). + * Returns an empty list if the input queues are null or empty. + */ + staticList> buildPriorityGroups(List
queues, MessageQueuePriorityProviderprovider) { + if (queues == null || queues.isEmpty()) { + return Collections.emptyList(); + } + + Map> buckets = new TreeMap<>(); + for (Q q : queues) { + int p = provider.priorityOf(q); + buckets.computeIfAbsent(p, k -> new ArrayList<>()).add(q); + } + return new ArrayList<>(buckets.values()); + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java index f25fb907ef2..0b028fa461a 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.proxy.service.route; import com.google.common.base.MoreObjects; -import com.google.common.base.Preconditions; import com.google.common.math.IntMath; import java.util.ArrayList; import java.util.Collections; @@ -30,13 +29,16 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; -import org.apache.rocketmq.client.latency.MQFaultStrategy; +import org.apache.commons.lang3.tuple.Pair; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.protocol.route.QueueData; +import static org.apache.rocketmq.proxy.service.route.MessageQueuePenalizer.selectLeastPenaltyWithPriority; +import static org.apache.rocketmq.proxy.service.route.MessageQueuePriorityProvider.buildPriorityGroups; + public class MessageQueueSelector { private static final int BROKER_ACTING_QUEUE_ID = -1; @@ -47,9 +49,18 @@ public class MessageQueueSelector { private final Map brokerNameQueueMap = new ConcurrentHashMap<>(); private final AtomicInteger queueIndex; private final AtomicInteger brokerIndex; - private MQFaultStrategy mqFaultStrategy; + private final List > penalizers = new ArrayList<>(); + + // ordered by priority asc (smaller => higher priority) + private final List > queuesWithPriority; + private final List
> brokerActingQueuesWithPriority; + + public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, boolean read) { + this(topicRouteWrapper, read, null); + } - public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, MQFaultStrategy mqFaultStrategy, boolean read) { + public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, boolean read, + MessageQueuePriorityProvider
priorityProvider) { if (read) { this.queues.addAll(buildRead(topicRouteWrapper)); } else { @@ -59,7 +70,12 @@ public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, MQFaultStrategy Random random = new Random(); this.queueIndex = new AtomicInteger(random.nextInt()); this.brokerIndex = new AtomicInteger(random.nextInt()); - this.mqFaultStrategy = mqFaultStrategy; + + if (priorityProvider == null) { + priorityProvider = new DefaultMessageQueuePriorityProvider(); + } + this.queuesWithPriority = buildPriorityGroups(queues, priorityProvider); + this.brokerActingQueuesWithPriority = buildPriorityGroups(brokerActingQueues, priorityProvider); } private static List buildRead(TopicRouteWrapper topicRoute) { @@ -138,7 +154,7 @@ private static List buildWrite(TopicRouteWrapper topicR private void buildBrokerActingQueues(String topic, List normalQueues) { for (AddressableMessageQueue mq : normalQueues) { AddressableMessageQueue brokerActingQueue = new AddressableMessageQueue( - new MessageQueue(topic, mq.getMessageQueue().getBrokerName(), BROKER_ACTING_QUEUE_ID), + new MessageQueue(topic, mq.getBrokerName(), BROKER_ACTING_QUEUE_ID), mq.getBrokerAddr()); if (!brokerActingQueues.contains(brokerActingQueue)) { @@ -160,38 +176,15 @@ public AddressableMessageQueue selectOne(boolean onlyBroker) { } public AddressableMessageQueue selectOneByPipeline(boolean onlyBroker) { - if (mqFaultStrategy != null && mqFaultStrategy.isSendLatencyFaultEnable()) { - List messageQueueList = null; - MessageQueue messageQueue = null; + if (CollectionUtils.isNotEmpty(penalizers)) { + Pair queueAndPenalty; if (onlyBroker) { - messageQueueList = transferAddressableQueues(brokerActingQueues); + queueAndPenalty = selectLeastPenaltyWithPriority(brokerActingQueuesWithPriority, penalizers, brokerIndex); } else { - messageQueueList = transferAddressableQueues(queues); + queueAndPenalty = selectLeastPenaltyWithPriority(queuesWithPriority, penalizers, queueIndex); } - AddressableMessageQueue addressableMessageQueue = null; - - // use both available filter. - messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker ? brokerIndex : queueIndex, - mqFaultStrategy.getAvailableFilter(), mqFaultStrategy.getReachableFilter()); - addressableMessageQueue = transferQueue2Addressable(messageQueue); - if (addressableMessageQueue != null) { - return addressableMessageQueue; - } - - // use available filter. - messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker ? brokerIndex : queueIndex, - mqFaultStrategy.getAvailableFilter()); - addressableMessageQueue = transferQueue2Addressable(messageQueue); - if (addressableMessageQueue != null) { - return addressableMessageQueue; - } - - // no available filter, then use reachable filter. - messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker ? brokerIndex : queueIndex, - mqFaultStrategy.getReachableFilter()); - addressableMessageQueue = transferQueue2Addressable(messageQueue); - if (addressableMessageQueue != null) { - return addressableMessageQueue; + if (queueAndPenalty != null && queueAndPenalty.getLeft() != null) { + return queueAndPenalty.getLeft(); } } @@ -199,46 +192,6 @@ public AddressableMessageQueue selectOneByPipeline(boolean onlyBroker) { return selectOne(onlyBroker); } - private MessageQueue selectOneMessageQueue(List messageQueueList, AtomicInteger sendQueue, TopicPublishInfo.QueueFilter...filter) { - if (messageQueueList == null || messageQueueList.isEmpty()) { - return null; - } - if (filter != null && filter.length != 0) { - for (int i = 0; i < messageQueueList.size(); i++) { - int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size()); - MessageQueue mq = messageQueueList.get(index); - boolean filterResult = true; - for (TopicPublishInfo.QueueFilter f: filter) { - Preconditions.checkNotNull(f); - filterResult &= f.filter(mq); - } - if (filterResult) { - return mq; - } - } - } - return null; - } - - public List transferAddressableQueues(List addressableMessageQueueList) { - if (addressableMessageQueueList == null) { - return null; - } - - return addressableMessageQueueList.stream() - .map(AddressableMessageQueue::getMessageQueue) - .collect(Collectors.toList()); - } - - private AddressableMessageQueue transferQueue2Addressable(MessageQueue messageQueue) { - for (AddressableMessageQueue amq: queues) { - if (amq.getMessageQueue().equals(messageQueue)) { - return amq; - } - } - return null; - } - public AddressableMessageQueue selectNextOne(AddressableMessageQueue last) { boolean onlyBroker = last.getQueueId() < 0; AddressableMessageQueue newOne = last; @@ -275,12 +228,10 @@ public List getBrokerActingQueues() { return brokerActingQueues; } - public MQFaultStrategy getMQFaultStrategy() { - return mqFaultStrategy; - } - - public void setMQFaultStrategy(MQFaultStrategy mqFaultStrategy) { - this.mqFaultStrategy = mqFaultStrategy; + public void addPenalizer(MessageQueuePenalizer penalizer) { + if (penalizer != null) { + this.penalizers.add(penalizer); + } } @Override diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java index 898e529f8cb..a0d768d6dae 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java @@ -17,7 +17,8 @@ package org.apache.rocketmq.proxy.service.route; import com.google.common.base.MoreObjects; -import org.apache.rocketmq.client.latency.MQFaultStrategy; +import java.util.List; +import org.apache.commons.collections.CollectionUtils; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; public class MessageQueueView { @@ -27,11 +28,24 @@ public class MessageQueueView { private final MessageQueueSelector writeSelector; private final TopicRouteWrapper topicRouteWrapper; - public MessageQueueView(String topic, TopicRouteData topicRouteData, MQFaultStrategy mqFaultStrategy) { + + public MessageQueueView(String topic, TopicRouteData topicRouteData, List > penalizer) { + this(topic, topicRouteData, penalizer, null); + } + + public MessageQueueView(String topic, TopicRouteData topicRouteData, List > penalizer, + MessageQueuePriorityProvider priorityProvider) { this.topicRouteWrapper = new TopicRouteWrapper(topicRouteData, topic); - this.readSelector = new MessageQueueSelector(topicRouteWrapper, mqFaultStrategy, true); - this.writeSelector = new MessageQueueSelector(topicRouteWrapper, mqFaultStrategy, false); + this.readSelector = new MessageQueueSelector(topicRouteWrapper, true, priorityProvider); + this.writeSelector = new MessageQueueSelector(topicRouteWrapper, false, priorityProvider); + + if (CollectionUtils.isNotEmpty(penalizer)) { + for (MessageQueuePenalizer p : penalizer) { + this.readSelector.addPenalizer(p); + this.writeSelector.addPenalizer(p); + } + } } public TopicRouteData getTopicRouteData() { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java index bcdf8140bc5..dae30057461 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java @@ -19,11 +19,11 @@ import com.github.benmanes.caffeine.cache.CacheLoader; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; - +import com.google.common.annotations.VisibleForTesting; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.ClientConfig; @@ -32,12 +32,10 @@ import org.apache.rocketmq.client.latency.MQFaultStrategy; import org.apache.rocketmq.client.latency.Resolver; import org.apache.rocketmq.client.latency.ServiceDetector; -import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.thread.ThreadPoolMonitor; import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; -import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.common.Address; @@ -53,19 +51,15 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown { private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); - private final MQClientAPIFactory mqClientAPIFactory; - private MQFaultStrategy mqFaultStrategy; - + private final MQFaultStrategy mqFaultStrategy; protected final LoadingCache topicCache; - protected final ScheduledExecutorService scheduledExecutorService; protected final ThreadPoolExecutor cacheRefreshExecutor; + protected final List > penalizers = new ArrayList<>(); + protected MessageQueuePriorityProvider priorityProvider = new DefaultMessageQueuePriorityProvider(); public TopicRouteService(MQClientAPIFactory mqClientAPIFactory) { ProxyConfig config = ConfigurationManager.getProxyConfig(); - this.scheduledExecutorService = ThreadUtils.newSingleThreadScheduledExecutor( - new ThreadFactoryImpl("TopicRouteService_") - ); this.cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor( config.getTopicRouteServiceThreadPoolNums(), config.getTopicRouteServiceThreadPoolNums(), @@ -74,7 +68,6 @@ public TopicRouteService(MQClientAPIFactory mqClientAPIFactory) { "TopicRouteCacheRefresh", config.getTopicRouteServiceThreadPoolQueueCapacity() ); - this.mqClientAPIFactory = mqClientAPIFactory; this.topicCache = Caffeine.newBuilder().maximumSize(config.getTopicRouteServiceCacheMaxNum()) .expireAfterAccess(config.getTopicRouteServiceCacheExpiredSeconds(), TimeUnit.SECONDS) @@ -134,6 +127,8 @@ public String resolve(String name) { } } }, serviceDetector); + + this.penalizers.addAll(buildPenalizerByMQFaultStrategy(mqFaultStrategy)); this.init(); } @@ -146,22 +141,7 @@ private Optional pickTopic() { } protected void init() { - this.appendShutdown(this.scheduledExecutorService::shutdown); - this.appendStartAndShutdown(this.mqClientAPIFactory); - } - - @Override - public void shutdown() throws Exception { - if (this.mqFaultStrategy.isStartDetectorEnable()) { - mqFaultStrategy.shutdown(); - } - } - - @Override - public void start() throws Exception { - if (this.mqFaultStrategy.isStartDetectorEnable()) { - this.mqFaultStrategy.startDetector(); - } + this.appendStartAndShutdown(this.mqFaultStrategy); } public ClientConfig extractClientConfigFromProxyConfig(ProxyConfig proxyConfig) { @@ -220,10 +200,36 @@ protected static boolean isTopicRouteValid(TopicRouteData routeData) { protected MessageQueueView buildMessageQueueView(String topic, TopicRouteData topicRouteData) { if (isTopicRouteValid(topicRouteData)) { - MessageQueueView tmp = new MessageQueueView(topic, topicRouteData, TopicRouteService.this.getMqFaultStrategy()); + MessageQueueView tmp = new MessageQueueView(topic, topicRouteData, this.penalizers, this.priorityProvider); log.debug("load topic route from namesrv. topic: {}, queue: {}", topic, tmp); return tmp; } return MessageQueueView.WRAPPED_EMPTY_QUEUE; } + + public void setPriorityProvider(MessageQueuePriorityProvider priorityProvider) { + this.priorityProvider = priorityProvider; + } + + public void addPenalizer(MessageQueuePenalizer penalizer) { + this.penalizers.add(penalizer); + } + + @VisibleForTesting + public static List > buildPenalizerByMQFaultStrategy(MQFaultStrategy mqFaultStrategy) { + List > penalizers = new ArrayList<>(); + penalizers.add(messageQueue -> { + if (!mqFaultStrategy.isSendLatencyFaultEnable() || mqFaultStrategy.getAvailableFilter().filter(messageQueue)) { + return 0; + } + return 10; + }); + penalizers.add(messageQueue -> { + if (!mqFaultStrategy.isSendLatencyFaultEnable() || mqFaultStrategy.getReachableFilter().filter(messageQueue)) { + return 0; + } + return 100; + }); + return penalizers; + } } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java index a64867ddfe1..870aa0424fd 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java @@ -59,6 +59,7 @@ import org.junit.Before; import org.junit.Test; +import static org.apache.rocketmq.proxy.service.route.TopicRouteService.buildPenalizerByMQFaultStrategy; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertThrows; @@ -379,7 +380,7 @@ public void testSendNormalMessageQueueSelector() { MQFaultStrategy mqFaultStrategy = mock(MQFaultStrategy.class); when(topicRouteService.getMqFaultStrategy()).thenReturn(mqFaultStrategy); when(mqFaultStrategy.isSendLatencyFaultEnable()).thenReturn(false); - MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, topicRouteService.getMqFaultStrategy()); + MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, null); AddressableMessageQueue firstSelect = selector.select(ProxyContext.create(), messageQueueView); AddressableMessageQueue secondSelect = selector.select(ProxyContext.create(), messageQueueView); @@ -415,10 +416,7 @@ public void testSendNormalMessageQueueSelectorPipeLine() throws Exception { mqFaultStrategy.updateFaultItem(BROKER_NAME2, 1000, true, true); mqFaultStrategy.updateFaultItem(BROKER_NAME, 1000, true, false); - TopicRouteService topicRouteService = mock(TopicRouteService.class); - when(topicRouteService.getMqFaultStrategy()).thenReturn(mqFaultStrategy); - MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, topicRouteService.getMqFaultStrategy()); - + MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, buildPenalizerByMQFaultStrategy(mqFaultStrategy)); AddressableMessageQueue firstSelect = selector.select(ProxyContext.create(), messageQueueView); assertEquals(firstSelect.getBrokerName(), BROKER_NAME2); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizerTest.java new file mode 100644 index 00000000000..f31d973cce5 --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizerTest.java @@ -0,0 +1,472 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.service.route; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.rocketmq.common.message.MessageQueue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class MessageQueuePenalizerTest { + + /** + * Test evaluatePenalty with null messageQueue should throw NullPointerException + */ + @Test(expected = NullPointerException.class) + public void testEvaluatePenalty_NullMessageQueue() { + List > penalizers = new ArrayList<>(); + penalizers.add(mq -> 10); + MessageQueuePenalizer.evaluatePenalty(null, penalizers); + } + + /** + * Test evaluatePenalty with null penalizers should return 0 + */ + @Test + public void testEvaluatePenalty_NullPenalizers() { + MessageQueue mq = new MessageQueue("topic", "broker", 0); + int penalty = MessageQueuePenalizer.evaluatePenalty(mq, null); + assertEquals(0, penalty); + } + + /** + * Test evaluatePenalty with empty penalizers should return 0 + */ + @Test + public void testEvaluatePenalty_EmptyPenalizers() { + MessageQueue mq = new MessageQueue("topic", "broker", 0); + int penalty = MessageQueuePenalizer.evaluatePenalty(mq, Collections.emptyList()); + assertEquals(0, penalty); + } + + /** + * Test evaluatePenalty aggregates penalties from multiple penalizers by summing them up + */ + @Test + public void testEvaluatePenalty_MultiplePenalizers() { + MessageQueue mq = new MessageQueue("topic", "broker", 0); + List > penalizers = Arrays.asList( + q -> 10, + q -> 20, + q -> 5 + ); + int penalty = MessageQueuePenalizer.evaluatePenalty(mq, penalizers); + assertEquals(35, penalty); + } + + /** + * Test evaluatePenalty with negative penalties (sum should still work) + */ + @Test + public void testEvaluatePenalty_NegativePenalties() { + MessageQueue mq = new MessageQueue("topic", "broker", 0); + List > penalizers = Arrays.asList( + q -> -5, + q -> 10, + q -> -3 + ); + int penalty = MessageQueuePenalizer.evaluatePenalty(mq, penalizers); + assertEquals(2, penalty); + } + + /** + * Test selectLeastPenalty with null queues should return null + */ + @Test + public void testSelectLeastPenalty_NullQueues() { + List > penalizers = Collections.singletonList(mq -> 10); + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenalty(null, penalizers, startIndex); + assertNull(result); + } + + /** + * Test selectLeastPenalty with empty queues should return null + */ + @Test + public void testSelectLeastPenalty_EmptyQueues() { + List > penalizers = Collections.singletonList(mq -> 10); + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenalty( + Collections.emptyList(), penalizers, startIndex); + assertNull(result); + } + + /** + * Test selectLeastPenalty selects the queue with the lowest penalty + */ + @Test + public void testSelectLeastPenalty_LowestPenalty() { + MessageQueue mq0 = new MessageQueue("topic", "broker", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker", 1); + MessageQueue mq2 = new MessageQueue("topic", "broker", 2); + List queues = Arrays.asList(mq0, mq1, mq2); + + // Penalizer that assigns different penalties based on queue id + List > penalizers = Collections.singletonList( + mq -> mq.getQueueId() == 0 ? 50 : (mq.getQueueId() == 1 ? 10 : 30) + ); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex); + + assertNotNull(result); + assertEquals(mq1, result.getLeft()); + assertEquals(10, result.getRight().intValue()); + } + + /** + * Test selectLeastPenalty short-circuits when penalty <= 0 + */ + @Test + public void testSelectLeastPenalty_ShortCircuitZeroPenalty() { + MessageQueue mq0 = new MessageQueue("topic", "broker", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker", 1); + MessageQueue mq2 = new MessageQueue("topic", "broker", 2); + List queues = Arrays.asList(mq0, mq1, mq2); + + // mq1 has penalty 0, should short-circuit + List > penalizers = Collections.singletonList( + mq -> mq.getQueueId() == 0 ? 50 : (mq.getQueueId() == 1 ? 0 : 30) + ); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex); + + assertNotNull(result); + assertEquals(mq1, result.getLeft()); + assertEquals(0, result.getRight().intValue()); + } + + /** + * Test selectLeastPenalty short-circuits when penalty is negative + */ + @Test + public void testSelectLeastPenalty_ShortCircuitNegativePenalty() { + MessageQueue mq0 = new MessageQueue("topic", "broker", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker", 1); + MessageQueue mq2 = new MessageQueue("topic", "broker", 2); + List queues = Arrays.asList(mq0, mq1, mq2); + + // mq1 has penalty -5, should short-circuit + List > penalizers = Collections.singletonList( + mq -> mq.getQueueId() == 0 ? 50 : (mq.getQueueId() == 1 ? -5 : 30) + ); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex); + + assertNotNull(result); + assertEquals(mq1, result.getLeft()); + assertEquals(-5, result.getRight().intValue()); + } + + /** + * Test selectLeastPenalty with round-robin behavior (rotating start index) + * Verifies that startIndex affects the iteration order + */ + @Test + public void testSelectLeastPenalty_RoundRobinStartIndex() { + MessageQueue mq0 = new MessageQueue("topic", "broker", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker", 1); + MessageQueue mq2 = new MessageQueue("topic", "broker", 2); + List queues = Arrays.asList(mq0, mq1, mq2); + + // All queues have penalty 0, so whichever is encountered first will be returned + List > penalizers = Collections.singletonList(mq -> 0); + + // Starting from index 0 + AtomicInteger startIndex1 = new AtomicInteger(0); + Pair result1 = MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex1); + assertNotNull(result1); + assertEquals(mq0, result1.getLeft()); + + // Starting from index 1 + AtomicInteger startIndex2 = new AtomicInteger(1); + Pair result2 = MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex2); + assertNotNull(result2); + assertEquals(mq1, result2.getLeft()); + + // Starting from index 2 + AtomicInteger startIndex3 = new AtomicInteger(2); + Pair result3 = MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex3); + assertNotNull(result3); + assertEquals(mq2, result3.getLeft()); + } + + /** + * Test selectLeastPenalty increments startIndex for each iteration + */ + @Test + public void testSelectLeastPenalty_IncrementStartIndex() { + MessageQueue mq0 = new MessageQueue("topic", "broker", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker", 1); + MessageQueue mq2 = new MessageQueue("topic", "broker", 2); + List queues = Arrays.asList(mq0, mq1, mq2); + + List > penalizers = Collections.singletonList(mq -> 10); + + AtomicInteger startIndex = new AtomicInteger(0); + MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex); + + // After iterating through 3 queues, startIndex should be incremented 3 times + assertEquals(3, startIndex.get()); + } + + /** + * Test selectLeastPenalty handles startIndex wrapping with Math.floorMod + */ + @Test + public void testSelectLeastPenalty_StartIndexWrapping() { + MessageQueue mq0 = new MessageQueue("topic", "broker", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker", 1); + MessageQueue mq2 = new MessageQueue("topic", "broker", 2); + List queues = Arrays.asList(mq0, mq1, mq2); + + List > penalizers = Collections.singletonList(mq -> 0); + + // Start with large index to test wrapping + AtomicInteger startIndex = new AtomicInteger(100); + Pair result = MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex); + + assertNotNull(result); + // 100 % 3 = 1, so should start from mq1 + assertEquals(mq1, result.getLeft()); + } + + /** + * Test selectLeastPenaltyWithPriority with null queuesWithPriority should return null + */ + @Test + public void testSelectLeastPenaltyWithPriority_NullQueues() { + List > penalizers = Collections.singletonList(mq -> 10); + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenaltyWithPriority( + null, penalizers, startIndex); + assertNull(result); + } + + /** + * Test selectLeastPenaltyWithPriority with empty queuesWithPriority should return null + */ + @Test + public void testSelectLeastPenaltyWithPriority_EmptyQueues() { + List > penalizers = Collections.singletonList(mq -> 10); + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenaltyWithPriority( + Collections.emptyList(), penalizers, startIndex); + assertNull(result); + } + + /** + * Test selectLeastPenaltyWithPriority with single priority group delegates to selectLeastPenalty + */ + @Test + public void testSelectLeastPenaltyWithPriority_SinglePriorityGroup() { + MessageQueue mq0 = new MessageQueue("topic", "broker", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker", 1); + List queues = Arrays.asList(mq0, mq1); + + List > penalizers = Collections.singletonList( + mq -> mq.getQueueId() == 0 ? 20 : 10 + ); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenaltyWithPriority( + Collections.singletonList(queues), penalizers, startIndex); + + assertNotNull(result); + assertEquals(mq1, result.getLeft()); + assertEquals(10, result.getRight().intValue()); + } + + /** + * Test selectLeastPenaltyWithPriority selects queue with lowest penalty across multiple priority groups + */ + @Test + public void testSelectLeastPenaltyWithPriority_MultiplePriorityGroups() { + // Priority group 1 (higher priority) + MessageQueue mq0 = new MessageQueue("topic", "broker-high", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker-high", 1); + List highPriorityQueues = Arrays.asList(mq0, mq1); + + // Priority group 2 (lower priority) + MessageQueue mq2 = new MessageQueue("topic", "broker-low", 0); + MessageQueue mq3 = new MessageQueue("topic", "broker-low", 1); + List lowPriorityQueues = Arrays.asList(mq2, mq3); + + List > queuesWithPriority = Arrays.asList(highPriorityQueues, lowPriorityQueues); + + // Assign penalties: high-priority queues have higher penalties, low-priority have lower + List
> penalizers = Collections.singletonList( + mq -> mq.getBrokerName().equals("broker-high") ? 50 : 10 + ); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenaltyWithPriority( + queuesWithPriority, penalizers, startIndex); + + assertNotNull(result); + // Should select from low-priority group because it has lower penalty + assertTrue(result.getLeft().getBrokerName().equals("broker-low")); + assertEquals(10, result.getRight().intValue()); + } + + /** + * Test selectLeastPenaltyWithPriority short-circuits when a priority group yields penalty <= 0 + */ + @Test + public void testSelectLeastPenaltyWithPriority_ShortCircuitZeroPenalty() { + // Priority group 1 + MessageQueue mq0 = new MessageQueue("topic", "broker-high", 0); + List highPriorityQueues = Collections.singletonList(mq0); + + // Priority group 2 + MessageQueue mq1 = new MessageQueue("topic", "broker-low", 0); + List lowPriorityQueues = Collections.singletonList(mq1); + + List > queuesWithPriority = Arrays.asList(highPriorityQueues, lowPriorityQueues); + + // First group has penalty 0, should short-circuit + List
> penalizers = Collections.singletonList( + mq -> mq.getBrokerName().equals("broker-high") ? 0 : 100 + ); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenaltyWithPriority( + queuesWithPriority, penalizers, startIndex); + + assertNotNull(result); + assertEquals(mq0, result.getLeft()); + assertEquals(0, result.getRight().intValue()); + } + + /** + * Test selectLeastPenaltyWithPriority when first group encounters zero penalty during iteration + */ + @Test + public void testSelectLeastPenaltyWithPriority_FirstGroupHasZeroPenalty() { + // Priority group 1 + MessageQueue mq0 = new MessageQueue("topic", "broker1", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker1", 1); + List group1 = Arrays.asList(mq0, mq1); + + // Priority group 2 + MessageQueue mq2 = new MessageQueue("topic", "broker2", 0); + List group2 = Collections.singletonList(mq2); + + List > queuesWithPriority = Arrays.asList(group1, group2); + + // mq1 in first group has penalty 0 + List
> penalizers = Collections.singletonList( + mq -> mq.getQueueId() == 1 && mq.getBrokerName().equals("broker1") ? 0 : 50 + ); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenaltyWithPriority( + queuesWithPriority, penalizers, startIndex); + + assertNotNull(result); + assertEquals(mq1, result.getLeft()); + assertEquals(0, result.getRight().intValue()); + } + + /** + * Test selectLeastPenaltyWithPriority returns first encountered minimum when multiple groups have same minimum penalty + */ + @Test + public void testSelectLeastPenaltyWithPriority_SameMinimumPenalty() { + // Priority group 1 + MessageQueue mq0 = new MessageQueue("topic", "broker1", 0); + List group1 = Collections.singletonList(mq0); + + // Priority group 2 + MessageQueue mq1 = new MessageQueue("topic", "broker2", 0); + List group2 = Collections.singletonList(mq1); + + // Priority group 3 + MessageQueue mq2 = new MessageQueue("topic", "broker3", 0); + List group3 = Collections.singletonList(mq2); + + List > queuesWithPriority = Arrays.asList(group1, group2, group3); + + // All have same penalty + List
> penalizers = Collections.singletonList(mq -> 10); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenaltyWithPriority( + queuesWithPriority, penalizers, startIndex); + + assertNotNull(result); + // Should return first encountered (from group1) + assertEquals(mq0, result.getLeft()); + assertEquals(10, result.getRight().intValue()); + } + + /** + * Test selectLeastPenaltyWithPriority with complex scenario: + * Multiple priority groups with varying penalties + */ + @Test + public void testSelectLeastPenaltyWithPriority_ComplexScenario() { + // Priority group 1: penalties 100, 90 + MessageQueue mq0 = new MessageQueue("topic", "broker1", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker1", 1); + List group1 = Arrays.asList(mq0, mq1); + + // Priority group 2: penalties 50, 30 + MessageQueue mq2 = new MessageQueue("topic", "broker2", 0); + MessageQueue mq3 = new MessageQueue("topic", "broker2", 1); + List group2 = Arrays.asList(mq2, mq3); + + // Priority group 3: penalties 80, 20 + MessageQueue mq4 = new MessageQueue("topic", "broker3", 0); + MessageQueue mq5 = new MessageQueue("topic", "broker3", 1); + List group3 = Arrays.asList(mq4, mq5); + + List > queuesWithPriority = Arrays.asList(group1, group2, group3); + + List
> penalizers = Collections.singletonList(mq -> { + if (mq.getBrokerName().equals("broker1")) { + return mq.getQueueId() == 0 ? 100 : 90; + } else if (mq.getBrokerName().equals("broker2")) { + return mq.getQueueId() == 0 ? 50 : 30; + } else { + return mq.getQueueId() == 0 ? 80 : 20; + } + }); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenaltyWithPriority( + queuesWithPriority, penalizers, startIndex); + + assertNotNull(result); + // Should select mq5 from group3 with penalty 20 (the global minimum) + assertEquals(mq5, result.getLeft()); + assertEquals(20, result.getRight().intValue()); + } +} diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProviderTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProviderTest.java new file mode 100644 index 00000000000..22f2a68e8b0 --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProviderTest.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.service.route; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.rocketmq.common.message.MessageQueue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class MessageQueuePriorityProviderTest { + + @Test + public void testPriorityOfWithLambda() { + // Test functional interface implementation using lambda + MessageQueuePriorityProvider provider = mq -> mq.getQueueId(); + + MessageQueue queue1 = new MessageQueue("topic", "broker", 0); + MessageQueue queue2 = new MessageQueue("topic", "broker", 5); + MessageQueue queue3 = new MessageQueue("topic", "broker", 10); + + assertEquals(0, provider.priorityOf(queue1)); + assertEquals(5, provider.priorityOf(queue2)); + assertEquals(10, provider.priorityOf(queue3)); + } + + @Test + public void testPriorityOfWithConstantValue() { + // Test with constant priority + MessageQueuePriorityProvider constantProvider = mq -> 1; + + MessageQueue queue1 = new MessageQueue("topic1", "broker1", 0); + MessageQueue queue2 = new MessageQueue("topic2", "broker2", 5); + + assertEquals(1, constantProvider.priorityOf(queue1)); + assertEquals(1, constantProvider.priorityOf(queue2)); + } + + @Test + public void testPriorityOfBasedOnBrokerName() { + // Test priority based on broker name hash + MessageQueuePriorityProvider brokerProvider = + mq -> mq.getBrokerName().hashCode() % 10; + + MessageQueue queue1 = new MessageQueue("topic", "broker-a", 0); + MessageQueue queue2 = new MessageQueue("topic", "broker-b", 0); + + int priority1 = brokerProvider.priorityOf(queue1); + int priority2 = brokerProvider.priorityOf(queue2); + + // Priorities should be deterministic for the same broker + assertEquals(priority1, brokerProvider.priorityOf(queue1)); + assertEquals(priority2, brokerProvider.priorityOf(queue2)); + } + + @Test + public void testBuildPriorityGroupsWithNullList() { + MessageQueuePriorityProvider provider = mq -> 0; + List > result = MessageQueuePriorityProvider.buildPriorityGroups(null, provider); + + assertNotNull(result); + assertTrue(result.isEmpty()); + } + + @Test + public void testBuildPriorityGroupsWithEmptyList() { + MessageQueuePriorityProvider
provider = mq -> 0; + List > result = MessageQueuePriorityProvider.buildPriorityGroups( + Collections.emptyList(), provider); + + assertNotNull(result); + assertTrue(result.isEmpty()); + } + + @Test + public void testBuildPriorityGroupsWithSinglePriority() { + MessageQueuePriorityProvider
provider = mq -> 0; + + List queues = Arrays.asList( + new MessageQueue("topic", "broker1", 0), + new MessageQueue("topic", "broker1", 1), + new MessageQueue("topic", "broker1", 2) + ); + + List > result = MessageQueuePriorityProvider.buildPriorityGroups(queues, provider); + + assertNotNull(result); + assertEquals(1, result.size()); + assertEquals(3, result.get(0).size()); + } + + @Test + public void testBuildPriorityGroupsWithMultiplePriorities() { + // Priority based on queue ID: 0->high, 1->medium, 2->low + MessageQueuePriorityProvider
provider = mq -> { + if (mq.getQueueId() < 2) return 0; // High priority + if (mq.getQueueId() < 4) return 1; // Medium priority + return 2; // Low priority + }; + + List queues = Arrays.asList( + new MessageQueue("topic", "broker", 0), // priority 0 + new MessageQueue("topic", "broker", 1), // priority 0 + new MessageQueue("topic", "broker", 2), // priority 1 + new MessageQueue("topic", "broker", 3), // priority 1 + new MessageQueue("topic", "broker", 4), // priority 2 + new MessageQueue("topic", "broker", 5) // priority 2 + ); + + List > result = MessageQueuePriorityProvider.buildPriorityGroups(queues, provider); + + assertNotNull(result); + assertEquals(3, result.size()); + + // First group (highest priority 0) + assertEquals(2, result.get(0).size()); + assertEquals(0, result.get(0).get(0).getQueueId()); + assertEquals(1, result.get(0).get(1).getQueueId()); + + // Second group (medium priority 1) + assertEquals(2, result.get(1).size()); + assertEquals(2, result.get(1).get(0).getQueueId()); + assertEquals(3, result.get(1).get(1).getQueueId()); + + // Third group (low priority 2) + assertEquals(2, result.get(2).size()); + assertEquals(4, result.get(2).get(0).getQueueId()); + assertEquals(5, result.get(2).get(1).getQueueId()); + } + + @Test + public void testBuildPriorityGroupsOrderedByPriority() { + // Test that groups are ordered from high to low priority (ascending numeric value) + MessageQueuePriorityProvider
provider = mq -> mq.getQueueId(); + + List queues = Arrays.asList( + new MessageQueue("topic", "broker", 5), + new MessageQueue("topic", "broker", 0), + new MessageQueue("topic", "broker", 3), + new MessageQueue("topic", "broker", 1) + ); + + List > result = MessageQueuePriorityProvider.buildPriorityGroups(queues, provider); + + assertNotNull(result); + assertEquals(4, result.size()); + + // Verify order: 0, 1, 3, 5 (ascending) + assertEquals(0, result.get(0).get(0).getQueueId()); + assertEquals(1, result.get(1).get(0).getQueueId()); + assertEquals(3, result.get(2).get(0).getQueueId()); + assertEquals(5, result.get(3).get(0).getQueueId()); + } + + @Test + public void testBuildPriorityGroupsWithNegativePriorities() { + // Test with negative priority values + MessageQueuePriorityProvider
provider = mq -> mq.getQueueId() - 5; + + List queues = Arrays.asList( + new MessageQueue("topic", "broker", 0), // priority -5 + new MessageQueue("topic", "broker", 5), // priority 0 + new MessageQueue("topic", "broker", 10) // priority 5 + ); + + List > result = MessageQueuePriorityProvider.buildPriorityGroups(queues, provider); + + assertNotNull(result); + assertEquals(3, result.size()); + + // Verify order: -5, 0, 5 (ascending) + assertEquals(0, result.get(0).get(0).getQueueId()); + assertEquals(5, result.get(1).get(0).getQueueId()); + assertEquals(10, result.get(2).get(0).getQueueId()); + } + + @Test + public void testBuildPriorityGroupsWithMixedBrokers() { + // Priority based on broker name + MessageQueuePriorityProvider
provider = mq -> { + if (mq.getBrokerName().equals("broker-high")) return 0; + if (mq.getBrokerName().equals("broker-medium")) return 1; + return 2; + }; + + List queues = Arrays.asList( + new MessageQueue("topic", "broker-high", 0), + new MessageQueue("topic", "broker-low", 0), + new MessageQueue("topic", "broker-medium", 0), + new MessageQueue("topic", "broker-high", 1), + new MessageQueue("topic", "broker-medium", 1) + ); + + List > result = MessageQueuePriorityProvider.buildPriorityGroups(queues, provider); + + assertNotNull(result); + assertEquals(3, result.size()); + + // High priority group + assertEquals(2, result.get(0).size()); + assertEquals("broker-high", result.get(0).get(0).getBrokerName()); + assertEquals("broker-high", result.get(0).get(1).getBrokerName()); + + // Medium priority group + assertEquals(2, result.get(1).size()); + assertEquals("broker-medium", result.get(1).get(0).getBrokerName()); + + // Low priority group + assertEquals(1, result.get(2).size()); + assertEquals("broker-low", result.get(2).get(0).getBrokerName()); + } + + @Test + public void testBuildPriorityGroupsPreservesQueueOrder() { + // Test that queues with same priority maintain their relative order + MessageQueuePriorityProvider
provider = mq -> 0; + + List queues = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + queues.add(new MessageQueue("topic", "broker", i)); + } + + List > result = MessageQueuePriorityProvider.buildPriorityGroups(queues, provider); + + assertNotNull(result); + assertEquals(1, result.size()); + assertEquals(10, result.get(0).size()); + + // Verify order is maintained + for (int i = 0; i < 10; i++) { + assertEquals(i, result.get(0).get(i).getQueueId()); + } + } + + @Test + public void testBuildPriorityGroupsWithCustomMessageQueue() { + // Test with extended MessageQueue type + class CustomMessageQueue extends MessageQueue { + private int customPriority; + + public CustomMessageQueue(String topic, String brokerName, int queueId, int customPriority) { + super(topic, brokerName, queueId); + this.customPriority = customPriority; + } + + public int getCustomPriority() { + return customPriority; + } + } + + MessageQueuePriorityProvider
provider = + CustomMessageQueue::getCustomPriority; + + List queues = Arrays.asList( + new CustomMessageQueue("topic", "broker", 0, 2), + new CustomMessageQueue("topic", "broker", 1, 0), + new CustomMessageQueue("topic", "broker", 2, 1) + ); + + List > result = MessageQueuePriorityProvider.buildPriorityGroups(queues, provider); + + assertNotNull(result); + assertEquals(3, result.size()); + + // Verify order by custom priority: 0, 1, 2 + assertEquals(0, result.get(0).get(0).getCustomPriority()); + assertEquals(1, result.get(1).get(0).getCustomPriority()); + assertEquals(2, result.get(2).get(0).getCustomPriority()); + } + + @Test + public void testBuildPriorityGroupsWithLargeNumberOfQueues() { + // Test with large number of queues + MessageQueuePriorityProvider
provider = mq -> mq.getQueueId() % 5; + + List queues = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + queues.add(new MessageQueue("topic", "broker", i)); + } + + List > result = MessageQueuePriorityProvider.buildPriorityGroups(queues, provider); + + assertNotNull(result); + assertEquals(5, result.size()); // 5 different priorities (0-4) + + // Each group should have 20 queues (100 / 5) + for (List
group : result) { + assertEquals(20, group.size()); + } + } +} diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java index d150f87c409..e44ed28f4a6 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java @@ -30,12 +30,12 @@ public class MessageQueueSelectorTest extends BaseServiceTest { public void testReadMessageQueue() { queueData.setPerm(PermName.PERM_READ); queueData.setReadQueueNums(0); - MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, true); + MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), true); assertTrue(messageQueueSelector.getQueues().isEmpty()); queueData.setPerm(PermName.PERM_READ); queueData.setReadQueueNums(3); - messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, true); + messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), true); assertEquals(3, messageQueueSelector.getQueues().size()); assertEquals(1, messageQueueSelector.getBrokerActingQueues().size()); for (int i = 0; i < messageQueueSelector.getQueues().size(); i++) { @@ -58,12 +58,12 @@ public void testReadMessageQueue() { public void testWriteMessageQueue() { queueData.setPerm(PermName.PERM_WRITE); queueData.setReadQueueNums(0); - MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, false); + MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), false); assertTrue(messageQueueSelector.getQueues().isEmpty()); queueData.setPerm(PermName.PERM_WRITE); queueData.setWriteQueueNums(3); - messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, false); + messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), false); assertEquals(3, messageQueueSelector.getQueues().size()); assertEquals(1, messageQueueSelector.getBrokerActingQueues().size()); for (int i = 0; i < messageQueueSelector.getQueues().size(); i++) { From 4c665802a2529d295411e3f521c82e64126308f8 Mon Sep 17 00:00:00 2001 From: majialong Date: Thu, 8 Jan 2026 14:19:37 +0800 Subject: [PATCH 07/56] [ISSUE #9994] Improve switchTimerEngine command with OptionGroup validation (#9995) --- .../broker/SwitchTimerEngineSubCommand.java | 17 +- .../SwitchTimerEngineSubCommandTest.java | 256 ++++++++++++++++++ 2 files changed, 265 insertions(+), 8 deletions(-) create mode 100644 tools/src/test/java/org/apache/rocketmq/tools/command/broker/SwitchTimerEngineSubCommandTest.java diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/SwitchTimerEngineSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/SwitchTimerEngineSubCommand.java index fbddca1b967..a3d053934c0 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/SwitchTimerEngineSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/SwitchTimerEngineSubCommand.java @@ -18,11 +18,11 @@ import java.util.Set; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.Options; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.CommandUtil; import org.apache.rocketmq.tools.command.SubCommand; @@ -44,13 +44,15 @@ public String commandDesc() { @Override public Options buildCommandlineOptions(Options options) { + OptionGroup optionGroup = new OptionGroup(); Option opt = new Option("b", "brokerAddr", true, "update which broker"); - opt.setRequired(false); - options.addOption(opt); + optionGroup.addOption(opt); opt = new Option("c", "clusterName", true, "update which cluster"); - opt.setRequired(false); - options.addOption(opt); + optionGroup.addOption(opt); + + optionGroup.setRequired(true); + options.addOptionGroup(optionGroup); opt = new Option("e", "engineType", true, "R/F, R for rocksdb timeline engine, F for file time wheel engine"); opt.setRequired(true); @@ -69,11 +71,12 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t System.out.print("switchTimerEngine engineType must be R or F\n"); return; } + String engineName = MessageConst.TIMER_ENGINE_ROCKSDB_TIMELINE.equals(engineType) ? ROCKSDB_TIMELINE : FILE_TIME_WHEEL; if (commandLine.hasOption('b')) { String brokerAddr = commandLine.getOptionValue('b').trim(); defaultMQAdminExt.start(); defaultMQAdminExt.switchTimerEngine(brokerAddr, engineType); - System.out.printf("switchTimerEngine to %s success, %s\n", engineType, brokerAddr); + System.out.printf("switchTimerEngine to %s success, %s\n", engineName, brokerAddr); return; } else if (commandLine.hasOption('c')) { String clusterName = commandLine.getOptionValue('c').trim(); @@ -82,7 +85,6 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t for (String brokerAddr : masterSet) { try { defaultMQAdminExt.switchTimerEngine(brokerAddr, engineType); - String engineName = MessageConst.TIMER_ENGINE_ROCKSDB_TIMELINE.equals(engineType) ? ROCKSDB_TIMELINE : FILE_TIME_WHEEL; System.out.printf("switchTimerEngine to %s success, %s\n", engineName, brokerAddr); } catch (Exception e) { e.printStackTrace(); @@ -90,7 +92,6 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t } return; } - ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); } catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); } finally { diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SwitchTimerEngineSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SwitchTimerEngineSubCommandTest.java new file mode 100644 index 00000000000..2b3244eb76f --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SwitchTimerEngineSubCommandTest.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.broker; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.command.SubCommandException; +import org.apache.rocketmq.tools.command.server.ServerResponseMocker; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class SwitchTimerEngineSubCommandTest { + + private final ByteArrayOutputStream outContent = new ByteArrayOutputStream(); + private final PrintStream originalOut = System.out; + + @Before + public void setUp() throws Exception { + outContent.reset(); + System.setOut(new PrintStream(outContent)); + } + + @After + public void tearDown() throws Exception { + System.setOut(originalOut); + outContent.reset(); + } + + @Test + public void testCommandName() { + SwitchTimerEngineSubCommand cmd = new SwitchTimerEngineSubCommand(); + Assert.assertEquals("switchTimerEngine", cmd.commandName()); + } + + @Test + public void testCommandDesc() { + SwitchTimerEngineSubCommand cmd = new SwitchTimerEngineSubCommand(); + Assert.assertEquals("switch the engine of timer message in broker", cmd.commandDesc()); + } + + @Test + public void testBuildCommandlineOptions() { + SwitchTimerEngineSubCommand cmd = new SwitchTimerEngineSubCommand(); + Options options = cmd.buildCommandlineOptions(new Options()); + Assert.assertNotNull(options); + Assert.assertTrue(options.hasOption("b")); + Assert.assertTrue(options.hasOption("c")); + Assert.assertTrue(options.hasOption("e")); + } + + @Test + public void testExecuteWithInvalidEngineType() throws SubCommandException { + SwitchTimerEngineSubCommand cmd = new SwitchTimerEngineSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] { + "-b", "127.0.0.1:10911", + "-e", "X" + }; + final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, + cmd.buildCommandlineOptions(options), new DefaultParser()); + cmd.execute(commandLine, options, null); + String output = outContent.toString(); + Assert.assertTrue(output.contains("switchTimerEngine engineType must be R or F")); + } + + @Test + public void testExecuteWithEmptyEngineType() throws SubCommandException { + SwitchTimerEngineSubCommand cmd = new SwitchTimerEngineSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] { + "-b", "127.0.0.1:10911", + "-e", "" + }; + final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, + cmd.buildCommandlineOptions(options), new DefaultParser()); + cmd.execute(commandLine, options, null); + String output = outContent.toString(); + Assert.assertTrue(output.contains("switchTimerEngine engineType must be R or F")); + } + + @Test + public void testOptionGroupWithBrokerAddrOnly() throws ParseException { + // Test that -b option alone is valid + SwitchTimerEngineSubCommand cmd = new SwitchTimerEngineSubCommand(); + Options options = cmd.buildCommandlineOptions(new Options()); + String[] subargs = new String[] { + "-b", "127.0.0.1:10911", + "-e", MessageConst.TIMER_ENGINE_ROCKSDB_TIMELINE + }; + DefaultParser parser = new DefaultParser(); + CommandLine commandLine = parser.parse(options, subargs); + Assert.assertTrue(commandLine.hasOption('b')); + Assert.assertFalse(commandLine.hasOption('c')); + Assert.assertEquals("127.0.0.1:10911", commandLine.getOptionValue('b')); + } + + @Test + public void testOptionGroupWithClusterNameOnly() throws ParseException { + // Test that -c option alone is valid + SwitchTimerEngineSubCommand cmd = new SwitchTimerEngineSubCommand(); + Options options = cmd.buildCommandlineOptions(new Options()); + String[] subargs = new String[] { + "-c", "default-cluster", + "-e", MessageConst.TIMER_ENGINE_ROCKSDB_TIMELINE + }; + DefaultParser parser = new DefaultParser(); + CommandLine commandLine = parser.parse(options, subargs); + Assert.assertFalse(commandLine.hasOption('b')); + Assert.assertTrue(commandLine.hasOption('c')); + Assert.assertEquals("default-cluster", commandLine.getOptionValue('c')); + } + + @Test + public void testOptionGroupWithNeitherOption() { + // Test that providing neither -b nor -c should fail (required) + SwitchTimerEngineSubCommand cmd = new SwitchTimerEngineSubCommand(); + Options options = cmd.buildCommandlineOptions(new Options()); + String[] subargs = new String[] { + "-e", MessageConst.TIMER_ENGINE_ROCKSDB_TIMELINE + }; + DefaultParser parser = new DefaultParser(); + try { + parser.parse(options, subargs); + Assert.fail("Should throw ParseException when neither -b nor -c is provided"); + } catch (ParseException e) { + String message = e.getMessage(); + Assert.assertNotNull(message); + Assert.assertEquals("Missing required option: [-b update which broker, -c update which cluster]", message); + } + } + + @Test + public void testExecuteWithBrokerAddr() throws SubCommandException { + ServerResponseMocker brokerMocker = null; + try { + // Start broker mock server (return SUCCESS for switchTimerEngine) + brokerMocker = ServerResponseMocker.startServer(null); + + SwitchTimerEngineSubCommand cmd = new SwitchTimerEngineSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] { + "-b", "127.0.0.1:" + brokerMocker.listenPort(), + "-e", MessageConst.TIMER_ENGINE_ROCKSDB_TIMELINE + }; + final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, + cmd.buildCommandlineOptions(options), new DefaultParser()); + cmd.execute(commandLine, options, null); + + String output = outContent.toString(); + // Verify the success message with engine name and broker address + Assert.assertTrue(output.contains("switchTimerEngine to ROCKSDB_TIMELINE success, " + "127.0.0.1:" + brokerMocker.listenPort())); + } finally { + if (brokerMocker != null) { + brokerMocker.shutdown(); + } + } + } + + @Test + public void testExecuteWithClusterName() throws SubCommandException { + ServerResponseMocker brokerMocker = null; + ServerResponseMocker nameServerMocker = null; + String originalNamesrvAddr = null; + String mockNamesrvAddr = null; + try { + // Start broker mock server (return SUCCESS for switchTimerEngine) + brokerMocker = ServerResponseMocker.startServer(null); + + // Start name server mock server (return ClusterInfo for examineBrokerClusterInfo) + nameServerMocker = startNameServer(brokerMocker.listenPort()); + mockNamesrvAddr = "127.0.0.1:" + nameServerMocker.listenPort(); + + originalNamesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY); + System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, mockNamesrvAddr); + + SwitchTimerEngineSubCommand cmd = new SwitchTimerEngineSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] { + "-c", "mockCluster", + "-e", MessageConst.TIMER_ENGINE_FILE_TIME_WHEEL + }; + final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, + cmd.buildCommandlineOptions(options), new DefaultParser()); + cmd.execute(commandLine, options, null); + + String output = outContent.toString(); + // Verify the success message with engine name and broker address + Assert.assertTrue(output.contains("switchTimerEngine to FILE_TIME_WHEEL success, " + "127.0.0.1:" + brokerMocker.listenPort())); + } finally { + // Restore original system property + if (originalNamesrvAddr != null) { + System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, originalNamesrvAddr); + } else { + System.clearProperty(MixAll.NAMESRV_ADDR_PROPERTY); + } + if (brokerMocker != null) { + brokerMocker.shutdown(); + } + if (nameServerMocker != null) { + nameServerMocker.shutdown(); + } + } + } + + private ServerResponseMocker startNameServer(int brokerPort) { + ClusterInfo clusterInfo = new ClusterInfo(); + + HashMap brokerAddressTable = new HashMap<>(); + BrokerData brokerData = new BrokerData(); + brokerData.setBrokerName("mockBrokerName"); + HashMap brokerAddress = new HashMap<>(); + brokerAddress.put(MixAll.MASTER_ID, "127.0.0.1:" + brokerPort); + brokerData.setBrokerAddrs(brokerAddress); + brokerData.setCluster("mockCluster"); + brokerAddressTable.put("mockBrokerName", brokerData); + clusterInfo.setBrokerAddrTable(brokerAddressTable); + + HashMap > clusterAddressTable = new HashMap<>(); + Set brokerNames = new HashSet<>(); + brokerNames.add("mockBrokerName"); + clusterAddressTable.put("mockCluster", brokerNames); + clusterInfo.setClusterAddrTable(clusterAddressTable); + + // start name server + return ServerResponseMocker.startServer(clusterInfo.encode()); + } +} + From ae6981453a97662bc52dde4f19d6cfa7485bf6d7 Mon Sep 17 00:00:00 2001 From: yx9o Date: Thu, 8 Jan 2026 15:16:25 +0800 Subject: [PATCH 08/56] [ISSUE #9923] Transactional messages should not send custom delayed messages (#9924) * [ISSUE #9923] Transactional messages should not send custom delayed messages * Transactional messages do not support delayed delivery --- .../impl/producer/DefaultMQProducerImpl.java | 17 +++++++++++------ .../TransactionMQProducerWithTraceTest.java | 12 ++++++++++++ 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index d0bd0649814..894888f5889 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -1434,11 +1434,7 @@ public TransactionSendResult sendMessageInTransaction(final Message msg, throw new MQClientException("tranExecutor is null", null); } - // ignore DelayTimeLevel parameter - if (msg.getDelayTimeLevel() != 0) { - MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL); - } - + ensureNotDelayedForTransactional(msg); Validators.checkMessage(msg, this.defaultMQProducer); SendResult sendResult = null; @@ -1495,7 +1491,7 @@ public TransactionSendResult sendMessageInTransaction(final Message msg, try { this.endTransaction(msg, sendResult, localTransactionState, localException); } catch (Exception e) { - log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); + log.warn("local transaction execute {}, but end broker transaction failed", localTransactionState, e); } TransactionSendResult transactionSendResult = new TransactionSendResult(); @@ -1508,6 +1504,15 @@ public TransactionSendResult sendMessageInTransaction(final Message msg, return transactionSendResult; } + private void ensureNotDelayedForTransactional(final Message msg) throws MQClientException { + if (msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null + || msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_MS) != null + || msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null + || msg.getProperty(MessageConst.PROPERTY_TIMER_DELIVER_MS) != null) { + throw new MQClientException("Transactional messages do not support delayed delivery", null); + } + } + /** * DEFAULT SYNC ------------------------------------------------------- */ diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java index 9f6036153bc..0e550555283 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java @@ -168,6 +168,18 @@ public Object answer(InvocationOnMock mock) throws Throwable { assertThat(ctx.getMessage().getTopic()).isEqualTo(topic); } + @Test(expected = MQClientException.class) + public void testSendMessageInTransaction_NoListener_ThrowsException() throws MQClientException { + producer.setTransactionListener(null); + producer.sendMessageInTransaction(message, null); + } + + @Test(expected = MQClientException.class) + public void testSendMessageInTransaction_DelayMsg_ThrowsException() throws MQClientException { + message.setDelayTimeLevel(3); + producer.sendMessageInTransaction(message, null); + } + @After public void terminate() { producer.shutdown(); From 9fecafe086f614b3559ae77e6a0257506a9d974e Mon Sep 17 00:00:00 2001 From: Humkum <1109939087@qq.com> Date: Mon, 12 Jan 2026 11:26:58 +0800 Subject: [PATCH 09/56] [ISSUE #9953] Fix: there's no need to decompress message body in server side (#9954) --- .../apache/rocketmq/broker/processor/AdminBrokerProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 4361431bec0..ea262bb25d7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -2853,7 +2853,7 @@ private RemotingCommand resumeCheckHalfMessage(ChannelHandlerContext ctx, MessageId messageId = MessageDecoder.decodeMessageId(requestHeader.getMsgId()); selectMappedBufferResult = this.brokerController.getMessageStore() .selectOneMessageByOffset(messageId.getOffset()); - MessageExt msg = MessageDecoder.decode(selectMappedBufferResult.getByteBuffer()); + MessageExt msg = MessageDecoder.decode(selectMappedBufferResult.getByteBuffer(), true, false); msg.putUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, String.valueOf(0)); PutMessageResult putMessageResult = this.brokerController.getMessageStore() .putMessage(toMessageExtBrokerInner(msg)); From 4eead1364d074f40cbedacdc8051f5b4bce4072e Mon Sep 17 00:00:00 2001 From: lizhimins <707364882@qq.com> Date: Tue, 13 Jan 2026 09:56:22 +0800 Subject: [PATCH 10/56] [ISSUE #9912] Reduce excessive requests for consumer offset timestamps in tiered storage (#9991) * [ISSUE #9912] Reduce excessive requests for consumer offset timestamps in tiered storage Signed-off-by: terrance.lzm * [ISSUE #9912] Reduce excessive requests for consumer offset timestamps in tiered storage --------- Signed-off-by: terrance.lzm --- .../tieredstore/TieredMessageStore.java | 8 +--- .../core/MessageStoreFetcherImpl.java | 45 ++++++++++++++----- .../tieredstore/TieredMessageStoreTest.java | 4 +- 3 files changed, 37 insertions(+), 20 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java index b30f868d194..38946fd1611 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java @@ -362,8 +362,7 @@ public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueu } @Override - public CompletableFuture getMessageStoreTimeStampAsync(String topic, int queueId, - long consumeQueueOffset) { + public CompletableFuture getMessageStoreTimeStampAsync(String topic, int queueId, long consumeQueueOffset) { if (fetchFromCurrentStore(topic, queueId, consumeQueueOffset)) { Stopwatch stopwatch = Stopwatch.createStarted(); return fetcher.getMessageStoreTimeStampAsync(topic, queueId, consumeQueueOffset) @@ -374,11 +373,6 @@ public CompletableFuture getMessageStoreTimeStampAsync(String topic, int q .put(TieredStoreMetricsConstant.LABEL_TOPIC, topic) .build(); TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes); - if (time == -1) { - log.debug("GetEarliestMessageTimeAsync failed, try to get message time from next store, topic: {}, queue: {}, queue offset: {}", - topic, queueId, consumeQueueOffset); - return next.getMessageStoreTimeStamp(topic, queueId, consumeQueueOffset); - } return time; }); } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java index f0e8b3ab503..f669f8940af 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java @@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.BoundaryType; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageStatus; @@ -52,6 +53,7 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher { private static final Logger log = LoggerFactory.getLogger(MessageStoreUtil.TIERED_STORE_LOGGER_NAME); protected static final String CACHE_KEY_FORMAT = "%s@%d@%d"; + protected static final String FETCHER_GROUP_NAME = MixAll.CID_RMQ_SYS_PREFIX + "FETCHER_TIMESTAMP"; private final String brokerName; private final MetadataStore metadataStore; @@ -389,18 +391,37 @@ public CompletableFuture getMessageStoreTimeStampAsync(String topic, int q return CompletableFuture.completedFuture(-1L); } - return flatFile.getConsumeQueueAsync(queueOffset) - .thenComposeAsync(cqItem -> { - long commitLogOffset = MessageFormatUtil.getCommitLogOffsetFromItem(cqItem); - int size = MessageFormatUtil.getSizeFromItem(cqItem); - return flatFile.getCommitLogAsync(commitLogOffset, size); - }, messageStore.getStoreExecutor().bufferFetchExecutor) - .thenApply(MessageFormatUtil::getStoreTimeStamp) - .exceptionally(e -> { - log.error("MessageStoreFetcherImpl#getMessageStoreTimeStampAsync: " + - "get or decode message failed, topic={}, queue={}, offset={}", topic, queueId, queueOffset, e); - return -1L; - }); + // The Metrics thread frequently retrieves the storage timestamp of the latest message; + // as an alternative, return the queue's saved timestamp here. + if (queueOffset + 1L == flatFile.getConsumeQueueCommitOffset()) { + long timestamp = flatFile.getMaxStoreTimestamp(); + return CompletableFuture.completedFuture(timestamp == Long.MAX_VALUE ? -1L : timestamp); + } + + CompletableFuture future = new CompletableFuture<>(); + try { + this.getMessageAsync(FETCHER_GROUP_NAME, topic, queueId, queueOffset, 1, null) + .whenComplete((result, e) -> { + if (e != null) { + log.error("MessageStoreFetcherImpl#getMessageStoreTimeStampAsync: " + + "Get or decode message failed, topic={}, queue={}, offset={}", topic, queueId, queueOffset, e); + future.completeExceptionally(e); + return; + } + if (result != null && result.getMessageBufferList() != null + && !result.getMessageBufferList().isEmpty()) { + long timestamp = MessageFormatUtil.getStoreTimeStamp(result.getMessageBufferList().get(0)); + log.info("MessageStoreFetcherImpl#getMessageStoreTimeStampAsync: " + + "topic={}, queue={}, offset={}, timestamp={}", topic, queueId, queueOffset, timestamp); + future.complete(timestamp); + } else { + future.complete(-1L); + } + }); + } catch (Throwable t) { + future.completeExceptionally(t); + } + return future; } @Override diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java index 1a0240681c0..f88779f09b2 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java @@ -268,8 +268,10 @@ public void testGetMessageStoreTimeStampAsync() { configuration.update(properties); Assert.assertEquals(1, (long) currentStore.getMessageStoreTimeStampAsync(mq.getTopic(), mq.getQueueId(), 0).join()); + // If data cannot be fetched from tiered storage, + // there is no need to fallback to local storage. Mockito.when(fetcher.getMessageStoreTimeStampAsync(anyString(), anyInt(), anyLong())).thenReturn(CompletableFuture.completedFuture(-1L)); - Assert.assertEquals(3, (long) currentStore.getMessageStoreTimeStampAsync(mq.getTopic(), mq.getQueueId(), 0).join()); + Assert.assertEquals(-1L, (long) currentStore.getMessageStoreTimeStampAsync(mq.getTopic(), mq.getQueueId(), 0).join()); } @Test From b6cc188bfa5cd35435a34f13f8b3760684c1a8cc Mon Sep 17 00:00:00 2001 From: ymwneu Date: Wed, 14 Jan 2026 10:06:22 +0800 Subject: [PATCH 11/56] [ISSUE #9992] Fix remoting server netty server codec thread reuse problem (#9993) Co-authored-by: maowei.ymw --- .../remoting/netty/NettyRemotingServer.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index d56d6faa336..be02d0f9a97 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -272,9 +272,9 @@ public void run(Timeout timeout) { */ protected ChannelPipeline configChannel(SocketChannel ch) { return ch.pipeline() - .addLast(nettyServerConfig.isServerNettyWorkerGroupEnable() ? defaultEventExecutorGroup : null, + .addLast(getDefaultEventExecutorGroup(), HANDSHAKE_HANDLER_NAME, new HandshakeHandler()) - .addLast(nettyServerConfig.isServerNettyWorkerGroupEnable() ? defaultEventExecutorGroup : null, + .addLast(getDefaultEventExecutorGroup(), encoder, new NettyDecoder(), distributionHandler, @@ -430,7 +430,7 @@ private void printRemotingCodeDistribution() { } public DefaultEventExecutorGroup getDefaultEventExecutorGroup() { - return defaultEventExecutorGroup; + return nettyServerConfig.isServerNettyWorkerGroupEnable() ? defaultEventExecutorGroup : null; } public NettyEncoder getEncoder() { @@ -462,11 +462,11 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List