2015-12-08 10 views
0

我在一个Yarn集群中使用Flink来处理使用各种源和汇的数据。在拓扑中的某个点,有一个操作不能并行化,而且需要访问大量的内存。实际上,我用于此步骤的API需要以数组形式输入。现在,我已经实现了它像在Flink管道中集成了具有高内存需求的不可并行化任务

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
DataSet<Pojo> input = ... 
List<Pojo> inputList = input.collect(); 
Pojo[] inputArray = inputList.toArray(); 
Pojo[] resultArray = costlyOperation(inputArray); 
List<Pojo> resultList = Arrays.asList(resultArray); 
DataSet<Pojo> result = env.fromCollection(resultList); 
result.otherStuff() 
  1. 东西这个方案似乎相当不自然。有没有一种直接的方式将这个任务并入我的Flink管道?
  2. 我在another thread中读到collect()函数不应该用于大型数据集。我认为将数据集收集到一个列表中,然后是一个数组并不是并行发生的事实并不是我现在最大的问题,但是您是否仍然倾向于将上面所述的input写入文件并从中构建数组?
  3. 我也看到options在flink中配置托管内存。原则上,可以调整这个方式,以便为昂贵的操作留下足够的堆。另一方面,恐怕拓扑中所有其他运营商的表现可能会受到影响。你对此有何看法?

回答

1

您可以用一个代理键替换“collect-> array-> costlyOperation-> array-> fromCollection”步骤,该代码键对所有元组都有一个唯一值,这样您只能得到一个分区。这将是Flink。

在您的昂贵的操作本身,这是作为GroupReduceFunction实施,你会得到一个迭代器的数据。如果您不需要一次访问所有数据,那么您也可以安全地使用堆空间,因为您不需要将所有数据都保存在内存中(但这当然取决于您的昂贵操作计算的内容)。

作为一种替代方案,您也可以拨打reduce()而不使用之前的groupBy()。但是,您不会获得迭代器或输出收集器,只能计算部分聚合。 (请参阅https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#transformations中的“减少”)

使用Flink样式操作的优点是数据保留在群集中。如果您执行collect(),结果将转移到客户端,则在客户端执行代价高昂的操作,并将结果转移回集群。此外,如果输入较大,Flink会自动将中间结果散布到光盘上。

+0

谢谢你的快速回答!不幸的是,昂贵的操作确实需要一次完成所有数据。所以要走的路将是(1)将数据集的所有元素分组到一个集合中,然后(2)在GroupReduceFunction中聚合数组,对吧?有没有一种自然的方法可以做到(1)而不引入不必要的代理键?此外,你会推荐为(2)定义(Rich)GroupReduceFunction ,其中输出数据集实际上只包含一个巨大的数组,还是有更好的方法来做到这一点? – PassfishSwordword

+1

刚刚更新了我的答案。我猜你需要坚持使用代理键,因为你需要迭代器和输出采集器......关于输出:我不会输出'Pojo []',而是为输出数组中的每个元素调用'Collector.collect()'在你的功能。也许你不需要在内存中组装输出数组,而是直接输出元组,而不是将它们添加到结果中。否则,只需循环遍历函数中的数组。 –

相关问题