0
广播KafkaProducer引发执行人我创建了一个包装象下面这样:星火:最好的办法广播KafkaProducer星火流
public class KafkaSink implements Serializable {
private static KafkaProducer<String, String> producer = null;
public KafkaProducer<String, String> getInstance(final Properties properties) {
if(producer == null) {
producer = new KafkaProducer<>(properties);
}
return producer;
}
public void close() {
producer.close();
}
}
,并使用它像下面
JavaSparkContext jsc = new JavaSparkContext(sc);
Broadcast<KafkaSink> kafkaSinkBroadcast = jsc.broadcast(new KafkaSink()));
dataset.toJavaRDD().foreach(row -> kafkaSinkBroadcast.getValue().getInstance(kafkaProducerProps()).send(new ProducerRecord<String, String>(topic, row.mkString(", "))))
我只是想知道是否正确的做法,或者做什么是最好的方法
你可以在Java中添加相同的代码示例吗? – eatSleepCode