2015-02-23 71 views
1

我正在使用 Play Framework 2.2.2,并用 Java 版 Akka(Akka Actor System)实现一个 RabbitMQ 消费者应用程序。我有一个 MainActor,它在 Play 应用启动时通过 Global.onStart 方法被初始化。MainActor 创建一个 RabbitMQ 通道,然后开始从队列中消费消息。该队列中的每条消息都是另一个队列的名称,这个队列必须分配给一个子 actor,由该子 actor 开始消费消息中指定的队列。所以基本上,我有一个订阅了某个 RabbitMQ 队列的 MainActor,以及由 MainActor 创建的若干子 actor,每个子 actor 都订阅各自的 RabbitMQ 队列。问题是,出于某种原因,我无法启动超过 7 个子 actor。我怀疑原因出在子 actor 中用于等待 RabbitMQ 消息的 while(true) 循环上。下面是我的实现:RabbitMQ Play Java Akka

MainActor:

import play.Logger; 
import com.typesafe.config.ConfigFactory; 

import java.io.IOException; 

import akka.actor.Props; 
import akka.actor.UntypedActor; 
import akka.actor.ActorRef; 

import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.QueueingConsumer; 
import play.libs.Akka; 
import util.RabbitMQConnection; 

/**
 * Actor from the question: consumes the RabbitMQ queue named by the
 * {@code rabbitmq.default_queue} configuration key and creates one
 * {@code childActor} per delivery, handing it the delivery body as a String.
 *
 * NOTE(review): all of the work happens inside {@link #onReceive(Object)}; the
 * {@code while (true)} loop has no exit other than an exception, so a single
 * call to {@code onReceive} does not return while deliveries keep arriving.
 */
public class MainActor extends UntypedActor { 

/**
 * Opens a channel, declares the main queue and then loops forever, waiting for
 * the next delivery and spawning a child actor for each one.
 *
 * @param msg the triggering actor message; its content is not inspected here
 * @throws Exception declared by the signature, although the body catches
 *         {@code Exception} itself and only prints it
 */
@Override 
public void onReceive(Object msg) throws Exception { 

     try{ 
      Connection connection = RabbitMQConnection.getConnection(); 
      Channel channel = connection.createChannel(); 

      // Name of the queue whose messages are themselves queue names.
      String main_queue_name = ConfigFactory.load().getString("rabbitmq.default_queue"); 

      channel.queueDeclare(main_queue_name, false, false, false, null); 

      QueueingConsumer consumer = new QueueingConsumer(channel); 
      channel.basicConsume(main_queue_name, true, consumer); 

      // Never terminates normally: each iteration waits for one delivery.
      while (true) { 

       QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
       // Body decoded with the platform default charset.
       String message = new String(delivery.getBody()); 

       System.out.println(" [x] Received '" + message + "'"); 

       // One new child per delivery; the body (a queue name) is sent to it.
       ActorRef childActor = getContext().actorOf(Props.create(childActor.class)); 
       childActor.tell(message, getSelf()); 
      } 
     }catch (Exception e){ 
      // Any failure ends the loop; only the exception's toString() is printed.
      System.out.println(e.toString()); 
     } 
    } 
} 

子 actor(childActor):

import play.Logger; 
import com.typesafe.config.ConfigFactory; 
import java.io.IOException; 
import com.fasterxml.jackson.databind.JsonNode; 
import com.fasterxml.jackson.databind.ObjectMapper; 
import com.fasterxml.jackson.databind.ObjectWriter; 
import play.libs.Akka; 
import play.libs.Json; 

import akka.actor.UntypedActor; 

import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.QueueingConsumer; 
import util.RabbitMQConnection; 


/**
 * Child actor from the question: treats the message it receives as the name of
 * a RabbitMQ queue, then consumes that queue in an endless loop and parses
 * every delivery body as JSON.
 *
 * NOTE(review): the {@code while (true)} loop inside
 * {@link #onReceive(Object)} has no exit other than an exception, so the
 * method does not return while deliveries keep arriving.
 */
public class childActor extends UntypedActor { 

/**
 * Derives a queue name from {@code msg}, declares and subscribes to that queue,
 * and loops forever over its deliveries.
 *
 * @param msg the actor message used to derive the queue name
 * @throws Exception if serializing {@code msg} fails (that call is outside the
 *         try block); failures inside the try block are caught and printed
 */
@Override 
public void onReceive(Object msg) throws Exception { 

    // NOTE(review): msg is run through a pretty-printing JSON writer; for a
    // String this presumably yields a quoted JSON literal rather than the raw
    // queue name -- confirm this is intended.
    ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter(); 
    String queue_name = ow.writeValueAsString(msg); 

    try{ 
     Connection connection = RabbitMQConnection.getConnection(); 
     Channel channel = connection.createChannel(); 

     channel.queueDeclare(queue_name, false, false, false, null); 

     QueueingConsumer consumer = new QueueingConsumer(channel); 
     channel.basicConsume(queue_name, true, consumer); 

     // Never terminates normally: each iteration waits for one delivery.
     while (true) { 
      QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
      // Body decoded with the platform default charset.
      String message = new String(delivery.getBody()); 


      JsonNode jsonMsg = Json.parse(message); 

      // Call some function to process the message 

     } 
    }catch (Exception e){ 
     // Any failure ends the loop; only the exception's toString() is printed.
     System.out.println(e.toString()); 
    } 
} 
} 

回答

1

我认为你在这种情况下没有正确使用 actor。在我看来,不应该在 actor 的 receive 方法中写 while(true) 循环。此外,QueueingConsumer 已被弃用,RabbitMQ 团队建议通过实现 Consumer 接口,或者继承默认的空操作实现 DefaultConsumer,来编写自己的消费者。

我的做法是:

  • 实现一个自定义的 RabbitMQ 消费者,每次收到消息时就把该消息发送给一个 actor。
  • 在 MainActor 中使用该实现:消费者把收到的队列名称作为消息发送给 MainActor,MainActor 再以该队列名称作为构造参数启动一个新的子 actor。
  • 在子 actor 中也使用该实现:消费者把收到的消息发送给子 actor,并在 actor 中进行 JSON 解析。

示例代码如下(警告:未经编译或测试):

自定义 RabbitMQ 消费者:

public class MyCustomRabbitMQConsumer extends DefaultConsumer { 

    private ActorRef destinationActor; 

    public MyCustomRabbitMQConsumer(ActorRef destinationActor) { 
     this.destinationActor = destinationActor; 
    } 

    @Override 
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { 
     destinationActor.tell(new String(body)); 
    } 

} 

MainActor:

import play.Logger; 
import com.typesafe.config.ConfigFactory; 

import java.io.IOException; 

import akka.actor.Props; 
import akka.actor.UntypedActor; 
import akka.actor.ActorRef; 

import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.QueueingConsumer; 
import play.libs.Akka; 
import util.RabbitMQConnection; 

public class MainActor extends UntypedActor { 

    private MyCustomRabbitMQConsumer rabbitConsumer; 

    @Override 
    public void preStart() { 
     Connection connection = RabbitMQConnection.getConnection(); 
     Channel channel = connection.createChannel(); 

     String main_queue_name = ConfigFactory.load().getString("rabbitmq.default_queue"); 
     channel.queueDeclare(main_queue_name, false, false, false, null); 

     rabbitConsumer = new MyCustomRabbitMQConsumer(getSelf()); 
     channel.basicConsume(main_queue_name, true, rabbitConsumer); 
    } 

    @Override 
    public void onReceive(Object msg) throws Exception { 
     if(msg instanceOf String) { 
      String queueName = (String) msg; 
      System.out.println(" [x] Received '" + queueName + "'"); 
      getContext().actorOf(Props.create(childActor.class, queueName)); 
     } 
    } 
} 

ChildActor:

import akka.actor.UntypedActor; 

import com.fasterxml.jackson.databind.JsonNode; 
import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.QueueingConsumer; 
import play.libs.Json; 
import util.RabbitMQConnection; 


public class ChildActor extends UntypedActor { 

    private MyCustomRabbitMQConsumer rabbitConsumer; 
    private String queueName; 

    public ChildActor(String queueName) { 
     this.queueName = queueName; 
    } 

    @Override 
    public void preStart() { 
     Connection connection = RabbitMQConnection.getConnection(); 
     Channel channel = connection.createChannel(); 

     String main_queue_name = ConfigFactory.load().getString("rabbitmq.default_queue"); 
     channel.queueDeclare(queueName, false, false, false, null); 

     rabbitConsumer = new MyCustomRabbitMQConsumer(getSelf()); 
     channel.basicConsume(queueName, true, rabbitConsumer); 
    } 


    @Override 
    public void onReceive(Object msg) throws Exception { 

     if(msg instanceOf String) { 
      String strMsg = (String) msg; 
      JsonNode jsonMsg = Json.parse(message); 

      // Call some function to process the message 
     } 
    } 
} 

这样应该可以支持任意数量(n 个)的 actor。