1

我需要配置通过ExecutorCompletionService调用API的重试策略。CompletionService中的重试策略

示例代码:

public void func() throws Exception{ 
    ExecutorService executorService = Executors.newFixedThreadPool(5); 
    CompletionService<String> completionService = new ExecutorCompletionService<String>(executorService); 
    List<Future<String>> list = new ArrayList<Future<String>>(); 
    for(int i=0; i<10; i++) { 
     AsyncTest asyncTest = new AsyncTest(); 
     Future<String> futureString = completionService.submit(asyncTest); 
     list.add(futureString); 
    } 
    while (list.size() > 0) { 
     Future<String> futureResponse = completionService.take(); 
     System.out.println(futureResponse.get()); 
     list.remove(futureResponse); 
     } 
    executorService.shutdown(); 
} 
public class AsyncTest implements Callable<String> { 
     public String call() throws Exception { 
       //returns a response from api call 
       //this is a network call and throws TimeoutException 
     } 
} 

什么来实现,同时调用API抛出TimeoutException异常重试政策最好的方法是什么?

+0

只是一点点澄清之前,我尝试回答,以确保我知道究竟一个if-then-else条件正在进行。 TimeoutException是来自网络呼叫滞后,而不是等待执行者完成其业务? – Xype

+0

是的,TimeoutException是来自网络通话滞后。 – Harsh

+1

而不是双倍答案,我想你可以从这里得到你需要的东西https://stackoverflow.com/a/4738630/3630719 – Xype

回答

0

我增强了你的类AsyncTest:

public class RetryableAsyncTest implements Callable<RetryableAsyncTest> { 

    private final String _name; 
    private /* */ String _value; 
    private /* */ boolean _timeouted; 
    private /* */ int  _retryCount; 

    public RetryableAsyncTest(String name) { 
     _name = name; 
    } 

    @Override 
    public RetryableAsyncTest call() throws Exception { 
     try { 
     ++_retryCount; 
     _timeouted = false; 
     //-------- Begin of functionnal code 
     if(Math.random() > 0.5) {  // Simulation of 
      throw new TimeoutException(); // timeout condition 
     } 
     _value = "computation result"; 
     //-------- End of functionnal code 
     } 
     catch(final TimeoutException x) { 
     _timeouted = true; 
     } 
     return this; 
    } 

    public String getName() { 
     return _name; 
    } 

    public String getValue() { 
     return _value; 
    } 

    public boolean isTimeouted() { 
     return _timeouted; 
    } 

    public int getRetryCount() { 
     return _retryCount; 
    } 
} 

RetryableAsyncExecutor类:

public class RetryableAsyncExecutor { 

    private final ExecutorService      _exec; 
    private final CompletionService<RetryableAsyncTest> _comp; 

    public RetryableAsyncExecutor(int nThreads) { 
     _exec = Executors.newFixedThreadPool(nThreads); 
     _comp = new ExecutorCompletionService<>(_exec); 
    } 

    public void submit(RetryableAsyncTest task) { 
     _comp.submit(task); 
    } 

    public RetryableAsyncTest get() throws Exception { 
     final Future<RetryableAsyncTest> f = _comp.take(); 
     final RetryableAsyncTest task = f.get(); 
     if(task.isTimeouted()) { 
     _comp.submit(task); 
     } 
     return task; 
    } 

    public void shutdown() { 
     _exec.shutdown(); 
    } 
} 

测试用例:

public class Main { 

    public static void main(String[] args) { 
     final int COUNT = 8; 
     final RetryableAsyncExecutor re = new RetryableAsyncExecutor(5); 
     try { 
     for(int i = 0; i < COUNT; ++i) { 
      re.submit(new RetryableAsyncTest("Async#"+(i+1))); 
     } 
     int count = 0; 
     while(count < COUNT) { 
      final RetryableAsyncTest task = re.get(); 
      if(task.isTimeouted()) { 
       System.err.printf("%s: retrying (%d)\n", 
        task.getName(), task.getRetryCount()); 
      } 
      else { 
       System.err.printf("%s: done with '%s'.\n", 
        task.getName(), task.getValue()); 
       ++count; 
      } 
     } 
     } 
     catch(final Throwable t) { 
     t.printStackTrace(); 
     } 
     re.shutdown(); 
     System.exit(0); 
    } 
} 

执行日志:

Async#4: done with 'computation result'. 
Async#1: done with 'computation result'. 
Async#6: retrying (1) 
Async#3: done with 'computation result'. 
Async#8: done with 'computation result'. 
Async#7: retrying (1) 
Async#2: done with 'computation result'. 
Async#5: retrying (1) 
Async#6: done with 'computation result'. 
Async#7: done with 'computation result'. 
Async#5: retrying (2) 
Async#5: done with 'computation result'. 

如果你想小区的重试次数,这样的逻辑进行到RetryableAsyncExecutor.get()方法,围绕_comp.submit(task);