我必须创建一个不断监听和数据推到Database
的Kafka topics
的消费者一个周期。如何阅读卡夫卡多个记录中使用Java
这里的要求是: - 如果你碰巧在一个循环中从Kafka读取多个记录,试着把它作为一个单独的调用进入db而不是多个。
public static void kafkaConsumer(String topicName, String groupId, String autoOffsetReset,
String enableAutoCommit, String kafkaServers, String acks, String retries, String lingerMS,
String bufferMemory) throws Exception {
ObjectMapper mapper = new ObjectMapper();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
getKafkaParams(groupId, kafkaServers, autoOffsetReset, enableAutoCommit));
consumer.subscribe(Arrays.asList(topicName));
logger.info("subscibed to the topic {}", topicName);
cluster = Cluster.builder().addContactPoints(CASSANDRA_IPS.split(",")).build();
session = cluster.connect(KEYSPACE);
try {
while (true) {
try {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
Model model= mapper.readValue(record.value(), Model.class);
try {
boolean flag = insertIntoDB(session, model);
if (flag) {
logger.info("************ Data Persisted Successfully ***************");
} else {
logger.info("******* Data Persition Failed *************");
}
} catch (Exception ex) {
logger.error("Exception while persisting data into DB", ex);
}
}
} catch (Exception ex) {
logger.error("Exception while reading data from kafka", ex);
}
}
} finally {
consumer.close();
}
}
所以你真正的问题是如何将多个记录到数据库? – GuangshengZuo
我正在阅读1条记录,并且能够将1条记录插入到数据库中,但是如何从'Kafka'中读取多条记录以及如何在单个调用中将它们插入数据库? – Sat
添加代码,显示你如何从卡夫卡 – Natalia