2016-03-25 79 views
1

我对数据流导出一些数据到CSV作为标题添加列名,但除了数据我想每个列名添加为在输出文件的第一行,如如何使用数据流时,将数据导出到CSV

col_name1, col_name2, col_name3, col_name4 ... 
    data1.1, data1.2, data1.3, data1.4 ... 
    data2.1 ... 

是否有任何与当前的API有关?(搜索TextIO.Write左右,但没有发现任何东西似乎相关...)或有无论如何,我可以在要成为的头“插入”列名导出的PCollection并强制数据按顺序写入...?

回答

0

由于数据流SDK版本1.7.0配合工作,您在TextIO.Write中有withHeader函数。

所以,你可以这样做:

TextIO.Write.named("WriteToText") 
      .to("/path/to/the/file") 
      .withHeader("col_name1,col_name2,col_name3,col_name4") 
      .withSuffix(".csv")); 

换行符被自动添加到标题的末尾。

1

有没有内置的方式使用TextIO.Write来做到这一点。 PCollection s是无序的,所以它不可能添加一个元素到前面。你可以写一个自定义的BoundedSink这是做到这一点。

+0

谢谢你,啊,我有一个后续问题...有没有什么办法可以为不同的碎片定义不同的行为,比如说我有最终的输出写入n个碎片的csv文件,但我只想写头到只有一个碎片... –

1

Custom sink APIs现在可用,如果你想成为一个制作CSV接收器的勇敢者。目前的解决方法,其积聚的输出作为一个字符串,并将其输出全部完成捆绑:

PCollection<String> output = data.apply(ParDo.of(new DoFn<String, String>() { 
private static final long serialVersionUID = 0; 

String new_line = System.getProperty("line.separator"); 
String csv_header = "id, stuff1, stuff2, stuff3" + new_line; 
StringBuilder csv_body = new StringBuilder().append(csv_header); 

@Override 
public void processElement(ProcessContext c) { 
    csv_body.append(c.element()).append(newline); 
} 

@Override 
public void finishBundle(Context c) throws Exception { 
    c.output(csv_body); 
} 

})).apply(TextIO.Write.named("WriteData").to(options.getOutput())); 

这将只有当你的大输出字符串在内存