我希望能够从分布式(非本地)Storm拓扑中将新条目写入HBase。有几个GitHub项目提供HBase Mappers或pre-made Storm bolts来将元组写入HBase。这些项目提供了在LocalCluster上执行其样本的说明。从风暴螺栓插入HBase
我遇到了这两个项目并直接从bolt中访问HBase API的问题,它们都需要HBase-site.xml文件包含在类路径中。使用直接API方法,也可能使用GitHub,当您执行HBaseConfiguration.create();
时,它将尝试从类路径中的条目中查找所需的信息。
如何修改风暴螺栓的类路径以包含Hbase配置文件?
更新:使用danehammer的答案,这是我得到了它的工作
将以下文件复制到你的〜/ .storm目录:
- HBase的,共0.98.0.2.1.2 .0-402-hadoop2.jar
- hbase-site.xml
- storm.yaml:注意:如果不将storm.yaml复制到该目录中,那么storm jar命令将不会在类路径中使用该目录(见storm.py python script看到自己这个逻辑 - 将是很好,如果这被证明)
接下来,在您的拓扑结构类的主要方法得到了HBase的配置和序列化:
final Configuration hbaseConfig = HBaseConfiguration.create();
final DataOutputBuffer databufHbaseConfig = new DataOutputBuffer();
hbaseConfig.write(databufHbaseConfig);
final byte[] baHbaseConfigSerialized = databufHbaseConfig.getData();
传递字节数组你的喷口类通过构造函数。我发现如果喷口有一个配置字段,当运行拓扑时你将得到一个不能序列化的异常)
在喷口的open方法中,反序列化的配置和访问HBase的表:
Configuration hBaseConfiguration = new Configuration();
ByteArrayInputStream bas = new ByteArrayInputStream(baHbaseConfigSerialized);
hBaseConfiguration.readFields(new DataInputStream(bas));
HTable tbl = new HTable(hBaseConfiguration, HBASE_TABLE_NAME);
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("YOUR_COLUMN"));
scnrTbl = tbl.getScanner(scan);
现在,在nextTuple方法,你可以使用扫描仪获取的下一行:
Result rsltWaveform = scnrWaveformTbl.next();
提取您从结果想要的东西,并通过这些值在一些可序列化的对象中ct到螺栓。
加上一个用于不反序列化构造函数中的字节数组。 –