2
我正在测试flume将数据加载到hHase中,并考虑使用flume的选择器和inteceptor进行并行数据加载,因为源和汇之间的速度差距。如何在槽中一起使用regex_extractor选择器和多路复用拦截器?
所以,我想与水槽做的
- 创建活动的头与拦截器的regex_extractor型
与头多于两个通道多路复用器的事件
在一个选择的复用型源沟道水槽。
并尝试配置如下。
agent.sources = tailsrc agent.channels = mem1 mem2 agent.sinks = std1 std2 agent.sources.tailsrc.type = exec agent.sources.tailsrc.command = tail -F /home/flumeuser/test/in.txt agent.sources.tailsrc.batchSize = 1 agent.sources.tailsrc.interceptors = i1 agent.sources.tailsrc.interceptors.i1.type = regex_extractor agent.sources.tailsrc.interceptors.i1.regex = ^(\\d) agent.sources.tailsrc.interceptors.i1.serializers = t1 agent.sources.tailsrc.interceptors.i1.serializers.t1.name = type agent.sources.tailsrc.selector.type = multiplexing agent.sources.tailsrc.selector.header = type agent.sources.tailsrc.selector.mapping.1 = mem1 agent.sources.tailsrc.selector.mapping.2 = mem2 agent.sinks.std1.type = file_roll agent.sinks.std1.channel = mem1 agent.sinks.std1.batchSize = 1 agent.sinks.std1.sink.directory = /var/log/flumeout/1 agent.sinks.std1.rollInterval = 0 agent.sinks.std2.type = file_roll agent.sinks.std2.channel = mem2 agent.sinks.std2.batchSize = 1 agent.sinks.std2.sink.directory = /var/log/flumeout/2 agent.sinks.std2.rollInterval = 0 agent.channels.mem1.type = memory agent.channels.mem1.capacity = 100 agent.channels.mem2.type = memory agent.channels.mem2.capacity = 100
但是,它不起作用!
当选择器部件被移除时,flume的日志中会有一些拦截器调试消息。 但是当选择器和拦截器在一起时,没有任何东西。
有没有错误的表达或我错过了什么?
感谢您的阅读。 :)
这是绑定源和chanel。还需要通过'agent.sinks.std1 = mem1'和'agent.sinks.std2 = mem2'来绑定sink和chanel –