2017-09-15 60 views
0

我有一个应用程序可以抓取大约六千个URL。为了最大限度地减少这项工作,我创建了一个RecursiveTask,它使用所有要爬网的URL的ConcurrentLinkedQueue。它最多可以关闭50个,如果这个Que是空的,它会直接抓取它,但是如果没有,它会首先创建一个新的自己的实例并将其分叉,之后它会抓取50的子集,然后它将加入分叉的任务。ForkJoinFramework只使用两名工作人员

现在出现我的问题,直到每个线程同时工作了他的50个全部四个工作快速anf。但是,在两次停止工作并等待加入之后,只有另外两个人正在工作并创建新的分支和抓取页面。

为了可视化这个数据,我计算了一个线程爬行URL的数量,并让JavaFX GUI显示它。

我错了什么,所以ForkJoinFramewok只使用我的四个允许线程中的两个?我能做些什么来改变它?

这里是任务的我的计算方法:

LOG.debug(
     Thread.currentThread().getId() + " Starting new Task with " 
      + urlsToCrawl.size() + " left." 
    ); 
    final ConcurrentLinkedQueue<D> urlsToCrawlSubset = new ConcurrentLinkedQueue<>(); 
    for (int i = 0; i < urlsToCrawl.size() && i < config.getMaximumUrlsPerTask(); i++) 
    { 
     urlsToCrawlSubset.offer(urlsToCrawl.poll()); 
    } 
    LOG.debug(
     Thread.currentThread().getId() + " Crated a Subset with " 
     + urlsToCrawlSubset.size() + "." 
    ); 
    LOG.debug(
     Thread.currentThread().getId() 
     + " Now the Urls to crawl only left " + urlsToCrawl.size() + "." 
    ); 

    if (urlsToCrawl.isEmpty()) 
    { 
     LOG.debug(Thread.currentThread().getId() + " Crawling the subset."); 
     crawlPage(urlsToCrawlSubset); 
    } 
    else 
    { 
     LOG.debug(
      Thread.currentThread().getId() 
       + " Creating a new Task and crawling the subset." 
     ); 
     final AbstractUrlTask<T, D> otherTask = createNewOwnInstance(); 
     otherTask.fork(); 
     crawlPage(urlsToCrawlSubset); 
     taskResults.addAll(otherTask.join()); 
    } 
    return taskResults; 

这里是我的图的快照: enter image description here

附:如果我允许多达80个线程,我们将使用它们,直到每个网站都有50个网址被抓取,然后只使用两个。

如果你有兴趣,这里是完整的源代码:https://github.com/mediathekview/MServer/tree/feature/cleanup

+1

你确定这无疑是正确的otherTask.join()那里调用? – algrid

+1

我无法通过github中的代码山。如果您需要帮助,请创建一个sscc示例。 http://sscce.org/另外请注意,join()阻塞高达50%的线程,如此处所述:http://coopsoft.com/ar/Calamity2Article.html#join – edharned

+0

您可以显示提交任务的代码去游泳池? –

回答

0

我固定它。我的错误是,我分裂然后工作了一个小protion,而不是等待,而不是分成一半,然后再打电话给我自己与其他一半等

换句话说,之前我分裂和直接工作,但正确是分裂,直到所有分裂,然后开始工作。

这里是我的代码现在的样子:

@Override 
protected Set<T> compute() 
{ 
    if (urlsToCrawl.size() <= config.getMaximumUrlsPerTask()) 
    { 
     crawlPage(urlsToCrawl); 
    } 
    else 
    { 
     final AbstractUrlTask<T, D> rightTask = createNewOwnInstance(createSubSet(urlsToCrawl)); 
     final AbstractUrlTask<T, D> leftTask = createNewOwnInstance(urlsToCrawl); 
     leftTask.fork(); 
     taskResults.addAll(rightTask.compute()); 
     taskResults.addAll(leftTask.join()); 
    } 
    return taskResults; 
} 

private ConcurrentLinkedQueue<D> createSubSet(final ConcurrentLinkedQueue<D> aBaseQueue) 
{ 
    final int halfSize = aBaseQueue.size()/2; 
    final ConcurrentLinkedQueue<D> urlsToCrawlSubset = new ConcurrentLinkedQueue<>(); 
    for (int i = 0; i < halfSize; i++) 
    { 
     urlsToCrawlSubset.offer(aBaseQueue.poll()); 
    } 
    return urlsToCrawlSubset; 
}