2017-02-20 45 views
1

我下面从akka.io容错http://doc.akka.io/docs/akka/current/java/fault-tolerance.html .I代码reference.My要求如下采取这种代码重试相同的消息与定义的间隔时间定义的数字: 让我们假设演员在消息 上崩溃,并由其主管重新启动。然后他开始在他的邮箱中处理下一个 消息。这导致崩溃的消息是 “dropped'.But我要处理的时间的特定数目相同的动作(假定3次),在它们之间限定的间隔(假定1秒)。如何做到这一点使用阿卡监督。实际上是通过演员我想检查特定服务的API是否运行(即给予一定的例外)。所以,如果有一个特定的尝试任何异常,(假设404未找到),然后重新发送邮件到失败的工人 直到达到supervisorStrategy指定的maxNrOfRetries。如果工作人员失败了“maxNrOfRetries”次,那么只需记录“为此xx消息达到的最大尝试次数”。如何在java中执行此操作。使用演员监督,如何在发生故障时

我的上司类:

public class Supervisor extends UntypedActor { 


private static SupervisorStrategy strategy = 

new OneForOneStrategy(3, Duration.create("1 minute"), 
    new Function<Throwable, Directive>() { 
    @Override 
    public Directive apply(Throwable t) { 
     if (t instanceof Exception) { 
     return restart(); 
     }else if (t instanceof IllegalArgumentException) { 
     return stop(); 
     } else { 
     return escalate(); 
     } 
    } 
    }); 

@Override 
public SupervisorStrategy supervisorStrategy() { 
return strategy; 


} 
public void onReceive(Object o) { 
if (o instanceof Props) { 
    getSender().tell(getContext().actorOf((Props) o), getSelf()); 
} else { 
    unhandled(o); 
} 


} 
} 

子类:

public class Child extends UntypedActor { 


    public void onReceive(Object o) throws Exception { 
if (o instanceof String) { 
Object response = someFunction((String) message);//this function returns either successfull messgae as string or exception 
if(response instanceOf Exception) { 
    throw (Exception) response; 
    } 
    else 
    getSender().tell(response, getSelf()) 
}else { 
    unhandled(o); 
} 


} 

} 

创建演员:

Props superprops = Props.create(Supervisor.class); 
ActorRef supervisor = system.actorOf(superprops, "supervisor"); 
ActorRef child = (ActorRef) Await.result(ask(supervisor, 
Props.create(Child.class), 5000), timeout); 
child.tell("serVice_url", ActorRef.noSender()); 

因为我要重复的过程中如果失败occurs.But它的SERVICE_URL没有发生。如果写在creatng演员作为child.tell("serVice_url_2", ActorRef.noSender());然后这条线exucted下一行,但我要处理的相同的操作(其中发生故障)的时间的特定数目(假定3次),在它们之间限定的间隔。 请指导我做到这一点。

回答

0

我想我已经开发出了一种方法。尽管我仍然需要在生产层面上做一个测试。我在下面写下这个答案,因为它可能对尝试实现相同目标的人有所帮助。如果有人发现更好他/她受到欢迎。 这里要提一下,通过这种方法Supervisor在一定的时间范围内处理与发生故障的消息相同的动作(假设3次)。我无法定义他们。 这里是代码。主管类。

public class MyUntypedActor extends UntypedActor { 
//here I have given Max no retrilas as 10.I will controll this number from logic as per my own requirements.But user given number of retrials can not exceed 10. 
private static SupervisorStrategy strategy = new AllForOneStrategy(10, Duration.create(5, TimeUnit.MINUTES), 
     new Function<Throwable, SupervisorStrategy.Directive>() { 
      @Override 
      public SupervisorStrategy.Directive apply(Throwable t) { 
       if (t instanceof Exception) { 
        //System.out.println("exception" + "*****" + t.getMessage() + "***" + t.getLocalizedMessage()); 
        return restart(); 
       } else if (t instanceof NullPointerException) { 
        return restart(); 
       } else if (t instanceof IllegalArgumentException) { 
        return stop(); 
       } else { 
        return escalate(); 
       } 
      } 
     }); 

@Override 
public SupervisorStrategy supervisorStrategy() { 
    return strategy; 
} 

public void onReceive(Object o) { 
    if (o instanceof Props) { 
     getSender().tell(getContext().actorOf((Props) o), getSelf()); 
    } else { 
     unhandled(o); 
    } 
} 
} 

孩子类,我们将写我们的逻辑。

public class Child extends UntypedActor { 
//Through preRestart it will push the message for which exception occured before the restart of the child 
@Override 
public void preRestart(final Throwable reason, final scala.Option<Object> message) throws Exception { 
    System.out.println("reStarting :::" + message.get()); 
    SetRules.setRemainingTrials(SetRules.remainingTrials + 1); 
    getSelf().tell(message.get(), getSender()); 
}; 

public void onReceive(Object o) throws Exception { 

    if (o instanceof Exception) { 
     throw (Exception) o; 
    } else if (o instanceof Integer) { 
    } else if (o.equals("get")) { 
     getSender().tell("get", getSelf()); 
    } else if (o instanceof String) { 

     try { 
      // here either we can write our logic directly or for a better 
      // approach can call a function where the logic will be excuted. 
      getSender().tell("{\"meggase\":\"Succesfull after " + SetRules.remainingTrials + " retrials\"}", 
        getSelf()); 
     } catch (Exception ex) { 
      if (SetRules.remainingTrials == SetRules.noOfRetries) { 
       getSender().tell("{\"meggase\":\"Failed to connect after " + SetRules.noOfRetries + " retrials\"}", 
         getSelf()); 
      } else { 
       Exception value1 = ex; 
       throw (Exception) value1; 
      } 
     } 
    } else { 
     unhandled(o); 
    } 
} 
} 

的SetRules类有相关信息的用户提供noOfRetrials并在重试的每个沙爹通过remainingTrials

public class SetRules { 

public static int noOfRetries; 
public static int remainingTrials; 

public SetRules(int noOfRetries, int remainingTrials) { 
    super(); 
    SetRules.noOfRetries = noOfRetries; 
    SetRules.remainingTrials = remainingTrials; 
} 

public int getRemainingTrials() { 
    return remainingTrials; 
} 

public static void setRemainingTrials(int remainingTrials) { 
    SetRules.remainingTrials = remainingTrials; 
} 
} 

存储有关重试次数的信息,现在,让我们创建的演员。

Props superprops = Props.create(MyUntypedActor.class); 
SetRules setRules=new SetRules(3,0); 
ActorSystem system = ActorSystem.create("helloakka"); 
ActorRef supervisor = system.actorOf(superprops, "supervisor"); 
ActorRef child = (ActorRef) Await.result(ask(supervisor, Props.create(Child.class), 5000), Duration.create(5, "minutes")); 
Future<Object> future = Patterns.ask(child, service_Url, new Timeout(Duration.create(5, "minutes"))); 
Object result = Await.result(future, Duration.create(5, "minutes")); 
System.out.println(result); 
+0

我问[类似的问题(https://stackoverflow.com/questions/42319399/akka-supervisor-delayed-restart-without-losing-message)几乎在同一时间比你;我也在审查你的。使用此解决方案,您确实可以将消息放回,但无法定义重新启动之间的时间(回退)。 –

+0

是的,我没有能够定义他们之间的时间间隔。任何建议如何做到这一点?@ɐuıɥɔɐɯ –