I am trying to set up Spark Streaming for multiple RabbitMQ queues. As shown below, I have set up 2 workers, each with one core and 2 GB of memory. The problem is that when I set conf.set("spark.cores.max","2"),
the streaming application does not process any data and just keeps queuing up jobs. But as soon as I change it to conf.set("spark.cores.max","3"),
the streaming starts processing. I cannot understand the reason for this. Also, if I want to process the data from both queues in parallel, how should I do that? My code and configuration settings are below.
spark-env.sh:
SPARK_WORKER_MEMORY=2g
SPARK_WORKER_INSTANCES=1
SPARK_WORKER_CORES=1
Scala code:
val rabbitParams = Map(
  "storageLevel" -> "MEMORY_AND_DISK_SER_2",
  "queueName" -> config.getString("queueName"),
  "host" -> config.getString("QueueHost"),
  "exchangeName" -> config.getString("exchangeName"),
  "routingKeys" -> config.getString("routingKeys"))
val receiverStream = RabbitMQUtils.createStream(ssc, rabbitParams)
receiverStream.start()
val predRabbitParams = Map(
  "storageLevel" -> "MEMORY_AND_DISK_SER_2",
  "queueName" -> config.getString("queueName1"),
  "host" -> config.getString("QueueHost"),
  "exchangeName" -> config.getString("exchangeName1"),
  "routingKeys" -> config.getString("routingKeys1"))
val predReceiverStream = RabbitMQUtils.createStream(ssc, predRabbitParams)
predReceiverStream.start()
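
For reference, the behaviour described above is consistent with how Spark Streaming receivers work: each receiver permanently occupies one core, so with two receivers and spark.cores.max set to 2 there is no core left to process batches, while 3 cores leave one free for processing. A minimal sketch of the usual pattern, assuming the ssc, receiverStream and predReceiverStream values defined above (the union and output operation here are illustrative, not from the original post):

```scala
// Each receiver pins one core, so with 2 receivers at least 3 cores are
// needed: 2 for the receivers plus 1 for batch processing.

// Combine both queue streams so their data is handled together.
val unioned = receiverStream.union(predReceiverStream)

// Register an output operation; without one, Spark Streaming does not
// trigger any processing of the received data.
unioned.foreachRDD { rdd =>
  println(s"Batch size: ${rdd.count()}")
}

// Start the StreamingContext once, rather than calling start()
// on the individual streams.
ssc.start()
ssc.awaitTermination()
```

With this layout, the two receivers read from their queues in parallel, and the unioned stream is processed on the remaining core(s).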