2017-02-22 60 views
4

我有一个具有以下驱动程序代码的流应用程序,用于实时消息转换。使用UncaughtExceptionHandler重新启动或关闭流的正确方法

String topicName = ... 
KStreamBuilder builder = new KStreamBuilder(); 
KStream<String, String> source = builder.stream(topicName); 

source.transform(() -> new MyTransformer()).to(...); 

KafkaStreams streams = new KafkaStreams(builder, appConfig); 
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 
    public void uncaughtException(Thread t, Throwable e) { 
     logger.error("UncaughtExceptionHandler " + e.getMessage()); 
     System.exit(0); 
    } 
}); 


streams.cleanUp(); 
streams.start(); 

Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); 

经过几分钟的执行后,应用程序抛出下面的异常,然后不通过流进展。

[2017-02-22 14:24:35,139] ERROR [StreamThread-14] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group TRANSFORMATION-APP failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) 
org.apache.kafka.streams.errors.ProcessorStateException: task [0_11] Error while creating the state manager 
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:72) 
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:89) 
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633) 
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660) 
    at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69) 
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124) 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228) 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313) 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277) 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) 
Caused by: java.io.IOException: task [0_11] Failed to lock the state directory: /tmp/kafka-streams/TRANSFORMATION-APP/0_11 
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:101) 
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69) 
    ... 13 more 

我试着冲了/tmp/kafka-streams/TRANSFORMATION-APP目录并重新启动应用程序,但再次抛出同样的异常。我注意到的一件事是,应用程序工作正常,直到它转换所有积压消息,但在处理一些新消息后抛出异常!

有时它也会抛出下面未捕获的异常。

[ERROR] 2017-02-22 12:40:54.804 [StreamThread-29] MyTransformer - UncaughtExceptionHandler task directory [/tmp/kafka-streams/TRANSFORMATION-APP/0_24] doesn't exist and couldn't be created 

[ERROR] 2017-02-22 12:42:30.148 [StreamThread-179] MyTransformer - UncaughtExceptionHandler stream-thread [StreamThread-179] Failed 
to rebalance 

抛出这些异常之一后,应用程序仍在运行,但没有在流中进行。

处理这些错误的正确方法是什么?是否有可能以编程方式重新启动流,而不杀死应用程序?这个程序是在monit。在最坏的情况下,我宁愿正确终止应用程序(没有任何消息丢失),以便monit可以重新启动它。

输入主题有100个分区,我在应用程序配置中将num.stream.threads设置为100。该应用程序在Kafka 0.10.1.1-cp1.

回答

4

Kakfa 0.10.1.x有一些关于多线程的错误。你可以升级到0.10.2(AK今天公布,CP 3.2应该会很快跟进)或应用以下解决方法:

  • 使用单个线程的执行仅仅是
  • ,如果你需要更多的线程,启动多个实例
  • 对于每个实例,配置不同的状态目录

在重新启动之前,您可能还需要删除本地状态目录(只有一次)以进入总体一致的应用程序状态。

无论如何,都不会有数据丢失。即使在失败的情况下,Kafka Streams也能保证至少处理一次语义。这也适用于你本地的商店 - 在你删除本地状态目录之后,启动时这些状态将从底层的Kafka更新日志主题(尽管这是一个昂贵的操作)重新创建。

UncaughtExceptionHandler只能为您提供一种方法来找出线程死亡。它不(直接)帮助重新启动你的应用程序。要恢复死亡线程,您需要完全关闭KafkaStreams实例并创建/启动一个新实例。我们希望在未来为此增加更好的支持。

+1

谢谢。我用单线程重新启动了应用程序。它运行良好一段时间,但抛出'未能重新平衡'异常,这是'UncaughtExceptionHandler'中捕获的异常。为了解决这个问题,我增加了内部生产者/消费者的rebalance.backoff.ms和zookeeper.session.timeout.ms参数。现在它似乎运行良好! – Samy

+0

作为后续工作,是否有任何最佳实践要完全关闭流实例?在我的'UncaughtExceptionHandler'中启动流之前,我使用'streams :: close'。我没有后果.. – Samy

+1

没关系。您应该只有两个线程不能同时调用'streams :: close' - 这可能会导致死锁。否则,可以在异常处理程序中关闭。 –

相关问题