2017-08-30 35 views
1

我必须创建一个不断监听和数据推到DatabaseKafka topics的消费者一个周期。如何阅读卡夫卡多个记录中使用Java

这里的要求是: - 如果你碰巧在一个循环中从Kafka读取多个记录,试着把它作为一个单独的调用进入db而不是多个。

public static void kafkaConsumer(String topicName, String groupId, String autoOffsetReset, 
     String enableAutoCommit, String kafkaServers, String acks, String retries, String lingerMS, 
     String bufferMemory) throws Exception { 

    ObjectMapper mapper = new ObjectMapper(); 

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
      getKafkaParams(groupId, kafkaServers, autoOffsetReset, enableAutoCommit)); 

    consumer.subscribe(Arrays.asList(topicName)); 
    logger.info("subscibed to the topic {}", topicName); 
    cluster = Cluster.builder().addContactPoints(CASSANDRA_IPS.split(",")).build(); 
    session = cluster.connect(KEYSPACE); 

    try { 

     while (true) { 
      try { 
       ConsumerRecords<String, String> records = consumer.poll(1000); 
       for (ConsumerRecord<String, String> record : records) { 

        Model model= mapper.readValue(record.value(), Model.class); 

       try { 
         boolean flag = insertIntoDB(session, model); 
         if (flag) { 
          logger.info("************ Data Persisted Successfully ***************"); 
         } else { 
          logger.info("******* Data Persition Failed *************"); 
         } 
        } catch (Exception ex) { 
         logger.error("Exception while persisting data into DB", ex); 
        } 
       } 
      } catch (Exception ex) { 
       logger.error("Exception while reading data from kafka", ex); 
      } 
     } 
    } finally { 
     consumer.close(); 
    } 
} 
+1

所以你真正的问题是如何将多个记录到数据库? – GuangshengZuo

+0

我正在阅读1条记录,并且能够将1条记录插入到数据库中,但是如何从'Kafka'中读取多条记录以及如何在单个调用中将它们插入数据库? – Sat

+0

添加代码,显示你如何从卡夫卡 – Natalia

回答

1

Mysql INSERT支持插入多行一次。像这样:

INSERT INTO tbl_name (a,b,c) VALUES(1,2,3),(4,5,6),(7,8,9); 

所以,你可以先保存记录到一个数组,而当数组大小等于BATCH_SIZE,你可以把它作为你的insertIntoDb方法。然后清除数组,然后继续循环。

您还可以将一次轮询的所有消息放入数组,并将其传递给insertIntoDb。

但如果消息计数过大,则MySQL会抱怨封包太大,所以在这种情况下,使用指定的BATCH_SIZE更好。

此外,您可以指定消费者的“max.poll.records”配置,以限制一次轮询中的消息数量。

像这样在卡桑德拉:

PreparedStatement ps = session.prepare("INSERT INTO messages (user_id,msg_id, title, body) VALUES (?, ?, ?, ?)"); 
BatchStatement batch = new BatchStatement(); 
batch.add(ps.bind(uid, mid1, title1, body1)); 
batch.add(ps.bind(uid, mid2, title2, body2)); 
batch.add(ps.bind(uid, mid3, title3, body3)); 
session.execute(batch); 
+0

我的DB是卡桑德拉阅读,我们如何能够插入多条记录到卡珊德拉用java – Sat

+0

https://www.datastax.com/dev/blog/client-side -improvements功能于卡桑德拉-2-0 – GuangshengZuo