2016-06-13 29 views
2

问题说明:识别来自整数的无限流的两个连续整数,其中这些整数由多个生产者生成,但单个消费者在同一个数再次重复时生成警报。同一执行器服务中的多个生产者和单个消费者

我有多个Producers和单Consumer。如果我将消费者提交给ExecutorService,则消费者未启动。但是,如果我在单独的线程中运行Consumer,则Consumer线程将按预期启动。

代码:

import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.locks.ReentrantLock; 
import java.util.Iterator; 
import java.util.concurrent.Executors; 
import java.util.concurrent.ExecutorService; 

public class FixedBlockingQueue { 
    final BlockingQueue<Integer> queue; 
    private int capacity; 

    public FixedBlockingQueue(int capacity){ 
     super(); 
     this.capacity = capacity; 
     queue = new ArrayBlockingQueue<Integer>(capacity); 
     System.out.println("Capactiy:"+this.capacity); 
    } 
    public void addElement(Integer element){ 
     try{ 
      queue.put(element); 
     }catch(Exception err){ 
      err.printStackTrace(); 
     } 
    } 
    public void startThreads(){ 
     ExecutorService es = Executors.newFixedThreadPool(1); 
     for (int i =0; i < 10; i++){ 
      es.submit(new MyProducer(this)); 
     } 
     //es.submit(new MyConsumer(queue)); 
     new Thread(new MyConsumer(this)).start(); 
    } 
    public BlockingQueue<Integer> getQueue(){ 
     return queue; 
    } 
    public static void main(String args[]){ 
     FixedBlockingQueue f = new FixedBlockingQueue(1); 
     f.startThreads(); 
    } 
} 

class MyProducer implements Runnable{ 

    private FixedBlockingQueue queue; 
    public MyProducer(FixedBlockingQueue queue){ 
     this.queue = queue;  
    } 
    public void run(){ 
     for (int i=1; i< 5; i++){ 
      queue.addElement(new Integer(i)); 
      System.out.println("adding:"+i); 
     } 
    } 
} 

class MyConsumer implements Runnable{ 
    private BlockingQueue<Integer> queue; 
    Integer firstNumber = 0; 
    private final ReentrantLock lock = new ReentrantLock(); 

    public MyConsumer(FixedBlockingQueue fQueue){ 
     this.queue = fQueue.getQueue(); 
    } 
    /* TODO : Compare two consecutive integers in queue are same or not*/ 
    public void run(){ 
     Integer secondNumber = 0; 
     while (true){ 
      try{ 
       lock.lock(); 
       System.out.println("queue size:"+queue.size()); 
       if (queue.size() > 0) { 
        secondNumber = queue.remove(); 
        System.out.println("Removed:"+secondNumber); 
        System.out.println("Numbers:Num1:Num2:"+firstNumber+":"+secondNumber); 
        if (firstNumber.intValue() == secondNumber.intValue()){ 
         System.out.println("Numbers matched:"+firstNumber); 
        } 
        firstNumber = secondNumber; 
       } 
       Thread.sleep(1000); 
      }catch(Exception err){ 
       err.printStackTrace(); 
      }finally{ 
       lock.unlock(); 
      } 
     } 
    } 
} 

输出:

Capactiy:1 
adding:1 

如果我从

es.submit(new MyConsumer(queue)); 
//new Thread(new MyConsumer(queue)).start(); 

更改代码以

//es.submit(new MyConsumer(queue)); 
new Thread(new MyConsumer(queue)).start(); 

消费者线程正常启动。

输出:

Capactiy:1 
adding:1 
queue size:1 
Removed:1 
Numbers:Num1:Num2:0:1 
adding:2 
queue size:1 
Removed:2 
Numbers:Num1:Num2:1:2 
adding:3 
queue size:1 
Removed:3 
Numbers:Num1:Num2:2:3 
adding:4 
queue size:1 
Removed:4 
Numbers:Num1:Num2:3:4 
adding:1 
queue size:1 
Removed:1 
Numbers:Num1:Num2:4:1 
adding:2 
queue size:1 
Removed:2 
adding:3 
Numbers:Num1:Num2:1:2 
queue size:1 
Removed:3 
Numbers:Num1:Num2:2:3 

在第一种方法:

我知道这个编号不被消费者食用,但它不应该阻止提交的其他Producer任务理想。

如果是这种情况,那么使用ExecutorService作为简单Threads的替代品不可能达到100%?

回答

2

您使用单个线程创建线程池,并使用固定容量1创建一个BlockingQueue。然后向池中提交三个任务:前两个每个尝试将每个值排入五个值,然后将任何值可用。

因为你的固定大小的池只有一个线程,您提交给它的任务将按顺序运行,而不是并行。您首先提交生产者任务,因此它首先运行。但是一旦它排入第一个数字,它就无法取得进一步的进展,因为队列已满。并且队列仍会永久保留,因为生产者任务必须在混合线程可用于其他任务(如消费者)之前完成。

我不确定你为什么使用线程池,因为直接进行线程管理并不困难,特别是因为你的任务已经实现Runnable。如果您使用游泳池,但是,请确保它有足够的线程在它能够同时容纳所有的任务。

还要注意,BlockingQueue实现应该是线程安全的,标准库提供的所有实际上都是如此。因此,您不需要在addElement()中执行自己的锁定。此外,如果您确实需要执行自己的锁定,那么您不仅需要在排队元素时执行锁定,还需要执行锁定时将其排除。

此外,您有生产者任务通过FixedBlockingQueue实例间接地将元素添加到底层队列是非常奇怪的,但您将消费者任务直接转到底层队列。

而你的FixedBlockingQueue类的名称选择不好,因为它暗示该类实现BlockingQueue,但该类实际上并不这样做。

+0

我必须按到达顺序插入元素,因此选择与ReentrantLock单线程。 100多个线程产生整数,我必须按照到达顺序检查两个连续的整数。 –

+0

@Ravindrababu,''BlockingQueue'是线程安全的“的一部分,你会认为额外的锁定会贡献任何你不会从'BlockingQueue'中自动获得的东西? –

+0

我稍后修复了消费者部分,它接受与Producer相同的类。 –

相关问题