2017-03-27 92 views

kafka.server.KafkaServerStartable - java.lang.OutOfMemoryError: Java heap space

I am trying to start a Kafka server with the code below:

public class MockKafkaServer { 

    private static final String LOCALHOST = "127.0.0.1";
    private static final int CONSUMER_TIMEOUT_MS = 5000; 
    private static final int CONSUMER_BUFFER_SIZE = 64 * 1024; 
    private static final int PRODUCER_SLEEP_INTERVAL = 100; 

    private final KafkaServerStartable broker; 
    private final MockZooKeeper mockZooKeeper; 

    private KafkaProducer<byte[], byte[]> kafkaProducer; 
    private SimpleConsumer simpleConsumer; 

    private final int port; 


    public MockKafkaServer() throws IOException, InterruptedException { 

     this.mockZooKeeper = new MockZooKeeper(); 
     final int zkPort = mockZooKeeper.start(); 

     this.port = getAvailablePort(); 

     final File logDirectory = Files.createTempDir(); 
     logDirectory.deleteOnExit(); 

     final Properties properties = new Properties(); 
     properties.put("zookeeper.connect", LOCALHOST + ":" + zkPort); 
     properties.put("broker.id", "0"); 
     properties.put("num.partitions", "1"); 
     properties.put("host.name", "localhost"); 
     properties.put("port", String.valueOf(port)); 
     properties.put("log.dir", logDirectory.getAbsolutePath()); 
     properties.put("auto.create.topics.enable", "true"); 

     this.broker = new KafkaServerStartable(new KafkaConfig(properties)); 
    } 

    public void start() throws IOException, InterruptedException { 
     broker.startup(); 
    } 

    public void stop() { 
     broker.shutdown(); 
     broker.awaitShutdown(); 
     mockZooKeeper.stop(); 
    } 

}

The unit tests call start() and stop() around every test case. What I observe is that after a few start/stop cycles I get the following error:

7790 [main] FATAL kafka.server.KafkaServerStartable - Fatal error during KafkaServerStartable startup. Prepare to shutdown 
    java.lang.OutOfMemoryError: Java heap space 
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) 
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) 
    at kafka.log.SkimpyOffsetMap.<init>(OffsetMap.scala:44) 
    at kafka.log.LogCleaner$CleanerThread.<init>(LogCleaner.scala:196) 
    at kafka.log.LogCleaner$$anonfun$2.apply(LogCleaner.scala:86) 
    at kafka.log.LogCleaner$$anonfun$2.apply(LogCleaner.scala:86) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.immutable.Range.foreach(Range.scala:141) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at kafka.log.LogCleaner.<init>(LogCleaner.scala:86) 
    at kafka.log.LogManager.<init>(LogManager.scala:66) 
    at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:647) 
    at kafka.server.KafkaServer.startup(KafkaServer.scala:209) 
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:39) 
    at com.viper.mockkafkaj.MockKafkaServer.start(MockKafkaServer.java:87) 
    at com.viper.project.restj.RestControllerTest.setUp(RestControllerTest.java:68) 
    at junit.framework.TestCase.runBare(TestCase.java:139) 
    at junit.framework.TestResult$1.protect(TestResult.java:122) 
    at junit.framework.TestResult.runProtected(TestResult.java:142) 
    at junit.framework.TestResult.run(TestResult.java:125) 
    at junit.framework.TestCase.run(TestCase.java:129) 
    at junit.framework.TestSuite.runTest(TestSuite.java:255) 
    at junit.framework.TestSuite.run(TestSuite.java:250) 
    at org.junit.internal.runners.JUnit38ClassRunner.run(JUnit38ClassRunner.java:84) 
    at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365) 
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:272) 
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:236) 
    at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159) 
    at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:310) 
    at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:261) 

I tried setting the following values in the Maven Surefire plugin:

<plugin> 
    <groupId>org.apache.maven.plugins</groupId> 
    <artifactId>maven-surefire-plugin</artifactId> 
    <configuration> 
     <argLine>-server -Xss128m -Xms64m -Xmx256m -XX:+UseCompressedOops -XX:+UseG1GC -XX:G1HeapWastePercent=50 -XX:+CMSClassUnloadingEnabled -XX:MaxGCPauseMillis=10 -XX:+CMSScavengeBeforeRemark </argLine> 
    </configuration> 
</plugin> 

Even with Xmx set to 1024m, I still get the OutOfMemoryError. Any help is appreciated!
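One thing worth ruling out here is whether the forked test JVM actually picks up the argLine. A minimal sketch, assuming a hypothetical helper class named HeapCheck (not part of the original code), that reports the maximum heap of the JVM it runs in, using only the standard Runtime API:

```java
public class HeapCheck {

    /** Maximum heap the current JVM will attempt to use, in MiB. */
    static long maxHeapMiB() {
        return Runtime.getRuntime().maxMemory() / (1024 * 1024);
    }

    public static void main(String[] args) {
        // Call maxHeapMiB() from a test's setUp() to see the limit of the forked JVM.
        System.out.println("max heap (MiB): " + maxHeapMiB());
    }
}
```

If the value printed from inside a test is far from what -Xmx requests, the setting is probably not reaching the forked JVM.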

If it is Linux, try tracking memory consumption with 'top' to learn more about it. – Confused

Do you have Kafka security enabled? What is the Kafka version? – amethystic

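Answer

It turned out to be a problem with the log cleaner thread in Kafka 0.10.2.0, which uses a lot of memory by default. The stack trace shows the allocation failing in SkimpyOffsetMap while the LogCleaner is being constructed. I set the value below and no longer see the heap space error.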

private static final long CLEANER_BUFFER_SIZE = 2 * 1024 * 1024L; 

properties.put("log.cleaner.dedupe.buffer.size", CLEANER_BUFFER_SIZE); 
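For context, a minimal sketch of how the broker properties from the question might look with this setting applied. The class name TestBrokerProps and the method brokerProperties are hypothetical, introduced only for illustration; the property keys are the ones already used in the question and in this answer. As far as I know, the default for log.cleaner.dedupe.buffer.size is 134217728 bytes (128 MiB), which is large relative to a 256 MiB test heap, but check the documentation for your Kafka version.

```java
import java.util.Properties;

public class TestBrokerProps {

    // 2 MiB dedupe buffer for the log cleaner, the value used in this answer.
    static final long CLEANER_BUFFER_SIZE = 2 * 1024 * 1024L;

    /** Builds broker properties for a throwaway test broker (hypothetical helper). */
    static Properties brokerProperties(String zkConnect, int port, String logDir) {
        Properties properties = new Properties();
        properties.setProperty("zookeeper.connect", zkConnect);
        properties.setProperty("broker.id", "0");
        properties.setProperty("num.partitions", "1");
        properties.setProperty("host.name", "localhost");
        properties.setProperty("port", String.valueOf(port));
        properties.setProperty("log.dir", logDir);
        properties.setProperty("auto.create.topics.enable", "true");
        // Shrink the cleaner's offset map so each broker startup allocates far less heap.
        properties.setProperty("log.cleaner.dedupe.buffer.size",
                String.valueOf(CLEANER_BUFFER_SIZE));
        return properties;
    }

    public static void main(String[] args) {
        // Placeholder connection values, for demonstration only.
        Properties p = brokerProperties("127.0.0.1:2181", 9092, "/tmp/kafka-test-logs");
        System.out.println(p.getProperty("log.cleaner.dedupe.buffer.size"));
    }
}
```

The resulting Properties object can be passed to new KafkaConfig(properties) exactly as in the question's constructor. Storing the value as a string keeps it consistent with the other entries.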