2014-04-29 98 views
1

我已经写在文本文件不同的值(100次),C结构,例如的1.txt,2.txt ... 100.txt不正确的输出与TBB管道


我在Linux上使用英特尔TBB。我已经创建:

  1. InputFilter输入(serial_in_order MODE)
  2. TransformFIlter(serial_in_order MODE)
  3. OutputFilter输出(Serial_in_order MODE)

的输入过滤从文件中读取结构,并将其传递到TransformFilter。 TrasnformFilter更新结构值并将其传递给OutputFilter。 OutputFilter将新结构写入光盘。

基本上,它是一种结构简单的读写应用程序。

class InputFilter: public tbb::filter { 
public: 
    InputFilter(int); 
    ~InputFilter(); 
private: 
    int total_streams; 
    int count; 
    struct video_process_object input_obj; 
    void* operator()(void*); 
}; 

InputFilter::InputFilter(int x) 
     : filter(serial_in_order) { 
    total_streams = x; 
    count = 1; 
} 

InputFilter::~InputFilter() { 
    total_streams = 0; 
} 

void* InputFilter::operator()(void*) { 
    char path[50] = { }; 
    sprintf(path, "input//%d.txt", count); 
    printf("Path : %s\n", path); 
    FILE *fp; 
    fp = fopen(path, "r"); 

    if(fp == NULL || count > total_streams) { 
     fclose(fp); 
     printf("\n*******Cannot find more data.Terminating********\n\n\n"); 
     return NULL; 
    } 

    fscanf(fp, "%d", &input_obj.video_id); 
    fscanf(fp, "%s", &input_obj.storage_url); 
    fscanf(fp, "%s", &input_obj.storage_type); 
    fscanf(fp, "%d", &input_obj.face_detect); 
    fscanf(fp, "%d", &input_obj.face_recognise); 
    fscanf(fp, "%d", &input_obj.scene_recognise); 
    fscanf(fp, "%d", &input_obj.activity_recognise); 
    fscanf(fp, "%d", &input_obj.speech_recognise); 
    fclose(fp); 

    count++; 
    return &input_obj; 
} 

class TransformFilter: public tbb::filter { 
public: 
    TransformFilter(); 
    ~TransformFilter(); 
private: 
    struct video_process_object input_transform; 
    void* operator()(void*); 
}; 

TransformFilter::TransformFilter() 
     : filter(serial_in_order) { 
} 

TransformFilter::~TransformFilter() { 
} 

void* TransformFilter::operator()(void *item) { 

    input_transform = *static_cast<struct video_process_object*>(item); 

    input_transform.video_id += 1000; 
    strcat(input_transform.storage_url, " nabeel"); 
    strcat(input_transform.storage_type, " N"); 
    input_transform.face_detect += 1000; 
    input_transform.face_recognise += 1000; 

    return &input_transform; 
} 

class OutputFilter: public tbb::filter { 
public: 
    OutputFilter(); 
    ~OutputFilter(); 
private: 
    struct video_process_object output_obj; 
    void* operator()(void*); 
}; 

OutputFilter::OutputFilter() 
     : filter(serial_in_order) { 
    int status = mkdir("output", S_IRWXU | S_IRWXG | S_IRWXO); 
    if(status == -1) 
     printf("\nOutput directory already exists\n\n"); 
} 

OutputFilter::~OutputFilter() { 
} 

void* OutputFilter::operator()(void *item) { 

    output_obj = *static_cast<struct video_process_object*>(item); 

    FILE *fp; 

    char path[50] = { }; 
    sprintf(path, "output//%d.txt", output_obj.video_id - 1000); 
    printf("Output Path : %s\t\t %d\n\n", path, output_obj.video_id); 

    if((fp = fopen(path, "w")) == NULL) { 
     fprintf(stderr, "Cannot open output file.\n"); 
     return NULL; 
    } 

    fprintf(fp, "%d\n", output_obj.video_id); 
    fprintf(fp, "%s\n", output_obj.storage_url); 
    fprintf(fp, "%s\n", output_obj.storage_type); 
    fprintf(fp, "%d\n", output_obj.face_detect); 
    fprintf(fp, "%d\n", output_obj.face_recognise); 
    fprintf(fp, "%d\n", output_obj.scene_recognise); 
    fprintf(fp, "%d\n", output_obj.activity_recognise); 
    fprintf(fp, "%d\n", output_obj.speech_recognise); 

    fclose(fp); 
    return NULL; 
} 

int main() { 
    tbb::pipeline pipeline; 

    InputFilter input_filter(100); 
    pipeline.add_filter(input_filter); 

    TransformFilter transform_filter; 
    pipeline.add_filter(transform_filter); 

    OutputFilter output_filter; 
    pipeline.add_filter(output_filter); 

    tbb::tick_count t0 = tbb::tick_count::now(); 

    tbb::task_scheduler_init init_parallel; 
    pipeline.run(1); 
    tbb::tick_count t1 = tbb::tick_count::now(); 

    return 0; 
} 

一切工作正常使用少量文件,例如5或10的问题开始,当我读到大量的文件,如50或100的问题是:

有时输入过滤器读取10.txt文件并且TransformFilter处理它。但立即InputFilter读取11.txt。 OutputFIlter跳过10.txt并处理11.txt。

我怎样才能确保这不会发生?

+0

问题(如果有)在过滤器定义中。请将它们添加到代码中。另外,您指定了令牌= 1,这意味着串行执行,是否有意?并且请修正缩进(删除'主程序代码'前的空格) – Anton

+0

不,我尝试了不同的令牌。它不工作,这就是为什么我最后尝试了一个。 。我已附上完整的代码 – user3358147

+0

我还意识到了一件事。 。我修复了我的代码。现在它在串行执行中工作正常。虽然在并行执行时它不能正常工作,比如如果我使用超过1的令牌。 – user3358147

回答

1

有一个数据争用,因为video_process_objects被放置在过滤器结构中,并通过过滤器之间的引用传递(当然并行运行)。所以,你必须在InputFilter开始处理下一个标记读出新的数据到其video_process_object而第一令牌刚刚开始读取TransformFilter由同一地址的数据的情况:

 Token 1    ||   Token 2 
input_filter.operator()  || 
transform_filter.operator() || input_filter.operator() 
... 

为了解决这个问题,动态地分配数据,例如:

struct video_process_object *input_obj_ptr = new video_process_object; 
fscanf(fp, "%d", &input_obj_ptr->video_id); 
... 
return input_obj_ptr; 

并将其释放到最后一个过滤器中,因为它的返回值无论如何都被忽略。 幻灯片49-50在this old presentation草绘了类似的代码。

最后,让我挑战你对tbb :: pipelene和serial_in_order过滤器类型的选择。 TBB Reference说:

平行过滤器是可行的,因为它们允许并行加速。如果过滤器必须是连续的,那么在实际情况下,失序变体是优选的,因为它对处理顺序的限制较少。

由于处理和文件是独立的,我没有理由把这个额外的'按顺序'限制。为了构造更好的代码,还需要考虑另一个引用:

函数parallel_pipeline提供了一种强类型的lambda友好的方式来构建和运行管道。

+0

嗨。 。感谢您的评论。我使用动态分配和东西似乎工作正常。最后一件事 。 。在动态分配中,我将如何释放内存?因为我从过滤器返回结构值 – user3358147

+0

关于带有过滤器的serial_in_order或serial_out_order,我只是做一些实验,因为最终目标是将它用于不同的应用程序,它将使用此序列。在当前的例子中,这不是必要的。我也学习使用TBB。可能在未来几天,我会跳转到parallel_pipeline。感谢您指出 – user3358147

+0

在最后一个过滤器中取消分配它,因为它的返回值无论如何都被忽略。 – Anton