2012-06-14 129 views
0

首先,是否可以创建一个标记为“ContinueWith”(并标记此问题)?谢谢!如何在最终ContinueWith完成后从ConcurrentDictionary中删除项目

对不起,这篇文章的长度,但我不想浪费任何人试图帮助我的时间,因为我遗漏了相关的细节。也就是说,它可能还会发生。 :)

现在的细节。我正在开发一个订阅了几个ActiveMQ队列主题的服务。其中两个主题有些相关。一个是“公司更新”,一个是“产品更新”。两者的“ID”是CompanyID。 公司主题包括数据中的产品主题。因为其他订户需要产品数据但不希望/不需要订阅产品主题。由于我的服务是多线程(超出了我们的自由裁量权的要求),如邮件到达时我添加任务来处理每一个使用AddOrUpdate一个ConcurrentDictionary在更新的parm是一个简单的ContinueWith(见下文)。完成以防止因为这些主题和订阅者是“持久的”而可能发生的同时更新,因此如果我的侦听器服务脱机(无论何种原因),我们可以用同一CompanyID的多条消息(公司和/或产品)结束。 (最后!)任务完成后(无论是一个任务,还是ContinueWith任务链中的末尾)完成后,我想从ConcurrentDictionary(显然)中删除它。怎么样?我已经想到并从同事那里得到了一些想法,但我并不喜欢他们中的任何一个。我不打算列出这些想法,因为您的答案可能是我拥有但不喜欢的想法之一,但最终可能是最好的。

我试图压缩代码片段,以防止您不必上下滚动太多,不像我的描述。 :)

nrtq =不相关质疑

public interface IMessage 
{ 
    long CompantId { get; set; } 
    void Process(); 
} 
public class CompanyMessage : IMessage 
{ //implementation, nrtq } 
public class ProductMessage : IMessage 
{ //implementation, nrtq } 

public class Controller 
{ 
    private static ConcurrentDictionary<long, Task> _workers = new ConcurrentDictionary<long, Task>(); 
    //other needed declarations, nrtq 

    public Controller(){//constructor stuff, nrtq } 

    public StartSubscribers() 
    { 
    //other code, nrtq 
    _companySubscriber.OnMessageReceived += HandleCompanyMsg; 
    _productSubscriber.OnMessageReceived += HandleProductMsg; 
    } 

    private void HandleCompanyMsg(string msg) 
    { 
    try { 
     //other code, nrtq 
     QueueItUp(new CompanyMessage(message)); 
    } catch (Exception ex) { //other code, nrtq } 
    } 

    private void HandleProductMsg(string msg) 
    { 
    try { 
     //other code, nrtq 
     QueueItUp(new ProductMessage(message)); 
    } catch (Exception ex) { //other code, nrtq } 
    } 

    private static void QueueItUp(IMessage message) 
    { 
    _workers.AddOrUpdate(message.CompanyId, 
     x => { 
     var task = new Task(message.Process); 
     task.Start(); 
     return task; 
     }, 
     (x, y) => y.ContinueWith((z) => message.Process()) 
    ); 
    } 

谢谢!

+0

你能否勾起最后的继续去掉字典键? (感谢您简短的代码,顺便说一句!) – usr

+0

@usr - 这是我遇到的“问题”。如果您有解决方案,很想听听。什么时候我最终ContWith?我不知道我会完成多少任务。示例:我添加第一个任务(和ContWith删除密钥)。 B4完成后,我再获得2个并与他们连接,但他们在Del之后执行Del任务。虽然Msg2的任务正在运行,但我得到了一个新的msg(#4),但ID不再在Dict中,因此我开始了一个** NEW **任务,现在可以同时运行任务消息给Msg3。如果有一个很棒的“InsertContWithBeforeLastContWith”! :) –

+0

或者是一个“FinalContinue”,它会将任务放在链中,但始终保留它,以便任何新的ContinueWith任务在它之前执行。 –

回答

0

我暂时不会“接受”这个答案,因为我渴望看看其他人能否想出更好的解决方案。

一位同事提出了一个解决方案,我稍微调整了一下。是的,我知道使用lock陈述与ConcurrentDictionary的讽刺(?)。我现在没有时间去查看是否有更好的收集类型可供使用。基本上,我们不是只为现有任务做ContinueWith(),而是使用ContinueWith()来替换自己加上的另一个任务。

这有什么不同?很高兴你问! :)如果我们刚完成ContinueWith(),那么!worker.Value.IsCompleted将返回true一旦链中的第一个任务完成。但是,通过将任务替换为两个(或更多)链接任务,则就集合而言,只有一个任务和!worker.Value.IsCompleted不会返回true,直到链中的所有任务都完成为止。

我承认我有点担心用自己+(新任务)替换一个任务,因为如果任务在它被替换时碰巧正在运行。那么,我测试了这些活的日光,并没有遇到任何问题。我相信发生的事情是,因为任务在其自己的线程中运行,并且集合只是持有指向它的指针,所以正在运行的任务不受影响。通过将其替换为自己+(新任务),我们维护指向正在执行的线程的指针,并在完成时获取“通知”,以便下一个任务可以“继续”或返回true。另外,“清理”循环的工作方式以及它所在的位置,意味着我们将有“已完成”的任务在集合中四处闲逛,但直到下一次“清理”运行为止下次收到消息时。再次,我做了很多测试,看看是否可能导致内存问题,但是我的服务从未使用超过20 MB的RAM,即使每秒处理数百条消息。我们必须收到一些相当大的信息,并且有很多长时间运行的任务会造成问题,但是由于情况可能有所不同,因此需要记住这一点。

如上所示,在下面的代码中,nrtq =与问题无关。

public interface IMessage 
{ 
    long CompantId { get; set; } 
    void Process(); 
} 
public class CompanyMessage : IMessage 
{ //implementation, nrtq } 
public class ProductMessage : IMessage 
{ //implementation, nrtq } 

public class Controller 
{ 
    private static ConcurrentDictionary<long, Task> _workers = new ConcurrentDictionary<long, Task>(); 
    //other needed declarations, nrtq 

    public Controller(){//constructor stuff, nrtq } 

    public StartSubscribers() 
    { 
    //other code, nrtq 
    _companySubscriber.OnMessageReceived += HandleCompanyMsg; 
    _productSubscriber.OnMessageReceived += HandleProductMsg; 
    } 

    private void HandleCompanyMsg(string msg) 
    { 
    //other code, nrtq 
    QueueItUp(new CompanyMessage(message)); 
    } 

    private void HandleProductMsg(string msg) 
    { 
    //other code, nrtq 
    QueueItUp(new ProductMessage(message)); 
    } 

    private static void QueueItUp(IMessage message) 
    { 
    lock(_workers) 
    { 
     foreach (var worker in Workers) 
     { 
     if (!worker.Value.IsCompleted) continue; 
     Task task; 
     Workers.TryRemove(worker.Key, out task); 
     } 
     var id = message.CompanyId; 
     if (_workers.ContainsKey(id)) 
     _workers[id] = _workers[id].ContinueWith(x => message.Process()); 
     else 
     { 
     var task = new Task(y => message.Process(), id); 
     _workers.TryAdd(id, task); 
     task.Start(); 
     } 
    } 
    } 
相关问题