0
我是Spark新手,使用Scala 2.10和Spark 1.6。 试图Input_file_001.txt格式如下,使用Spark格式化文本文件
Input_file_001.txt:
Dept 0100 Batch Load Errors for 8/16/2016 4:45:56 AM
Case 1111111111
Rectype: ABCD
Key:UMUM_REF_ID=A12345678,UMSV_SEQ_NO=1
UMSV ERROR :UNITS_ALLOW must be > or = UNITS_PAID
Case 2222222222
Rectype: ABCD
Key:UMUM_REF_ID=B87654321,UMSV_SEQ_NO=2
UMSV ERROR :UNITS_ALLOW must be > or = UNITS_PAID
NTNB ERROR :Invalid Value NTNB_MCTR_SUBJ=AMOD
Case 3333333333
Rectype: WXYZ
Key:UMUM_REF_ID=U19817250,UMSV_SEQ_NO=2
UMSV ERROR :UNITS_ALLOW must be > or = UNITS_PAID
输出文件
Input_file_001.txt
case~Rectype~key,Error
1111111111~ABCD~UMUM_REF_ID=A12345678,UMSV_SEQ_NO=1~UMSV ERROR :UNITS_ALLOW must be > or = UNITS_PAID
2222222222~ABCD~UMUM_REF_ID=B87654321,UMSV_SEQ_NO=2~UMSV ERROR :UNITS_ALLOW must be > or = UNITS_PAID,NTNB ERROR :Invalid Value NTNB_MCTR_SUBJ=AMOD
3333333333~WXYZ~UMUM_REF_ID=U19817250,UMSV_SEQ_NO=2~UMSV ERROR :UNITS_ALLOW must be > or = UNITS_PAID
我试图去实现它像下面,
val source = sc.textFile("Input_file_001.txt")
val fileread = source.filter(x => ! x.startsWith("Dept"))).filter(_.nonEmpty).map(z => z.trim)
上面的代码给我阵列[字符串],不能把它转发。 任何帮助表示赞赏。
谢谢拉梅什您的输入。我尝试了你的解决方案,它给了我与源文件相同的格式,但是在每一行中用〜代替。我试图实现输出文件格式,因为每个字段由〜和行分隔,并以新行结束,因此我可以在其上创建外部配置单元表。也可以在输出文件中添加源文件名作为字段? – vin
是不是你的源文件中的问题?如果不是那么解决方案将无法正常工作, –
哦,我明白了。我使用部分代码在IntelliJ本地运行它。我刚刚尝试在群集上运行.jar。现在正在给予预期的结果。你太棒了。谢谢。 – vin