2017-02-20 119 views
1

我一直在尝试使用APACHE KAFKA和FLUME将数据流到MySQL数据库。 (这是我的水槽的配置文件)流到mysql的流水线

agent.sources=kafkaSrc 
agent.channels=channel1 
agent.sinks=jdbcSink 

agent.channels.channel1.type=org.apache.flume.channel.kafka.KafkaChannel 
agent.channels.channel1.brokerList=localhost:9092 
agent.channels.channel1.topic=kafkachannel 
agent.channels.channel1.zookeeperConnect=localhost:2181 
agent.channels.channel1.capacity=10000 
agent.channels.channel1.transactionCapacity=1000 


agent.sources.kafkaSrc.type = org.apache.flume.source.kafka.KafkaSource 
agent.sources.kafkaSrc.channels = channel1 
agent.sources.kafkaSrc.zookeeperConnect = localhost:2181 
agent.sources.kafkaSrc.topic = kafka-mysql 

***agent.sinks.jdbcSink.type = How to declare this?*** 
agent.sinks.jdbcSink.connectionString = jdbc:mysql://1.1.1.1:3306/test 
agent.sinks.jdbcSink.username=user 
agent.sinks.jdbcSink.password=password 
agent.sinks.jdbcSink.batchSize = 10 
agent.sinks.jdbcSink.channel =channel1 
agent.sinks.jdbcSink.sqlDialect=MYSQL 
agent.sinks.jdbcSink.driver=com.mysql.jdbc.Driver 
agent.sinks.jdbcSink.sql=(${body:varchar}) 

我知道如何将数据传送到Hadoop的HBase的或(记录器型或HDFS型),但是找不到一个类型以流到MySQL数据库。所以我的问题是我如何声明jdbcSink.type?

+0

水槽中没有JDBC接收器。使用Flume无法将数据传输到MySQL。 – franklinsijo

+0

@franklinsijo感谢您的回复。那么有什么方法可以将数据从Kafka提取到RDBMS?我愿意接受任何建议。 – SLIT

+0

[Kafka JDBC Sink](http://docs.confluent.io/3.1.1/connect/connect-jdbc/docs/sink_connector.html)是不可能的? – franklinsijo

回答

0

您可以随时为MySQL创建自定义接收器。这就是我们在FIWARE用Cygnus工具所做的。

随意从它那里得到启发:https://github.com/telefonicaid/fiware-cygnus/blob/master/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIMySQLSink.java

它扩展了我们所有的汇这个其他自定义的基类:https://github.com/telefonicaid/fiware-cygnus/blob/master/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSISink.java

基本上,你必须扩展AbstractSink并实现Configurable接口。这意味着覆盖人至少以下方法:

public Status process() throws EventDeliveryException 

和:分别

public void configure(Context context) 

+0

非常感谢。 – SLIT