2017-05-24 19 views
2

目前,我的工作中,我有一个需要预处理之前,它是“流处理”一个CSV文件的项目。因此,我需要执行批处理以及流处理。具体而言,我的data.csv文件需要预先处理并在特定字段上排序,该字段将用作流处理的EventTime时间戳。下面的批处理脚本产生预处理输出:阿帕奇弗林克:排序数据集和创建的数据流中从多个输入文件与事件倍

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
DataSet<Tuple3<Long, String, String>> compactData = env 
    .readTextFile("data.csv") 
    .flatMap(new RichFlatMapFunction<String, Tuple3<Long, String, String>>() { 
     private CustomDelegate delegate; 
     @Override 
     public void open(Configuration parameters) throws Exception { 
     delegate = new CustomDelegate(); 
     } 
     @Override 
     public void flatMap(String s, Collector<Tuple3<Long, String, String>> out) 
      throws Exception { 
     Tuple3<Long, String, String> datum = delegate.deserializeRide(s); 
     if (datum != null) 
      out.collect(datum); 
     } 
    }); 
compactData.partitionByRange(0) 
    .sortPartition(0, Order.ASCENDING) 
    .writeAsCsv("output_dir", "\n", ","); 
env.execute(); 

我的默认并行是32,当批处理脚本(以上)总结执行中,output_dir目录中创建,它包含32个文件。

问题1:我的问题是这些文件是否基于全局顺序生成。实质上,文件1中的记录与文件2中的记录相比是否具有较小的值(等等)?如果不是,我如何保证以前的或类似的东西?

如上所述,我使用output_dir中的文件作为我的流处理作业的输入,该作业由前一个的第一个字段(即EventTime)加上时间戳。流作业的代码如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
// Phase 0: Collect input and create timestamps 
DataStream<Tuple3<Long, String, Integer>> dataStream = env 
    .readTextFile("output_dir") 
    .map(new MapFunction<String, Tuple3<Long, String, Integer>>() { 
     @Override 
     public Tuple3<Long, String, Integer> map(String s) throws Exception { 
     String[] tokens = s.split(","); 
     return new Tuple3<Long, String, Integer>(Long.parseLong(tokens[0]), 
      tokens[1] + "-" + tokens[2], 1); 
     } 
    }) 
    .assignTimestampsAndWatermarks(
     new AscendingTimestampExtractor<Tuple3<Long, String, Integer>>() { 
      @Override 
      public long extractAscendingTimestamp(Tuple3<Long, String, Integer> t) { 
      return t.f0; 
      } 
     }); 
env.execute(); 

问题2:我定义为输入目录output_dir,其中包含上排序字段0文件将记录被解析并放置在数据流基于我想要的顺序(即,在它们的字段0上)。如果不是,我相信我在分配时间戳方面会遇到问题(对吗?)?我可以从多个文件中读取DataStream(就像我现在所做的那样),还是必须将所有文件合并为一个文件,并通过从单个文件中串行读取所有记录来创建DataStream?

回答

2

问题1:我的问题是,是否这些文件是根据全球秩序的产生。实质上,文件1中的记录与文件2中的记录相比是否具有较小的值(等等)?如果不是,我如何保证以前的或类似的东西?

号由于有32个分区,在每个分区中的数据进行排序。但不能保证不同输出文件之间的数据顺序。您可以手动将sortPartition操作符的并行性设置为1,或者实现您自己的分区程序而不是散列分区程序。

问题2:我定义为输入目录OUTPUT_DIR,其中包含上排序字段0文件将记录进行解析,并根据我想要的顺序(即,在放置于数据流他们的领域0)。如果不是,我相信我在分配时间戳方面会遇到问题(对吗?)?我可以从多个文件读取DataStream(就像我现在这样做),还是必须将所有文件合并为一个文件,并通过从单个文件中串行读取所有记录来创建DataStream?

假设有32个输出文件,如果您的并发流作业也是32,那么每个文件将被一个并行使用,所有来自该输入文件的数据将根据数据顺序进行处理以当前并行性出现在文件中。但是,一旦您尝试从32个并行性中汇总数据或尝试对数据进行混洗,数据的顺序就不再被排序。如果希望接收机获得全局排序的数据,则可能必须将所有数据放在一个文件中,并使用一个并行性的流式作业处理它们。

+0

谢谢你的回答。关于问题1,我遵循了您的建议,并将全球排序的数据制作在一个文件中。谈到问题2,似乎如果我希望基于EventTime处理数据,那么我坚持等于1的并行性。我是否正确? –

+1

@ nick.katsip对于问题2,如果您想要在事件时间内严格按照事件时间对所有记录进行处理,那么操作员处理这些记录的并行度应为1,并确保此操作员的输入记录为也全球订购。如果数据集不是很大,只需将批处理作业和流式作业的并行性设置为1,则记录将按全局排序处理。 – David

+0

再次感谢大卫的评论。显然,在Flink 1.2上,我必须使用'windowAll()'转换,然后使用1的并行性来处理基于事件时间顺序的数据。再次感谢您的回答和您的意见。 –

相关问题