我有一个flink作业,它使用TextOutputFormat将数据写入目标。代码是这样的:Flink在HDFS上写入产生空文件
String basePath = "/Users/me/out";
// String basePath = "hdfs://10.199.200.204:9000/data";
// ensure we have a format for this.
TextOutputFormat<String> format = new TextOutputFormat<>(new Path(basePath, selection + "/" + uid));
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
format.configure(GlobalConfiguration.getConfiguration());
format.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
// then serialize and write.
String record = serializationFunction.map(value);
log.info("Writing " + record);
format.writeRecord(record);
当在普通文件系统上使用路径作为目的地时,这很好地工作。但是,当我将基础路径更改为hdfs位置时,它不再按预期工作。会发生什么情况是,输出文件实际上是在HDFS上创建的,但它的大小为零字节。通话期间我没有收到任何例外。
我正在使用Hadoop 2.6.0和Flink 0.10.1。使用命令行工具(hadoop fs -put ...
)将文件复制到hdfs的作品,所以我认为我可以排除一些Hadoop的配置错误。另外我开始使用Wireshark并将数据传输到Hadoop服务器,那么在实际编写之前我需要提交一些数据吗?