问题说明:识别来自整数的无限流的两个连续整数,其中这些整数由多个生产者生成,但单个消费者在同一个数再次重复时生成警报。同一执行器服务中的多个生产者和单个消费者
我有多个Producers
和单Consumer
。如果我将消费者提交给ExecutorService
,则消费者未启动。但是,如果我在单独的线程中运行Consumer,则Consumer线程将按预期启动。
代码:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Iterator;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
public class FixedBlockingQueue {
final BlockingQueue<Integer> queue;
private int capacity;
public FixedBlockingQueue(int capacity){
super();
this.capacity = capacity;
queue = new ArrayBlockingQueue<Integer>(capacity);
System.out.println("Capactiy:"+this.capacity);
}
public void addElement(Integer element){
try{
queue.put(element);
}catch(Exception err){
err.printStackTrace();
}
}
public void startThreads(){
ExecutorService es = Executors.newFixedThreadPool(1);
for (int i =0; i < 10; i++){
es.submit(new MyProducer(this));
}
//es.submit(new MyConsumer(queue));
new Thread(new MyConsumer(this)).start();
}
public BlockingQueue<Integer> getQueue(){
return queue;
}
public static void main(String args[]){
FixedBlockingQueue f = new FixedBlockingQueue(1);
f.startThreads();
}
}
class MyProducer implements Runnable{
private FixedBlockingQueue queue;
public MyProducer(FixedBlockingQueue queue){
this.queue = queue;
}
public void run(){
for (int i=1; i< 5; i++){
queue.addElement(new Integer(i));
System.out.println("adding:"+i);
}
}
}
class MyConsumer implements Runnable{
private BlockingQueue<Integer> queue;
Integer firstNumber = 0;
private final ReentrantLock lock = new ReentrantLock();
public MyConsumer(FixedBlockingQueue fQueue){
this.queue = fQueue.getQueue();
}
/* TODO : Compare two consecutive integers in queue are same or not*/
public void run(){
Integer secondNumber = 0;
while (true){
try{
lock.lock();
System.out.println("queue size:"+queue.size());
if (queue.size() > 0) {
secondNumber = queue.remove();
System.out.println("Removed:"+secondNumber);
System.out.println("Numbers:Num1:Num2:"+firstNumber+":"+secondNumber);
if (firstNumber.intValue() == secondNumber.intValue()){
System.out.println("Numbers matched:"+firstNumber);
}
firstNumber = secondNumber;
}
Thread.sleep(1000);
}catch(Exception err){
err.printStackTrace();
}finally{
lock.unlock();
}
}
}
}
输出:
Capactiy:1
adding:1
如果我从
es.submit(new MyConsumer(queue));
//new Thread(new MyConsumer(queue)).start();
更改代码以
//es.submit(new MyConsumer(queue));
new Thread(new MyConsumer(queue)).start();
消费者线程正常启动。
输出:
Capactiy:1
adding:1
queue size:1
Removed:1
Numbers:Num1:Num2:0:1
adding:2
queue size:1
Removed:2
Numbers:Num1:Num2:1:2
adding:3
queue size:1
Removed:3
Numbers:Num1:Num2:2:3
adding:4
queue size:1
Removed:4
Numbers:Num1:Num2:3:4
adding:1
queue size:1
Removed:1
Numbers:Num1:Num2:4:1
adding:2
queue size:1
Removed:2
adding:3
Numbers:Num1:Num2:1:2
queue size:1
Removed:3
Numbers:Num1:Num2:2:3
在第一种方法:
我知道这个编号不被消费者食用,但它不应该阻止提交的其他Producer
任务理想。
如果是这种情况,那么使用ExecutorService
作为简单Threads
的替代品不可能达到100%?
我必须按到达顺序插入元素,因此选择与ReentrantLock单线程。 100多个线程产生整数,我必须按照到达顺序检查两个连续的整数。 –
@Ravindrababu,''BlockingQueue'是线程安全的“的一部分,你会认为额外的锁定会贡献任何你不会从'BlockingQueue'中自动获得的东西? –
我稍后修复了消费者部分,它接受与Producer相同的类。 –