
Goal

My goal is to get a simple Spark Streaming example working that uses the direct approach of interfacing with Kafka, but I can't get past a specific error (PySpark direct stream from Kafka).

The ideal result is to have two console windows open: one that I can type sentences into, and another that shows a "live" word count of all the sentences.

Console 1

the cat likes bacon

my cat ate bacon

Console 2

Time: ..

[(u'the', 2), (u'cat', 1), (u'likes', 1), (u'bacon', 1)]

Time: ..

[(u'the', 3), (u'cat', 2), (u'likes', 1), (u'bacon', 2), (u'my', 1), (u'ate', 1)]


Steps taken

Download and untar

kafka_2.10-0.8.2.0 
spark-1.5.2-bin-hadoop2.6 

Start ZooKeeper and the Kafka server in separate screens.

screen -S zk 
bin/zookeeper-server-start.sh config/zookeeper.properties 

"CTRL-A" "d" to detach the screen

screen -S kafka 
bin/kafka-server-start.sh config/server.properties 

"CTRL-A" "d"

Start the Kafka producer

Use a separate console window and type words into it to simulate a stream.

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
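
If the test topic does not already exist (and the broker is not configured to auto-create topics), it may need to be created first. A typical command, assuming ZooKeeper is on the default localhost:2181:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test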

Start PySpark

Use the Spark Streaming Kafka package. Based on the example in the docs.

bin/pyspark --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.2 

Run a simple word count.

from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils 

ssc = StreamingContext(sc, 2) 
topic = "test" 
brokers = "localhost:9092" 
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}) 
lines = kvs.map(lambda x: x[1]) 
counts = lines.flatMap(lambda line: line.split(" ")) \ 
    .map(lambda word: (word, 1)) \ 
    .reduceByKey(lambda a, b: a+b) 
counts.pprint() 
ssc.start() 
ssc.awaitTermination() 


Error

Typing words into the Kafka producer console produces results exactly once, but then the error below is raised once and no further results are produced (although the "Time" sections continue to appear).

Time: 2015-11-15 18:39:52 
------------------------------------------- 

15/11/15 18:42:57 ERROR PythonRDD: Error while sending iterator 
java.net.SocketTimeoutException: Accept timed out 
     at java.net.PlainSocketImpl.socketAccept(Native Method) 
     at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398) 
     at java.net.ServerSocket.implAccept(ServerSocket.java:530) 
     at java.net.ServerSocket.accept(ServerSocket.java:498) 
     at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:645) 
Traceback (most recent call last): 
    File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/streaming/util.py", line 62, in call 
    r = self.func(t, *rdds) 
    File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/streaming/dstream.py", line 171, in takeAndPrint 
    taken = rdd.take(num + 1) 
    File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/rdd.py", line 1299, in take 
    res = self.context.runJob(self, takeUpToNumLeft, p) 
    File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/context.py", line 917, in runJob 
    return list(_load_from_socket(port, mappedRDD._jrdd_deserializer)) 
    File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/rdd.py", line 142, in _load_from_socket 
    for item in serializer.load_stream(rf): 
    File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/serializers.py", line 139, in load_stream 
    yield self._read_with_length(stream) 
    File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/serializers.py", line 156, in _read_with_length 
    length = read_int(stream) 
    File "/vagrant/install_files/spark-1.5.2-bin-hadoop2.6/python/pyspark/serializers.py", line 543, in read_int 
    length = stream.read(4) 
    File "/usr/lib/python2.7/socket.py", line 380, in read 
    data = self._sock.recv(left) 
error: [Errno 104] Connection reset by peer 

Any help or suggestions would be much appreciated.

Answer


Try running:

spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.1 your_python_file_name.py

You can also set other parameters (--deploy-mode, etc.).
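
For reference, a script submitted this way has to create its own SparkContext, since the interactive sc from the pyspark shell is not available. A minimal sketch of what your_python_file_name.py could contain (the app name is just a placeholder), reusing the word count from the question:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# spark-submit does not provide the shell's `sc`, so create it explicitly
sc = SparkContext(appName="KafkaWordCount")
ssc = StreamingContext(sc, 2)

# Same direct stream and word count as in the question
kvs = KafkaUtils.createDirectStream(ssc, ["test"], {"metadata.broker.list": "localhost:9092"})
counts = kvs.map(lambda x: x[1]) \
    .flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
counts.pprint()

ssc.start()
ssc.awaitTermination()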