2015-10-17 35 views
7

我正在尝试使用来自Kafka队列的序列化avro事件。 kafka队列使用一个简单的java生产者来填充。为了清楚起见,我分享三个组成部分:Logstash with Kafka:无法解码avro

Avro的模式文件

{"namespace": "example.avro", 
"type": "record", 
"name": "User", 
"fields": [ 
    {"name": "name", "type": "string"}, 
    {"name": "favorite_number", "type": ["int", "null"]}, 
    {"name": "favorite_color", "type": ["string", "null"]} 
] 
} 

的Java监制代码片段(User.class生产使用的Avro工具)

User user1 = new User(); 
    user1.setName("Alyssa"); 
    user1.setFavoriteNumber(256); 
    user1.setFavoriteColor("blue"); 
    String topic = "MemoryTest"; 

    // Properties set in 'props' 
    KafkaProducer<Message, byte[]> producer = new KafkaProducer<Message, byte[]>(props); 

    ByteArrayOutputStream out = new ByteArrayOutputStream(); 
    DatumWriter<User> writer = new SpecificDatumWriter<User>(User.class); 
    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); 
    writer.write(user1, encoder); 
    encoder.flush(); 
    out.close(); 
    byte[] serializedBytes = out.toByteArray(); 
    producer.send(new ProducerRecord<Message, byte[]>(topic, serializedBytes)); 

Logstash配置文件

input { 
     kafka { 
       zk_connect => "localhost:2181" 
       topic_id => "MemoryTest" 
       type => "standard_event" 
       group_id => "butiline_dash_prod" 
     reset_beginning => true 
     auto_offset_reset => smallest 
     codec => { 
       avro => { 
        schema_uri => "/opt/ELK/logstash-1.5.4/bin/user.avsc" 
       } 
      } 
     } 
} 

output { 
    stdout { 
    codec => rubydebug 
    } 
} 

问题

管道未能在logstash水平。当一个新事件被推入Kafka时,我在Logstash控制台上得到以下结果:

Alyssa�blue {:exception=>#<NoMethodError: undefined method `decode' for ["avro", {"schema_uri"=>"/opt/ELK/logstash-1.5.4/bin/user.avsc"}]:Array>, :backtrace=>["/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-1.0.0/lib/logstash/inputs/kafka.rb:169:in `queue_event'", "/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-1.0.0/lib/logstash/inputs/kafka.rb:139:in `run'", "/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:177:in `inputworker'", "/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:171:in `start_input'"], :level=>:error} 

回答

10

最后计算出错误。取而代之的是(作为建议就Logstash网站 - https://www.elastic.co/guide/en/logstash/current/plugins-codecs-avro.html

codec => { 
    avro => { 
     schema_uri => "/opt/ELK/logstash-1.5.4/bin/user.avsc" 
    } 
} 

正确的语法(如插件的文档https://github.com/logstash-plugins/logstash-codec-avro/blob/master/DEVELOPER.md在建议):

codec => avro { 
     schema_uri => "/opt/ELK/logstash-1.5.4/bin/user.avsc" 
} 

我猜的语法被改变。