2015-09-25 41 views
0

我有一个cassandra数据库,有大量记录〜400万。我有3个奴隶机器和一个司机。我想将这些数据加载到spark内存中并对其进行处理。当我执行以下操作时,它将读取一台从机中的所有数据(6 Gb中的300 MB),并且所有其他从机的内存都未使用。我对数据帧做了3次重新数据处理,但数据仍在一台机器上。由于每个作业都在一台机器上执行,因此处理数据需要很长时间。这是我在做什么火花中的数据帧重新分区不起作用

val tabledf = _sqlContext.read.format("org.apache.spark.sql.cassandra").options(Map("table" -> "events", "keyspace" -> "sams")).load 
     tabledf.registerTempTable("tempdf"); 
     _sqlContext.cacheTable("tempdf"); 
val rdd = _sqlContext.sql(query); 
val partitionedRdd = rdd.repartition(3) 
     val count = partitionedRdd.count.toInt 

当我做partitionedRdd一些操作,因为所有的数据存在一台机器上执行它只有一台机器上只

UPDATE 我在配置中使用此 - -conf spark.cassandra.input.split.size_in_mb = 32,还是我的所有数据被加载到一个执行

enter image description here

更新 我使用的火花1.4版和火花卡桑德拉连接器版本1.4发布

+0

你确定你的配置是正确的,你没有'val conf = new SparkConf()。setMaster(“local [*]”)'某处? –

+0

不,我正在集群模式下运行,Web UI显示3个从机。此外,我正在运行此配置spark.cassandra.input.split.size_in_mb = 67108864 – Nipun

+0

http://stackoverflow.com/questions/31583249/apache-spark-taking-5-to-6-minutes-for-simple-count这是我为什么使用67108864 – Nipun

回答

0

如果“查询”只访问一个C *分区键,您将只会得到一个任务,因为我们没有办法(还)自动并行获取单个cassandra分区。如果您正在访问多个C *分区,则尝试进一步缩小输入split_size(以MB为单位)。

+0

是的,我正在尝试使用单个分区键。在缓存中加载内存后,我试图重新分配数据框,但这没有帮助。 – Nipun

+0

有没有办法,我可以分散到其他机器的数据,或者我可以索引一个特定的列,以便我可以在该列上的范围查询。 – Nipun

+0

要并行化单个查询,您需要知道分区中的数据并执行并行范围查询 – RussS