2013-09-24 44 views
18

在书IntroToRx中,作者建议为I/O写一个“智能”重试,在一段时间后重试I/O请求,如网络请求。写一个Rx“RetryAfter”扩展方法

这里是确切的段落:

一个有用的扩展方法来添加到您自己的图书馆可能是一个“回到 关闭和重试”的方法。与我合作的团队在执行I/O(尤其是网络请求)时发现了这样的 功能。 的概念是尝试,并在失败后等待一段时间,然后再试一次。此方法的版本可能会考虑要重试的异常类型 以及重试次数的最大值 。您甚至可能希望延长等待时间至 对每次后续重试都不那么积极。

不幸的是,我想不出如何编写这种方法。 :(

回答

29

执行退避重试的关键是deferred observables。延迟observable将不会执行它的工厂直到有人订阅它,并且它会为每个订阅调用工厂,使它成为我们重试的理想选择场景。

假设我们有触发的网络请求的方法。

public IObservable<WebResponse> SomeApiMethod() { ... } 

对于这个小片段的目的,让我们定义为递延source

var source = Observable.Defer(() => SomeApiMethod()); 

每当有人订阅源时,它将调用SomeApiMethod并启动一个新的Web请求。无论何时失败都会重试它的天真方式是使用内置的Retry运算符。

source.Retry(4) 

这对API来说不是很好,但它不是你要求的。我们需要在每次尝试之间推迟发起请求。一种做法是使用delayed subscription

Observable.Defer(() => source.DelaySubscription(TimeSpan.FromSeconds(1))).Retry(4) 

这并不理想,因为它会在第一个请求中添加延迟,让我们来解决这个问题。

int attempt = 0; 
Observable.Defer(() => { 
    return ((++attempt == 1) ? source : source.DelaySubscription(TimeSpan.FromSeconds(1))) 
}) 
.Retry(4) 
.Select(response => ...) 

只是停留了第二个是不是一个很好的重试方法,但这样让我们改变常数为接收的重试次数,并返回一个适当的延迟功能。指数后退很容易实施。

Func<int, TimeSpan> strategy = n => TimeSpan.FromSeconds(Math.Pow(n, 2)); 

((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1))) 

我们现在差不多完成了,我们只需要添加一个方法来指定我们应该重试哪些异常。让我们添加一个给定异常的函数,返回是否重试有意义,我们将其称为retryOnError。

现在我们需要写一些可怕的代码,但忍受着我。

Observable.Defer(() => { 
    return ((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1))) 
     .Select(item => new Tuple<bool, WebResponse, Exception>(true, item, null)) 
     .Catch<Tuple<bool, WebResponse, Exception>, Exception>(e => retryOnError(e) 
      ? Observable.Throw<Tuple<bool, WebResponse, Exception>>(e) 
      : Observable.Return(new Tuple<bool, WebResponse, Exception>(false, null, e))); 
}) 
.Retry(retryCount) 
.SelectMany(t => t.Item1 
    ? Observable.Return(t.Item2) 
    : Observable.Throw<T>(t.Item3)) 

所有这些尖括号在那里元帅,我们不应该重试过去.Retry()异常。我们已经将内部可观察值设为IObservable<Tuple<bool, WebResponse, Exception>>,其中第一个bool表示我们是否有响应或异常。如果retryOnError指示我们应该重试某个特定的异常,那么内部可观察值将会抛出,并且会被重试拾取。 SelectMany只是展开我们的元组,并使得可观测值再次变为IObservable<WebRequest>

查看我的gist with full source and tests的最终版本。有了这个操作符可以让我们写我们重试代码相当简洁

Observable.Defer(() => SomApiMethod()) 
    .RetryWithBackoffStrategy(
    retryCount: 4, 
    retryOnError: e => e is ApiRetryWebException 
) 
+0

伟大的东西马库斯数量。 –

+1

它看起来像我这个实现,源observable从不退订。这是一个有点难以粘贴在这里,但试试这个,你会看到的间隔保持滴答声:。 Observable.Interval(TimeSpan.FromSeconds(1))不要(Console.WriteLine).RetryWithBackoffStrategy()采取(1)。订阅(); –

+0

@NiallConnaughton漂亮!之所以没有取消订阅源代码,是因为我最初是在我们拥有另一个内部运营商的基础上建立了这个方法的模型,这个运营商生产的是热门的可观测数据。这个操作员不应该那样做。我已经更改了代码来生成冷观察值,并添加了一个测试来验证它是否取消订阅。谢谢! –

1

这里的另一种略有不同的实现,我想出了边学习边Rxx是怎么做的。所以它主要是Rxx的方法的缩减版本。

签名与Markus的版本略有不同。您指定一种要重试的异常类型,并且延迟策略将采用异常和重试计数,因此您可以延长每个连续重试的延迟时间等。

我无法保证它的错误证明,或者最好的方法,但它似乎工作。

public static IObservable<TSource> RetryWithDelay<TSource, TException>(this IObservable<TSource> source, Func<TException, int, TimeSpan> delayFactory, IScheduler scheduler = null) 
where TException : Exception 
{ 
    return Observable.Create<TSource>(observer => 
    { 
     scheduler = scheduler ?? Scheduler.CurrentThread; 
     var disposable = new SerialDisposable(); 
     int retryCount = 0; 

     var scheduleDisposable = scheduler.Schedule(TimeSpan.Zero, 
     self => 
     { 
      var subscription = source.Subscribe(
      observer.OnNext, 
      ex => 
      { 
       var typedException = ex as TException; 
       if (typedException != null) 
       { 
        var retryDelay = delayFactory(typedException, ++retryCount); 
        self(retryDelay); 
       } 
       else 
       { 
        observer.OnError(ex); 
       } 
      }, 
      observer.OnCompleted); 

      disposable.Disposable = subscription; 
     }); 

     return new CompositeDisposable(scheduleDisposable, disposable); 
    }); 
} 
+0

我发现了这个impl中断的边缘情况(混合Immediate和CurrentThread调度器): int a = 0; Observable.Defer(()=>一个++ <1 Observable.Return(一):Observable.Timer(TimeSpan.Zero,Scheduler.CurrentThread).SelectMany(Observable.Return的(a))) .Concat(Observable.Throw (新的异常()))RetryWithDelay ((例如,I)=> TimeSpan.Zero,Scheduler.Immediate).Subscribe(I => Console.WriteLine(I)); 支持Scheduler.Immediate的简单修复方法是在分配串行一次性函数之前,检查Subscribe()调用期间订阅值是否已更改。 – blueling

+1

根据RetryWithDelay()函数的预期语义,在OnNext处理程序中重置retryCount为0是有意义的,例如, onNext:x => {observer.OnNext(x); retryCount = 0; }。 – blueling

10

也许我是在简化的情况,但如果我们看一下重试的实施,它仅仅是在可观测的无限枚举的Observable.Catch:

private static IEnumerable<T> RepeatInfinite<T>(T value) 
{ 
    while (true) 
     yield return value; 
} 

public virtual IObservable<TSource> Retry<TSource>(IObservable<TSource> source) 
{ 
    return Observable.Catch<TSource>(QueryLanguage.RepeatInfinite<IObservable<TSource>(source)); 
} 

因此,如果我们采取这种方法,我们可以在第一个收益率之后添加一个延迟。

private static IEnumerable<IObservable<TSource>> RepeateInfinite<TSource> (IObservable<TSource> source, TimeSpan dueTime) 
{ 
    // Don't delay the first time   
    yield return source; 

    while (true) 
     yield return source.DelaySubscription(dueTime); 
    } 

public static IObservable<TSource> RetryAfterDelay<TSource>(this IObservable<TSource> source, TimeSpan dueTime) 
{ 
    return RepeateInfinite(source, dueTime).Catch(); 
} 

,捕捉特定异常与重试次数可以更加简洁的过载:

public static IObservable<TSource> RetryAfterDelay<TSource, TException>(this IObservable<TSource> source, TimeSpan dueTime, int count) where TException : Exception 
{ 
    return source.Catch<TSource, TException>(exception => 
    { 
     if (count <= 0) 
     { 
      return Observable.Throw<TSource>(exception); 
     } 

     return source.DelaySubscription(dueTime).RetryAfterDelay<TSource, TException>(dueTime, --count); 
    }); 
} 

请注意,这里的过载使用递归。在第一次出现时,如果count类似于Int32.MaxValue,那么似乎可能会出现StackOverflowException。但是,DelaySubscription使用调度程序来执行预订操作,所以堆栈溢出将不可能(即使用“trampolining”)。通过查看代码,我想这不是很明显。我们可以通过明确地将DelaySubscription重载中的调度器设置为Scheduler.Immediate,并传递TimeSpan.Zero和Int32.MaxValue来强制堆栈溢出。我们可以通过在非直接调度到多一点明确地表达我们的意图,例如:

return source.DelaySubscription(dueTime, TaskPoolScheduler.Default).RetryAfterDelay<TSource, TException>(dueTime, --count); 

更新:新增超载采取特定的调度。

public static IObservable<TSource> RetryAfterDelay<TSource, TException>(
    this IObservable<TSource> source, 
    TimeSpan retryDelay, 
    int retryCount, 
    IScheduler scheduler) where TException : Exception 
{ 
    return source.Catch<TSource, TException>(
     ex => 
     { 
      if (retryCount <= 0) 
      { 
       return Observable.Throw<TSource>(ex); 
      } 

      return 
       source.DelaySubscription(retryDelay, scheduler) 
        .RetryAfterDelay<TSource, TException>(retryDelay, --retryCount, scheduler); 
     }); 
} 
+0

如果您愿意,您也可以使用工厂替换dueTime参数,如上例所示。 –

+0

我喜欢第一个版本,对递归的一个不太确定。理想情况下,您希望让用户通过调度程序,此时您不能再保证他们将使用哪一个。 – Benjol

+0

感谢您的评论Benjol。我同意。我实际上使用了一个需要调度程序的重载(更新上面添加的代码)。你说得对,即使用户可以立即通过。其中一个可能的解决方案可能是,如果Scheduler.Immediate是通过抛出异常。 –

2

下面是我使用的一个:

public static IObservable<T> DelayedRetry<T>(this IObservable<T> src, TimeSpan delay) 
{ 
    Contract.Requires(src != null); 
    Contract.Ensures(Contract.Result<IObservable<T>>() != null); 

    if (delay == TimeSpan.Zero) return src.Retry(); 
    return src.Catch(Observable.Timer(delay).SelectMany(x => src).Retry()); 
} 
+0

'src.Catch(Observable.Timer(delay).SelectMany(x => src).Retry())'和'src.Catch(src。DelaySubscription(delay).Retry())'? – Sabacc

+0

看起来不像。 –

+0

我担心我错过了什么。我认为DelaySubscription比Observable.Timer + SelectMany更容易理解。感谢分享你的解决方案,帮助我找到自己的:) – Sabacc

0

这是我想出了一个。

不想将单个重试的项目连接成一个序列,而是在每次重试时将整个源序列作为一个整体发出 - 因此操作员返回IObservable<IObservable<TSource>>。如果不需要,可以简单地将其编辑回到一个序列中。

(背景:在我的使用情况下,源是热热序列,其余GroupByUntil一个项目出现封闭的组如果该项目是两次重试之间丢失时,基团被永远不会关闭,导致内存泄漏。有序列的序列允许在内部序列唯一的(或异常处理或......)分组)

/// <summary> 
/// Repeats <paramref name="source"/> in individual windows, with <paramref name="interval"/> time in between. 
/// </summary> 
public static IObservable<IObservable<TSource>> RetryAfter<TSource>(this IObservable<TSource> source, TimeSpan interval, IScheduler scheduler = null) 
{ 
    if (scheduler == null) scheduler = Scheduler.Default; 
    return Observable.Create<IObservable<TSource>>(observer => 
    { 
     return scheduler.Schedule(self => 
     { 
      observer.OnNext(Observable.Create<TSource>(innerObserver => 
      { 
       return source.Subscribe(
        innerObserver.OnNext, 
        ex => { innerObserver.OnError(ex); scheduler.Schedule(interval, self); }, 
        () => { innerObserver.OnCompleted(); scheduler.Schedule(interval, self); }); 
      })); 
     }); 
    }); 
} 
+0

btw我的第一篇文章在stackoverflow *** – tinudu

2

基于Markus的答案,我写了下面的:

public static class ObservableExtensions 
{ 
    private static IObservable<T> BackOffAndRetry<T>(
     this IObservable<T> source, 
     Func<int, TimeSpan> strategy, 
     Func<int, Exception, bool> retryOnError, 
     int attempt) 
    { 
     return Observable 
      .Defer(() => 
      { 
       var delay = attempt == 0 ? TimeSpan.Zero : strategy(attempt); 
       var s = delay == TimeSpan.Zero ? source : source.DelaySubscription(delay); 

       return s 
        .Catch<T, Exception>(e => 
        { 
         if (retryOnError(attempt, e)) 
         { 
          return source.BackOffAndRetry(strategy, retryOnError, attempt + 1); 
         } 
         return Observable.Throw<T>(e); 
        }); 
      }); 
    } 

    public static IObservable<T> BackOffAndRetry<T>(
     this IObservable<T> source, 
     Func<int, TimeSpan> strategy, 
     Func<int, Exception, bool> retryOnError) 
    { 
     return source.BackOffAndRetry(strategy, retryOnError, 0); 
    } 
} 

我喜欢它更多,因为

  • 它不修改attempts但使用递归。
  • 它不使用retries但经过尝试,以retryOnError