One option is to use the Hadoop HDFS Java API. Assuming you are using Maven, you would include hadoop-common in your pom.xml:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0.2.2.0.0-2041</version>
</dependency>
Then, in your spout implementation, you would use an HDFS FileSystem object. For example, here is some pseudocode for emitting each line of a file as a String:
@Override
public void nextTuple() {
    try {
        Path pt = new Path("hdfs://servername:8020/user/hdfs/file.txt");
        // Pass the path's URI so the HDFS filesystem is resolved even if
        // fs.defaultFS is not set in the Configuration
        FileSystem fs = FileSystem.get(pt.toUri(), new Configuration());
        BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt)));
        try {
            String line = br.readLine();
            while (line != null) {
                // emit the line which was read from the HDFS file;
                // _collector is a private member variable of type
                // SpoutOutputCollector set in the open method
                _collector.emit(new Values(line));
                line = br.readLine();
            }
        } finally {
            br.close();
        }
    } catch (Exception e) {
        _collector.reportError(e);
        LOG.error("HDFS spout error", e);
    }
}
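Stripped of the Storm and HDFS types, the read-and-emit loop above is just the standard BufferedReader pattern. A minimal, self-contained sketch using plain java.io (with a List standing in for the SpoutOutputCollector, both names here are illustrative) shows why each line must be emitted before the next readLine() call, so the last line is not dropped and null is never emitted:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

public class LineEmitter {
    // Reads every line from the reader and "emits" it into the returned list.
    // In the real spout, the add() call would be _collector.emit(new Values(line)).
    public static List<String> emitLines(BufferedReader br) throws IOException {
        List<String> emitted = new ArrayList<>();
        String line = br.readLine();
        while (line != null) {
            emitted.add(line);    // emit the current line first...
            line = br.readLine(); // ...then advance to the next one
        }
        return emitted;
    }

    public static void main(String[] args) throws IOException {
        BufferedReader br = new BufferedReader(new StringReader("a\nb\nc"));
        System.out.println(emitLines(br)); // prints [a, b, c]
    }
}
```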
Thanks Kit! That is indeed the solution for streaming tuples one by one from a single file. What about a spout for batches of tuples (i.e. Storm Trident)? – florins
@florins I haven't tried Trident myself, but it looks like you would implement [IBatchSpout](https://nathanmarz.github.io/storm/doc/storm/trident/spout/IBatchSpout.html), and then your code would go in emitBatch instead of nextTuple. –