I want to create a topology with 2 KafkaSpouts on 2 different topics, and merge those 2 spouts into a single stream based on sourceComponent. Both spouts emit to one Storm bolt.
import java.io.IOException;

import org.apache.log4j.BasicConfigurator;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class Topology {

    private static final String topic1 = "Real2";
    private static final String topic2 = "Real1";

    public static void main(String[] args) throws AlreadyAliveException,
            InvalidTopologyException, IOException {
        BasicConfigurator.configure();

        String zookeeper_root = "";
        BrokerHosts brokerHosts = new ZkHosts("localhost:2181");

        SpoutConfig kafkaConfig1 = new SpoutConfig(brokerHosts, topic1,
                zookeeper_root, "Real1KafkaSpout");
        SpoutConfig kafkaConfig2 = new SpoutConfig(brokerHosts, topic2,
                zookeeper_root, "Real2KafkaSpout");
        kafkaConfig1.scheme = new SchemeAsMultiScheme(new StringScheme());
        kafkaConfig1.forceFromStart = true;
        kafkaConfig2.scheme = new SchemeAsMultiScheme(new StringScheme());
        kafkaConfig2.forceFromStart = true;

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("Real1", new KafkaSpout(kafkaConfig1), 2);
        builder.setSpout("Real2", new KafkaSpout(kafkaConfig2), 2);
        builder.setBolt("StreamMerging", new StreamMergingBolt(), 2)
                .setNumTasks(2)
                .shuffleGrouping("Real1")
                .shuffleGrouping("Real2");

        Config config = new Config();
        // config.put("hdfs.config", yamlConf); // yamlConf is not defined in this snippet
        config.setDebug(false);
        config.setMaxSpoutPending(10000);

        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Topology", config, builder.createTopology());
            try {
                // let the local topology run for a while before killing it
                Thread.sleep(6000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            cluster.killTopology("Topology");
            cluster.shutdown();
        } else {
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        }
    }
}
In the bolt's execute method I do:
public void execute(Tuple input, BasicOutputCollector collector) {
    String id = input.getSourceComponent();
    System.out.println("Stream Id in StreamMergingBolt is " + "---->" + id);
}
So I want to save the tuples from each stream into a separate file, that is, store the tuples coming from Real1KafkaSpout into file1 and the tuples coming from Real2KafkaSpout into file2. How can I achieve this? This is the point where I am stuck.
You are correct.. but in my topology the tuples come from 2 different sources and have to be processed in the bolt.. before processing, though, I need to store these tuples separately; after that the processing is the same for the tuples from both sources.. the data will be different, but the data structure is the same. – Anji
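Since both spouts already feed the same bolt, one way to do the separation is inside the bolt itself: keep one writer per source component and pick it with getSourceComponent() before running the shared processing. Below is a minimal sketch of what StreamMergingBolt could look like under that approach; the file paths, and the assumption that each message is the single string field emitted by StringScheme, are illustrative and not taken from the original post.

import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class StreamMergingBolt extends BaseBasicBolt {

    // one writer per source spout id; created in prepare() because the bolt is serialized
    private transient Map<String, PrintWriter> writers;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        writers = new HashMap<String, PrintWriter>();
        try {
            // "Real1"/"Real2" are the spout ids used in setSpout(); the paths are placeholders
            writers.put("Real1", new PrintWriter(new FileWriter("/tmp/real1-tuples.txt", true)));
            writers.put("Real2", new PrintWriter(new FileWriter("/tmp/real2-tuples.txt", true)));
        } catch (IOException e) {
            throw new RuntimeException("Could not open output files", e);
        }
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String source = input.getSourceComponent();   // "Real1" or "Real2"
        String message = input.getString(0);          // StringScheme emits a single "str" field
        PrintWriter writer = writers.get(source);
        writer.println(message);                      // store the tuple before further processing
        writer.flush();
        // ... identical processing for tuples from both sources continues here ...
    }

    @Override
    public void cleanup() {
        for (PrintWriter writer : writers.values()) {
            writer.close();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // declare output fields here if the bolt emits tuples downstream
    }
}

Note that with a parallelism of 2 every bolt task would open the same two paths, so in practice you would either run this bolt with a single task or include context.getThisTaskId() in the file names to keep tasks from writing to the same file.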