目前,我的工作中,我有一个需要预处理之前,它是“流处理”一个CSV文件的项目。因此,我需要执行批处理以及流处理。具体而言,我的data.csv
文件需要预先处理并在特定字段上排序,该字段将用作流处理的EventTime
时间戳。下面的批处理脚本产生预处理输出:阿帕奇弗林克:排序数据集和创建的数据流中从多个输入文件与事件倍
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<Long, String, String>> compactData = env
.readTextFile("data.csv")
.flatMap(new RichFlatMapFunction<String, Tuple3<Long, String, String>>() {
private CustomDelegate delegate;
@Override
public void open(Configuration parameters) throws Exception {
delegate = new CustomDelegate();
}
@Override
public void flatMap(String s, Collector<Tuple3<Long, String, String>> out)
throws Exception {
Tuple3<Long, String, String> datum = delegate.deserializeRide(s);
if (datum != null)
out.collect(datum);
}
});
compactData.partitionByRange(0)
.sortPartition(0, Order.ASCENDING)
.writeAsCsv("output_dir", "\n", ",");
env.execute();
我的默认并行是32,当批处理脚本(以上)总结执行中,output_dir
目录中创建,它包含32个文件。
问题1:我的问题是这些文件是否基于全局顺序生成。实质上,文件1中的记录与文件2中的记录相比是否具有较小的值(等等)?如果不是,我如何保证以前的或类似的东西?
如上所述,我使用output_dir
中的文件作为我的流处理作业的输入,该作业由前一个的第一个字段(即EventTime
)加上时间戳。流作业的代码如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Phase 0: Collect input and create timestamps
DataStream<Tuple3<Long, String, Integer>> dataStream = env
.readTextFile("output_dir")
.map(new MapFunction<String, Tuple3<Long, String, Integer>>() {
@Override
public Tuple3<Long, String, Integer> map(String s) throws Exception {
String[] tokens = s.split(",");
return new Tuple3<Long, String, Integer>(Long.parseLong(tokens[0]),
tokens[1] + "-" + tokens[2], 1);
}
})
.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<Tuple3<Long, String, Integer>>() {
@Override
public long extractAscendingTimestamp(Tuple3<Long, String, Integer> t) {
return t.f0;
}
});
env.execute();
问题2:我定义为输入目录output_dir
,其中包含上排序字段0文件将记录被解析并放置在数据流基于我想要的顺序(即,在它们的字段0上)。如果不是,我相信我在分配时间戳方面会遇到问题(对吗?)?我可以从多个文件中读取DataStream
(就像我现在所做的那样),还是必须将所有文件合并为一个文件,并通过从单个文件中串行读取所有记录来创建DataStream?
谢谢你的回答。关于问题1,我遵循了您的建议,并将全球排序的数据制作在一个文件中。谈到问题2,似乎如果我希望基于EventTime处理数据,那么我坚持等于1的并行性。我是否正确? –
@ nick.katsip对于问题2,如果您想要在事件时间内严格按照事件时间对所有记录进行处理,那么操作员处理这些记录的并行度应为1,并确保此操作员的输入记录为也全球订购。如果数据集不是很大,只需将批处理作业和流式作业的并行性设置为1,则记录将按全局排序处理。 – David
再次感谢大卫的评论。显然,在Flink 1.2上,我必须使用'windowAll()'转换,然后使用1的并行性来处理基于事件时间顺序的数据。再次感谢您的回答和您的意见。 –