我在一个Yarn集群中使用Flink来处理使用各种源和汇的数据。在拓扑中的某个点,有一个操作不能并行化,而且需要访问大量的内存。实际上,我用于此步骤的API需要以数组形式输入。现在,我已经实现了它像在Flink管道中集成了具有高内存需求的不可并行化任务
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Pojo> input = ...
List<Pojo> inputList = input.collect();
Pojo[] inputArray = inputList.toArray();
Pojo[] resultArray = costlyOperation(inputArray);
List<Pojo> resultList = Arrays.asList(resultArray);
DataSet<Pojo> result = env.fromCollection(resultList);
result.otherStuff()
- 东西这个方案似乎相当不自然。有没有一种直接的方式将这个任务并入我的Flink管道?
- 我在another thread中读到
collect()
函数不应该用于大型数据集。我认为将数据集收集到一个列表中,然后是一个数组并不是并行发生的事实并不是我现在最大的问题,但是您是否仍然倾向于将上面所述的input
写入文件并从中构建数组? - 我也看到options在flink中配置托管内存。原则上,可以调整这个方式,以便为昂贵的操作留下足够的堆。另一方面,恐怕拓扑中所有其他运营商的表现可能会受到影响。你对此有何看法?
谢谢你的快速回答!不幸的是,昂贵的操作确实需要一次完成所有数据。所以要走的路将是(1)将数据集的所有元素分组到一个集合中,然后(2)在GroupReduceFunction中聚合数组,对吧?有没有一种自然的方法可以做到(1)而不引入不必要的代理键?此外,你会推荐为(2)定义(Rich)GroupReduceFunction,其中输出数据集实际上只包含一个巨大的数组,还是有更好的方法来做到这一点? –
PassfishSwordword
刚刚更新了我的答案。我猜你需要坚持使用代理键,因为你需要迭代器和输出采集器......关于输出:我不会输出'Pojo []',而是为输出数组中的每个元素调用'Collector.collect()'在你的功能。也许你不需要在内存中组装输出数组,而是直接输出元组,而不是将它们添加到结果中。否则,只需循环遍历函数中的数组。 –