2014-01-16 39 views
5

如果我遇到类似以下情况:并发观察模式

ObserverA,ObserverB,ObserverC都从AbstractObserver继承。

创建观察员名单:

List<AbstractObserver> list = new ArrayList<AbstractObserver>(); 
list.add(new ObserverA()); 
list.add(new ObserverB()); 
list.add(new ObserverC()); 

而且某种以下方法处理程序运行在“主”线程:

public void eat(Food item) { 
    for(AbstractObserver o : list) { 
      o.eatFood(item); 
    } 
} 

public void drink(Coffee cup) { 
    for(AbstractObserver o : list) { 
      o.drinkCoffee(cup); 
    } 
} 

我将如何设计一个系统,我可以运行每一种食物和喝咖啡的方法在不同线程中的观察者?具体来说,当“MAIN”线程收到一个事件(喝或吃的方法被调用)时,我将如何在ObserverA,ObserverB和ObserverC中的自己的线程中运行eatFood或drinkCoffee方法?

我想为每个AbstractObserver子类实例设置不同的线程,因为目前我正在按顺序通知每个观察者,这可能会导致延迟。

+0

我不明白这里的观察对象是如何起作用的。他们观察的主题是什么?它们与食品饮料和饮用咖啡方法有何关系? – Philipp

+0

不,不适合我。从我的角度来看,我不知道什么时候吃喝(主线程方法)会被调用。他们可以被称为五分钟或更多。我只是希望每个AbstractObserver子类实例都能够在主线程的吃喝方法被调用时同时运行drinkCoffee或eatFood。 – sneeze1

+0

您对并发有任何限制吗?例如,在之前的o.eatFood()结束之前,是否可以调用同一对象上的后续o.eatFood()? –

回答

2

使一些简化的假设在这里,你不关心收到通知时吃/喝完成后,你还可以使用执行框架抛出的工作到一个队列:

// declare the work queue 
    private final Executor workQueue = Executors.newCachedThreadPool(); 




    // when you want to eat, schedule a bunch of 'eating' jobs 
     public void eat(final Food item){ 
      for (final AbstractObserver o: list) { 
      workQueue.execute(new Runnable() { 

       @Override 
       public void run() { 
        o.eatFood(item); // runs in background thread 
       } 
      }); 
      } 
     } 

在出口你的程序,你必须关闭执行程序:

workQueue.shutdown(); 
+0

此方法仍然要求您修改每种方法(吃,喝等),但至少您只有一个workQueue(不是20),并且执行程序框架为您管理线程。如果您需要限制并发性,就像使用FixedThreadPool交换执行程序一样简单。 – JVMATL

+0

我认为assylias的答案也是正确的,但这似乎是最简单的解决方案。 – sneeze1

2

我不是专业人士,但也许你可以使用生产者 - 消费者设置。在这里,作为观察实体的生产者可以在其自己的线程的队列中添加一个通知,这里的观察者将从同一个队列中获取消息,但是会在其自己的线程中获得通知。

2

为了详细说明气垫船的回答,基本实现您的观察者可能看起来像这样:

class ObserverA implements Runnable { 
    private final BlockingQueue<Food> queue = new ArrayBlockingQueue<>(); 

    public void eatFood(Food f) { 
     queue.add(f); 
    } 

    public void run() { 
     try { 
      while (true) { 
       Food f = queue.take(); //blocks until some food is on the queue 
       //do what you have to do with that food 
      } 
     } catch (InterruptedException e) { 
      Thread.currentThread().interrupt(); 
      //and exit 
     } 
    } 
} 

所以,你的代码,它eatFood将从该方法立即返回并不会阻止你的主线程。

您显然需要为观察者直接分配一个线程:new Thread(observerA).start();或通过ExecutorService,这可能更容易,更可取。


或者,您可以在“观察”对象级别创建线程:

private static final ExecutorService fireObservers = Executors.newFixedThreadPool(10); 

public void eat(Food food) { 
    for (AbstractObserver o : observers) { 
     //(i) if an observer gets stuck, the others can still make progress 
     //(ii) if an observer throws an exception, a new thread will be created 
     Future<?> f = fireObservers.submit(() -> o.dataChanged(food)); 
     fireObservers.submit(new Callable<Void>() { 
      @Override public Void call() throws Exception { 
       try { 
        f.get(1, TimeUnit.SECONDS); 
       } catch (TimeoutException e) { 
        logger.warn("Slow observer {} has not processed food {} in one second", o, food); 
       } catch (ExecutionException e) { 
        logger.error("Observer " + o + " has thrown exception on food " + food, e.getCause()); 
       } 
       return null; 
      } 
     }); 
    } 
} 

(我主要是复制了here粘贴 - 你可能需要以使其适应您的需求)。

+0

谢谢,我想我明白这是如何工作的。我的问题是,如果我走这条路线,我将不得不为主线程中的每个方法自己创建队列。在你的例子中,我相信我必须为drinkFood创建一个方法。在我的项目中,我有大约20种需要这种方法的方法,所以如果我尝试自己处理20个队列,然后处理run方法中的queue.take()方法,恐怕会变得很难处理。 – sneeze1

+0

@ sneeze1看我的编辑。 – assylias