2015-10-07 216 views
1

我有一些Spark的经验,但刚刚开始与卡桑德拉。我试图做一个非常简单的阅读,并获得非常糟糕的表现 - 不能说明原因。这里是我使用的代码:卡桑德拉火花接头读取性能

sc.cassandraTable("nt_live_october","nt") 
    .where("group_id='254358'") 
    .where("epoch >=1443916800 and epoch<=1444348800") 
    .first 

enter image description here

所有3个PARAMS是餐桌上的关键的部分:

PRIMARY KEY(GROUP_ID,时代,组名,auto_generated_uuid_field ) )WITH CLUSTERING ORDER BY(epoch ASC,group_name ASC,auto_generated_uuid_field ASC)

我从我的司机看到输出是这样的:

15/10/07 15时05分02秒INFO CassandraConnector:连接到卡桑德拉 集群:shakassandra 15/10/07 15:07 :02 ERROR会议:错误 创建池attila./198.xxx:9042 com.datastax.driver.core.ConnectionException:运输初始化 (COM期间 [attila./198.xxx:9042]意外错误 。 datastax.driver.core.OperationTimedOutException:[attila /198.xxx:9042]操作超时)

15/10/07 15时07分02秒INFO SparkContext:开始的工作:采取在 CassandraRDD.scala:121

15/10/07 15时07分03秒INFO BlockManagerInfo:在内存 添加broadcast_5_piece0上 osd09:39903(尺寸:4.8 KB,免费:265.4 MB)

15/10/07 15时08分23秒 INFO TaskSetManager:完成任务0.0在阶段6.0 (TID 8)80153毫秒on osd09(1/1)

15/10/07 15时08分23秒INFO TaskSetManager:完成任务0.0在阶段6.0(TID 8) 在80153毫秒上osd09(1/1)

15/10/07 15时08分23秒 信息DAGScheduler:ResultStage 6(取在CassandraRDD.scala:121) 完成了80.958小号15/10/07 15时08分23秒INFO TaskSchedulerImpl:删除 taskset的6.0,其任务已全部建成后,从池

15/10/07 15:08:23信息DAGScheduler:工作5完成:拍下 CassandraRDD.scala:121,拍下81.043413 s

我希望这个查询速度非常快,但它花了一分钟。有几件事情我跳出

  1. 它需要近两分钟把会话错误 - 我通过3个节点的IP地址,以星火卡桑德拉连接器 - 是有没有办法告诉它跳过失败的连接速度更快?
  2. 任务被发送给一个不是Cassandra节点的Spark worker - 这对我来说很奇怪 - 有没有一种方法可以获得有关调度器选择将任务发送到远程节点的原因的方法?
  3. 即使该任务已发送到远程节点,该工作者的输入大小(最大)显示为334.0 B/1,但执行者时间为1.3分钟(请参见图片)。这似乎很慢 - 我希望要在反序列化所花费的时间,不计算...... enter image description here

如何调试这任何提示,到哪里寻找潜在的问题大加赞赏。使用Spark 1.4.1与连接器1.4.0-M3,cassandra ReleaseVersion:2.1.9,在可调连接器参数上的所有默认设置

回答

1

我认为问题在于分区之间的数据分布。您的表具有一个群集(分区)键 - groupId,epoch仅是一个群集列。数据仅通过groupId分布在集群节点上,因此您在集群上的一个节点上拥有一个具有groupId ='254358'的巨大分区。 当你运行你的查询时,Cassandra达到了与groupId ='254358'非常快的分区,然后筛选所有行以查找记录的时间在1443916800和1444348800之间。如果有很多行,查询将非常慢。实际上这个查询不是分布式的,它总是在一个节点上运行。

更好的做法,提取日期,甚至小时,并添加它作为分区键,你的情况是这样

PRIMARY KEY ((group_id, date), epoch, group_name, auto_generated_uuid_field) 
WITH CLUSTERING ORDER BY (epoch ASC, group_name ASC, auto_generated_uuid_field ASC) 

为了验证我的假设您可以在cqlsh运行当前查询通过打开跟踪阅读here如何做到这一点。所以这个问题与Spark没有任何关系。

关于错误和时间来得到它,一切都很好,因为你在超时发生后收到错误。

我还记得spark-cassandra-connector的建议,将Spark从属关联到Cassandra节点,以便通过分区键来分发查询。