2014-04-03 33 views
2

我在访问相同BlockingCollection的C#应用​​程序中使用了两个线程。这工作正常,但我想检索第一个值两次,因此两个线程检索相同的值*。多线程BlockingCollection相同的值

几秒钟后,我想轮询两个线程的currentIndex并删除每个值<索引。因此,例如,线程的最低currentIndex为5,应用程序将删除队列中索引0-5处的项目。另一个解决方案是在所有线程处理该值时删除队列中的值。

我该如何做到这一点?我想我需要另一种类型的缓冲区..?

预先感谢您!

*如果.Take()被thread1调用,则该项目在集合中被删除,而thread2不能再次获得相同的项目。


更新:

我想数据存储在缓冲器中,因此,例如线程1的数据保存到HDD和线程2分析(相同)的数据(并行)。

+0

你问的是非常不寻常的。我怀疑它说的是你的程序设计中的一个更大的问题。你能给我们多一点关于你的程序的信息 - 特别是数据流?我怀疑我们可以为您提供更好的选择。 –

+0

更新了第一篇文章。 – Odrai

回答

5

使用生产者 - 消费者将Value1添加到两个单独的ConcurrentQueues。让线程出队然后从他们自己的队列中处理它们。

编辑7/4/14: 下面是一个朦胧,哈克和半想出来的解决方案:创建一个缓冲的自定义对象。它可以包含您尝试在线程1中缓冲的信息以及线程2中分析结果的空间。

将对象添加到线程1和BlockingCollection中的缓冲区。使用线程2分析结果并用结果更新对象。阻塞集合不应该太大,因为它只是处理引用不应该打你的记忆。这假定你不会在两个线程上同时修改缓冲区中的信息。

另外,还有一半想出来的解决方案是将信息同时送入缓冲区和阻塞收集。分析来自BlockingCollection的数据,将其输入到输出集合中,并再次将它们与缓冲区进行匹配。如果你做对了,这个选项可以处理并发修改,但可能会有更多的工作。

我认为选择一个更好。正如我已经指出的那样,这些只是一半形成的,但它们可能会帮助您找到适合您的特定需求的东西。祝你好运。

+0

谢谢你的回答。所以不可能有两个线程共享的缓冲区并使用相同的值? – Odrai

+0

我并不是说它无法完成,但我无法想出一条脱离我头顶的路。我也不确定它是否应该完成。一般而言,并发集合删除元素以防止您尝试执行的操作,因为这通常不合需要。如果你真的需要破解可能将值添加到非并发链表并阅读它们,暂停和删除将起作用。但我认为你会后悔走这条路。 – mike1952

+0

嗨Odrai,我的编辑帮助呢?我不确定堆栈溢出是否发送编辑警报,因此您可能没有看到它。 – mike1952

1

我建议重新考虑你的设计。

当你有一个必须处理的项目列表,然后给每个线程一个他必须处理的项目队列。

有了这样的解决方案,给两个或多个线程处理相同的值都不会有问题。

这样的事情,没有测试只是键入。

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading; 
using System.Collections.Concurrent; 

namespace ConsoleApplication2 
{ 

    class Item 
    { 
    private int _value; 
    public int Value 
    { 
     get 
     { 
     return _value; 
     } 
    } 

    // all you need 
    public Item(int i) 
    { 
     _value = i; 
    } 
    } 

    class WorkerParameters 
    { 
    public ConcurrentQueue<Item> Items = new ConcurrentQueue<Item>(); 
    } 

    class Worker 
    { 
    private Thread _thread; 
    private WorkerParameters _params = new WorkerParameters(); 

    public void EnqueueItem(Item item) 
    { 
     _params.Items.Enqueue(item); 
    } 

    public void Start() 
    { 
     _thread = new Thread(new ParameterizedThreadStart(ThreadProc)); 
     _thread.Start(); 
    } 

    public void Stop() 
    { 
     // build somthing to stop your thread 
    } 

    public static void ThreadProc(object threadParams) 
    { 
     WorkerParameters p = (WorkerParameters)threadParams; 
     while (true) 
     { 
     while (p.Items.Count > 0) 
     { 
      Item item = null; 
      p.Items.TryDequeue(out item); 

      if (item != null) 
      { 
      // do something 
      } 

     } 
     System.Threading.Thread.Sleep(50); 
     } 
    } 
    } 

    class Program 
    { 
    static void Main(string[] args) 
    { 

     Worker w1 = new Worker(); 
     Worker w2 = new Worker(); 
     w1.Start(); 
     w2.Start(); 

     List<Item> itemsToProcess = new List<Item>(); 
     for (int i = 1; i < 1000; i++) 
     { 
     itemsToProcess.Add(new Item(i)); 
     } 

     for (int i = 1; i < 1000; i++) 
     { 
     w1.EnqueueItem(itemsToProcess[i]); 
     w2.EnqueueItem(itemsToProcess[i]); 
     } 


    } 
    } 
} 
相关问题