2016-04-28 118 views
0

我正在运行一个spark工作,其中一些数据从cassandra表中加载。从这些数据中,我做了一些插入和删除语句。 并执行它们。 (使用的forEach)session.execute()没有反映在cassandra上完成火花集群

boolean deleteStatus= connector.openSession().execute(delete).wasApplied(); 
boolean insertStatus = connector.openSession().execute(insert).wasApplied(); 
System.out.println(delete+":"+deleteStatus); 
System.out.println(insert+":"+insertStatus); 

当我在本地运行它,我看到表中的相应的结果。

但是,当我在群集上运行它时,有时会显示结果并且有时候不会发生更改。 我看到了来自web-ui的spark的标准输出,并且这两个查询都打印了查询以及true。 (。数据被正确加载,但有时,只能插入被反射,有时只删除,有时两者,并且大多数时候都不)

规格:

  1. 上同样的机器作为火花从站cassandra节点(每个节点有两个从机实例)
  2. 在另一台机器上运行spark master。
  3. 修复在所有节点上完成。
  4. 卡桑德拉重启

回答

0

布尔deleteStatus = connector.openSession()执行(删除).wasApplied();

boolean insertStatus = connector.openSession()。execute(insert).wasApplied();

这是一个已知的反模式,您创建的每个查询,这是非常昂贵的一个新的Session对象。

只需创建一次会话并将其重新用于所有查询。

要查看正在执行并送往卡桑德拉查询,使用慢速查询记录器功能作为一个黑客:http://datastax.github.io/java-driver/manual/logging/#logging-query-latencies

的想法是把阈值设置到一个低得可笑的值,使得每一个查询将被视为缓慢并显示在日志中。

你应该使用这个技巧只为当然

+0

测试它仅用于测试目的而进行的。 实际上,每个分区打开一个会话(forEachPartition),然后(forEachRemaining)打开Iterator