2015-09-08 44 views
2

我是Spark和Cassandra的新手。Cassandra的火花任务

我们在Cassandra上面使用Spark来读取数据,因为我们有要求使用非主键列读取数据。

一个观察是,火花工作任务数量增加w.r.t数据增长。由于这个原因,我们在获取数据时会面临很多延迟。

火花任务计数增加的原因是什么?

什么应该考虑用Cassandra提高Spark的性能?

请给我建议。

谢谢,
Mallikarjun

+0

您使用的是什么版本的Spark&Cassandra? – Gillespie

+0

我们使用Cassandra 2.1.5和Spark 1.4.0 –

回答

3

输入分割大小由配置spark.cassandra.input.split.size_in_mb控制。每个分割都将在Spark中生成任务,因此,Cassandra中的数据越多,处理的时间就越长(这是您所期望的)

要提高性能,请确保使用joinWithCassandraTable来对齐分区。不要使用context.cassandraTable(...),除非您绝对需要表格中的所有数据,并使用select优化检索的数据来仅投影所需的列。

如果您需要来自某些行的数据,建立一个存储这些行的ID的辅助表格是有意义的。

二级索引也可以帮助选择数据的子集,但我已经看到关于如果不是高性能的报告。

1

火花任务计数增加的原因是什么?

从maasgs答案继,而不是在SparkConf设置spark.cassandra.input.split.size_in_mb.,它可以在一个单一的工作,从不同keyspaces /数据中心阅读时使用的ReadConf配置有用:

val readConf = ReadConf(
     splitCount = Option(500), 
      splitSizeInMB = 64, 
      fetchSizeInRows = 1000, 
      consistencyLevel = ConsistencyLevel.LOCAL_ONE, 
      taskMetricsEnabled = true 
     ) 

    val rows = sc.cassandraTable(cassandraKeyspace, cassandraTable).withReadConf(readConf) 

应该考虑如何提高Spark的性能 Cassandra?

就提高性能而言,这取决于您正在运行的作业和所需的转换类型。下面概述了一些可最大限度提高Spark-Cassandra性能的常规建议(如可找到here)。

您所选择的操作及其应用顺序对于性能至关重要。

您必须牢记您的任务分配和记忆来组织您的流程。

首先要确定您的数据是否被正确分区。这个上下文中的分区仅仅是一个数据块。如果可能的话,在Spark之前分割你的数据,甚至摄取它。如果这不可行或不可行,您可以选择在加载后立即重新分区数据。您可以重新分区以增加分区数量或合并以减少分区数量。

分区的数量应该是一个下限,至少是将要对数据进行操作的核心数量的两倍。话虽如此,您还需要确保您执行的任何任务至少需要100ms才能证明整个网络的分布。请注意,重新分配总是会导致混洗,而融合通常不会。如果你和MapReduce一起工作,你就知道洗牌是大部分时间都在真正的工作中。

过早过滤并经常过滤。假设数据源未经过预处理以减少数据量,那么最初和最好的地方是减少Spark所需要处理的数据量就是初始数据查询。这通常通过添加where子句来实现。请勿携带任何不必要的数据来获得您的目标结果。引入任何额外的数据将影响整个网络中有多少数据可能被混洗,并写入磁盘。不必要的移动数据是一个真正的杀手锏,应该不惜一切代价避免

在每一步中,您应该寻找机会,以尽可能多地过滤,清除,减少或聚合数据,然后再继续操作。

尽可能地使用管道。流水线是一系列转换,它们代表对一部分数据的独立操作,并且不需要整体对数据进行重新组织(洗牌)。例如:来自字符串 - >字符串长度的映射是独立的,其中按值排序需要与其他数据元素进行比较并通过网络重新组织数据(混洗)。

在需要洗牌的作业中,看看在洗牌步骤之前是否可以使用部分聚合或缩减(类似于MapReduce中的组合器)。这将减少洗牌阶段的数据移动。

一些昂贵且需要洗牌的常见任务是按键分组,按键减少。这些操作要求将数据与其他昂贵的数据元素进行比较。了解Spark API非常重要,可以选择最佳的转换组合以及将它们放置在工作中的位置。创建回答问题所需的最简单和最有效的算法。