2016-01-15 31 views
1

我在eclipse中运行flink,必要的jar已被Maven提取。我的机器有一个带有八个内核的处理器,我必须写的流应用程序从它的输入中读取行并计算一些统计数据。在多核处理器上本地运行Apache flink

当我在我的机器上运行该程序时,我期望flink使用CPU的所有内核以及线程代码。但是,当我观察内核时,我发现只有一个内核正在使用。我尝试了很多东西,并在下面的代码中留下了我的最后一次尝试,即设置环境的并行性。我也尝试将它设置为单独的流等等。

public class SemSeMi { 


    public static void main(String[] args) throws Exception { 
     System.out.println("Starting Main!"); 

     System.out.println(org.apache.flink.core.fs.local.LocalFileSystem 
       .getLocalFileSystem().getWorkingDirectory()); 

     StreamExecutionEnvironment env = StreamExecutionEnvironment 
       .getExecutionEnvironment(); 

     env.setParallelism(8); 

     env.socketTextStream("localhost", 9999).flatMap(new SplitterX()); 

     env.execute("Something");  
    } 

    public static class SplitterX implements 
      FlatMapFunction<String, Tuple2<String, Integer>> { 
     @Override 
     public void flatMap(String sentence, 
       Collector<Tuple2<String, Integer>> out) throws Exception { 
      // Do Nothing! 

     } 
    } 
} 

我喂的数据PROGRAMM使用netcat的:

nc -lk 9999 < fileName 

的问题是如何使程序在本地规模和使用所有可用的核心?

回答

2

您不必明确指定并行度。以默认设置运行的作业将自动将并行度设置为可用核心的数量。

在你的情况下,源代码将以1的并行性运行,因为从套接字读取不能分发。但是,对于flatMap操作,系统将实例化8个实例。如果你打开日志记录,那么你也会看到它。现在输入数据以循环方式分配给flatMap任务。每个flatMap任务都由一个单独的线程执行。

我会怀疑你为什么只看到一个核心的负载是因为SplitterX没有做任何工作。试试下面的代码计算每String的字符数,然后打印出结果到控制台:

public static void main(String[] args) throws Exception { 
    System.out.println("Starting Main!"); 

    System.out.println(org.apache.flink.core.fs.local.LocalFileSystem 
     .getLocalFileSystem().getWorkingDirectory()); 

    StreamExecutionEnvironment env = StreamExecutionEnvironment 
     .getExecutionEnvironment(); 

    env.socketTextStream("localhost", 9999).flatMap(new SplitterX()).print(); 

    env.execute("Something"); 
} 

public static class SplitterX implements 
    FlatMapFunction<String, Tuple2<String, Integer>> { 
    @Override 
    public void flatMap(String sentence, 
         Collector<Tuple2<String, Integer>> out) throws Exception { 
     out.collect(Tuple2.of(sentence, sentence.length())); 

    } 
} 

的数字在每一行的开始告诉你哪些任务打印的结果。

+0

我故意清空程序来隔离问题。在我的完整版本中有很多需要处理。我按预期收到任务编号,但整个负载仍在一个核心上。要看到它,你需要通过每一百万行左右只打印一次来制动输出! – AHH

相关问题