2016-05-25 34 views
0

我正在尝试创建一个SingleBlockingQueue<T>同步器,它允许一个线程将offer()一个元素给它,而另一个线程将它take()它。一次只有一个T元素被保存在SingleBlockingQueue<T>内部,并且如果前一个元素正在等待其线程为take(),则推送线程在offer()上被阻止。推动线程将继续推送物品,直到它调用setComplete(),并且线程将继续呼叫take()isComplete()为假。如果线程正在等待某个元素,它将被阻塞。创建SingleBlockingQueue同步器

这是我到目前为止的同步器。

import java.util.concurrent.atomic.AtomicBoolean; 

public final class SingleBlockingQueue<T> { 

    private volatile T value; 
    private final AtomicBoolean isComplete = new AtomicBoolean(false); 
    private final AtomicBoolean isPresent = new AtomicBoolean(false); 

    public void offer(T value) throws InterruptedException { 
     while (isPresent.get()) { 
      this.wait(); 
     } 
     this.value = value; 
     synchronized(this) { 
      this.notifyAll(); 
     } 
    } 
    public boolean isComplete() { 
     return !isPresent.get() && isComplete.get(); 
    } 
    public void setComplete() { 
     isComplete.set(true); 
    } 
    public T take() throws InterruptedException { 
     while (!isPresent.get()) { 
      this.wait(); 
     } 
     T returnValue = value; 
     isPresent.set(false); 
     synchronized(this) { 
      this.notifyAll(); 
     } 
     return returnValue; 
    } 
} 

这里是科特林

val queue = SingleBlockingQueue<Int>() 

    thread { 
     for (i in 1..1000) { 
      queue.offer(i) 
     } 
     queue.setComplete() 
    } 

    thread { 
     while (!queue.isComplete) { 
      println(queue.take()) 
     } 
    } 

    Thread.sleep(100000) 

不过,我得到一个错误的使用例子,我在我头上了一下,在这一点上。感谢RxJava,我很久没有做同步器。我究竟做错了什么?

Exception in thread "Thread-1" java.lang.IllegalMonitorStateException 
    at java.lang.Object.wait(Native Method) 
    at java.lang.Object.wait(Object.java:502) 
    at com.swa.rm.common.util.SingleBlockingQueue.take(SingleBlockingQueue.java:29) 
    at RxOperatorTest$testSingleBlockingQueue$2.invoke(RxOperatorTest.kt:33) 
    at RxOperatorTest$testSingleBlockingQueue$2.invoke(RxOperatorTest.kt:8) 
    at kotlin.concurrent.ThreadsKt$thread$thread$1.run(Thread.kt:18) 
+3

不应该等待()在同步块内调用吗? –

+0

我试过,但有同样的问题。也许我需要创建两个独立的锁?另外,对于我之前写过的同步器没有这样做,也没有任何问题。 https://github.com/thomasnield/tom-sync/blob/master/src/main/java/org/nield/concurrency/BufferedLatch.java – tmn

+0

github代码给出了同样的错误,如果我直接调用await()[Just尝试在我当地的日食]。在调用await()之前,您可能会在与该对象关联的监视器上锁定一个锁。 –

回答

1

正如其他人所指出的,您可以使用SynchronousQueue中的现有实现。

如果您想实现自己的功能,则非常接近,只需确保wait()的调用位于​​区块内。

不幸的是,我相信你的原代码isComplete()/setComplete()机制是受竞争条件,为isComplete()返回之后false之前或者甚至在读线程执行take()setComplete()可以被调用。这可能会挂起阅读线程。

public final class SingleBlockingQueue<T> { 
    private final Object lock = new Object(); 
    private T value; 
    private boolean present = false; 

    public void offer(T value) throws InterruptedException { 
     synchronized (lock) { 
     while (present) 
      lock.wait(); 
     this.value = value; 
     present = true; 
     lock.notifyAll(); 
     } 
    } 

    public T take() throws InterruptedException { 
     synchronized (lock) { 
     while (!present) 
      lock.wait(); 
     T returnValue = value; 
     value = null; // Should release reference 
     present = false; 
     lock.notifyAll(); 
     return returnValue; 
     } 
    } 
    } 

为了进行比较,可以更自然地实现这种队列的基础上SemaphoreCondition对象。这是一个使用一对信号量来表示空/满条件的实现。

public final class SingleBlockingQueue<T> { 
    private volatile T value; 
    private final Semaphore full = new Semaphore(0); 
    private final Semaphore empty = new Semaphore(1); 

    public void offer(T value) throws InterruptedException { 
     empty.acquire(); 
     this.value = value; 
     full.release(); 
    } 

    public T take() throws InterruptedException { 
     full.acquire(); 
     T returnValue = value; 
     value = null; // Should release reference 
     empty.release(); 
     return returnValue; 
    } 
    } 
+0

The Semaphore方法非常简单,谢谢。最后一个问题。我如何安全地沟通没有更多的项目?我在客户端使用'AtomicBoolean'正面测试了这个bejeezus,但我仍然很谨慎。 – tmn

+0

这是我在Kotlin的用法。虽然我无法证明它,但我担心'take()'上的最后一个元素调用可能会被忽略https://gist.github.com/thomasnield/a3f7981ea447e0c049ba5943afa44fb8#file-singleblockingqueue-usage-kt – tmn

+0

哈,证明了我的担心是正确的。我用'睡眠()'和一个评论显示了哪里会出轨。 – tmn

2

你不需要自己实现它,你可以使用SynchronousQueue

参考文献:

SynchronousQueue javadoc

http://tutorials.jenkov.com/java-util-concurrent/synchronousqueue.html

的的SynchronousQueue类实现了BlockingQueue接口。 阅读BlockingQueue文本以获取有关界面的更多信息。

SynchronousQueue是一个内部只能包含单个元素 的队列。将一个元素插入队列的线程被阻塞 ,直到另一个线程从队列中获取该元素。同样,如果 线程尝试获取元素并且当前没有元素存在,则线程将被阻塞,直到线程将一个元素插入 队列。

+1

“SynchronousQueue”的Javadoc描述是误导性的,它并不具备能力可以存储一个元素:容量是_zero_。对行为的描述是关键:生产者线程不能仅仅放置()一个元素并离开。任何'q.put(e)'调用都会被阻塞,直到消费者调用q.take()'。这使得“SynchronousQueue”的行为与OP描述的行为不同。 (当然,它与OP实际上_wants_是不同的是一个完全不同的问题) –

+0

是的,如果它正在等待'take()'去除单个元素,我想'put()'被阻塞并释放插槽。 – tmn

+1

实际上,Java8 JDK中的文档说:“一个同步队列没有任何内部容量,甚至不能有一个容量。”(强调,我的。) –

0

只是说明我有一些问题与ResultSet雀跃着由于在RxJava-JDBC框架next()通话时间。我用这个实现去修改了前面给出的答案。

public final class SingleBlockingQueue<T> { 
    private volatile T value; 
    private final Semaphore nextGate = new Semaphore(0); 
    private final Semaphore waitGate = new Semaphore(0); 

    private volatile boolean hasValue = true; 
    private volatile boolean isFirst = true; 

    public void offer(T value) throws InterruptedException { 
     if (isFirst) { 
      nextGate.acquire(); 
      isFirst = false; 
     } 
     this.value = value; 
     waitGate.release(); 
     nextGate.acquire(); 
    } 

    public T take() throws InterruptedException { 
     T returnValue = value; 
     value = null; // Should release reference 
     return returnValue; 
    } 
    public boolean next() throws InterruptedException { 
     nextGate.release(); 
     waitGate.acquire(); 
     return hasValue; 
    } 
    public void setDone() { 
     hasValue = false; 
     waitGate.release(); 
    } 
} 

这是我使用它:翻动RxJava Observable<T>到科特林一个Sequence<T>

import com.github.davidmoten.rx.jdbc.QuerySelect 
import rx.Observable 
import rx.Scheduler 
import rx.lang.kotlin.subscribeWith 
import java.io.Closeable 

class ObservableIterator<T>(
     observable: Observable<T> 
) : Iterator<T>, Closeable { 

    private val queue = SingleBlockingQueue<T>() 

    private val subscription = 
      observable 
        .subscribeWith { 
         onNext { queue.offer(it) } 
         onCompleted { queue.setDone() } 
         onError { queue.setDone() } 
        } 

    override fun hasNext(): Boolean { 
     return queue.next() 
    } 

    override fun next(): T { 
     return queue.take() 
    } 
    override fun close() { 
     subscription.unsubscribe() 
     queue.setDone() 
    } 
} 

fun <T> Observable<T>.asSequence() = ObservableIterator(this).asSequence() 

fun QuerySelect.Builder.asSequence(scheduler: Scheduler) = get { it } 
     .subscribeOn(scheduler) 
     .asSequence() 
+1

请注意,当返回'Sequence'没有被完全消耗时,对observable的订阅会永远在'onNext'中被阻塞。 – Ilya