2016-04-23 68 views
0

我是Spring AOP的新手,并且已经尝试了一下。Spring AOP - 正确配置重试建议

我正在尝试设置重试&通过Spring AOP为我的项目中的一个限制器。 用例如下: -

  1. 检查TPS是否可用。如果不是,扔ThrottledException
  2. 如果一个ThrottledException被抛出,Retry

我遇到的问题是:此限制&重试组合运行到无限循环(如果TPS = 0)。也就是说,重试不会在'x'尝试后停止。

我节流Interceptor是(高级别)是这样的:

@Before("<pointcut>") 
public void invoke() throws ThrottlingException { 
     if (throttler.isThrottled(throttleKey)) { 
      throw new ThrottlingException("Call Throttled"); 
    } 
} 

我重试Interceptor是这样的:

@AfterThrowing(pointcut="execution(* com.company.xyz.method())", throwing="exception") 
public Object invoke(JoinPoint jp, ThrottlingException exception) throws Throwable { 
    return RetryingCallable.newRetryingCallable(new Callable<Object>() { 

     @Override 
     public Object call() throws Exception { 
       MethodSignature signature = (MethodSignature) p.getSignature(); 
       Method method = signature.getMethod(); 
       return method.invoke(jp.getThis(), (Object[]) null); 
     } 

    }, retryPolicy).call(); 
} 

这里RetryingCallable是有人写了一个简单的实现(内部库在我的公司),并采用RetryAdvice

我的相关弹簧的配置如下:这里

<bean id="retryInterceptor" class="com.company.xyz.RetryInterceptor"> 
    <constructor-arg index="0"><ref bean="retryPolicy"/></constructor-arg> 
</bean> 


<bean id="throttlingInterceptor" class="com.company.xyz.ThrottlingInterceptor"> 
    <constructor-arg><value>throttleKey</value></constructor-arg> 
</bean> 
<context:component-scan base-package="com.company.xyz"> 
    <context:include-filter type="annotation" expression="org.aspectj.lang.annotation.Aspect"/> 
</context:component-scan> 
<aop:aspectj-autoproxy/> 

的问题,因为我看到的是,在每个ThrottlingExceptionRetry Advice被应用,而不是以前的一个是进入影响。

有关如何解决此问题的任何输入?

回答

2

免责声明: Spring的用户,因此,我要在这里提出一个纯粹的AspectJ的解决方案。但它应该在Spring AOP中以相同的方式工作。您需要更改的唯一方法是从@DeclarePresedence切换到@Order以获取宽高比配置,如Spring AOP manual中所述。

驱动程序:

package de.scrum_master.app; 

public class Application { 
    public static void main(String[] args) { 
     new Application().doSomething(); 
    } 

    public void doSomething() { 
     System.out.println("Doing something"); 
    } 
} 

节流异常类:

package de.scrum_master.app; 

public class ThrottlingException extends RuntimeException { 
    private static final long serialVersionUID = 1L; 

    public ThrottlingException(String arg0) { 
     super(arg0); 
    } 
} 

节流拦截:

为了模拟节流的情况我创建了一个辅助方法isThrottled(),其中3例随机返回true

package de.scrum_master.aspect; 

import java.util.Random; 

import org.aspectj.lang.JoinPoint; 
import org.aspectj.lang.annotation.Aspect; 
import org.aspectj.lang.annotation.Before; 

import de.scrum_master.app.ThrottlingException; 

@Aspect 
public class ThrottlingInterceptor { 
    private static final Random RANDOM = new Random(); 

    @Before("execution(* doSomething())") 
    public void invoke(JoinPoint thisJoinPoint) throws ThrottlingException { 
     System.out.println(getClass().getSimpleName() + " -> " + thisJoinPoint); 
     if (isThrottled()) { 
      throw new ThrottlingException("call throttled"); 
     } 
    } 

    private boolean isThrottled() { 
     return RANDOM.nextInt(3) > 0; 
    } 
} 

重试拦截:

请注意,AspectJ的注解@DeclarePrecedence("RetryInterceptor, *")说这个拦截器是之前任何其他的执行。请用两个拦截器类上的@Order注释替换它。否则,@Around建议无法捕获限制拦截器引发的异常。

另外值得一提的是,这个拦截器不需要任何反射来实现重试逻辑,它直接在重试循环中使用连接点,以便重试thisJoinPoint.proceed()。这可以很容易地被分解成实施不同类型的重试行为的帮助方法或助手类。只要确保使用ProceedingJoinPoint作为参数而不是Callable

package de.scrum_master.aspect; 

import org.aspectj.lang.ProceedingJoinPoint; 
import org.aspectj.lang.annotation.Around; 
import org.aspectj.lang.annotation.Aspect; 
import org.aspectj.lang.annotation.DeclarePrecedence; 

import de.scrum_master.app.ThrottlingException; 

@Aspect 
@DeclarePrecedence("RetryInterceptor, *") 
public class RetryInterceptor { 
    private static int MAX_TRIES = 5; 
    private static int WAIT_MILLIS_BETWEEN_TRIES = 1000; 

    @Around("execution(* doSomething())") 
    public Object invoke(ProceedingJoinPoint thisJoinPoint) throws Throwable { 
     System.out.println(getClass().getSimpleName() + " -> " + thisJoinPoint); 
     ThrottlingException throttlingException = null; 
     for (int i = 1; i <= MAX_TRIES; i++) { 
      try { 
       return thisJoinPoint.proceed(); 
      } 
      catch (ThrottlingException e) { 
       throttlingException = e; 
       System.out.println(" Throttled during try #" + i); 
       Thread.sleep(WAIT_MILLIS_BETWEEN_TRIES); 
      } 
     } 
     throw throttlingException; 
    } 
} 

控制台登录成功重试:

RetryInterceptor -> execution(void de.scrum_master.app.Application.doSomething()) 
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething()) 
    Throttled during try #1 
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething()) 
    Throttled during try #2 
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething()) 
Doing something 

控制台登录失败重试:

RetryInterceptor -> execution(void de.scrum_master.app.Application.doSomething()) 
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething()) 
    Throttled during try #1 
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething()) 
    Throttled during try #2 
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething()) 
    Throttled during try #3 
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething()) 
    Throttled during try #4 
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething()) 
    Throttled during try #5 
Exception in thread "main" de.scrum_master.app.ThrottlingException: call throttled 
    at de.scrum_master.aspect.ThrottlingInterceptor.invoke(ThrottlingInterceptor.aj:19) 
    at de.scrum_master.app.Application.doSomething_aroundBody0(Application.java:9) 
    at de.scrum_master.app.Application.doSomething_aroundBody1$advice(Application.java:22) 
    at de.scrum_master.app.Application.doSomething(Application.java:1) 
    at de.scrum_master.app.Application.main(Application.java:5) 

随意问有关我的回答任何后续问题。


更新:我不知道你是如何和RetryingCallableRetryPolicy /接口工作,你没有告诉我很多关于它。但是我做了什么,得到它的工作是这样的:

package de.scrum_master.app; 

import java.util.concurrent.Callable; 

public interface RetryPolicy<V> { 
    V apply(Callable<V> callable) throws Exception; 
} 
package de.scrum_master.app; 

import java.util.concurrent.Callable; 

public class DefaultRetryPolicy<V> implements RetryPolicy<V> { 
    private static int MAX_TRIES = 5; 
    private static int WAIT_MILLIS_BETWEEN_TRIES = 1000; 

    @Override 
    public V apply(Callable<V> callable) throws Exception { 
     Exception throttlingException = null; 
     for (int i = 1; i <= MAX_TRIES; i++) { 
      try { 
       return callable.call(); 
      } 
      catch (ThrottlingException e) { 
       throttlingException = e; 
       System.out.println(" Throttled during try #" + i); 
       Thread.sleep(WAIT_MILLIS_BETWEEN_TRIES); 
      } 
     } 
     throw throttlingException; 
    } 
} 
package de.scrum_master.app; 

import java.util.concurrent.Callable; 

public class RetryingCallable<V> { 
    private RetryPolicy<V> retryPolicy; 
    private Callable<V> callable; 

    public RetryingCallable(Callable<V> callable, RetryPolicy<V> retryPolicy) { 
     this.callable = callable; 
     this.retryPolicy = retryPolicy; 
    } 

    public static <V> RetryingCallable<V> newRetryingCallable(Callable<V> callable, RetryPolicy<V> retryPolicy) { 
     return new RetryingCallable<V>(callable, retryPolicy); 
    } 

    public V call() throws Exception { 
     return retryPolicy.apply(callable); 
    } 
} 

现在改变重试拦截这样的:

package de.scrum_master.aspect; 

import java.util.concurrent.Callable; 

import org.aspectj.lang.ProceedingJoinPoint; 
import org.aspectj.lang.annotation.Around; 
import org.aspectj.lang.annotation.Aspect; 
import org.aspectj.lang.annotation.DeclarePrecedence; 

import de.scrum_master.app.DefaultRetryPolicy; 
import de.scrum_master.app.RetryPolicy; 
import de.scrum_master.app.RetryingCallable; 

@Aspect 
@DeclarePrecedence("RetryInterceptor, *") 
public class RetryInterceptor { 
    private RetryPolicy<Object> retryPolicy = new DefaultRetryPolicy<>(); 

    @Around("execution(* doSomething())") 
    public Object invoke(ProceedingJoinPoint thisJoinPoint) throws Throwable { 
     System.out.println(getClass().getSimpleName() + " -> " + thisJoinPoint); 
     return RetryingCallable.newRetryingCallable(
      new Callable<Object>() { 
       @Override 
       public Object call() throws Exception { 
        return thisJoinPoint.proceed(); 
       } 
      }, 
      retryPolicy 
     ).call(); 
    } 
} 

日志输出将是非常类似于你之前看到过。对我来说这很好。

+0

感谢您的回答。我已经在使用'@ Ordered'。在这里有一个问题,因为我使用'@ ThrowsException'的建议,我有种无限循环的感觉。但是你的代码似乎可以用'@ Around'正常工作,我在这里失踪的细微差别是什么? – AgentX

+0

也许你的意思是'@ AfterThrowing'。那么,正如名字所暗示的那样,这个通知类型会在方法执行完成后拦截它们,'@ Around'包装自己_方法执行,即可以在执行之前和之后执行某些操作,捕获并处理异常,重试方法执行等。 '@ AfterThrowing'已经太迟了,抛出异常并且方法执行结束。此外,请查看我的日志中的异常callstack:异常在'ThrottlingInterceptor.invoke'中引发,因此您可能错误地使用了'@ AfterThrowing'切入点。 – kriegaex

+0

谢谢,我现在添加了一个循环并重新尝试,正如你所解释的那样。最后还有一件事:在将来我想要使用一个通用的重试策略(它提供了良好的默认值,例如'3次重试','随机性'和'指数退避'),正如我在上面的例子中所使用的('return RetryingCallable .newRetryingCallable(new Callable ()')有关如何添加它的任何输入,而不是自定义循环? – AgentX