2011-12-21 65 views
6

在线程间通信时遇到问题,并通过在整个地方使用“虚拟消息”来“解决”它。这是一个坏主意吗?有什么解决方法?生产者 - 消费者线程间通信

示例我有问题。

主线程启动一个处理线程并将记录插入到数据库中。 主线程读取一个可能很大的文件,并将一个记录(对象)放入另一个记录(对象)中。处理线程从队列中读取并确实工作。

如何告诉“处理线程”停止? 队列可以是空的,但工作没有完成,并且主线程现在不会在处理线程完成工作并且不能中断它时。

所以处理线程做

while (queue.size() > 0 || !Thread.currentThread().isInterrupted()) { 
    MyObject object= queue.poll(100, TimeUnit.MILLISECONDS); 
    if (object != null) { 
     String data = object.getData(); 
     if (data.equals("END")) { 
      break; 
     } 
    // do work 
    } 
} 
// clean-up 
synchronized queue) { 
    queue.notifyAll(); 
} 
return; 

和主线程

// ...start processing thread... 
while(reader.hasNext(){ 
    // ...read whole file and put data in queue... 
} 
MyObject dummy = new MyObject(); 
dummy.setData("END"); 
queue.put(dummy); 
//Note: empty queue here means work is done 
while (queue.size() > 0) { 
    synchronized (queue) { 
     queue.wait(500); // over-cautios locking prevention i guess 
    } 
} 

注意,插入必须在同一个事务和事务不能由主线程来处理 。

这样做会更好吗? (我正在学习,不想以“错误的方式开始”)

回答

4

这些虚拟消息是有效的。它被称为“毒药”。生产者发送给消费者的东西使其停止。

其他可能性是在主线程的某处调用Thread.interrupt(),并在工作线程中相应地捕获并处理InterruptedException。

+0

好的。如果传递的对象的类型没有可以轻易用于此的String字段,我将如何使用它?如果任何文本可能是有效的数据,因此可能等于毒药?使用GUID? – 2011-12-22 14:01:28

+1

理想情况下,您可以更改MyObject类并在其中添加isPoison()方法。这种方法如何确定对象是否是毒药取决于你,可能是一个简单的事情,比如比较消息是否为空,或者消息是否明确创建为毒药。一个想法是将MyObject重构为两个实现的接口,一个始终在isPoison()方法中返回false的MessageObject和一个始终返回true的PoisonMessage。但是,这很大程度上取决于你在做什么,根据你的真实情况可能会有更好的解决方案。 – 2011-12-22 14:08:54

2

通过在各处使用“虚拟消息”“解决”它。这是一个 坏主意吗?有什么解决方法?

这不是一个坏主意,它被称为“毒丸”,是停止线程服务的合理方法。

但它只适用于知道生产者和消费者的数量。

在你发布的代码中,有两个线程,一个是“主线程”,它产生数据,另一个是“处理线程”,它消耗数据,“毒丸”在这种情况下工作良好。但想象一下,如果您还有其他生产者,消费者如何知道何时停止(只有当所有生产者发送“毒丸”时),您都需要准确了解所有生产者的数量,并检查消费者中的“毒丸”数量,如果它等于生产者的数量,这意味着所有的生产者停止工作,那么消费者就会停止。

在“主线程”中,您需要找到InterruptedException,否则“主线程”可能无法设置“毒丸”。你可以不喜欢它的下方,

... 
try { 
    // do normal processing 
} catch (InterruptedException e) { /* fall through */ } 
finally { 
    MyObject dummy = new MyObject(); 
    dummy.setData("END"); 
    ... 
} 
... 

此外,您还可以尝试使用ExecutorService为您解决所有的问题。

(它的工作原理,当你只是需要做一些工作,然后停止,当所有人都完成)

void doWorks(Set<String> works, long timeout, TimeUnit unit) 
    throws InterruptedException { 
    ExecutorService exec = Executors.newCachedThreadPool(); 
    try { 
     for (final String work : works) 
      exec.execute(new Runnable() { 
        public void run() { 
         ... 
        } 
       }); 
    } finally { 
     exec.shutdown(); 
     exec.awaitTermination(timeout, unit); 
    } 
} 

我学习,不希望启动“做了错误的方式”

您可能需要阅读Book:Java Concurrency in Practice。相信我,这是最好的。

+0

我知道执行者服务,但读取数据和处理(验证)和插入不能在同一个任务或方法中,因为它们都应该在同一个数据库事务中运行。如果你明白我的意思。 – 2011-12-22 14:07:16

+0

好吧,我明白了,'ExecutorService'只是我的一般建议,根据您的要求使用它或不使用。但要注意捕捉所有例外情况,如果某些例外未被捕获,您可能会失去设置“毒丸”的机会。 – 2011-12-22 14:24:22

0

你可以做什么(我在最近的一个项目中做的)是包装队列,然后添加一个'isOpen()'方法。

class ClosableQ<T> { 

    boolean isOpen = true; 

    private LinkedBlockingQueue<T> lbq = new LinkedBlockingQueue<T>(); 

    public void put(T someObject) { 
     if (isOpen) { 
     lbq.put(someObject); 
     } 
    } 

    public T get() { 
     if (isOpen) { 
     return lbq.get(0); 
     } 
    } 

    public boolean isOpen() { 
     return isOpen; 
    } 

    public void open() { 
     isOpen = true; 
    } 

    public void close() { 
     isOpen = false; 
    } 
} 

所以你写线程变成类似:

while (reader.hasNext()) { 
    // read the file and put it into the queue 
    dataQ.put(someObject); 
} 
// now we're done 
dataQ.close(); 

和读取器线程:

while (dataQ.isOpen) { 
    someObject = dataQ.get(); 
} 

你当然可以扩展列表,而不是而是为用户提供了一个水平访问你可能不想要的。你需要为这个代码添加一些并发的东西,比如AtomicBoolean。

+0

我将Queue传递给一个方法。读你的想法让我有以下几点。该方法也可以接受AtomicBoolean keepProcessing。然后它可以运行,只要queue.size> 0 || keepProcessing.get() – 2011-12-22 15:07:31