2009-02-06 26 views
5

我有两个线程。生产者正在生产数据片段(字符串对象),在这里消费者处理这些字符串。问题在于我的应用程序只需要处理最新的数据对象。换句话说,如果生产者设法产生了两个字符串“s1”,然后是“s2”,那么我希望消费者只处理“s2”。 “s1”可以安全地丢弃。在线程之间传递工作项(Java)

当然,实现一个实现此行为的类没有问题,但我想使用java.util.concurrent中的标准机制(如果存在此类机制)。请注意,SynchronousQueue并不是一个好的解决方案:消费者在排队“s1”时会阻止,并且不会有机会产生“s2”。

(总之,我找了一个单元素集合与阻塞删除操作和非阻塞设置操作)

任何想法?

回答

3

我认为你最好的答案可能是使用ArrayBlockingQueue,它在生产商(你只有一个生产者,对吗?)在添加新元素之前删除任何现有的元素。

当然,在这个实现中有竞争条件:消费者可以在生产者移除它之前开始处理元素。但是无论您使用何种数据结构,这些竞争条件都会一直存在。

0

你可以使用大小一个数组为:

String[] oeq = new String[1]; 

样品来源:

public class Test { 
    private static final String[] oeq = new String[1]; 
    public static void main(String[] args) { 
     (new Producer()).start(); 
     (new Consumer()).start(); 
     (new Consumer()).start(); 
     (new Consumer()).start(); 
     (new Consumer()).start(); 
     (new Consumer()).start(); 
     (new Consumer()).start(); 
    } 

    private static class Producer extends Thread { 
     public void run() { 
      int i=0; 
      while(true) { 
       i++; 
       synchronized(oeq) { 
        oeq[0] = ""+i; 
        oeq.notifyAll(); 
       } 
      } 
     } 
    } 

    private static class Consumer extends Thread { 
     public void run() { 
      String workload = null; 
      while(true) { 
       synchronized(oeq) { 
        try { 
         oeq.wait(); 
        } catch(InterruptedException ie) { 
         ie.printStackTrace(); 
        } 
        if(oeq[0] != null) { 
         workload = oeq[0]; 
         oeq[0] = null; 
        } 
       } 
       if(workload != null) { 
        System.out.println(workload); 
       } 
      } 
     } 
    } 
} 
+0

这将是非常低效的。消费者线程在等待工作时不应该阻塞CPU。 – 2009-02-06 16:08:18

+0

实际上,这里的消费者会消耗所有的CPU,因为循环中没有wait()(这可能就是你所说的“block”)。这就是为什么,我认为,OP想要使用现有的JDK类 - 写一个破碎的本地并发对象很容易。 – kdgregory 2009-02-06 16:11:16

+0

这两个提示都是真的,我确定了第一个,谢谢。 – 2009-02-06 16:14:44

3

Exchanger class呢?这是线程之间交换对象的标准方式。专门用你的课程,可能是一个字符串列表。让消费者只使用第一个/最后一个。

0

那么,如果你只想要最近生成的字符串,那么你根本不需要一个队列 - 你需要的只是一个字符串引用:生产者设置它,消费者读取它。如果消费者花了很长时间阅读它,生产者重新设置它......那又如何?

设置和读取参考是原子的。唯一的问题是,如果您希望消费者以某种方式被通知有可用的字符串。但即使如此......如果消费者正在做一些需要一段时间的事情,那么你实际上并不需要来自并发库的任何花哨的东西。

请注意,顺便说一句,这个例子适用于任何数量的生产者和/或消费者线程。

import java.util.Random; 

public class Example { 
    public static void main(String[] av) { 
     new Example().go(); 
    } 

    Object mutex  = new Object(); 
    String theString = null; 

    void go() { 
     Runnable producer = new Runnable() { 
      public void run() { 
       Random rnd = new Random(); 
       try { 
        for (;;) { 
         Thread.sleep(rnd.nextInt(10000)); 
         synchronized (mutex) { 
          theString = "" + System.currentTimeMillis(); 
          System.out.println("Producer: Setting string to " + theString); 
          mutex.notify(); 
         } 
        } 
       } catch (InterruptedException e) { 
        // TODO Auto-generated catch block 
        e.printStackTrace(); 
       } 

      } 
     }; 

     Runnable consumer = new Runnable() { 
      public void run() { 
       try { 
        String mostRecentValue = null; 
        Random rnd = new Random(); 
        for (;;) { 
         synchronized (mutex) { 
          // we use == because the producer 
          // creates new string 
          // instances 
          if (theString == mostRecentValue) { 
           System.out.println("Consumer: Waiting for new value"); 
           mutex.wait(); 
           System.out.println("Consumer: Producer woke me up!"); 
          } else { 
           System.out.println("Consumer: There's a new value waiting for me"); 
          } 
          mostRecentValue = theString; 
         } 
         System.out.println("Consumer: processing " + mostRecentValue); 
         Thread.sleep(rnd.nextInt(10000)); 
        } 
       } catch (InterruptedException e) { 
        // TODO Auto-generated catch block 
        e.printStackTrace(); 
       } 
      } 
     }; 


     new Thread(producer).start(); 
     new Thread(consumer).start(); 
    } 
}