2017-07-02 107 views
0

我有以下数据:取消组合分组的表Scala中

53,Male,11th,<=50K 
53,Male,11th,<=50K 
53,Male,11th,<=50K 
20,Female,Masters,>50K 
20,Female,Masters,>50K 
33,Male,Bachelors,<=50K 

接下来,我通过使用选择和组需要组以上的数据。所以它会是这样的:

53,Male,11th,<=50K,3 
20,Female,Masters,>50K,2 
33,Male,Bachelors,<=50K,1 

其中最后一个数字显示类似记录的数量。现在我需要过滤等效记录数> 2,并将其存储在单独的文件中

我已经通过Scala命令中的SQL查询对数据进行了分组。为了取消分组数据,我可以创建一个表格并通过(插入命令)并逐行添加分组数据。它可以工作,但速度非常慢,只需要几个小时就可以拍摄几张唱片。有没有什么想法使用斯卡拉非常感谢。 命令如下所示:

import spark.sqlContext.implicits._ 
import scala.collection.immutable.Map 
import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType} 


case class Rating(age: Double,edu: String, sex: String, salary: String) 

val Result = sc.textFile("hdfs://NameNode01:9000/input/adult.csv").map(_.split(",")).map(p => Rating(p(0).trim.toDouble,p(1),p(2),p(3))).toDF() 
Result.registerTempTable("Start") 

val sal1=spark.sqlContext.sql("SELECT age,edu,sex,salary,count(*) as cnt from Start group by age,edu,sex,salary") 
sal1.registerTempTable("adult") 

val sal2=spark.sqlContext.sql("SELECT age,edu,sex,salary,cnt from adult WHERE cnt>3") 
sal2.registerTempTable("adult2") 

var ag=sal2.map(age => ""+age(0)).collect() 
var ed=sal2.map(edu => ""+edu(1)).collect() 
var se=sal2.map(sex => ""+sex(2)).collect() 
var sa=sal2.map(salary => ""+salary(3)).collect() 
var cn=sal2.map(cnt => ""+cnt(4)).collect() 

//convert age to double 
val ages= ag.map(_.toDouble) 

//convert the cnt to integer 
val counts= cn.map(_.toInt) 

//length of the array 
var cnt_length=counts.size 


//create a table and add the sal2 records in it 
val adlt2=spark.sqlContext.sql("CREATE TABLE adult3 (age double, edu string, sex string, salary string)") 


//loop and enter the number of cn 
var sql_querys="query" 
var i=0 
var j=0 
var loop_cnt=0 

for(i <-0 to cnt_length-1){ 
    loop_cnt=counts(i) 
    for(j <-0 to loop_cnt-1){ 
     sql_querys="INSERT into adult3 values ("+ages(i)+",'"+ed(i)+"','"+se(i)+"','"+sa(i)+"')" 

     val adlt3=spark.sqlContext.sql("INSERT into adult3 values ("+ages(i)+",'"+ed(i)+"','"+se(i)+"','"+sa(i)+"')") 
    } 

} 

的主要部分是在代码的结束时的循环。

+0

我不知道你究竟问..你能否给出'sal1','sal2'的数据示例和所需的输出? – shakedzy

+0

你可以编辑你的帖子。 – mtoto

回答

1

你可能要考虑按照groupBy计数取消组合使用explode您的数据框:

import org.apache.spark.sql.functions._ 

case class Rating(age: Double, edu: String, sex: String, salary: String) 

val Result = sc.textFile("/Users/leo/projects/spark/files/testfile.csv"). 
    map(_.split(",")). 
    map(p => Rating(p(0).trim.toDouble, p(1).trim, p(2).trim, p(3).trim)). 
    toDF 

val saDF1 = Result.groupBy("age", "edu", "sex", "salary").agg(count("*") as "cnt") 

val saDF2 = Result.groupBy("age", "edu", "sex", "salary").agg(count("*") as "cnt").where($"cnt" > 2) 

// Create a UDF to fill array of 1's to be later exploded 
val fillArr = (n: Int) => Array.fill(n)(1) 
val fillArrUDF = udf(fillArr) 

val expandedDF1 = saDF1.withColumn("arr", fillArrUDF($"cnt")) 

expandedDF1.show 
+----+------+---------+------+---+---------+ 
| age| edu|  sex|salary|cnt|  arr| 
+----+------+---------+------+---+---------+ 
|33.0| Male|Bachelors| <=50K| 1|  [1]| 
|20.0|Female| Masters| >50K| 2| [1, 1]| 
|53.0| Male|  11th| <=50K| 3|[1, 1, 1]| 
+----+------+---------+------+---+---------+ 

// Ungroup dataframe using explode 
val ungroupedDF1 = expandedDF1.withColumn("a", explode($"arr")). 
    select("age", "edu", "sex", "salary") 

ungroupedDF1.show 
+----+------+---------+------+ 
| age| edu|  sex|salary| 
+----+------+---------+------+ 
|33.0| Male|Bachelors| <=50K| 
|20.0|Female| Masters| >50K| 
|20.0|Female| Masters| >50K| 
|53.0| Male|  11th| <=50K| 
|53.0| Male|  11th| <=50K| 
|53.0| Male|  11th| <=50K| 
+----+------+---------+------+ 
2

这里是一个较短的解决方案,它仅使用RDDS:

val result = sc 
    .textFile("hdfs://NameNode01:9000/input/adult.csv") 
    .map({ (line: String) => 
    val p = line.split(",") 
    (Rating(p(0).trim.toDouble,p(1),p(2),p(3)), 1) 
    }) 
    .reduceByKey(_ + _) 
    .filter(_._2 > 2) 
    .flatMap(rating => Array.fill(rating._2)(rating._1)) 

其工作原理如下:

  • textfile将rdd从文件
  • map变换线对的形式的(rating, 1)
  • reduceByKey组对由rating和求和1S(即统计每个等级的出现)
  • filter丢弃其出现次数少于3次
  • flatmap重复每个等级这么多次作为计数的收视率,然后变平,所有的结果到一个RDD

这里是初始方法无法执行的一些原因:

  1. collectcollect在数据帧上用于读取本地计算机上的内容。这意味着你直接放弃了Spark的所有并行和集群优势。
  2. for循环执行单个数据帧的插入。火花对象的可用转换(例如,map,filter,reduce,单个sql查询)经过高度优化,以分布式方式执行这些操作。通过使用for循环执行单行操作,您将失去此优势,并且您可能会因循环中每次迭代期间复制的数据帧而承受极大的开销。
  3. (次要)将RDD转换为数据帧会增加一些额外的计算成本。因此,除非您打算执行几项可从数据框或数据集的性能特征中受益的操作,否则我会建议仅使用rdds来简化操作。
1

根据我的理解你的问题,你想过滤掉大于2的相似记录并写入文件。如果那么如此如此的可以成为你的解决方案。

您必须已经有原来的数据帧作为

+----+------+---------+------+ 
|age |edu |sex  |salary| 
+----+------+---------+------+ 
|53.0|Male |11th  |<=50K | 
|53.0|Male |11th  |<=50K | 
|53.0|Male |11th  |<=50K | 
|20.0|Female|Masters |>50K | 
|20.0|Female|Masters |>50K | 
|33.0|Male |Bachelors|<=50K | 
+----+------+---------+------+ 

你不需要编写复杂的SQL查询找到数,你可以使用内置的功能

val columnNames = Result.columns 
val finalTemp = Result.groupBy(columnNames.map(col): _*).agg(count("salary").as("similar records")) 

这应该给输出为

+----+------+---------+------+---------------+ 
|age |edu |sex  |salary|similar records| 
+----+------+---------+------+---------------+ 
|33.0|Male |Bachelors|<=50K |1    | 
|20.0|Female|Masters |>50K |2    | 
|53.0|Male |11th  |<=50K |3    | 
+----+------+---------+------+---------------+ 

现在要过滤,您可以使用过滤功能作为

val finalTable = finalTemp.filter($"similar records" < 3) 

最终输出是

+----+------+---------+------+---------------+ 
|age |edu |sex  |salary|similar records| 
+----+------+---------+------+---------------+ 
|33.0|Male |Bachelors|<=50K |1    | 
|20.0|Female|Masters |>50K |2    | 
+----+------+---------+------+---------------+ 

可以将其保存到一个文件

finalTable.write.format("com.databricks.spark.csv").save("output path") 

如果你想过滤掉,那么你可以简单地使用原始数据合并为

Result.join(finalTable, Seq(columnNames: _*)).show(false) 

输出是

+----+------+---------+------+---------------+ 
|age |edu |sex  |salary|similar records| 
+----+------+---------+------+---------------+ 
|33.0|Male |Bachelors|<=50K |1    | 
|20.0|Female|Masters |>50K |2    | 
|20.0|Female|Masters |>50K |2    | 
+----+------+---------+------+---------------+ 

你可以将它保存到一个文件作为上述

注意:您将需要以下导入了上述功能的工作

import org.apache.spark.sql.functions._