2017-03-14 98 views
1

我想通过其掩码IP更改数据中的IP地址。这是在我的Flume代理的“备份”部分完成的(见下文)。Flume自定义拦截器不工作

在此配置有2个通道:所述第一信道的数据转储到HBase的,而第二个用于备份:

a1.sources = r1 r2 
a1.channels = channel1 Backup_channel 
a1.sinks = FSink 

a1.sources.r1.handler = com.flume.handler.JSONHandler 
a1.sources.r1.type = avro 
a1.sources.r1.bind = x.x.x.x 
a1.sources.r1.port = 10008 

a1.sources.r2.handler = com.flume.handler.JSONHandler 
a1.sources.r2.type = avro 
a1.sources.r2.bind = x.x.x.x 
a1.sources.r2.port = 10009 
a1.sources.r2.interceptors = i1 
a1.sources.r2.interceptors.i1.type = com.flume.interceptor.DcInterceptor 

a1.channels.channel1.type = file 
a1.channels.channel1.checkpointDir = /root/flume/channels/Livechannel/checkpoint 
a1.channels.channel1.dataDirs = /root/flume/channels/Livechannel/data 

a1.sinks.FSink.type = hbase 
a1.sinks.FSink.table = Temp_Test 
a1.sinks.FSink.batchSize = 300 
a1.sinks.FSink.columnFamily = T 
a1.sinks.FSink.serializer = com.flume.sink.TestTP 

a1.sources.r1.channels = channel1 
a1.sources.r2.channels = Backup_channel 

a1.channels.Backup_channel.type = file 
a1.channels.Backup_channel.checkpointDir = /data/disk/flume/backup/checkpoint 
a1.channels.Backup_channel.dataDirs = /data/disk/flume/backup/data 

a1.sinks.FSink.channel = channel1 

以下是我定制的Java代码拦截。它实现了拦截方法,从身体获取IP地址,计算其IP掩码,然后将其添加到主体。但不知何故,它不工作:

public class DcInterceptor implements Interceptor { 
    private byte[] jsonTestBeans; 

    private final Type listType = new TypeToken < List <TestBeans>>() {}.getType(); 

    @Override 
    public void close() { 
     // TODO Auto-generated method stub 

    } 

    @Override 
    public void initialize() { 
     // TODO Auto-generated method stub 
     new Logger(); 
    } 

    @Override 
    public Event intercept(Event event) { 
     // TODO Auto-generated method stub 
     List <Row> actions = new ArrayList <Row>(); 
     this.jsonTestBeans = event.getBody(); 
     Logger.logger.debug("In Interceptor"); 
     System.out.println("In Interceptor"); 
     Gson _Gson = new Gson(); 
     String jsonstr = ""; 
     try { 
      jsonstr = new String(jsonTestBeans, "UTF-8"); 
     } catch (Exception e) { 
      // TODO: handle exception 
      Logger.logger.error(e.getMessage() + "In Interceptor"); 
      jsonstr = new String(jsonTestBeans); 
     } 
     List <TestBeans> TestBeanss = _Gson.fromJson(jsonstr, listType); 
     System.out.println("Json String :" + jsonstr); 
     List <String> gTouch = new ArrayList <String>(); 
     for (TestBeans TestBeans: TestBeanss) { 
      String str = TestBeans.getIp(); 
      Logger.logger.debug("IP : " + str); 
      String st = (str.substring(0, str.lastIndexOf(".") + 1) + "x"); 
      Logger.logger.debug("Mask IP : " + st); 
      TestBeans.setRemoteIp(st); 
     } 
     event.setBody(_Gson.toJson(TestBeanss).getBytes()); 
     Logger.logger.debug("Interceptor Ends"); 
     return event; 
    } 

    @Override 
    public List <Event> intercept(List <Event> events) { 
     // TODO Auto-generated method stub 
     System.out.println("In List Interceptor"); 
     Logger.logger.debug("In List Interceptor"); 
     for (Event event: events) { 
      intercept(event); 
     } 
     return events; 
    } 

    public static class CounterInterceptorBuilder implements Interceptor.Builder { 

     private Context ctx; 

     @Override 
     public Interceptor build() { 
      Logger.logger.debug("In Interceptor Build"); 
      System.out.println("In Build Interceptor"); 
      return new DcInterceptor(); 
     } 

     @Override 
     public void configure(Context context) { 
      this.ctx = context; 
     } 

    } 

回答

0

至少,我可以看到:

  • 关于你的拦截器的配置线指的经纪人打电话ECircleTp_Test,而其余配置是指a1
  • 您已配置com.flume.interceptor.DcInterceptor2,但您开发的拦截器类名为DcInterceptor(没有最终的2)。
  • 您已将com.flume.interceptor.DcInterceptor2配置为自定义拦截器的完全限定类名称。尽管如此,拦截器的代码并没有为DcInterceptor(2)类声明任何包。
+0

这是一个打字错误,现在它是完美的。可以üplz再次检查#frb谢谢 –

+0

很好。所以,尝试添加到拦截器的'package com.flume.interceptor;'到文件的开头。 – frb

+0

如果仍然失败,请将相关日志添加到您的问题。 – frb