2016-02-26 57 views
4

我得到以下错误:星火1.6卡夫卡流媒体dataproc py4j错误

Py4JError(u'An error occurred while calling o73.createDirectStreamWithoutMessageHandler. Trace:\npy4j.Py4JException: Method createDirectStreamWithoutMessageHandler([class org.apache.spark.streaming.api.java.JavaStreamingContext, class java.util.HashMap, class java.util.HashSet, class java.util.HashMap]) does not exist\n\tat py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)\n\tat py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344)\n\tat py4j.Gateway.invoke(Gateway.java:252)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:209)\n\tat java.lang.Thread.run(Thread.java:745)\n\n',)

我使用的火花流 - 卡夫卡assembly_2.10-1.6.0.jar(这是目前位于/ usr/lib目录/ Hadoop的/ lib中/我的所有节点+ Master)的文件夹

(EDIT) 实际的错误是:java.lang.NoSuchMethodError:org.apache.hadoop.yarn.util.Apps.crossPlatformify(Ljava /朗/字符串;)Ljava /郎/字符串;

这是由于错误的hadoop版本。因此火花应该用正确的Hadoop版本进行编译:

mvn -Phadoop-2.6 -Dhadoop.version=2.7.2 -DskipTests clean package

这将导致外部/卡夫卡组装/目标文件夹一个罐子。

+0

您的群集何时创建/您创建时是否传递过任何--image-version标志?你可以给你一些关于如何调用KafkaUtils.createStream()(或者,你是如何调用基础方法)的上下文吗? –

+0

集群仅在昨天创建,映像版本1.0(火花1.6)。在这一刻。我们仍然使用spark-streaming-kafka-assembly_2.10-1.5.0.jar使用图像版本0.1(spark 1.5.0),以及使用相同jar的图像版本0.2,因为1.5.2 jar无法使用它们。然而,对于最新的版本,没有一个罐子似乎工作。使用KafkaUtils.createDirectStream方法时会引发该错误。当我使用spark 1.6和jar spark-streaming-kafka-assembly_2.10-1.6.0.jar(通过spark-submit使用--jars选项)在本地运行代码时,代码似乎完美运行。 – bjorndv

+0

我一直在试图复制这个今天下午,并没有太多的运气。 Spark 1.5是否有可能通过Spark 1.6进入Dataproc 1.0集群(例如,spark-1.6和spark-1.5都已安装,或者spark-1.5与您的工作一起打包)?我已经通过了每个spark spark kafka jar我可以得到我的手(并从源代码构建一个装配jar),并在解包后检查KafkaUtilsPythonHelper以确保createDirectStreamWithoutMessageHandler方法存在正确的签名。 –

回答

1

使用图像版本1,我已经成功地运行pyspark流/ kafka example wordcount

在每个例子中“广告卡夫卡出师表”是一个“测试”题目我的测试卡夫卡实例。

  1. 使用没有初始化动作集群:

    $ gcloud dataproc jobs submit pyspark --cluster ad-kafka2 --properties spark.jars.packages=org.apache.spark:spark-streaming-kafka_2.10:1.6.0 ./kafka_wordcount.py ad-kafka-inst:2181 test 
    
  2. 使用初始化动作,尽显卡夫卡组件:

    • 下载/解压火花1.6.0.tgz
    • 搭建:

      $ mvn -Phadoop-2.6 -Dhadoop.version=2.7.2 package 
      
    • 将spark-streaming-kafka-assembly_2.10-1.6.0.jar上传到新的GCS存储桶(例如MYBUCKET)。
    • 创建在同一GCS桶下面的初始化动作(例如,GS://MYBUCKET/install_spark_kafka.sh):

      $ #!/bin/bash 
      
      gsutil cp gs://MY_BUCKET/spark-streaming-kafka-assembly_2.10-1.6.0.jar /usr/lib/hadoop/lib/ 
      chmod 755 /usr/lib/hadoop/lib/spark-streaming-kafka-assembly_2.10-1.6.0.jar 
      
    • 开始与上述初始化动作群集:

      $ gcloud dataproc clusters create ad-kafka-init --initialization-actions gs://MYBUCKET/install_spark_kafka.sh 
      
    • 开始流字数:

      $ gcloud dataproc jobs submit pyspark --cluster ad-kafka-init ./kafka_wordcount.py ad-kafka-inst:2181 test 
      
+0

第二个选择的确有诀窍。由于不正确的hadoop版本,mvn的jar无法工作。上面的错误可能是由于一个错误的jar。 – bjorndv

相关问题