我想要缩放下面的示例程序,该程序创建一个简单的JMS消息并同时将其推送到相同的队列。示例程序目前在单个四核主机上需要大约20秒。任何人都可以推荐一些更改以改进性能? 20秒的测量只是为下面的并行流处理线:为JMS消息创建分配ParallelStream处理
我能想到的test.parallelStream().forEach(e -> sender.sendMessage(e));
一种方法是我的专辑(“测试”)展开了一些主机,然后同时处理集合中的块通过配置每个线程拥有自己的线程池。其中一个缺点是容错性,并且必须放置适当的构造以确保每个线程池不处理相同的消息。
另一种方法是使用更高性能/并行经纪人像卡夫卡
请注意,我采取任何异步线程的办法,我确实需要能够控制多少消息正在从这些线程发送到另一个应用程序,因为它们只同时支持一定数量。任何其他想法?
全部来源:
import java.util.ArrayList;
import java.util.List;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Sender {
private static ConnectionFactory factory = null;
private static Connection connection = null;
private static Session session = null;
private static Destination destination = null;
private static MessageProducer producer = null;
public Sender() {}
public void sendMessage(String test) {
try {
TextMessage message = session.createTextMessage();
message.setText(test);
producer.send(message);
} catch (JMSException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws JMSException {
factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
((ActiveMQConnectionFactory)factory).setUseAsyncSend(true);
((ActiveMQConnectionFactory)factory).setOptimizeAcknowledge(true);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("SAMPLEQUEUE");
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
List<String> test = new ArrayList<String>();
for (int i = 0; i <= 100000; i++) {
test.add(Integer.toString(i));
}
Sender sender = new Sender();
test.parallelStream().forEach(e -> sender.sendMessage(e));
}
}
您的代码中存在潜在的线程安全问题:MessagProducer :: send()不保证可以安全地从多个线程调用。 – Ralf
现在请忽略它,因为它稍后将被移动到Spring JMS,这只是一个例子。谢谢。 – c12
预期的吞吐量消息是多少?每个消息的大小是多少?你的例子不够用,一个整数作为一个字符串需要大约10个字节,与现实世界的用例相比,它是相当小的。 –