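The fail() method in my Spout class is only triggered for the first bolt; from the second bolt onward it does not work.
Note:
Bolt1 holds a list of the first three primes (2, 3, 5).
Bolt2 holds a list of the next three primes (7, 11, 13).
Bolt3 simply checks whether the number is prime.
When Bolt1 fails a tuple, fail() is called on the Spout class, but when Bolt2 or any later bolt fails a tuple, fail() on the Spout class is never called.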
Topology class:
......
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new SpoutClass(), 1);
builder.setBolt("bolt1", new Bolt1(), 1).shuffleGrouping("spout");
builder.setBolt("bolt2", new Bolt2(), 1).shuffleGrouping("bolt1");
builder.setBolt("bolt3", new Bolt3(), 1).shuffleGrouping("bolt2");
Spout class:
public class SpoutClass implements IRichSpout {
    private SpoutOutputCollector collector;
    private TopologyContext context;

    public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.context = context;
        this.collector = collector;
    }

    public void nextTuple() {
        try {
            // messageQueue is a blocking queue that holds the incoming data
            String msg = messageQueue.take();
            // The message doubles as its own message ID, so ack()/fail() can identify it
            String ackId = msg;
            this.collector.emit(new Values(msg), ackId);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void ack(Object msgId) {
        System.out.println("Acknowledges that this tuple has been processed ........... " + msgId);
    }

    public void fail(Object msgId) {
        System.out.println("FAILED To Process Message :-" + msgId);
    }
}
Bolt1 class:
public class Bolt1 extends BaseRichBolt {
    private OutputCollector collector;
    // First three primes
    private final ArrayList<Integer> firstthreePrime = new ArrayList<Integer>(Arrays.asList(2, 3, 5));

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String message = (String) tuple.getValueByField("msg");
        System.out.println("Received " + message + " in Bolt1.");
        Integer number = Integer.valueOf(message);
        if (firstthreePrime.contains(number)) {
            // The number is in Bolt1's list: fail the tuple
            System.out.println(" Number is prime ............." + number + " and Throw from Bolt1");
            this.collector.fail(tuple);
        } else {
            // Otherwise pass the message on to the next bolt and ack the input
            collector.emit(new Values(message));
            collector.ack(tuple);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("msg"));
    }
}
Bolt2 class:
public class Bolt2 extends BaseRichBolt {
    private OutputCollector collector;
    // Next three primes
    private final ArrayList<Integer> secondthreePrime = new ArrayList<Integer>(Arrays.asList(7, 11, 13));

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String message = (String) tuple.getValueByField("msg");
        System.out.println("Received " + message + " in Bolt2.");
        Integer number = Integer.valueOf(message);
        if (secondthreePrime.contains(number)) {
            // The number is in Bolt2's list: fail the tuple
            System.out.println(" Number is prime ............." + number + " and Throw from Bolt2");
            this.collector.fail(tuple);
        } else {
            // Otherwise pass the message on to the next bolt and ack the input
            collector.emit(new Values(message));
            collector.ack(tuple);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("msg"));
    }
}
Bolt3 class:
public class Bolt3 extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String message = (String) tuple.getValueByField("msg");
        System.out.println("Received " + message + " in Bolt3.");
        Integer number = Integer.valueOf(message);
        // isPrime(...) is a placeholder name for the primality check, which is not shown in this post
        if (isPrime(number)) {
            // The number is prime: fail the tuple
            System.out.println(" Number is prime ............." + number + " and Throw from Bolt3");
            this.collector.fail(tuple);
        } else {
            collector.emit(new Values(message));
            collector.ack(tuple);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
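The primality test that Bolt3 relies on is not shown in the question. As a rough sketch of what it might look like, assuming plain trial division is good enough for the numbers involved (the names `PrimeCheck` and `isPrime` are made up for this example and do not come from the post):

```java
// Hypothetical helper; not part of the original post or of Storm.
class PrimeCheck {

    /** Returns true if n is prime, using trial division by odd numbers up to sqrt(n). */
    static boolean isPrime(int n) {
        if (n < 2) {
            return false;          // 0, 1 and negatives are not prime
        }
        if (n % 2 == 0) {
            return n == 2;         // 2 is the only even prime
        }
        // The long cast keeps d * d from overflowing for large n
        for (int d = 3; (long) d * d <= n; d += 2) {
            if (n % d == 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isPrime(17)); // prints "true"
        System.out.println(isPrime(21)); // prints "false"
    }
}
```

Inside a bolt, a method like this could be called from execute() in the same way the list lookups are used in Bolt1 and Bolt2.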
In the Spout class we cannot emit with a tuple. It worked after adding this to the Bolt classes: collector.emit(tuple, new Values(message)); – Ashish
Sorry, that was a typo; I meant BaseRichBolt. If this solved your problem, please accept my answer. –
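For later readers, here is a rough illustration of why the anchored emit mentioned in the comments makes a difference. In Storm, a tuple emitted from a bolt without an anchor is not linked to the spout message it came from, so failing it downstream has nothing to report back to. The code below is only a toy model in plain Java, not Storm code: `ToyTuple`, `ToySpout` and `ToyCollector` are invented names, and real Storm tracks tuple trees through acker tasks rather than a direct method call.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of tuple anchoring. None of these classes are Storm APIs.
class AnchoringSketch {

    /** A tuple that remembers which spout message it descends from, if any. */
    static final class ToyTuple {
        final String value;
        final String rootMsgId; // null means the tuple is not tracked

        ToyTuple(String value, String rootMsgId) {
            this.value = value;
            this.rootMsgId = rootMsgId;
        }
    }

    /** Records the message IDs for which fail() was called. */
    static final class ToySpout {
        final List<String> failed = new ArrayList<>();

        ToyTuple emit(String value, String msgId) {
            return new ToyTuple(value, msgId);
        }

        void fail(String msgId) {
            failed.add(msgId);
        }
    }

    /** Stand-in for the collector a bolt uses to emit and fail tuples. */
    static final class ToyCollector {
        private final ToySpout spout;

        ToyCollector(ToySpout spout) {
            this.spout = spout;
        }

        // Unanchored emit: the new tuple has no link to any spout message.
        ToyTuple emit(String value) {
            return new ToyTuple(value, null);
        }

        // Anchored emit: the new tuple inherits the anchor's root message ID.
        ToyTuple emit(ToyTuple anchor, String value) {
            return new ToyTuple(value, anchor.rootMsgId);
        }

        // A failure can only reach the spout if the tuple is tracked.
        void fail(ToyTuple tuple) {
            if (tuple.rootMsgId != null) {
                spout.fail(tuple.rootMsgId);
            }
        }
    }

    /** Sends "7" through a forwarding bolt1 and a failing bolt2; returns the IDs the spout saw fail. */
    static List<String> run(boolean anchored) {
        ToySpout spout = new ToySpout();
        ToyCollector collector = new ToyCollector(spout);
        ToyTuple fromSpout = spout.emit("7", "7");
        // bolt1 forwards 7 because it is not in its list (2, 3, 5)
        ToyTuple fromBolt1 = anchored ? collector.emit(fromSpout, "7") : collector.emit("7");
        // bolt2 rejects 7 because it is in its list (7, 11, 13)
        collector.fail(fromBolt1);
        return spout.failed;
    }

    public static void main(String[] args) {
        System.out.println("unanchored: " + run(false)); // prints "unanchored: []"
        System.out.println("anchored:   " + run(true));  // prints "anchored:   [7]"
    }
}
```

In the toy model, as in the topology from the question, a failure in the first bolt always reaches the spout because that bolt fails the spout's own tuple, while failures further down only get through when each bolt anchors what it emits.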