
How to save data in a Cassandra table using Spark's saveToCassandra?

I am using Spark with Cassandra, and I want to save data into a Cassandra table. The data should go into the following table:

cqlsh:users> select * from subscription ; 

pk | a | b 
----+---+--- 

(0 rows) 
cqlsh:users> desc subscription ; 

CREATE TABLE users.subscription (
    pk uuid PRIMARY KEY, 
    a text, 
    b text 
) 

Program code (consumer_demo.py):

from pyspark import SparkConf 
import pyspark_cassandra 
from pyspark_cassandra import CassandraSparkContext 

conf = SparkConf().set("spark.cassandra.connection.host", "localhost") 
sparkContext = CassandraSparkContext(conf = conf) 
rdd = sparkContext.parallelize([{"pk":"uuid()", "a":"aaa", "b":"bbbb" }]) 
rdd.saveToCassandra("users", "subscription", {"pk", "a", "b"}) 

I am executing the above code with the following command:

sudo /usr/local/spark/bin/spark-submit --packages TargetHolding/pyspark-cassandra:0.3.5 consumer_demo.py 

When I run it, I get the following error:

Traceback (most recent call last): 
    File "consumer_demo.py", line 8, in <module> 
    rdd.saveToCassandra("users", "subscription", {"pk", "a", "b"}) 
    File "/root/.ivy2/jars/TargetHolding_pyspark-cassandra-0.3.5.jar/pyspark_cassandra/rdd.py", line 83, in saveToCassandra 
    File "/root/.ivy2/jars/TargetHolding_pyspark-cassandra-0.3.5.jar/pyspark_cassandra/util.py", line 93, in helper 
    File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__ 
    File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling o26.newInstance. 
: java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaMirrors$JavaMirror; 
    at com.datastax.spark.connector.types.TypeConverter$.<init>(TypeConverter.scala:114) 
    at com.datastax.spark.connector.types.TypeConverter$.<clinit>(TypeConverter.scala) 
    at pyspark_cassandra.PythonHelper.<init>(PythonHelper.scala:36) 
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) 
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) 
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423) 
    at java.lang.Class.newInstance(Class.java:442) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
    at py4j.Gateway.invoke(Gateway.java:280) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:214) 
    at java.lang.Thread.run(Thread.java:745) 

Cassandra version:

cqlsh:users> show VERSION; 
[cqlsh 5.0.1 | Cassandra 2.2.3 | CQL spec 3.3.1 | Native protocol v4] 

Spark version:

>spark-submit --version 
Welcome to 
     ____    __ 
    /__/__ ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/ '_/ 
    /___/ .__/\_,_/_/ /_/\_\ version 2.1.0 
     /_/ 

Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_91 
Branch 
Compiled by user jenkins on 2016-12-16T02:04:48Z 
Revision 
Url 
Type --help for more information. 

Python version:

>python --version 
Python 2.7.6 

Questions:

  1. Why am I getting this error?
  2. How can I fix it?

Edit: after trying the DataFrame approach, I get the following error:

>>> spark.read.format("org.apache.spark.sql.cassandra").options(table="temp", keyspace="users").load().show() 
17/04/06 10:57:34 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) 
java.lang.NoClassDefFoundError: scala/runtime/AbstractPartialFunction$mcJL$sp 
    at java.lang.ClassLoader.defineClass1(Native Method) 
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763) 
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) 
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) 
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at com.datastax.spark.connector.rdd.CassandraLimit$.limitForIterator(CassandraLimit.scala:21) 
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.compute(CassandraTableScanRDD.scala:367) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.ClassNotFoundException: scala.runtime.AbstractPartialFunction$mcJL$sp 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    ... 31 more 
17/04/06 10:57:34 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NoClassDefFoundError: scala/runtime/AbstractPartialFunction$mcJL$sp 
    at java.lang.ClassLoader.defineClass1(Native Method) 
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763) 
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) 
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) 
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at com.datastax.spark.connector.rdd.CassandraLimit$.limitForIterator(CassandraLimit.scala:21) 
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.compute(CassandraTableScanRDD.scala:367) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.ClassNotFoundException: scala.runtime.AbstractPartialFunction$mcJL$sp 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    ... 31 more 

17/04/06 10:57:34 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/usr/local/spark/python/pyspark/sql/dataframe.py", line 318, in show 
    print(self._jdf.showString(n, 20)) 
    File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__ 
    File "/usr/local/spark/python/pyspark/sql/utils.py", line 63, in deco 
    return f(*a, **kw) 
    File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling o37.showString. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NoClassDefFoundError: scala/runtime/AbstractPartialFunction$mcJL$sp 
    at java.lang.ClassLoader.defineClass1(Native Method) 
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763) 
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) 
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) 
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at com.datastax.spark.connector.rdd.CassandraLimit$.limitForIterator(CassandraLimit.scala:21) 
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.compute(CassandraTableScanRDD.scala:367) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.ClassNotFoundException: scala.runtime.AbstractPartialFunction$mcJL$sp 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    ... 31 more 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944) 
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333) 
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) 
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) 
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765) 
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370) 
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2377) 
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2113) 
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2112) 
    at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2795) 
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2112) 
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2327) 
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:248) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
    at py4j.Gateway.invoke(Gateway.java:280) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:214) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.NoClassDefFoundError: scala/runtime/AbstractPartialFunction$mcJL$sp 
    at java.lang.ClassLoader.defineClass1(Native Method) 
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763) 
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) 
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) 
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at com.datastax.spark.connector.rdd.CassandraLimit$.limitForIterator(CassandraLimit.scala:21) 
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.compute(CassandraTableScanRDD.scala:367) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    ... 1 more 
Caused by: java.lang.ClassNotFoundException: scala.runtime.AbstractPartialFunction$mcJL$sp 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    ... 31 more 

Answer

This is most likely a Scala version mismatch, compounded by various other Spark version mismatches. The TargetHolding library stopped being updated at the end of last year, so it is unlikely to work going forward.

The first error occurs because the Spark Cassandra Connector bundled with that library is built against Scala 2.10, while the default build of Spark 2.1.0 uses Scala 2.11. The two are binary-incompatible, especially around reflection, which is why you see the NoSuchMethodError above.
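
As a quick sanity check, you can print the Scala version your Spark build actually uses from a pyspark shell. A small sketch follows; note that _jvm is a private Py4J handle, so treat this as a debugging aid rather than a stable API:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Prints e.g. "version 2.11.8" on Spark 2.1.0. A connector jar built for
# Scala 2.10 cannot be loaded into this JVM without failures such as
# NoSuchMethodError or NoClassDefFoundError.
print(spark.sparkContext._jvm.scala.util.Properties.versionString())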

One solution would be to take over maintenance of that package and bring it up to date for Spark 2.1.0 and Scala 2.11, but that is a lot of work. Instead, you can try the DataFrame approach with the Spark Cassandra Connector, which does support Spark 2.1.0.
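
Below is a minimal sketch of that approach. Two assumptions: a connector build for Scala 2.11 is loaded (the package coordinate in the comment is an example, so check the connector documentation for current versions), and the UUID is generated in Python, since the literal string "uuid()" in the original code would not be accepted for a uuid column.

import uuid

from pyspark.sql import SparkSession

# Submit with a Scala 2.11 build of the connector, for example:
#   spark-submit --packages datastax:spark-cassandra-connector:2.0.1-s_2.11 write_demo.py

spark = (SparkSession.builder
         .appName("subscription-writer")
         .config("spark.cassandra.connection.host", "localhost")
         .getOrCreate())

# One row matching users.subscription (pk uuid, a text, b text).
df = spark.createDataFrame([(str(uuid.uuid4()), "aaa", "bbbb")],
                           ["pk", "a", "b"])

# Append the row through the connector's DataFrame source.
(df.write
   .format("org.apache.spark.sql.cassandra")
   .options(table="subscription", keyspace="users")
   .mode("append")
   .save())

# Read it back to verify the write.
(spark.read
   .format("org.apache.spark.sql.cassandra")
   .options(table="subscription", keyspace="users")
   .load()
   .show())

The read at the end uses the same pattern as the edit above; it should work once the jar that actually gets loaded is the Scala 2.11 build.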

Thanks for your reply. I am trying to do this with Python. Could you provide a simple example of the DataFrame approach with the Spark Cassandra Connector so that I can understand it? – kit

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/15_python.md – RussS

I tried to save data in the Cassandra table by following the link above, but I am hitting an exception. Please see the error in my edit. – kit
