2015-05-14 46 views
0

我有一个宁静的API接收一组JSON消息,这些消息将被转换为Avro个别消息,然后发送给Kafka。在路由内部,我调用了3个不同的角色:1)一个actor出去并从磁盘2中检索Avro模式),然后遍历JSON消息数组,并将其与第二个actor中的Avro模式进行比较。如果任何消息没有验证,那么我需要将响应返回给API的调用者并停止处理。 3)遍历数组并传入第3个参与者,该参与者接受JSON对象,将其转换为Avro消息并发送给Kafka主题。在喷雾路线中链接Akka Actor

如果我遇到问题,我的头部缠绕的是如何在路线中停止处理,如果其中一个演员失败。我将请求上下文传递给每个actor,并调用它是完整的方法,但它似乎不立即停止,即使不应该,下一个actor仍然处理。下面是我在做什么的航线代码片段:

post { 
entity(as[JsObject]) { membersObj => 
     requestContext => 
     val membersJson = membersObj.fields("messages").convertTo[JsArray].elements 
     val messageService = actorRefFactory.actorOf(Props(new MessageProcessingServicev2())) 
     val avroService = actorRefFactory.actorOf(Props(new AvroSchemaService())) 
     val validationService = actorRefFactory.actorOf(Props(new JSONMessageValidationService())) 

     implicit val timeout = Timeout(5 seconds) 

     val future = avroService ? AvroSchema.MemberSchema(requestContext) 
     val memberSchema:Schema = Await.result(future, timeout.duration).asInstanceOf[Schema] 

     for (member <- membersJson) validationService ! ValidationService.MemberValidation(member.asJsObject, memberSchema, requestContext) 

     for (member <- membersJson) (messageService ! MessageProcessingv2.ProcessMember(member.asJsObject, topicName, memberSchema, requestContext)) 

我已经经历了很多围绕这一话题的博客/书籍/幻灯片和不知道的最好的方法是什么的看着。我一直在使用Scala/Akka大约2个月,基本上我只是自学了一些我一直需要的东西。因此,对于经验丰富的Scala/Akka/Spray开发人员所具有的任何洞察力,都非常感谢。我想过的一件事就是把这3名演员包装在一位“主人”演员中,并让每个演员都成为这个演员的孩子,并试图像那样接近它。

回答

0

由于您正在使用异步处理(!),所以在发送消息后无法控制处理。您需要使用ask(?),这将返回您可以使用的未来。

但我有一个更好的主意。你可以从第一个演员发送消息到第二个演员。而不是将结果返回给第一个参与者,您可以将消息发送给第三个参与者以继续计算。

+0

卡洛斯,谢谢你的想法。因此,我所做的就是创建一个路由调用的“主管”角色,并且该角色处理与路由的接口(管理来自其他3个角色的成功和错误消息,然后一旦Actor A成功,消息演员B.如果演员B成功,则调用演员C.如果演员A,B,C中存在错误,则将错误消息发回给监督演员。现在就像魅力一样工作。 –