2015-10-30 54 views
0

好吧,所以我在创建一堆线程时遇到了问题,它们都使用同一个对象。这个想法是,我有一个“队列”的项目(也是一个列表)和项目应逐一处理,直到所有的项目已被处理。目前,这对一个线程(当我更改threadcount = 1时)很好,但是当我尝试使threadcount = 2并且线程正在竞争时,这一切都会发生......一个糟糕的地方。使用共享资源创建多个线程

这里有几个快速课我给出了我想要完成的一个详细示例......我有一个相当不错的预感,它会与使用“锁”关键字有关,但我关于如何使用它,我不是100%确定的。

在你的回答中,请给出一个解决方案的代码示例,以使您的答案清晰。谢谢!

验证码:

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading.Tasks; 
using System.Threading; 
using System.Diagnostics; 

namespace MyNamespace 
{ 
    class Class1 
    { 
     static void Main() 
     { 
      Debug.WriteLine("starting application..."); 

      int threadcount = 2; 
      List<Task> tasks = new List<Task>(); 

      List<Class2> myObjs = new List<Class2>(); 
      myObjs.Add(new Class2("list item 1")); 
      myObjs.Add(new Class2("list item 2")); 
      myObjs.Add(new Class2("list item 3")); 
      myObjs.Add(new Class2("list item 4")); 
      myObjs.Add(new Class2("list item 5")); 
      myObjs.Add(new Class2("list item 6")); 
      myObjs.Add(new Class2("list item 7")); 
      myObjs.Add(new Class2("list item 8")); 
      myObjs.Add(new Class2("list item 9")); 

      Debug.WriteLine("about to create " + threadcount + " task(s)..."); 

      int t = 0; 
      do 
      { 
       t++; 
       Debug.WriteLine("creating task " + t); 
       Class3 starter = new Class3(); 
       tasks.Add(starter.StartNewThread(myObjs)); 
      } while (t < threadcount); 

      Task.WaitAll(tasks.ToArray()); 
      Debug.WriteLine("all tasks have completed"); 
     } 
    } 

    class Class2 
    { 
     private string m_status; 
     public string status 
     { 
      get { return m_status; } 
      set { m_status = value; } 
     } 

     private string m_text; 
     public string text 
     { 
      get { return m_text; } 
      set { m_text = value; } 
     } 

     private int m_threadid; 
     public int threadid 
     { 
      get { return m_threadid; } 
      set { m_threadid = value; } 
     } 

     public Class2() 
     { 
      m_status = "created"; 
      m_text = ""; 
      m_threadid = 0; 
     } 
     public Class2(string intext) 
     { 
      m_status = "created"; 
      m_text = intext; 
      m_threadid = 0; 
     } 
    } 

    class Class3 
    { 
     public Task StartNewThread(List<Class2> taskObjs) 
     { 
      Task<List<Class2>> task = Task.Factory 
       .StartNew(() => threadTaskWorker(taskObjs), 
       CancellationToken.None, 
       TaskCreationOptions.None, 
       TaskScheduler.Default) 
       .ContinueWith(completed_task => threadTaskComplete(completed_task.Result)); 

      return task; 
     } 
     private List<Class2> threadTaskWorker(List<Class2> taskObjs) 
     { 
      Thread.CurrentThread.Name = "thread" + Thread.CurrentThread.ManagedThreadId; 
      Debug.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + " created."); 

      //Process all items in the list that need processing 
      Class2 nextObj; 
      do 
      { 
       //Look for next item in list that needs processing 
       nextObj = null; 
       foreach (Class2 taskObj in taskObjs) 
       { 
        if (taskObj.status == "created") 
        { 
         nextObj = taskObj; 
         break; 
        } 
       } 

       if (nextObj != null) 
       { 
        Debug.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + 
         " is handling " + nextObj.text); 

        nextObj.status = "processing"; 
        nextObj.threadid = Thread.CurrentThread.ManagedThreadId; 
        nextObj.text += "(handled)"; 

        Random rnd = new Random(); 
        Thread.Sleep(rnd.Next(300, 3000)); 

        nextObj.status = "completed"; 
       } 
      } while (nextObj != null); 

      Debug.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + " destroyed."); 

      //Return the task object 
      return taskObjs; 
     } 
     private List<Class2> threadTaskComplete(List<Class2> taskObjs) 
     { 
      Debug.WriteLine("a thread has finished, here are the current item's status..."); 

      foreach (Class2 taskObj in taskObjs) 
      { 
       Debug.WriteLine(taskObj.text + 
        " thread:" + taskObj.threadid + 
        " status:" + taskObj.status); 
      } 

      //Return the task object 
      return taskObjs; 
     } 
    } 
} 

结果:

/* 
starting application... 
about to create 2 task(s)... 
creating task 1 
creating task 2 
thread #10 created. 
thread #11 created. 
thread #10 is handling list item 1 
thread #11 is handling list item 1 
thread #10 is handling list item 2 
thread #11 is handling list item 2 
thread #10 is handling list item 3 
thread #11 is handling list item 4 
thread #10 is handling list item 5 
thread #11 is handling list item 5 
thread #10 is handling list item 6 
thread #11 is handling list item 6 
thread #10 is handling list item 7 
thread #11 is handling list item 8 
thread #10 is handling list item 9 
thread #11 destroyed. 
a thread has finished, here are the current item's status... 
list item 1(handled) thread:11 status:completed 
list item 2(handled)(handled) thread:11 status:completed 
list item 3(handled) thread:10 status:completed 
list item 4(handled) thread:11 status:completed 
list item 5(handled)(handled) thread:11 status:completed 
list item 6(handled)(handled) thread:11 status:completed 
list item 7(handled) thread:10 status:completed 
list item 8(handled) thread:11 status:completed 
list item 9(handled) thread:10 status:processing 
thread #10 destroyed. 
a thread has finished, here are the current item's status... 
list item 1(handled) thread:11 status:completed 
list item 2(handled)(handled) thread:11 status:completed 
list item 3(handled) thread:10 status:completed 
list item 4(handled) thread:11 status:completed 
list item 5(handled)(handled) thread:11 status:completed 
list item 6(handled)(handled) thread:11 status:completed 
list item 7(handled) thread:10 status:completed 
list item 8(handled) thread:11 status:completed 
list item 9(handled) thread:10 status:completed 
all tasks have completed 
*/ 

预期的结果:

/* 
starting application... 
about to create 2 task(s)... 
creating task 1 
creating task 2 
thread #10 created. 
thread #11 created. 
thread #10 is handling list item 1 
thread #11 is handling list item 2 
thread #10 is handling list item 3 
thread #11 is handling list item 4 
thread #10 is handling list item 5 
thread #10 is handling list item 6 
thread #11 is handling list item 7 
thread #10 is handling list item 8 
thread #11 is handling list item 9 
thread #10 destroyed. 
a thread has finished, here are the current item's status... 
list item 1(handled) thread:10 status:completed 
list item 2(handled) thread:11 status:completed 
list item 3(handled) thread:10 status:completed 
list item 4(handled) thread:11 status:completed 
list item 5(handled) thread:10 status:completed 
list item 6(handled) thread:10 status:completed 
list item 7(handled) thread:11 status:completed 
list item 8(handled) thread:10 status:completed 
list item 9(handled) thread:11 status:processing 
thread #11 destroyed. 
a thread has finished, here are the current item's status... 
list item 1(handled) thread:10 status:completed 
list item 2(handled) thread:11 status:completed 
list item 3(handled) thread:10 status:completed 
list item 4(handled) thread:11 status:completed 
list item 5(handled) thread:10 status:completed 
list item 6(handled) thread:10 status:completed 
list item 7(handled) thread:11 status:completed 
list item 8(handled) thread:10 status:completed 
list item 9(handled) thread:11 status:completed 
all tasks have completed 
*/ 

回答

0

首先,感谢@khargoosh和@interceptwind的输入!这是帮助我理解锁并提出解决方案的关键。这就是我想出的结果,最终稳定工作!它已经过测试,结果是ACTUAL结果。在答案中,我决定使用4个线程来显示结果。

验证码:

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading.Tasks; 
using System.Threading; 
using System.Diagnostics; 

namespace MyNamespace 
{ 
    class Class1 
    { 
     static void Main() 
     { 
      Debug.WriteLine("starting application..."); 

      int threadcount = 4; 
      List<Task> tasks = new List<Task>(); 

      List<Class2> myObjs = new List<Class2>(); 
      myObjs.Add(new Class2("list item 1")); 
      myObjs.Add(new Class2("list item 2")); 
      myObjs.Add(new Class2("list item 3")); 
      myObjs.Add(new Class2("list item 4")); 
      myObjs.Add(new Class2("list item 5")); 
      myObjs.Add(new Class2("list item 6")); 
      myObjs.Add(new Class2("list item 7")); 
      myObjs.Add(new Class2("list item 8")); 
      myObjs.Add(new Class2("list item 9")); 

      Debug.WriteLine("about to create " + threadcount + " task(s)..."); 

      int t = 0; 
      do 
      { 
       t++; 
       Debug.WriteLine("creating task " + t); 
       Class3 starter = new Class3(); 
       tasks.Add(starter.StartNewThread(myObjs)); 
      } while (t < threadcount); 

      Task.WaitAll(tasks.ToArray()); 
      Debug.WriteLine("all tasks have completed"); 
     } 
    } 

    class Class2 
    { 
     private object m_locker = new object(); 
     public object locker 
     { 
      get { return m_locker; } 
      set { m_locker = value; } 
     } 

     private string m_status; 
     public string status 
     { 
      get { return m_status; } 
      set { m_status = value; } 
     } 

     private string m_text; 
     public string text 
     { 
      get { return m_text; } 
      set { m_text = value; } 
     } 

     private int m_threadid; 
     public int threadid 
     { 
      get { return m_threadid; } 
      set { m_threadid = value; } 
     } 

     public Class2() 
     { 
      m_status = "created"; 
      m_text = ""; 
      m_threadid = 0; 
     } 
     public Class2(string intext) 
     { 
      m_status = "created"; 
      m_text = intext; 
      m_threadid = 0; 
     } 
    } 

    class Class3 
    { 
     public Task StartNewThread(List<Class2> taskObjs) 
     { 
      Task<List<Class2>> task = Task.Factory 
       .StartNew(() => threadTaskWorker(taskObjs), 
       CancellationToken.None, 
       TaskCreationOptions.None, 
       TaskScheduler.Default) 
       .ContinueWith(completed_task => threadTaskComplete(completed_task.Result)); 

      return task; 
     } 
     private List<Class2> threadTaskWorker(List<Class2> taskObjs) 
     { 
      Thread.CurrentThread.Name = "thread" + Thread.CurrentThread.ManagedThreadId; 
      Debug.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + " created."); 

      //Process all items in the list that need processing 
      Class2 nextObj; 
      do 
      { 
       //Look for next item in list that needs processing 
       nextObj = null; 
       foreach (Class2 taskObj in taskObjs) 
       { 
        nextObj = taskObj; 

        lock (nextObj.locker) 
        { 
         if (taskObj.status == "created") 
         { 
          nextObj.status = "processing"; 
          break; 
         } 
         else nextObj = null; 
        } 
       } 

       if (nextObj != null) 
       { 
        Debug.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + 
         " is handling " + nextObj.text); 

        nextObj.threadid = Thread.CurrentThread.ManagedThreadId; 
        nextObj.text += "(handled)"; 

        Random rnd = new Random(); 
        Thread.Sleep(rnd.Next(300, 3000)); 

        nextObj.status = "completed"; 
       } 
      } while (nextObj != null); 

      Debug.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + " destroyed."); 

      //Return the task object 
      return taskObjs; 
     } 
     private List<Class2> threadTaskComplete(List<Class2> taskObjs) 
     { 
      Debug.WriteLine("a thread has finished, here are the current item's status..."); 

      foreach (Class2 taskObj in taskObjs) 
      { 
       Debug.WriteLine(taskObj.text + 
        " thread:" + taskObj.threadid + 
        " status:" + taskObj.status); 
      } 

      //Return the task object 
      return taskObjs; 
     } 
    } 
} 

结果:

/* 
starting application... 
about to create 4 task(s)... 
creating task 1 
creating task 2 
creating task 3 
creating task 4 
thread #11 created. 
thread #13 created. 
thread #12 created. 
thread #12 is handling list item 3 
thread #11 is handling list item 1 
thread #13 is handling list item 2 
thread #14 created. 
thread #14 is handling list item 4 
thread #12 is handling list item 5 
thread #11 is handling list item 6 
thread #13 is handling list item 7 
thread #14 is handling list item 8 
thread #12 is handling list item 9 
thread #11 destroyed. 
a thread has finished, here are the current item's status... 
list item 1(handled) thread:11 status:completed 
list item 2(handled) thread:13 status:completed 
list item 3(handled) thread:12 status:completed 
list item 4(handled) thread:14 status:completed 
list item 5(handled) thread:12 status:completed 
list item 6(handled) thread:11 status:completed 
list item 7(handled) thread:13 status:processing 
list item 8(handled) thread:14 status:processing 
list item 9(handled) thread:12 status:processing 
thread #13 destroyed. 
thread #14 destroyed. 
a thread has finished, here are the current item's status... 
list item 1(handled) thread:11 status:completed 
list item 2(handled) thread:13 status:completed 
list item 3(handled) thread:12 status:completed 
a thread has finished, here are the current item's status... 
list item 1(handled) thread:11 status:completed 
list item 4(handled) thread:14 status:completed 
list item 5(handled) thread:12 status:completed 
list item 2(handled) thread:13 status:completed 
list item 3(handled) thread:12 status:completed 
list item 6(handled) thread:11 status:completed 
list item 7(handled) thread:13 status:completed 
list item 4(handled) thread:14 status:completed 
list item 5(handled) thread:12 status:completed 
list item 8(handled) thread:14 status:completed 
list item 9(handled) thread:12 status:processing 
list item 6(handled) thread:11 status:completed 
list item 7(handled) thread:13 status:completed 
list item 8(handled) thread:14 status:completed 
list item 9(handled) thread:12 status:processing 
thread #12 destroyed. 
a thread has finished, here are the current item's status... 
list item 1(handled) thread:11 status:completed 
list item 2(handled) thread:13 status:completed 
list item 3(handled) thread:12 status:completed 
list item 4(handled) thread:14 status:completed 
list item 5(handled) thread:12 status:completed 
list item 6(handled) thread:11 status:completed 
list item 7(handled) thread:13 status:completed 
list item 8(handled) thread:14 status:completed 
list item 9(handled) thread:12 status:completed 
all tasks have completed 
*/ 
0

如果ACTU盟友想要一个先进先出队列的项目,你可以同时访问然后使用ConcurrentQueue。使用TryDequeue()方法检索对象将确保每个对象只被访问一次。

例子:

var cq = new ConcurrentQueue<T>(); 
//populate queue 
... 
//process queue until empty -- this can be done in parallel 
T item; 
while(cq.trydequeue(out item)){ 
    //process item 
} 
//queue was empty when we tried to retrieve something. 
+0

我不认为这是一个不好回答,但我不认为这就是我正在寻找到底。我有许多线程在列表上执行不同的任务,我认为'ConcurrentQueue'结构是由许多线程处理的'单个任务'构建的。另一方面,我有许多线程处理许多任务。对不起,我没有在这个问题中说清楚,我试图保持简单,因为我只是想了解我想要的锁定......尽管如此,感谢您帮助我了解新的有用的东西! ;) –

+0

@ArvoBowen所以你有一个项目的集合,你需要几个任务到每个?这些任务的性质是否必须按照特定的顺序进行?一个项目可以同时执行多个任务吗? – moreON

2

如果你不想使用ConcurrentQueue,或者如果你正在使用的不是线程安全的其他共享资源,使用您在使用早前指出的选项lock关键字。

MSDN

锁关键字标志着一个语句块作为通过获得互斥锁给定对象,执行语句,然后解除锁定的一个关键部分。

当一个线程获得该遇到lock(object)语句必须等待锁,然后再继续成为可用给定object,其他线程锁。

/// any resource shared between threads 
private List<int> sharedResource = new List<int>(); 

/// best practice is to use a private object to synchronise threads 
/// see: https://msdn.microsoft.com/en-us/library/c5kehkcz.aspx 

private object resourceLock = new object(); 

void MethodAccessingSharedResource() 
{ 
    /// Only one thread can acquire the lock on resourceLock at a time. 

    lock (resourceLock) 
    { 
     /// The thread can safely access the shared resource here. 
     /// Other threads will wait at lock(resourceLock) until 
     /// this thread gives up the lock. 
    } 

    /// The thread has released the lock on resourceLock. 
    /// Another thread can now enter the lock(){} code block. 
} 
+0

请注意,这已经是一个非常好的SO讨论主题。 – khargoosh

+0

我很开心......两种方法('lock'和'ConcurrentQueue')都是我可以接受的解决方案。是否有理由相互选择一个呢?我想因为我有更多的事情要做(两组线程根据状态做不同的事情),我需要使用'lock'来代替。从听起来像'ConcurrentQueue'是一次处理项目的好处。思考? –

+1

对于需要处理一次的项目,您只能使用“队列”。这是队列的性质。您是否使用队列或其他类型的收集取决于您的应用程序。 'ConcurrentQueue'非常方便,因为它在内部处理同步。当你需要对共享资源的访问进行细粒度控制时,使用'lock'是很好的选择,当然它不限于队列,并且可以用于任何类型的线程同步工作。也没有理由不能同时使用两者,但花点时间考虑一下你的线程。 – khargoosh

0

首先,你需要在Class2中被用作更衣室的另一目的

class Class2 
{ 
    public object locker = new object(); 

    private string m_status; 
    ... 
} 

编辑:接下来,在你的Class3中的处理循环,你需要首先检查是否您的nextObj.status是“创建”的。如果是,请将其更改为“正在处理”并继续处理它。如果它不跳到下一个对象。

请注意,我们将锁定nextObj.status以防止2个线程同时访问它。(基于MoreOn的评论)

private List<Class2> threadTaskWorker(List<Class2> taskObjs) 
    { 
     Thread.CurrentThread.Name = "thread" + Thread.CurrentThread.ManagedThreadId; 
     Console.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + " created."); 

     //Process all items in the list that need processing 
     foreach (Class2 nextObj in taskObjs) 
     { 
      Console.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + 
       " is handling " + nextObj.text); 

      lock (nextObj.locker) 
      { 
       if (nextObj.status == "created") 
        nextObj.status = "processing"; 
       else 
        continue; 
      } 

      nextObj.status = "processing"; 
      nextObj.threadid = Thread.CurrentThread.ManagedThreadId; 
      nextObj.text += "(handled)"; 
      Random rnd = new Random(); 
      Thread.Sleep(rnd.Next(300, 3000)); 
      nextObj.status = "completed"; 
     } 

     Console.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + " destroyed."); 
     return taskObjs; 
    } 
+0

我认为你的锁定不在正确的地方。两个线程都可以检查状态未完成,然后双方进入,然后(一个接一个)处理同一个项目。此外,我认为它应该(如原来那样)检查该项目是否仍然处于您想要开始处理的特定状态。 – moreON

+0

@moreON我正要发布一些相同的内容!我喜欢这个答案,但是当试图实现它时,我最终遇到了问题。当两个线程第一次启动时,它们都会看到'if(nextObj.status!=“completed”)'= true并且两个线程都继续(导致问题)。我也改变了代码来使用'if(nextObj.status ==“created”)'而不是因为最终的实现我会需要它来做到这一点。我有很多步骤,这个线程只需要担心它正在尝试处理的单个状态(又称“已创建”)。 –

+0

深入挖掘,我想我解决了这个问题。我将对你的答案进行编辑。 ;) –