2016-10-17 87 views
0

我试图为多个RabbitMQ队列设置Spark流。如下所述,我设置了2个工作者,每个工作者都有一个内核和2GB的内存。所以,问题是当我保持这个参数为conf.set("spark.cores.max","2")流不处理任何数据,只是继续添加作业。但是,一旦我将其设置为conf.set("spark.cores.max","3")流式处理开始处理它。所以,我无法理解这个原因。另外,如果我想从两个队列中并行处理数据,我应该怎么做。我在下面提到了我的代码和配置设置。并行处理多个rabbitmq队列的Spark Streaming处理

Spark-env.sh:

SPARK_WORKER_MEMORY=2g 
SPARK_WORKER_INSTANCES=1 
SPARK_WORKER_CORES=1 

Scala代码:

val rabbitParams = Map("storageLevel" -> "MEMORY_AND_DISK_SER_2","queueName" -> config.getString("queueName"),"host" -> config.getString("QueueHost"), "exchangeName" -> config.getString("exchangeName"), "routingKeys" -> config.getString("routingKeys")) 
    val receiverStream = RabbitMQUtils.createStream(ssc, rabbitParams) 
    receiverStream.start()  

    val predRabbitParams = Map("storageLevel" -> "MEMORY_AND_DISK_SER_2", "queueName" -> config.getString("queueName1"), "host" -> config.getString("QueueHost"), "exchangeName" -> config.getString("exchangeName1"), "routingKeys" -> config.getString("routingKeys1")) 
    val predReceiverStream = RabbitMQUtils.createStream(ssc, predRabbitParams) 
    predReceiverStream.start() 

回答

1

这种行为在Streaming Guide解释。每个接收器都是一个长时间运行的进程,占用一个线程。

如果可用线程的数量小于或等于接收器的数量有剩余任务处理没有资源:

分配给星火流媒体应用内核的数量必须比多接收器数量。否则系统将接收数据,但无法处理它。