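The blocking queue that others have suggested would work, but I'm not a big fan of it, because it needs a consumer that loops forever waiting on the queue for messages. An alternative is to submit a task to an executor every time the Observer receives a notification.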
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main extends Observable implements Observer {
    private final int numCores = Runtime.getRuntime().availableProcessors();
    // One worker thread per available core.
    private final ExecutorService executor = Executors.newFixedThreadPool(numCores);

    public Main() {
        // For this demo the object observes itself.
        addObserver(this);
    }

    public static void main(String[] args) {
        new Main().execute();
    }

    private void execute() {
        for (int i = 0; i < 5; ++i) {
            setChanged();
            notifyObservers(i); // calls update() synchronously on this thread
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and stop
                break;
            }
        }
        // Let already submitted tasks finish, then release the worker threads.
        executor.shutdown();
    }

    @Override
    public void update(Observable o, Object arg) {
        System.out.printf("Received notification on thread: %s.\n", Thread.currentThread().getName());
        // Hand the actual work off to the pool so the notifying thread is not blocked.
        executor.submit(() -> System.out.printf("Running in thread: %s, result: %s.\n",
                Thread.currentThread().getName(), arg));
    }
}
Output:
Received notification on thread: main.
Running in thread: pool-1-thread-1, result: 0.
Received notification on thread: main.
Running in thread: pool-1-thread-2, result: 1.
Received notification on thread: main.
Running in thread: pool-1-thread-3, result: 2.
Received notification on thread: main.
Running in thread: pool-1-thread-4, result: 3.
Received notification on thread: main.
Running in thread: pool-1-thread-5, result: 4.
Use a BlockingQueue. When update() is called, put the event on the queue; read the events from the BlockingQueue in run().
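For comparison, the BlockingQueue variant described in the comment above could look roughly like the sketch below. It is not code from the original question; the names QueueingObserver, Source, POISON_PILL, stop() and runDemo() are made up for illustration, and it assumes notifications never carry a null argument (LinkedBlockingQueue rejects null elements).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueingObserver implements Observer, Runnable {
    // Hypothetical sentinel object that tells the consumer loop to exit.
    private static final Object POISON_PILL = new Object();

    private final BlockingQueue<Object> events = new LinkedBlockingQueue<>();
    private final List<Object> processed = new ArrayList<>();

    // Hypothetical event source; setChanged() is protected, so a subclass is needed.
    static class Source extends Observable {
        void publish(Object value) {
            setChanged();
            notifyObservers(value);
        }
    }

    @Override
    public void update(Observable o, Object arg) {
        // Runs on the notifying thread: only enqueue, do no real work here.
        events.add(arg);
    }

    public void stop() {
        events.add(POISON_PILL);
    }

    @Override
    public void run() {
        try {
            while (true) {
                Object event = events.take(); // blocks until an event is available
                if (event == POISON_PILL) {
                    return;
                }
                processed.add(event);
                System.out.printf("Consumed %s on thread: %s.%n",
                        event, Thread.currentThread().getName());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and exit
        }
    }

    // Publishes 0..count-1, waits for the consumer, and returns what it processed.
    static List<Object> runDemo(int count) {
        QueueingObserver consumer = new QueueingObserver();
        Thread worker = new Thread(consumer, "event-consumer");
        worker.start();

        Source source = new Source();
        source.addObserver(consumer);
        for (int i = 0; i < count; i++) {
            source.publish(i);
        }
        consumer.stop();
        try {
            worker.join(); // join() makes the worker's writes visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(consumer.processed);
    }

    public static void main(String[] args) {
        System.out.println("Processed: " + runDemo(5));
    }
}
```

Note that take() blocks while the queue is empty rather than spinning, so the consumer thread does not burn CPU while it waits. The trade-off compared with the executor version is a single dedicated consumer thread that handles events strictly in order, instead of a pool that may run them in parallel.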