2015-04-20
6

I have a dataset, and I want to extract the (review/text) of records whose (review/time) falls between x and y, for example (1183334400 < time < 1185926400), using an RDD filter in Spark Scala.

Here is a sample of my data:

product/productId: B000278ADA 
product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large 
product/price: 46.34 
review/userId: A17KXW1PCUAIIN 
review/profileName: Mark Anthony "Mark" 
review/helpfulness: 4/4 
review/score: 5.0 
review/time: 1174435200 
review/summary: Jobst UltraSheer Knee High Stockings 
review/text: Does a very good job of relieving fatigue. 

product/productId: B000278ADB 
product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large 
product/price: 46.34 
review/userId: A9Q3932GX4FX8 
review/profileName: Trina Wehle 
review/helpfulness: 1/1 
review/score: 3.0 
review/time: 1352505600 
review/summary: Delivery was very long wait..... 
review/text: It took almost 3 weeks to recieve the two pairs of stockings . 

product/productId: B000278ADB 
product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large 
product/price: 46.34 
review/userId: AUIZ1GNBTG5OB 
review/profileName: dgodoy 
review/helpfulness: 1/1 
review/score: 2.0 
review/time: 1287014400 
review/summary: sizes recomended in the size chart are not real 
review/text: sizes are much smaller than what is recomended in the chart. I tried to put it and sheer it!. 

My Spark Scala code:

import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.io.{LongWritable, Text} 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat 
import org.apache.spark.{SparkConf, SparkContext} 

object test1 { 
    def main(args: Array[String]): Unit = { 
    val conf1 = new SparkConf().setAppName("golabi1").setMaster("local") 
    val sc = new SparkContext(conf1) 
    val conf: Configuration = new Configuration 
    conf.set("textinputformat.record.delimiter", "product/title:") 
    val input1=sc.newAPIHadoopFile("data/Electronics.txt",  classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf) 
    val lines = input1.map { text => text._2} 
    val filt = lines.filter(text=>(text.toString.contains(tt => tt in (startdate until enddate)))) 
    filt.saveAsTextFile("data/filter1") 
    } 
} 

but my code does not work properly.

How can I filter these records?

+1

I don't see the delimiter string "product/productId:" in your input file. – ipoteka

+1

What output do you expect, and what problem are you facing? – maasg

Answer

10

It's much simpler than that. Try this:

import org.apache.spark.{SparkConf, SparkContext} 

object test1 
{ 
    def main(args: Array[String]): Unit = 
    { 
    val conf1 = new SparkConf().setAppName("golabi1").setMaster("local") 
    val sc = new SparkContext(conf1) 

    // Boundary dates in epoch seconds; set these to your own range. 
    val startDate = 1183334400L 
    val endDate = 1185926400L 

    def extractDateAndCompare(line: String): Boolean= 
    { 
     // Assumes the whole record sits on one line, with review/time 
     // followed by review/text. 
     val from = line.indexOf("/time: ") + 7 
     val to = line.indexOf("review/text: ") -1 
     val date = line.substring(from, to).trim.toLong 
     date > startDate && date < endDate 
    } 

    sc.textFile("data/Electronics.txt") 
     .filter(extractDateAndCompare) 
     .saveAsTextFile("data/filter1") 
    } 
} 

I usually find that intermediate helper methods like this make things clearer. Of course, this assumes the boundary dates are defined somewhere and that the input file is free of format problems. I did it this way on purpose to keep it simple, but adding a try, returning an Option, and using flatMap() can help you avoid errors if there are any. Also, your raw text is a bit awkward to work with; you may want to explore JSON, TSV files, or some other simpler format.
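The Option-plus-flatMap idea mentioned in the answer could be sketched like this (a sketch, not code from the thread; `SafeTimeFilter`, `extractTime`, and `keepInRange` are hypothetical names):

```scala
// Sketch of the Option + flatMap approach: malformed records yield
// None and are silently dropped, instead of throwing on substring/toLong.
object SafeTimeFilter {
  // Matches e.g. "review/time: 1174435200" anywhere in the record.
  private val TimePattern = """review/time:\s*(\d+)""".r

  // Some(epochSeconds) if the record carries a parsable review/time.
  def extractTime(record: String): Option[Long] =
    TimePattern.findFirstMatchIn(record).map(_.group(1).toLong)

  // Keeps the record only when its time is strictly inside (start, end).
  def keepInRange(start: Long, end: Long)(record: String): Option[String] =
    extractTime(record).filter(t => t > start && t < end).map(_ => record)
}
```

With an RDD of records this would be used as `records.flatMap(SafeTimeFilter.keepInRange(startDate, endDate))`, so records without a valid time disappear from the result rather than crashing the job.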

+0

Note that I coded this from scratch, so small details like the indices may be off, but I hope you get the idea. –

+0

Dear Daniel, I have a 1-gigabyte dataset of reviews (text). Here is a sample from my dataset: product/productId: B000278ADA product/title: Jobst Ultr product/price: 46.34 review/userId: A1ZJAH4 review/helpfulness: 0/0 review/score: 5.0 review/time: 1359936000 review/summary: one-stop shopping review/text: Great to find what you are looking for. I want to extract the review/text within a time period, for example the review/text from 2002; for that I wrote the code above, treating a complete review record as one RDD record. –

+0

Oh, I see you updated the sample text. So that means each "record" spans multiple lines? –
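Since the question's custom `textinputformat.record.delimiter` turns each multi-line review into a single RDD element, one way to handle those blocks is to parse each record into a field map and filter on `review/time`. A sketch under that assumption (`RecordParser`, `parseRecord`, and `inRange` are hypothetical names, not from the thread):

```scala
// Sketch: parse one multi-line review block into a field map, then
// filter on the review/time field. Intended for the multi-line blocks
// produced by a custom record delimiter.
object RecordParser {
  // Splits the "key: value" lines of one record block into a Map.
  def parseRecord(block: String): Map[String, String] =
    block.split("\n")
      .map(_.trim)
      .filter(_.contains(": "))
      .map { line =>
        val idx = line.indexOf(": ")
        line.take(idx) -> line.drop(idx + 2)
      }
      .toMap

  // True when the block's review/time lies strictly between the bounds.
  def inRange(start: Long, end: Long)(block: String): Boolean =
    parseRecord(block).get("review/time")
      .exists(t => t.nonEmpty && t.forall(_.isDigit) &&
        t.toLong > start && t.toLong < end)
}
```

With the question's `newAPIHadoopFile` setup, this could be applied as `lines.map(_.toString).filter(RecordParser.inRange(startDate, endDate))`.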