2012-08-03 44 views
1

我期待以增加我写对ActiveMQ的高通量生产商的表现,并根据this useAsyncSend会:异步.NET中发送的ActiveMQ

部队使用异步的发送这增加了巨大的性能提升; ,但意味着无论是否发送了 消息,send()方法都会立即返回,这可能会导致消息丢失。

但是我看不出它对我的简单测试用例有什么影响。

使用这个非常基本的应用:

const string QueueName = "...."; 
const string Uri = "...."; 

static readonly Stopwatch TotalRuntime = new Stopwatch(); 

static void Main(string[] args) 
{ 
    TotalRuntime.Start(); 
    SendMessage(); 
    Console.ReadLine(); 
} 

static void SendMessage() 
{ 
    var session = CreateSession(); 
    var destination = session.GetQueue(QueueName); 
    var producer = session.CreateProducer(destination); 

    Console.WriteLine("Ready to send 700 messages"); 
    Console.ReadLine(); 

    var body = new byte[600*1024]; 

    Parallel.For(0, 700, i => SendMessage(producer, i, body, session));   
} 

static void SendMessage(IMessageProducer producer, int i, byte[] body, ISession session) 
{ 
    var message = session.CreateBytesMessage(body); 

    var sw = new Stopwatch(); 
    sw.Start(); 
    producer.Send(message); 
    sw.Stop(); 

    Console.WriteLine("Running for {0}ms: Sent message {1} blocked for {2}ms", 
      TotalRuntime.ElapsedMilliseconds, 
      i, 
      sw.ElapsedMilliseconds); 
}  

static ISession CreateSession() 
{ 
    var connectionFactory = new ConnectionFactory(Uri) 
            { 
             AsyncSend = true, 
             CopyMessageOnSend = false 
            }; 
    var connection = connectionFactory.CreateConnection(); 
    connection.Start(); 
    var session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge); 
    return session; 
} 

我得到以下输出:

Ready to send 700 messages 

Running for 2430ms: Sent message 696 blocked for 12ms 
Running for 4275ms: Sent message 348 blocked for 1858ms 
Running for 5106ms: Sent message 609 blocked for 2689ms 
Running for 5924ms: Sent message 1 blocked for 2535ms 
Running for 6749ms: Sent message 88 blocked for 1860ms 
Running for 7537ms: Sent message 610 blocked for 2429ms 
Running for 8340ms: Sent message 175 blocked for 2451ms 
Running for 9163ms: Sent message 89 blocked for 2413ms 
..... 

这表明,每个消息需要800ms的发送和调用session.Send()块约两个半秒钟。尽管文档中说,

“send()方法会立即返回”

此外,这些数字基本上是相同的,如果我要么改变并行进行的正常循环或改变AsyncSend = trueAlwaysSyncSend = true所以我不认为异步开关工作在所有...

任何人都可以看到我在这里失踪,使发送异步吗?


进一步测试后:

根据该绝大部分运行时间被花在等待同步蚁性能分析器。看起来问题在于各种传输类通过监视器在内部阻塞。特别是我似乎挂断了MutexTransport的OneWay方法,它只允许一个线程一次访问它。

看起来好像调用send将阻塞,直到先前的消息已完成,这就解释了为什么我的输出显示,封锁12ms的第一条消息,而未来了1858ms。我可以通过实现每个连接消息模式来实现多个传输,这样可以改善问题并使消息并行发送,但是大大增加了发送单个消息的时间,并且消耗了太多似乎不太像的资源正确的解决方案。

我重新测试所有这一切与1.5.6并没有看到任何区别。

+0

这里缺少一些关键信息,NMS.ActiveMQ的哪些版本?您传递给经纪人的实际URI是什么?经纪人是否启用了生产者流量控制? – 2012-08-03 18:30:18

+0

@Tim好点,对不起。我运行的是版本1.5.1,URI是内部服务器,但形式为“tcp:// ...”,我不知道代理是如何设置的 - 我可以访问控制台网页,但似乎并未公开该信息。进一步的调查使我相信这个问题与我发送的消息的大小(600Kb)有关,但我仍在研究它,所以任何帮助都将不胜感激。 – 2012-08-03 18:37:41

回答

0

一如既往做的最好的事情就是更新到最新版本(1.5.6在写这篇文章的时间)。如果代理启用了生产者流量控制,并且您已达到队列大小限制,则发送可以阻止,尽管使用异步发送不应该发生,除非您使用producerWindowSize集发送消息。获得帮助的一个好方法是创建一个测试用例,并通过Jira问题将其提交给NMS.ActiveMQ网站,以便我们可以使用您的测试代码查看它。从1.5.1开始有很多修正,所以我建议尝试一下这个新版本,因为它可能已经不是问题了。