2013-02-27 52 views
0

我正在通过STOMP与ActiveMQ交互。我有一个发布消息的过程和一个订阅和处理消息的多个过程(大约10个并行实例)。使用STOMP从ActiveMQ队列中读取非阻塞事务

阅读消息后,我想确定如果出于某种原因,我的应用程序失败/崩溃,消息不会丢失。自然,我转向交易。不幸的是,我发现一旦消费者读取消息作为交易的一部分,所有以下消息都不会被发送给其他消费者,直到交易结束。

测试案例:abc队列有100条消息。如果我在两个不同的浏览器选项卡中激活以下代码,第一个将在10秒内返回,第二个将在20秒内返回。

<?php 
// Reader.php 
$con = new Stomp("tcp://localhost:61613"); 
$con->connect(); 

$con->subscribe(
    "/queue/abc", 
    array() 
); 

$tx = "tx3".microtime(); 
echo "TX:$tx<BR>"; 
$con->begin($tx); 
$messages = array(); 
for ($i = 0; $i < 10; $i++) { 
    $t = microtime(true); 
    $msg = $con->readFrame(); 
    if (!$msg) { 
     die("FAILED!"); 
    } 
    $t = microtime(true)-$t; echo "readFrame() took $t MS to complete<BR>"; 
    array_push($messages, $msg); 
    $con->ack($msg, $tx); 
    sleep(1); 
} 
$con->abort($tx); 

有什么我缺少代码明智吗?是否有配置ActiveMQ(或发送头文件)的方法,可以使事务从队列中删除项目,允许其他进程使用其他消息,并且如果事务失败或超时,将把项目恢复到? PS:我想过为每个阅读过程创建另一个队列--DetentionQueue,但我真的宁愿不这样做,如果我有选择。

回答

1

您可能需要调整预订的预取大小,以便ActiveMQ在客户端2有机会获得任何请求之前不会将队列上的消息发送到客户端1。默认情况下,它被设置为1000,以便最好地调整它以适合您的用例。

您可以通过预订帧上的“activemq.prefetchSize = 1”标头设置预取大小。有关所有框架选项,请参阅ActiveMQ Stomp页面。

+0

我的特定ActiveMQ版本的默认预取大小为1(根据AvctiveMQ控制台进行检查)。可以肯定的是,我检查了你的建议(使用activemq.prefetchSize = 1),但没有改变结果。 但是,谢谢,无论如何:) – ygilad 2013-03-02 00:37:53