2017-03-08 47 views
1

我使用星火1.6.1遍历在Java中星火数据帧不收集

我有一个数据帧,我需要遍历每行写卡夫卡。截至目前,我做这样的事情:

Producer<String><String> message; 
for(Row x: my_df.collect()){ 
    kafka_message = new Producer<String><String>(topic, String.valueOf(x)) 
    my_kafka_producer.send(kafka_message); 
} 

这里的问题是,收集的数据发送到驱动程序然后推到卡夫卡。考虑到我大约有250个执行者,我的驱动程序无法高效地处理工作负载。所以,我想知道如何通过执行器上的数据框进行迭代。这将需要避免执行collect()。我找到了一篇粗略解释如何做的文章,但不幸的是,他们与GitHub的链接实际上已过期,所以我找不到如何实现它。

文章供参考: https://pythagoreanscript.wordpress.com/2015/05/28/iterate-through-a-spark-dataframe-using-its-partitions-in-java/comment-page-1/

回答

2

在Java中,你可以尝试像下面的东西。扩展AbstractFunction1

import scala.runtime.AbstractFunction1; 

abstract class MyFunction1<T,R> extends AbstractFunction1<T, R> implements Serializable { 
} 

现在叫foreachPartition您的数据帧像下面。

import scala.collection.Iterator; 
import scala.runtime.BoxedUnit; 

df.foreachPartition(new MyFunction1<Iterator<Row>,BoxedUnit>(){ 
     @Override 
     public BoxedUnit apply(Iterator<Row> rows) { 
      while(rows.hasNext()){ 
       //get the Row 
       Row row = rows.next(); 
      } 
      return BoxedUnit.UNIT; 
     } 
    });