2017-06-18 49 views
0

我对卡夫卡非常陌生,并开始致力于生产者 - 消费者模型。
我为Kafka制作人编写了一个程序,它将从MySQL读取表格,并且我正在命令提示符下阅读这些信息。
问题是我只能看到表格最后一列的数据。我的卡夫卡生产者类中的错误

MySQL表结构及数据:

+----------+---------+----------------+ 
| dept  | empName | salary(double) | 
+----------+---------+----------------+ 
| Engg  | Fred | 2000   | 
| Engg  | Bob  | 3000   | 
| Engg  | Joe  | 1000   | 
| Arts  | Jack | 5000   | 
| Commerce | Jill | 2400   | 
| Arts  | James | 3000   | 
| Commerce | Rob  | 8700   | 
+----------+---------+----------------+ 

卡夫卡制作类:

[[email protected] kafka_2.11-0.10.2.0]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbrec --from-beginning 
[2017-06-18 10:23:22,428] WARN Error while fetching metadata with correlation id 2 : {dbrec=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) 
[2017-06-18 10:23:22,628] WARN The following subscribed topics are not assigned to any members in the group console-consumer-33197 : [dbrec] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) 

2000.0 
3000.0 
1000.0 
5000.0 
2400.0 
3000.0 
8700.0 

我刚开始从最后一列中的数据:

package com.kafka.producer; 

import java.sql.Connection; 
import java.sql.DriverManager; 
import java.sql.PreparedStatement; 
import java.sql.ResultSet; 
import java.sql.SQLException; 
import java.util.Properties; 

import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 

@SuppressWarnings("deprecation") 
public class KafkaMySqlProducer { 
    public static void main(String[] args) { 

     Connection con; 
     PreparedStatement pstmnt; 
     ResultSet rs; 

     Properties props = new Properties(); 
     props.put("zk.connect", "localhost:2181"); 
     props.put("serializer.class","kafka.serializer.StringEncoder"); 
     props.put("metadata.broker.list","localhost:9092"); 
     ProducerConfig config = new ProducerConfig(props); 
     Producer producer = new Producer(config); 

     try { 
      Class.forName("com.mysql.jdbc.Driver"); 
      con = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb","root","cloudera"); 
      pstmnt = con.prepareStatement("select * from department"); 
      rs = pstmnt.executeQuery(); 
      while(rs.next()) { 
       String name = rs.getString(1); 
       String department = rs.getString(2); 
       String salary = " " + rs.getDouble(3); 
       producer.send(new KeyedMessage("dbrec", name, department, salary)); 
      } 
      con.close(); 
     } catch (ClassNotFoundException e) { 
      e.printStackTrace(); 
     } catch (SQLException e) { 
      e.printStackTrace(); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 

    } 
} 

从我的消费输出的表格。
任何人都可以告诉我我在这里做什么错误?

+0

消耗message如果你开始与卡夫卡一个新的项目,最好开始使用新的生产国和消费类。我看到你正在使用生产者连接到Zookeeper的旧版本获取关于经纪人的信息,而不是直接向其中的一个。 – ppatierno

回答

2

具有4个参数的KeyedMessage构造函数的最后一个参数是实际的消息。

该类不仅接受所有值的列表。

具体而言,您提供的参数是​​。你只能从topic

+1

好吧..纠正它这样。 (新的KeyedMessage(“dbrec”,name +“”+ department +“”+ salary));现在正在工作。 – Sidhartha

+0

另外我看到这两行不推荐使用的消息。 \t \t ProducerConfig config = new ProducerConfig(props); \t \t Producer producer = new Producer(config); 您能告诉我这些方法有哪些新选项,以便我可以实施它们。 – Sidhartha

+0

不确定,但JavaDoc可能会为你解答 –

相关问题