2016-03-11 27 views
0

我想要缩放下面的示例程序,该程序创建一个简单的JMS消息并同时将其推送到相同的队列。示例程序目前在单个四核主机上需要大约20秒。任何人都可以推荐一些更改以改进性能? 20秒的测量只是为下面的并行流处理线:为JMS消息创建分配ParallelStream处理

我能想到的
test.parallelStream().forEach(e -> sender.sendMessage(e)); 

一种方法是我的专辑(“测试”)展开了一些主机,然后同时处理集合中的块通过配置每个线程拥有自己的线程池。其中一个缺点是容错性,并且必须放置适当的构造以确保每个线程池不处理相同的消息。

另一种方法是使用更高性能/并行经纪人像卡夫卡

请注意,我采取任何异步线程的办法,我确实需要能够控制多少消息正在从这些线程发送到另一个应用程序,因为它们只同时支持一定数量。任何其他想法?

全部来源:

import java.util.ArrayList; 
import java.util.List; 

import javax.jms.Connection; 
import javax.jms.ConnectionFactory; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.MessageProducer; 
import javax.jms.Session; 
import javax.jms.TextMessage; 
import org.apache.activemq.ActiveMQConnection; 
import org.apache.activemq.ActiveMQConnectionFactory; 

public class Sender { 
    private static ConnectionFactory factory = null; 
    private static Connection connection = null; 
    private static Session session = null; 
    private static Destination destination = null; 
    private static MessageProducer producer = null; 

    public Sender() {} 

    public void sendMessage(String test) { 
     try { 
      TextMessage message = session.createTextMessage(); 
      message.setText(test); 
      producer.send(message); 
     } catch (JMSException e) { 
      e.printStackTrace(); 
     } 
    } 

    public static void main(String[] args) throws JMSException { 
      factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL); 
    ((ActiveMQConnectionFactory)factory).setUseAsyncSend(true); 
    ((ActiveMQConnectionFactory)factory).setOptimizeAcknowledge(true); 
    connection = factory.createConnection(); 
    connection.start(); 
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
    destination = session.createQueue("SAMPLEQUEUE"); 
    producer = session.createProducer(destination); 
    producer.setDeliveryMode(DeliveryMode.PERSISTENT); 

     List<String> test = new ArrayList<String>(); 
     for (int i = 0; i <= 100000; i++) { 
      test.add(Integer.toString(i)); 
     } 
     Sender sender = new Sender(); 
     test.parallelStream().forEach(e -> sender.sendMessage(e)); 
    } 
} 
+0

您的代码中存在潜在的线程安全问题:MessagProducer :: send()不保证可以安全地从多个线程调用。 – Ralf

+0

现在请忽略它,因为它稍后将被移动到Spring JMS,这只是一个例子。谢谢。 – c12

+0

预期的吞吐量消息是多少?每个消息的大小是多少?你的例子不够用,一个整数作为一个字符串需要大约10个字节,与现实世界的用例相比,它是相当小的。 –

回答

1

正如指出的MessagProducer不能保证是线程安全的,但也不是会话。

反正就一点:

  1. 我会强烈建议不要使用IO操作的默认流API,因为它使用的所有操作一个线程池,线程池被限制为核心的数量另外你不能指定超时。您应该使用的Executor来代替:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool--
 

    ExecutorService executor = Executors.newFixedThreadPool(2); 
    for (String msg: msgs) { 
     executor.execute(() -> send(msg)); 
    } 

  • 如果没有保证的消息传递,您可以更改消息传递模式NON_PERSISTENT,这加快了发送,因为有没有消息存储和保证传送的开销。
  • 在现实中,虽然你不需要自己做线程管理作为ActiveMQ的有异步支持,这意味着你的消息可以在单独的线程,只要承认你不必保证交货:http://activemq.apache.org/async-sends.html
  • 如果确实需要为了保证传送,请改用JMS事务内部的PERSISTENT消息传送模式(默认)批量发送消息(一种全有或全无的方式)。这会提高性能,因为您只在提交期间进行实际发送。 https://docs.oracle.com/javaee/6/api/javax/jms/Connection.html#createSession(boolean, int)
  • 启用optimizeAcknowledge在ActiveMQ中加快消息代理本身:http://activemq.apache.org/optimized-acknowledgement.html
  • 最后,您可以设置会话确认模式Session.DUPS_OK_ACKNOWLEDGE,这样的消息在后台懒洋洋地承认,但是这可能产生重复发送邮件,因此在消费者需要使用唯一的编号或类似编号
  • 当然,您不应该将所有这些方法结合在一起使用,只需使用适合其他方法的常识即可。

    +0

    highstakes感谢指针!我将把MessageProducer和Session移植到Spring JMS中,这仅仅是一个例子。我没有看到一个清楚的方法来在基于Executor的线程池中散布一组记录。 – c12

    +0

    @ c12增加了一个例子,虽然你不需要使用这个,只需要依靠ActiveMQ线程管理。或者,你可以使用executors和批量发送例如每个10k消息只在交易 – highstakes

    +0

    我假设ConnectionFactory.setUseAsyncSend(true)和DeliveryMode.PERSISTENT可以结合,正确吗?当我设置使用AsyncSend(true)和DeliveryMode.NON_PERSISTENT与setUseAsyncSend(true)和DeliveryMode.PERSISTENT时,我看到了2倍的改进。对于相同数量的单线程和5秒的100k消息大约是2.5秒。理想情况下,我希望在性能和交付保证之间保持适当的平衡,这就是我使用DeliveryMode.PERSISTENT和并行处理的原因。我已经更新了我的示例,上面的配置是以5k为100k的配置。 – c12

    1

    除了上面所说的,我还会专注于如何使用这些消息。如果您的接收端应用程序可以同时处理有限数量的消息,则无论您同时发送多少条消息都无所谓,因为它们都将在队列中等待,直到接收应用程序准备好消耗它们。所以,说你的接收应用程序可以同时处理10个请求。我会在您的队列中设置10个消费者,每个消费者只有在处理之前处理的请求之后才从队列中读取传入请求。这样你传入消息的速度并不重要(除了担心队列溢出)。而您的应用程序将始终同时处理不超过10个请求。

    +0

    嗨迈克尔,接收应用程序可以同时接收高达3k的请求,所以我计划在10台主机上站立300名消费者。我确实需要运行一些分析来了解每个主机有多少个消费者线程可以处理。谢谢。 – c12