我有以下流:阿卡流TCP +阿卡流卡夫卡生产者未停止不发布消息,而不是错误-ING出
Source(IndexedSeq(ByteString.empty))
.via(
Tcp().outgoingConnection(bsAddress, bsPort)
.via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
.map(_.utf8String)
)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runWith(
Producer.plainSink(
ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
)
).onComplete {
case Success(Done) => printAndByeBye("Stream ends successfully")
case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
}
它正常工作了一段时间,我可以消耗填充上的消息卡夫卡话题。但有时候,显然是随机的间隔,没有更多的消息发布,并且这段代码没有记录任何错误(printAndByeBye将打印传递的消息并终止参与者系统。)重新启动应用程序后,消息继续流。
关于如何知道这里发生了什么的任何想法?
编辑:我把卡蒙它,我可以看到以下行为:
它看起来像是停不通知流应该停止,但我不知道如何明确并停止流。
流不失败的。没有错误记录。 – Darien