2015-01-14 144 views
1

从一个拓扑结构发射元组到另一个拓扑结构有可能或很好吗?风暴 - 拓扑结构到拓扑结构

比方说,在一个拓扑结构中,一个特定的螺栓正在将元组存储到db中。在另一个拓扑结构中,我不想复制或创建用于存储元组的同一个螺栓。那么从第二个拓扑可以发射到第一个拓扑螺栓?

-Hariprasad

回答

0

这是目前不支持,你不能传递从一个拓扑到另一个元组。 根据您的使用情况,您为什么不使用其他螺栓(在同一拓扑结构中)订阅db螺栓而不是运行单独的拓扑结构

1

虽然不能将元组从一个拓扑直接传递到另一个拓扑,但可以使用排队系统如Apache Kafka来完成你描述的内容。 Storm在最新发布的版本中包含Kafka喷嘴。

1

该设置需要两个风暴拓扑(A和B)和一个Kafka主题。让我们把它叫做“转移”

在要发送数据到B的拓扑结构的拓扑,使用卡夫卡制片人:

[卡夫卡初始化代码是直接从文档采取:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example,显然需要要为您定制卡夫卡安装]

public void Execute(Tuple input){ 
... 
    Properties props = new Properties(); 
    props.put("metadata.broker.list", "broker1:9092,broker2:9092 "); 
    props.put("serializer.class", "kafka.serializer.StringEncoder"); 
    props.put("partitioner.class", "example.producer.SimplePartitioner"); 
    props.put("request.required.acks", "1"); 

    ProducerConfig config = new ProducerConfig(props); 

    Producer<String, String> producer = new Producer<String, String (config); 
    String msg = ... 
    KeyedMessage<String, String> data = new KeyedMessage<String, String> 
     ("transfers", ip, msg); 
    producer.send(data); 
    producer.close(); 

在拓扑B,创建一个卡夫卡的喷嘴。当你初始化拓扑:

BrokerHosts hosts = new ZkHosts(zkConnString); 
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, 
    UUID.randomUUID().toString()); 
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); 
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); 

// Now it's just like any other spout 
topologyBuilder.setSpout(kafkaSpout); 

这需要运行kafka,当然(请查看https://kafka.apache.org/08/quickstart.html)。

[编辑:再次读你的问题:听起来你有一个可重复使用的组件(保存元组),你想从两个不同的拓扑调用,并且你试图从另一个拓扑中调用一个。另一种方法是将此任务卸载到第三个拓扑,专门用于处理保存元组并仅创建需要在拓扑中保留的项的kafka消息。这样,保存元组的所有事件都将以同样的方式处理。]