
UTF-8 encoding error while connecting a Flume Twitter stream to Spark in Python

I'm having trouble passing the Twitter data collected by a Flume agent on to Spark Streaming. Using Flume on its own, I can download tweets just fine, but then I get the error below, which I believe comes from the default UTF-8 decoding in FlumeUtils.createStream(). How can I change that, and what exactly should I change?

Error on the pyspark terminal:

org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main 
    process() 
    File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream 
    vs = list(itertools.islice(iterator, batch)) 
    File "/usr/local/spark/python/lib/pyspark.zip/pyspark/streaming/flume.py", line 107, in func 
    File "/usr/local/spark/python/lib/pyspark.zip/pyspark/streaming/flume.py", line 36, in utf8_decoder 
    return s.decode('utf-8') 
    File "/usr/lib/python2.7/encodings/utf_8.py", line 16, in decode 
    return codecs.utf_8_decode(input, errors, True) 
UnicodeDecodeError: 'utf8' codec can't decode byte 0xe4 in position 17: invalid continuation byte 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
17/01/01 15:36:41 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 

PySpark code:

from pyspark.sql import SparkSession 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.flume import FlumeUtils 

ss = SparkSession.builder \ 
    .master("local[2]") \ 
    .appName("Stream_Analysis")\ 
    .config("spark.sql.crossJoin.enabled", "true") \ 
    .getOrCreate() 

sc = ss.sparkContext 

strm = StreamingContext(sc, 5) 

flume = FlumeUtils.createStream(strm,"localhost", 9999) 
flume.pprint() 
strm.start() 
strm.awaitTermination() 

Command used to launch pyspark:

spark-submit --jars ~/project/spark-streaming-flume-assembly_2.11-2.0.2.jar ~/project/news_stream_flume/news_stream_analysis.py localhost 9999 

Flume conf:

# Name the components on this agent 
FlumeAgent.sources = Twitter 
FlumeAgent.sinks = spark 
FlumeAgent.channels = MemChannel 

# Twitter source 
FlumeAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource 
FlumeAgent.sources.Twitter.consumerKey = x 
FlumeAgent.sources.Twitter.consumerSecret = y 
FlumeAgent.sources.Twitter.accessToken = z 
FlumeAgent.sources.Twitter.accessTokenSecret = xx 
FlumeAgent.sources.Twitter.keywords = flume, spark 

FlumeAgent.sinks.spark.type = avro 
FlumeAgent.sinks.spark.hostname = localhost 
FlumeAgent.sinks.spark.port = 9999 
FlumeAgent.sinks.spark.batch-size = 1 

# Use a channel which buffers events in memory 
FlumeAgent.channels.MemChannel.type = memory 
FlumeAgent.channels.MemChannel.capacity = 10000 
FlumeAgent.channels.MemChannel.transactionCapacity = 100 

# Bind the source and sink to the channel 
FlumeAgent.sources.Twitter.channels = MemChannel 
FlumeAgent.sinks.spark.channel = MemChannel 

Command used to run the Flume agent:

flume-ng agent --name FlumeAgent --conf-file /home/hduser/project/flume_config_2src_spark_avro -f /usr/lib/flume-ng/conf/flume-conf.properties -Dflume.root.logger=DEBUG,console 

Answer


FlumeUtils.createStream takes a bodyDecoder argument, a function used to decode the event body. The default implementation only checks for None and otherwise decodes as UTF-8:

def utf8_decoder(s):
    """ Decode the unicode as UTF-8 """
    if s is None:
        return None
    return s.decode('utf-8')
  • In Python 2.x, you should be able to replace it with your own function that uses the required encoding, or even skip decoding entirely with the identity function (lambda x: x); see the sketch after this list.

  • In Python 3.x, some additional steps may be required (mapping with _.getBytes on the JVM side) to bypass the String -> unicode mapping.
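
A minimal sketch of the Python 2.x approach, assuming the same local Avro sink as in the question. The name lenient_utf8_decoder and the choice of the 'replace' error handler are mine, not part of the original answer; 'ignore' would silently drop the offending bytes instead.

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

def lenient_utf8_decoder(s):
    """Decode as UTF-8, substituting U+FFFD for invalid byte
    sequences instead of raising UnicodeDecodeError."""
    if s is None:
        return None
    # 'errors' is passed positionally: str.decode takes no
    # keyword arguments in Python 2.x.
    return s.decode('utf-8', 'replace')

ss = SparkSession.builder \
    .master("local[2]") \
    .appName("Stream_Analysis") \
    .getOrCreate()

strm = StreamingContext(ss.sparkContext, 5)

# Hand the custom decoder to createStream in place of the strict default;
# bodyDecoder=lambda x: x would skip decoding and keep the raw bytes.
flume = FlumeUtils.createStream(strm, "localhost", 9999,
                                bodyDecoder=lenient_utf8_decoder)
flume.pprint()
strm.start()
strm.awaitTermination()

With the identity decoder the body stays a raw byte string, so you can decode it later in your own transformations with whatever error handling suits the data.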