2015-10-29 25 views

UPDATE FROM二○一五年十月三十○日阿卡插播实现比单线程实现较慢


阿卡流是使用异步消息参与者之间传递给 实现流处理阶段。通过异步边界传递数据会产生以下开销:您的 计算似乎只需要大约160ns(源自 单线程度量),而流式传输解决方案每个元素需要大约1μs,其中 占主导地位消息传递。

另一个误解是,说“流”意味着并行:在 代码都计算在一个单一的演员(地图 阶段)顺序运行,因此没有任何好处,可以预计在原始 单线程解决方案。

为了从阿卡提供的并行性收益流,你 需要有多个处理阶段,每个执行的每个元素



object MultiThread { 
    implicit val actorSystem = ActorSystem("Sys") 
    implicit val materializer = ActorMaterializer() 

    var counter = 0 
    var oldProgess = 0 

    //RunnableFlow: in -> flow -> sink 
    val in = Source(() => Iterator.continually((1254785478l, "name", 48, 23.09f))) 

    val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p))) 

    val tupleToEvent = Flow[(Long, String, Int, Float)].map(SharedFunctions.transform) 

    val eventToFactorial = Flow[Event].map(SharedFunctions.transform2) 

    val eventChef: Flow[(Long, String, Int, Float), Int, Unit] = Flow() { implicit builder => 
    import FlowGraph.Implicits._ 

    val dispatchTuple = builder.add(Balance[(Long, String, Int, Float)](4)) 
    val mergeEvents = builder.add(Merge[Int](4)) 

    dispatchTuple.out(0) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(0) 
    dispatchTuple.out(1) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(1) 
    dispatchTuple.out(2) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(2) 
    dispatchTuple.out(3) ~> tupleToEvent ~> eventToFactorial ~> mergeEvents.in(3) 

    (dispatchTuple.in, mergeEvents.out) 

    val sink = Sink.foreach[Int]{ 
    v => counter += 1 
    oldProgess = SharedFunctions.printProgress(oldProgess, SharedFunctions.maxEventCount, counter, 
    DateTime.now.getMillis - SharedFunctions.startTime.getMillis) 
    if(counter == SharedFunctions.maxEventCount) endAkka() 

    def endAkka() = { 
    val duration = new Duration(SharedFunctions.startTime, DateTime.now) 
    println("Time: " + duration.getMillis + " || Data: " + counter) 

    def main(args: Array[String]) { 
    println("MultiThread started: " + SharedFunctions.startTime) 
    // in.via(eventChef).runWith(sink) 



我是比较新的两阶&阿卡 - 流。我写了一个小测试项目,创建一些事件,直到一个计数器达到特定的数字。对于每个事件,正在计算事件的一个字段的阶乘。我实施了两次。一次使用akka-stream,一次不使用akka-stream(单线程),并比较运行时间。


  • 单事件,而不阿卡流403(+ - 2)MS
  • 与单事件阿卡流444(+ -13)ms的

  • 70Mio事件而不阿卡流11778(+ -70)MS

  • 70Mio事件阿卡回笼75424(+ - 2959)MS




object MultiThread { 
    implicit val actorSystem = ActorSystem("Sys") 
    implicit val materializer = ActorMaterializer() 

    var counter = 0 
    var oldProgess = 0 

    //RunnableFlow: in -> flow -> sink 
    val in = Source(() => Iterator.continually((1254785478l, "name", 48, 23.09f))) 

    val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p))) 

    val sink = Sink.foreach[Int]{ 
    v => counter += 1 
    oldProgess = SharedFunctions.printProgress(oldProgess, SharedFunctions.maxEventCount, counter, 
    DateTime.now.getMillis - SharedFunctions.startTime.getMillis) 
    if(counter == SharedFunctions.maxEventCount) endAkka() 

    def endAkka() = { 
    val duration = new Duration(SharedFunctions.startTime, DateTime.now) 
    println("Time: " + duration.getMillis + " || Data: " + counter) 

    def main(args: Array[String]) { 
    import scala.concurrent.ExecutionContext.Implicits.global 
    println("MultiThread started: " + SharedFunctions.startTime) 
    in.via(flow).runWith(sink).onComplete(_ => endAkka()) 



对象SingleThread {

def main(args: Array[String]) { 
    println("SingleThread started at: " + SharedFunctions.startTime) 
    val i = createEvent(0) 
    val duration = new Duration(SharedFunctions.startTime, DateTime.now()); 
    println("Time: " + duration.getMillis + " || Data: " + i) 

    def createEventWorker(oldProgress: Int, count: Int, randDate: Long, name: String, age: Int, myFloat: Float): Int = { 
    if (count == SharedFunctions.maxEventCount) count 
    else { 
     val e = SharedFunctions.transform((randDate, name, age, myFloat)) 
     val p = SharedFunctions.printProgress(oldProgress, SharedFunctions.maxEventCount, count, 
     DateTime.now.getMillis - SharedFunctions.startTime.getMillis) 
     createEventWorker(p, count + 1, 1254785478l, "name", 48, 23.09f) 

    def createEvent(count: Int): Int = { 
    createEventWorker(0, count, 1254785478l, "name", 48, 23.09f) 


object SharedFunctions { 
    val maxEventCount = 70000000 
    val startTime = DateTime.now 

    def transform(t : (Long, String, Int, Float)) : Event = new Event(t._1 ,t._2,t._3,t._4) 
    def transform2(e : Event) : Int = factorial(e.getAgeYrs) 

    def calculatePercentage(totalValue: Long, currentValue: Long) = Math.round((currentValue * 100)/totalValue) 
    def printProgress(oldProgress : Int, fileSize: Long, currentSize: Int, t: Long) = { 
    val cProgress = calculatePercentage(fileSize, currentSize) 
    if (oldProgress != cProgress) println(s"$oldProgress% | $t ms") 

    private def factorialWorker(n1: Int, n2: Int): Int = { 
    if (n1 == 0) n2 
    else factorialWorker(n1 -1, n2*n1) 
    def factorial (n : Int): Int = { 
    factorialWorker(n, 1) 


* Autogenerated by Avro 

public class Event extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { 
    public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Event\",\"namespace\":\"week2P2\",\"fields\":[{\"name\":\"timestampMS\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ageYrs\",\"type\":\"int\"},{\"name\":\"sizeCm\",\"type\":\"float\"}]}"); 
    public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } 
    @Deprecated public long timestampMS; 
    @Deprecated public CharSequence name; 
    @Deprecated public int ageYrs; 
    @Deprecated public float sizeCm; 

    * Default constructor. Note that this does not initialize fields 
    * to their default values from the schema. If that is desired then 
    * one should use <code>newBuilder()</code>. 
    public Event() {} 

    * All-args constructor. 
    public Event(Long timestampMS, CharSequence name, Integer ageYrs, Float sizeCm) { 
    this.timestampMS = timestampMS; 
    this.name = name; 
    this.ageYrs = ageYrs; 
    this.sizeCm = sizeCm; 

    public org.apache.avro.Schema getSchema() { return SCHEMA$; } 
    // Used by DatumWriter. Applications should not call. 
    public Object get(int field$) { 
    switch (field$) { 
    case 0: return timestampMS; 
    case 1: return name; 
    case 2: return ageYrs; 
    case 3: return sizeCm; 
    default: throw new org.apache.avro.AvroRuntimeException("Bad index"); 
    // Used by DatumReader. Applications should not call. 
    public void put(int field$, Object value$) { 
    switch (field$) { 
    case 0: timestampMS = (Long)value$; break; 
    case 1: name = (CharSequence)value$; break; 
    case 2: ageYrs = (Integer)value$; break; 
    case 3: sizeCm = (Float)value$; break; 
    default: throw new org.apache.avro.AvroRuntimeException("Bad index"); 

    * Gets the value of the 'timestampMS' field. 
    public Long getTimestampMS() { 
    return timestampMS; 

    * Sets the value of the 'timestampMS' field. 
    * @param value the value to set. 
    public void setTimestampMS(Long value) { 
    this.timestampMS = value; 

    * Gets the value of the 'name' field. 
    public CharSequence getName() { 
    return name; 

    * Sets the value of the 'name' field. 
    * @param value the value to set. 
    public void setName(CharSequence value) { 
    this.name = value; 

    * Gets the value of the 'ageYrs' field. 
    public Integer getAgeYrs() { 
    return ageYrs; 

    * Sets the value of the 'ageYrs' field. 
    * @param value the value to set. 
    public void setAgeYrs(Integer value) { 
    this.ageYrs = value; 

    * Gets the value of the 'sizeCm' field. 
    public Float getSizeCm() { 
    return sizeCm; 

    * Sets the value of the 'sizeCm' field. 
    * @param value the value to set. 
    public void setSizeCm(Float value) { 
    this.sizeCm = value; 

    /** Creates a new Event RecordBuilder */ 
    public static Event.Builder newBuilder() { 
    return new Event.Builder(); 

    /** Creates a new Event RecordBuilder by copying an existing Builder */ 
    public static Event.Builder newBuilder(Event.Builder other) { 
    return new Event.Builder(other); 

    /** Creates a new Event RecordBuilder by copying an existing Event instance */ 
    public static Event.Builder newBuilder(Event other) { 
    return new Event.Builder(other); 

    * RecordBuilder for Event instances. 
    public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Event> 
    implements org.apache.avro.data.RecordBuilder<Event> { 

    private long timestampMS; 
    private CharSequence name; 
    private int ageYrs; 
    private float sizeCm; 

    /** Creates a new Builder */ 
    private Builder() { 

    /** Creates a Builder by copying an existing Builder */ 
    private Builder(Event.Builder other) { 
     if (isValidValue(fields()[0], other.timestampMS)) { 
     this.timestampMS = data().deepCopy(fields()[0].schema(), other.timestampMS); 
     fieldSetFlags()[0] = true; 
     if (isValidValue(fields()[1], other.name)) { 
     this.name = data().deepCopy(fields()[1].schema(), other.name); 
     fieldSetFlags()[1] = true; 
     if (isValidValue(fields()[2], other.ageYrs)) { 
     this.ageYrs = data().deepCopy(fields()[2].schema(), other.ageYrs); 
     fieldSetFlags()[2] = true; 
     if (isValidValue(fields()[3], other.sizeCm)) { 
     this.sizeCm = data().deepCopy(fields()[3].schema(), other.sizeCm); 
     fieldSetFlags()[3] = true; 

    /** Creates a Builder by copying an existing Event instance */ 
    private Builder(Event other) { 
     if (isValidValue(fields()[0], other.timestampMS)) { 
     this.timestampMS = data().deepCopy(fields()[0].schema(), other.timestampMS); 
     fieldSetFlags()[0] = true; 
     if (isValidValue(fields()[1], other.name)) { 
     this.name = data().deepCopy(fields()[1].schema(), other.name); 
     fieldSetFlags()[1] = true; 
     if (isValidValue(fields()[2], other.ageYrs)) { 
     this.ageYrs = data().deepCopy(fields()[2].schema(), other.ageYrs); 
     fieldSetFlags()[2] = true; 
     if (isValidValue(fields()[3], other.sizeCm)) { 
     this.sizeCm = data().deepCopy(fields()[3].schema(), other.sizeCm); 
     fieldSetFlags()[3] = true; 

    /** Gets the value of the 'timestampMS' field */ 
    public Long getTimestampMS() { 
     return timestampMS; 

    /** Sets the value of the 'timestampMS' field */ 
    public Event.Builder setTimestampMS(long value) { 
     validate(fields()[0], value); 
     this.timestampMS = value; 
     fieldSetFlags()[0] = true; 
     return this; 

    /** Checks whether the 'timestampMS' field has been set */ 
    public boolean hasTimestampMS() { 
     return fieldSetFlags()[0]; 

    /** Clears the value of the 'timestampMS' field */ 
    public Event.Builder clearTimestampMS() { 
     fieldSetFlags()[0] = false; 
     return this; 

    /** Gets the value of the 'name' field */ 
    public CharSequence getName() { 
     return name; 

    /** Sets the value of the 'name' field */ 
    public Event.Builder setName(CharSequence value) { 
     validate(fields()[1], value); 
     this.name = value; 
     fieldSetFlags()[1] = true; 
     return this; 

    /** Checks whether the 'name' field has been set */ 
    public boolean hasName() { 
     return fieldSetFlags()[1]; 

    /** Clears the value of the 'name' field */ 
    public Event.Builder clearName() { 
     name = null; 
     fieldSetFlags()[1] = false; 
     return this; 

    /** Gets the value of the 'ageYrs' field */ 
    public Integer getAgeYrs() { 
     return ageYrs; 

    /** Sets the value of the 'ageYrs' field */ 
    public Event.Builder setAgeYrs(int value) { 
     validate(fields()[2], value); 
     this.ageYrs = value; 
     fieldSetFlags()[2] = true; 
     return this; 

    /** Checks whether the 'ageYrs' field has been set */ 
    public boolean hasAgeYrs() { 
     return fieldSetFlags()[2]; 

    /** Clears the value of the 'ageYrs' field */ 
    public Event.Builder clearAgeYrs() { 
     fieldSetFlags()[2] = false; 
     return this; 

    /** Gets the value of the 'sizeCm' field */ 
    public Float getSizeCm() { 
     return sizeCm; 

    /** Sets the value of the 'sizeCm' field */ 
    public Event.Builder setSizeCm(float value) { 
     validate(fields()[3], value); 
     this.sizeCm = value; 
     fieldSetFlags()[3] = true; 
     return this; 

    /** Checks whether the 'sizeCm' field has been set */ 
    public boolean hasSizeCm() { 
     return fieldSetFlags()[3]; 

    /** Clears the value of the 'sizeCm' field */ 
    public Event.Builder clearSizeCm() { 
     fieldSetFlags()[3] = false; 
     return this; 

    public Event build() { 
     try { 
     Event record = new Event(); 
     record.timestampMS = fieldSetFlags()[0] ? this.timestampMS : (Long) defaultValue(fields()[0]); 
     record.name = fieldSetFlags()[1] ? this.name : (CharSequence) defaultValue(fields()[1]); 
     record.ageYrs = fieldSetFlags()[2] ? this.ageYrs : (Integer) defaultValue(fields()[2]); 
     record.sizeCm = fieldSetFlags()[3] ? this.sizeCm : (Float) defaultValue(fields()[3]); 
     return record; 
     } catch (Exception e) { 
     throw new org.apache.avro.AvroRuntimeException(e); 

只是为了完整性,您能否给出Event定义。我想尝试优化您的多线程代码... –


确定我已经添加了它 –



除了Roland的解释,我完全同意,应该理解,akka Streams不仅仅是一个并发编程框架。流也提供背压,这意味着事件仅在Source产生,当需要在Sink中处理它们时。这种需求传递在每个处理步骤中增加了一些开销。




是的。流的良好用例涉及实时流式传输或大量数据的流式传输。解析巨大的视频文件时,无需将所有内容全部读入内存,通过互联网连接获取数百万条记录,或者在解析之前使用文件观察器等待文件丢失是很好的例子。它们要么限制最终速度不是必需的环境中的复杂性,要​​么是不明智的(数据进入较长时间或需要数百万次网络调用),或者在有大量数据的情况下帮助缓解复杂性。 –




为了受益于Akka Streams提供的并行性,您需要有多个处理阶段,每个处理阶段每个元素执行1μs以上的任务,另请参阅the docs


不知道为什么这不是被接受的答案 - 它早于接受的答案,实际上与它一致,只是次要的次要点回答。 – doug