2013-10-22 64 views
0

我创建了一个简单的程序来读取文件并生成一个文件,它的工作完美。我担心如何使它实时拓扑。我想如果我修改源文件意味着添加一个新的记录它应该进来我的目标文件我怎么会做它无需重新部署在cluster.What我的拓扑结构一样,我需要配置来实现这一behavior.Below是本地提交拓扑代码: -如何让我的风暴拓扑实时工作?

Config conf= new Config(); 
     conf.setDebug(false); 
     conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING,1); 
     TopologyBuilder builder = new TopologyBuilder(); 



      builder.setSpout("file-reader",new FileReaderSpout(args[0])); 
      builder.setBolt("file-writer",new WriteToFileBolt(args[0])).shuffleGrouping("file-reader"); 
      LocalCluster cluster= new LocalCluster(); 
       cluster.submitTopology("File-To-File",conf,builder.createTopology()); 
       Thread.sleep(10000); 
       cluster.shutdown(); 

回答

1

什么你也许可以做的是使用一个消息队列与你的风暴集群整合。 Kafka可能是一个非常好的候选人。它基本上是一个发布订阅的消息系统。有生产者负责将消息添加到另一端的队列和消费者以检索相同的消息。

因此,如果您在生产者发送/发布消息到队列中时将卡夫卡与风暴整合在一起,则它将适用于您的风暴拓扑结构。有一种叫做KafkaSpout的东西,它是一个能够从卡夫卡队列中读取的普通喷口实现。

所以它是这样的拓扑有KafaSpout(订阅特定的主题)开始,当它接收到任何东西,然后扫描链的输出到您相应的螺栓尽快发出。

您还可以查找Kestrel作为卡夫卡的替代品。你应该根据什么解决你的目的来选择。

+0

thanx您reply..if我的源是一个数据库表和目标文件,那么我如何能实现实时processing.can我实现,而无需使用任何其他第三方的jar(即卡夫卡) – user2435082

+0

根据我对实时做任何事情的理解,您需要确保持续处理数据(数据流)来源。这就是队列的概念。你可以查询一个数据库并检索一组信息(结果集/行)并处理它们(比如批处理),但是如果有人向db中添加新记录,你会怎么做?那么您需要某种机制来检测并使其可用于处理。你可以请分享你到底想要达到什么 – user2720864

+0

我完全想要你说的相同的东西,如果有人向db中添加一条新的记录,那么我需要什么机制来检测它,并使它可用于processing.i只是想知道对于这种检测,Storm提供了什么(类名等) – user2435082

0

在其他答案中阅读了您的意见后,您可能需要在之前实施排队系统以更新数据库中的行。

我个人使用RabbitMQ风暴,我知道卡夫卡也是一种选择。具体来说,尝试添加一个队列,使得拓扑的一部分(也可以在Storm之外)读取队列并更新数据库,而另一部分实现您想要的处理逻辑。

实施触发器将事件发送到Storm拓扑可能是一个坏主意,除非您没有其他选择。

- 迈克尔

+0

谢谢迈克尔..是的,我需要实现队列..你可以请建议风暴提供的所有东西实现排队我不想要使用任何其他第三方库。 – user2435082

+0

据我所知,Storm没有提供任何排队机制。 – mvogiatzis