2017-03-01 31 views
0

在Flume代理中,我收集来自Kafka主题的元素,并且需要将它们插入到ES中。不过,我需要在接收器中执行以前的消解过程,所以我需要编写一个定制接收器将代理通道中的数据传递给java消解模块(我已经写过)。如何在Flume 1.7中编写自定义ES接收器

任何人都可以与我分享一个自定义接收器的模板,并可以用作参考? Flumes官方网站并没有多说这个话题: 启动Flume代理时,自定义接收器的类及其依赖关系必须包含在代理的类路径中。自定义接收器的类型是其FQCN。 https://flume.apache.org/FlumeUserGuide.html#custom-sink

而一旦自定义接收已经准备好,我怎么能联系以下三个文件,以使代理工作:

  • 自定义接收
  • 摄入JAR(Java的模块来执行摄取过程)
  • FlumeAgent.properties

感谢您的任何反馈。一旦我完成这项任务,我会继续添加信息。

回答

1

希望您正在尝试使用Flume从Kafka(源代码)接收事件并将其转发到ES(接收器),并且已有一些数据处理逻辑。

有了这个理解,我建议你看看Flume拦截器,它负责在发送到接收器之前动态更改/过滤事件。

因此,所有改变事件的业务逻辑都可以作为自定义拦截器来实现,并且应该配置为Flume通道。

仅供参考,您可以结算已有的native interceptors source code。这应该可以让你对Flume拦截器框架有所了解。

这里是ES Sink source code

样品水槽配置

a1.sources = kafkaSource 
a1.sinks = ES_Sink 
a1.channels = channel1 

a1.sources.kafkaSource.interceptors = i1 
a1.sources.kafkaSource.interceptors.i1.type = org.apache.flume.interceptor.<Custom_Interceptor_name>$Builder 

a1.sinks.ES_Sink.channel = channel1 
a1.sinks.ES_Sink.type = elasticsearch 
a1.sinks.ES_Sink.hostNames = 127.0.0.1:9200 
+0

感谢您的反馈! –

相关问题