我想处理一些在Spark独立模式下运行的代码以及在群集上运行的Spark。基本上,对于RDD中的每个项目,我试图将其添加到列表中,一旦完成,我想将此列表发送到Solr。如何处理在Apache Spark中的foreach块之前运行的代码?
当我在Spark的独立模式下运行以下代码时,此功能完全正常,但在集群上运行相同的代码时无效。当我在集群上运行相同的代码时,就像“发送到Solr”一样,代码的一部分在要发送到Solr的列表中填充项目之前执行。我尝试在foreach
之后强制执行solrInputDocumentJavaRDD.collect();
,但它似乎没有任何效果。
// For each RDD
solrInputDocumentJavaDStream.foreachRDD(
new Function<JavaRDD<SolrInputDocument>, Void>() {
@Override
public Void call(JavaRDD<SolrInputDocument> solrInputDocumentJavaRDD) throws Exception {
// For each item in a single RDD
solrInputDocumentJavaRDD.foreach(
new VoidFunction<SolrInputDocument>() {
@Override
public void call(SolrInputDocument solrInputDocument) {
// Add the solrInputDocument to the list of SolrInputDocuments
SolrIndexerDriver.solrInputDocumentList.add(solrInputDocument);
}
});
// Try to force execution
solrInputDocumentJavaRDD.collect();
// After having finished adding every SolrInputDocument to the list
// add it to the solrServer, and commit, waiting for the commit to be flushed
try {
if (SolrIndexerDriver.solrInputDocumentList != null
&& SolrIndexerDriver.solrInputDocumentList.size() > 0) {
SolrIndexerDriver.solrServer.add(SolrIndexerDriver.solrInputDocumentList);
SolrIndexerDriver.solrServer.commit(true, true);
SolrIndexerDriver.solrInputDocumentList.clear();
}
} catch (SolrServerException | IOException e) {
e.printStackTrace();
}
return null;
}
}
);
我应该怎么做,使发送到Solr的部分SolrDocuments名单后执行添加到solrInputDocumentList
(和作品也是在集群模式)?