2017-05-24 33 views
2

我想使用QtConcurrent::map函数来操作QVector。我所有的示例程序不会是1QtConcurrent :: map没有任何好处

QVector<double> arr(10000000, 0); 
QElapsedTimer timer; 
qDebug() << QThreadPool::globalInstance()->maxThreadCount() << "Threads"; 

int end; 
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 
timer.start(); 
for(int i = 0; i < 100; ++i) { 
    std::transform(arr.begin(), arr.end(), arr.begin(), [](double x){ return ++x; }); 
} 
end = timer.elapsed(); 
qDebug() << end; 
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 
timer.start(); 
for(int i = 0; i < 100; ++i) { 
    std::for_each(arr.begin(), arr.end(), [](double &x){ ++x; }); 
} 
end = timer.elapsed(); 
qDebug() << end; 
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 
timer.start(); 
for(int i = 0; i < 100; ++i) { 
    QFuture<void> qf = QtConcurrent::map(arr.begin(), arr.end(), [](double &x){ ++x; }); 
    qf.waitForFinished(); 
} 
end = timer.elapsed(); 
qDebug() << end; 

然而递增的QVector所有值方案产出

4 Threads 
905 // std::transform 
886 // std::for_each 
876 // QtConcurrent::map 

所以几乎与多线程版本没有速度优势。我确认实际上有4个线程正在运行。我使用了-O2优化。更常见的QThreadPool方法更适合这种情况吗?

编辑:

我尝试使用QtConcurrent::run()一个differernt方法。下面是程序代码的相关部分:

void add1(QVector<double>::iterator first, QVector<double>::iterator last) { 
    for(; first != last; ++first) { 
     *first += 1; 
    } 
} 

/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 
std::for_each(arr.begin(), arr.end(), [](double &x){ ++x; }); 
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 
QFuture<void> qf[numThreads]; 
for(int j = 0; j < numThreads; ++j) { 
    qf[j] = QtConcurrent::run(add1, arr.begin()+j*n/numThreads, arr.begin()+(j+1)*n/numThreads-1); 
} 
for(int j = 0; j < numThreads; ++j) { 
    qf[j].waitForFinished(); 
} 

所以我手动在不同的线程分配任务。但我仍然很难获得性能提升:

181 ms // std::for_each 
163 ms // QtConcurrent::run 

这里还有什么不对?

+1

你为什么期望加快速度?您在每次循环迭代中都在等待未来。 – juanchopanza

+0

我不是这方面的专家,但我期望map()启动4个线程,这应该使这个代码行比STL函数更快地完成。还是我误解了这个功能的概念? – NullAchtFuffZehn

回答

3

看起来QtConcurrent与简单使用C++线程原语和roll-your-own-thread-pools相比具有很高的开销。

template<class T> 
struct threaded_queue { 
    using lock = std::unique_lock<std::mutex>; 
    void push_back(T t) { 
    { 
     lock l(m); 
     data.push_back(std::move(t)); 
    } 
    cv.notify_one(); 
    } 
    boost::optional<T> pop_front() { 
    lock l(m); 
    cv.wait(l, [this]{ return abort || !data.empty(); }); 
    if (abort) return {}; 
    auto r = std::move(data.back()); 
    data.pop_back(); 
    return std::move(r); 
    } 
    void terminate() { 
    { 
     lock l(m); 
     abort = true; 
     data.clear(); 
    } 
    cv.notify_all(); 
    } 
    ~threaded_queue() 
    { 
    terminate(); 
    } 
private: 
    std::mutex m; 
    std::deque<T> data; 
    std::condition_variable cv; 
    bool abort = false; 
}; 
struct thread_pool { 
    thread_pool(std::size_t n = 1) { start_thread(n); } 
    thread_pool(thread_pool&&) = delete; 
    thread_pool& operator=(thread_pool&&) = delete; 
    ~thread_pool() = default; // or `{ terminate(); }` if you want to abandon some tasks 
    template<class F, class R=std::result_of_t<F&()>> 
    std::future<R> queue_task(F task) { 
    std::packaged_task<R()> p(std::move(task)); 
    auto r = p.get_future(); 
    tasks.push_back(std::move(p)); 
    return r; 
    } 
    template<class F, class R=std::result_of_t<F&()>> 
    std::future<R> run_task(F task) { 
    if (threads_active() >= total_threads()) { 
     start_thread(); 
    } 
    return queue_task(std::move(task)); 
    } 
    void terminate() { 
    tasks.terminate(); 
    } 
    std::size_t threads_active() const { 
    return active; 
    } 
    std::size_t total_threads() const { 
    return threads.size(); 
    } 
    void clear_threads() { 
    terminate(); 
    threads.clear(); 
    } 
    void start_thread(std::size_t n = 1) { 
    while(n-->0) { 
     threads.push_back(
     std::async(std::launch::async, 
      [this]{ 
      while(auto task = tasks.pop_front()) { 
       ++active; 
       try{ 
       (*task)(); 
       } catch(...) { 
       --active; 
       throw; 
       } 
       --active; 
      } 
      } 
     ) 
    ); 
    } 
    } 
private: 
    std::vector<std::future<void>> threads; 
    threaded_queue<std::packaged_task<void()>> tasks; 
    std::atomic<std::size_t> active = {}; 
}; 

struct my_timer_t { 
    std::chrono::high_resolution_clock::time_point first; 
    std::chrono::high_resolution_clock::duration duration; 

    void start() { 
     first = std::chrono::high_resolution_clock::now(); 
    } 
    std::chrono::high_resolution_clock::duration finish() { 
     return duration = std::chrono::high_resolution_clock::now()-first; 
    } 
    unsigned long long ms() const { 
     return std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(); 
    } 
}; 
int main() { 
    std::vector<double> arr(1000000, 0); 
    my_timer_t timer; 

    /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 
    timer.start(); 
    for(int i = 0; i < 100; ++i) { 
     std::transform(arr.begin(), arr.end(), arr.begin(), [](double x){ return ++x; }); 
    } 
    timer.finish(); 
    auto time_transform = timer.ms(); 
    std::cout << time_transform << "<- std::transform (" << arr[rand()%arr.size()] << ")\n"; 
    /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 
    timer.start(); 
    for(int i = 0; i < 100; ++i) { 
     std::for_each(arr.begin(), arr.end(), [](double &x){ ++x; }); 
    } 
    timer.finish(); 
    auto time_for_each = timer.ms(); 
    std::cout << time_for_each << "<- std::for_each (" << arr[rand()%arr.size()] << ")\n"; 
    /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ 
    enum { num_threads = 8 }; 
    thread_pool pool(num_threads); 
    timer.start(); 
    for(int i = 0; i < 100; ++i) { 
     std::array< std::future<void>, num_threads > tasks; 
     for (int t = 0; t < num_threads; ++t) { 
      tasks[t] = pool.run_task([&,t]{ 
       std::for_each(arr.begin()+(arr.size()/num_threads)*t, arr.begin()+(arr.size()/num_threads)*(t+1), [](double& x){++x;}); 
      }); 
     } 
     // std::cout << "loop! -- " << pool.threads_active() << "/" << pool.total_threads() << std::endl; 
     for (int t = 0; t < num_threads; ++t) 
      tasks[t].wait(); 
    } 
    timer.finish(); 
    auto time_pool = timer.ms(); 
    std::cout << time_pool << "<- thread_pool (" << arr[rand()%arr.size()] << ")\n"; 
} 

Live example。用一个简单的C++ 11线程池来划分任务8周时的方式

153<- std::transform (100) 
131<- std::for_each (200) 
82<- thread_pool (300) 

一个显著加速:

此产生。 (当分裂任务4种方式时大约是105)。

现在我确实使用了比您的测试集小10倍的测试集,因为在我的程序运行了很长时间之后,在线系统超时。

将会有开销与您的线程池系统进行通信,但我的天真线程池不应该超过像这样的真正的库。

现在,一个严重的问题是,你可能是内存IO绑定;如果所有人都必须等待字节,更多线程更快地访问字节将无济于事。

+1

你是如何测试'QtConcurrent'的开销的?请注意,您将'++'操作分组为'num_threads'批次。你也可以用'QtConcurrent'来做到这一点。 – m7913d

+0

@ m7913d这就是'QtConcurrent'应该做的事情;根据硬件线程的数量启动一些子线程来处理部分任务。我只是手动做了。我通过'for_each'获得了显着的提速。 – Yakk

+0

'QtConcurrent'将每个操作分配给一个线程(考虑到最大并发线程数)。它不会将它们分组。请注意,如果它们可能没有花费相同的时间,则不能直接对组进行操作。 – m7913d

相关问题