2011-09-07 30 views
2

我使用观察者模式和BlockingQueue来添加一些实例。现在,在另一种方法我用了队列,但它似乎采取()永远等待,即使我做这件事是这样的:Observer - BlockingQueue

/** {@inheritDoc} */ 
@Override 
public void diffListener(final EDiff paramDiff, final IStructuralItem paramNewNode, 
    final IStructuralItem paramOldNode, final DiffDepth paramDepth) { 
    final Diff diff = 
     new Diff(paramDiff, paramNewNode.getNodeKey(), paramOldNode.getNodeKey(), paramDepth); 
    mDiffs.add(diff); 
    try { 
     mDiffQueue.put(diff); 
    } catch (final InterruptedException e) { 
     LOGWRAPPER.error(e.getMessage(), e); 
    } 
    mEntries++; 

    if (mEntries == AFTER_COUNT_DIFFS) { 
     try { 
      mRunner.run(new PopulateDatabase(mDiffDatabase, mDiffs)); 
     } catch (final Exception e) { 
      LOGWRAPPER.error(e.getMessage(), e); 
     } 
     mEntries = 0; 
     mDiffs = new LinkedList<>(); 
    } 
} 

/** {@inheritDoc} */ 
@Override 
public void diffDone() { 
    try { 
     mRunner.run(new PopulateDatabase(mDiffDatabase, mDiffs)); 
    } catch (final Exception e) { 
     LOGWRAPPER.error(e.getMessage(), e); 
    } 
    mDone = true; 
} 

而mDiffQueue是的LinkedBlockingQueue,我使用它像这样的:

while (!(mDiffQueue.isEmpty() && mDone) || mDiffQueue.take().getDiff() == EDiff.INSERTED) {} 

但我认为第一个表达式进行检查,而mDone是不正确的,那么也许mDone设置为true(观察者始终是多线程的?),但它已经调用mDiffQueue.take()? : -/

编辑:我真的不明白现在。我最近把它改成:

synchronized (mDiffQueue) { 
    while (!(mDiffQueue.isEmpty() && mDone)) { 
     if (mDiffQueue.take().getDiff() != EDiff.INSERTED) { 
      break; 
     } 
    } 
} 

如果我在调试器等待它的工作原理有点时间,但也应该在“实时”的工作,因为mDone被初始化为假,并因此而条件应是真的,身体应该被执行。

如果mDiffQueue为空且mDone为true,它应该跳过while循环的主体(这意味着队列不再被填充)。

编辑:看来,这是:

synchronized (mDiffQueue) { 
    while (!(mDiffQueue.isEmpty() && mDone)) { 
     if (mDiffQueue.peek() != null) { 
      if (mDiffQueue.take().getDiff() != EDiff.INSERTED) { 
       break; 
      } 
     } 
    } 
} 

即使我不明白为什么偷看()是强制性的。

编辑:

我想跳过所有插入的节点是什么,我做的是遍历树和:

for (final AbsAxis axis = new DescendantAxis(paramRtx, true); axis.hasNext(); axis.next()) { 
    skipInserts(); 
    final IStructuralItem node = paramRtx.getStructuralNode(); 
    if (node.hasFirstChild()) { 
     depth++; 
     skipInserts(); 
     ... 

基本上计算树的最大深度或水平,而不考虑其中的节点已经在树的另一个版本中被删除(用于比较Sunburst可视化),但是好的,这可能超出范围。只是为了说明我正在做一些尚未插入的节点,即使它只是调整最大深度。

问候,

约翰内斯

+0

mDone设置为true或false? – Scorpion

+0

在diffDone()中,但我认为你忘了向下滚动? ;-) – Johannes

回答

1

首先建议:不要synchronized (mDiffQueue)。如果LinkedBlockingQueue有一些​​方法,则会发生死锁;这里并非如此,但这是一种你应该避免的做法。无论如何,我不明白为什么你在这个时候同步。

你要“唤醒”定期在等待检查mDone已设置:

while (!(mDiffQueue.isEmpty() && mDone)) { 
    // poll returns null if nothing is added in the queue for 0.1 second. 
    Diff diff = mDiffQueue.poll(0.1, TimeUnit.SECONDS); 
    if (diff != null) 
     process(diff); 
} 

这是关于与使用peek,但peek基本上等待纳秒代替。使用peek被称为“忙等待”(您的线程不停地运行while循环),并使用pool被称为“半忙等待”(让线程休眠)。

我想你的情况下process(diff)将脱离循环,如果diff不是类型EDiff.INSERTED。我不确定这是否是你想要完成的。这看起来很奇怪,因为你基本上只是拖延消费者线程,直到你得到一个正确类型的单个元素,然后你什么都不做。由于您不在while循环中,因此无法接收未来的传入元素。

+0

我的diffListener方法是从另一个线程调用的,所以我不知道它是否是线程安全的,只是想同步调用。所以我可以安全地删除它?但好的,我认为take()和put()是线程安全的。我需要一些东西来检查mDiffQueue没有被填充(对于最后一个节点)。否则采取()会永远阻止。 – Johannes

+0

是的,你可以删除它。 LinkedBlockingQueue自身很好地同步。 – toto2

+0

我不明白你的段落的最后部分。我想我实际上不明白你在做什么。从原来的文章中不清楚。谁把Diff放在队列中,谁在接收它们?和他们一起做什么? – toto2

2

take()是一个 “堵呼叫”。这意味着它会阻止(等到永远),直到队列中有东西,然后它会返回添加的内容。当然,如果队列中有东西,它会立即返回。

您可以使用peek()返回什么take()返回 - 也就是说,peek()返回的下一个项目从队列中删除,或返回null如果没有什么队列。尝试使用peek()代替您的测试(但也检查null)。

+0

是的,这就是为什么我没有遇到更多差异时引入了设置为true的布尔成员变量,因此BlockingQueue不再被填充,那么它应该跳过while-body。查看我对原始帖子的编辑: - /。我看不到peek()会如何帮助,因为有时队列可能目前是空的,但可能会在之后填充。 – Johannes