2017-09-09 39 views
0

在akka-http websocket应用程序中,我有一条Route回应给定的消息,并且我还需要在应用程序中维护状态。所以下面的工作正常:使用statefulMapConcat返回源代码

override protected def routes: Route = 
    pathSuffix("echo") { 
     handleWebSocketMessages(echoMessageFlow) 
    } 

    def echoMessageFlow: Flow[Message, Message, NotUsed] = Flow[Message].statefulMapConcat {() => 
    implicit var state = new SessionState() 
    msg: Message => 
    List(msg, msg, msg) // echo the message back 3 times 
    } 

但是,我也需要扼杀回声消息,所以只有一个每秒发生。所以我希望能够做到这一点:

def echoMessageFlow: Flow[Message, Message, NotUsed] = Flow[Message].statefulMapConcat {() => 
    implicit var state = new SessionState() 
    msg: Message => 
    Source(List(msg, msg, msg)).throttle(1, 1 second, 1, ThrottleMode.shaping) 
    } 

然而,statefulMapConcat返回的功能要求,这是一个Iterable。有什么方法可以返回Source吗?

回答

1

您可以使用flatMapConcat(或flatMapMerge,如果您需要并行化)并为其赋予每个传入元素产生Source的函数。

每个产生的Source都可以通过追加throttle组合子来限制,就像你上面做的那样。

最后,如果你想让你的Source成为有状态,你可以使用Source.unfold来创建它。下面

实施例(使用产生的消息作为内部状态的计数):

def echoMessageFlow: Flow[Message, Message, NotUsed] = Flow[Message].flatMapConcat { msg: Message => 
    Source.unfold(0){ count: Int ⇒ 
     if (count < 3) 
     Some(count + 1, msg) 
     else 
     None 
    }.throttle(1, 1.second, 1, ThrottleMode.shaping) 
    } 
+0

这正是我一直在寻找。谢谢! –