2015-06-24 41 views
0

我想为卡夫卡0.8.2中的卡夫卡斯卡拉客户端的抽象编写一个简单的测试。它基本上只是写信给卡夫卡,然后我尝试将其读回来。但是,我遇到了间歇性失败的问题,所以我将测试代码烧录到下面的代码中。此测试有时(很少)通过,有时会失败。我在做什么?卡夫卡测试失败/间歇地成功

package mykafkatest 

import java.net.ServerSocket 
import java.nio.file.Files 
import java.util.{UUID, Properties} 

import kafka.consumer.{Whitelist, ConsumerConfig, Consumer} 
import kafka.producer.{ProducerConfig, Producer, KeyedMessage} 
import kafka.serializer.StringDecoder 
import kafka.server.KafkaConfig 
import kafka.server.KafkaServerStartable 
import org.apache.curator.test.TestingServer 

import scala.concurrent.{Await, Future} 
import scala.concurrent.duration._ 

class KafkaSenderTest extends org.scalatest.FunSpecLike with org.scalatest.ShouldMatchers with org.scalatest.BeforeAndAfterAll { 

    import scala.concurrent.ExecutionContext.Implicits.global 
    val zkServer = new TestingServer() 

    val socket = new ServerSocket(0) 
    val port = socket.getLocalPort.toString 
    socket.close() 
    val tmpDir = Files.createTempDirectory("kafka-test-logs") 

    val serverProps = new Properties 
    serverProps.put("broker.id", port) 
    serverProps.put("log.dirs", tmpDir.toAbsolutePath.toString) 
    serverProps.put("host.name", "localhost") 
    serverProps.put("zookeeper.connect", zkServer.getConnectString) 
    serverProps.put("port", port) 

    val config = new KafkaConfig(serverProps) 
    val kafkaServer = new KafkaServerStartable(config) 

    override def beforeAll ={ 
    kafkaServer.startup() 
    } 

    override def afterAll = { 
    kafkaServer.shutdown() 
    } 

    it("should put messages on a kafka queue") { 
    println("zkServer: " + zkServer.getConnectString) 
    println("broker port: " + port) 

    val consumerProps = new Properties() 
    consumerProps.put("group.id", UUID.randomUUID().toString) 
    consumerProps.put("zookeeper.connect", zkServer.getConnectString) 

    val consumerConnector = Consumer.create(new ConsumerConfig(consumerProps)) 
    val topic = "some-topic" 
    val filterSpec = new Whitelist(topic) 
    val stream = consumerConnector.createMessageStreamsByFilter(filterSpec, 1, new StringDecoder, new StringDecoder).head 

    val producerProps = new Properties() 
    producerProps.put("metadata.broker.list","localhost:"+port) 

    val sender = new Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps)) 
    val keyedMessage = new KeyedMessage[Array[Byte], Array[Byte]](topic, "awesome message".getBytes("UTF-8")) 
    sender.send(keyedMessage) 

    val msg = Await.result(Future { stream.take(1) }, 5 seconds) 
    msg.headOption should not be(empty) 

    } 
} 

编辑: 我已经创建了以下build.sbt和上面的代码为测试类新项目。

name := "mykafkatest" 

version := "1.0" 

scalaVersion := "2.11.5" 


libraryDependencies ++= Seq(
    "org.apache.kafka" %% "kafka" % "0.8.2.0", 

    "org.scalatest" %% "scalatest" % "2.2.2" % "test", 
    "org.apache.curator" % "curator-test" % "2.7.0" % "test" 
) 

而且测试似乎更频繁地通过,但它仍不能间断......

+0

有什么错误讯息? 1. val stream是否可以使用String而生产者使用Array [byte]? 2.如果将'Thread.sleep(5)'放在'sender.send'之后,会有帮助吗? –

+0

@cppinitiator当出现错误时,它来自'Await.result(Future {stream.take(1)},5秒)'给出'TimeoutException'。所以有时候这个测试通过了(而且速度很快),但是通常它会失败......在发送之后添加Thread.sleep(5)并没有帮助。如果这是由于字符串/数组[字节]造成的错误,我认为测试不会通过。 –

+0

看来你使用的是StringDecoder,但是发送者需要Array [Byte]。如何将'val sender'和'val keyedMessage'都改为[String,String]? –

回答

2

您可能有竞争条件导致消费者在发送消息后实际完成其初始化,然后忽略该消息,因为它默认情况下以最大偏移量开始。

尝试增加

consumerProps.put("auto.offset.reset", "smallest") 

你的消费特性

+0

这个配置适用于我的1条消息! – Noah

+0

这似乎工作。为了我的好处,如果在创建'stream'后添加'Thread.sleep(XXX)'会有相同的结果吗?我尝试过,但这似乎导致测试失败一致 –

+0

它会有一个随机概率(随着睡眠时间增加),这是不一样的结果相同的结果。另外,在测试中睡觉会增加运行所需的时间。如果您不想使用最小偏移量,您可以执行的操作是发送“预热”消息,直到您成功接收到一条消息,然后开始发送一批固定的消息,以供您进行实际测试。但似乎更复杂^^ – C4stor

0

我认为这是某种信息的缓冲问题。如果您发送200条消息(对我而言):

(1 to 200).foreach(i => sender.send(keyedMessage)) 

199消息失败。我试着改变配置,但找不到任何魔法让1条消息工作,但我确定有一些配置可以使这项工作。