2012-07-26 56 views
0

我有电子邮件发送窗口服务的问题。该服务在每三分钟延迟一次后开始,并获取要从数据库发送的消息,并开始发送它。这是怎样的代码看起来像:C#线程在窗口服务创建问题

 MessageFilesHandler MFHObj = new MessageFilesHandler(); 
     List<Broadcostmsg> imidiateMsgs = Manager.GetImidiateBroadCastMsgs(conString); 
     if (imidiateMsgs.Count > 0) 
     { 

      // WriteToFileImi(strLog); 

      Thread imMsgThread = new Thread(new    ParameterizedThreadStart(MFHObj.SendImidiatBroadcast));    
      imMsgThread.IsBackground = true; 
      imMsgThread.Start(imidiateMsgs); 
     } 

这将消息发送到大名单,并且需要很长时间才能完成发送到一个更大的列表。现在问题发生在消息仍在发送并且服务获得新消息发送时,之前的发送受到侮辱并且新的消息发送开始,尽管我正在使用线程,每次服务获取消息发送它时都会启动一个新线程。 可以请你帮忙,我在代码中犯的错误。

+1

它看起来像你有一个线程使用'imidiateMsgs',但你没有任何类型的线程安全锁定保护'imidiateMsgs'。如果你有一本关于C#的好书,它应该有一个关于线程安全的章节,你可以学习。 – Tod 2012-07-26 05:27:19

+0

感谢您的建议。我一定会关注,但现在我没有时间阅读一本书。我需要尽快解决这个问题。如果你有解决方案,请帮助我。谢谢 – 2012-07-26 05:31:19

回答

0

它看起来像要求是建立一个消费者生产者队列。在哪个制作人会不断添加消息到列表中,并且消费者将从该列表中选择项目并且使用它来执行一些工作 唯一让我担心的是,您每次都创建一个新线程来发送电子邮件,而不是从线程池中选择线程。如果继续创建越来越多的线程,应用程序的性能会因上下文切换所产生的头部而降低。

如果您使用.net framwe工作4.0,那么灵魂变得相当容易。您可以使用System.Collections.Concurrent.ConcurrentQueue进行排队和出列您的项目。它的线程安全,所以不需要锁定对象。使用Tasks来处理您的消息。

BlockingCollection在其构造函数中需要一个IProducerConsumerCollection,或者如果调用它的空构造函数,它将默认使用ConcurrentQueue。

这样才能排队消息。

//define a blocking collectiom 
var blockingCollection = new BlockingCollection<string>(); 

//Producer 
Task.Factory.StartNew(() => 
{ 
    while (true) 
    { 
     blockingCollection.Add("value" + count); 
     count++;      
    } 
}); 

//consumer 
//GetConsumingEnumerable would wait until it find some item for work 
// its similar to while(true) loop that we put inside consumer queue 
Task.Factory.StartNew(() => 
{ 
    foreach (string value in blockingCollection.GetConsumingEnumerable()) 
    { 
     Console.WriteLine("Worker 1: " + value); 
    }     
}); 

UPDATE

由于您使用的框架3.5。我建议你看看Joseph Albahari的实施Consumer/Producer Queue。它是你找到的最好的之一。

直接从上面的链接

public class PCQueue 
{ 
    readonly object _locker = new object(); 
    Thread[] _workers; 
    Queue<Action> _itemQ = new Queue<Action>(); 

    public PCQueue (int workerCount) 
    { 
    _workers = new Thread [workerCount]; 

    // Create and start a separate thread for each worker 
    for (int i = 0; i < workerCount; i++) 
     (_workers [i] = new Thread (Consume)).Start(); 
    } 

    public void Shutdown (bool waitForWorkers) 
    { 
    // Enqueue one null item per worker to make each exit. 
    foreach (Thread worker in _workers) 
     EnqueueItem (null); 

    // Wait for workers to finish 
    if (waitForWorkers) 
     foreach (Thread worker in _workers) 
     worker.Join(); 
    } 

    public void EnqueueItem (Action item) 
    { 
    lock (_locker) 
    { 
     _itemQ.Enqueue (item);   // We must pulse because we're 
     Monitor.Pulse (_locker);   // changing a blocking condition. 
    } 
    } 

    void Consume() 
    { 
    while (true)      // Keep consuming until 
    {         // told otherwise. 
     Action item; 
     lock (_locker) 
     { 
     while (_itemQ.Count == 0) Monitor.Wait (_locker); 
     item = _itemQ.Dequeue(); 
     } 
     if (item == null) return;   // This signals our exit. 
     item();       // Execute item. 
    } 
    } 
} 

优点这种方法以代码是你可以控制你需要创建为优化性能的线程数。使用线程池方式,尽管它很安全,但您无法控制可同时创建的线程数。

+0

任务体内的无限循环?哦,没有... – Dennis 2012-07-26 06:05:42

+0

如果它没有无限循环,你将如何确保你的消费者线程总是处理。事件可以用来表示新项目的到来,但我还没有看到任何生产者,消费者队列与事件发信号 – Anand 2012-07-26 06:13:58

+0

我正在使用框架3.5 – 2012-07-26 07:18:14

0

我觉得你在一个循环里面使用你的代码WAITS对于新的消息,你是否管理过这些等待?让我们来看看:

while(imidiateMsgs.Count == 0) 
{ 
    //Wait for new Message 
} 

//Now you have a new message Here 

//Make a new thread to process message 

还有对于等待中不同的方法,我建议使用BlockingQueues:

在公共场所:

BlockingCollection<Broadcostmsg> imidiateMsgs = new BlockingCollection<Broadcostmsg>(); 

在你的消费者(线程产生消息):

SendImidiatBroadcast = imidiateMsgs.Take();//this will wait for new message 
//Now you have a new message Here 

//Make a new thread to process message 

在生产者(回答消息的线程):

imidiateMsgs.Add(SendImidiatBroadcast); 

而且你必须使用线程池,每次回答的消息作出新的线程,不要每次初始化新的线程。