0

我正在使用最新的spark(2.1.0)和python(3.5.3)。我在本地安装了kafka(2.10.0)。运行python程序与kafka进行火花流传输时出错

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils 
from pykafka import KafkaClient 
import json 
import sys 
import pprint 

spsc = SparkContext(appName="SampleApp") 
stsc = StreamingContext(spsc, 1) 
print('contexts =================== {} {}'.format(spsc,stsc)); 
kvs = KafkaUtils.createStream(stsc, "localhost:2181", "spark-consumer", {"7T-test3": 1}) 
spsc.stop() 

这里'打印'行执行罚款。但在下一行,同时创造流我得到以下错误,

Traceback (most recent call last): 
    File "/Users/MacAdmin/Downloads/spark-streaming/spark/spark_streaming_osample.py", line 24, in <module> 
    kvs = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer", {"7T-test3": 1}) 
    File "/Users/MacAdmin/Documents/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 70, in createStream 
    File "/Users/MacAdmin/Documents/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__ 
    File "/Users/MacAdmin/Documents/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling o25.createStream. 
: java.lang.NoClassDefFoundError: org/apache/spark/Logging 
    at java.lang.ClassLoader.defineClass1(Native Method) 
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763) 
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) 
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) 
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:91) 
    at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:168) 
    at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createStream(KafkaUtils.scala:632) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
    at py4j.Gateway.invoke(Gateway.java:280) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:214) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    ... 25 more 

我从命令行运行我的程序作为

/Users/MacAdmin/Documents/spark-2.1.0-bin-hadoop2.7/bin/spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.3.jar spark_streaming_sample.py 

我需要任何环境变量或者我没有使用正确的库版本?

回答

0

事情很少人失踪,添加类路径

export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip/:$PYTHONPATH 

和火花记录是2 *私人起所以不得不使用下面卡夫卡流版本正在运行的程序,而

spark-streaming-kafka-0-8-assembly_2.10-2.1.0.jar 
0

确保您在执行流之前在Kafka中创建主题(7T-test3)。

您可能还想提供导致错误的更多详细信息。

相关问题