2017-09-01 17 views
1

我使用Spark DataSet处理以下地图格式的cassandra中的columnfamily。因此,我想分两种类型溢价(City and Duster)与非溢价(Alto K10, Aspire, Nano and i10),我希望溢价与非溢价的最终计数值为2(City,Duster计数)与10( Alto K10, Aspire, Nano and i10)。使用Spark DataSet的地图值的聚合地图

代码:

case class UserProfile(userdata:Map[String,Map[String,Int]]) 

val userprofileDataSet = spark.read.format("org.apache.spark.sql.cassandra").options(Map("table"->"userprofilesagg","keyspace" -> "KEYSPACENAME")).load().as[UserProfile] 

怎么做才能对userprofileDataSet的处理?

数据格式:

{'bodystyle': {'Compact Sedan': 1, 'Hatchback': 8, 'SUV': 1, 'Sedan': 4}, 
    'models': {'Alto K10': 3, 'Aspire': 4, 'City': 1, 'Duster': 1, 'Nano': 3, 'i10': 2}} 

编辑问题:

对于鱿鱼的回答。现在我要聚集每个用户的结果是这样的:

DOICvncGKUH9xBLnW3e9jXcd2 | non-premium | [Nano, Alto K10, Aspire, i10] | 12 | premium | [City, Duster] | 2 
    BkkpgeAdCkYJEXsdZjiVz3bSb | non-premium | [Nano, Alto K10, Aspire, i10] | 17 | premium | [City, Duster] | 5 

现在情况类是这样的

案例类:

case class UserProfile(userid:String, userdata:Map[String,Map[String,Int]]) 

数据:

DOICvncGKUH9xBLnW3e9jXcd2 | {'bodystyle': {'Compact Sedan': 1, 'Hatchback': 8, 'SUV': 1, 'Sedan': 4}, 
    'models': {'Alto K10': 3, 'Aspire': 4, 'City': 1, 'Duster': 1, 'Nano': 3, 'i10': 2}} 

BkkpgeAdCkYJEXsdZjiVz3bSb | {'bodystyle': {'Compact Sedan': 7, 'Hatchback': 5, 'SUV': 3, 'Sedan': 7}, 
    'models': {'Alto K10': 1, 'Aspire': 7, 'City': 4, 'Duster': 1, 'Nano': 8, 'i10': 1}} 

此外,你问我为什么提到Bodystyle。因此,我可以将类似汇总(SUV, Sedan)作为溢价应用,并对其进行非溢价处理。

+0

你想只处理模式? bodystyle的作用是什么? –

+0

@squid我编辑了这个问题。你可以请看看它。 – Naresh

回答

1

我不确定bodystyle的作用究竟是什么。如果我的理解这个问题正确的,那么你想要的类别和数量,你可以尝试像下面,如果没有用删除types

--userprofile table 
CREATE TABLE `userprofile`(
`properties` map<string,map<string,int>>); 

--Aggregate by category 
select category, 
     collect_set(type) as types, 
     sum(value) as count 
from (select case when lower(type) in ('city','duster') then 'premium' 
      when lower(type) in ('alto k10', 'aspire', 'nano' , 'i10') then 'non-premium' 
     end as category, 
     type,value 
from (select properties['models'] as models from userprofile) t 
    lateral view explode(models) t as type, value)l group by category 

输出

category | types       |   count 
non-premium | ["Aspire","i10","Nano","Alto K10"] |   12 
premium  | ["City","Duster"]     |   2