2009-05-21 61 views
5

这个问题与我前面的问题一个有关..阻塞队列 - 需要更多信息

Previous Post

在有阻塞性质被提及作为一个优势。

我试图开发一些简单的代码来演示阻塞性质,但我卡住了。我只是尝试制作大小为4的BlockingQueue,并尝试添加5个元素,并以java.lang.IllegalStateException结尾。有人可以给我看一个阻止BlockingQueue性质的代码示例吗?


public static void main(String[] args) { 
    BlockingQueue<String> bq = new LinkedBlockingQueue<String>(4); 

    try { 
     bq.offer("A"); 
     bq.offer("B"); 
     bq.offer("C"); 
     bq.offer("D"); 
     bq.offer("E"); 

     System.out.println("1 = " + bq.take()); 
     System.out.println("2 = " + bq.take()); 
     System.out.println("3 = " + bq.take()); 
     System.out.println("4 = " + bq.take()); 
     System.out.println("5 = " + bq.take()); 
     System.out.println("6 = " + bq.take()); 
    } catch (Exception e) { 
     // TODO: handle exception 
     e.printStackTrace(); 
    } 
} 

我用这个代码段。在这种情况下,我试图将5个元素放入大小为4的队列中。在这种情况下,应将4个元素(A,B,C,D)添加到队列中。然后我打印时打电话给take()方法。当我致电System.out.println("1 = " + bq.take());时,是否不应该将“E”自动插入队列中?因为它获得一个空闲插槽?

回答

11

是你与addoffer,或put加入?我假设你使用的是add,因为它是唯一可以抛出IllegalStateException;但是如果您阅读table,则会看到如果要阻止语义,则应该使用put(和take删除)。

编辑:你的例子有几个问题。

首先我会回答这个问题:“当我第一次给take()打电话时,E为什么不插入?”答案是,当您拨打take()时,您已尝试,并且未能插入E.一旦释放空间,就没有插入内容。

现在如果您将offer()更改为put(),put("E")将永远不会返回。为什么?因为它在等待其他线程从队列中删除元素。请记住,BlockingQueues是专为多线程访问。如果您有单线程应用程序,则阻塞是无用的(实际上比无用的更糟糕)。

这里有一个改进的例子:

public static void main(String[] args) { 
    final BlockingQueue<String> bq = new LinkedBlockingQueue<String>(4); 

    Runnable producer = new Runnable() { 
     public void run() { 
      try { 
       bq.put("A"); 
       bq.put("B"); 
       bq.put("C"); 
       bq.put("D"); 
       bq.put("E"); 
      } catch (InterruptedException ex) { 
       Thread.currentThread().interrupt(); 
      } 
     } 
    }; 
    Runnable consumer = new Runnable() { 
     public void run() { 
      try { 
       System.out.println("1 = " + bq.take()); 
       System.out.println("2 = " + bq.take()); 
       System.out.println("3 = " + bq.take()); 
       System.out.println("4 = " + bq.take()); 
       System.out.println("5 = " + bq.take()); 
       System.out.println("6 = " + bq.take()); 
      } catch (InterruptedException ex) { 
       Thread.currentThread().interrupt(); 
      } 
     } 
    }; 
    new Thread(producer).start(); 
    new Thread(consumer).start(); 
} 

现在put("E")通话将真正成功,因为它现在可以等待,直到消费者线程从队列中删除“A”。最后的take()仍然会无限地阻止,因为没有第六个元素可以删除。

2

mmyers打我吧:P(+1)
应该是你需要的,祝你好运!

注意: put()在您的示例中将失败,因为put()会阻塞直到空间可用。由于空间永远不可用,程序永远不会继续执行。

====旧答案======

一个BlockingQueue的是一个接口,你就必须使用implementating类之一。

“堵自然”简单地说,你可以要求你的队列中的东西,如果是空的,它的线程是在意志(等待),直到东西被添加到队列,然后继续处理。

ArrayBlockingQueue
DelayQueue
LinkedBlockingQueue
PriorityBlockingQueue
SynchronousQueue

http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html

//your main collection 
LinkedBlockingQueue<Integer> lbq = new LinkedBlockingQueue<Integer>(); 

//Add your values 
lbq.put(100); 
lbq.put(200); 

//take() will actually remove the first value from the collection, 
//or block if no value exists yet. 
//you will either have to interrupt the blocking, 
//or insert something into the queue for the program execution to continue 

int currVal = 0; 
try { 
    currVal = lbq.take(); 
} catch (InterruptedException e) { 
    e.printStackTrace(); 
} 

+0

谢谢..我用 “的LinkedBlockingQueue” – 2009-05-21 13:59:47

+0

真棒,是你所需要的一切?我现在正在研究一个例子,如果你仍然希望我发布它 – 2009-05-21 14:01:07

+0

更好地发布它。只是一个简短的例子:) – 2009-05-21 14:02:02

1

具体回答你的问题:提供的是一种非阻塞提供调用,因此像您发布的一个单线程方法,调用提供(“E”),直接返回,而无需修改队列满假的。如果你使用阻塞放('E')呼叫,它会睡觉,直到空间可用。永远在你的简单例子。您需要从队列中读取一个单独的线程来为投入完成创建空间。

0

嗨更多信息此类

/** 
* Inserts the specified element into this queue if it is possible to do 
* so immediately without violating capacity restrictions, returning 
* {@code true} upon success and throwing an 
* {@code IllegalStateException} if no space is currently available. 
* When using a capacity-restricted queue, it is generally preferable to 
* use {@link #offer(Object) offer}. 
* 
* @param e the element to add 
* @return {@code true} (as specified by {@link Collection#add}) 
* @throws IllegalStateException if the element cannot be added at this 
*   time due to capacity restrictions 
* @throws ClassCastException if the class of the specified element 
*   prevents it from being added to this queue 
* @throws NullPointerException if the specified element is null 
* @throws IllegalArgumentException if some property of the specified 
*   element prevents it from being added to this queue 
*/ 
boolean add(E e); 

/** 
* Inserts the specified element into this queue if it is possible to do 
* so immediately without violating capacity restrictions, returning 
* {@code true} upon success and {@code false} if no space is currently 
* available. When using a capacity-restricted queue, this method is 
* generally preferable to {@link #add}, which can fail to insert an 
* element only by throwing an exception. 
* 
* @param e the element to add 
* @return {@code true} if the element was added to this queue, else 
*   {@code false} 
* @throws ClassCastException if the class of the specified element 
*   prevents it from being added to this queue 
* @throws NullPointerException if the specified element is null 
* @throws IllegalArgumentException if some property of the specified 
*   element prevents it from being added to this queue 
*/ 
boolean offer(E e); 

/** 
* Inserts the specified element into this queue, waiting if necessary 
* for space to become available. 
* 
* @param e the element to add 
* @throws InterruptedException if interrupted while waiting 
* @throws ClassCastException if the class of the specified element 
*   prevents it from being added to this queue 
* @throws NullPointerException if the specified element is null 
* @throws IllegalArgumentException if some property of the specified 
*   element prevents it from being added to this queue 
*/ 
void put(E e) throws InterruptedException; 

/** 
* Inserts the specified element into this queue, waiting up to the 
* specified wait time if necessary for space to become available. 
* 
* @param e the element to add 
* @param timeout how long to wait before giving up, in units of 
*  {@code unit} 
* @param unit a {@code TimeUnit} determining how to interpret the 
*  {@code timeout} parameter 
* @return {@code true} if successful, or {@code false} if 
*   the specified waiting time elapses before space is available 
* @throws InterruptedException if interrupted while waiting 
* @throws ClassCastException if the class of the specified element 
*   prevents it from being added to this queue 
* @throws NullPointerException if the specified element is null 
* @throws IllegalArgumentException if some property of the specified 
*   element prevents it from being added to this queue 
*/ 
boolean offer(E e, long timeout, TimeUnit unit) 
    throws InterruptedException; 

/** 
* Retrieves and removes the head of this queue, waiting if necessary 
* until an element becomes available. 
* 
* @return the head of this queue 
* @throws InterruptedException if interrupted while waiting 
*/ 
E take() throws InterruptedException; 

/** 
* Retrieves and removes the head of this queue, waiting up to the 
* specified wait time if necessary for an element to become available. 
* 
* @param timeout how long to wait before giving up, in units of 
*  {@code unit} 
* @param unit a {@code TimeUnit} determining how to interpret the 
*  {@code timeout} parameter 
* @return the head of this queue, or {@code null} if the 
*   specified waiting time elapses before an element is available 
* @throws InterruptedException if interrupted while waiting 
*/ 
E poll(long timeout, TimeUnit unit) 
    throws InterruptedException;