2015-12-17 24 views
3

我有一个flink作业,它使用TextOutputFormat将数据写入目标。代码是这样的:Flink在HDFS上写入产生空文件

String basePath = "/Users/me/out"; 
    // String basePath = "hdfs://10.199.200.204:9000/data"; 
    // ensure we have a format for this. 
    TextOutputFormat<String> format = new TextOutputFormat<>(new Path(basePath, selection + "/" + uid)); 
    StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); 
    format.configure(GlobalConfiguration.getConfiguration()); 
    format.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks()); 
    // then serialize and write. 
    String record = serializationFunction.map(value); 
    log.info("Writing " + record); 
    format.writeRecord(record); 

当在普通文件系统上使用路径作为目的地时,这很好地工作。但是,当我将基础路径更改为hdfs位置时,它不再按预期工作。会发生什么情况是,输出文件实际上是在HDFS上创建的,但它的大小为零字节。通话期间我没有收到任何例外。

我正在使用Hadoop 2.6.0和Flink 0.10.1。使用命令行工具(hadoop fs -put ...)将文件复制到hdfs的作品,所以我认为我可以排除一些Hadoop的配置错误。另外我开始使用Wireshark并将数据传输到Hadoop服务器,那么在实际编写之前我需要提交一些数据吗?

回答

2

为了将结果清理到HDFS,必须在完成记录写入后调用TextOutputFormatclose方法。

// do writing 
while (some condition) { 
    format.writeRecord(record); 
} 

// finished writing 
format.close(); 
0

我发现它为什么发生。实际上有两个原因:

  1. 正如Till Rohrmann指出的那样,输出格式没有被刷新。由于我在流式作业中使用格式,因此关闭格式不适用。我使出写我自己的格式,可以刷新:

    public class MyTextOutputFormat<T> extends TextOutputFormat<T> { 
        public MyTextOutputFormat(Path outputPath) { 
         super(outputPath); 
        } 
    
        public MyTextOutputFormat(Path outputPath, String charset) { 
         super(outputPath, charset); 
        } 
    
        // added a custom flush method here. 
        public void flush() throws IOException { 
         stream.flush(); 
        } 
    } 
    
  2. 我在VM guest虚拟机上运行HDFS和VM主机连接到它。 Flink的HDFS客户端默认使用数据节点的IP地址连接到数据节点。但datanode的IP地址报告为127.0.0.1。所以flink试图连接到127.0.0.1,当然在主机系统中没有运行HDFS数据节点。然而这只是在我添加了手动冲洗操作后才显示出来。为了解决这个问题,我不得不改变两两件事:

    • 里面的VM guest虚拟机,修改$HADOOP_HOME/etc/hadoop/hdfs-site.xml并添加

      <property> 
          <name>dfs.datanode.hostname</name> 
          <value>10.199.200.204</value> <!-- IP of my VM guest --> 
      </property> 
      

      这种变化做出的名称节点上报数据节点的正确路由的主机名。它实际上是一个无证的设置,但似乎工作。

    • 在其中弗林克实际运行系统,我有一个文件夹(例如/home/me/conf)中创建一个hdfs-site.xml只好再设置一个环境变量HADOOP_CONF_DIR指向/home/me/conf。该文件有如下内容:

      <?xml version="1.0" encoding="UTF-8"?> 
      <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> 
      <configuration> 
          <property> 
           <name>dfs.client.use.datanode.hostname</name> 
           <value>true</value> 
          </property> 
      </configuration> 
      

      这种变化指示通过Hadoop客户端使用主机名代替IP地址连接到数据管理部。这些更改后,我的数据被正确写入HDFS。