2017-08-22 99 views
0

我是Spark新手,使用Scala 2.10和Spark 1.6。 试图Input_file_001.txt格式如下,使用Spark格式化文本文件

Input_file_001.txt:

Dept 0100 Batch Load Errors for 8/16/2016 4:45:56 AM 

Case 1111111111 
Rectype: ABCD 
Key:UMUM_REF_ID=A12345678,UMSV_SEQ_NO=1 
UMSV ERROR :UNITS_ALLOW must be > or = UNITS_PAID 

Case 2222222222 
Rectype: ABCD 
Key:UMUM_REF_ID=B87654321,UMSV_SEQ_NO=2 
UMSV ERROR :UNITS_ALLOW must be > or = UNITS_PAID 
NTNB ERROR :Invalid Value       NTNB_MCTR_SUBJ=AMOD 

Case 3333333333 
Rectype: WXYZ 
Key:UMUM_REF_ID=U19817250,UMSV_SEQ_NO=2 
UMSV ERROR :UNITS_ALLOW must be > or = UNITS_PAID 

输出文件

Input_file_001.txt

case~Rectype~key,Error 
1111111111~ABCD~UMUM_REF_ID=A12345678,UMSV_SEQ_NO=1~UMSV ERROR :UNITS_ALLOW must be > or = UNITS_PAID 
2222222222~ABCD~UMUM_REF_ID=B87654321,UMSV_SEQ_NO=2~UMSV ERROR :UNITS_ALLOW must be > or = UNITS_PAID,NTNB ERROR :Invalid Value       NTNB_MCTR_SUBJ=AMOD 
3333333333~WXYZ~UMUM_REF_ID=U19817250,UMSV_SEQ_NO=2~UMSV ERROR :UNITS_ALLOW must be > or = UNITS_PAID 

我试图去实现它像下面,

val source = sc.textFile("Input_file_001.txt") 
val fileread = source.filter(x => ! x.startsWith("Dept"))).filter(_.nonEmpty).map(z => z.trim) 

上面的代码给我阵列[字符串],不能把它转发。 任何帮助表示赞赏。

回答

0

您可以利用wholeTextFiles api读取输入文件,该文件将读取输入文件==>(filename, whole text as one line)。然后你可以操作整个文本行并将其转换为你想要的输出。最后,您可以添加header,并将其保存到一个文件

val rdd = sc.wholeTextFiles("path to Input_file_001.txt") 
val finalRdd = rdd.flatMap(tuple => tuple._2.split("\nCase ") 
    .map(record => record.replace("\nRectype: ", "~").trim 
    .replace("\nKey:", "~").trim 
    .replace("\nUMSV ERROR :", "~UMSV ERROR :").trim 
    .replace("\nNTNB ERROR :", ",NTNB ERROR :").trim) 
).filter(record => !record.startsWith("Dept")) 
val header: RDD[String] = sc.parallelize(Array("case~Rectype~key,Error")) 
header.union(finalRdd).saveAsTextFile("path to ouput file") 

你应该有以下输出

case~Rectype~key,Error 
1111111111~ABCD ~UMUM_REF_ID=A12345678,UMSV_SEQ_NO=1~UMSV ERROR :UNITS_ALLOW must be > or = UNITS_PAID 
2222222222~ABCD ~UMUM_REF_ID=B87654321,UMSV_SEQ_NO=2~UMSV ERROR :UNITS_ALLOW must be > or = UNITS_PAID ,NTNB ERROR :Invalid Value       NTNB_MCTR_SUBJ=AMOD 
3333333333~WXYZ ~UMUM_REF_ID=U19817250,UMSV_SEQ_NO=2~UMSV ERROR :UNITS_ALLOW must be > or = UNITS_PAID 

我希望答案是有帮助的

+0

谢谢拉梅什您的输入。我尝试了你的解决方案,它给了我与源文件相同的格式,但是在每一行中用〜代替。我试图实现输出文件格式,因为每个字段由〜和行分隔,并以新行结束,因此我可以在其上创建外部配置单元表。也可以在输出文件中添加源文件名作为字段? – vin

+0

是不是你的源文件中的问题?如果不是那么解决方案将无法正常工作, –

+0

哦,我明白了。我使用部分代码在IntelliJ本地运行它。我刚刚尝试在群集上运行.jar。现在正在给予预期的结果。你太棒了。谢谢。 – vin