我有一个应用程序可以抓取大约六千个URL。为了最大限度地减少这项工作,我创建了一个RecursiveTask,它使用所有要爬网的URL的ConcurrentLinkedQueue。它最多可以关闭50个,如果这个Que是空的,它会直接抓取它,但是如果没有,它会首先创建一个新的自己的实例并将其分叉,之后它会抓取50的子集,然后它将加入分叉的任务。ForkJoinFramework只使用两名工作人员
现在出现我的问题,直到每个线程同时工作了他的50个全部四个工作快速anf。但是,在两次停止工作并等待加入之后,只有另外两个人正在工作并创建新的分支和抓取页面。
为了可视化这个数据,我计算了一个线程爬行URL的数量,并让JavaFX GUI显示它。
我错了什么,所以ForkJoinFramewok只使用我的四个允许线程中的两个?我能做些什么来改变它?
这里是任务的我的计算方法:
LOG.debug(
Thread.currentThread().getId() + " Starting new Task with "
+ urlsToCrawl.size() + " left."
);
final ConcurrentLinkedQueue<D> urlsToCrawlSubset = new ConcurrentLinkedQueue<>();
for (int i = 0; i < urlsToCrawl.size() && i < config.getMaximumUrlsPerTask(); i++)
{
urlsToCrawlSubset.offer(urlsToCrawl.poll());
}
LOG.debug(
Thread.currentThread().getId() + " Crated a Subset with "
+ urlsToCrawlSubset.size() + "."
);
LOG.debug(
Thread.currentThread().getId()
+ " Now the Urls to crawl only left " + urlsToCrawl.size() + "."
);
if (urlsToCrawl.isEmpty())
{
LOG.debug(Thread.currentThread().getId() + " Crawling the subset.");
crawlPage(urlsToCrawlSubset);
}
else
{
LOG.debug(
Thread.currentThread().getId()
+ " Creating a new Task and crawling the subset."
);
final AbstractUrlTask<T, D> otherTask = createNewOwnInstance();
otherTask.fork();
crawlPage(urlsToCrawlSubset);
taskResults.addAll(otherTask.join());
}
return taskResults;
附:如果我允许多达80个线程,我们将使用它们,直到每个网站都有50个网址被抓取,然后只使用两个。
如果你有兴趣,这里是完整的源代码:https://github.com/mediathekview/MServer/tree/feature/cleanup
你确定这无疑是正确的otherTask.join()那里调用? – algrid
我无法通过github中的代码山。如果您需要帮助,请创建一个sscc示例。 http://sscce.org/另外请注意,join()阻塞高达50%的线程,如此处所述:http://coopsoft.com/ar/Calamity2Article.html#join – edharned
您可以显示提交任务的代码去游泳池? –