
Why does my Spark job get stuck at the Kafka stream after being submitted to the Spark cluster on Kubernetes created by Minikube?

Output of foreachRDD at myfile.scala:

----------------- RUNNING ---------------------- 
[Stage 0:>               (0 + 0)/2]17/06/16 16:08:15 INFO VerifiableProperties: Verifying properties 
17/06/16 16:08:15 INFO VerifiableProperties: Property group.id is overridden to xxx 
17/06/16 16:08:15 INFO VerifiableProperties: Property zookeeper.connect is overridden to 
xxxxxxxxxxxxxxxxxxxxx 
[Stage 0:>               (0 + 0)/2] 

From the Spark web UI (foreachRDD at myfile.scala:49, +details):

org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:625)
myfile.run(myfile.scala:49)
Myjob$.main(Myjob.scala:100)
Myjob.main(Myjob.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

My code:

println("----------------- RUNNING ----------------------"); 
    eventsStream.foreachRDD { rdd => 
     println("xxxxxxxxxxxxxxxxxxxxx") 
     //println(rdd.count()); 
    if(!rdd.isEmpty) 
    { 
     println("yyyyyyyyyyyyyyyyyyyyyyy") 
     val df = sqlContext.read.json(rdd); 
     df.registerTempTable("data"); 

     val rules = rulesSource.rules(); 
     var resultsRDD : RDD[(String,String,Long,Long,Long,Long,Long,Long)]= sc.emptyRDD; 
     rules.foreach { rule => 
     ... 
     } 

     sqlContext.dropTempTable("data") 
    } 
    else 
    { 
     println("-------"); 
     println("NO DATA"); 
     println("-------"); 
    } 
} 
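
eventsStream itself is not shown in the question. Judging from the VerifiableProperties lines in the log (group.id and zookeeper.connect being overridden), it was presumably created with the Kafka 0.8 connector, roughly as in this sketch; the topic name and connection strings are placeholders, and an existing StreamingContext ssc is assumed:

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Placeholder values -- the real ones are redacted in the question's logs.
    val kafkaParams = Map(
      "metadata.broker.list" -> "kafka:9092",
      "zookeeper.connect"    -> "zookeeper:2181",
      "group.id"             -> "xxx"
    )

    // Direct stream of (key, value) pairs; keep only the JSON value.
    val eventsStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("events")
    ).map(_._2)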

Any ideas? Thanks.

UPDATE

My Spark job runs fine in a standalone Spark Docker container, but when it is submitted to the Spark cluster inside the Kubernetes cluster it gets stuck at the Kafka stream. No idea why.

The Spark master YAML file is taken from https://github.com/phatak-dev/kubernetes-spark/blob/master/spark-master.yaml:

    apiVersion: extensions/v1beta1
    kind: Deployment
    metadata:
      labels:
        name: spark-master
      name: spark-master
    spec:
      replicas: 1
      template:
        metadata:
          labels:
            name: spark-master
        spec:
          containers:
          - name: spark-master
            image: spark-2.1.0-bin-hadoop2.6
            imagePullPolicy: "IfNotPresent"
            ports:
            - containerPort: 7077
              protocol: TCP
            command:
            - "/bin/bash"
            - "-c"
            - "--"
            args:
            - './start-master.sh ; sleep infinity'
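
For context, the driver would then point at the standalone master running in this Deployment. A minimal Scala sketch of the relevant configuration, where the master URL spark://spark-master:7077 is an assumption based on the name and port above (the actual spark-submit command is not shown in the question):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // "spark-master:7077" assumes a Kubernetes Service exposing the Deployment above.
    val conf = new SparkConf()
      .setAppName("Myjob")
      .setMaster("spark://spark-master:7077")
    val ssc = new StreamingContext(conf, Seconds(10))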

I have a similar problem. I am using the experimental Spark Streaming integration for Kafka broker 0.10. One task got stuck without any memory issue; the other returned quickly. So the whole thing got stuck. –

Answer


Logs would help to diagnose the problem.

Essentially, you cannot create another RDD inside an RDD operation, i.e. rdd1.map { rdd2.count() } is not valid.
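
A minimal sketch of the difference, assuming an existing SparkContext sc (the names are illustrative, not from the question):

    val rdd1 = sc.parallelize(1 to 10)
    val rdd2 = sc.parallelize(1 to 10)

    // Invalid: the closure runs on the executors, which cannot reference
    // another RDD or the SparkContext; this fails at runtime.
    // rdd1.map { x => x * rdd2.count() }

    // Valid: materialize the second RDD's result on the driver first.
    val total = rdd2.count()
    val scaled = rdd1.map(_ * total)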

See how the RDD is converted to a DataFrame after importing the sqlContext implicits:

    import sqlContext.implicits._

    eventsStream.foreachRDD { rdd =>
      println("yyyyyyyyyyyyyyyyyyyyyyy")

      val df = rdd.toDF();
      df.registerTempTable("data");
      .... // Your logic here.
      sqlContext.dropTempTable("data")
    }
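
Note that rdd.toDF() only compiles when the RDD's element type is a case class or tuple (with import sqlContext.implicits._ in scope); for a stream of raw JSON strings, sqlContext.read.json(rdd) as in the question is still the way to get a DataFrame. A sketch with a hypothetical Event case class (the real schema is not shown in the question):

    // Hypothetical schema -- adjust to the actual JSON fields.
    case class Event(id: String, value: Long)

    eventsStream.foreachRDD { rdd =>
      // Hypothetical parsing step turning each line into an Event.
      val events = rdd.map { line =>
        val fields = line.split(",")
        Event(fields(0), fields(1).toLong)
      }
      val df = events.toDF()   // works because Event is a case class
      df.registerTempTable("data")
      // ... your logic here ...
      sqlContext.dropTempTable("data")
    }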

My Spark job runs fine in a standalone Spark Docker container, but when it is submitted to the Spark cluster inside the Kubernetes cluster it gets stuck at the Kafka stream. No idea why. – BAE


What do your Spark logs say? Can you see the Spark web UI, and does it give you any hints? Check whether the Spark web UI shows a Streaming tab once a batch duration has elapsed. – Manas