2017-04-10 119 views
1

我是Spark的新手。我正在尝试将Spark 2.1版本用于CEP目的。 在最近2分钟内检测到丢失的事件。我将接收到的输入转换为JavaDSStream的输入事件,然后在inputEvents上执行reducebykeyandWindow并执行spark sql。Spark RDD vs DataSet性能

JavaPairDStream<String, Long> reduceWindowed = inputEvents.reduceByKeyAndWindow(new MaxTimeFuntion(), 
       Durations.seconds(124), new Duration(2000)); 
reduceWindowed.foreachRDD((rdd, time) -> { 
       SparkSession spark = TestSparkSessionSingleton.getInstance(rdd.context().getConf()); 
       JavaRDD<EventData> rowRDD = rdd.map(new org.apache.spark.api.java.function.Function<Tuple2<String,Long>, EventData>() { 
        @Override 
        public EventData call(Tuple2<String, Long> javaRDD) { 
        { 
          EventData record = new EventData(); 
          record.setId(javaRDD._1); 
          record.setEventTime(javaRDD._2); 
          return record;    
        } 
       }) 
    Dataset<Row> eventDataFrames = spark.createDataFrame(rowRDD, EventData.class); 
    eventDataFrames.createOrReplaceTempView("checkins"); 


Dataset<Row> resultRows=       
        spark.sql("select id, max(eventTime) as maxval, from events group by id having (unix_timestamp()*1000 - maxval >= 120000)"); 

相同的过滤我执行使用RDD功能:

JavaPairDStream<String, Long> filteredStream = reduceWindowed.filter(new Function<Tuple2<String,Long>, Boolean>() { 

     public Boolean call(Tuple2<String,Long> val) 
     { 
      return (System.currentTimeMillis() - val._2() >= 120000); 
     } 
    }); 

    filteredStream.print(); 

无论是方法提供我相同的结果为数据集& RDD。

我是否正确使用Spark sql?

在本地模式下,对于相同的输入速率,Spark SQL查询执行消耗的CPU相对高于RDD函数。谁能帮助我了解为什么SQL星火相比消耗RDD过滤功能比较高的CPU ..

回答

1

星火SQL使用催化剂(SQL优化器),它的作用:

SQL的
  1. 分析查询
  2. 使一些逻辑优化
  3. 添加一些物理规划
  4. 生成一些代码

d ata在内部设置外部JVM对象的行。可以使用类型安全+快速。比DataFrames慢,不适合交互式分析。 Dataset API作为API预览发布在Spark 1.6中,旨在提供两全其美;熟悉的面向对象编程风格和编译时类型安全性的RDD API,但具有Catalyst查询优化器的性能优势。数据集也使用与DataFrame API相同的高效堆外存储机制。

RDD,另一方面,仅仅是一个弹性分布式数据集是比较数据的黑盒不能作为可以针对它要执行的操作进行优化的,并不像约束。