2012-04-27 103 views
4

所以我模拟了我的生产者消费者问题,我有下面的代码。我的问题是这样的:如果消费者在(真实)时间内保持不变,消费者会如何停下来。生产者 - 消费;消费者如何停止?

在下面的代码,我已经添加

    if (queue.peek()==null) 
         Thread.currentThread().interrupt(); 

在这个例子很好地工作。但在我的真实世界设计中,这不起作用(有时,生产者需要更长时间才能“放入”数据,因此在消费者中抛出的异常是不正确的。一般来说,我知道我可以将“毒性”数据如对象是某某,我可以在消费者进行检查。但这种毒药使得代码真的看不好。不知道是否有人有不同的方法。

public class ConsumerThread implements Runnable 
{ 
private BlockingQueue<Integer> queue; 
private String name; 
private boolean isFirstTimeConsuming = true; 
public ConsumerThread(String name, BlockingQueue<Integer> queue) 
{ 
    this.queue=queue; 
    this.name=name; 
} 

@Override 
public void run() 
{ 
    try 
    {  
     while (true) 
     { 
      if (isFirstTimeConsuming) 
      { 
       System.out.println(name+" is initilizing..."); 
       Thread.sleep(4000); 
       isFirstTimeConsuming=false; 
      } 
      try{ 

       if (queue.peek()==null) 
        Thread.currentThread().interrupt(); 

       Integer data = queue.take(); 

       System.out.println(name+" consumed ------->"+data); 
       Thread.sleep(70);  

      }catch(InterruptedException ie) 
      { 
       System.out.println("InterruptedException!!!!"); 
       break; 
      } 
     } 

     System.out.println("Comsumer " + this.name + " finished its job; terminating."); 

    }catch (InterruptedException e) 
    { 
     e.printStackTrace(); 
    } 
} 

}

回答

6

答:是根本不存在,只是因为peek返回null,生产商已经停止生产的保证。如果制片人简单地放慢了怎么办?现在,消费者退出,生产者继续生产。所以'偷看' - >'打破'的想法基本上失败了。

B:设置从消费者“做/运行”标志和生产阅读它也失败了,如果:

  1. 消费者检查标志,认为应该继续运行,然后做一个“主动“
  2. 在同时
  3. ,制作人将该标志设置为‘不运行’
  4. 现在消费者永远块等待鬼包

相反可以也会发生,并且一个数据包不被使用。

然后为了解决这个问题,你需要在'BlockingQueue'之上进行额外的互斥锁同步。

C: 我发现“罗塞塔代码”是决定什么是好的做法,在情况下,像这样的精源:

http://rosettacode.org/wiki/Synchronous_concurrency#Java

生产者和消费者必须在一个对象同意(或对象中的一个属性),代表输入的结束。然后生产者在最后一个数据包中设置该属性,并且消费者停止使用它。即你在你的问题中提到的“毒药”。

在上面的代码罗塞塔例如,该“对象”是一个简单的空String称为“EOF”:

final String EOF = new String(); 

// Producer 
while ((line = br.readLine()) != null) 
    queue.put(line); 
br.close(); 
// signal end of input 
queue.put(EOF); 

// Consumer 
while (true) 
    { 
    try 
     { 
     String line = queue.take(); 
     // Reference equality 
     if (line == EOF) 
      break; 
     System.out.println(line); 
     linesWrote++; 
     } 
    catch (InterruptedException ie) 
     { 
     } 
    } 
+0

是的,这被称为'毒'它工作正常,我只是想知道是否有更好的方法。 – adhg 2012-04-27 15:59:14

+0

@adhg - 所以我的答案是:这似乎是唯一可行的简单技术。对我而言,它似乎也是'干净的'(尽管什么是'干净'实际上是个人意见的问题)。 – ArjunShankar 2012-04-27 16:15:31

+0

我完全同意你的看法。我最终使用这个解决方案。感谢+1 – adhg 2012-04-28 01:55:10

3

不要使用中断而是在不再需要时打破环路:

if (queue.peek()==null) 
     break; 

或者你也可以使用一个变量来标记关闭操作挂起,然后打破循环和关闭循环后:

if (queue.peek()==null) 
     closing = true; 

//Do further operations ... 
if(closing) 
    break; 
+0

感谢,我的问题是更多的相对于“终止线程”和queue.peek( )== null将无法解决它(它不适用于真正的问题)。感谢您使用不使用中断的提示。 – adhg 2012-04-27 15:20:27

0

在现实世界中,大多数邮件带有某种形式的头部定义了消息类型/子类型或可能不同的对象。

您可以创建一个命令和控件对象或消息类型,告诉线程在获取消息时执行某些操作(如关闭,重新加载表,添加新侦听器等)。

这样,你可以说一个命令和控制线程只是发送消息到正常的消息流。您可以让CNC线程与大型系统中的操作终端通话等。

0

如果您的队列在您希望消费者终止之前可以清空,那么您需要一个标志来告诉线程何时停止。添加一个setter方法,这样生产者可以告诉消费者关闭。然后修改代码,以便代替:

if (queue.isEmpty()) 
    break; 

有你的代码检查

if (!run) 
{ 
    break; 
} 
else if (queue.isEmpty()) 
{ 
    Thread.sleep(200); 
    continue; 
}