2010-05-28 32 views
4

我有一个情况,许多线程都同时生成最终写入一个长串流文件流的数据。我需要以某种方式将这些写入序列化,以便将流写入正确的顺序。线程I/O重排序缓冲区的标准术语?

,我的2048 J任务 ..j Ñ,其每一个产生的数据块ö输入队列。作业在8个线程上并行运行,但输出块必须以与相应输入块相同的顺序出现在输出块中,输出文件必须按以下顺序排列:o o o o ...

的解决方案,这是非常不言自明:我需要在Tomasulo's algorithm某种缓冲区积累和正确的顺序写入输出块,类似于CPU重新排序缓冲器,或到路TCP在将它们传递到应用层之前重新组装无序数据包。

在我开始编写代码之前,我想快速搜索文献,看看是否有任何文章以特别聪明或有效的方式解决了这个问题,因为我有严重的实时和内存限制。我似乎无法找到任何描述这一点的论文;在[线程,并发,重新排序缓冲区,重新组装,io,序列化]的每个排列上的学者搜索没有取得任何有用的结果。我觉得我不应该在寻找正确的术语。

是否有这种模式的常用学术名称或关键字,我可以搜索?

回答

0

其实,你不应该需要积累大块。大多数操作系统和语言提供了一种随机访问文件抽象,允许每个线程独立将其输出数据写入文件中的正确位置,而不影响来自任何其他线程的输出数据。

或者你是否正在写入真正的串行输出文件,如套接字?

+0

真正串行 - 流密码。 – Crashworks 2010-05-28 22:14:58

+0

只有在处理完成之前输出记录的长度已知,您的解决方案才能正常工作。 – 2010-05-30 17:34:22

0

我个人不会使用可重新配置的缓冲区。我会为每个作业创建一个“作业”对象,并根据您的环境使用消息传递或互斥来按顺序接收来自每个作业的完整数据。如果下一份工作没有完成,那么'作家'流程会一直等到它完成。

+0

恐怕我没有按照你的意思去做。你的意思是我应该有多少个互斥体,每个作业都有一个互斥体,并且作者应该按升序等待每个互斥体?这样做的麻烦在于,我一次只能拥有大约20个作业的内存,并且如果遇到当前窗口恰好以相反顺序完成的情况,那么会留下几条线程,直到“头”一个完成。 – Crashworks 2010-05-30 23:22:10

+0

这就是我所建议的,是的。如果任务以相反的顺序完成(除了史蒂夫的建议,如果您的记录已知),或者将完整的结果缓存到磁盘,我认为任何其他解决方案都不会更好。 – 2010-05-31 17:31:02

0

我会使用与您使用的线程数量相同长度的环形缓冲区。环缓冲区也会有相同数量的互斥量。

rinbuffer还必须知道它写入文件的最后一个块的id。它相当于你的环形缓冲区的0索引。

在添加到环缓冲区时,您检查是否可以写入,即设置索引0,然后可以一次向该文件写入多个块。

如果未设置索引0,只需锁定当前线程以等待。 - 你也可以有一个比缓冲区长2-3倍的缓冲区,并且只在适当的时候锁定,例如:当足够的作业完成缓冲区已经启动时。

不要忘了更新的最后一块写坚韧;)

书面方式将文件时,您还可以使用双缓冲。

0

让输出队列包含期货而不是实际数据。当您从输入队列中检索一个项目时,立即将相应的未来发布到输出队列中(注意确保这会保留顺序---见下文)。当工作者线程处理完项目后,可以设置未来的值。输出线程可以从队列中读取每个将来,并阻塞,直到未来准备就绪。如果后期准备工作提前做好准备,只要期货有序,这根本不会影响产出线。

有两种方法可确保输出队列上的期货按正确的顺序排列。第一种方法是使用单个互斥锁从输入队列读取数据并写入输出队列。每个线程都会锁定互斥锁,从输入队列获取一个项目,将未来发布到输出队列并释放互斥锁。

第二个是有一个主线程从输入队列中读取,在输出队列中发布未来,然后将项目交给工作线程执行。

在C++中有一个互斥体保护的队列,这将是这样的:

#include <thread> 
#include <mutex> 
#include <future> 

struct work_data{}; 
struct result_data{}; 

std::mutex queue_mutex; 
std::queue<work_data> input_queue; 
std::queue<std::future<result_data> > output_queue; 

result_data process(work_data const&); // do the actual work 

void worker_thread() 
{ 
    for(;;) // substitute an appropriate termination condition 
    { 
     std::promise<result_data> p; 
     work_data data; 
     { 
      std::lock_guard<std::mutex> lk(queue_mutex); 
      if(input_queue.empty()) 
      { 
       continue; 
      } 
      data=input_queue.front(); 
      input_queue.pop(); 
      std::promise<result_data> item_promise; 
      output_queue.push(item_promise.get_future()); 
      p=std::move(item_promise); 
     } 
     p.set_value(process(data)); 
    } 
} 

void write(result_data const&); // write the result to the output stream 

void output_thread() 
{ 
    for(;;) // or whatever termination condition 
    { 
     std::future<result_data> f; 
     { 
      std::lock_guard<std::mutex> lk(queue_mutex); 
      if(output_queue.empty()) 
      { 
       continue; 
      } 
      f=std::move(output_queue.front()); 
      output_queue.pop(); 
     } 
     write(f.get()); 
    } 
}