
I have some JUnit tests for code that consumes a Kafka topic. The mock Kafka topics I have tried do not work, and the examples I found online are very old and no longer apply to 0.8.2.1. How do I create a mock Kafka topic with 0.8.2.1, and how do I instantiate it for JUnit tests?

To clarify: I am choosing to use an actual embedded instance of the topic so that I test against a real instance rather than mocking the hand-off with Mockito. That way I can verify that my custom encoders and decoders actually work, and the tests will not fail when I switch to a real Kafka instance.

Answers


https://gist.github.com/asmaier/6465468#file-kafkaproducertest-java

This example has been updated to work with the newer 0.8.2.2 release. Here is the code snippet along with the Maven dependencies:

pom.xml:

<dependencies> 
<dependency> 
    <groupId>junit</groupId> 
    <artifactId>junit</artifactId> 
    <version>4.12</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.8.2.2</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.8.2.2</version> 
    <classifier>test</classifier> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-clients</artifactId> 
    <version>0.8.2.2</version> 
</dependency> 
</dependencies> 

KafkaProducerTest.java:

import java.nio.charset.StandardCharsets; 
import java.util.ArrayList; 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 
import org.I0Itec.zkclient.ZkClient; 
import org.junit.Test; 
import kafka.admin.TopicCommand; 
import kafka.consumer.ConsumerConfig; 
import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 
import kafka.producer.KeyedMessage; 
import kafka.producer.Producer; 
import kafka.producer.ProducerConfig; 
import kafka.server.KafkaConfig; 
import kafka.server.KafkaServer; 
import kafka.utils.MockTime; 
import kafka.utils.TestUtils; 
import kafka.utils.TestZKUtils; 
import kafka.utils.Time; 
import kafka.utils.ZKStringSerializer$; 
import kafka.zk.EmbeddedZookeeper; 
import static org.junit.Assert.*; 

/** 
* For online documentation 
* see 
* https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
* https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/admin/TopicCommand.scala 
* https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala 
*/ 
public class KafkaProducerTest { 

    private int brokerId = 0; 
    private String topic = "test"; 

    @Test 
    public void producerTest() throws InterruptedException { 

     // setup Zookeeper 
     String zkConnect = TestZKUtils.zookeeperConnect(); 
     EmbeddedZookeeper zkServer = new EmbeddedZookeeper(zkConnect); 
     ZkClient zkClient = new ZkClient(zkServer.connectString(), 30000, 30000, ZKStringSerializer$.MODULE$); 

     // setup Broker 
     int port = TestUtils.choosePort(); 
     Properties props = TestUtils.createBrokerConfig(brokerId, port, true); 

     KafkaConfig config = new KafkaConfig(props); 
     Time mock = new MockTime(); 
     KafkaServer kafkaServer = TestUtils.createServer(config, mock); 

     String [] arguments = new String[]{"--topic", topic, "--partitions", "1","--replication-factor", "1"}; 
     // create topic 
     TopicCommand.createTopic(zkClient, new TopicCommand.TopicCommandOptions(arguments)); 

     List<KafkaServer> servers = new ArrayList<KafkaServer>(); 
     servers.add(kafkaServer); 
     TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 5000); 

     // setup producer 
     Properties properties = TestUtils.getProducerConfig("localhost:" + port); 
     ProducerConfig producerConfig = new ProducerConfig(properties); 
     Producer<Integer, byte[]> producer = new Producer<Integer, byte[]>(producerConfig); 

     // setup simple consumer 
     Properties consumerProperties = TestUtils.createConsumerProperties(zkServer.connectString(), "group0", "consumer0", -1); 
     ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties)); 

     // send message 
     KeyedMessage<Integer, byte[]> data = new KeyedMessage<Integer, byte[]>(topic, "test-message".getBytes(StandardCharsets.UTF_8)); 

     List<KeyedMessage<Integer, byte[]>> messages = new ArrayList<KeyedMessage<Integer, byte[]>>(); 
     messages.add(data); 

     producer.send(scala.collection.JavaConversions.asScalaBuffer(messages)); 
     producer.close(); 

     // deleting zookeeper information to make sure the consumer starts from the beginning 
     // see https://stackoverflow.com/questions/14935755/how-to-get-data-from-old-offset-point-in-kafka 
     zkClient.delete("/consumers/group0"); 

     // starting consumer 
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
     topicCountMap.put(topic, 1); 
     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
     KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0); 
     ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); 

     if(iterator.hasNext()) { 
      String msg = new String(iterator.next().message(), StandardCharsets.UTF_8); 
      System.out.println(msg); 
      assertEquals("test-message", msg); 
     } else { 
      fail(); 
     } 

     // cleanup 
     consumer.shutdown(); 
     kafkaServer.shutdown(); 
     zkClient.close(); 
     zkServer.shutdown(); 
    } 
} 

Be sure to check mvn dependency:tree for any conflicting libraries. I had to add exclusions for slf4j and log4j:

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.8.2.2</version> 
    <exclusions> 
     <exclusion> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-log4j12</artifactId> 
     </exclusion> 
     <exclusion> 
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
     </exclusion> 
    </exclusions> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.8.2.2</version> 
    <classifier>test</classifier> 
    <exclusions> 
     <exclusion> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-log4j12</artifactId> 
     </exclusion> 
     <exclusion> 
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
     </exclusion> 
    </exclusions> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-clients</artifactId> 
    <version>0.8.2.2</version> 
    <exclusions> 
     <exclusion> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-log4j12</artifactId> 
     </exclusion> 
     <exclusion> 
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
     </exclusion> 
    </exclusions> 
</dependency> 

I am also looking into another option using Apache Curator: Is it possible to start a zookeeper server instance in process, say for unit tests?

<dependency> 
    <groupId>org.apache.curator</groupId> 
    <artifactId>curator-test</artifactId> 
    <version>2.2.0-incubating</version> 
    <scope>test</scope> 
</dependency> 

import java.io.IOException; 

import org.apache.curator.framework.CuratorFramework; 
import org.apache.curator.framework.CuratorFrameworkFactory; 
import org.apache.curator.retry.RetryOneTime; 
import org.apache.curator.test.TestingServer; 
import org.junit.After; 
import org.junit.Before; 

TestingServer zkTestServer; 
CuratorFramework cli; 

@Before 
public void startZookeeper() throws Exception { 
    // start an in-process ZooKeeper and connect a Curator client to it 
    zkTestServer = new TestingServer(2181); 
    cli = CuratorFrameworkFactory.newClient(zkTestServer.getConnectString(), new RetryOneTime(2000)); 
} 

@After 
public void stopZookeeper() throws IOException { 
    cli.close(); 
    zkTestServer.stop(); 
} 
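
If you go that route, the Curator-managed ZooKeeper can stand in for EmbeddedZookeeper in the test above. What follows is only a rough, untested sketch of that wiring, assuming the same 0.8.2.2 TestUtils helpers used earlier and the standard zookeeper.connect broker property:

// Sketch only: point the embedded broker at Curator's TestingServer 
// instead of kafka.zk.EmbeddedZookeeper (assumes the 0.8.2.2 TestUtils used above). 
Properties props = TestUtils.createBrokerConfig(0, TestUtils.choosePort(), true); 
props.setProperty("zookeeper.connect", zkTestServer.getConnectString()); 
KafkaServer kafkaServer = TestUtils.createServer(new KafkaConfig(props), new MockTime()); 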

Could you provide code that works for version 0.11.0.2? The code above does not work. – dhroove


Have you tried mocking the Kafka consumer objects with a mocking framework such as Mockito?
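
For example, here is a rough sketch of that approach (not from the original answer; the class and method names are only illustrative, and it assumes mockito-core is on the test classpath and that the 0.8.2 high-level consumer classes kafka.consumer.ConsumerIterator and kafka.message.MessageAndMetadata can be mocked in your setup):

import static org.junit.Assert.assertEquals; 
import static org.mockito.Mockito.mock; 
import static org.mockito.Mockito.when; 

import java.nio.charset.StandardCharsets; 

import kafka.consumer.ConsumerIterator; 
import kafka.message.MessageAndMetadata; 

import org.junit.Test; 

public class MockedKafkaConsumerTest { 

    @Test 
    @SuppressWarnings("unchecked") 
    public void consumerLogicHandlesOneMessage() { 
        // stub the iterator that a KafkaStream would normally hand out 
        ConsumerIterator<byte[], byte[]> iterator = mock(ConsumerIterator.class); 
        MessageAndMetadata<byte[], byte[]> record = mock(MessageAndMetadata.class); 

        when(record.message()).thenReturn("test-message".getBytes(StandardCharsets.UTF_8)); 
        when(iterator.hasNext()).thenReturn(true, false); // one message, then exhausted 
        when(iterator.next()).thenReturn(record); 

        // in a real test you would pass this iterator to your own consuming code; 
        // here we only verify that the stubbed message comes back as expected 
        String msg = new String(iterator.next().message(), StandardCharsets.UTF_8); 
        assertEquals("test-message", msg); 
    } 
} 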


I would rather have an embedded version of Kafka so that I know the producer and consumer are actually working against it. There are some examples online (e.g. https://ransilberman.wordpress.com/2013/07/19/how-to-unit-test-kafka), but they target older versions and no longer work with 0.8.2.1. – Chip