我期待以增加我写对ActiveMQ的高通量生产商的表现,并根据this useAsyncSend会:异步.NET中发送的ActiveMQ
部队使用异步的发送这增加了巨大的性能提升; ,但意味着无论是否发送了 消息,send()方法都会立即返回,这可能会导致消息丢失。
但是我看不出它对我的简单测试用例有什么影响。
使用这个非常基本的应用:
const string QueueName = "....";
const string Uri = "....";
static readonly Stopwatch TotalRuntime = new Stopwatch();
static void Main(string[] args)
{
TotalRuntime.Start();
SendMessage();
Console.ReadLine();
}
static void SendMessage()
{
var session = CreateSession();
var destination = session.GetQueue(QueueName);
var producer = session.CreateProducer(destination);
Console.WriteLine("Ready to send 700 messages");
Console.ReadLine();
var body = new byte[600*1024];
Parallel.For(0, 700, i => SendMessage(producer, i, body, session));
}
static void SendMessage(IMessageProducer producer, int i, byte[] body, ISession session)
{
var message = session.CreateBytesMessage(body);
var sw = new Stopwatch();
sw.Start();
producer.Send(message);
sw.Stop();
Console.WriteLine("Running for {0}ms: Sent message {1} blocked for {2}ms",
TotalRuntime.ElapsedMilliseconds,
i,
sw.ElapsedMilliseconds);
}
static ISession CreateSession()
{
var connectionFactory = new ConnectionFactory(Uri)
{
AsyncSend = true,
CopyMessageOnSend = false
};
var connection = connectionFactory.CreateConnection();
connection.Start();
var session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
return session;
}
我得到以下输出:
Ready to send 700 messages
Running for 2430ms: Sent message 696 blocked for 12ms
Running for 4275ms: Sent message 348 blocked for 1858ms
Running for 5106ms: Sent message 609 blocked for 2689ms
Running for 5924ms: Sent message 1 blocked for 2535ms
Running for 6749ms: Sent message 88 blocked for 1860ms
Running for 7537ms: Sent message 610 blocked for 2429ms
Running for 8340ms: Sent message 175 blocked for 2451ms
Running for 9163ms: Sent message 89 blocked for 2413ms
.....
这表明,每个消息需要800ms的发送和调用session.Send()
块约两个半秒钟。尽管文档中说,
“send()方法会立即返回”
此外,这些数字基本上是相同的,如果我要么改变并行进行的正常循环或改变AsyncSend = true
到AlwaysSyncSend = true
所以我不认为异步开关工作在所有...
任何人都可以看到我在这里失踪,使发送异步吗?
进一步测试后:
根据该绝大部分运行时间被花在等待同步蚁性能分析器。看起来问题在于各种传输类通过监视器在内部阻塞。特别是我似乎挂断了MutexTransport的OneWay方法,它只允许一个线程一次访问它。
看起来好像调用send将阻塞,直到先前的消息已完成,这就解释了为什么我的输出显示,封锁12ms的第一条消息,而未来了1858ms。我可以通过实现每个连接消息模式来实现多个传输,这样可以改善问题并使消息并行发送,但是大大增加了发送单个消息的时间,并且消耗了太多似乎不太像的资源正确的解决方案。
我重新测试所有这一切与1.5.6并没有看到任何区别。
这里缺少一些关键信息,NMS.ActiveMQ的哪些版本?您传递给经纪人的实际URI是什么?经纪人是否启用了生产者流量控制? – 2012-08-03 18:30:18
@Tim好点,对不起。我运行的是版本1.5.1,URI是内部服务器,但形式为“tcp:// ...”,我不知道代理是如何设置的 - 我可以访问控制台网页,但似乎并未公开该信息。进一步的调查使我相信这个问题与我发送的消息的大小(600Kb)有关,但我仍在研究它,所以任何帮助都将不胜感激。 – 2012-08-03 18:37:41