2017-08-15 46 views
0

我想在我的Akka Streams工作流程中将列表项目转换为单个地图作为舞台。举个例子,说我有下面的课。将转换列表映射到Akka Stream中的地图

case class MyClass(myString: String, myInt: Int) 

我想myStringMyClassList一个实例转换为Map该键他们。

所以,如果我有List(MyClass("hello", 1), MyClass("world", 2), MyClass("hello", 3)),我希望地图hello映射List(1, 3)world映射List(2)的。

以下是我到目前为止的内容。

val flowIWant = { 
    Flow[MyClass].map { entry => 
     entry.myString -> entry.myInt 
    } ??? // How to combine tuples into a single map? 
} 

而且,这将是理想的流向最终产生各个地图的实体,所以我可以用每一独立工作,为下一阶段(我想每个地图实体单独再做一次手术)。

我不确定这是一个fold类型的操作还是什么。谢谢你的帮助。

+0

每一个都是你写的:'entry => entry.myString - > entry.myInt'。不确定你的意思/想要达到什么目的。 –

+0

这只会将它们转换为元组不会吗?我想汇总数据。所以我想要“你好” - >列表(1,3)而不是“你好” - > 1和“你好” - > 3 –

回答

0

真的不清楚你实际上想要得到什么。从你说你的问题的方式,我认为至少有以下变换你可能意味着:

Flow[List[MyClass], Map[String, Int], _] 
Flow[List[MyClass], Map[String, List[Int]], _] 
Flow[MyClass, (String, Int), _] 
Flow[MyClass, (String, List[Int]), _] 

从你的措辞我怀疑很可能是你想要的东西就像是最后一个,但它并没有真正做感觉有这样一个转换,因为它不能发射任何东西 - 为了组合所有对应于一个键的值,你需要读取整个输入。

如果您有一个输入流MyClass并想从中获得Map[String, List[Int]],则没有其他选择,只能将其连接到折叠接收器并执行流直到完成。例如:

val source: Source[MyClass, _] = ??? // your source of MyClass instances 

val collect: Sink[MyClass, Future[Map[String, List[Int]]] = 
    Sink.fold[Map[String, List[Int]], MyClass](Map.empty.withDefaultValue(List.empty)) { 
    (m, v) => m + (v.myString -> (v.myInt :: m(v.myString))) 
    } 

val result: Future[Map[String, List[Int]]] = source.toMat(collect)(Keep.right).run() 
0

我想你想scan它:

source.scan((Map.empty[String, Int], None: Option((String, Int))))((acc, next) => { val (map, _) 
    val newMap = map.updated(next._1 -> map.getOrElse(next._1, List())) 
    (newMap, Some(newMap.get(next._1)))}).map(_._2.get) 

这样,您就可以检查Map内容直到内存被耗尽。 (与最后一个元素的含量是在最初的元组包裹在一个Option的价值的一部分。)

0

这可能是你在找什么:

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Sink, Source} 

import scala.util.{Failure, Success} 

object Stack { 

    def main(args: Array[String]): Unit = { 
    case class MyClass(myString: String, myInt: Int) 
    implicit val actorSystem = ActorSystem("app") 
    implicit val actorMaterializer = ActorMaterializer() 
    import scala.concurrent.ExecutionContext.Implicits.global 

    val list = List(MyClass("hello", 1), MyClass("world", 2), MyClass("hello", 3)) 

    val eventualMap = Source(list).fold(Map[String, List[Int]]())((m, e) => { 
     val newValue = e.myInt :: m.get(e.myString).getOrElse(Nil) 
     m + (e.myString -> newValue) 
    }).runWith(Sink.head) 
    eventualMap.onComplete{ 
     case Success(m) => { 
     println(m) 
     actorSystem.terminate() 
     } 
     case Failure(e) => { 
     e.printStackTrace() 
     actorSystem.terminate() 
     } 
    } 
    } 
} 

有了这个代码,你会得到下面的输出:

Map(hello -> List(3, 1), world -> List(2)) 

如果你想有以下输出:

Vector(Map(), Map(hello -> List(1)), Map(hello -> List(1), world -> List(2)), Map(hello -> List(3, 1), world -> List(2))) 

只需使用扫描而不是折叠并使用Sink.seq运行。

折叠和扫描之间的差异是折叠等待上游完成之前完成按下,而扫描推动每个更新下游。

相关问题