是的,你当然可以在Spark中做到这一点!然而,你接近这个问题的方式使得它看起来有点难以实现。
所以我可以显示一个完整的复制pastable到REPL例子,让我们假设你的数据存储在一个字符串(不ARGS(0)文件)
val data = """Customer1| item1:x1,x2,x3; item2:x1,x4,x5; item1:x1,x3,x6|time1|url
Customer1| item1:x1,x7,x3; item2:x1,x4,x5; item3:x5|time2|url2
Customer2| item1:x1,x7,x3; item3:x5|time3|url3"""
和RDD你所说的“线”,可以被读入RDD“rdd”为
val rdd = sc.parallelize(data.split("\n"))
到目前为止没有新的东西。下一步是重要的一步。我们可以准备我们的数据一次完成所有工作,而不是分层计数和汇总。这是更可读性,也更有效,因为它是一个单一的地图,然后是一个单一的减少。
val mapped= rdd.flatMap(line => {
val arr = line.split("\\|")
val customer = arr(0)
val items = arr(1)
val time = arr(2)
val url = arr(3)
items.split(";").flatMap(item => {
val itemKey = item.split(":")(0)
val itemValues = item.split(":")(1).split(",")
itemValues.map(value => (customer, itemKey, value, time, url))
})
})
我们可以看到什么是在这一点,我们可以用mapped.toDF("customer", "itemId", "itemValue", "time", "url").show
+---------+------+---------+-----+----+
| customer|itemId|itemValue| time| url|
+---------+------+---------+-----+----+
|Customer1| item1| x1|time1| url|
|Customer1| item1| x2|time1| url|
|Customer1| item1| x3|time1| url|
|Customer1| item2| x1|time1| url|
|Customer1| item2| x4|time1| url|
|Customer1| item2| x5|time1| url|
|Customer1| item1| x1|time1| url|
|Customer1| item1| x3|time1| url|
|Customer1| item1| x6|time1| url|
|Customer1| item1| x1|time2|url2|
|Customer1| item1| x7|time2|url2|
|Customer1| item1| x3|time2|url2|
|Customer1| item2| x1|time2|url2|
|Customer1| item2| x4|time2|url2|
|Customer1| item2| x5|time2|url2|
|Customer1| item3| x5|time2|url2|
|Customer2| item1| x1|time3|url3|
|Customer2| item1| x7|time3|url3|
|Customer2| item1| x3|time3|url3|
|Customer2| item3| x5|time3|url3|
+---------+------+---------+-----+----+
很好地打印出来。最后,我们可以指望,减少到你所需要的载体:
val reduced = mapped.map{case (customer, itemKey, itemValue, time, url) => ((customer, itemKey, itemValue), 1)}.
reduceByKey(_+_).
map{case ((customer, itemKey, itemValue), count) => (customer, itemKey, itemValue, count)}
并查看它:reduced.toDF("customer", "itemKey", "itemValue", "count").show
+---------+-------+---------+-----+
| customer|itemKey|itemValue|count|
+---------+-------+---------+-----+
|Customer1| item1| x2| 1|
|Customer1| item1| x1| 3|
|Customer2| item1| x7| 1|
|Customer1| item1| x6| 1|
|Customer1| item1| x7| 1|
|Customer2| item1| x3| 1|
|Customer2| item3| x5| 1|
|Customer1| item2| x5| 2|
|Customer1| item2| x4| 2|
|Customer1| item2| x1| 2|
|Customer1| item3| x5| 1|
|Customer1| item1| x3| 3|
|Customer2| item1| x1| 1|
+---------+-------+---------+-----+
如果您需要将所有组合到矢量的Array/Seq表示中,则可以通过进一步聚合数据来完成此操作。希望这可以帮助!
也有值的时间和URL不存在,在这种情况下,arr(2)和arr(3)将失败,ArrayIndexOutOfBoundsException。是否有可能过滤4列的行。例如line.split(“\\ |”))。filter(l => l.length == 4) 我可以忽略没有url和time的数据。 –
只需从元组中删除这些列,然后如果他们不需要。或者,'import scala.util.Try',然后将这些行更新为 'val time = Try(Some(arr(2)))。getOrElse(None)'和 'val url = Try(Some(arr )))。getOrElse(None)' –
取决于您是否需要这些行中的值。如果你不这样做,那么你可以按照你的建议进行过滤。如果你这样做,然后看到以前的评论:) –