2017-04-14 76 views
1

我想了解同步块。这里我实现了一个生产者线程和2个消费者线程。Java多线程:缓慢的生产者和快速消费者

由于LinkedList为空,我一直在线程中收到异常。

package com.Main; 

import com.qed.Consumer; 
import com.qed.Producer; 
import com.qed.Store; 

public class Main { 

public static void main(String[] args) throws InterruptedException { 
    Store st = new Store(); 

    Thread populate = new Thread(new Producer(st)); 
    Thread con1 = new Thread(new Consumer(st)); 
    Thread con2 = new Thread(new Consumer(st)); 
    con1.setName("A"); 
    con2.setName("B"); 
    populate.start(); 
    con1.start(); 
    con2.start(); 

    populate.join(); 
    con1.join(); 
    con2.join(); 
    if(populate.isAlive()){ 
     con1.interrupt(); 
     con2.interrupt(); 
    } 
    } 
} 

package com.qed; 

import java.util.LinkedList; 

public class Store { 

private LinkedList<Integer> qu = new LinkedList<Integer>(); 
private final Object lock = new Object(); 

public void add(int data){ 
    try{ 
     while(qu.size() ==10){ 
      Thread.sleep(1); 
      } 
     qu.add(data); 
    }catch(InterruptedException ie){ 
     ie.printStackTrace(); 
    } 
} 

public int remove(){ 
    int data=0; 
    try{ 
     synchronized(lock){ 
      while(qu.size() == 0){ 
       Thread.sleep(1); 
       } 
      data = qu.removeFirst(); 
     } 
    }catch(InterruptedException ie){ 
     ie.printStackTrace(); 
    } 
    return data; 
    } 
} 

package com.qed; 

public class Consumer implements Runnable{ 

private Store st; 
public Consumer(Store st){ 
    this.st=st; 
} 


public void run(){ 
    while(true){ 
     System.out.println(Thread.currentThread().getName() + ". " +st.remove()); 
    } 
    } 
} 

package com.qed; 

public class Producer implements Runnable{ 

private Store st; 
private final int runs = 5000; 
public Producer(Store st){ 
    this.st = st; 
} 

public void run(){ 
    int data = 0; 
    int curRun =0; 
    while(++curRun < runs){ 
     st.add(data+=200); 
     } 
    System.out.println("DONE."); 
    } 
} 

堆栈跟踪:

Exception in thread "B" Exception in thread "A" 
java.util.NoSuchElementException  
    at java.util.LinkedList.removeFirst(Unknown Source)  
    at com.qed.Store.remove(Store.java:46)  
    at com.qed.Consumer.run(Consumer.java:20)  
    at java.lang.Thread.run(Unknown Source)  
java.util.NoSuchElementException  
    at java.util.LinkedList.removeFirst(Unknown Source)  
    at com.qed.Store.remove(Store.java:46)  
    at com.qed.Consumer.run(Consumer.java:20)  
    at java.lang.Thread.run(Unknown Source)  
+2

如果不在'add()'中进行同步,这是未定义的行为。 – 1000ml

回答

1

你必须锁定加为好。您的代码允许生产者更新队列,而消费者可能想要删除条目!

当两个线程并行修改同一个队列时,所有投注都关闭!

单一的锁使用只会阻止多个消费者踩在彼此!

因此:为添加值的部分添加相同类型的锁定。

除此之外,EJP是正确的 - 一个真正的解决方案将使用低级信号方法,如wait()和notify()。但是,当然,使用这些会导致非常不同的行为。请注意,这些是两个不同的事情:A)消费者/生产者互相发送信号B)消费者/生产者以相同的外观进行同步。我知道你不想要“A)” - 但你需要“B)”;否则你的队列被破坏,并且会发生意外。

+0

正确,但他也需要'wait()'而不是睡觉,并且他在添加或删除时还需要'notify()'。 – EJP

+0

是和不是。我认为他在这里试验。使用信号将是自然而然的改进。但我相应地更新了答案。谢谢。 – GhostCat

+0

@GhostCat你是对的。我在这里试验。我试图实现一个队列,其中单个生产者线程可以继续添加,直到队列满,消费者可以一次清空队列。我试图在这里实现一个机制,其中生产者和消费者不相互同步。我不想使用wait和notify,因为read方法必须在使用者使用的同一个锁上使用同步。这违背了我的目的,即当消费者访问删除方法时,制作者将被锁定。 – Manik

0

您应该在这里调用wait()方法。
wait()使您的线程等待,直到其他线程调用通知唤醒他。
sleep()只是不执行指定时间段内的下一条语句。
如果你看到你的程序片段,你正在使用同步块,它使用一个对象来检查监视器的可用性。但是您没有使用任何对象监视器方法wait/notify/notifyAll并且您正试图在不调用这些方法的情况下获取并释放锁。由于消费者和生产者都使用了列表对象,因此您应该使用此列表对象监视器来同步所有线程。如果一个线程获得了监视器,那么其他线程将无法访问它。因为每个对象只有一个监视器。这种方法将解决所有工作线程之间的同步问题。

0

问题是您的Store类实现。在添加和删除元素时,您需要在那里实现wait()notify机制,而不是睡觉。

您在分享所有的消费者和生产者之间的一个Store实例正确的,但你的店需要表现得像一个BlockingQueue

因此,无论您使用现有的实施BlockingQueue从JDK或修改Store类来实现类似机制。

implement-your-own blocking queue in java

希望它可以帮助!