2013-06-25 37 views
3

我一直在研究这一两天,并已阅读关于多线程和blob客户端的以前的问题并实施了他们的建议。任务线程和Azure CloudBlobClient

我已经将问题解决到下面。

没有错误产生,只是没有写入threadtest容器(它已经存在)。有时候一个blob被写入,然后什么都没有。

如果我增加睡眠到1秒一切都很好。

代码的原因是基准Azure的blob写入功能。 (我目前拿到8种单线程的情况下做70万小时,但我敢肯定,我可以得到更高,如果我能想出解决办法)

using System; 
using System.Net; 
using System.Threading; 
using Microsoft.WindowsAzure; 
using Microsoft.WindowsAzure.ServiceRuntime; 
using Microsoft.WindowsAzure.StorageClient; 
using System.Threading.Tasks; 

namespace ThreadedWriterTest 
{ 
    public class WorkerRole : RoleEntryPoint 
    { 
     private static CloudStorageAccount storageAccount; 

     public override void Run() 
     { 
      while (true) 
      { 
       Thread.Sleep(10); 
       Task.Factory.StartNew(()=> writeStuff()); 
      } 
     } 

     private void writeStuff() 
     { 
      CloudBlobClient threadClient = storageAccount.CreateCloudBlobClient(); 
      threadClient.GetBlobReference("threadtest/" + Guid.NewGuid().ToString()).UploadText("Hello " + Guid.NewGuid().ToString()); 
     } 



     public override bool OnStart() 
     { 
      ServicePointManager.DefaultConnectionLimit = 12; 
      storageAccount = CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("XXX")); 
      return base.OnStart(); 
     } 
    } 
} 
+0

如果只需要调用'writeStuff()'直接在while循环它的工作原理。 –

+2

如果平均写入时间超过10毫秒,那么最终会有很多“任务”等待。也许这就是你的问题 - 你的虚拟机挂在CPU或内存上?在每个可用连接启动一个线程,并让每个线程紧密循环写入可能会更好。或者,甚至更好的是,使用具有某种机制的异步写入方法来控制未完成的请求。 –

+0

你是真的。这段代码产生了太多的并发线程,我使用了一个信号量来限制数量,并且很快就会发布一个解决方案。 –

回答

1

产生太多的并发线程上面的代码,我的幼稚的做法用Thread.Sleep()节流并不足以限制线程数。

引入一个信号量(基本上是一个用于统计并行执行多少个线程的机制)可以大大地解决这个问题。我正在稳步增加并发限制和实例数量,并且已经超过100万个小时。 (实际的代码生成与奇数一个4MB〜随机长度数据16-32K - 4个实例10个并发线程)

using System; 
using System.Net; 
using System.Threading; 
using Microsoft.WindowsAzure; 
using Microsoft.WindowsAzure.ServiceRuntime; 
using Microsoft.WindowsAzure.StorageClient; 
using System.Threading.Tasks; 

namespace ThreadedWriterTest 
{ 
    public class WorkerRole : RoleEntryPoint 
    { 
     private static CloudStorageAccount storageAccount; 
     private static Semaphore semaphore = new Semaphore(3, 3); 

     public override void Run() 
     { 
      while (true) 
      { 
       semaphore.WaitOne(); 
       Task.Factory.StartNew(()=> writeStuff()); 
      } 
     } 

     private void writeStuff() 
     { 
      CloudBlobClient threadClient = storageAccount.CreateCloudBlobClient(); 
      threadClient.GetBlobReference("threadtest/" + Guid.NewGuid().ToString()).UploadText("Hello " + Guid.NewGuid().ToString()); 
      semaphore.Release(); 
     } 



     public override bool OnStart() 
     { 
      ServicePointManager.DefaultConnectionLimit = 12; 
      storageAccount = CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("XXX")); 
      return base.OnStart(); 
     } 
    } 
}