2017-04-12 14 views
0

我有一个控制台应用程序,它从Azure表存储中检索数据并将其传输到文本文件。目前,我的代码如下所示:Azure Table存储ExecuteSegmentedQuery和StreamWriter的单独线程

using (StreamWriter writer = new StreamWriter(filePath)) 
     { 
      //header 
      writer.Write("PartitionKey|RowKey|Timestamp|Id|"); 
      writer.WriteLine(); 

      do 
      { 
       var entities = new List<TModel>(); 
       var queryResult = table.ExecuteQuerySegmented(new TableQuery<TModel>(), token); 
       entities.AddRange(queryResult.Results); 
       count = count + 1000; 

       Console.WriteLine("{0} records retrieved", count); 

       //records 
       foreach (TModel entity in entities) 
       { 
        writer.Write("\"" + entity.PartitionKey + "\"|\"" + entity.RowKey + "\"|" + entity.Timestamp + "|" + entity.Id); 
        writer.WriteLine(); 
       } 
       token = queryResult.ContinuationToken; 
      } 
      while (token != null); 
      writer.Dispose(); 

下面这段代码在单线程这意味着它将它写入文本文件之前得到的第1000条记录中运行,那么将获得下1000条记录然后追加它到一个文件。

我想知道是否有办法将数据检索和写入文件的处理拆分为两个单独的线程,这意味着一个线程将执行分段查询的执行,并且将执行写入数据到文件(像一个队列)?

欢迎任何建议。提前致谢!

回答

0

根据您的描述,我建议您创建2个主题。一个是生产者线程,另一个是消费者线程。此外,还需要线程安全收集。我们可以使用ConcurrentQueue Class来存储将从这两个线程访问的实体。

以下代码供您参考。

class Program 
{ 
    static void Main(string[] args) 
    { 
     //Run producer thread 
     Task.Run(new Action(ReadDataFromTable)); 
     //Run consumer thread 
     Task.Run(new Action(WriteDataToText)); 

     Console.Read(); 
    } 

    public static ConcurrentQueue<TModel> entitiesQueue = new ConcurrentQueue<TModel>(); 
    //This tag point out whether the read process is down 
    public static bool ReadTableFinished = false; 
    public static void ReadDataFromTable() 
    { 
     do 
     { 
      var entities = new List<TModel>(); 
      var queryResult = table.ExecuteQuerySegmented(new TableQuery<TModel>(), token); 
      entities.AddRange(queryResult.Results); 
      count = count + 1000; 

      Console.WriteLine("{0} records retrieved", count); 

      //Save entites to the queue 
      foreach (TModel entity in entities) 
      { 
       entitiesQueue.Enqueue(entity); 
      } 
      token = queryResult.ContinuationToken; 
     } 
     while (token != null); 

     ReadTableFinished = true; 
    } 

    public static void WriteDataToText() 
    { 
     using (StreamWriter writer = new StreamWriter(filePath)) 
     { 
      //header 
      writer.Write("PartitionKey|RowKey|Timestamp|Id|"); 
      writer.WriteLine(); 

      do 
      { 
       TModel entity = null; 
       //Read and remove entity from queue 
       entitiesQueue.TryDequeue(out entity); 

       if (entity != null) 
       { 
        //Write data to text file 
        writer.Write("\"" + entity.PartitionKey + "\"|\"" + entity.RowKey + "\"|" + entity.Timestamp + "|" + entity.Id); 
        writer.WriteLine(); 
       } 
       else 
       { 
        //If all the entities are read out from the table and now there is no entities in the queue, we will break this loop and exit current thread. 
        if (ReadTableFinished) 
        { 
         break; 
        } 
       } 
      } 
      while (true); 
      writer.Dispose(); 
     } 
    } 
} 

添加断点而不Console.Read不会进入代码()。

如果您未添加Console.Read()来阻止UI线程,UI线程将完全运行。如果UI线程的状态完成,则其他线程(任务)不能运行。如果你不想添加Console.Read()。您可以在Main方法中使用以下代码。 Task.WaitAll方法将阻止当前UI线程,直到所有任务完全运行。

static void Main(string[] args) 
{ 
    //Run producer thread 
    Task t1 = Task.Run(new Action(ReadDataFromTable)); 
    //Run consumer thread 
    Task t2 = Task.Run(new Action(WriteDataToText)); 

    Task.WaitAll(t1, t2); 
    Console.WriteLine("all completed"); 
} 
+0

运行此代码时没有任何反应。控制台和文本文件中没有消息没有出现。也许你需要在Main部分添加一些内容,以便线程能够运行。 – Joseph

+0

Task.Run方法将立即调用一个新的线程。你是否删除了Console.Read();?如果是,请重新添加。另外,您可以在代码中添加一个断点并逐行运行您的代码,以检查它们是否按预期执行。 – Amor

+0

我打算控制台应用程序是一个无人参与的服务,因此添加Console.Read()将意味着需要一个用户输入,我认为这对无人参与服务没有意义。另外,如果没有Console.Read(),添加断点将不会输入代码。 Console.Read()除了可以是其他选项吗? – Joseph