2013-07-12 100 views
4

有几个人与我一起在一个项目上工作,一直在试图找出处理这个问题的最佳方法。看起来这应该是一个经常需要的标准事物,但由于某种原因,我们似乎无法得到正确的答案。了解akka演员何时完成

如果我有一些工作要做,并且我在路由器上扔了一堆消息,我怎么知道所有的工作都完成了?例如,如果我们正在读取一百万行文件的行,并将行发送给行动者以处理此行,并且您需要处理下一个文件,但必须等待第一个文件完成,那么如何知道它何时发生做完了?

还有一点评论。我知道并使用了Patters.ask()使用的Await.result()和Await.ready()。一个区别是,每条生产线都会有一个未来,我们将有大量的这些期货等待,而不仅仅是一个。此外,我们正在填充一个占用大量内存的大型领域模型,并且不希望增加额外的内存来保存等待编写的内存中的相同数量的未来,同时使用每个人在完成它的工作后完成而没有等待内存等待组成。

我们使用Java而不是Scala。

伪代码:

for(File file : files) { 
    ... 
    while((String line = getNextLine(fileStream)) != null) { 
     router.tell(line, this.getSelf()); 
    } 
    // we need to wait for this work to finish to do the next 
    // file because it's dependent on the previous work 
} 

这似乎是你经常想做很多工作,当它与演员完成知道。

回答

4

我相信我有你的解决方案,它不涉及累积一大堆Future s。首先是高层次的概念。将有两个参与者参与这个流程。首先,我们将拨打FilesProcessor。这个演员将是短暂的和有状态的。每当你想要顺序处理一堆文件时,你首先创建一个这个actor的实例,并传递一个包含你想处理的文件名称(或路径)的消息。当它完成所有文件的处理时,它会自行停止。第二位演员,我们将拨打LineProcessor。这位演员是无国籍的,长期居住在路由器后面。它处理一个文件行,然后回应谁请求行处理,告诉他们已完成处理该行。现在到代码中。

第一消息:

public class Messages { 

    public static class ProcessFiles{ 
    public final List<String> fileNames; 
    public ProcessFiles(List<String> fileNames){ 
     this.fileNames = fileNames; 
    } 
    } 

    public static class ProcessLine{ 
    public final String line; 
    public ProcessLine(String line){ 
     this.line = line; 
    } 
    } 

    public static class LineProcessed{} 

    public static LineProcessed LINE_PROCESSED = new LineProcessed(); 
} 

FilesProcessor

public class FilesProcessor extends UntypedActor{ 
    private List<String> files; 
    private int awaitingCount; 
    private ActorRef router; 

    @Override 
    public void onReceive(Object msg) throws Exception { 
    if (msg instanceof ProcessFiles){  
     ProcessFiles pf = (ProcessFiles)msg; 
     router = ... //lookup router; 
     files = pf.fileNames; 
     processNextFile(); 
    } 
    else if (msg instanceof LineProcessed){ 
     awaitingCount--; 
     if (awaitingCount <= 0){ 
     processNextFile(); 
     } 
    } 

    } 

    private void processNextFile(){ 
    if (files.isEmpty()) getContext().stop(getSelf()); 
    else{    
     String file = files.remove(0); 
     BufferedReader in = openFile(file); 
     String input = null; 
     awaitingCount = 0; 

     try{ 
     while((input = in.readLine()) != null){ 
      router.tell(new Messages.ProcessLine(input), getSelf()); 
      awaitingCount++; 
     }   
     } 
     catch(IOException e){ 
     e.printStackTrace(); 
     getContext().stop(getSelf()); 
     } 

    } 
    } 

    private BufferedReader openFile(String name){ 
    //do whetever to load file 
    ... 
    } 

} 

LineProcessor

public class LineProcessor extends UntypedActor{ 

    @Override 
    public void onReceive(Object msg) throws Exception { 
    if (msg instanceof ProcessLine){ 
     ProcessLine pl = (ProcessLine)msg; 

     //Do whatever line processing... 

     getSender().tell(Messages.LINE_PROCESSED, getSelf()); 
    } 
    } 

} 

现在线处理器被发送回响应,没有额外的内容。如果您需要根据线路处理发送回来的东西,您当然可以改变这一点。我确信这段代码不是防弹的,我只是想告诉你一个高层次的概念,告诉你如何在没有请求/响应语义和Future的情况下完成这个流程。

如果您对此方法有任何疑问或想了解更多详情,请告诉我们,我很乐意为您提供。

+0

在您的设计中,什么能阻止ProcessFiles一次处理多个文件?如果它一次收到50个文件,是不是同时开始在所有文件上工作?也许if(awaitingCount <= 0)应该被添加到if(msg instanceof ProcessFiles)呢? –

+0

这个设计意味着短暂的'FileProcessor'仅仅意味着在其自身停止之前在其生命周期中提供一个'ProcessFiles'请求。它不打算提供多个'ProcessFiles'请求。如果您有更多的请求,请启动该演员的更多实例。或者,你可以像你说的那样做一些事情,并且在接收中有一个if块,看看它是否已经在处理一批文件,如果是这样,可能只是将新的传入文件列表追加到当前文件列表中处理。 – cmbaxter

0

在路由上使用context.setRecieveTimeout将消息发送回发送方并记录已处理消息的计数。当处理的邮件总数==您发送的金额完成后。

如果您的路线要保持足够的忙碌状态,setReceiveTimeout不会足够频繁地启动,请安排自己的消息以发回计数。