
I'm playing around with Kafka Streams, trying to do a basic aggregation (for the purposes of this question, just incrementing a count by 1 per message). On the output topic that receives the changes to the KTable, I get really weird output:

@B� 
@C 
@C� 
@D 
@D� 
@E 
@E� 
@F 
@F� 

I realize that the "�" means it's printing out some kind of character that doesn't exist in the character set, but I'm not sure why. Here is my code for reference:

import java.util.Properties;

import com.fasterxml.jackson.databind.JsonNode;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

// JsonSerializer, JsonDeserializer, my.package.EventTimestampExtractor and
// generateKey(...) are my own classes/methods (their code is omitted here).
public class KafkaMetricsAggregator {

    public static void main(final String[] args) throws Exception {
        final String bootstrapServers = args.length > 0 ? args[0] : "my-kafka-ip:9092";

        final Properties streamsConfig = new Properties();
        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "metrics-aggregator");
        // Where to find Kafka broker(s).
        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Specify default (de)serializers for record keys and for record values.
        streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // Records should be flushed every 10 seconds. This is less than the default
        // in order to keep this example interactive.
        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
        // For illustrative purposes we disable record caches.
        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        // Class to extract the timestamp from the event object.
        streamsConfig.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "my.package.EventTimestampExtractor");

        // Set up serializers and deserializers, which we will use for overriding the default serdes
        // specified above.
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
        final Serde<String> stringSerde = Serdes.String();
        final Serde<Double> doubleSerde = Serdes.Double();

        final KStreamBuilder builder = new KStreamBuilder();

        // Count events per generated key: start at 0 and add 1 for every message.
        final KTable<String, Double> aggregatedMetrics = builder.stream(jsonSerde, jsonSerde, "test2")
            .groupBy(KafkaMetricsAggregator::generateKey, stringSerde, jsonSerde)
            .aggregate(
                () -> 0d,
                (key, value, agg) -> agg + 1,
                doubleSerde,
                "metrics-table2");

        // Write the KTable changelog to the "metrics" topic with String keys and Double values.
        aggregatedMetrics.to(stringSerde, doubleSerde, "metrics");

        final KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
        // Only clean up in development.
        streams.cleanUp();
        streams.start();

        // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

}
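For reference, Serdes.Double() (used above for the values written to the metrics topic) serializes each value as 8 raw big-endian IEEE 754 bytes. A minimal standalone sketch of just that serialization step, decoding the resulting bytes as text (the class name and the value 37.0 are made up for illustration):

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.serialization.Serdes;

// Hypothetical demo class, not part of the application above.
public class DoubleBytesDemo {

    public static void main(final String[] args) {
        // The Double serde writes the value as 8 big-endian IEEE 754 bytes.
        final byte[] bytes = Serdes.Double().serializer().serialize("metrics", 37.0);

        // 37.0 serializes to 0x40 0x42 0x80 00 00 00 00 00, i.e. "@B" followed
        // by bytes that have no printable representation.
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}

That prints "@B" followed by unprintable characters, which looks a lot like the output pasted at the top.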

Edit: Using aggregatedMetrics.print(); prints the correct output to the console:

[KSTREAM-AGGREGATE-0000000002]: my-generated-key , (43.0<-null) 

Any ideas what's going on?
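In case it is relevant, here is a minimal plain-consumer sketch (class name, group id and poll timeout are made up) of how the metrics topic could be read back with deserializers matching the serdes passed to to() above:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.DoubleDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical reader for the "metrics" topic, for illustration only.
public class MetricsTopicReader {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka-ip:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "metrics-reader");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Keys were written with the String serde and values with the Double serde,
        // so read them back with the matching deserializers.
        try (KafkaConsumer<String, Double> consumer =
                     new KafkaConsumer<>(props, new StringDeserializer(), new DoubleDeserializer())) {
            consumer.subscribe(Collections.singletonList("metrics"));
            // A single poll is enough for a sketch; a real consumer would loop.
            final ConsumerRecords<String, Double> records = consumer.poll(5000);
            for (final ConsumerRecord<String, Double> record : records) {
                System.out.println(record.key() + " -> " + record.value());
            }
        }
    }
}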

Answer