我有一个宁静的API接收一组JSON消息,这些消息将被转换为Avro个别消息,然后发送给Kafka。在路由内部,我调用了3个不同的角色:1)一个actor出去并从磁盘2中检索Avro模式),然后遍历JSON消息数组,并将其与第二个actor中的Avro模式进行比较。如果任何消息没有验证,那么我需要将响应返回给API的调用者并停止处理。 3)遍历数组并传入第3个参与者,该参与者接受JSON对象,将其转换为Avro消息并发送给Kafka主题。在喷雾路线中链接Akka Actor
如果我遇到问题,我的头部缠绕的是如何在路线中停止处理,如果其中一个演员失败。我将请求上下文传递给每个actor,并调用它是完整的方法,但它似乎不立即停止,即使不应该,下一个actor仍然处理。下面是我在做什么的航线代码片段:
post {
entity(as[JsObject]) { membersObj =>
requestContext =>
val membersJson = membersObj.fields("messages").convertTo[JsArray].elements
val messageService = actorRefFactory.actorOf(Props(new MessageProcessingServicev2()))
val avroService = actorRefFactory.actorOf(Props(new AvroSchemaService()))
val validationService = actorRefFactory.actorOf(Props(new JSONMessageValidationService()))
implicit val timeout = Timeout(5 seconds)
val future = avroService ? AvroSchema.MemberSchema(requestContext)
val memberSchema:Schema = Await.result(future, timeout.duration).asInstanceOf[Schema]
for (member <- membersJson) validationService ! ValidationService.MemberValidation(member.asJsObject, memberSchema, requestContext)
for (member <- membersJson) (messageService ! MessageProcessingv2.ProcessMember(member.asJsObject, topicName, memberSchema, requestContext))
我已经经历了很多围绕这一话题的博客/书籍/幻灯片和不知道的最好的方法是什么的看着。我一直在使用Scala/Akka大约2个月,基本上我只是自学了一些我一直需要的东西。因此,对于经验丰富的Scala/Akka/Spray开发人员所具有的任何洞察力,都非常感谢。我想过的一件事就是把这3名演员包装在一位“主人”演员中,并让每个演员都成为这个演员的孩子,并试图像那样接近它。
卡洛斯,谢谢你的想法。因此,我所做的就是创建一个路由调用的“主管”角色,并且该角色处理与路由的接口(管理来自其他3个角色的成功和错误消息,然后一旦Actor A成功,消息演员B.如果演员B成功,则调用演员C.如果演员A,B,C中存在错误,则将错误消息发回给监督演员。现在就像魅力一样工作。 –