2016-09-23 48 views
0

我在线程环境中运行时遇到了redis问题。redis在线程环境中不一致

我有一个叫做AwaitableParallelForeachWorker的类,我可以在一个有效载荷中为每个项目运行一个特定的函数。 (我知道这是不是很漂亮,但它的工作)

public class AwaitableParallelForeachWorker : IAwaitableParallelForeachWorker 
{ 
    private readonly object _lockObject = new object(); 
    private int _tasksCompleted; 

    public async Task Run<T>(Func<T, Task> action, IEnumerable<T> payload) 
    { 
     var list = payload.ToList(); 
     var tasks = list.Select(x => new Task(async() => 
     { 
      await action(x); 
      TaskDone(); 
     })); 
     Parallel.ForEach(tasks, task => task.Start()); 
     while (_tasksCompleted < list.Count) 
     { 
      await Task.Delay(10); 
     } 
    } 

    public void TaskDone() 
    { 
     lock (_lockObject) 
     { 
      _tasksCompleted++; 
     } 
    } 
} 

这里的Redis的缓存代码:

public class NewsappRedisCache : INewsappRedisCache 
{ 
    private static readonly string ConnectionString = ConfigurationManager.AppSettings["RedisCache"]; 

    private static readonly Lazy<ConnectionMultiplexer> LazyConnection = 
     new Lazy<ConnectionMultiplexer>(() => ConnectionMultiplexer.Connect(ConnectionString)); 

    private static IDatabase MimerArticleDatabase => Connection.GetDatabase(2); 

    public async Task<MimerArticle> GetMimerArticleAsync(Guid id) 
    { 
     var redisValue = await MimerArticleDatabase.StringGetAsync($"{nameof(MimerArticle)}-{id}"); 
     if (!redisValue.HasValue) return null; 
     var mimerArticle = JsonConvert.DeserializeObject<MimerArticle>(redisValue.ToString()); 
     return mimerArticle; 
    } 

我写了这个简单的测试,这就要求我Redis的缓存使用AwaitableParallelForeachWorker

1000次
public class Test 
    { 
    private NewsappRedisCache _redisCache; 

    [Fact] 
    public async void TestRedis() 
    { 
     var guids = new List<Guid>(); 

     for (var i = 0; i < 1000; i++) 
     { 
      guids.Add(Guid.NewGuid()); 
     } 

     _redisCache = new NewsappRedisCache(); 
     await new AwaitableParallelForeachWorker().Run(CallRedis, guids); 
    } 

    private async Task CallRedis(Guid id) 
    { 
     await _redisCache.GetMimerArticleAsync(id); 
    } 
} 

现在它变得很奇怪。有时候1000到达redis缓存的时间会在瞬间执行。我已经通过检查天蓝色的门户来验证get实际上是否打开了缓存。但有时每个都需要大约1秒。

我不知道为什么。我试过改变AwaitableParallelForeachWorker的功能,但它一直不一致。 如果我运行每个得到一个正常的foreach它执行得很好,但没有AwaitableParallelForeachWorker实际工作时一样快。

所以我坚持认为它与线程/任务有关。

任何人可以提供一些帮助?

回答

1

我无法找到确切的原因,但我不能评论符合这一任,所以这里有一些事情要考虑:

  • 不应该的测试方法是async Task,不async void?一些测试框架处理这种可以说是不正确的用法,但也许你的不会。如果框架无法知道方法何时完成,它会认为所有事情都是在第一次等待时完成的。

  • Run()方法可以显著简化为:

    public Task Run<T>(Func<T, Task> action, IEnumerable<T> payload) 
    { 
        return Task.WhenAll(payload.Select(action)); 
    } 
    

    ,并摆脱了班里其他同学的。

  • 您的CallRedis()方法不需要状态机开销;而不是:

    private Task CallRedis(Guid id) 
    { 
        return _redisCache.GetMimerArticleAsync(id); 
    } 
    
  • 虽然我已把你行:

    var guids = Enumerable.Range(0, 1000).Select(Guid.NewGuid()).ToList(); 
    // :) 
    
+0

的变化Task.WhenAll(payload.Select(动作))的伎俩!我以前使用过:return Task.WhenAll(payload.Select(async x => await action(x))),但是这和我做的奇怪的Run方法一样。这就是为什么我最终以这种方式实现Run ...我显然不会异步并等待。 Anwyays非常感谢! – Boenne