1

是否可以配置Spark和spark-streaming-kafka-0-10库来读取多个Kafka分区或整个Kafka主题使用单个任务而不是为每个可用的Kafka分区创建不同的Spark任务?spark-streaming-kafka-0-10:如何限制Spark分区的数量

请原谅我对这些技术的粗略理解;我认为我对Spark和Kafka仍然是新手。体系结构和设置大多只是为了探索并了解这些技术如何协同工作。

我有一个四个虚拟主机,一个有Spark主,每个都有一个Spark工作者。根据Spotify's Docker image,其中一位主持人也在经营卡夫卡经纪人。每个主机有四个内核和大约8 GB未使用的RAM。

卡夫卡经纪人有206个主题,每个主题有10个分区。所以总共有2,060个分区供应用程序读取。

我正在使用spark-streaming-kafka-0-10库(当前是试验版)通过Spark Streaming作业订阅Kafka中的主题。我现在用的是SubscribePattern类从星火订阅所有206个主题:

val stream = KafkaUtils.createDirectStream[String, String](
    ssc, 
    PreferConsistent, 
    SubscribePattern[String, String](Pattern.compile("(pid\\.)\\d+"), 
    kafkaParams) 
) 

当我提交这份工作星火主人,它看起来像16个执行人启动,一个集群中的每个核心。它也看起来像每个卡夫卡分区得到自己的任务,总共2060个任务。我认为我的16个执行者集群在执行这么多任务时遇到了麻烦,因为在1500到1800个任务完成之后,这个任务在不同的点上一直处于故障状态。

我发现a tutorial by Michael Noll from 2014使用火花流 - 卡夫卡0-8库来控制消费者线程的每个主题数哪些地址:

val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...) 

val consumerThreadsPerInputDstream = 3 
val topics = Map("zerg.hydra" -> consumerThreadsPerInputDstream) 
val stream = KafkaUtils.createStream(ssc, kafkaParams, topics, ...) 

回答

3

是否有可能配置与星火spark-streaming-kafka-0-10库可以通过单个任务读取多个Kafka分区或整个Kafka主题,而不是为每个可用的Kafka分区创建不同的Spark任务?

你可以通过调用流repartition改变所产生的分区数量,但你失去1:卡夫卡和RDD分区之间的一一对应。

卡夫卡分区生成的任务数量与您拥有16个执行程序的事实无关。执行程序的数量取决于您正在使用的设置和资源管理器。

卡夫卡分区和RDD分区与直接流API之间存在1:1映射,每个执行器都会从卡夫卡获取这些分区的子集并进行处理,每个分区都是独立的,并且可以独立计算。这与基于接收器的API不同,后者在任意执行器上创建单个接收器,并通过节点上的线程使用数据本身。

如果您有206个主题和10个分区,您最好有一个体面大小的集群,它可以处理生成的任务的负载。您可以控制每个分区生成的最大消息,但您可以更改分区数,除非您想调用repartition转换的混洗效果。

+0

感谢您使用上相当细致。你碰到一个问题,我其实并没有完全弄明白。你的意思是,新的直接API中,对应于一个分区的任务(卡夫卡分区/ RDD分区),做数据及其处理的同时阅读。我一直在想,为了确定我的执行者,最终是我的节点和集群。 – MaatDeamon

+0

@Maatdeamon是的。当执行器被分配到一个分区进行操作时,驱动程序为每个分区分配偏移量以供读取。 –

+0

我认为这个问题确定了他自己的职位。我仍然有点困惑。请你看看这里https://stackoverflow.com/questions/45526554/spark-streaming-kafka-direct-streaming-parallelism我的文章,并尝试回答呢? – MaatDeamon

1

第二种方法将是最适合你的需求。只有你必须设置consumerThreadsPerInputDstream = 1,所以只有一个线程将每次读取操作来创建的,因此单台机器将每簇参与。