2014-10-08 68 views

Sending a message when an Akka actor is killed, using DeathWatch

I am trying to send a message when an actor dies.

This is based on the Akka DeathWatch documentation: http://doc.akka.io/docs/akka/2.3.6/java/untyped-actors.html#deathwatch-java

In ServiceActor I wait for a "kill" message, but I never actually send that message. So to receive the message in ServiceActor I use:

else if (msg instanceof Terminated) { 
     final Terminated t = (Terminated) msg; 
     if (t.getActor() == child) { 
      lastSender.tell(Msg.TERMINATED, getSelf()); 
     } 
    } else { 
     unhandled(msg); 
    } 

I have set the duration to 10 milliseconds:

Duration.create(10, TimeUnit.MILLISECONDS) 

But the message Msg.TERMINATED is never received in the onReceive method:

@Override 
    public void onReceive(Object msg) { 
     if (msg == ServiceActor.Msg.SUCCESS) { 
      System.out.println("Success"); 
      getContext().stop(getSelf()); 
     } else if (msg == ServiceActor.Msg.TERMINATED) { 
      System.out.println("Terminated"); 
     } else 
      unhandled(msg); 
    } 

When ServiceActor fails, how can I send a message to HelloWorld?

The entire code:

package terminatetest; 
import akka.Main; 

public class Launcher { 

    public static void main(String args[]) { 

     String[] akkaArgsArray = new String[1]; 

     akkaArgsArray[0] = "terminatetest.HelloWorld"; 

     Main.main(akkaArgsArray); 

    } 

} 

package terminatetest; 


import java.util.concurrent.TimeUnit; 

import scala.concurrent.duration.Duration; 
import akka.actor.ActorRef; 
import akka.actor.PoisonPill; 
import akka.actor.Props; 
import akka.actor.UntypedActor; 

public class HelloWorld extends UntypedActor { 

    @Override 
    public void preStart() { 

     int counter = 0; 

     akka.actor.ActorSystem system = getContext().system(); 

     final ActorRef greeter = getContext().actorOf(
       Props.create(ServiceActor.class), String.valueOf(counter)); 

     system.scheduler().scheduleOnce(
       Duration.create(10, TimeUnit.MILLISECONDS), new Runnable() { 
        public void run() { 
         greeter.tell(PoisonPill.getInstance(), getSelf()); 
        } 
       }, system.dispatcher()); 

     greeter.tell("http://www.google.com", getSelf()); 

     counter = counter + 1; 
    } 

    @Override 
    public void onReceive(Object msg) { 
     if (msg == ServiceActor.Msg.SUCCESS) { 
      System.out.println("Success"); 
      getContext().stop(getSelf()); 
     } else if (msg == ServiceActor.Msg.TERMINATED) { 
      System.out.println("Terminated"); 
     } else 
      unhandled(msg); 
    } 
} 

package terminatetest; 

import static com.utils.PrintUtils.println; 

import java.io.BufferedReader; 
import java.io.IOException; 
import java.io.InputStreamReader; 
import java.net.HttpURLConnection; 
import java.net.MalformedURLException; 
import java.net.URL; 

import akka.actor.ActorRef; 
import akka.actor.Props; 
import akka.actor.Terminated; 
import akka.actor.UntypedActor; 

public class ServiceActor extends UntypedActor { 

    final ActorRef child = this.getContext().actorOf(Props.empty(), "child"); 
    { 
     this.getContext().watch(child); 
    } 

    ActorRef lastSender = getContext().system().deadLetters(); 

    public static enum Msg { 
     SUCCESS, FAIL, TERMINATED; 
    } 

    @Override 
    public void onReceive(Object msg) { 

     if (msg instanceof String) { 
      String urlName = (String) msg; 

      try { 
       long startTime = System.currentTimeMillis(); 
       URL url = new URL(urlName); 
       HttpURLConnection conn = (HttpURLConnection) url.openConnection(); 
       conn.connect(); 

       BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream())); 
       StringBuilder out = new StringBuilder(); 
       String line; 
       while ((line = reader.readLine()) != null) { 
        out.append(line); 
       } 
       System.out.println("Connection successful to " + url); 
       System.out.println("Content is " + out); 
       long endTime = System.currentTimeMillis(); 
       System.out.println("Total Time : " + (endTime - startTime) + " milliseconds"); 

      } catch (MalformedURLException mue) { 
       println("URL Name " + urlName); 
       System.out.println("MalformedURLException"); 
       System.out.println(mue.getMessage()); 
       mue.printStackTrace(); 
       getSender().tell(Msg.FAIL, getSelf()); 
      } catch (IOException ioe) { 
       println("URL Name " + urlName); 
       System.out.println("IOException"); 
       System.out.println(ioe.getMessage()); 
       ioe.printStackTrace(); 
       System.out.println("Now exiting"); 
       getSender().tell(Msg.FAIL, getSelf()); 
      } 
     } 

     else if (msg instanceof Terminated) { 
       final Terminated t = (Terminated) msg; 
       if (t.getActor() == child) { 
        lastSender.tell(Msg.TERMINATED, getSelf()); 
       } 
      } else { 
       unhandled(msg); 
      } 
    } 

} 

Update:

Update to ServiceActor:

if (urlName.equalsIgnoreCase("poisonPill")) { 
    this.getSelf().tell(PoisonPill.getInstance(), getSelf()); 
    getSender().tell(Msg.TERMINATED, getSelf()); 
} 

Update to HelloWorld: I now trigger the poisonPill from within the child actor itself, so HelloWorld sends it a plain "poisonPill" String:

system.scheduler().scheduleOnce(
     Duration.create(10, TimeUnit.MILLISECONDS), new Runnable() { 
      public void run() { 
       greeter.tell("poisonPill", getSelf()); 
      } 
     }, system.dispatcher()); 

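This displays the following output:

startTime : 1412777375414 
Connection successful to http://www.google.com 
Content is ....... (I've removed the content for brevity) 
Total Time : 1268 milliseconds 
Terminated 

The poisonPill message is sent after 10 milliseconds, yet in this example the actor lived for 1268 milliseconds. So why does the actor not terminate when the poisonPill is sent? Is it because the timings are so short?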

Updated code:

package terminatetest; 


import java.util.concurrent.TimeUnit; 

import scala.concurrent.duration.Duration; 
import akka.actor.ActorRef; 
import akka.actor.Props; 
import akka.actor.UntypedActor; 

public class HelloWorld extends UntypedActor { 

    @Override 
    public void preStart() { 

     int counter = 0; 

     akka.actor.ActorSystem system = getContext().system(); 

     final ActorRef greeter = getContext().actorOf(
       Props.create(ServiceActor.class), String.valueOf(counter)); 

     system.scheduler().scheduleOnce(
       Duration.create(10, TimeUnit.MILLISECONDS), new Runnable() { 
        public void run() { 
         greeter.tell("poisonPill", getSelf()); 
        } 
       }, system.dispatcher()); 

     greeter.tell("http://www.google.com", getSelf()); 

     counter = counter + 1; 
    } 

    @Override 
    public void onReceive(Object msg) { 
     if (msg == ServiceActor.Msg.SUCCESS) { 
      System.out.println("Success"); 
      getContext().stop(getSelf()); 
     } else if (msg == ServiceActor.Msg.TERMINATED) { 
      System.out.println("Terminated"); 
     } else 
      unhandled(msg); 
    } 
} 


package terminatetest; 

import static com.utils.PrintUtils.println; 

import java.io.BufferedReader; 
import java.io.IOException; 
import java.io.InputStreamReader; 
import java.net.HttpURLConnection; 
import java.net.MalformedURLException; 
import java.net.URL; 

import akka.actor.ActorRef; 
import akka.actor.PoisonPill; 
import akka.actor.UntypedActor; 

public class ServiceActor extends UntypedActor { 

    ActorRef lastSender = getSender(); 

    public static enum Msg { 
     SUCCESS, FAIL, TERMINATED; 
    } 

    @Override 
    public void onReceive(Object msg) { 

     if (msg instanceof String) { 
      String urlName = (String) msg; 

      if (urlName.equalsIgnoreCase("poisonPill")) { 
       this.getSelf().tell(PoisonPill.getInstance(), getSelf()); 
       getSender().tell(Msg.TERMINATED, getSelf()); 
      } 

      else { 

       try { 
        long startTime = System.currentTimeMillis(); 
        System.out.println("startTime : "+startTime); 
        URL url = new URL(urlName); 
        HttpURLConnection conn = (HttpURLConnection) url.openConnection(); 
        conn.connect(); 

        BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream())); 
        StringBuilder out = new StringBuilder(); 
        String line; 
        while ((line = reader.readLine()) != null) { 
         out.append(line); 
        } 
        System.out.println("Connection successful to " + url); 
        System.out.println("Content is " + out); 
        long endTime = System.currentTimeMillis(); 
        System.out.println("Total Time : " + (endTime - startTime) + " milliseconds"); 

       } catch (MalformedURLException mue) { 
        println("URL Name " + urlName); 
        System.out.println("MalformedURLException"); 
        System.out.println(mue.getMessage()); 
        mue.printStackTrace(); 
        getSender().tell(Msg.FAIL, getSelf()); 
       } catch (IOException ioe) { 
        println("URL Name " + urlName); 
        System.out.println("IOException"); 
        System.out.println(ioe.getMessage()); 
        ioe.printStackTrace(); 
        System.out.println("Now exiting"); 
        getSender().tell(Msg.FAIL, getSelf()); 
       } 
      } 
     } 
    } 

} 

Answer


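I think your problem stems from the fact that you set lastSender only once, during construction of ServiceActor, and you explicitly set it to dead letters. If you want to send a message back to the actor that sent you the String message, you need to set lastSender to the sender (getSender() in the Java API) while handling that message. If you don't, your Msg.TERMINATED will always go to dead letters.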

EDIT

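Now I see the real problem here. In the HelloWorld actor, you send a PoisonPill to ServiceActor. ServiceActor will stop itself as a result, and will therefore also stop the child ref (because it is a child actor of ServiceActor). At that point you would expect the Terminated message to be delivered to ServiceActor, because it explicitly watches child (and it may well be delivered). But you have already sent a PoisonPill to ServiceActor, so it will not process any message received after that one (which the Terminated would be). That is why the block:

else if (msg instanceof Terminated) { 

is never hit in ServiceActor.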
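The ordering argument above can be illustrated with a small plain-Java model. This is only a sketch and not Akka code: the class name PoisonPillOrdering, the STOP marker (standing in for PoisonPill) and the drain method are hypothetical, and the model assumes a simple FIFO mailbox in which processing ends as soon as the stop marker is dequeued.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Plain-Java model of a FIFO mailbox. Not Akka code; all names are hypothetical.
public class PoisonPillOrdering {

    // Stands in for PoisonPill in this model.
    static final Object STOP = new Object();

    // Handles messages in arrival order and returns the ones that were handled.
    static List<Object> drain(Queue<Object> mailbox) {
        List<Object> handled = new ArrayList<>();
        while (!mailbox.isEmpty()) {
            Object msg = mailbox.poll();
            if (msg == STOP) {
                break; // the "actor" stops; anything still queued is never handled
            }
            handled.add(msg);
        }
        return handled;
    }

    public static void main(String[] args) {
        Queue<Object> mailbox = new ArrayDeque<>();
        mailbox.add("http://www.google.com");
        mailbox.add(STOP);
        mailbox.add("Terminated(child)"); // arrives after the stop marker
        System.out.println(drain(mailbox)); // prints [http://www.google.com]
    }
}
```

In this model the "Terminated(child)" entry stays in the queue unhandled, which mirrors why the Terminated branch is never reached once the stop message is ahead of it.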

EDIT2

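Your actor receives the request to hit Google first and receives the "poisonPill" message second (10 milliseconds later). Because an actor processes its mailbox in order, it will completely process the request to hit Google before it processes the message telling it to stop itself. That is why the actor does not stop after 10 milliseconds. You cannot interrupt an actor in the middle of what it is doing.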
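The timing can be reproduced without Akka. The sketch below is an assumption-laden model, not the asker's code: a single-threaded executor stands in for the actor (one message at a time), Thread.sleep stands in for the blocking HTTP request, and the names MailboxTimingDemo, run, workMillis and pillDelayMillis are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Model only: a single-threaded executor plays the role of the actor's mailbox.
public class MailboxTimingDemo {

    static List<String> run(long workMillis, long pillDelayMillis) {
        final List<String> events = new CopyOnWriteArrayList<>();
        final ExecutorService actor = Executors.newSingleThreadExecutor();
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

        // First message: a slow, blocking job (stands in for the HTTP request).
        actor.execute(() -> {
            try {
                Thread.sleep(workMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            events.add("request finished");
        });

        // Second message: enqueued after pillDelayMillis, but it can only be
        // handled once the single worker thread is free again.
        timer.schedule(() -> actor.execute(() -> events.add("poisonPill handled")),
                pillDelayMillis, TimeUnit.MILLISECONDS);

        try {
            timer.shutdown(); // the already scheduled task still runs
            timer.awaitTermination(5, TimeUnit.SECONDS);
            actor.shutdown(); // queued messages are still processed
            actor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        // The "pill" is enqueued after 10 ms but handled only after the 200 ms job.
        System.out.println(run(200, 10)); // prints [request finished, poisonPill handled]
    }
}
```

The stop request is queued early but takes effect late, which matches the 1268 ms lifetime seen in the question's output.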


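Thanks, I've updated how the poisonPill is sent - it is now sent as a String message, and when the actor receives "poisonPill" it terminates itself: this.getSelf().tell(PoisonPill.getInstance(), getSelf()); However, this does not seem to work either. I've updated the question to include this, can you take a look? – 2014-10-08 14:17:54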


@blue-sky, added more details... – cmbaxter 2014-10-08 14:26:02


So by this reasoning ("You cannot interrupt an actor in the middle of what it is doing"), an actor cannot be stopped after N seconds? – 2014-10-08 15:00:51
