2014-07-07 240 views
2

我希望能够从分布式(非本地)Storm拓扑中将新条目写入HBase。有几个GitHub项目提供HBase Mapperspre-made Storm bolts来将元组写入HBase。这些项目提供了在LocalCluster上执行其样本的说明。从风暴螺栓插入HBase

我遇到了这两个项目并直接从bolt中访问HBase API的问题,它们都需要HBase-site.xml文件包含在类路径中。使用直接API方法,也可能使用GitHub,当您执行HBaseConfiguration.create();时,它将尝试从类路径中的条目中查找所需的信息。

如何修改风暴螺栓的类路径以包含Hbase配置文件?

更新:使用danehammer的答案,这是我得到了它的工作

将以下文件复制到你的〜/ .storm目录:

  • HBase的,共0.98.0.2.1.2 .0-402-hadoop2.jar
  • hbase-site.xml
  • storm.yaml:注意:如果不将storm.yaml复制到该目录中,那么storm jar命令将不会在类路径中使用该目录(见storm.py python script看到自己这个逻辑 - 将是很好,如果这被证明)

接下来,在您的拓扑结构类的主要方法得到了HBase的配置和序列化:

final Configuration hbaseConfig = HBaseConfiguration.create(); 
final DataOutputBuffer databufHbaseConfig = new DataOutputBuffer(); 
hbaseConfig.write(databufHbaseConfig); 
final byte[] baHbaseConfigSerialized = databufHbaseConfig.getData(); 

传递字节数组你的喷口类通过构造函数。我发现如果喷口有一个配置字段,当运行拓扑时你将得到一个不能序列化的异常)

在喷口的open方法中,反序列化的配置和访问HBase的表:

Configuration hBaseConfiguration = new Configuration(); 
ByteArrayInputStream bas = new ByteArrayInputStream(baHbaseConfigSerialized); 
hBaseConfiguration.readFields(new DataInputStream(bas)); 
HTable tbl = new HTable(hBaseConfiguration, HBASE_TABLE_NAME); 

Scan scan = new Scan(); 
scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("YOUR_COLUMN")); 

scnrTbl = tbl.getScanner(scan); 

现在,在nextTuple方法,你可以使用扫描仪获取的下一行:

Result rsltWaveform = scnrWaveformTbl.next(); 

提取您从结果想要的东西,并通过这些值在一些可序列化的对象中ct到螺栓。

+0

加上一个用于不反序列化构造函数中的字节数组。 –

回答

2

当您使用“storm jar”命令部署拓扑时,~/.storm文件夹将位于类路径中(请参阅jar命令下的this link)。如果您将hbase-site.xml文件(或相关的* -site.xml文件)放在该文件夹中,则在“storm jar”期间HBaseConfiguration.create()会找到该文件并正确返回org.apache.hadoop.configuration.Configuration。这需要在拓扑结构中存储和序列化,以便在集群周围分发该配置。

+0

你是说你会在拓扑类中创建HBaseConfiguration,然后序列化它并以某种方式将它传递给螺栓(也许在setBolt(...)。addConfiguration方法中? –