2017-02-28 25 views
0

我有一个文件夹包含我的本地目录中的许多销售文本文件。让我们的2个文本文件的例子:非结构化文件的火花提取和转换

文本文件1:

Sales Details 
20161120 


Sales Person: John 

Code Product  Quantity Price 
A0001 Product1 20   15.90 
A0003 Product3 13   23.80 

文本文件2:

Sales Details 
20161130 


Sales Person: Alicia 

Code Product  Quantity Price 
A0007 Product7 342   79.50 
A0008 Product8 55   432.80 
A0009 Product9 100   134.30 

我用水槽的文件流进HDFS。所有这些小文件在HDFS中合并为一个大文件。当我使用Spark对这些文件执行提取和转换时,我遇到了一些需要在这里寻求建议的问题。

基于上述2个文件,它将在HDFS中组合成一个文件。我使用的火花从HDFS读取文本文件,如下所示:

lines = spark.read.text('/user/tester/sales') 

如何拆分线为两个销售细节,然后提取出的信息对每个销售人员?我的最终目标是提取信息,并把它在蜂巢表具有以下strucuture:

Date  SalesPerson  Code  Product  Quantity Price 

感谢。

回答

0

你的文件结构并不是很方便处理,但你总是可以使用正则表达式与火花的wholeTextFiles重写它们为表格格式。以此pyspark代码为例:

import re 

def extract_sales(file): 
    for line in file[1].split("\n"): 
     if re.match('\d{8}', line.strip()): 
      date = line.strip() 
     if re.search('^Sales Person', line): 
      person = re.match("^Sales Person: (.*)", line).group(1) 
     if re.search('^A00', line): 
      yield [date, person] + re.split('\s+', line) 

raw_data = spark.sparkContext.wholeTextFiles('sales/') 
raw_data.flatMap(extract_sales) \ 
    .toDF(['Date', 'SalesPerson', 'Code', 'Product', 'Quantity', 'Price']).show() 

+--------+-----------+-----+--------+--------+------+ 
| Date|SalesPerson| Code| Product|Quantity| Price| 
+--------+-----------+-----+--------+--------+------+ 
|20161120|  John|A0001|Product1|  20| 15.90| 
|20161120|  John|A0003|Product3|  13| 23.80| 
|20161130|  Alicia|A0007|Product7|  342| 79.50| 
|20161130|  Alicia|A0008|Product8|  55|432.80| 
|20161130|  Alicia|A0009|Product9|  100|134.30| 
+--------+-----------+-----+--------+--------+------+ 
+0

非常感谢Mariusz!它可以帮助我了解如何处理它。通过使用用户定义的函数,当我们运行Spark应用程序时,它仍然会利用Spark引擎而不是本地python处理权限? – kcyea

+0

是的,当然。该函数将在集群节点上执行,但为了实现并行性,您的输入目录应该有多个大文件。 – Mariusz

+0

好的...顺便说一下,使用“整个文本文件”与“文本”比较,性能会更好吗? – kcyea