2014-02-12

I am using Kafka 0.8.0 and trying to implement the scenario described below: a delay in the consumer receiving messages in Apache Kafka.

JCA API (acts as the producer and sends the data) -----> Consumer ------> HBase

I send each message to the consumer as soon as I fetch the data using the JCA client. For example, as soon as the producer sends message 1, I want the consumer to receive that same message and put it into HBase. But my consumer only starts fetching messages after some random number n of messages have been sent. I want the producer and consumer to be in sync so that the two start working together.

I am using:

1 broker

1 single topic

1 single producer and a high-level consumer

Can anyone suggest what I need to do to achieve this?

EDIT:

Adding some relevant code snippets.

Consumer.java

import java.io.PrintWriter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class Consumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;
    PrintWriter pw = null;
    int t = 0;
    StringDecoder kd = new StringDecoder(null);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    Map<String, List<KafkaStream<String, Signal>>> consumerMap;
    KafkaStream<String, Signal> stream;
    ConsumerIterator<String, Signal> it;

    public Consumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());

        this.topic = topic;
        topicCountMap.put(topic, new Integer(1));
        // Serializer is the custom org.bigdata.kafka.Serializer, used here as
        // the value decoder for Signal objects.
        consumerMap = consumer.createMessageStreams(topicCountMap, kd, new Serializer(
                new VerifiableProperties()));
        stream = consumerMap.get(topic).get(0);
        it = stream.iterator();
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("fetch.size", "1024");

        return new ConsumerConfig(props);
    }

    public synchronized void run() {
        while (it.hasNext()) {
            t = (it.next().message()).getChannelid();
            System.out.println("In Consumer received msg" + t);
        }
    }
}

Producer.java

import java.util.Properties;

import kafka.producer.ProducerConfig;

public class Producer {
    public final kafka.javaapi.producer.Producer<String, Signal> producer;
    private final String topic;
    private final Properties props = new Properties();

    public Producer(String topic) {
        props.put("serializer.class", "org.bigdata.kafka.Serializer");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "localhost:9092");
        // Uses the default (random) partitioner. The key is a String and the
        // message value is the user-defined Signal object.
        producer = new kafka.javaapi.producer.Producer<String, Signal>(new ProducerConfig(props));
        this.topic = topic;
    }
}

KafkaProperties.java

public interface KafkaProperties { 
    final static String zkConnect = "127.0.0.1:2181"; 
    final static String groupId = "group1"; 
    final static String topic = "test00"; 
    final static String kafkaServerURL = "localhost"; 
    final static int kafkaServerPort = 9092; 
    final static int kafkaProducerBufferSize = 64 * 1024; 
    final static int connectionTimeOut = 100000; 
    final static int reconnectInterval = 10000; 
    final static String clientId = "SimpleConsumerDemoClient"; 
} 

This is how the consumer behaves for the first 10 or so messages: the producer is sending messages but the consumer receives none of them, and only from around the 11th message does it start working correctly.

    producer sending msg1
    producer sending msg2
    producer sending msg3
    producer sending msg4
    producer sending msg5
    producer sending msg6
    producer sending msg7
    producer sending msg8
    producer sending msg9
    producer sending msg10
    producer sending msg11
    producer sending msg12
    In Consumer received msg12
    producer sending msg13
    In Consumer received msg13
    producer sending msg14
    In Consumer received msg14
    producer sending msg15
    In Consumer received msg15
    producer sending msg16
    In Consumer received msg16
    producer sending msg17
    In Consumer received msg17
    producer sending msg18
    In Consumer received msg18
    producer sending msg19
    In Consumer received msg19
    producer sending msg20
    In Consumer received msg20
    producer sending msg21
    In Consumer received msg21

EDITED: this is the listener function in which the producer sends messages. I am using the default producer configuration and have not overridden it.

public synchronized void onValueChanged(final MonitorEvent event_) {
    // Get the value from the DBR
    try {
        final DBR dbr = event_.getDBR();
        final String[] val = (String[]) dbr.getValue();

        producer1.producer.send(new KeyedMessage<String, Signal>(
                KafkaProperties.topic, new Signal(messageNo)));
        System.out.println("producer sending msg" + messageNo);

        messageNo++;
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}
Can you show your producer and consumer code/configuration? It looks like some of them use batch operations (which is actually a good thing). – Dmitry

@Dmitry Added the code snippets. – Ankita

The consumer seems OK (except for the property fetch.size = 1K, which means the consumer cannot receive messages larger than that, but that is probably not the problem we are looking for). Can you share the code of the producer's newProducerConfig() and run() methods? – Dmitry

Answer
  1. Try adding props.put("request.required.acks", "1") to the producer configuration. By default the producer does not wait for acknowledgements, and message delivery is not guaranteed. So if you start the broker just before your test, the producer may begin sending messages before the broker is fully initialized, and the first few messages may be lost.

  2. Try adding props.put("auto.offset.reset", "smallest") to the consumer configuration. It is equivalent to the --from-beginning option of kafka-console-consumer.sh. If your consumer starts later than the producer and there is no offset data saved in Zookeeper, by default it will start consuming only new messages (see Consumer configs in the documentation).
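The two suggestions above each amount to one extra property. A minimal sketch of where they go, using only java.util.Properties (the resulting objects would then be passed to ProducerConfig and ConsumerConfig as in the question's code; the addresses are the ones the question already uses):

```java
import java.util.Properties;

public class SyncConfigSketch {
    // Producer side: wait for the leader broker's acknowledgement before
    // considering a send successful, so early messages are not silently
    // dropped while the broker is still initializing.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "org.bigdata.kafka.Serializer");
        props.put("request.required.acks", "1"); // default is "0" (fire and forget)
        return props;
    }

    // Consumer side: if this group has no offset stored in ZooKeeper yet,
    // start from the earliest available message instead of only new ones.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "127.0.0.1:2181");
        props.put("group.id", "group1");
        props.put("auto.offset.reset", "smallest"); // default is "largest"
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("request.required.acks"));   // prints "1"
        System.out.println(consumerProps().getProperty("auto.offset.reset"));       // prints "smallest"
    }
}
```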

Thanks for the suggestion. I added props.put("request.required.acks", "1") to the producer, but the program behaves randomly. I ran it 5 times, each time with a new topic, and all five runs gave different results: sometimes the producer and consumer are in sync, and the rest of the time the consumer lags. – Ankita

By 'lags' do you mean that all messages are received, just not immediately after being sent? In the original output, the first few messages are lost entirely. – Dmitry

Yes, there are actually two cases: 1) sometimes all messages are received, but not immediately after being sent; 2) other times, as in the output provided, a few messages are lost. However, when I run "bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicname --from-beginning" from the console, the consumer receives the same number of messages as the producer generated. Why is that? – Ankita

This may be caused by having more partitions than consumers. Check whether the topic was created with only a single partition; then you will not miss any messages in the consumer.
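One way to check is from the command line. A sketch, with the caveat that the admin script names and flags changed between Kafka releases (0.8.0 shipped kafka-list-topic.sh and kafka-create-topic.sh, while later versions consolidated these into kafka-topics.sh; verify the exact flags against your distribution's bin/ directory):

```shell
# Show the partition count for the topic (Kafka 0.8.0-era script):
bin/kafka-list-topic.sh --zookeeper 127.0.0.1:2181 --topic test00

# If it reports more than one partition, recreate the topic with a single
# partition so the one high-level consumer sees every message:
bin/kafka-create-topic.sh --zookeeper 127.0.0.1:2181 --topic test00 \
    --partition 1 --replica 1
```

With a single partition and a single consumer in the group, no partition can sit unassigned or be consumed out of order.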