2014-02-07 91 views
16

我正在寻找一种优雅的方式来缓存我的异步操作的结果。缓存异步操作

我第一次碰到这样的同步方法:

public String GetStuff(String url) 
{ 
    WebRequest request = WebRequest.Create(url); 
    using (var response = request.GetResponse()) 
    using (var sr = new StreamReader(response.GetResponseStream())) 
     return sr.ReadToEnd(); 
} 

然后我做了异步:

public async Task<String> GetStuffAsync(String url) 
{ 
    WebRequest request = WebRequest.Create(url); 
    using (var response = await request.GetResponseAsync()) 
    using (var sr = new StreamReader(response.GetResponseStream())) 
     return await sr.ReadToEndAsync(); 
} 

于是我决定,我应该缓存的结果,所以我并不需要查询外面那往往:

ConcurrentDictionary<String, String> _cache = new ConcurrentDictionary<String, String>(); 

public async Task<String> GetStuffAsync(String url) 
{ 
    return _cache.GetOrAdd(url, await GetStuffInternalAsync(url)); 
} 

private async Task<String> GetStuffInternalAsync(String url) 
{ 
    WebRequest request = WebRequest.Create(url); 
    using (var response = await request.GetResponseAsync()) 
    using (var sr = new StreamReader(response.GetResponseStream())) 
     return await sr.ReadToEndAsync(); 
} 

然后,我读了一篇文章(o看了一个视频)关于如何cachi NG Task<T>是更好的,因为他们创造昂贵:

ConcurrentDictionary<String, Task<String>> _cache = new ConcurrentDictionary<String, Task<String>>(); 

public Task<String> GetStuffAsync(String url) 
{ 
    return _cache.GetOrAdd(url, GetStuffInternalAsync(url)); 
} 

private async Task<String> GetStuffInternalAsync(String url) 
{ 
    WebRequest request = WebRequest.Create(url); 
    using (var response = await request.GetResponseAsync()) 
    using (var sr = new StreamReader(response.GetResponseStream())) 
     return await sr.ReadToEndAsync(); 
} 

而现在的问题是,如果请求失败(例如:一个HTTP 401),高速缓存将包含一个失败Task<String>,我将不得不重新设置该应用程序,因为它将不可能重新发送请求。

是否有一种优雅的方式使用ConcurrentDictionary<T1,T2>缓存只成功的任务,仍然有原子行为?

+3

操作系统已经为您缓存了HTTP请求,并且会正确地遵循HTTP语义(即不缓存401)。 –

+0

https://stackoverflow.com/questions/31831860/async-threadsafe-get-from-memorycache –

回答

16

首先,您的方法都是错误的,因为它们不会节省您的任何请求(尽管第二个方法至少可以节省您的时间)。

你的第一个代码(带有await)做到这一点:

  1. 使该请求。
  2. 等待请求完成。
  3. 如果缓存中已经有结果,则忽略请求的结果。

您的第二个代码删除了第2步,因此速度更快,但您仍然有大量不必要的请求。

你应该做的,而不是为使用the overload of GetOrAdd() that takes a delegate

public Task<String> GetStuffAsync(String url) 
{ 
    return _cache.GetOrAdd(url, GetStuffInternalAsync); 
} 

这并不能完全消除被忽略请求的可能性,但它确实使他们不太可能。 (对于这一点,你可以尝试取消,你知道被忽视的请求,但我不认为这是值得的努力在这里。)


现在您的实际问题。我认为你应该做的是使用the AddOrUpdate() method。如果该值尚未存在,则添加它。如果它的存在,如果它出现故障更换:

public Task<String> GetStuffAsync(String url) 
{ 
    return _cache.AddOrUpdate(
     url, GetStuffInternalAsync, (u, task) => 
     { 
      if (task.IsCanceled || task.IsFaulted) 
       return GetStuffInternalAsync(u); 
      return task; 
     }); 
} 
+0

我花了5分钟来理解你的第一段,直到我看到我的错误。在原始代码中,我有lambda表达式。 AddOrUpdate可能是我需要的东西,我之前检查过它,但是在阅读这个小文档后我并没有感到非常自信。感谢这个例子。 – vtortola

+2

请注意,如果多个线程在同一时间附近执行'GetStuffAsync','GetStuffInternalAsync'可能会执行多次。 ConcurrentDictionary是线程安全的,但在调用回调委托时不会同步。 –

+0

@ChrisEldredge是的,我提到:“这并不能完全消除被忽略的请求的可能性,但它确实使它们不太可能。” – svick

7

它实际上是合理的(并根据您的设计和性能,关键),以保持这些失败的任务作为Negative Cache。否则,如果一个url总是失败,一次又一次地使用它会破坏整个使用缓存的点。

你需要的是一种不时清除缓存的方法。最简单的方法是用一个定时器替代ConcurrentDictionarry实例。更强大的解决方案是建立自己的LruDictionary或类似的东西。

+3

实际上,我认为最简单的方法是使用'MemoryCache',一段时间后它已经支持从缓存清除值。 – svick

+0

@svick虽然我不确定它支持那些原子操作。 – i3arnon

+0

它有['AddOrGetExisting()'](http://msdn.microsoft.com/en-us/library/system.runtime.caching.memorycache.addorgetexisting),我认为这就够了。 – svick

0

对我来说这项工作:

ObjectCache _cache = MemoryCache.Default; 
static object _lockObject = new object(); 
public Task<T> GetAsync<T>(string cacheKey, Func<Task<T>> func, TimeSpan? cacheExpiration = null) where T : class 
{ 
    var task = (T)_cache[cacheKey]; 
    if (task != null) return task;   
    lock (_lockObject) 
    { 
     task = (T)_cache[cacheKey](cacheKey); 
     if (task != null) return task; 
     task = func(); 
     Set(cacheKey, task, cacheExpiration); 
     task.ContinueWith(t => { 
      if (t.Status != TaskStatus.RanToCompletion) 
       _cache.Remove(cacheKey); 
     }); 
    } 
    return task; 
} 
1

这里有一个办法,保证没有高速缓存未命中的异步操作的缓存结果。

正如在接受的答案的评论中所提到的,如果多次在循环中(取决于SynchronizationContext)或从多个线程请求相同的url,那么web请求会一直发送出去,直到有一个缓存的响应,此时缓存将开始使用。

以下方法为每个唯一密钥创建一个SemaphoreSlim对象。这将防止长时间运行的异步操作对同一个密钥运行多次,同时允许它同时针对不同的密钥运行。显然,为了防止缓存未命中,保留SemaphoreSlim对象的开销很大,所以根据用例它可能不值得。但是,如果保证没有缓存丢失是重要的,否则这会实现这一点。

private readonly ConcurrentDictionary<string, SemaphoreSlim> _keyLocks = new ConcurrentDictionary<string, SemaphoreSlim>(); 
private readonly ConcurrentDictionary<string, string> _cache = new ConcurrentDictionary<string, string>(); 

public async Task<string> GetSomethingAsync(string key) 
{ 
    string value; 
    // get the semaphore specific to this key 
    var keyLock = _keyLocks.GetOrAdd(key, x => new SemaphoreSlim(1)); 
    await keyLock.WaitAsync(); 
    try 
    { 
     // try to get value from cache 
     if (!_cache.TryGetValue(key, out value)) 
     { 
      // if value isn't cached, get it the long way asynchronously 
      value = await GetSomethingTheLongWayAsync(); 

      // cache value 
      _cache.TryAdd(key, value); 
     } 
    } 
    finally 
    { 
     keyLock.Release(); 
    } 
    return value; 
} 
+1

您应该使用接受委托的'GetOrAdd'重载,以免不断创建不需要的信号量。 – Servy

+0

@Servy好点。更新了示例 – Brandon

0

另一种简单的方法来做到这一点是延长Lazy<T>AsyncLazy<T>,就像这样:

public class AsyncLazy<T> : Lazy<Task<T>> 
{ 
    public AsyncLazy(Func<Task<T>> taskFactory, LazyThreadSafetyMode mode) : 
     base(() => Task.Factory.StartNew(() => taskFactory()).Unwrap(), mode) 
    { } 

    public TaskAwaiter<T> GetAwaiter() { return Value.GetAwaiter(); } 
} 

然后,你可以这样做:

private readonly ConcurrentDictionary<string, AsyncLazy<string>> _cache 
    = new ConcurrentDictionary<string, AsyncLazy<string>>(); 

public async Task<string> GetStuffAsync(string url) 
{ 
    return await _cache.GetOrAdd(url, 
     new AsyncLazy<string>(
      () => GetStuffInternalAsync(url), 
      LazyThreadSafetyMode.ExecutionAndPublication)); 
} 
+0

我希望得到一些反馈意见。 – Enigmativity

1

我已经为一个包装MemoryCache基本上缓存了Lazy<Task<T>>对象,并且可以解决以下所有问题:

  • 没有并行或不必要的操作来获取值将开始。多个调用站点或线程可以等待缓存中的相同值。
  • 失败的任务没有被缓存。 (无负缓存)。
  • 缓存用户无法从缓存中获取无效结果,即使该值在等待期间失效。

该解决方案在my blog中有进一步说明,完整的工作代码可在GitHub处获得。