2015-06-17 28 views
3

我想通过单个API调用发送和获取来自Kafka的数据(请参见下图)。如何在单个API调用中发送和获取来自Kafka的数据

enter image description here

这可能吗?我已经知道如何使数据向一个方向发展(例如,Spark Streaming使用Kafka使用者API读取数据)。我也知道如何通过做两种单向的方法来“伪造”它(例如,Web应用程序既是生产者又是消费者)。但是,当Web应用程序进行API调用时,我只希望它必须处理它自己的记录,而不是主题中的所有记录,所以这看起来是错误的方法。我想过的

其他次优的方法:

  1. 保存星火流导致数据库,以便Web应用程序可以不断查询数据库,直到结果显示出来。我担心这会消耗大量资源并延迟响应时间。
  2. 每次打电话给卡夫卡制作人时,都会创建一个短暂/临时消费者。临时消费者会过滤除查找的记录以外的所有记录。当它找到它正在寻找的记录时,临时消费者就会关闭。我不认为这会起作用,因为API调用者关心的记录可能会到不同的分区,所以它永远不会被发现。
  3. 为每个Web应用程序的客户API调用创建一个临时主题。我不确定卡夫卡是否会抱怨太多的话题。

有什么建议吗?

回答

2

我所做的是....

  1. 创建synProducer谁与一键发送数据和具有名称作为发送消息的关键主题创建一个消费者。
  2. 然后,synConsumer处理消息并回复步骤1中消费者正在等待的主题。
  3. 删除临时主题

这种方法的缺点是,问题并没有立即删除。

+1

请在这里概述代码。 – Adriaan

+0

这听起来与我在原始问题中列出的第三种方法类似。这不会压倒Zookeeper(跟踪主题的实用程序)吗?通过这种方法,卡夫卡和Zookeeper似乎在做同样的工作量,而通常Zookeeper的占地面积很小。 – user554481

+0

是的,这可能会压倒服务器,因为删除的主题不会立即从Kafka服务器中消失......我只是在其他答案中发布新方法。 –

0

我会建议你第三个,但有两个主题:1为请求和1为响应。这是一个例子:

import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 

import kafka.consumer.ConsumerConfig; 
import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 
import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 

public class ConsumerGroupExample extends Thread { 

    private final ConsumerConnector consumer; 
    private final String topic; 
    private ConsumerIterator<byte[], byte[]> it; 
    private String mensaje=""; 

    public ConsumerGroupExample(Properties props, String a_topic) 
    { 
     consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props)); 
     this.topic = a_topic; 

     Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
     topicCountMap.put(topic, 1); 
     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
     List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 

     KafkaStream stream = streams.get(0); 
     it = stream.iterator(); 
    } 

    public void shutdown() 
    { 
     if (consumer != null) consumer.shutdown(); 
    } 

    public void run() 
    { 
     if (it.hasNext()) 
     { 
      mensaje = new String(it.next().message()); 
     } 
     System.out.println(mensaje); 
    } 

    public String getMensaje() 
    { 
     return this.mensaje; 
    } 

    public static void main(String[] args) { 

     Properties props = new Properties(); 
     props.put("zookeeper.connect", "localhost:2181"); 
     props.put("group.id", "Group"); 
     props.put("zookeeper.session.timeout.ms", "400"); 
     props.put("zookeeper.sync.time.ms", "200"); 
     props.put("auto.commit.interval.ms", "1000"); 
     props.put("consumer.timeout.ms", "10000"); 

     ConsumerGroupExample example = new ConsumerGroupExample(props, "topicFoRResponse"); 

     props = new Properties(); 
     props.put("metadata.broker.list", "localhost:9092"); 
     props.put("serializer.class", "kafka.serializer.StringEncoder"); 
     props.put("request.required.acks", "1"); 
     ProducerConfig config = new ProducerConfig(props); 


     example.start(); 
     try { 

      Producer<String, String> colaParaEscritura; 
      KeyedMessage<String, String> data = new KeyedMessage<String, String>("topicForRequest", " message "); 
      colaParaEscritura = new kafka.javaapi.producer.Producer<String, String>(config); 
      colaParaEscritura.send(data); 
      System.out.println("enviado"); 
      colaParaEscritura.close(); 

      example.join(); 

      System.out.println("final"+ example.getMensaje()); 

     } 
     catch (InterruptedException ie) { 

     } 
     example.shutdown(); 
    } 

} 
相关问题