2016-12-14 77 views
1

我有以下流:阿卡流TCP +阿卡流卡夫卡生产者未停止不发布消息,而不是错误-ING出

Source(IndexedSeq(ByteString.empty)) 
.via(
    Tcp().outgoingConnection(bsAddress, bsPort) 
    .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true)) 
    .map(_.utf8String) 
) 
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m)) 
.runWith(
    Producer.plainSink(
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer) 
     .withBootstrapServers(s"${kafkaAddress}:${kafkaPort}") 
) 
).onComplete { 
    case Success(Done) => printAndByeBye("Stream ends successfully") 
    case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString) 
    } 

它正常工作了一段时间,我可以消耗填充上的消息卡夫卡话题。但有时候,显然是随机的间隔,没有更多的消息发布,并且这段代码没有记录任何错误(printAndByeBye将打印传递的消息并终止参与者系统。)重新启动应用程序后,消息继续流。

关于如何知道这里发生了什么的任何想法?

编辑:我把卡蒙它,我可以看到以下行为:

Mailbox Size per Actor

Time in Mailbox per Actor

Processing Time per Actor

它看起来像是停不通知流应该停止,但我不知道如何明确并停止流。

回答

0

流没有失败,但TCP流已空闲,因为设备发布数据在一段时间后停止发送数据而没有丢失连接。 而不是使用简单的:

TCP().outgoingConnection(bsAddress, bsPort) 

我最终使用:

def outgoingConnection(
remoteAddress: InetSocketAddress, 
localAddress: Option[InetSocketAddress]   = None, 
options:  immutable.Traversable[SocketOption] = Nil, 
halfClose:  Boolean        = true, 
connectTimeout: Duration       = Duration.Inf, 
idleTimeout: Duration       = Duration.Inf): Flow[ByteString, ByteString, Future[OutgoingConnection]] = ??? 

所以

Tcp().outgoingConnection(bsAddress, bsPort) 

通过通知的idleTimeout成为

val connectTimeout: Duration = 1 second 
val idleTimeout: Duration = 2 second 
Tcp().outgoingConnection(
    remoteAddress = InetSocketAddress.createUnresolved(bsAddress, bsPort), 
    connectTimeout = connectTimeout, 
    idleTimeout = idleTimeout 
) 

,在遵循启动失败一个d另一个流程可以重新启动。

0

我倒是建议创建与监督的流动属性,像你的TCP连接来处理可能出现的异常:

val flow = 
    Tcp().outgoingConnection("", 12) 
      .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true)) 
      .map(_.utf8String).withAttributes(ActorAttributes.supervisionStrategy { 
     case ex: Throwable => 
     println("Error ocurred: " + ex) 
     Supervision.Resume 
    } 

Source(IndexedSeq(ByteString.empty)) 
.via(flow) 
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m)) 
.runWith(
    Producer.plainSink(
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer) 
     .withBootstrapServers(s"${kafkaAddress}:${kafkaPort}") 
) 
).onComplete { 
    case Success(Done) => printAndByeBye("Stream ends successfully") 
    case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString) 
    } 

如果存在与流量流中的所有错误停止。有了这个配置,你会看到流是否引发了任何异常。

+0

流不失败的。没有错误记录。 – Darien

0

如果一切都安静下来可能被降到背压应用的地方。 尝试并选择性地用非背压感知阶段替换您的背压感知阶段,并检查问题是否仍然存在。 你的情况有背压的2个可能的来源:

1)的TCP连接

你可以试试,并附ByteString无限源卡夫卡,线沿线的做事:

Source.cycle(() => List(???).iterator) 
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m)) 
.runWith(
    Producer.plainSink(
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer) 
     .withBootstrapServers(s"${kafkaAddress}:${kafkaPort}") 
) 
).onComplete { 
    case Success(Done) => printAndByeBye("Stream ends successfully") 
    case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString) 
    } 

2)卡夫卡下沉

一些测井

Source(IndexedSeq(ByteString.empty)) 
.via(
    Tcp().outgoingConnection(bsAddress, bsPort) 
    .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true)) 
    .map(_.utf8String) 
) 
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m)) 
.runForeach(println) 
.onComplete { 
    case Success(Done) => printAndByeBye("Stream ends successfully") 
    case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString) 
    } 
取代它

您是否只能在两种情况中的一种情况下看到问题?同时?没有?

+0

有没有办法监控这种情况?或者当背压停止流时超时? – Darien

+0

有捕获阶段背压超时http://doc.akka.io/docs/akka/2.4.14/scala/stream/stages-overview.html#backpressureTimeout –

+0

它发生在使用TCP流中获取数据。 – Darien