2015-12-02 180 views
0

我想创建一个带有2个不同主题的2个kafkaSpouts的拓扑,并将这2个喷口合并为一个基于sourceComponent的流。2喷出一个风暴螺栓

public class Topology { 
private static final String topic1 = Real2 
private static final String topic2 = Real1 


public static void main(String[] args) throws AlreadyAliveException, 
     InvalidTopologyException, IOException { 

    BasicConfigurator.configure(); 

    String zookeeper_root = ""; 
    SpoutConfig kafkaConfig1 = new SpoutConfig(localhost:2181, topic1, 
      zookeeper_root, "Real1KafkaSpout"); 

    SpoutConfig kafkaConfig2 = new SpoutConfig(localhost:2181, topic2, 
      zookeeper_root, "Real2KafkaSpout"); 


    kafkaConfigRealTime.scheme = new SchemeAsMultiScheme(new StringScheme()); 
    kafkaConfigRealTime.forceFromStart = true; 

    kafkaConfigHistorical.scheme = new SchemeAsMultiScheme(
      new StringScheme()); 
    kafkaConfigHistorical.forceFromStart = true; 



    TopologyBuilder builder = new TopologyBuilder(); 

    builder.setSpout("Real1", new KafkaSpout(
      kafkaConfig1), 2); 
    builder.setSpout("Real2", new KafkaSpout(
      kafkaConfig2), 2); 

    builder.setBolt("StreamMerging", new StreamMergingBolt(), 2) 
      .setNumTasks(2).shuffleGrouping("Real1") 
      .shuffleGrouping("Real2"); 



    Config config = new Config(); 
    config.put("hdfs.config", yamlConf); 

    config.setDebug(false); 
    config.setMaxSpoutPending(10000); 

    if (args.length == 0) { 
     LocalCluster cluster = new LocalCluster(); 

     cluster.submitTopology("Topology", config, 
       builder.createTopology()); 
     cluster.killTopology("Topology"); 
     cluster.shutdown(); 
    } else { 

     StormSubmitter.submitTopology(args[0], config, 
       builder.createTopology()); 
    } 

    try { 
     Thread.sleep(6000); 
    } catch (InterruptedException ex) { 
     ex.printStackTrace(); 
    } 

} 

}

在博尔特Execute方法我做

public void execute(Tuple input, BasicOutputCollector collector) { 
    String id = input.getSourceComponent(); 
    System.out.println("Stream Id in StreamMergingBolt is " + "---->" + id); 

    } 

所以我想保存为独立的文件的元组从每个流 这就是我想为Real1KafkaSpout元组存储未来file1和Real2KafkaSpout到file2。我怎样才能做到这一点,我在这一点上

回答

0

袭击可以按如下步骤去做:

public void execute(Tuple input, BasicOutputCollector collector) { 
    String id = input.getSourceComponent(); 
    if(id.equals("Real1")) { 
     // store into file1 
    } else { 
     assert (id.equals("Real2"); 
     // store in file2 
    } 
} 

你会在Bolt.open(...)打开这两个文件。

但是,我想知道为什么你想要使用单一的拓扑来做到这一点。如果您只将Kafka源文件中的数据写入文件1,将Kafka中的数据写入文件2中,则可以简单地创建两个拓扑...(当然,您只编写一次,并且对两种情况进行不同的配置) 。

+0

你是正确的..但我的拓扑的元组从2个不同的源来这就需要在螺栓被处理......但处理前我需要分别存储这些元组后处理是相同的从2来源的元组..数据将是不同的,但数据结构是相同的。 – Anji

0

有线结果,当我通过下面的代码做这个

public void execute(Tuple input, BasicOutputCollector collector) { 
String id = input.getSourceComponent(); 
if(id.equals("Real1")) { 
    String json = input.getString(0); 
    //writetoHDFS1(json) 
    } else { 
     assert (id.equals("Real2"); 
    String json = input.getString(0); 
//writetoHDFS2(json) 
    } 
} 
+0

但上面的代码适用于大多数场景 – Anji

+0

“有线结果”是什么意思?还是它是一个错字,你的意思是“奇怪的结果”? “以上代码在大多数情况下都适用”是什么意思?什么时候不起作用?为什么不? –

+0

我测试它工作 – Anji

相关问题