2016-09-22 145 views
0

广播KafkaProducer引发执行人我创建了一个包装象下面这样:星火:最好的办法广播KafkaProducer星火流

public class KafkaSink implements Serializable { 
    private static KafkaProducer<String, String> producer = null; 

    public KafkaProducer<String, String> getInstance(final Properties properties) { 
     if(producer == null) { 
      producer = new KafkaProducer<>(properties); 
     } 
     return producer; 
    } 

    public void close() { 
     producer.close(); 
    } 
} 

,并使用它像下面

JavaSparkContext jsc = new JavaSparkContext(sc); 
Broadcast<KafkaSink> kafkaSinkBroadcast = jsc.broadcast(new KafkaSink())); 
dataset.toJavaRDD().foreach(row -> kafkaSinkBroadcast.getValue().getInstance(kafkaProducerProps()).send(new ProducerRecord<String, String>(topic, row.mkString(", ")))) 

我只是想知道是否正确的做法,或者做什么是最好的方法

回答

0

我真的可以推荐这blog post。简而言之,您应该通过传递'recipe'来创建Kafka制作人,为每个分区创建一个可序列化的接收器。

+0

你可以在Java中添加相同的代码示例吗? – eatSleepCode