我在eclipse中运行flink,必要的jar已被Maven提取。我的机器有一个带有八个内核的处理器,我必须写的流应用程序从它的输入中读取行并计算一些统计数据。在多核处理器上本地运行Apache flink
当我在我的机器上运行该程序时,我期望flink使用CPU的所有内核以及线程代码。但是,当我观察内核时,我发现只有一个内核正在使用。我尝试了很多东西,并在下面的代码中留下了我的最后一次尝试,即设置环境的并行性。我也尝试将它设置为单独的流等等。
public class SemSeMi {
public static void main(String[] args) throws Exception {
System.out.println("Starting Main!");
System.out.println(org.apache.flink.core.fs.local.LocalFileSystem
.getLocalFileSystem().getWorkingDirectory());
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
env.setParallelism(8);
env.socketTextStream("localhost", 9999).flatMap(new SplitterX());
env.execute("Something");
}
public static class SplitterX implements
FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence,
Collector<Tuple2<String, Integer>> out) throws Exception {
// Do Nothing!
}
}
}
我喂的数据PROGRAMM使用netcat的:
nc -lk 9999 < fileName
的问题是如何使程序在本地规模和使用所有可用的核心?
我故意清空程序来隔离问题。在我的完整版本中有很多需要处理。我按预期收到任务编号,但整个负载仍在一个核心上。要看到它,你需要通过每一百万行左右只打印一次来制动输出! – AHH