2016-11-09 88 views
-2

一个有一些数据,看起来像这样FlatMap和的GroupBy在星火

from local_spark import sc,sqlContext 
rdd = sc.parallelize([ 
         ("key1", 'starttime=10/01/2015', 'param1', '1,2,3,99,88'), 
         ("key2", 'starttime=10/02/2015'', 'param1', '11,12'), 
         ("key1", 'starttime=10/01/2015'', 'param2', '66,77') 
        ]) 

第三个参数是一个逗号分隔(每秒一个值)值的列表,可以是非常巨大的

什么我需要做的是按键分组数据集,然后flapMap。 预期的结果会是这样的:

(key1)  # rdd key 

# for each key, a table with the values 
key timestamp  param1 param2 
key1 10/01/2015 1   66  
key1 10/01/2015 2   77 
key1 10/01/2015 3   null 
key1 10/01/2015 99  null 


(key2) # rdd key 
key timestamp  param1 param2 
key2 10/01/2015 11  null  
key2 10/01/2015 12  null 

到目前为止,我所试图做的是这样的: rdd.groupByKey()flatMap(functionToParseValuesAndTimeStamps)

如果我做了什么像这样,flatMap操作的结果是否仍然按键分组?我会“通过”手术来放松这个组吗?

obs:一个更幼稚的方法是首先使用flapMap,然后按键分组。但由于键值比键值要少得多,所以我认为这会导致性能不佳

+1

源数据和结果之间的关系是什么?如何记录:''key1“,'starttime = 10/01/2015','param1','1,2,3,99,88''变成'key1 10/01/2015 1 66'? – maasg

回答

0

我建议转换为数据框并编写一个UDF来拆分列,并且在一些争议后,您可以按照以下代码操作:I took和根据您的数据进行编辑。复制并粘贴到Spark-Shell中以便与之一起玩。

import org.apache.spark.{ SparkConf, SparkContext } 
import org.apache.spark.sql.types._ 
import org.apache.spark.sql._ 
import org.apache.spark.sql.functions._ 

val sqlContext = new org.apache.spark.sql.SQLContext(sc) 

import sqlContext.implicits._ 
import org.apache.spark.sql.functions.udf 

val df2 = Seq(("key1", "11", "12"), 
       ("key2", "66", "77")).toDF("keys", "num1", "num2") 


def toLong(df: DataFrame, by: Seq[String]): DataFrame = { 
       val (cols, types) = df.dtypes.filter{ case (c, _) => !by.contains(c)}.unzip 
       require(types.distinct.size == 1) 

       val kvs = explode(array(
        cols.map(c => struct(lit(c).alias("key"), col(c).alias("val"))): _* 
       )) 

       val byExprs = by.map(col(_)) 

       df2 
        .select(byExprs :+ kvs.alias("_kvs"): _*) 
        .select(byExprs ++ Seq($"_kvs.key", $"_kvs.val"): _*) 
       } 

val x = toLong(df2, Seq("keys")) 

继承人它的外观
enter image description here

0

flatMap不守组...

所以,如果你flapMap一个GROUPBY()操作之后的东西,在RDD记录将不分组