2016-09-12 43 views

SPARK: How to create a List/Map in an RDD/DataFrame so I can get aggregates? How to build a collection of RDD[Row] in Scala?

I have a file in which each line is a JSON object:

{
  "itemId": 1122334,
  "language": [
    {
      "name": ["US", "FR"],
      "value": ["english", "french"]
    },
    {
      "name": ["IND"],
      "value": ["hindi"]
    }
  ],
  "country": [
    {
      "US": [
        { "startTime": "2016-06-06T17:39:35.000Z", "endTime": "2016-07-28T07:00:00.000Z" }
      ],
      "CANADA": [
        { "startTime": "2016-06-06T17:39:35.000Z", "endTime": "2016-07-28T07:00:00.000Z" }
      ],
      "DENMARK": [
        { "startTime": "2016-06-06T17:39:35.000Z", "endTime": "2016-07-28T07:00:00.000Z" }
      ],
      "FRANCE": [
        { "startTime": "2016-08-06T17:39:35.000Z", "endTime": "2016-07-28T07:00:00.000Z" }
      ]
    }
  ]
}

{
  "itemId": 1122334,
  "language": [
    {
      "name": ["US", "FR"],
      "value": ["english", "french"]
    },
    {
      "name": ["IND"],
      "value": ["hindi"]
    }
  ],
  "country": [
    {
      "US": [
        { "startTime": "2016-06-06T17:39:35.000Z", "endTime": "2016-07-28T07:00:00.000Z" }
      ],
      "CANADA": [
        { "startTime": "2016-07-06T17:39:35.000Z", "endTime": "2016-07-28T07:00:00.000Z" }
      ],
      "DENMARK": [
        { "startTime": "2016-06-06T17:39:35.000Z", "endTime": "2016-07-28T07:00:00.000Z" }
      ],
      "FRANCE": [
        { "startTime": "2016-08-06T17:39:35.000Z", "endTime": "2016-07-28T07:00:00.000Z" }
      ]
    }
  ]
}

I have matching POJOs that give me the values from the JSON:

import com.mapping.data.model.MappingUtils 
import com.mapping.data.model.CountryInfo 


val mappingPath = "s3://.../" 

val timeStamp = "2016-06-06T17:39:35.000Z" 
val endTimeStamp = "2016-06-07T17:39:35.000Z" 


val COUNTRY_US = "US" 
val COUNTRY_CANADA = "CANADA" 
val COUNTRY_DENMARK = "DENMARK" 
val COUNTRY_FRANCE = "FRANCE" 


val input = sc.textFile(mappingPath) 

The input is a list of JSONs, one JSON per line, which I map to the POJO class CountryInfo using MappingUtils, which takes care of JSON parsing and conversion:

val MappingsList = input.map(x=> { 
        val countryInfo = MappingUtils.getCountryInfoString(x); 
        (countryInfo.getItemId(), countryInfo) 
       }).collectAsMap 

MappingsList: scala.collection.Map[String,com.mapping.data.model.CountryInfo] 


def showCountryInfo(x: Option[CountryInfo]): CountryInfo = x match { 
     case Some(s) => s 
     case None => throw new NoSuchElementException("no CountryInfo for this itemId") 
    } 

But I need to create a DF/RDD so that I can get aggregates of country and language per itemId.

In the example given, if a country's startTime is not less than "2016-06-07T17:39:35.000Z", then that country's value should be zero.
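Since the timestamps are ISO-8601 strings in a fixed format, plain lexicographic comparison orders them chronologically, so the per-country 0/1 rule can be checked with a string comparison. A minimal plain-Scala sketch (the `starts` map below is a hypothetical stand-in for the parsed startTime values of the first JSON object):

```scala
val endTimeStamp = "2016-06-07T17:39:35.000Z"

// A country counts as 1 only if its startTime falls before the cutoff;
// lexicographic `<` on ISO-8601 strings matches chronological order.
def countryHit(startTime: String): Int =
  if (startTime < endTimeStamp) 1 else 0

// Hypothetical startTimes taken from the first JSON object above
val starts = Map(
  "US"      -> "2016-06-06T17:39:35.000Z",
  "CANADA"  -> "2016-06-06T17:39:35.000Z",
  "DENMARK" -> "2016-06-06T17:39:35.000Z",
  "FRANCE"  -> "2016-08-06T17:39:35.000Z"
)

val hits = starts.map { case (c, t) => c -> countryHit(t) }
// FRANCE starts after the cutoff, so it contributes 0
```

Note that this shortcut only works because all timestamps share the same zero-padded format and UTC zone; for mixed formats you would parse to `java.time.Instant` first.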

Which format would be better for building the final aggregate JSON:

1. A List? 

    |-----itemId-------|----country------------|-----language-------------| 
    |     1122334      | [US, CANADA, DENMARK] | [english, hindi, french] | 
    |     1122334      | [US, DENMARK]         | [english]                | 
    |------------------|-----------------------|--------------------------| 

2. A Map? 



    |-----itemId-------|----country---------------------------------|-----language---------------------| 
    |     1122334      | (US,2) (CANADA,1) (DENMARK,2) (FRANCE,0)   | (english,2) (hindi,1) (french,1) | 
    |------------------|--------------------------------------------|----------------------------------| 

I want to produce a final JSON with aggregate values like this:

{
  "itemId": "1122334",
  "country": {
    "US": 2,
    "CANADA": 1,
    "DENMARK": 2,
    "FRANCE": 0
  },
  "language": {
    "english": 2,
    "french": 1,
    "hindi": 1
  }
}
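The counting semantics behind this output can be sketched in plain Scala before worrying about Spark. In the sketch below, the `rows` tuples are hypothetical stand-ins for the parsed per-item data (itemId, qualifying countries, languages):

```scala
// Hypothetical per-item observations: (itemId, countries, languages)
val rows = Seq(
  ("1122334", Seq("US", "CANADA", "DENMARK"), Seq("english", "hindi", "french")),
  ("1122334", Seq("US", "DENMARK"), Seq("english"))
)

// Count occurrences of each distinct value in one row's list
def countByValue(values: Seq[String]): Map[String, Int] =
  values.groupBy(identity).map { case (k, v) => k -> v.size }

// Merge two count maps by summing the counts of shared keys
def mergeCounts(a: Map[String, Int], b: Map[String, Int]): Map[String, Int] =
  b.foldLeft(a) { case (acc, (k, n)) => acc + (k -> (acc.getOrElse(k, 0) + n)) }

// Group by itemId and fold each group's counts together
val aggregated: Map[String, (Map[String, Int], Map[String, Int])] =
  rows.groupBy(_._1).map { case (id, group) =>
    id -> (
      group.map(r => countByValue(r._2)).reduce(mergeCounts),
      group.map(r => countByValue(r._3)).reduce(mergeCounts)
    )
  }
```

One caveat: a value that never occurs (FRANCE above) simply does not appear in the map, so producing explicit zero counts like `"FRANCE": 0` requires seeding the result with the full set of known countries.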

I tried with a List:

val events = sqlContext.sql("select itemId from EventList") 

    val itemList = events.map(row => { 
     val itemId = row.getAs[String](0) 
     val countryInfo = showCountryInfo(MappingsList.get(itemId)) 

     // keep only the countries whose startTime is before the cutoff
     val country = new ListBuffer[String]() 
     if (countryInfo.getCountry().getUS().get(0).getStartTime() < endTimeStamp) country += COUNTRY_US 
     if (countryInfo.getCountry().getCANADA().get(0).getStartTime() < endTimeStamp) country += COUNTRY_CANADA 
     if (countryInfo.getCountry().getDENMARK().get(0).getStartTime() < endTimeStamp) country += COUNTRY_DENMARK 
     if (countryInfo.getCountry().getFRANCE().get(0).getStartTime() < endTimeStamp) country += COUNTRY_FRANCE 

     val languageList = new ListBuffer[String]() 
     countryInfo.getLanguages().foreach(x => languageList += x.getValue()) 

     Row(itemId, country.toList, languageList.toList) 
      }) 

and with a Map:

val itemList = events.map(row => { 
    val itemId = row.getAs[String](0) 
    val countryInfo = showCountryInfo(MappingsList.get(itemId)) 

    // 1 if the country's startTime is before the cutoff, else 0
    val country = scala.collection.mutable.Map[String, Int]() 
    country += (COUNTRY_US -> (if (countryInfo.getCountry().getUS().get(0).getStartTime() < endTimeStamp) 1 else 0)) 
    country += (COUNTRY_CANADA -> (if (countryInfo.getCountry().getCANADA().get(0).getStartTime() < endTimeStamp) 1 else 0)) 
    country += (COUNTRY_DENMARK -> (if (countryInfo.getCountry().getDENMARK().get(0).getStartTime() < endTimeStamp) 1 else 0)) 
    country += (COUNTRY_FRANCE -> (if (countryInfo.getCountry().getFRANCE().get(0).getStartTime() < endTimeStamp) 1 else 0)) 

    val language = scala.collection.mutable.Map[String, Int]() 
    countryInfo.getLanguages().foreach(x => language += (x.getValue() -> 1)) 

    Row(itemId, country.toMap, language.toMap) 
     }) 

But both freeze in Zeppelin. Is there a better way to get the aggregates as JSON? And which is better for building the final aggregate, a List or a Map?

Answer

It would help to rephrase the question in terms of Spark DataFrame/Dataset and Row; I understand you ultimately want to work with JSON, but the details of JSON input/output are a separate question.

The functionality you are looking for is a Spark SQL aggregate function (see the grouping functions on that page). The functions collect_list and collect_set are related, but the function you need has not been implemented yet.

You can implement what I would call count_by_value by deriving from org.apache.spark.sql.expressions.UserDefinedAggregateFunction. This will require a fairly deep understanding of how Spark SQL works.
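A minimal sketch of such a UDAF follows, assuming Spark 1.5+ and a single string input column; the class name and schemas are illustrative, not a published API, and per-element explosion of the array columns would still be needed before applying it:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Hypothetical count_by_value: aggregates string values into a map of value -> occurrence count
class CountByValue extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("value", StringType) :: Nil)
  def bufferSchema: StructType =
    StructType(StructField("counts", MapType(StringType, IntegerType)) :: Nil)
  def dataType: DataType = MapType(StringType, IntegerType)
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer(0) = Map.empty[String, Int]

  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) {
      val counts = buffer.getAs[Map[String, Int]](0)
      val v = input.getString(0)
      buffer(0) = counts + (v -> (counts.getOrElse(v, 0) + 1))
    }

  // Combine partial maps from different partitions by summing shared keys
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val m1 = buffer1.getAs[Map[String, Int]](0)
    val m2 = buffer2.getAs[Map[String, Int]](0)
    buffer1(0) = m2.foldLeft(m1) { case (acc, (k, n)) => acc + (k -> (acc.getOrElse(k, 0) + n)) }
  }

  def evaluate(buffer: Row): Any = buffer.getAs[Map[String, Int]](0)
}
```

The buffer is an immutable map replaced on each update, which keeps the sketch simple; for large groups a more careful buffer representation may perform better.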

Once count_by_value is implemented, you can use it like this:

df.groupBy("itemId").agg(count_by_value(df("country")), count_by_value(df("language"))) 