2015-05-04 39 views
1

我想处理一些在Spark独立模式下运行的代码以及在群集上运行的Spark。基本上,对于RDD中的每个项目,我试图将其添加到列表中,一旦完成,我想将此列表发送到Solr。如何处理在Apache Spark中的foreach块之前运行的代码?

当我在Spark的独立模式下运行以下代码时,此功能完全正常,但在集群上运行相同的代码时无效。当我在集群上运行相同的代码时,就像“发送到Solr”一样,代码的一部分在要发送到Solr的列表中填充项目之前执行。我尝试在foreach之后强制执行solrInputDocumentJavaRDD.collect();,但它似乎没有任何效果。

// For each RDD 
solrInputDocumentJavaDStream.foreachRDD(
     new Function<JavaRDD<SolrInputDocument>, Void>() { 
      @Override 
      public Void call(JavaRDD<SolrInputDocument> solrInputDocumentJavaRDD) throws Exception { 

      // For each item in a single RDD 
      solrInputDocumentJavaRDD.foreach(
        new VoidFunction<SolrInputDocument>() { 
         @Override 
         public void call(SolrInputDocument solrInputDocument) { 

         // Add the solrInputDocument to the list of SolrInputDocuments 
         SolrIndexerDriver.solrInputDocumentList.add(solrInputDocument); 
         } 
        }); 

      // Try to force execution 
      solrInputDocumentJavaRDD.collect(); 


      // After having finished adding every SolrInputDocument to the list 
      // add it to the solrServer, and commit, waiting for the commit to be flushed 
      try { 
       if (SolrIndexerDriver.solrInputDocumentList != null 
         && SolrIndexerDriver.solrInputDocumentList.size() > 0) { 
       SolrIndexerDriver.solrServer.add(SolrIndexerDriver.solrInputDocumentList); 
       SolrIndexerDriver.solrServer.commit(true, true); 
       SolrIndexerDriver.solrInputDocumentList.clear(); 
       } 
      } catch (SolrServerException | IOException e) { 
       e.printStackTrace(); 
      } 


      return null; 
      } 
     } 
); 

我应该怎么做,使发送到Solr的部分SolrDocuments名单后执行添加到solrInputDocumentList(和作品也是在集群模式)?

回答

0

正如我在Spark邮件列表中提到的那样: 我对Solr API并不熟悉,但是假设'SolrIndexerDriver'是单身人士,我猜在群集上运行时发生的情况是:

SolrIndexerDriver.solrInputDocumentList.add(elem) 

是在SolrIndexerDriver不同的单实例发生在不同的JVM而

SolrIndexerDriver.solrServer.commit 

是发生在驱动程序。

实际上,执行者列表正在填写,但他们从未提交过,而在驱动程序上则相反。

推荐的方式来处理,这是使用foreachPartition这样的:

rdd.foreachPartition{iter => 
    // prepare connection 
    Stuff.connect(...) 
    // add elements 
    iter.foreach(elem => Stuff.add(elem)) 
    // submit 
    Stuff.commit() 
} 

这种方式,你可以在每个执行人的当地情况添加的每个分区的数据并提交结果。请注意,此添加/提交必须是线程安全的,以避免数据丢失或损坏。

0

您是否在spark UI中检查过该行为以查看此作业的执行计划。 检查它是如何分裂成阶段和它们的依赖关系。这应该会给你一个想法,希望。

相关问题